应用
本博客是根据黑马程序员Netty实战 学习时所做的笔记
1、粘包与半包 服务器代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class StudyServer { static final Logger log = LoggerFactory.getLogger(StudyServer.class); void start () { NioEventLoopGroup boss = new NioEventLoopGroup (1 ); NioEventLoopGroup worker = new NioEventLoopGroup (); try { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.debug("connected {}" , ctx.channel()); super .channelActive(ctx); } @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { log.debug("disconnect {}" , ctx.channel()); super .channelInactive(ctx); } }); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080 ); log.debug("{} binding..." , channelFuture.channel()); channelFuture.sync(); log.debug("{} bound..." , channelFuture.channel()); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error" , e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); log.debug("stopped" ); } } public static void main (String[] args) { new StudyServer ().start(); } }
粘包现象 客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class StudyClient { static final Logger log = LoggerFactory.getLogger(StudyClient.class); public static void main (String[] args) { NioEventLoopGroup worker = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { log.debug("connected..." ); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { log.debug("sending..." ); for (int i = 0 ; i < 10 ; i++) { ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes(new byte []{0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 }); ctx.writeAndFlush(buffer); } } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1" , 8080 ).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error" , e); } finally { worker.shutdownGracefully(); } } }
服务器接收结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 7999 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x5b43ecb0 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :53797 ] READ: 160B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000010 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000020 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000030 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000040 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000050 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000060 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000070 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000080| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000090| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| +--------+-------------------------------------------------+----------------+
可见虽然客户端是分别以16字节为单位,通过channel向服务器发送了10次数据,可是 服务器端却只接收了一次,接收数据的大小为160B,即客户端发送的数据总大小,这就是粘包现象
半包现象 将客户端-服务器之间的channel容量进行调整
服务器代码
1 2 serverBootstrap.option(ChannelOption.SO_RCVBUF, 10 );
注意
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10) 影响的底层接收缓冲区(即滑动窗口)大小,仅决定了 netty 读取的最小单位,netty 实际每次读取的一般是它的整数倍
服务器接收结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 5901 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xc73284f3 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :49679 ] READ: 36B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000010 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| |00000020 | 00 01 02 03 |.... | +--------+-------------------------------------------------+----------------+ 5901 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xc73284f3 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :49679 ] READ: 40B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................| |00000010 | 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................| |00000020 | 04 05 06 07 08 09 0a 0b |........ | +--------+-------------------------------------------------+----------------+ 5901 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xc73284f3 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :49679 ] READ: 40B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 0c 0d 0e 0f 00 01 02 03 04 05 06 07 08 09 0a 0b |................| |00000010 | 0c 0d 0e 0f 00 01 02 03 04 05 06 07 08 09 0a 0b |................| |00000020 | 0c 0d 0e 0f 00 01 02 03 |........ | +--------+-------------------------------------------------+----------------+ 5901 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xc73284f3 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :49679 ] READ: 40B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................| |00000010 | 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................| |00000020 | 04 05 06 07 08 09 0a 0b |........ | +--------+-------------------------------------------------+----------------+ 5901 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xc73284f3 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :49679 ] READ: 4B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 0c 0d 0e 0f |.... | +--------+-------------------------------------------------+----------------+
可见客户端每次发送的数据,因channel容量不足,无法将发送的数据一次性接收 ,便产生了半包现象
现象分析 粘包
现象
原因
应用层
接收方 ByteBuf 设置太大(Netty 默认 1024);接收方缓冲数据,一起发送
传输层-网络层
滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且 窗口大小足够大(大于256 bytes),这 256 bytes 字节就会缓冲在接收方的滑动窗口中, 当滑动窗口中缓冲了多个报文就会粘包
Nagle 算法:会造成粘包
半包
现象
原因
应用层
传输层-网络层
滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
数据链路层
MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包
本质 发生粘包与半包现象的本质是因为 TCP 是流式协议,消息无边界
解决方案 短链接 客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开 。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以 短链接无法解决半包现象
客户端代码改进
修改channelActive方法
1 2 3 4 5 6 7 8 public void channelActive (ChannelHandlerContext ctx) throws Exception { log.debug("sending..." ); ByteBuf buffer = ctx.alloc().buffer(16 ); buffer.writeBytes(new byte []{0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 }); ctx.writeAndFlush(buffer); ctx.channel().close(); }
将发送步骤整体封装为send()方法,调用10次send()方法,模拟发送10次数据
1 2 3 4 5 6 public static void main (String[] args) { for (int i = 0 ; i < 10 ; i++) { send(); } }
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 6452 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x3eb6a684 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :65024 ] ACTIVE6468 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x3eb6a684 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :65024 ] READ: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| +--------+-------------------------------------------------+----------------+ 6468 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x3eb6a684 , L:/127.0 .0 .1 :8080 ! R:/127.0 .0 .1 :65024 ] INACTIVE6483 [nioEventLoopGroup-3 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x7dcc31ff , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :65057 ] ACTIVE6483 [nioEventLoopGroup-3 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x7dcc31ff , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :65057 ] READ: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................| +--------+-------------------------------------------------+----------------+ 6483 [nioEventLoopGroup-3 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x7dcc31ff , L:/127.0 .0 .1 :8080 ! R:/127.0 .0 .1 :65057 ] INACTIVE...
客户端先于服务器建立连接,此时控制台打印ACTIVE
,之后客户端向服务器发送了16B的数据,发送后断开连接,此时控制台打印INACTIVE
,可见 未出现粘包现象
定长解码器 客户端于服务器 约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度 。若发送数据长度不足则需要 补齐 至该长度
服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分 ,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到 FixedLengthFrameDecoder 对数据进行定长解码 ,具体使用方法如下
1 ch.pipeline().addLast(new FixedLengthFrameDecoder (16 ));
客户端代码
客户端发送数据的代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 final int maxLength = 16 ;char c = 'a' ;for (int i = 0 ; i < 10 ; i++) { ByteBuf buffer = ctx.alloc().buffer(maxLength); byte [] bytes = new byte [maxLength]; for (int j = 0 ; j < (int )(Math.random()*(maxLength-1 )); j++) { bytes[j] = (byte ) c; } buffer.writeBytes(bytes); c++; ctx.writeAndFlush(buffer); }
服务器代码
使用FixedLengthFrameDecoder
对粘包数据进行拆分,该handler需要添加在LoggingHandler
之前,保证数据被打印时已被拆分
1 2 3 ch.pipeline().addLast(new FixedLengthFrameDecoder (16 )); ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG));
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 8222 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xbc122d07 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :52954 ] READ: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 61 61 61 61 00 00 00 00 00 00 00 00 00 00 00 00 |aaaa............| +--------+-------------------------------------------------+----------------+ 8222 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xbc122d07 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :52954 ] READ: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 62 62 62 00 00 00 00 00 00 00 00 00 00 00 00 00 |bbb.............| +--------+-------------------------------------------------+----------------+ 8222 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xbc122d07 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :52954 ] READ: 16B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 63 63 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |cc..............| +--------+-------------------------------------------------+----------------+ ...
行解码器 行解码器的是 通过分隔符对数据进行拆分 来解决粘包半包问题的
可以通过 LineBasedFrameDecoder(int maxLength)
来拆分以换行符(\n)为分隔符的数据,也可以通过 DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)
来 指定通过什么分隔符来拆分数据(可以传入多个分隔符)
两种解码器 都需要传入数据的最大长度 ,若超出最大长度,会抛出TooLongFrameException
异常
以换行符 \n 为分隔符
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 final int maxLength = 64 ;char c = 'a' ;for (int i = 0 ; i < 10 ; i++) { ByteBuf buffer = ctx.alloc().buffer(maxLength); Random random = new Random (); StringBuilder sb = new StringBuilder (); for (int j = 0 ; j < (int )(random.nextInt(maxLength-2 )); j++) { sb.append(c); } sb.append("\n" ); buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8)); c++; ctx.writeAndFlush(buffer); }
服务器代码
1 2 3 4 ch.pipeline().addLast(new DelimiterBasedFrameDecoder (64 )); ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG));
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 4184 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x9d6ac701 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :58282 ] READ: 10B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaa | +--------+-------------------------------------------------+----------------+ 4184 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x9d6ac701 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :58282 ] READ: 11B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 62 62 62 62 62 62 62 62 62 62 62 |bbbbbbbbbbb | +--------+-------------------------------------------------+----------------+ 4184 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x9d6ac701 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :58282 ] READ: 2B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 63 63 |cc | +--------+-------------------------------------------------+----------------+ ...
以自定义分隔符 \c 为分隔符
客户端代码
1 2 3 4 5 6 7 ... sb.append("\\c" ); buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8)); ...
服务器代码
1 2 3 4 5 ByteBuf bufSet = ch.alloc().buffer().writeBytes("\\c" .getBytes(StandardCharsets.UTF_8));ch.pipeline().addLast(new DelimiterBasedFrameDecoder (64 , ch.alloc().buffer().writeBytes(bufSet))); ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG));
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 8246 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x86215ccd , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :65159 ] READ: 14B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaa | +--------+-------------------------------------------------+----------------+ 8247 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x86215ccd , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :65159 ] READ: 3B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 62 62 62 |bbb | +--------+-------------------------------------------------+----------------+ ...
长度字段解码器 在传送数据时可以在数据中 添加一个用于表示有用数据长度的字段 ,在解码时读取出这个用于表明长度的字段,同时读取其他相关参数,即可知道最终需要的数据是什么样子的
LengthFieldBasedFrameDecoder
解码器可以提供更为丰富的拆分方法,其构造方法有五个参数
1 2 3 4 public LengthFieldBasedFrameDecoder ( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip)
参数解析
maxFrameLength 数据最大长度
表示数据的最大长度(包括附加信息、长度标识等内容)
lengthFieldOffset 数据长度标识的起始偏移量
用于指明数据第几个字节开始是用于标识有用字节长度的,因为前面可能还有其他附加信息
lengthFieldLength 数据长度标识所占字节数 (用于指明有用数据的长度)
lengthAdjustment 长度表示与有用数据的偏移量
用于指明数据长度标识和有用数据之间的距离,因为两者之间还可能有附加信息
initialBytesToStrip 数据读取起点
读取起点,不读取 0 ~ initialBytesToStrip 之间的数据
参数图解
1 2 3 4 5 6 7 8 9 10 lengthFieldOffset = 0 lengthFieldLength = 2 lengthAdjustment = 0 initialBytesToStrip = 0 (= do not strip header) BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes) +--------+----------------+ +--------+----------------+ | Length | Actual Content |----->| Length | Actual Content | | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" | +--------+----------------+ +--------+----------------+
从0开始即为长度标识,长度标识长度为2个字节
0x000C 即为后面 HELLO, WORLD
的长度
1 2 3 4 5 6 7 8 9 10 lengthFieldOffset = 0 lengthFieldLength = 2 lengthAdjustment = 0 initialBytesToStrip = 2 (= the length of the Length field) BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes) +--------+----------------+ +----------------+ | Length | Actual Content |----->| Actual Content | | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" | +--------+----------------+ +----------------+
从0开始即为长度标识,长度标识长度为2个字节,读取时从第二个字节开始读取 (此处即跳过长度标识)
因为跳过了用于表示长度的2个字节 ,所以此处直接读取HELLO, WORLD
1 2 3 4 5 6 7 8 9 10 lengthFieldOffset = 2 (= the length of Header 1) lengthFieldLength = 3 lengthAdjustment = 0 initialBytesToStrip = 0 BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes) +----------+----------+----------------+ +----------+----------+----------------+ | Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content | | 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" | +----------+----------+----------------+ +----------+----------+----------------+
长度标识前面还有2个字节的其他内容 (0xCAFE),第三个字节开始才是长度标识,长度表示长度为3个字节(0x00000C)
Header1中有附加信息,读取长度标识时需要跳过这些附加信息来获取长度
1 2 3 4 5 6 7 8 9 10 lengthFieldOffset = 0 lengthFieldLength = 3 lengthAdjustment = 2 (= the length of Header 1 ) initialBytesToStrip = 0 BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes) +----------+----------+----------------+ +----------+----------+----------------+ | Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content | | 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" | +----------+----------+----------------+ +----------+----------+----------------+
从0开始即为长度标识,长度标识长度为3个字节,长度标识之后还有2个字节的其他内容 (0xCAFE)
长度标识(0x00000C)表示的是从其后lengthAdjustment(2个字节)开始的数据的长度,即HELLO, WORLD ,不包括0xCAFE
1 2 3 4 5 6 7 8 9 10 lengthFieldOffset = 1 (= the length of HDR1) lengthFieldLength = 2 lengthAdjustment = 1 (= the length of HDR2) initialBytesToStrip = 3 (= the length of HDR1 + LEN) BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes) +------+--------+------+----------------+ +------+----------------+ | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" | +------+--------+------+----------------+ +------+----------------+
长度标识前面有1个字节的其他内容,后面也有1个字节的其他内容,读取时从长度标识之后3个字节处开始读取 ,即读取 0xFE HELLO, WORLD
使用
通过 EmbeddedChannel 对 handler 进行测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class EncoderStudy { public static void main (String[] args) { EmbeddedChannel channel = new EmbeddedChannel ( new LengthFieldBasedFrameDecoder (1024 , 1 , 4 , 1 , 0 ), new LoggingHandler (LogLevel.DEBUG) ); ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); send(buffer, "Hello" ); channel.writeInbound(buffer); send(buffer, "World" ); channel.writeInbound(buffer); } private static void send (ByteBuf buf, String msg) { int length = msg.length(); byte [] bytes = msg.getBytes(StandardCharsets.UTF_8); buf.writeByte(0xCA ); buf.writeInt(length); buf.writeByte(0xFE ); buf.writeBytes(bytes); } }
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 146 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 11B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | ca 00 00 00 05 fe 48 65 6c 6c 6f |......Hello | +--------+-------------------------------------------------+----------------+ 146 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 11B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | ca 00 00 00 05 fe 57 6f 72 6c 64 |......World | +--------+-------------------------------------------------+----------------+
2、协议设计与解析 协议的作用 TCP/IP 中消息传输基于流的方式,没有边界
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
Redis协议 如果我们要向Redis服务器发送一条set name Nyima
的指令,需要遵守如下协议
1 2 3 4 5 6 7 8 9 10 11 *3 \r\n $3 \r\n set\r\n $4 \r\n name\r\n $5 \r\n Nyima\r\n
客户端代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class RedisClient { static final Logger log = LoggerFactory.getLogger(StudyServer.class); public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup (); try { ChannelFuture channelFuture = new Bootstrap () .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { final byte [] LINE = {'\r' ,'\n' }; ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes("*3" .getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("$3" .getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("set" .getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("$4" .getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("name" .getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("$5" .getBytes()); buffer.writeBytes(LINE); buffer.writeBytes("Nyima" .getBytes()); buffer.writeBytes(LINE); ctx.writeAndFlush(buffer); } }); } }) .connect(new InetSocketAddress ("localhost" , 6379 )); channelFuture.sync(); channelFuture.channel().close().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
控制台打印结果
1 2 3 4 5 6 7 8 1600 [nioEventLoopGroup-2 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x28c994f1 , L:/127.0 .0 .1 :60792 - R:localhost/127.0 .0 .1 :6379 ] WRITE: 34B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 2a 33 0d 0a 24 33 0d 0a 73 65 74 0d 0a 24 34 0d |*3. .$3. .set..$4. | |00000010 | 0a 6e 61 6d 65 0d 0a 24 35 0d 0a 4e 79 69 6d 61 |.name..$5. .Nyima| |00000020 | 0d 0a |.. | +--------+-------------------------------------------------+----------------+
Redis中查询执行结果
HTTP协议 HTTP协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用HttpServerCodec
作为服务器端的解码器与编码器,来处理HTTP请求
1 2 3 4 public final class HttpServerCodec extends CombinedChannelDuplexHandler <HttpRequestDecoder, HttpResponseEncoder> implements HttpServerUpgradeHandler .SourceCodec
服务器代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class HttpServer { static final Logger log = LoggerFactory.getLogger(StudyServer.class); public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup (); new ServerBootstrap () .group(group) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new HttpServerCodec ()); ch.pipeline().addLast(new SimpleChannelInboundHandler <HttpRequest>() { @Override protected void channelRead0 (ChannelHandlerContext ctx, HttpRequest msg) { log.debug(msg.uri()); DefaultFullHttpResponse response = new DefaultFullHttpResponse (msg.protocolVersion(), HttpResponseStatus.OK); byte [] bytes = "<h1>Hello, World!</h1>" .getBytes(StandardCharsets.UTF_8); response.headers().setInt(CONTENT_LENGTH, bytes.length); response.content().writeBytes(bytes); ctx.writeAndFlush(response); } }); } }) .bind(8080 ); } }
服务器负责处理请求并响应浏览器。所以只需要处理HTTP请求 即可
1 2 ch.pipeline().addLast(new SimpleChannelInboundHandler <HttpRequest>()
获得请求后,需要返回响应给浏览器。需要创建响应对象DefaultFullHttpResponse
,设置HTTP版本号及状态码,为避免浏览器获得响应后,因为获得CONTENT_LENGTH
而一直空转,需要添加CONTENT_LENGTH
字段,表明响应体中数据的具体长度
1 2 3 4 5 6 7 8 DefaultFullHttpResponse response = new DefaultFullHttpResponse (msg.protocolVersion(), HttpResponseStatus.OK);byte [] bytes = "<h1>Hello, World!</h1>" .getBytes(StandardCharsets.UTF_8);response.headers().setInt(CONTENT_LENGTH, bytes.length); response.content().writeBytes(bytes);
运行结果
浏览器
控制台
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 1714 [nioEventLoopGroup-2 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x72630ef7 , L:/0 :0 :0 :0 :0 :0 :0 :1 :8080 - R:/0 :0 :0 :0 :0 :0 :0 :1 :55503 ] READ: 688B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico| |00000010 | 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1 ..Host:| |00000020 | 20 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 0d | localhost:8080. | |00000030 | 0a 43 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 |.Connection: kee| |00000040 | 70 2d 61 6c 69 76 65 0d 0a 50 72 61 67 6d 61 3a |p-alive..Pragma:| .... 1716 [nioEventLoopGroup-2 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x72630ef7 , L:/0 :0 :0 :0 :0 :0 :0 :1 :8080 - R:/0 :0 :0 :0 :0 :0 :0 :1 :55503 ] WRITE: 61B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.| |00000010 | 0a 43 6f 6e 74 65 6e 74 2d 4c 65 6e 67 74 68 3a |.Content-Length:| |00000020 | 20 32 32 0d 0a 0d 0a 3c 68 31 3e 48 65 6c 6c 6f | 22. ...<h1>Hello| |00000030 | 2c 20 57 6f 72 6c 64 21 3c 2f 68 31 3e |, World!</h1> | +--------+-------------------------------------------------+----------------+
自定义协议 组成要素
编码器与解码器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 public class MessageCodec extends ByteToMessageCodec <Message> { @Override protected void encode (ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { out.writeBytes(new byte []{'N' ,'Y' ,'I' ,'M' }); out.writeByte(1 ); out.writeByte(1 ); out.writeByte(msg.getMessageType()); out.writeInt(msg.getSequenceId()); out.writeByte(0xff ); ByteArrayOutputStream bos = new ByteArrayOutputStream (); ObjectOutputStream oos = new ObjectOutputStream (bos); oos.writeObject(msg); byte [] bytes = bos.toByteArray(); out.writeInt(bytes.length); out.writeBytes(bytes); } @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int magic = in.readInt(); byte version = in.readByte(); byte seqType = in.readByte(); byte messageType = in.readByte(); int sequenceId = in.readInt(); in.readByte(); int length = in.readInt(); byte [] bytes = new byte [length]; in.readBytes(bytes, 0 , length); ObjectInputStream ois = new ObjectInputStream (new ByteArrayInputStream (bytes)); Message message = (Message) ois.readObject(); out.add(message); System.out.println("===========魔数===========" ); System.out.println(magic); System.out.println("===========版本号===========" ); System.out.println(version); System.out.println("===========序列化方法===========" ); System.out.println(seqType); System.out.println("===========指令类型===========" ); System.out.println(messageType); System.out.println("===========请求序号===========" ); System.out.println(sequenceId); System.out.println("===========正文长度===========" ); System.out.println(length); System.out.println("===========正文===========" ); System.out.println(message); } }
编码器与解码器方法源于父类ByteToMessageCodec ,通过该类可以自定义编码器与解码器,泛型类型为被编码与被解码的类 。此处使用了自定义类Message,代表消息
1 public class MessageCodec extends ByteToMessageCodec<Message>
编码器负责将附加信息与正文信息写入到ByteBuf中 ,其中附加信息总字节数最好为2n,不足需要补齐 。正文内容如果为对象,需要通过序列化 将其放入到ByteBuf中
解码器负责将ByteBuf中的信息取出,并放入List中 ,该List用于将信息传递给下一个handler
编写测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class TestCodec { static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class); public static void main (String[] args) throws Exception { EmbeddedChannel channel = new EmbeddedChannel (); channel.pipeline().addLast(new LengthFieldBasedFrameDecoder (1024 , 12 , 4 , 0 , 0 )); channel.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); channel.pipeline().addLast(new MessageCodec ()); LoginRequestMessage user = new LoginRequestMessage ("Nyima" , "123" ); ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(); new MessageCodec ().encode(null , user, byteBuf); channel.writeInbound(byteBuf); } }
测试类中用到了LengthFieldBasedFrameDecoder,避免粘包半包问题
通过MessageCodec的encode方法将附加信息与正文写入到ByteBuf中,通过channel执行入站操作。入站时会调用decode方法进行解码
运行结果
@Sharable注解 为了提高handler的复用率,可以将handler创建为handler对象 ,然后在不同的channel中使用该handler对象进行处理操作
1 2 3 4 LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG); // 不同的channel中使用同一个handler对象,提高复用率 channel1.pipeline().addLast(loggingHandler); channel2.pipeline().addLast(loggingHandler);
但是并不是所有的handler都能通过这种方法来提高复用率的 ,例如LengthFieldBasedFrameDecoder
。如果多个channel中使用同一个LengthFieldBasedFrameDecoder对象,则可能发生如下问题
channel1中收到了一个半包,LengthFieldBasedFrameDecoder发现不是一条完整的数据,则没有继续向下传播
此时channel2中也收到了一个半包,因为两个channel使用了同一个LengthFieldBasedFrameDecoder,存入其中的数据刚好拼凑成了一个完整的数据包 。LengthFieldBasedFrameDecoder让该数据包继续向下传播,最终引发错误
为了提高handler的复用率,同时又避免出现一些并发问题,Netty中原生的handler中用@Sharable注解来标明,该handler能否在多个channel中共享。
只有带有该注解,才能通过对象的方式被共享 ,否则无法被共享
自定义编解码器能否使用@Sharable注解 这需要根据自定义的handler的处理逻辑进行分析
我们的MessageCodec本身接收的是LengthFieldBasedFrameDecoder处理之后的数据,那么数据肯定是完整的,按分析来说是可以添加@Sharable注解的
但是实际情况我们并不能 添加该注解,会抛出异常信息ChannelHandler cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared
如果想要共享,需要怎么办呢?
继承MessageToMessageDecoder 即可。该类的目标是:将已经被处理的完整数据再次被处理。 传过来的Message如果是被处理过的完整数据 ,那么被共享也就不会出现问题了,也就可以使用@Sharable注解了。实现方式与ByteToMessageCodec类似
1 2 3 4 5 6 7 8 9 10 11 12 @ChannelHandler .Sharablepublic class MessageSharableCodec extends MessageToMessageCodec <ByteBuf, Message> { @Override protected void encode (ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception { ... } @Override protected void decode (ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { ... } }
3、在线聊天室 聊天室业务 用户登录接口 1 2 3 4 5 6 7 8 9 10 public interface UserService { boolean login (String username, String password) ; }
用户会话接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public interface Session { void bind (Channel channel, String username) ; void unbind (Channel channel) ; Object getAttribute (Channel channel, String name) ; void setAttribute (Channel channel, String name, Object value) ; Channel getChannel (String username) ; }
群聊会话接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public interface GroupSession { Group createGroup (String name, Set<String> members) ; Group joinMember (String name, String member) ; Group removeMember (String name, String member) ; Group removeGroup (String name) ; Set<String> getMembers (String name) ; List<Channel> getMembersChannel (String name) ; boolean isCreated (String name) ; }
整体结构
client包:存放客户端相关类
message包:存放各种类型的消息
protocol包:存放自定义协议
server包:存放服务器相关类
service包:存放用户相关类
session包:单聊及群聊相关会话类
客户端代码结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class ChatClient { static final Logger log = LoggerFactory.getLogger(ChatClient.class); public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup (); LoggingHandler loggingHandler = new LoggingHandler (LogLevel.DEBUG); MessageSharableCodec messageSharableCodec = new MessageSharableCodec (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtocolFrameDecoder ()); ch.pipeline().addLast(loggingHandler); ch.pipeline().addLast(messageSharableCodec); } }); Channel channel = bootstrap.connect().sync().channel(); channel.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
服务器代码结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class ChatServer { static final Logger log = LoggerFactory.getLogger(ChatServer.class); public static void main (String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup (); NioEventLoopGroup worker = new NioEventLoopGroup (); LoggingHandler loggingHandler = new LoggingHandler (LogLevel.DEBUG); MessageSharableCodec messageSharableCodec = new MessageSharableCodec (); try { ServerBootstrap bootstrap = new ServerBootstrap (); bootstrap.group(boss, worker); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtocolFrameDecoder ()); ch.pipeline().addLast(loggingHandler); ch.pipeline().addLast(messageSharableCodec); } }); Channel channel = bootstrap.bind(8080 ).sync().channel(); channel.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
登录 客户端代码 客户端添加如下handler, 分别处理登录、聊天等操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { new Thread (()->{ Scanner scanner = new Scanner (System.in); System.out.println("请输入用户名" ); String username = scanner.next(); System.out.println("请输入密码" ); String password = scanner.next(); LoginRequestMessage message = new LoginRequestMessage (username, password); ctx.writeAndFlush(message); System.out.println("等待后续操作..." ); try { waitLogin.await(); } catch (InterruptedException e) { e.printStackTrace(); } if (!loginStatus.get()) { ctx.channel().close(); return ; } while (true ) { System.out.println("==================================" ); System.out.println("send [username] [content]" ); System.out.println("gsend [group name] [content]" ); System.out.println("gcreate [group name] [m1,m2,m3...]" ); System.out.println("gmembers [group name]" ); System.out.println("gjoin [group name]" ); System.out.println("gquit [group name]" ); System.out.println("quit" ); System.out.println("==================================" ); String command = scanner.nextLine(); String[] commands = command.split(" " ); switch (commands[0 ]){ case "send" : ctx.writeAndFlush(new ChatRequestMessage (username, commands[1 ], commands[2 ])); break ; case "gsend" : ctx.writeAndFlush(new GroupChatRequestMessage (username,commands[1 ], commands[2 ])); break ; case "gcreate" : String[] members = commands[2 ].split("," ); Set<String> set = new HashSet <>(Arrays.asList(members)); set.add(username); ctx.writeAndFlush(new GroupCreateRequestMessage (commands[1 ],set)); break ; case "gmembers" : ctx.writeAndFlush(new GroupMembersRequestMessage (commands[1 ])); break ; case "gjoin" : ctx.writeAndFlush(new GroupJoinRequestMessage (username, commands[1 ])); break ; case "gquit" : ctx.writeAndFlush(new GroupQuitRequestMessage (username, commands[1 ])); break ; case "quit" : ctx.channel().close(); return ; default : System.out.println("指令有误,请重新输入" ); continue ; } } }, "login channel" ).start(); } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("{}" , msg); if (msg instanceof LoginResponseMessage) { LoginResponseMessage message = (LoginResponseMessage) msg; boolean isSuccess = message.isSuccess(); if (isSuccess) { loginStatus.set(true ); } waitLogin.countDown(); } } });
服务器代码 服务器添加如下handler,并添加到对应的channel中,负责处理登录请求信息,并作出响应
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @ChannelHandler .Sharable public class LoginRequestMessageHandler extends SimpleChannelInboundHandler <LoginRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception { String username = msg.getUsername(); String password = msg.getPassword(); boolean login = UserServiceFactory.getUserService().login(username, password); LoginResponseMessage message; if (login) { message = new LoginResponseMessage (true , "登陆成功" ); SessionFactory.getSession().bind(ctx.channel(), username); } else { message = new LoginResponseMessage (false , "登陆失败" ); } ctx.writeAndFlush(message); } } LoginRequestMessageHandler loginRequestMessageHandler = new LoginRequestMessageHandler ();ch.pipeline().addLast(new LoginRequestMessageHandler ());
运行结果 客户端
1 2 3 4 5665 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - 1314474317 , 1 , 1 , 1 , 0 , 279 5667 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - message:AbstractResponseMessage{success=true , reason='登陆成功' }5667 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true , reason='登陆成功' }success
服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 11919 [nioEventLoopGroup-3 -1 ] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - 1314474317 , 1 , 1 , 0 , 0 , 217 11919 [nioEventLoopGroup-3 -1 ] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - message:LoginRequestMessage{username='Nyima' , password='123' }7946 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x8e7c07f6 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :60572 ] WRITE: 295B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 4e 59 49 4d 01 01 01 00 00 00 00 ff 00 00 01 17 |NYIM............| |00000010 | ac ed 00 05 73 72 00 31 63 6e 2e 6e 79 69 6d 61 |....sr.1cn.nyima| |00000020 | 63 2e 73 74 75 64 79 2e 64 61 79 38 2e 6d 65 73 |c.study.day8.mes| |00000030 | 73 61 67 65 2e 4c 6f 67 69 6e 52 65 73 70 6f 6e |sage.LoginRespon| |00000040 | 73 65 4d 65 73 73 61 67 65 e2 34 49 24 72 52 f3 |seMessage.4I$rR.| |00000050 | 07 02 00 00 78 72 00 34 63 6e 2e 6e 79 69 6d 61 |....xr.4cn.nyima| |00000060 | 63 2e 73 74 75 64 79 2e 64 61 79 38 2e 6d 65 73 |c.study.day8.mes| |00000070 | 73 61 67 65 2e 41 62 73 74 72 61 63 74 52 65 73 |sage.AbstractRes| |00000080| 70 6f 6e 73 65 4d 65 73 73 61 67 65 b3 7e 19 32 |ponseMessage.~.2 | |00000090| 9b 88 4d 7b 02 00 02 5a 00 07 73 75 63 63 65 73 |..M{...Z..succes| |000000a0| 73 4c 00 06 72 65 61 73 6f 6e 74 00 12 4c 6a 61 |sL..reasont..Lja| |000000b0| 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 78 |va/lang/String;x| |000000c0| 72 00 24 63 6e 2e 6e 79 69 6d 61 63 2e 73 74 75 |r.$cn.nyimac.stu| |000000d0| 64 79 2e 64 61 79 38 2e 6d 65 73 73 61 67 65 2e |dy.day8.message.| |000000e0 | 4d 65 73 73 61 67 65 dd e9 84 b7 21 db 18 52 02 |Message....!..R.| |000000f0| 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 |..I..messageType| |00000100 | 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 |I..sequenceIdxp.| |00000110 | 00 00 00 00 00 00 00 01 74 00 0c e7 99 bb e9 99 |........t.......| |00000120 | 86 e6 88 90 e5 8a 9f |....... | +--------+-------------------------------------------------+----------------+
单聊 客户端输入send username content
即可发送单聊消息,需要服务器端添加处理ChatRequestMessage的handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @ChannelHandler .Sharable public class ChatRequestMessageHandler extends SimpleChannelInboundHandler <ChatRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception { Channel channel = SessionFactory.getSession().getChannel(msg.getTo()); if (channel != null ) { channel.writeAndFlush(new ChatResponseMessage (msg.getFrom(), msg.getContent())); } else { ctx.writeAndFlush(new ChatResponseMessage (false , "对方用户不存在或离线,发送失败" )); } } } ChatRequestMessageHandler chatRequestMessageHandler = new ChatRequestMessageHandler ();ch.pipeline().addLast(chatRequestMessageHandler);
运行结果
发送方(zhangsan)
接收方(Nyima)
1 2 20230 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - ChatResponseMessage{from='zhangsan' , content='hello' }
群聊 创建 添加处理GroupCreateRequestMessage
的handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @ChannelHandler .Sharablepublic class GroupCreateMessageHandler extends SimpleChannelInboundHandler <GroupCreateRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception { String groupName = msg.getGroupName(); Set<String> members = msg.getMembers(); Group group = GroupSessionFactory.getGroupSession().createGroup(groupName, members); if (group == null ) { GroupCreateResponseMessage groupCreateResponseMessage = new GroupCreateResponseMessage (true , groupName + "创建成功" ); ctx.writeAndFlush(groupCreateResponseMessage); List<Channel> membersChannel = GroupSessionFactory.getGroupSession().getMembersChannel(groupName); groupCreateResponseMessage = new GroupCreateResponseMessage (true , "您已被拉入" +groupName); for (Channel channel : membersChannel) { channel.writeAndFlush(groupCreateResponseMessage); } } else { GroupCreateResponseMessage groupCreateResponseMessage = new GroupCreateResponseMessage (false , groupName + "已存在" ); ctx.writeAndFlush(groupCreateResponseMessage); } } } GroupCreateMessageHandler groupCreateMessageHandler = new GroupCreateMessageHandler ();ch.pipeline().addLast(groupCreateMessageHandler);
运行结果
创建者客户端
1 2 3 4 5 6 7 8 9 10 gcreate Netty学习 zhangsan,lisi 31649 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true , reason='Netty学习创建成功' }15244 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true , reason='您已被拉入Netty学习' }gcreate Netty学习 zhangsan,lisi 40771 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false , reason='Netty学习已存在' }
群员客户端
1 28788 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true , reason='您已被拉入Netty学习' }
聊天 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @ChannelHandler .Sharablepublic class GroupChatMessageHandler extends SimpleChannelInboundHandler <GroupChatRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception { String groupName = msg.getGroupName(); GroupSession groupSession = GroupSessionFactory.getGroupSession(); boolean isCreated = groupSession.isCreated(groupName); if (isCreated) { List<Channel> membersChannel = groupSession.getMembersChannel(groupName); for (Channel channel : membersChannel) { channel.writeAndFlush(new GroupChatResponseMessage (msg.getFrom(), msg.getContent())); } } else { ctx.writeAndFlush(new GroupChatResponseMessage (false , "群聊不存在" )); } } } GroupChatMessageHandler groupChatMessageHandler = new GroupChatMessageHandler ();ch.pipeline().addLast(groupChatMessageHandler);
运行结果
发送方(群聊存在)
1 2 3 gsend Netty学习 你们好 45408 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - GroupChatResponseMessage{from='zhangsan' , content='你们好' }
接收方
1 48082 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - GroupChatResponseMessage{from='zhangsan' , content='你们好' }
发送方(群聊不存在)
1 2 3 gsend Spring学习 你们好 25140 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false , reason='群聊不存在' }
加入 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @ChannelHandler .Sharablepublic class GroupJoinMessageHandler extends SimpleChannelInboundHandler <GroupJoinRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception { GroupSession groupSession = GroupSessionFactory.getGroupSession(); Set<String> members = groupSession.getMembers(msg.getGroupName()); boolean joinFlag = false ; if (!members.contains(msg.getUsername()) && groupSession.isCreated(msg.getGroupName())) { joinFlag = true ; } if (joinFlag) { groupSession.joinMember(msg.getGroupName(), msg.getUsername()); ctx.writeAndFlush(new GroupJoinResponseMessage (true ,"加入" +msg.getGroupName()+"成功" )); } else { ctx.writeAndFlush(new GroupJoinResponseMessage (false , "加入失败,群聊未存在或您已加入该群聊" )); } } } GroupJoinMessageHandler groupJoinMessageHandler = new GroupJoinMessageHandler ();ch.pipeline().addLast(groupJoinMessageHandler);
运行结果
正常加入群聊
1 94921 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true , reason='加入Netty学习成功' }
加入不能存在或已加入的群聊
1 44025 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false , reason='加入失败,群聊未存在或您已加入该群聊' }
退出 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @ChannelHandler .Sharablepublic class GroupQuitMessageHandler extends SimpleChannelInboundHandler <GroupQuitRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception { GroupSession groupSession = GroupSessionFactory.getGroupSession(); String groupName = msg.getGroupName(); Set<String> members = groupSession.getMembers(groupName); String username = msg.getUsername(); boolean joinFlag = false ; if (groupSession.isCreated(groupName) && members.contains(username)) { joinFlag = true ; } if (joinFlag) { groupSession.removeMember(groupName, username); ctx.writeAndFlush(new GroupQuitResponseMessage (true , "退出" +groupName+"成功" )); } else { ctx.writeAndFlush(new GroupQuitResponseMessage (false , "群聊不存在或您未加入该群,退出" +groupName+"失败" )); } } } GroupQuitMessageHandler groupQuitMessageHandler = new GroupQuitMessageHandler ();ch.pipeline().addLast(groupQuitMessageHandler);
运行结果
正常退出
1 32282 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true , reason='退出Netty学习成功' }
退出不存在或未加入的群聊
1 67404 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false , reason='群聊不存在或您未加入该群,退出Netty失败' }
查看成员 1 2 3 4 5 6 7 8 9 10 @ChannelHandler .Sharablepublic class GroupMembersMessageHandler extends SimpleChannelInboundHandler <GroupMembersRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception { ctx.writeAndFlush(new GroupMembersResponseMessage (GroupSessionFactory.getGroupSession().getMembers(msg.getGroupName()))); } } GroupMembersMessageHandler groupMembersMessageHandler = new GroupMembersMessageHandler ();ch.pipeline().addLast(groupMembersMessageHandler);
运行结果
1 46557 [nioEventLoopGroup-2 -1 ] DEBUG cn.nyimac.study.day8.client.ChatClient - GroupMembersResponseMessage{members=[zhangsan, Nyima]}
退出聊天室 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @ChannelHandler .Sharablepublic class QuitHandler extends ChannelInboundHandlerAdapter { @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { SessionFactory.getSession().unbind(ctx.channel()); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { SessionFactory.getSession().unbind(ctx.channel()); } } QuitHandler quitHandler = new QuitHandler ();ch.pipeline().addLast(quitHandler);
退出时,客户端会关闭channel并返回
1 2 3 4 case "quit" : ctx.channel().close(); return ;
空闲检测 连接假死 原因
网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到 ,仍然占用着资源
公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,会白白地消耗资源
应用程序线程阻塞,无法进行数据读写
问题
假死的连接占用的资源不能自动释放
向假死的连接发送数据,得到的反馈是发送超时
解决方法 可以添加IdleStateHandler
对空闲时间进行检测,通过构造函数可以传入三个参数
readerIdleTimeSeconds 读空闲经过的秒数
writerIdleTimeSeconds 写空闲经过的秒数
allIdleTimeSeconds 读和写空闲经过的秒数
当指定时间内未发生读或写事件时,会触发特定事件
读空闲会触发READER_IDLE
写空闲会触发WRITE_IDLE
读和写空闲会触发ALL_IDEL
想要处理这些事件,需要自定义事件处理函数
服务器端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ch.pipeline().addLast(new IdleStateHandler (5 , 0 , 0 )); ch.pipeline().addLast(new ChannelDuplexHandler () { @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { ctx.channel().close(); } } });
使用IdleStateHandler
进行空闲检测
使用双向处理器
对入站与出站事件进行处理
IdleStateHandler
中的事件为特殊事件,需要实现ChannelDuplexHandler
的userEventTriggered
方法,判断事件类型并自定义处理方式,来对事件进行处理
为 避免因非网络等原因引发的READ_IDLE事件 ,比如网络情况良好,只是用户本身没有输入数据,这时发生READ_IDLE事件, 直接让服务器断开连接是不可取的
为避免此类情况,需要在 客户端向服务器发送心跳包 ,发送频率要小于 服务器设置的IdleTimeSeconds
,一般设置为其值的一半
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ch.pipeline().addLast(new IdleStateHandler (0 , 3 , 0 )); ch.pipeline().addLast(new ChannelDuplexHandler () { @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.WRITER_IDLE) { ctx.writeAndFlush(new PingMessage ()); } } });