博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Google Protobuf在Netty中的使用
阅读量:7100 次
发布时间:2019-06-28

本文共 14554 字,大约阅读时间需要 48 分钟。

[toc]


Google Protobuf在Netty中的使用

程序代码来自于《Netty权威指南》第8章,已经加了注释,不过需要注意的是,使用的proto源代码是在中生成的,关于protobuf代码自动生成工具的使用可以参考这篇文章。

例子中,通过×××ProtobufVarint32FrameDecoder和编码器ProtobufVarint32LengthFieldPrepender的使用已经解决了半包问题,测试时可以把其注释掉,这样就可以演示Netty中使用Protobuf出现的TCP粘包问题。

同时,通过protobuf的使用,也可以深刻感受到,其在Netty中的使用确实非常简单,编解码、半包问题,只需要添加相关的处理器即可,而且它可以方便地实现跨语言的远程服务调用。(protobuf本身提供了对不同语言的支持)

但其实在使用时会发现有一个问题,就是编解码的对象是需要使用其生成的特定的proto对象来进行操作的,也就是说,需要编写.proto文件,再通过protoc来生成相应语言的代码文件,显然这样做还是会有些麻烦(虽然其实也还好,不算麻烦),有没有方便点的方法呢?后面通过protostuff的使用即可解决这个问题。

服务端

SubReqServer.java

package cn.xpleaf.subscribe;import cn.xpleaf.protobuf.SubscribeReqProto;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;public class SubReqServer {    public void bind(int port) throws Exception {        // 配置服务端的NIO线程组        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)                .channel(NioServerSocketChannel.class)                .option(ChannelOption.SO_BACKLOG, 1024)                // 添加日志处理器                .handler(new LoggingHandler(LogLevel.INFO))                .childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 添加ProtobufVarint32FrameDecoder,主要用于Protobuf的半包处理 ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); // 添加ProtobufDecoder×××,它的参数是com.google.protobuf.MessageLite // 实际上就是要告诉ProtobufDecoder需要解码的目标类是什么,否则仅仅从字节数组中是 // 无法判断出要解码的目标类型信息的(服务端需要解析的是客户端请求,所以是Req) ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance())); /** * 来自源码的代码注释,用于Protobuf的半包处理 * * An encoder that prepends the the Google Protocol Buffers *
Base * 128 Varints integer length field. For example: *
                         * BEFORE ENCODE (300 bytes)       AFTER ENCODE (302 bytes)                         * +---------------+               +--------+---------------+                         * | Protobuf Data |-------------->| Length | Protobuf Data |                         * |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |                         * +---------------+               +--------+---------------+                         * 
* */ ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); // 添加ProtobufEncoder编码器,这样就不需要对SubscribeResp进行手工编码 ch.pipeline().addLast(new ProtobufEncoder()); // 添加业务处理handler ch.pipeline().addLast(new SubReqServerHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if(args != null && args.length > 0) { try { port = Integer.valueOf(port); } catch (NumberFormatException e) { // TODO: handle exception } } new SubReqServer().bind(port); }}

SubReqServerHandler.java

package cn.xpleaf.subscribe;import cn.xpleaf.protobuf.SubscribeReqProto;import cn.xpleaf.protobuf.SubscribeRespProto;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;public class SubReqServerHandler extends ChannelInboundHandlerAdapter {    /**     * 由于ProtobufDecoder已经对消息进行了自动解码,因此接收到的订购请求消息可以直接使用     * 对用户名进行校验,校验通过后构造应答消息返回给客户端,由于使用了ProtobufEncoder,     * 所以不需要对SubscribeRespProto.SubscribeResp进行手工编码     */    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq)msg;        String username = req.getUserName();        if("xpleaf".equalsIgnoreCase(username)) {            System.out.println("Service accept client subscribe req : [" + req.toString() + "]");            ctx.writeAndFlush(resp(req.getSubReqID()));        }    }    /**     * 构建SubscribeRespProto.SubscribeResp对象     * @param subReqID     * @return     */    private SubscribeRespProto.SubscribeResp resp(int subReqID) {        SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();        builder.setSubReqID(subReqID);        builder.setRespCode(0);        builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");        return builder.build();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        // 发生异常,关闭链路        ctx.close();    }}

客户端

SubReqClient.java

package cn.xpleaf.subscribe;import cn.xpleaf.protobuf.SubscribeRespProto;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;public class SubReqClient {    public void connect(String host, int port) throws Exception {        // 配置客户端NIO线程组        EventLoopGroup group = new NioEventLoopGroup();        try {            Bootstrap b = new Bootstrap();            b.group(group).channel(NioSocketChannel.class)                .option(ChannelOption.TCP_NODELAY, true)                // 设置TCP连接超时时间                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)                .handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 添加ProtobufVarint32FrameDecoder,主要用于Protobuf的半包处理 ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); // 添加ProtobufDecoder×××,它的参数是com.google.protobuf.MessageLite // 实际上就是要告诉ProtobufDecoder需要解码的目标类是什么,否则仅仅从字节数组中是 // 无法判断出要解码的目标类型信息的(客户端需要解析的是服务端请求,所以是Resp) ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance())); /** * 来自源码的代码注释,用于Protobuf的半包处理 * * An encoder that prepends the the Google Protocol Buffers *
Base * 128 Varints integer length field. For example: *
                         * BEFORE ENCODE (300 bytes)       AFTER ENCODE (302 bytes)                         * +---------------+               +--------+---------------+                         * | Protobuf Data |-------------->| Length | Protobuf Data |                         * |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |                         * +---------------+               +--------+---------------+                         * 
* */ ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); // 添加ProtobufEncoder编码器,这样就不需要对SubscribeResp进行手工编码 ch.pipeline().addLast(new ProtobufEncoder()); // 添加业务处理handler ch.pipeline().addLast(new SubReqClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 等待客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if(args != null && args.length > 0) { try { port = Integer.valueOf(port); } catch (NumberFormatException e) { // 采用默认值 } } new SubReqClient().connect("localhost", port); }}

SubReqClientHandler.java

package cn.xpleaf.subscribe;import java.util.ArrayList;import java.util.List;import cn.xpleaf.protobuf.SubscribeReqProto;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;public class SubReqClientHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) {        for(int i = 0; i < 10; i++) {            ctx.write(subReq(i));        }        ctx.flush();    }    /**     * 构建SubscribeReqProto.SubscribeReq对象     * @param i     * @return     */    private SubscribeReqProto.SubscribeReq subReq(int i) {        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();        builder.setSubReqID(i);        builder.setUserName("xpleaf");        builder.setProductName("Netty Book For Protobuf");        List
address = new ArrayList<>(); address.add("NanJing YuHuaTai"); address.add("BeiJing LiuLiChange"); address.add("ShenZhen HongShuLin"); builder.addAllAddress(address); return builder.build(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Service accept server subscribe response : [" + msg + "]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}

测试

服务端输出如下:

Service accept client subscribe req : [subReqID: 0userName: "xpleaf"productName: "Netty Book For Protobuf"address: "NanJing YuHuaTai"address: "BeiJing LiuLiChange"address: "ShenZhen HongShuLin"]Service accept client subscribe req : [subReqID: 1userName: "xpleaf"productName: "Netty Book For Protobuf"address: "NanJing YuHuaTai"address: "BeiJing LiuLiChange"address: "ShenZhen HongShuLin"]Service accept client subscribe req : [subReqID: 2userName: "xpleaf"productName: "Netty Book For Protobuf"address: "NanJing YuHuaTai"address: "BeiJing LiuLiChange"address: "ShenZhen HongShuLin"]Service accept client subscribe req : [subReqID: 3userName: "xpleaf"productName: "Netty Book For Protobuf"address: "NanJing YuHuaTai"address: "BeiJing LiuLiChange"address: "ShenZhen HongShuLin"]Service accept client subscribe req : [subReqID: 4userName: "xpleaf"productName: "Netty Book For Protobuf"address: "NanJing YuHuaTai"address: "BeiJing LiuLiChange"address: "ShenZhen HongShuLin"]Service accept client subscribe req : [subReqID: 5userName: "xpleaf"productName: "Netty Book For Protobuf"address: "NanJing YuHuaTai"address: "BeiJing LiuLiChange"address: "ShenZhen HongShuLin"]Service accept client subscribe req : [subReqID: 6userName: "xpleaf"productName: "Netty Book For Protobuf"address: "NanJing YuHuaTai"address: "BeiJing LiuLiChange"address: "ShenZhen HongShuLin"]Service accept client subscribe req : [subReqID: 7userName: "xpleaf"productName: "Netty Book For Protobuf"address: "NanJing YuHuaTai"address: "BeiJing LiuLiChange"address: "ShenZhen HongShuLin"]Service accept client subscribe req : [subReqID: 8userName: "xpleaf"productName: "Netty Book For Protobuf"address: "NanJing YuHuaTai"address: "BeiJing LiuLiChange"address: "ShenZhen HongShuLin"]Service accept client subscribe req : [subReqID: 9userName: "xpleaf"productName: "Netty Book For Protobuf"address: "NanJing YuHuaTai"address: "BeiJing LiuLiChange"address: "ShenZhen HongShuLin"]

客户端输出如下:

Service accept server subscribe response : [subReqID: 0respCode: 0desc: "Netty book order succeed, 3 days later, sent to the designated address"]Service accept server subscribe response : [subReqID: 1respCode: 0desc: "Netty book order succeed, 3 days later, sent to the designated address"]Service accept server subscribe response : [subReqID: 2respCode: 0desc: "Netty book order succeed, 3 days later, sent to the designated address"]Service accept server subscribe response : [subReqID: 3respCode: 0desc: "Netty book order succeed, 3 days later, sent to the designated address"]Service accept server subscribe response : [subReqID: 4respCode: 0desc: "Netty book order succeed, 3 days later, sent to the designated address"]Service accept server subscribe response : [subReqID: 5respCode: 0desc: "Netty book order succeed, 3 days later, sent to the designated address"]Service accept server subscribe response : [subReqID: 6respCode: 0desc: "Netty book order succeed, 3 days later, sent to the designated address"]Service accept server subscribe response : [subReqID: 7respCode: 0desc: "Netty book order succeed, 3 days later, sent to the designated address"]Service accept server subscribe response : [subReqID: 8respCode: 0desc: "Netty book order succeed, 3 days later, sent to the designated address"]Service accept server subscribe response : [subReqID: 9respCode: 0desc: "Netty book order succeed, 3 days later, sent to the designated address"]

转载于:https://blog.51cto.com/xpleaf/2071715

你可能感兴趣的文章
spring getbean 方法分析(很实用!)
查看>>
Jquery autocomplete插件
查看>>
《老梁四大名著情商课》笔记- 刚上班,别做林黛玉,也别做孙悟空
查看>>
你真的了解分层架构吗?——写给被PetShop"毒害"的朋友们
查看>>
Asp.net 动态为TreeView创建结点
查看>>
dedecms 系统的 data/rssmap.html不存在!更新了也没有。。。
查看>>
博文共赏:Node.js静态文件服务器实战
查看>>
CS安装卸载测试总结(转)
查看>>
深入理解JavaScript系列(18):面向对象编程之ECMAScript实现(推荐)
查看>>
iphone开发之轻松搞定原生socket 编程,阻塞与非阻塞,收发自如
查看>>
ColdFusion select option 用法,看看哪种适合你的
查看>>
Amazium - 响应式 CSS 框架 - 开源中国
查看>>
使用Vitamio打造自己的Android万能播放器(5)——在线播放(播放优酷视频)
查看>>
iis7 发布mvc 遇到的HTTP错误 403.14-Forbidden Web 服务器被配置为不列出此目录的内容...
查看>>
PHP通过Thrift操作Hbase
查看>>
Sql Server导入Access数据库报不可识别的数据库格式 Microsoft JET Database Engine
查看>>
存储设备形成的层次结构
查看>>
http://knockoutjs.com/工作杂记
查看>>
Http协议中的Header与Body
查看>>
Android项目环境搭建
查看>>