“This is the 21st day of my participation in the First Challenge 2022. For details: First Challenge 2022”

Sticky packets and unpacking in Netty

TCP sticky packets and unpacking refer to the situation where several packets sent by the sender are glued together into one packet, or a single packet is split into pieces, by the time they are received. For example, if the client sends two packets, D1 and D2, the server may receive them separately, merged into a single read, or with one of them split across two reads.

Why sticky packets occur

TCP is connection-oriented and stream-oriented, and it provides a highly reliable service. Both ends (client and server) hold one socket of a connected pair. To send many small packets to the receiver more efficiently, the sender applies an optimization (Nagle's algorithm) that merges small pieces of data written at short intervals into one larger block before sending it. This improves efficiency, but it makes it hard for the receiver to tell where one complete message ends and the next begins, because stream-oriented communication does not preserve message boundaries. The following code shows an example of sticky packets:

Server code

Server startup code

    package com.jony.netty.chat;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;

    public class ChatServer {

        public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // Add the decoder to the pipeline
                                pipeline.addLast("decoder", new StringDecoder());
                                // Add the encoder to the pipeline
                                pipeline.addLast("encoder", new StringEncoder());
                                // Add our own business handler
                                pipeline.addLast(new ChatServerHandler());
                            }
                        });
                System.out.println("Chat room server started...");
                ChannelFuture channelFuture = bootstrap.bind(9999).sync();
                // Block until the server channel is closed
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

Send and receive message code

    package com.jony.netty.chat;

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;

    import java.text.SimpleDateFormat;

    public class ChatServerHandler extends SimpleChannelInboundHandler<String> {

        // GlobalEventExecutor.INSTANCE is a global event executor and a singleton
        private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // The channel is ready (a client has connected)
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            // Writing to the channelGroup sends the message to every channel in it
            channelGroup.writeAndFlush("[client] " + channel.remoteAddress() + " came online "
                    + sdf.format(new java.util.Date()) + "\n");
            // Add the current channel to the channelGroup
            channelGroup.add(channel);
            System.out.println(ctx.channel().remoteAddress() + " online" + "\n");
        }

        // The channel is inactive (the client has disconnected)
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            // Tell the clients that are still online which client has left
            channelGroup.writeAndFlush("[client] " + channel.remoteAddress() + " went offline" + "\n");
            System.out.println(ctx.channel().remoteAddress() + " offline" + "\n");
            System.out.println("channelGroup size=" + channelGroup.size());
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            // Get the current channel
            Channel channel = ctx.channel();
            channelGroup.forEach(ch -> {
                if (channel != ch) {
                    // Not the sender: forward the message
                    ch.writeAndFlush("[client] " + channel.remoteAddress() + " sent a message: " + msg + "\n");
                } else {
                    // Echo the message back to the sender
                    ch.writeAndFlush("[self] sent a message: " + msg + "\n");
                }
            });
            System.out.println(msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // Close the channel
            ctx.close();
        }
    }

Client code

Client connection code

    package com.jony.netty.chat;

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;

    import java.util.Scanner;

    public class ChatClient {

        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap()
                        .group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new ChatClientHandler());
                            }
                        });
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
                Channel channel = channelFuture.channel();
                System.out.println("========" + channel.localAddress() + "========");
                // Read user input from the console (disabled for this test)
                // Scanner scanner = new Scanner(System.in);
                // while (scanner.hasNextLine()) {
                //     String msg = scanner.nextLine();
                //     // Send to the server via the channel
                //     channel.writeAndFlush(msg);
                // }
                for (int i = 0; i < 200; i++) {
                    channel.writeAndFlush("hello, jony!");
                }
            } finally {
                // group.shutdownGracefully();
            }
        }
    }

The key part of the client is the following loop, which sends messages in rapid succession:

    for (int i = 0; i < 200; i++) {
        channel.writeAndFlush("hello, jony!");
    }

Client message handler code

    package com.jony.netty.chat;

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;

    public class ChatClientHandler extends SimpleChannelInboundHandler<String> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(msg.trim());
        }
    }

The execution result

The for loop sends one message per iteration, but after TCP's optimization several messages arrive stuck together as a single packet. Efficiency improves, but this is not the result we want. The output is shown below, followed by how to unpack the messages.

[himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony!Copy the code
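The merging seen in this output can be reproduced without any network at all. The following is only a minimal sketch using in-memory `java.io` streams as a stand-in for the TCP byte stream; the class and method names are made up for illustration and are not part of the chat example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class StreamBoundaryDemo {

    /** Writes the message `times` times into one byte stream and returns what a single read sees. */
    static String writeThenReadOnce(String message, int times) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < times; i++) {
            // Each write is a separate "send", but the stream keeps only the bytes
            wire.write(bytes, 0, bytes.length);
        }
        ByteArrayInputStream in = new ByteArrayInputStream(wire.toByteArray());
        byte[] buf = new byte[1024];
        int n = in.read(buf, 0, buf.length); // one "receive"
        return new String(buf, 0, n, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Three sends come back as one undivided string
        System.out.println(writeThenReadOnce("hello, jony!", 3));
    }
}
```

Nothing in the bytes marks where one write ended and the next began, so the receiver needs an application-level rule to recover the boundaries.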

The solution

1) Delimited format: every message has a fixed format with start and end markers. This approach is simple, but the markers must be chosen so that they never appear inside the message body. Disadvantages: the sender appends a fixed delimiter to each message and the receiver splits the stream on it, which makes the code harder to maintain. On the one hand, if a message ever happens to contain the delimiter, the messages become garbled; on the other hand, other developers who maintain the code or add new logic can easily overlook the delimiter without realizing it.

2) Length prefix: send each message together with its length. For example, the first four bytes of each message can hold the length of the content. The application layer can then determine where each message starts and ends from that length.
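The first scheme and its weakness can be sketched in plain Java. This is an illustrative sketch only: `DelimiterFramer` is a hypothetical helper, not a Netty class, and it assumes a single-character delimiter.

```java
import java.util.ArrayList;
import java.util.List;

class DelimiterFramer {
    private final StringBuilder pending = new StringBuilder(); // text received but not yet split
    private final String delimiter;

    DelimiterFramer(char delimiter) {
        this.delimiter = String.valueOf(delimiter);
    }

    /** Feeds one received chunk and returns every message completed by it. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> messages = new ArrayList<>();
        int end;
        while ((end = pending.indexOf(delimiter)) >= 0) {
            messages.add(pending.substring(0, end));
            pending.delete(0, end + 1); // drop the message and its delimiter
        }
        return messages; // anything after the last delimiter stays pending
    }
}
```

Feeding `"hello\nwor"` and then `"ld\n"` yields `hello` followed by `world`, so split messages are reassembled. A single message whose body contains the delimiter, however, is cut in two, which is exactly the risk described in scheme 1. Netty ships `DelimiterBasedFrameDecoder` and `LineBasedFrameDecoder`, which apply the same idea to a `ByteBuf`.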

Comparing the two, the second scheme is clearly the more robust one, so let's go straight to the code.

Unpacking

Create a wrapper class for the message

It needs two fields: the length of the content and the content itself.

    package com.jony.netty.split;

    /**
     * Custom message protocol: a length followed by the content
     */
    public class MyMessageProtocol {

        // Length of the content in bytes
        private int len;
        // Content bytes
        private byte[] content;

        public int getLen() {
            return len;
        }

        public void setLen(int len) {
            this.len = len;
        }

        public byte[] getContent() {
            return content;
        }

        public void setContent(byte[] content) {
            this.content = content;
        }
    }
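On the wire, these two fields become a 4-byte length followed by the content bytes. The sketch below shows that layout with `java.nio.ByteBuffer` instead of Netty's `ByteBuf`; `LengthPrefixedFrame` is a made-up name for illustration. It assumes big-endian byte order, which is the default for `ByteBuffer.putInt` and for `ByteBuf.writeInt`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class LengthPrefixedFrame {

    /** Encodes text as [4-byte big-endian length][UTF-8 content]. */
    static byte[] encode(String text) {
        byte[] content = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + content.length)
                .putInt(content.length) // length prefix
                .put(content)           // message body
                .array();
    }

    public static void main(String[] args) {
        byte[] frame = encode("hi");
        // 4 bytes of length plus 2 bytes of content
        System.out.println(frame.length);
    }
}
```

A frame is therefore always 4 bytes longer than its content, so a 24-byte message occupies 28 bytes of the stream.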

Server code

Server connects and adds codecs

If the server only receives messages and never sends any, adding just the decoder is enough.

    package com.jony.netty.split;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;

    public class MyServer {

        public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new MyMessageDecoder());
                                pipeline.addLast(new MyServerHandler());
                            }
                        });
                System.out.println("netty server start.");
                ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

Core code: the pipeline adds the decoder and the message handler

    pipeline.addLast(new MyMessageDecoder());
    pipeline.addLast(new MyServerHandler());

Decoder (Data processing and parsing)

The decoder first reads the length prefix, then reads that many bytes of content, wraps both in the message class, and passes the object to the next handler.

    package com.jony.netty.split;

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;

    import java.util.List;

    public class MyMessageDecoder extends ByteToMessageDecoder {

        // Length of the message being decoded; 0 means the length prefix has not been read yet
        int length = 0;

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            System.out.println();
            System.out.println("MyMessageDecoder decode was called");
            // Binary bytes -> MyMessageProtocol packet (object)
            System.out.println(in);
            if (length == 0) {
                if (in.readableBytes() < 4) {
                    // The 4-byte length prefix has not fully arrived yet
                    return;
                }
                length = in.readInt();
            }
            // Checked independently of the prefix check above, so that a message
            // shorter than 4 bytes is not left waiting once its length is known
            if (in.readableBytes() < length) {
                System.out.println("Not enough readable data yet, continuing to wait.");
                return;
            }
            byte[] content = new byte[length];
            in.readBytes(content);
            MyMessageProtocol messageProtocol = new MyMessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            out.add(messageProtocol);
            // Reset so that the next call reads a new length prefix
            length = 0;
        }
    }
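The same read-length-then-wait logic can be tried without starting a server. Below is a minimal sketch built on `java.nio.ByteBuffer`; `LengthPrefixedDecoder` is a hypothetical stand-in, not the Netty decoder above, and it assumes a well-behaved peer that never sends a negative length.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LengthPrefixedDecoder {
    private ByteBuffer pending = ByteBuffer.allocate(0); // bytes received but not yet decoded

    /** Appends a received chunk and returns every complete message now available. */
    List<String> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip(); // switch from writing to reading

        List<String> messages = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (merged.remaining() < length) {
                merged.reset(); // incomplete frame: rewind to the length prefix and wait
                break;
            }
            byte[] content = new byte[length];
            merged.get(content);
            messages.add(new String(content, StandardCharsets.UTF_8));
        }
        pending = merged.slice(); // keep the leftover bytes for the next call
        return messages;
    }
}
```

Feeding the bytes of two frames in arbitrary pieces returns each message exactly once, whether the pieces arrive merged or split. For production code, Netty's built-in `LengthFieldBasedFrameDecoder` covers this pattern and is usually preferable to a hand-written decoder.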

Server message handler

    package com.jony.netty.split;

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;

    public class MyServerHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {

        // Number of messages received on this channel
        private int count;

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
            System.out.println("==== The server received the following message ====");
            System.out.println("Length = " + msg.getLen());
            System.out.println("Content = " + new String(msg.getContent(), CharsetUtil.UTF_8));
            System.out.println("Number of packets received by the server = " + (++this.count));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

Client code

Client connects and adds related components

    package com.jony.netty.split;

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;

    public class MyClient {

        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new MyMessageEncoder());
                                pipeline.addLast(new MyClientHandler());
                            }
                        });
                System.out.println("netty client start.");
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }

Client encoder

    package com.jony.netty.split;

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;

    public class MyMessageEncoder extends MessageToByteEncoder<MyMessageProtocol> {

        @Override
        protected void encode(ChannelHandlerContext ctx, MyMessageProtocol msg, ByteBuf out) throws Exception {
            System.out.println("MyMessageEncoder encode method called");
            // Write the 4-byte length first, then the content
            out.writeInt(msg.getLen());
            out.writeBytes(msg.getContent());
        }
    }

Client message handler

Each message to be sent is converted to bytes, and then its length and content are set on the message wrapper class before sending.

    package com.jony.netty.split;

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;

    public class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            for (int i = 0; i < 200; i++) {
                // "Hello, I am Zhang San!" in Chinese; 24 bytes in UTF-8,
                // which matches the length printed by the server
                String msg = "你好,我是张三!";
                MyMessageProtocol messageProtocol = new MyMessageProtocol();
                messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);
                messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));
                ctx.writeAndFlush(messageProtocol);
            }
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

The execution result

Normal reading

    MyMessageDecoder decode was called
    PooledUnsafeDirectByteBuf(ridx: 0, widx: 1024, cap: 1024)
    ==== The server received the following message ====
    Length = 24
    Content = 你好,我是张三!
    Number of packets received by the server = 1

    MyMessageDecoder decode was called
    PooledUnsafeDirectByteBuf(ridx: 28, widx: 1024, cap: 1024)
    ==== The server received the following message ====
    Length = 24
    Content = 你好,我是张三!
    Number of packets received by the server = 2

    MyMessageDecoder decode was called
    PooledUnsafeDirectByteBuf(ridx: 56, widx: 1024, cap: 1024)
    ==== The server received the following message ====
    Length = 24
    Content = 你好,我是张三!
    Number of packets received by the server = 3

In the output above, the buffer line below shows that the server read 1024 bytes in one go (widx: 1024), yet every message is decoded on its own with no sticky packets: ridx advances by 28 bytes per message, that is, the 4-byte length plus 24 bytes of content.

PooledUnsafeDirectByteBuf(ridx: 0, widx: 1024, cap: 1024)

A message split across two reads

    MyMessageDecoder decode was called
    PooledUnsafeDirectByteBuf(ridx: 980, widx: 1024, cap: 1024)
    ==== The server received the following message ====
    Length = 24
    Content = 你好,我是张三!
    Number of packets received by the server = 36

    MyMessageDecoder decode was called
    PooledUnsafeDirectByteBuf(ridx: 1008, widx: 1024, cap: 1024)
    Not enough readable data yet, continuing to wait.

    MyMessageDecoder decode was called
    PooledUnsafeDirectByteBuf(ridx: 1012, widx: 1024, cap: 1024)
    Not enough readable data yet, continuing to wait.

    MyMessageDecoder decode was called
    PooledUnsafeDirectByteBuf(ridx: 1012, widx: 4172, cap: 8192)
    ==== The server received the following message ====
    Length = 24
    Content = 你好,我是张三!
    Number of packets received by the server = 37

As the output above shows, when fewer bytes are readable than the message length requires, the decoder waits and resumes once the next read delivers enough data, as the following line shows:

PooledUnsafeDirectByteBuf(ridx: 1012, widx: 4172, cap: 8192)

Here ridx is 1012 and the previous widx was 1024, so only 12 bytes were left to read, fewer than the 24 bytes the message needs. The decoder therefore did not read them and waited until enough bytes had arrived before continuing. This avoids the data corruption that TCP's automatic merging and splitting of packets would otherwise cause.
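The numbers in the log can be checked with a little arithmetic. The sketch below assumes, as the ridx values suggest, that every frame is 28 bytes (a 4-byte length plus 24 bytes of content) and that the first read delivered exactly 1024 bytes; `FrameArithmetic` is a made-up helper name.

```java
class FrameArithmetic {
    static final int LENGTH_FIELD = 4; // bytes written by writeInt
    static final int CONTENT = 24;     // content length reported in the log
    static final int FRAME = LENGTH_FIELD + CONTENT;

    /** Number of whole frames contained in the readable bytes. */
    static int completeFrames(int readableBytes) {
        return readableBytes / FRAME;
    }

    /** Bytes left over after the last whole frame. */
    static int leftoverBytes(int readableBytes) {
        return readableBytes % FRAME;
    }

    public static void main(String[] args) {
        int firstRead = 1024;
        System.out.println(completeFrames(firstRead));               // 36
        System.out.println(leftoverBytes(firstRead));                // 16
        System.out.println(leftoverBytes(firstRead) - LENGTH_FIELD); // 12
    }
}
```

So the first 1024 bytes hold 36 complete messages (ending at ridx 1008) plus 16 leftover bytes: the 4-byte length of message 37, which moves ridx to 1012, and 12 of its 24 content bytes.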