博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty实践
阅读量:4334 次
发布时间:2019-06-07

本文共 72299 字,大约阅读时间需要 240 分钟。

  Netty是JBOSS针对网络开发的一套应用框架,它也是在NIO的基础上发展起来的。netty基于异步的事件驱动,具有高性能、高扩展性等特性,它提供了统一的底层协议接口,使得开发者从底层的网络协议(比如 TCP/IP、UDP)中解脱出来。

  TCP传输面向的是字节流,存在粘包半包问题。Netty提供了三种基本的解码类(显然只有读数据时才会有该问题)来解决粘包拆包问题:LineBasedFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder。其中,前二者所读码流到达指定长度或遇到分隔符时认为结束,若读到的数据大于指定长度则抛TooLongFrameException并忽略之前读到的码流;最后一者每次读固定长度码流。此外,也针对特定协议提供了一些解决该问题的解码类,如ProtobufVarint32FrameDecoder)

  Netty封装得很好,使得使用起来比较容易,按照“套路”填代码即可。给几个示例:

1、示例

Maven依赖:

1         
2
io.netty
3
netty-all
4
5.0.0.Alpha2
5
6 7
8
com.google.protobuf
9
protobuf-java
10
2.5.0
11
View Code

1.1、服务端/客户端

Server:

1 public class SimpleChatServer { 2  3     private static int port = 8080; 4  5     public SimpleChatServer(int port) { 6         this.port = port; 7     } 8  9     public void run() throws Exception {10 11         EventLoopGroup bossGroup = new NioEventLoopGroup();12         EventLoopGroup workerGroup = new NioEventLoopGroup();13         try {14             ServerBootstrap b = new ServerBootstrap();15             b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128)16                     .childHandler(new SimpleChatServerInitializer()).childOption(ChannelOption.SO_KEEPALIVE, true);17 18             System.out.println("server 启动了");19 20             // 绑定端口,开始接收进来的连接21             // b.bind(11122);//可以绑定多个端口22             ChannelFuture f = b.bind(port).sync();23 24             // 等待服务器 socket 关闭 。25             // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。26             f.channel().closeFuture().sync();27         } finally {28             workerGroup.shutdownGracefully();29             bossGroup.shutdownGracefully();30 31             System.out.println("server 关闭了");32         }33     }34 35     public static void main(String[] args) throws Exception {36         new SimpleChatServer(port).run();37     }38 }
SimpleChatServer
1 public class SimpleChatServerInitializer extends ChannelInitializer
{ 2 private static final StringDecoder DECODER = new StringDecoder(); 3 private static final StringEncoder ENCODER = new StringEncoder(); 4 5 @Override 6 public void initChannel(SocketChannel ch) throws Exception {
// Pipeline里的Handler是从底层开始向上添加的,故流动方向为后添加的输出给先添加的、或先添加的读入给后添加的 7 ChannelPipeline pipeline = ch.pipeline(); 8 9 // 添加ChannelHandler,顺序是敏感的;名字任意,不冲突即可,也可以不指定名字10 11 // Netty提供了三种解码器解决粘包拆包问题:LineBasedFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder。12 // 其中,前二者所读码流到达指定长度或遇到分隔符时认为结束,若读到的数据大于指定长度则抛TooLongFrameException并忽略之前读到的码流;最后一者每次读固定长度码流。13 // 也可以继承ByteToMessageDecoder自己处理14 pipeline.addLast("FrameDecoder", new ProtobufVarint32FrameDecoder());15 pipeline.addLast("StringDecoder", DECODER);16 17 // 解码只会应用于读数据时、编码只会应用于写数据时,因此解码器与编码器添加的先后顺序在客户端和服务端中可不同,但编码器添加的顺序须桶,解码器亦然。18 pipeline.addLast("StringEncoder", ENCODER);19 20 pipeline.addLast("handler", new SimpleChatServerHandler());21 22 System.out.println("client " + ch.remoteAddress() + " 连接上");23 }24 }
SimpleChatServerInitializer
1 public class SimpleChatServerHandler extends SimpleChannelInboundHandler
{ 2 3 public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 4 5 // 一个客户端连上再断开时,六个事件的触发顺序:加入、(连接上(在SimpleChatServerInitializer中))、在线、异常、掉线、离开 6 @Override 7 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 在ctx加入本Handler时触发,一般在此做初始化工作,如创建buf 8 Channel incoming = ctx.channel(); 9 for (Channel channel : channels) {10 channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");11 }12 System.out.println("client " + incoming.remoteAddress() + " 加入");13 channels.add(ctx.channel());14 }15 16 @Override17 public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 当客户端和服务端建立tcp成功之后,Netty的NIO线程会调用channelActive18 Channel incoming = ctx.channel();19 System.out.println("client " + incoming.remoteAddress() + " 在线");20 }21 22 @Override23 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {24 Channel incoming = ctx.channel();25 System.err.println("client " + incoming.remoteAddress() + " 异常:" + cause.getMessage());26 // 当出现异常就关闭连接27 // cause.printStackTrace();28 ctx.close();29 }30 31 @Override32 public void channelInactive(ChannelHandlerContext ctx) throws Exception {33 Channel incoming = ctx.channel();34 System.out.println("client " + incoming.remoteAddress() + " 掉线");35 }36 37 @Override38 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 从ctx移除本Handler时触发39 Channel incoming = ctx.channel();40 for (Channel channel : channels) {41 channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开\n");42 }43 System.out.println("client " + incoming.remoteAddress() + " 离开");44 channels.remove(ctx.channel());45 }46 47 // 优先级高于messageReceived方法,有了这个方法就会屏蔽messageReceived方法48 // @Override49 // public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {50 // System.out.println("channelRead");51 // Channel incoming = ctx.channel();52 // for (Channel channel : channels) {53 // if (channel != incoming){54 // channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + msg + "\n");55 // } else {56 // channel.writeAndFlush("server: " + msg + "\n");57 // }58 // }59 // }60 61 @Override62 protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {63 Channel incoming = ctx.channel();64 System.out.println("**" + incoming.remoteAddress() + " send: " + msg);65 for (Channel channel : channels) {66 if (channel != incoming) {67 // System.out.println("[" + incoming.remoteAddress() + "] " + msg);68 channel.writeAndFlush("[" + incoming.remoteAddress() + "] " + msg + "\n");69 } else {70 // System.out.println("server: " + msg);71 channel.writeAndFlush("server: " + msg + "\n");72 }73 }74 }75 76 }
SimpleChatServerHandler

Client:

1 public class SimpleChatClient { 2  3     private String host; 4     private int port; 5  6     public SimpleChatClient(String host, int port) { 7         this.host = host; 8         this.port = port; 9     }10 11     public void run() throws Exception {12         EventLoopGroup group = new NioEventLoopGroup();13         try {14             Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class)15                     .handler(new SimpleChatClientInitializer());16             Channel channel = bootstrap.connect(host, port).sync().channel();17 18             Scanner sc = new Scanner(System.in);19             System.out.println("please enter...");20             boolean exit = false;21             // 输入exit,退出系统22             while (!exit) {23                 String str = sc.next();24                 channel.writeAndFlush(str + "\r\n");25                 if (str.equalsIgnoreCase("exit")) {26                     exit = true;27                     channel.close();28                 }29             }30             sc.close();31         } catch (Exception e) {32             e.printStackTrace();33         } finally {34             group.shutdownGracefully();35         }36     }37 38     public static void main(String[] args) throws Exception {39         new SimpleChatClient("localhost", 8080).run();40     }41 }
SimpleChatClient
1 public class SimpleChatClientInitializer extends ChannelInitializer
{ 2 private static final StringDecoder DECODER = new StringDecoder(); 3 private static final StringEncoder ENCODER = new StringEncoder(); 4 5 @Override 6 public void initChannel(SocketChannel ch) throws Exception {
// Pipeline里的Handler是从底层开始向上添加的,故流动方向为后添加的输出给先添加的、或先添加的读入给后添加的 7 ChannelPipeline pipeline = ch.pipeline(); 8 9 // 添加ChannelHandler,顺序是敏感的;名字任意,不冲突即可,也可以不指定名字10 11 // Netty提供了三种解码器解决粘包拆包问题:LineBasedFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder。12 // 其中,前二者所读码流到达指定长度或遇到分隔符时认为结束,若读到的数据大于指定长度则抛TooLongFrameException并忽略之前读到的码流;最后一者每次读固定长度码流。13 // 也可以继承ByteToMessageDecoder自己处理14 pipeline.addLast("FrameDecoder", new ProtobufVarint32FrameDecoder());15 pipeline.addLast("StringDecoder", DECODER);16 17 // 解码只会应用于读数据时、编码只会应用于写数据时,因此解码器与编码器添加的先后顺序在客户端和服务端中可不同,但编码器添加的顺序须桶,解码器亦然。18 pipeline.addLast("StringEncoder", ENCODER);19 20 pipeline.addLast("handler", new SimpleChatClientHandler());21 }22 }
SimpleChatClientInitializer
1 public class SimpleChatClientHandler extends SimpleChannelInboundHandler
{ 2 3 // 优先级高于messageReceived方法,有了这个方法就会屏蔽messageReceived方法 4 // @Override 5 // public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 6 // System.out.println(msg.toString()); 7 // } 8 9 @Override10 protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {11 System.out.println(msg);12 }13 }
SimpleChatClientHandler

 

以下几个是官方示例():

1.2、DiscardServer

服务端接收客户端的消息,不返回任何信息:

1 class DiscardServerHandler extends ChannelHandlerAdapter { // (1) 2  3     // @Override 4     // public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) 5     // // 以静默方式丢弃接收的数据 6     // ((ByteBuf) msg).release(); // (3)ByteBuf属于引用计数的对象,必须通过release()方法显式释放。 7     // } 8  9     @Override10     public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)11         ByteBuf in = (ByteBuf) msg;12         try {13             System.out.println(in.toString(Charset.defaultCharset()));14             while (in.isReadable()) { // (1)15                 System.out.print((char) in.readByte());16                 System.out.flush();17             } // 这个低效的循环可以简化为 System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII));18         } finally {19             ReferenceCountUtil.release(msg);// (2) // 或者写为 in.release();20         }21         // 或者直接打印22         System.out.println("Yes, A new client in = " + ctx.name());23     }24 25     @Override26     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)27         // 出现异常时关闭连接。28         cause.printStackTrace();29         ctx.close();30     }31 }32 33 public class DiscardServer {
// 可以用telnet连接并输入进行测试,客户端没有任何输出34 private int port;35 36 public DiscardServer(int port37 38 ) {39 this.port = port;40 }41 42 public void run() throws Exception {43 EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)NioEventLoopGroup是处理I/O操作的多线程事件循环。第一个通常称为“boss”,接受传入连接。 第二个通常称为“worker”,当“boss”接受连接并且向“worker”注册接受连接,则“worker”处理所接受连接的流量。44 // 使用多少个线程以及如何将它们映射到创建的通道取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。45 EventLoopGroup workerGroup = new NioEventLoopGroup();46 try {47 ServerBootstrap b = new ServerBootstrap(); // (2)48 b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)该类用于实例化新的通道以接受传入连接49 .childHandler(new ChannelInitializer
() { // (4)ChannelInitializer是一个特殊的处理程序,用于帮助用户配置新的通道。 很可能要通过添加一些处理程序。随着应用程序变得复杂,可能会向管道中添加更多处理程序50 @Override51 public void initChannel(SocketChannel ch) throws Exception {52 ch.pipeline().addLast(new DiscardServerHandler());// 每次有新连接都会创建一个新的DiscardServerHandler处理之,,而不是只用一个来处理所有的53 }54 }).option(ChannelOption.SO_BACKLOG, 128) // (5)指定Channel实现的参数55 .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)56 57 System.out.println("server 启动了");58 59 // Bind and start to accept incoming connections.60 // b.bind(11122);//可以绑定多个端口61 ChannelFuture f = b.bind(port).sync(); // (7)62 63 // Wait until the server socket is closed.64 // In this example, this does not happen, but you can do that to gracefully65 // shut down your server.66 f.channel().closeFuture().sync();67 } finally {68 workerGroup.shutdownGracefully();69 bossGroup.shutdownGracefully();70 71 System.out.println("server 关闭了");72 }73 }74 75 public static void main(String[] args) throws Exception {76 int port;77 if (args.length > 0) {78 port = Integer.parseInt(args[0]);79 } else {80 port = 8080;81 }82 new DiscardServer(port).run();// 可以用telnet连接并输入进行测试,客户端没有任何输出83 }84 }
DiscardServer

1.3、TimeServer_StremBased

时间服务器:服务端收到客户端连接就往客户端发生时间,发完后就关闭连接;客户端连接上服务端,收到消息后就关闭连接,读的一方需要处理 粘包拆包 问题

1 //http://www.yiibai.com/netty/netty-time-server.htmlclass  2 /**  3  * 时间服务器
4 * 服务端收到客户端连接就往客户端发生时间,发完后就关闭连接;客户端连接上服务端,收到消息后就关闭连接。
5 * 只有读者会有粘包拆包的问题,所以这里只有客户端可能有该问题。 6 */ 7 class TimeServerHandler_StreamBased extends ChannelHandlerAdapter { 8 // 因为时间服务器将忽略任何接收到的数据,但是一旦建立连接就发送消息,所以我们不能使用channelRead()方法。而是覆盖channelActive()方法。 9 @Override 10 public void channelActive(final ChannelHandlerContext ctx) { // (1) 11 final ByteBuf time = ctx.alloc().buffer(4); // (2) 12 13 // 在Java NIO中发送消息之前需调用java.nio.ByteBuffer.flip(),但Netty ByteBuf没有这样的方法,它只有两个指针;一个用于读取操作,另一个用于写入操作。当您向ByteBuf写入内容时,写入索引会增加,而读取器索引不会更改。读取器索引和写入器索引分别表示消息的开始和结束位置。 14 time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); 15 16 // ChannelHandlerContext.write()(和writeAndFlush())方法返回一个ChannelFuture。 17 // ChannelFuture表示尚未发生的I/O操作。这意味着,任何请求的操作可能尚未执行,因为所有操作在Netty中是异步的。因此,需要在ChannelFuture完成后调用close()方法。注意,close()也可能不会立即关闭连接,并返回一个ChannelFuture。 18 final ChannelFuture f = ctx.writeAndFlush(time); // (3) 19 20 // 当写请求完成时,我们如何得到通知?添加监听器 21 f.addListener(new ChannelFutureListener() { 22 @Override 23 public void operationComplete(ChannelFuture future) { 24 assert f == future; 25 ctx.close();// 发完就关闭连接 26 } 27 }); // (4)// 可以使用预定义的监听器来简化代码:f.addListener(ChannelFutureListener.CLOSE); 28 } 29 30 @Override 31 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 32 cause.printStackTrace(); 33 ctx.close(); 34 } 35 } 36 37 public class TimeServer_StreamBased { 38 private int port; 39 40 public TimeServer_StreamBased(int port) { 41 this.port = port; 42 } 43 44 public void run() throws Exception { 45 EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)NioEventLoopGroup是处理I/O操作的多线程事件循环。第一个通常称为“boss”,接受传入连接。 第二个通常称为“worker”,当“boss”接受连接并且向“worker”注册接受连接,则“worker”处理所接受连接的流量。 46 // 使用多少个线程以及如何将它们映射到创建的通道取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。 47 EventLoopGroup workerGroup = new NioEventLoopGroup(); 48 try { 49 ServerBootstrap b = new ServerBootstrap(); // (2) 50 b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)该类用于实例化新的通道以接受传入连接 51 .childHandler(new ChannelInitializer
() { // (4)ChannelInitializer是一个特殊的处理程序,用于帮助用户配置新的通道。 很可能要通过添加一些处理程序。随着应用程序变得复杂,可能会向管道中添加更多处理程序 52 @Override 53 public void initChannel(SocketChannel ch) throws Exception { 54 ch.pipeline().addLast(new TimeServerHandler_StreamBased());// 每次有新连接就创建一个新的该handler来处理,而不是只用一个来处理所有的 55 } 56 }).option(ChannelOption.SO_BACKLOG, 128) // (5)指定Channel实现的参数 57 .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) 58 59 System.out.println("server 启动了"); 60 61 // Bind and start to accept incoming connections. 62 // b.bind(11122);//可以绑定多个端口 63 ChannelFuture f = b.bind(port).sync(); // (7) 64 65 // Wait until the server socket is closed. 66 // In this example, this does not happen, but you can do that to gracefully 67 // shut down your server. 68 f.channel().closeFuture().sync(); 69 } finally { 70 workerGroup.shutdownGracefully(); 71 bossGroup.shutdownGracefully(); 72 73 System.out.println("server 关闭了"); 74 } 75 } 76 77 public static void main(String[] args) throws Exception { 78 int port = 8080; 79 80 new TimeServer_StreamBased(port).run();// 可以用telnet连接并输入进行测试,客户端输出二进制的32位整数,所以为乱码 81 } 82 } 83 84 class TimeClient_StreamBased { 85 public static void main(String[] args) throws Exception { 86 String host = "localhost"; 87 int port = 8080; 88 EventLoopGroup workerGroup = new NioEventLoopGroup(); 89 90 try { 91 Bootstrap b = new Bootstrap(); // (1)Bootstrap与ServerBootstrap类似,只是它用于非服务器通道,例如客户端或无连接通道。 92 b.group(workerGroup); // (2)如果只指定一个EventLoopGroup,它将同时用作boss组和worker组。boss组和worker组不是用于客户端。 93 b.channel(NioSocketChannel.class); // (3)不使用NioServerSocketChannel,而是使用NioSocketChannel来创建客户端通道。 94 b.option(ChannelOption.SO_KEEPALIVE, true); // (4)这里不像我们使用的ServerBootstrap,所以不使用childOption(),因为客户端SocketChannel没有父类。 95 b.handler(new ChannelInitializer
() { 96 @Override 97 public void initChannel(SocketChannel ch) throws Exception { 98 // 以下两种方式都可以解决读取粘包拆包的问题 99 ch.pipeline().addLast(new TimeClientHandler_1_TimeDecoder(), new TimeClientHandler_1_withproblem());100 // ch.pipeline().addLast(new TimeClientHandler_2());101 }102 });103 104 // Start the client.105 ChannelFuture f = b.connect(host, port).sync(); // (5)应该调用connect()方法,而不是bind()方法106 107 // Wait until the connection is closed.108 f.channel().closeFuture().sync();109 } finally {110 workerGroup.shutdownGracefully();111 }112 }113 }114 115 /**116 * 此Handler单独使用的话存在粘包拆包的问题117 */118 class TimeClientHandler_1_withproblem extends ChannelHandlerAdapter {119 @Override120 public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 粘包拆包问题,在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中。不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列。这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节。121 // 因此,不能保证读的是输入的完整。不过,由于返回的是32位int,数据量小,所以这里很少可能发生此问题。122 ByteBuf m = (ByteBuf) msg; // (1)123 try {124 long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;// 每次读取到m的字节数不一定是4,所以存在粘包拆包的问题125 Date currentTime = new Date(currentTimeMillis);126 System.out.println("Default Date Format:" + currentTime.toString());127 128 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");129 String dateString = formatter.format(currentTime);130 // 转换一下成中国人的时间格式131 System.out.println("Date Format:" + dateString);132 133 ctx.close();134 } finally {135 m.release();136 }137 }138 139 @Override140 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {141 cause.printStackTrace();142 ctx.close();143 }144 }145 146 /**147 * 与TimeClientHandler_1_withproblem搭配使用,解决粘包拆包问题,且易扩展
148 *
149 * 在客户端的ClientHandler之前加上一层,使得字节够了才往ClientHandler传,以解决粘包拆包的问题150 */151 class TimeClientHandler_1_TimeDecoder extends ByteToMessageDecoder { // (1)ByteToMessageDecoder是ChannelHandlerAdapter的一个实现,它使得处理碎片问题变得容易152 @Override153 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
out) { // (2)154 if (in.readableBytes() < 4) {155 return; // (3)156 }157 out.add(in.readBytes(4)); // (4)158 }159 }160 161 /**162 * 此的Handler能解决读取粘包拆包问题,但可扩展性差
163 *
164 * 在客户端的ClientHandler里解决粘包拆包问题:字节够了才读165 */166 class TimeClientHandler_2 extends ChannelHandlerAdapter {167 private ByteBuf buf;168 169 @Override170 public void handlerAdded(ChannelHandlerContext ctx) {171 buf = ctx.alloc().buffer(4); // (1)初始化172 }173 174 @Override175 public void handlerRemoved(ChannelHandlerContext ctx) {176 buf.release(); // (1)初始化的释放177 buf = null;178 }179 180 @Override181 public void channelRead(ChannelHandlerContext ctx, Object msg) {182 ByteBuf m = (ByteBuf) msg;183 buf.writeBytes(m); // (2)184 m.release();185 186 if (buf.readableBytes() >= 4) { // (3)187 long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;188 System.out.println(new Date(currentTimeMillis));189 ctx.close();190 }191 }192 193 @Override194 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {195 cause.printStackTrace();196 ctx.close();197 }198 }
TimeServer_StreamBased

1.4、TimeServer_POJOBased

时间服务器:同上,但通过POJO来通信,同样,读的一方需要处理 粘包拆包 的问题

1 /**  2  * 时间服务器
3 * 服务端收到客户端连接就往客户端发送时间,发完后就关闭连接;客户端连接上服务端,收到消息后就关闭连接。
4 * 由于只有服务端往客户端发数据,所以服务端只要encode、客户端只要decode 5 */ 6 7 class UnixTime { 8 private final long value; 9 10 public UnixTime() { 11 this(System.currentTimeMillis() / 1000L + 2208988800L); 12 } 13 14 public UnixTime(long value) { 15 this.value = value; 16 } 17 18 public long value() { 19 return value; 20 } 21 22 @Override 23 public String toString() { 24 Date date = new Date((value() - 2208988800L) * 1000L); 25 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 26 String dateString = formatter.format(date); 27 return dateString; 28 } 29 } 30 31 class TimeEncoder_POJOBased extends MessageToByteEncoder
{ 32 @Override 33 protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) { 34 out.writeInt((int) msg.value()); 35 } 36 } 37 38 class TimeServerHandler_POJOBased extends ChannelHandlerAdapter { 39 @Override 40 public void channelActive(ChannelHandlerContext ctx) {
// 当客户端和服务端建立tcp成功之后,Netty的NIO线程会调用channelActive 41 ChannelFuture f = ctx.writeAndFlush(new UnixTime()); 42 f.addListener(ChannelFutureListener.CLOSE); 43 } 44 45 @Override 46 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 在ctx加入本Handler时触发,一般在此做初始化工作,如创建buf 47 Channel incoming = ctx.channel(); 48 System.out.println("client " + incoming.remoteAddress() + " 加入"); 49 } 50 51 @Override 52 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 从ctx移除本Handler时触发 53 Channel incoming = ctx.channel(); 54 System.out.println("client " + incoming.remoteAddress() + " 离开"); 55 } 56 57 @Override 58 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 59 cause.printStackTrace(); 60 ctx.close(); 61 } 62 } 63 64 public class TimeServer_POJOBased { 65 private int port; 66 67 public TimeServer_POJOBased(int port) { 68 this.port = port; 69 } 70 71 public void run() throws Exception { 72 // NioEventLoopGroup类是个线程组,包含一组NIO线程,用于网络事件的处理(实际上它就是Reactor线程组)。 创建的2个线程组,1个用于服务端接收客户端的连接,另一个用于SocketChannel的 网络读写 73 EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) 74 EventLoopGroup workerGroup = new NioEventLoopGroup(); 75 try { 76 // ServerBootstrap类是启动NIO服务器的辅助启动类 77 ServerBootstrap b = new ServerBootstrap(); // (2) 78 b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3) 79 .option(ChannelOption.SO_BACKLOG, 128).childHandler(new ChannelInitializer
() { // (4) 80 @Override 81 public void initChannel(SocketChannel ch) throws Exception { 82 // Pipeline里的Handler是从底层开始向上叠加的,即后者输出给前者、或前者读出的给后者 83 // 由于只有服务端往客户端发数据,所以服务端只要encode、客户端只要decode 84 ch.pipeline().addLast(new TimeEncoder_POJOBased(), new TimeServerHandler_POJOBased()); 85 } 86 })// (5) 87 .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) 88 89 System.out.println("server 启动了"); 90 91 // 绑定端口,开始接收进来的连接 92 // b.bind(11122);//可以绑定多个端口 93 ChannelFuture f = b.bind(port).sync(); // (7) 94 95 // 等待服务器 socket 关闭 。 96 // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。 97 f.channel().closeFuture().sync(); 98 } finally { 99 workerGroup.shutdownGracefully();100 bossGroup.shutdownGracefully();101 System.out.println("server 关闭了");102 }103 }104 105 public static void main(String[] args) throws Exception {106 int port;107 if (args.length > 0) {108 port = Integer.parseInt(args[0]);109 } else {110 port = 8080;111 }112 new TimeServer_POJOBased(port).run();113 }114 }115 116 class TimeClient_POJOBased {117 118 public static void main(String[] args) throws Exception {119 120 String host = "127.0.0.1";121 int port = 8080;122 EventLoopGroup workerGroup = new NioEventLoopGroup();123 124 try {125 Bootstrap b = new Bootstrap(); // (1)126 b.group(workerGroup); // (2)127 b.channel(NioSocketChannel.class); // (3)128 b.option(ChannelOption.SO_KEEPALIVE, true); // (4)129 b.handler(new ChannelInitializer
() {130 @Override131 public void initChannel(SocketChannel ch) throws Exception {132 // Pipeline里的Handler是从底层开始向上添加的,故流动方向为后添加的输出给先添加的、或先添加的读入给后添加的133 // 由于只有服务端往客户端发数据,所以服务端只要encode、客户端只要decode134 ch.pipeline().addLast(new TimeDecoder_POJOBased(), new TimeClientHandler_POJOBased());135 }136 });137 138 // 启动客户端139 ChannelFuture f = b.connect(host, port).sync(); // (5)140 141 // 等待连接关闭142 f.channel().closeFuture().sync();143 } finally {144 workerGroup.shutdownGracefully();145 }146 }147 }148 149 class TimeDecoder_POJOBased extends ByteToMessageDecoder {150 @Override151 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
out) throws Exception {152 // TODO Auto-generated method stub153 if (in.readableBytes() < 4) {
// 只有读者存在粘包半包问题,这里不少于4个字节时才处理,以避免该问题154 return;155 }156 out.add(new UnixTime(in.readUnsignedInt()));157 }158 }159 160 class TimeClientHandler_POJOBased extends ChannelHandlerAdapter {161 // @Override162 public void channelRead(ChannelHandlerContext ctx, Object msg) {163 UnixTime m = (UnixTime) msg;164 System.out.println(m);165 ctx.close();166 }167 168 @Override169 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {170 cause.printStackTrace();171 ctx.close();172 }173 }
TimeServer_POJOBased

1.5、结合Protobuf

  (就是把自动生成的类当成一个JavaBean来用,但是该JavaBean比普通的Bean更强大,如包含了与Protobuf格式间进行序列化/反序列化的方法

Netty提供了与Protobuf相关的 解决粘包半包问题的编解码器(ByteBuf与ByteBuf间) 以及 ByteBuf与自定义Protobuf类间的编解码器 :

  • ProtobufVarint32FrameDecoder()
  • ProtobufVarint32LengthFieldPrepender()
  • ProtobufDecoder(Custom Protobuf Class)
  • ProtobufEncoder()

示例(需要添加Protobuf依赖,):

1 public final class PersonProbuf {  2     private PersonProbuf() {  3     }  4   5     public static void registerAllExtensions(com.google.protobuf.ExtensionRegistry registry) {  6     }  7   8     public interface PersonOrBuilder extends com.google.protobuf.MessageOrBuilder {  9  10         // optional int64 id = 1; 11         /** 12          * optional int64 id = 1; 13          * 14          * 
 15          *可选的字段,为64位整数.. 16          * 
17 */ 18 boolean hasId(); 19 20 /** 21 * optional int64 id = 1; 22 * 23 *
 24          *可选的字段,为64位整数.. 25          * 
26 */ 27 long getId(); 28 29 // optional string name = 2; 30 /** 31 * optional string name = 2; 32 */ 33 boolean hasName(); 34 35 /** 36 * optional string name = 2; 37 */ 38 java.lang.String getName(); 39 40 /** 41 * optional string name = 2; 42 */ 43 com.google.protobuf.ByteString getNameBytes(); 44 45 // optional string sex = 3; 46 /** 47 * optional string sex = 3; 48 */ 49 boolean hasSex(); 50 51 /** 52 * optional string sex = 3; 53 */ 54 java.lang.String getSex(); 55 56 /** 57 * optional string sex = 3; 58 */ 59 com.google.protobuf.ByteString getSexBytes(); 60 61 // optional string tel = 4; 62 /** 63 * optional string tel = 4; 64 */ 65 boolean hasTel(); 66 67 /** 68 * optional string tel = 4; 69 */ 70 java.lang.String getTel(); 71 72 /** 73 * optional string tel = 4; 74 */ 75 com.google.protobuf.ByteString getTelBytes(); 76 } 77 78 /** 79 * Protobuf type {
@code Person} 80 */ 81 public static final class Person extends com.google.protobuf.GeneratedMessage implements PersonOrBuilder { 82 // Use Person.newBuilder() to construct. 83 private Person(com.google.protobuf.GeneratedMessage.Builder
builder) { 84 super(builder); 85 this.unknownFields = builder.getUnknownFields(); 86 } 87 88 private Person(boolean noInit) { 89 this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); 90 } 91 92 private static final Person defaultInstance; 93 94 public static Person getDefaultInstance() { 95 return defaultInstance; 96 } 97 98 public Person getDefaultInstanceForType() { 99 return defaultInstance;100 }101 102 private final com.google.protobuf.UnknownFieldSet unknownFields;103 104 @java.lang.Override105 public final com.google.protobuf.UnknownFieldSet getUnknownFields() {106 return this.unknownFields;107 }108 109 private Person(com.google.protobuf.CodedInputStream input,110 com.google.protobuf.ExtensionRegistryLite extensionRegistry)111 throws com.google.protobuf.InvalidProtocolBufferException {112 initFields();113 int mutable_bitField0_ = 0;114 com.google.protobuf.UnknownFieldSet.Builder unknownFields = com.google.protobuf.UnknownFieldSet115 .newBuilder();116 try {117 boolean done = false;118 while (!done) {119 int tag = input.readTag();120 switch (tag) {121 case 0:122 done = true;123 break;124 default: {125 if (!parseUnknownField(input, unknownFields, extensionRegistry, tag)) {126 done = true;127 }128 break;129 }130 case 8: {131 bitField0_ |= 0x00000001;132 id_ = input.readInt64();133 break;134 }135 case 18: {136 bitField0_ |= 0x00000002;137 name_ = input.readBytes();138 break;139 }140 case 26: {141 bitField0_ |= 0x00000004;142 sex_ = input.readBytes();143 break;144 }145 case 34: {146 bitField0_ |= 0x00000008;147 tel_ = input.readBytes();148 break;149 }150 }151 }152 } catch (com.google.protobuf.InvalidProtocolBufferException e) {153 throw e.setUnfinishedMessage(this);154 } catch (java.io.IOException e) {155 throw new com.google.protobuf.InvalidProtocolBufferException(e.getMessage()).setUnfinishedMessage(this);156 } finally {157 this.unknownFields = unknownFields.build();158 makeExtensionsImmutable();159 }160 }161 162 public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() {163 return PersonProbuf.internal_static_Person_descriptor;164 }165 166 protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() {167 return PersonProbuf.internal_static_Person_fieldAccessorTable168 .ensureFieldAccessorsInitialized(PersonProbuf.Person.class, PersonProbuf.Person.Builder.class);169 }170 171 public static com.google.protobuf.Parser
PARSER = new com.google.protobuf.AbstractParser
() {172 public Person parsePartialFrom(com.google.protobuf.CodedInputStream input,173 com.google.protobuf.ExtensionRegistryLite extensionRegistry)174 throws com.google.protobuf.InvalidProtocolBufferException {175 return new Person(input, extensionRegistry);176 }177 };178 179 @java.lang.Override180 public com.google.protobuf.Parser
getParserForType() {181 return PARSER;182 }183 184 private int bitField0_;185 // optional int64 id = 1;186 public static final int ID_FIELD_NUMBER = 1;187 private long id_;188 189 /**190 *
optional int64 id = 1;191 *192 *
193          *可选的字段,为64位整数..194          * 
195 */196 public boolean hasId() {197 return ((bitField0_ & 0x00000001) == 0x00000001);198 }199 200 /**201 *
optional int64 id = 1;202 *203 *
204          *可选的字段,为64位整数..205          * 
206 */207 public long getId() {208 return id_;209 }210 211 // optional string name = 2;212 public static final int NAME_FIELD_NUMBER = 2;213 private java.lang.Object name_;214 215 /**216 *
optional string name = 2;217 */218 public boolean hasName() {219 return ((bitField0_ & 0x00000002) == 0x00000002);220 }221 222 /**223 *
optional string name = 2;224 */225 public java.lang.String getName() {226 java.lang.Object ref = name_;227 if (ref instanceof java.lang.String) {228 return (java.lang.String) ref;229 } else {230 com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref;231 java.lang.String s = bs.toStringUtf8();232 if (bs.isValidUtf8()) {233 name_ = s;234 }235 return s;236 }237 }238 239 /**240 *
optional string name = 2;241 */242 public com.google.protobuf.ByteString getNameBytes() {243 java.lang.Object ref = name_;244 if (ref instanceof java.lang.String) {245 com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref);246 name_ = b;247 return b;248 } else {249 return (com.google.protobuf.ByteString) ref;250 }251 }252 253 // optional string sex = 3;254 public static final int SEX_FIELD_NUMBER = 3;255 private java.lang.Object sex_;256 257 /**258 *
optional string sex = 3;259 */260 public boolean hasSex() {261 return ((bitField0_ & 0x00000004) == 0x00000004);262 }263 264 /**265 *
optional string sex = 3;266 */267 public java.lang.String getSex() {268 java.lang.Object ref = sex_;269 if (ref instanceof java.lang.String) {270 return (java.lang.String) ref;271 } else {272 com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref;273 java.lang.String s = bs.toStringUtf8();274 if (bs.isValidUtf8()) {275 sex_ = s;276 }277 return s;278 }279 }280 281 /**282 *
optional string sex = 3;283 */284 public com.google.protobuf.ByteString getSexBytes() {285 java.lang.Object ref = sex_;286 if (ref instanceof java.lang.String) {287 com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref);288 sex_ = b;289 return b;290 } else {291 return (com.google.protobuf.ByteString) ref;292 }293 }294 295 // optional string tel = 4;296 public static final int TEL_FIELD_NUMBER = 4;297 private java.lang.Object tel_;298 299 /**300 *
optional string tel = 4;301 */302 public boolean hasTel() {303 return ((bitField0_ & 0x00000008) == 0x00000008);304 }305 306 /**307 *
optional string tel = 4;308 */309 public java.lang.String getTel() {310 java.lang.Object ref = tel_;311 if (ref instanceof java.lang.String) {312 return (java.lang.String) ref;313 } else {314 com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref;315 java.lang.String s = bs.toStringUtf8();316 if (bs.isValidUtf8()) {317 tel_ = s;318 }319 return s;320 }321 }322 323 /**324 *
optional string tel = 4;325 */326 public com.google.protobuf.ByteString getTelBytes() {327 java.lang.Object ref = tel_;328 if (ref instanceof java.lang.String) {329 com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref);330 tel_ = b;331 return b;332 } else {333 return (com.google.protobuf.ByteString) ref;334 }335 }336 337 private void initFields() {338 id_ = 0L;339 name_ = "";340 sex_ = "";341 tel_ = "";342 }343 344 private byte memoizedIsInitialized = -1;345 346 public final boolean isInitialized() {347 byte isInitialized = memoizedIsInitialized;348 if (isInitialized != -1)349 return isInitialized == 1;350 351 memoizedIsInitialized = 1;352 return true;353 }354 355 public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException {356 getSerializedSize();357 if (((bitField0_ & 0x00000001) == 0x00000001)) {358 output.writeInt64(1, id_);359 }360 if (((bitField0_ & 0x00000002) == 0x00000002)) {361 output.writeBytes(2, getNameBytes());362 }363 if (((bitField0_ & 0x00000004) == 0x00000004)) {364 output.writeBytes(3, getSexBytes());365 }366 if (((bitField0_ & 0x00000008) == 0x00000008)) {367 output.writeBytes(4, getTelBytes());368 }369 getUnknownFields().writeTo(output);370 }371 372 private int memoizedSerializedSize = -1;373 374 public int getSerializedSize() {375 int size = memoizedSerializedSize;376 if (size != -1)377 return size;378 379 size = 0;380 if (((bitField0_ & 0x00000001) == 0x00000001)) {381 size += com.google.protobuf.CodedOutputStream.computeInt64Size(1, id_);382 }383 if (((bitField0_ & 0x00000002) == 0x00000002)) {384 size += com.google.protobuf.CodedOutputStream.computeBytesSize(2, getNameBytes());385 }386 if (((bitField0_ & 0x00000004) == 0x00000004)) {387 size += com.google.protobuf.CodedOutputStream.computeBytesSize(3, getSexBytes());388 }389 if (((bitField0_ & 0x00000008) == 0x00000008)) {390 size += com.google.protobuf.CodedOutputStream.computeBytesSize(4, getTelBytes());391 }392 size += getUnknownFields().getSerializedSize();393 memoizedSerializedSize = size;394 return size;395 }396 397 private static final long serialVersionUID = 0L;398 399 @java.lang.Override400 protected java.lang.Object writeReplace() throws java.io.ObjectStreamException {401 return super.writeReplace();402 }403 404 public static PersonProbuf.Person parseFrom(com.google.protobuf.ByteString data)405 throws com.google.protobuf.InvalidProtocolBufferException {406 return PARSER.parseFrom(data);407 }408 409 public static PersonProbuf.Person parseFrom(com.google.protobuf.ByteString data,410 com.google.protobuf.ExtensionRegistryLite extensionRegistry)411 throws com.google.protobuf.InvalidProtocolBufferException {412 return PARSER.parseFrom(data, extensionRegistry);413 }414 415 public static PersonProbuf.Person parseFrom(byte[] data)416 throws com.google.protobuf.InvalidProtocolBufferException {417 return PARSER.parseFrom(data);418 }419 420 public static PersonProbuf.Person parseFrom(byte[] data,421 com.google.protobuf.ExtensionRegistryLite extensionRegistry)422 throws com.google.protobuf.InvalidProtocolBufferException {423 return PARSER.parseFrom(data, extensionRegistry);424 }425 426 public static PersonProbuf.Person parseFrom(java.io.InputStream input) throws java.io.IOException {427 return PARSER.parseFrom(input);428 }429 430 public static PersonProbuf.Person parseFrom(java.io.InputStream input,431 com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException {432 return PARSER.parseFrom(input, extensionRegistry);433 }434 435 public static PersonProbuf.Person parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException {436 return PARSER.parseDelimitedFrom(input);437 }438 439 public static PersonProbuf.Person parseDelimitedFrom(java.io.InputStream input,440 com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException {441 return PARSER.parseDelimitedFrom(input, extensionRegistry);442 }443 444 public static PersonProbuf.Person parseFrom(com.google.protobuf.CodedInputStream input)445 throws java.io.IOException {446 return PARSER.parseFrom(input);447 }448 449 public static PersonProbuf.Person parseFrom(com.google.protobuf.CodedInputStream input,450 com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException {451 return PARSER.parseFrom(input, extensionRegistry);452 }453 454 public static Builder newBuilder() {455 return Builder.create();456 }457 458 public Builder newBuilderForType() {459 return newBuilder();460 }461 462 public static Builder newBuilder(PersonProbuf.Person prototype) {463 return newBuilder().mergeFrom(prototype);464 }465 466 public Builder toBuilder() {467 return newBuilder(this);468 }469 470 @java.lang.Override471 protected Builder newBuilderForType(com.google.protobuf.GeneratedMessage.BuilderParent parent) {472 Builder builder = new Builder(parent);473 return builder;474 }475 476 /**477 * Protobuf type {
@code Person}478 */479 public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder
480 implements PersonProbuf.PersonOrBuilder {481 public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() {482 return PersonProbuf.internal_static_Person_descriptor;483 }484 485 protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() {486 return PersonProbuf.internal_static_Person_fieldAccessorTable487 .ensureFieldAccessorsInitialized(PersonProbuf.Person.class, PersonProbuf.Person.Builder.class);488 }489 490 // Construct using PersonProbuf.Person.newBuilder()491 private Builder() {492 maybeForceBuilderInitialization();493 }494 495 private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {496 super(parent);497 maybeForceBuilderInitialization();498 }499 500 private void maybeForceBuilderInitialization() {501 if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {502 }503 }504 505 private static Builder create() {506 return new Builder();507 }508 509 public Builder clear() {510 super.clear();511 id_ = 0L;512 bitField0_ = (bitField0_ & ~0x00000001);513 name_ = "";514 bitField0_ = (bitField0_ & ~0x00000002);515 sex_ = "";516 bitField0_ = (bitField0_ & ~0x00000004);517 tel_ = "";518 bitField0_ = (bitField0_ & ~0x00000008);519 return this;520 }521 522 public Builder clone() {523 return create().mergeFrom(buildPartial());524 }525 526 public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() {527 return PersonProbuf.internal_static_Person_descriptor;528 }529 530 public PersonProbuf.Person getDefaultInstanceForType() {531 return PersonProbuf.Person.getDefaultInstance();532 }533 534 public PersonProbuf.Person build() {535 PersonProbuf.Person result = buildPartial();536 if (!result.isInitialized()) {537 throw newUninitializedMessageException(result);538 }539 return result;540 }541 542 public PersonProbuf.Person buildPartial() {543 PersonProbuf.Person result = new PersonProbuf.Person(this);544 int from_bitField0_ = bitField0_;545 int to_bitField0_ = 0;546 if (((from_bitField0_ & 0x00000001) == 0x00000001)) {547 to_bitField0_ |= 0x00000001;548 }549 result.id_ = id_;550 if (((from_bitField0_ & 0x00000002) == 0x00000002)) {551 to_bitField0_ |= 0x00000002;552 }553 result.name_ = name_;554 if (((from_bitField0_ & 0x00000004) == 0x00000004)) {555 to_bitField0_ |= 0x00000004;556 }557 result.sex_ = sex_;558 if (((from_bitField0_ & 0x00000008) == 0x00000008)) {559 to_bitField0_ |= 0x00000008;560 }561 result.tel_ = tel_;562 result.bitField0_ = to_bitField0_;563 onBuilt();564 return result;565 }566 567 public Builder mergeFrom(com.google.protobuf.Message other) {568 if (other instanceof PersonProbuf.Person) {569 return mergeFrom((PersonProbuf.Person) other);570 } else {571 super.mergeFrom(other);572 return this;573 }574 }575 576 public Builder mergeFrom(PersonProbuf.Person other) {577 if (other == PersonProbuf.Person.getDefaultInstance())578 return this;579 if (other.hasId()) {580 setId(other.getId());581 }582 if (other.hasName()) {583 bitField0_ |= 0x00000002;584 name_ = other.name_;585 onChanged();586 }587 if (other.hasSex()) {588 bitField0_ |= 0x00000004;589 sex_ = other.sex_;590 onChanged();591 }592 if (other.hasTel()) {593 bitField0_ |= 0x00000008;594 tel_ = other.tel_;595 onChanged();596 }597 this.mergeUnknownFields(other.getUnknownFields());598 return this;599 }600 601 public final boolean isInitialized() {602 return true;603 }604 605 public Builder mergeFrom(com.google.protobuf.CodedInputStream input,606 com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException {607 PersonProbuf.Person parsedMessage = null;608 try {609 parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);610 } catch (com.google.protobuf.InvalidProtocolBufferException e) {611 parsedMessage = (PersonProbuf.Person) e.getUnfinishedMessage();612 throw e;613 } finally {614 if (parsedMessage != null) {615 mergeFrom(parsedMessage);616 }617 }618 return this;619 }620 621 private int bitField0_;622 623 // optional int64 id = 1;624 private long id_;625 626 /**627 *
optional int64 id = 1;628 *629 *
630              *可选的字段,为64位整数..631              * 
632 */633 public boolean hasId() {634 return ((bitField0_ & 0x00000001) == 0x00000001);635 }636 637 /**638 *
optional int64 id = 1;639 *640 *
641              *可选的字段,为64位整数..642              * 
643 */644 public long getId() {645 return id_;646 }647 648 /**649 *
optional int64 id = 1;650 *651 *
652              *可选的字段,为64位整数..653              * 
654 */655 public Builder setId(long value) {656 bitField0_ |= 0x00000001;657 id_ = value;658 onChanged();659 return this;660 }661 662 /**663 *
optional int64 id = 1;664 *665 *
666              *可选的字段,为64位整数..667              * 
668 */669 public Builder clearId() {670 bitField0_ = (bitField0_ & ~0x00000001);671 id_ = 0L;672 onChanged();673 return this;674 }675 676 // optional string name = 2;677 private java.lang.Object name_ = "";678 679 /**680 *
optional string name = 2;681 */682 public boolean hasName() {683 return ((bitField0_ & 0x00000002) == 0x00000002);684 }685 686 /**687 *
optional string name = 2;688 */689 public java.lang.String getName() {690 java.lang.Object ref = name_;691 if (!(ref instanceof java.lang.String)) {692 java.lang.String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();693 name_ = s;694 return s;695 } else {696 return (java.lang.String) ref;697 }698 }699 700 /**701 *
optional string name = 2;702 */703 public com.google.protobuf.ByteString getNameBytes() {704 java.lang.Object ref = name_;705 if (ref instanceof String) {706 com.google.protobuf.ByteString b = com.google.protobuf.ByteString707 .copyFromUtf8((java.lang.String) ref);708 name_ = b;709 return b;710 } else {711 return (com.google.protobuf.ByteString) ref;712 }713 }714 715 /**716 *
optional string name = 2;717 */718 public Builder setName(java.lang.String value) {719 if (value == null) {720 throw new NullPointerException();721 }722 bitField0_ |= 0x00000002;723 name_ = value;724 onChanged();725 return this;726 }727 728 /**729 *
optional string name = 2;730 */731 public Builder clearName() {732 bitField0_ = (bitField0_ & ~0x00000002);733 name_ = getDefaultInstance().getName();734 onChanged();735 return this;736 }737 738 /**739 *
optional string name = 2;740 */741 public Builder setNameBytes(com.google.protobuf.ByteString value) {742 if (value == null) {743 throw new NullPointerException();744 }745 bitField0_ |= 0x00000002;746 name_ = value;747 onChanged();748 return this;749 }750 751 // optional string sex = 3;752 private java.lang.Object sex_ = "";753 754 /**755 *
optional string sex = 3;756 */757 public boolean hasSex() {758 return ((bitField0_ & 0x00000004) == 0x00000004);759 }760 761 /**762 *
optional string sex = 3;763 */764 public java.lang.String getSex() {765 java.lang.Object ref = sex_;766 if (!(ref instanceof java.lang.String)) {767 java.lang.String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();768 sex_ = s;769 return s;770 } else {771 return (java.lang.String) ref;772 }773 }774 775 /**776 *
optional string sex = 3;777 */778 public com.google.protobuf.ByteString getSexBytes() {779 java.lang.Object ref = sex_;780 if (ref instanceof String) {781 com.google.protobuf.ByteString b = com.google.protobuf.ByteString782 .copyFromUtf8((java.lang.String) ref);783 sex_ = b;784 return b;785 } else {786 return (com.google.protobuf.ByteString) ref;787 }788 }789 790 /**791 *
optional string sex = 3;792 */793 public Builder setSex(java.lang.String value) {794 if (value == null) {795 throw new NullPointerException();796 }797 bitField0_ |= 0x00000004;798 sex_ = value;799 onChanged();800 return this;801 }802 803 /**804 *
optional string sex = 3;805 */806 public Builder clearSex() {807 bitField0_ = (bitField0_ & ~0x00000004);808 sex_ = getDefaultInstance().getSex();809 onChanged();810 return this;811 }812 813 /**814 *
optional string sex = 3;815 */816 public Builder setSexBytes(com.google.protobuf.ByteString value) {817 if (value == null) {818 throw new NullPointerException();819 }820 bitField0_ |= 0x00000004;821 sex_ = value;822 onChanged();823 return this;824 }825 826 // optional string tel = 4;827 private java.lang.Object tel_ = "";828 829 /**830 *
optional string tel = 4;831 */832 public boolean hasTel() {833 return ((bitField0_ & 0x00000008) == 0x00000008);834 }835 836 /**837 *
optional string tel = 4;838 */839 public java.lang.String getTel() {840 java.lang.Object ref = tel_;841 if (!(ref instanceof java.lang.String)) {842 java.lang.String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();843 tel_ = s;844 return s;845 } else {846 return (java.lang.String) ref;847 }848 }849 850 /**851 *
optional string tel = 4;852 */853 public com.google.protobuf.ByteString getTelBytes() {854 java.lang.Object ref = tel_;855 if (ref instanceof String) {856 com.google.protobuf.ByteString b = com.google.protobuf.ByteString857 .copyFromUtf8((java.lang.String) ref);858 tel_ = b;859 return b;860 } else {861 return (com.google.protobuf.ByteString) ref;862 }863 }864 865 /**866 *
optional string tel = 4;867 */868 public Builder setTel(java.lang.String value) {869 if (value == null) {870 throw new NullPointerException();871 }872 bitField0_ |= 0x00000008;873 tel_ = value;874 onChanged();875 return this;876 }877 878 /**879 *
optional string tel = 4;880 */881 public Builder clearTel() {882 bitField0_ = (bitField0_ & ~0x00000008);883 tel_ = getDefaultInstance().getTel();884 onChanged();885 return this;886 }887 888 /**889 *
optional string tel = 4;890 */891 public Builder setTelBytes(com.google.protobuf.ByteString value) {892 if (value == null) {893 throw new NullPointerException();894 }895 bitField0_ |= 0x00000008;896 tel_ = value;897 onChanged();898 return this;899 }900 901 // @@protoc_insertion_point(builder_scope:Person)902 }903 904 static {905 defaultInstance = new Person(true);906 defaultInstance.initFields();907 }908 909 // @@protoc_insertion_point(class_scope:Person)910 }911 912 private static com.google.protobuf.Descriptors.Descriptor internal_static_Person_descriptor;913 private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_Person_fieldAccessorTable;914 915 public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() {916 return descriptor;917 }918 919 private static com.google.protobuf.Descriptors.FileDescriptor descriptor;920 static {921 java.lang.String[] descriptorData = {922 "\n\006person\"<\n\006Person\022\n\n\002id\030\001 \001(\003\022\014\n\004name\030\002"923 + " \001(\t\022\013\n\003sex\030\003 \001(\t\022\013\n\003tel\030\004 \001(\tB\016B\014Person"924 + "Probuf" };925 com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {926 public com.google.protobuf.ExtensionRegistry assignDescriptors(927 com.google.protobuf.Descriptors.FileDescriptor root) {928 descriptor = root;929 internal_static_Person_descriptor = getDescriptor().getMessageTypes().get(0);930 internal_static_Person_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable(931 internal_static_Person_descriptor, new java.lang.String[] { "Id", "Name", "Sex", "Tel", });932 return null;933 }934 };935 com.google.protobuf.Descriptors.FileDescriptor.internalBuildGeneratedFileFrom(descriptorData,936 new com.google.protobuf.Descriptors.FileDescriptor[] {}, assigner);937 }938 939 // @@protoc_insertion_point(outer_class_scope)940 }
PersonProbuf(自动生成)
1 public class ReqClient { 2  3     public void connect(String host, int port) throws Exception { 4         // 配置服务端的NIO线程组 5         EventLoopGroup group = new NioEventLoopGroup(); 6  7         try { 8             // Bootstrap 类,是启动NIO服务器的辅助启动类 9             Bootstrap b = new Bootstrap();10             b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)11                     .handler(new ChannelInitializer
() {12 @Override13 public void initChannel(SocketChannel ch) throws Exception {14 // 解码器在读时才用,编码器在写时才用15 16 // 解码类17 ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());// frame解码得到ByteBuf18 ch.pipeline().addLast(new ProtobufDecoder(PersonProbuf.Person.getDefaultInstance()));// ByteBuf解码得到自定义的Protobuf类19 // 编码类20 ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());21 ch.pipeline().addLast(new ProtobufEncoder());22 23 ch.pipeline().addLast(new ReqClientHandler());24 25 }26 });27 28 // 发起异步连接操作29 ChannelFuture f = b.connect(host, port).sync();30 System.out.println("客户端启动.");31 32 // 等待客服端链路关闭33 f.channel().closeFuture().sync();34 } finally {35 group.shutdownGracefully();36 }37 }38 39 public static void main(String[] args) throws Exception {40 int port = 8080;41 if (args != null && args.length > 0) {42 try {43 port = Integer.valueOf(args[0]);44 } catch (NumberFormatException ex) {45 }46 }47 new ReqClient().connect("127.0.0.1", port);48 }49 }
ReqClient
1 public class ReqClientHandler extends ChannelHandlerAdapter {
// 由于添加了编码器和解码器,这里输出和读入的都已是自定义的Protobuf类型 2 3 @Override 4 public void channelActive(ChannelHandlerContext ctx) { 5 for (int i = 0; i < 2; i++) { 6 ctx.write(request(i)); 7 } 8 ctx.flush(); 9 }10 11 private PersonProbuf.Person request(int id) {12 PersonProbuf.Person.Builder builder = PersonProbuf.Person.newBuilder();13 builder.setId(id);14 builder.setName("orange");15 builder.setSex("man");16 builder.setTel("999");17 18 return builder.build();19 }20 21 @Override22 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {23 System.out.println("receive server response:[" + msg + "]");24 }25 26 @Override27 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {28 ctx.flush();29 }30 31 @Override32 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {33 cause.printStackTrace();34 ctx.close();35 }36 }
ReqClientHandler
1 public class ReqServer { 2  3     public void bind(int port) throws Exception { 4         EventLoopGroup bossGroup = new NioEventLoopGroup(); 5         EventLoopGroup WorkerGroup = new NioEventLoopGroup(); 6  7         try { 8             ServerBootstrap b = new ServerBootstrap(); 9             b.group(bossGroup, WorkerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)10                     .childHandler(new ChannelInitializer
() {11 @Override12 public void initChannel(SocketChannel ch) {13 // protobufDecoder仅仅负责编码,并不支持读半包,所以在之前,一定要有读半包的处理器。14 // 有三种方式可以选择:15 // 使用netty提供ProtobufVarint32FrameDecoder16 // 继承netty提供的通用半包处理器 LengthFieldBasedFrameDecoder17 // 继承ByteToMessageDecoder类,自己处理半包18 19 // 解码器在读时才用,编码器在写时才用20 21 // 解码类22 ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());// frame解码得到ByteBuf23 ch.pipeline().addLast(new ProtobufDecoder(PersonProbuf.Person.getDefaultInstance()));// ByteBuf解码得到自定义的Protobuf类24 // 编码类25 ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());26 ch.pipeline().addLast(new ProtobufEncoder());27 28 ch.pipeline().addLast(new ReqServerHandler());29 }30 });31 32 // 绑定端口,同步等待成功33 ChannelFuture f = b.bind(port).sync();34 System.out.println("服务端启动.");35 36 // 等待服务端监听端口关闭37 f.channel().closeFuture().sync();38 } finally {39 // 释放线程池资源40 bossGroup.shutdownGracefully();41 WorkerGroup.shutdownGracefully();42 }43 }44 45 public static void main(String[] args) throws Exception {46 int port = 8080;47 if (args != null && args.length > 0) {48 try {49 port = Integer.valueOf(args[0]);50 } catch (NumberFormatException ex) {51 }52 }53 new ReqServer().bind(port);54 }55 56 }
ReqServer
1 public class ReqServerHandler extends ChannelHandlerAdapter {
//由于添加了编码器和解码器,这里输出和读入的都已是自定义的Protobuf类型 2 @Override 3 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 4 PersonProbuf.Person people = (PersonProbuf.Person) msg; 5 if ("Orange".equalsIgnoreCase(people.getName())) { 6 // if("Orange".equals(people.getName())){
7 System.out.println("accept client people:[" + people.toString() + "]"); 8 ctx.writeAndFlush(response(people.getId())); 9 }10 }11 12 private PersonProbuf.Person response(long peopleID) {13 PersonProbuf.Person.Builder builder = PersonProbuf.Person.newBuilder();14 builder.setId(peopleID);15 builder.setName("karl");16 builder.setSex("boy");17 builder.setTel("110");18 return builder.build();19 }20 21 @Override22 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {23 cause.printStackTrace();24 ctx.close();25 }26 }
ReqServerHandler

其中用到了自动生成的 PersonProbuf.java,生成方法如下:

1、从 下载压缩包并解压。(生成各种语言的代码时需要用到相关库).

2、从 下载protoc.exe放在上述包的src目录下(用于根据.proto文件生产对应的java类),不妨把src目录加入到环境变量。

3、定义消息格式,person.proto(消息格式的定义规则可参看 ),如下:

1 //option java_package = "Serialization_ProtoBuf.ProtoBuf"; //写包名的话,会自动创建包名指定的文件夹并把生成的类放在其下,此外,类里属性或方法的调用会加上全限定名,移动到其他地方时难改,所以最好别加此项。2 option java_outer_classname = "PersonProbuf"; 3 4 message Person {5     optional int64 id=1; //可选的字段,为64位整数..6     optional string name=2;7     optional string sex=3;8     optional string tel=4;9 }
person.proto

4、执行 protoc.exe --java_out=java类输出目录 proto文件路径 ,就自动生成了对应的类文件Personprobuf.java,文件名由person.proto里的 java_outer_classname 指定(很长。。。)

 

其他:Protobuf生成的类的基本操作(以Personprobuf.java为例)

1 class TestProtobuf { 2  3     public static void main(String[] args) { 4         PersonProbuf.Person.Builder builder = PersonProbuf.Person.newBuilder(); 5  6         builder.setId(1); 7         builder.setName("Karl"); 8         builder.setSex("boy"); 9         builder.setTel("110");10         PersonProbuf.Person person = builder.build();11 12         System.out.println(person.toString());13 14         System.out.println(person.toByteString());15         System.out.println();16 17         byte[] buf = person.toByteArray();18         for (byte b : buf) {19             System.out.print(b);20         }21         System.out.println("\n");22 23         try {24             PersonProbuf.Person person2 = PersonProbuf.Person.parseFrom(buf);25             System.out.println(person2.getName() + ", " + person2.getTel());26         } catch (InvalidProtocolBufferException e) {27             e.printStackTrace();28         }29     }30 }31 32 33 //结果34 id: 135 name: "Karl"36 sex: "boy"37 tel: "110"38 39 
40 41 8118475971141082639811112134349494842 43 Karl, 110
TestProtobuf

 

2、参考资料

1、 1.1

2、 官方示例

3、 官方示例翻译

4、 结合Protobuf

5、 Netty总结

 

转载于:https://www.cnblogs.com/z-sm/p/6749843.html

你可能感兴趣的文章
HTTP协议
查看>>
CentOS7 重置root密码
查看>>
Centos安装Python3
查看>>
PHP批量插入
查看>>
laravel连接sql server 2008
查看>>
Ubuntu菜鸟入门(五)—— 一些编程相关工具
查看>>
valgrind检测linux程序内存泄露
查看>>
Hadoop以及组件介绍
查看>>
1020 Tree Traversals (25)(25 point(s))
查看>>
第一次作业
查看>>
“==”运算符与equals()
查看>>
单工、半双工和全双工的定义
查看>>
Hdu【线段树】基础题.cpp
查看>>
时钟系统
查看>>
BiTree
查看>>
5个基于HTML5的加载动画推荐
查看>>
水平权限漏洞的修复方案
查看>>
静态链接与动态链接的区别
查看>>
Android 关于悬浮窗权限的问题
查看>>
如何使用mysql
查看>>