How to choose encoder when using Netty for communication development? How to solve the TCP sticky/unpack problem? What is the startup process on the server? What is the connection service process?

A codec

1.1 What is a codec

In the process of network transmission, data is transmitted by byte stream. The client converts the original data format into bytes during data transfer, called encoding. The server converts the bytes into their original format, called decoding. Generally referred to as the codec. The codec is divided into two parts – the encoder and the decoder, with the encoder responsible for outbound and the decoder for inbound.

1.2 decoder

1.2.1 overview

The decoder is responsible for inbound operations, so it must also implement the ChannelInboundHandler interface, so the decoder is essentially a ChannelHandler. We only need to inherit a custom codec ByteToMessageDecoder (Netty provides an abstract class, inheritance ChannelInboundHandlerAdapter), realize the decode (). Netty provides some common decoder implementations, right out of the box. As follows:

1 RedisDecoder Redis based decoder 2 XmlDecoder XML based decoder 3 JsonObjectDecoder Json data based decoder 4 HttpObjectDecoder HTTP protocol based decoderCopy the code

MessageToMessageDecoder Netty also provides MessageToMessageDecoder, a decoder that converts one format to another, and also provides some implementations as follows:

1 StringDecoder will receive ByteBuf into a string 2 ByteArrayDecoder will receive ByteBuf into a byte array 3 Base64Decoder Decodes Base64 encoded by ByteBuf or US-ASCII strings to ByteBuf.Copy the code

1.2.2 Converting byte Flow to Intger type (Case)

1 byte decoder

package com.haopt.netty.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; Public class ByteToIntegerDecoder extends ByteToMessageDecoder {/** ** @param CTX context * @param in Input ByteBuf message data * @override Protected void decode(ChannelHandlerContext CTX, ByteBuf in, List<Object> out) throws Exception {if(in.readableBytes() >= 4){// Int takes up 4 bytes. Out.add (in.readint ()); // Read data of type int, put it into output, complete data type conversion}}}Copy the code

2 Handler

package com.haopt.netty.codec; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Integer i = (Integer) msg; System.out.println(" the message received by the server is: "+ I); }}Copy the code

Add a decoder to the pipeline

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
    .addLast(new ByteToIntegerDecoder())
    .addLast(new ServerHandler());
}
Copy the code

1.3 the encoder

1.3.1 overview

Convert the original format to bytes. To implement a custom decoder, we simply inherit the MessageToByteEncoder (which implements the ChannelOutboundHandler interface), which is essentially a ChannelHandler. Some encoders implemented in Netty are as follows:

1 ObjectEncoder Encoder the object (which needs to implement the Serializable interface) as a byte stream 2 SocksMessageEncoder Code SocksMessage as a byte stream 3 HAProxyMessageEncoder Encode the HAProxyMessage as a byte streamCopy the code

Netty also provides MessageToMessageEncoder, an encoder that converts one format to another, and also provides some implementations:

1 RedisEncoder the Redis object. 2 StringEncoder the string. 3 Base64Encoder the Base64 stringCopy the code

1.3.2 Encoding the Integer type into bytes for passing (Case)

  1. Custom encoder
package com.haopt.netty.codec.client; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> { @Override protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception { out.writeInt(msg); }}Copy the code
  1. Handler
package com.haopt.netty.codec.client; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void ChannelRead0 (ChannelHandlerContext CTX, ByteBuf MSG) throws Exception {system.out. println(" Receives the message from the server: " + msg.toString(CharsetUtil.UTF_8)); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(123); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}Copy the code
  1. pipeline
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new IntegerToByteEncoder());
    ch.pipeline().addLast(new ClientHandler());
}
Copy the code

Develop Http server

Through the HTTP decoder provided by Netty, HTTP server development.

2.1 Netty configuration

  1. server
package com.haopt.netty.codec.http; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.stream.ChunkedWriteHandler; Public class NettyHttpServer {public static void main(String[] args) throws Exception { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap ServerBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker); / / configuration server channel serverBootstrap. Channel (NioServerSocketChannel. Class); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void InitChannel (SocketChannel CH) throws Exception {ch.pipeline() // HTTP request decoder // Aggregates the URI and request body in the HTTP request into a complete FullHttpRequest object  .addLast(new HttpRequestDecoder()) .addLast(new HttpObjectAggregator(1024 * 128)) .addLast(new HttpResponseEncoder()) AddLast (new ChunkedWriteHandler()); addLast(new ServerHandler()); }}); // Worker thread handler ChannelFuture Future = serverbootstrap.bind (8080).sync(); // Worker thread handler ChannelFuture = serverbootstrap.bind (8080).sync(); System.out.println(" Server startup complete... ") ); // Wait for the server listening port to close future.channel().closeFuture().sync(); } the finally {/ / elegant close boss. ShutdownGracefully (); worker.shutdownGracefully(); }}}Copy the code
  1. ServerHandler
package com.haopt.netty.codec.http; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; import java.util.Map; public class ServerHandler extends SimpleChannelInboundHandler<FullHttpRequest>{ @Override public void ChannelRead0 (ChannelHandlerContext CTX, FullHttpRequest Request) throws Exception {// Parse FullHttpRequest, Map<String, String> paramMap = new RequestParser(request).parse(); String name = paramMap.get("name"); // Construct response object FullHttpResponse httpResponse = new DefaultFullHttpResponse(httpversion.http_1_1, httpresponseStatus.ok); httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html; charset=utf-8"); StringBuilder sb = new StringBuilder(); sb.append("<h1>"); Sb. Append (" Hello, "+ name); sb.append("</h1>"); httpResponse.content().writeBytes(Unpooled.copiedBuffer(sb,CharsetUtil.UTF_8)); / / operation is completed, turn off the channel CTX. WriteAndFlush (httpResponse). AddListener (ChannelFutureListener. CLOSE); }}Copy the code
  1. RequestParser
package com.haopt.netty.codec.http; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.handler.codec.http.multipart.Attribute; import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; import io.netty.handler.codec.http.multipart.InterfaceHttpData; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; /** * public class RequestParser {private FullHttpRequest fullReq; Public RequestParser(FullHttpRequest req) {this.fullReq = req; } /** ** Parse request parameters * @return contains all request parameters key-value pairs, if there are no parameters, return empty Map ** @throws IOException */ public Map<String, String> parse() throws Exception { HttpMethod method = fullReq.method(); Map<String, String> parmMap = new HashMap<>(); QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri()); Decoder.parameters ().entryset ().foreach (entry -> {// entry.getValue() is a List that takes only the first element parmap.put (entry.getKey(), entry.getValue().get(0)); }); } else if (HttpPostRequestDecoder == method) {decoder = new HttpPostRequestDecoder(fullReq);  decoder.offer(fullReq); List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas(); for (InterfaceHttpData parm : parmList) { Attribute data = (Attribute) parm; parmMap.put(data.getName(), data.getValue()); }} else {throw new RuntimeException(" other methods are not supported "); } return parmMap; }}Copy the code
  1. object
package com.haopt.netty.codec.obj; public class User implements java.io.Serializable { private static final long serialVersionUID = -89217070354741790L; private Long id; private String name; private Integer age; public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } @Override public String toString() { return "User{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}'; }}Copy the code

2.2 the service side

  1. NettyObjectServer
package com.haopt.netty.codec.obj; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; Public class NettyObjectServer {public static void main(String[] args) throws Exception {// Main, does not process any business logic, EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap ServerBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker); / / configuration server channel serverBootstrap. Channel (NioServerSocketChannel. Class); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel> () { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new ObjectDecoder(ClassResolvers.weakCachingResolver( this.getClass().getClassLoader() ))) .addLast(new ServerHandler()); }}); // Worker thread handler ChannelFuture Future = serverbootstrap.bind (6677).sync(); // Worker thread handler ChannelFuture = serverbootstrap.bind (6677).sync(); System.out.println(" Server startup complete... ") ); // Wait for the server listening port to close future.channel().closeFuture().sync(); } the finally {/ / elegant close boss. ShutdownGracefully (); worker.shutdownGracefully(); }}}Copy the code
  1. ServerHandler
package com.haopt.netty.codec.obj; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; public class ServerHandler extends SimpleChannelInboundHandler<User> { @Override public void ChannelRead0 (ChannelHandlerContext CTX, User User) throws Exception {// Obtain the User object System.out.println(User); ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8)); }}Copy the code

2.3 the client

  1. NettyObjectClient
package com.haopt.netty.codec.obj; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ObjectEncoder; public class NettyObjectClient { public static void main(String[] args) throws Exception{ EventLoopGroup worker = new NioEventLoopGroup(); Try {// The server Bootstrap class Bootstrap = new Bootstrap(); bootstrap.group(worker); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new ClientHandler()); }}); ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync(); future.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); }}}Copy the code
  1. ClientHandler
package com.haopt.netty.codec.obj; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void ChannelRead0 (ChannelHandlerContext CTX, ByteBuf MSG) throws Exception {system.out. println(" Receives the message from the server: " + msg.toString(CharsetUtil.UTF_8)); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { User user = new User(); user.setId(1L); User. Elegantly-named setName (" zhang "); user.setAge(20); ctx.writeAndFlush(user); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}Copy the code

2.4 Optimization of JDK serialization

JDK serialization is more convenient to use, but the performance is poor, serialized bytes are large, so generally in the project will not use its own serialization, but will use the third-party serialization framework Hessian encoding and decoding.

  1. Import dependence
<dependency>
  <groupId>com.caucho</groupId>
  <artifactId>hessian</artifactId>
  <version>4.0.63</version>
</dependency>
Copy the code
  1. The User object
package com.haopt.netty.codec.hessian; public class User implements java.io.Serializable{ private static final long serialVersionUID = -8200798627910162221L; private Long id; private String name; private Integer age; public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } @Override public String toString() { return "User{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}'; }}Copy the code
  1. Hessian serialization utility class
package com.haopt.netty.codec.hessian.codec; import com.caucho.hessian.io.HessianInput; import com.caucho.hessian.io.HessianOutput; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; /** * public class HessianSerializer {public <T> byte[] serialize(T obj) {ByteArrayOutputStream OS = new ByteArrayOutputStream(); HessianOutput ho = new HessianOutput(os); try { ho.writeObject(obj); ho.flush(); return os.toByteArray(); } catch (IOException e) { throw new RuntimeException(e); } finally { try { ho.close(); } catch (IOException e) { throw new RuntimeException(e); } try { os.close(); } catch (IOException e) { throw new RuntimeException(e); } } } public <T> Object deserialize(byte[] bytes, Class<T> clazz) { ByteArrayInputStream is = new ByteArrayInputStream(bytes); HessianInput hi = new HessianInput(is); try { return (T) hi.readObject(clazz); } catch (IOException e) { throw new RuntimeException(e); } finally { try { hi.close(); } catch (Exception e) { throw new RuntimeException(e); } try { is.close(); } catch (IOException e) { throw new RuntimeException(e); }}}}Copy the code
  1. The encoder
package com.haopt.netty.codec.hessian.codec; import cn.itcast.netty.coder.hessian.User; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class HessianEncoder extends MessageToByteEncoder<User> { private HessianSerializer hessianSerializer = new HessianSerializer(); protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception { byte[] bytes = hessianSerializer.serialize(msg); out.writeBytes(bytes); }}Copy the code
  1. decoder
public class HessianDecoder extends ByteToMessageDecoder { private HessianSerializer hessianSerializer = new HessianSerializer(); Protected void decode(ChannelHandlerContext CTX, ByteBuf in, List<Object> out) throws Exception { Incomplete copy // Avoid exceptions: Did not read anything but decoded a message ByteBuf in2 = in.retainedDuplicate(); byte[] dst; If (in2.hasarray ()) {// heap buffer mode DST = in2.array(); } else { dst = new byte[in2.readableBytes()]; in2.getBytes(in2.readerIndex(), dst); } // Skip all bytes, indicating that in.skipbytes (in.readableBytes()) has been read; . / / deserialize the Object obj = hessianSerializer deserialize (DST, the User. The class); out.add(obj); }}Copy the code
  1. The service side
public class NettyHessianServer { public static void main(String[] args) throws Exception { // System.setProperty("io.netty.noUnsafe", "true"); EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap ServerBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker); / / configuration server channel serverBootstrap. Channel (NioServerSocketChannel. Class); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel> () { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new HessianDecoder()) .addLast(new ServerHandler()); }}); . / / / / worker thread processor serverBootstrap childOption (ChannelOption. ALLOCATOR, UnpooledByteBufAllocator. DEFAULT); ChannelFuture future = serverBootstrap.bind(6677).sync(); System.out.println(" Server startup complete... ") ); // Wait for the server listening port to close future.channel().closeFuture().sync(); } the finally {/ / elegant close boss. ShutdownGracefully (); worker.shutdownGracefully(); }}}Copy the code
public class ServerHandler extends SimpleChannelInboundHandler<User> { @Override public void ChannelRead0 (ChannelHandlerContext CTX, User User) throws Exception {// Obtain the User object System.out.println(User); ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8)); }}Copy the code
  1. The client
public class NettyHessianClient { public static void main(String[] args) throws Exception { EventLoopGroup worker = new NioEventLoopGroup(); Try {// The server Bootstrap class Bootstrap = new Bootstrap(); bootstrap.group(worker); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new HessianEncoder()); ch.pipeline().addLast(new ClientHandler()); }}); ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync(); future.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); }}}Copy the code
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void ChannelRead0 (ChannelHandlerContext CTX, ByteBuf MSG) throws Exception {system.out. println(" Receives the message from the server: " + msg.toString(CharsetUtil.UTF_8)); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { User user = new User(); user.setId(1L); User. Elegantly-named setName (" zhang "); user.setAge(20); ctx.writeAndFlush(user); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}Copy the code

TCP sticky/unpack problems and solutions

2.1 ReplayingDecoder

  1. Custom decoder to change buf to int
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { if (buf.readableBytes() < 4) { return; } buf.markReaderIndex(); int length = buf.readInt(); if (buf.readableBytes() < length) { buf.resetReaderIndex(); return; } out.add(buf.readBytes(length)); }}Copy the code

2. Optimize with ReplayingDecoder

public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
      	out.add(buf.readBytes(buf.readInt()));
    }
}
Copy the code
  1. Instructions for ReplayingDecoder
1 uses a special ByteBuf called ReplayingDecoderByteBuf to extend ByteBuf 2 overwrites ByteBuf's readXxx() and other methods to check for readable bytes. ReplayingDecoder overwrites ByteToMessageDecoder's callDecode() method Catch Signal and reset ByteBuf's readerIndex in the catch block. 4 Continue to wait for data until data is available. In this way, the required data can be read. The generic S in class 5 definition is a state machine enumeration class used to record decoding state, which is used in state(S), checkpoint(S) and other methods. Java.lang.Void can also be used as a placeholder for simple decoding.Copy the code
  1. Pay attention to
1 Part of buffer operations (readBytes(ByteBuffer DST), retain(), release() and other methods directly throw exceptions) 2 In some cases affect performance (such as decoding the same message more than once)Copy the code

Inherit ReplayingDecoder, error examples and modifications

// This is an example of an error: // The message contains two integers, the decode method is called twice, the queue size is not equal to 2, the code does not achieve the desired result. public class MyDecoder extends ReplayingDecoder<Void> { private final Queue<Integer> values = new LinkedList<Integer>();  @Override public void decode(ByteBuf buf, List<Object> out) throws Exception { // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); assert values.size() == 2; out.add(values.poll() + values.poll()); }}Copy the code
// Correct way to do this:  public class MyDecoder extends ReplayingDecoder<Void> { private final Queue<Integer> values = new LinkedList<Integer>(); @Override public void decode(ByteBuf buf, List<Object> out) throws Exception { // Revert the state of the variable that might have been changed // since the last partial decode. values.clear(); // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); // Now we know this assertion will never fail. assert values.size() == 2; out.add(values.poll() + values.poll()); }}Copy the code

The realization of the ByteToIntegerDecoder2

Public class ByteToIntegerDecoder2 extends ReplayingDecoder<Void> {/** * @param CTX context * @param in Input ByteBuf message data * @override Protected void decode(ChannelHandlerContext CTX, ByteBuf in, List<Object> out) throws Exception { out.add(in.readInt()); // Read data of type int, put it into output, complete data type conversion}}Copy the code

2.2 Reoccurrence of Unpacking and Sticking Packets (The Client Sends 10 Data to the Server)

  1. Client startup class
public class NettyClient { public static void main(String[] args) throws Exception{ EventLoopGroup worker = new NioEventLoopGroup(); Try {// The server Bootstrap class Bootstrap = new Bootstrap(); bootstrap.group(worker); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientHandler()); }}); ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync(); future.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); }}}Copy the code
  1. The client ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf MSG) throws Exception {system.out.println (" Message received from the server: "+ msg.toString(charsetutil.utf_8)); System.out.println(" Number of messages received from the server: "+ (++count)); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { ctx.writeAndFlush(Unpooled.copiedBuffer("from client a message!" , CharsetUtil.UTF_8)); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}Copy the code
  1. The service side NettyServer
Public class NettyServer {public static void main(String[] args) throws Exception { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap ServerBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker); / / configuration server channel serverBootstrap. Channel (NioServerSocketChannel. Class); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new ServerHandler()); }}); // Worker thread handler ChannelFuture Future = serverbootstrap.bind (5566).sync(); // Worker thread handler ChannelFuture = serverbootstrap.bind (5566).sync(); System.out.println(" Server startup complete... ") ); // Wait for the server listening port to close future.channel().closeFuture().sync(); } the finally {/ / elegant close boss. ShutdownGracefully (); worker.shutdownGracefully(); }}}Copy the code
  1. ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf MSG) throws Exception {system.out.println (" server receives message: "+ msg.toString(charsetutil.utf_8)); System.out.println(" Number of messages received by the server: "+ (++count)); ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8)); }}Copy the code

2.2 What Is TCP Packet Sticking and Unpacking

TCP is transmitted by streams, and streams are data without boundaries. The server accepts data from the client without knowing whether it is one or more. It is not known how the server unpacks.

Therefore, when data is transferred between the server and the client, the packet unpacking rules must be formulated. The client sticks packages according to this rule, and the server unpacks packages according to this rule. If there is any violation of this rule, the server cannot get the expected data.

  1. Solutions (three kinds)
1. Add a header to the sent packet to store the size of the data in the header. Then the server can read the data according to the size. 2. Send the data with a fixed length. If the data exceeds the specified length, send it in multiple times. 3. Set a boundary between data packets, such as adding special symbols, so that the receiver can separate different data packets through this boundary.Copy the code

2.3 Actual combat: Solve TCP packet sticking/unpacking problem

  1. Custom protocol
public class MyProtocol { private Integer length; Private byte[] body; Public Integer getLength() {return length; } public void setLength(Integer length) { this.length = length; } public byte[] getBody() { return body; } public void setBody(byte[] body) { this.body = body; }}Copy the code
  1. The encoder
public class MyEncoder extends MessageToByteEncoder<MyProtocol> { @Override protected void encode(ChannelHandlerContext ctx, MyProtocol msg, ByteBuf out) throws Exception { out.writeInt(msg.getLength()); out.writeBytes(msg.getBody()); }}Copy the code
  1. decoder
public class MyDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int length = in.readInt(); Byte [] data = new byte[length]; // Define byte array by length in.readbytes (data); MyProtocol MyProtocol = new MyProtocol(); myProtocol.setLength(length); myProtocol.setBody(data); out.add(myProtocol); }}Copy the code
  1. The client ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<MyProtocol> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, MyProtocol MSG) throws Exception {system.out.println (" Message received from the server: "+ new String(msg.getBody(), charsetutil.utf_8)); System.out.println(" Number of messages received from the server: "+ (++count)); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { byte[] data = "from client a message!" .getBytes(CharsetUtil.UTF_8); MyProtocol myProtocol = new MyProtocol(); myProtocol.setLength(data.length); myProtocol.setBody(data); ctx.writeAndFlush(myProtocol); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception { cause.printStackTrace(); ctx.close(); }}Copy the code
  1. NettyClient
public class NettyClient { public static void main(String[] args) throws Exception{ EventLoopGroup worker = new NioEventLoopGroup(); Try {// The server Bootstrap class Bootstrap = new Bootstrap(); bootstrap.group(worker); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MyEncoder()); ch.pipeline().addLast(new MyDecoder()); ch.pipeline().addLast(new ClientHandler()); }}); ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync(); future.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); }}}Copy the code
  1. ServerHandler
public class ServerHandler extends SimpleChannelInboundHandler<MyProtocol> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, MyProtocol MSG) throws Exception {system.out.println (" The server receives the message: "+ new String(msg.getBody(), charsetutil.utf_8)); System.out.println(" Number of messages received by the server: "+ (++count)); byte[] data = "ok".getBytes(CharsetUtil.UTF_8); MyProtocol myProtocol = new MyProtocol(); myProtocol.setLength(data.length); myProtocol.setBody(data); ctx.writeAndFlush(myProtocol); }}Copy the code
  1. NettyServer
Public class NettyServer {public static void main(String[] args) throws Exception { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap ServerBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker); / / configuration server channel serverBootstrap. Channel (NioServerSocketChannel. Class); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new MyDecoder()) .addLast(new MyEncoder()) .addLast(new ServerHandler()); }}); // Worker thread handler ChannelFuture Future = serverbootstrap.bind (5566).sync(); // Worker thread handler ChannelFuture = serverbootstrap.bind (5566).sync(); System.out.println(" Server startup complete... ") ); // Wait for the server listening port to close future.channel().closeFuture().sync(); } the finally {/ / elegant close boss. ShutdownGracefully (); worker.shutdownGracefully(); }}}Copy the code
  1. test

Netty core source code analysis

3.1 Analyzing the Server Startup Process

  1. Create a server Channel
1 ServerBootstrap bind() InitAndRegister () in AbstractBootstrap 2 is used to create a Channel. This is done by the newChannel() method in ReflectiveChannelFactory. In the constructor of NioServerSocketChannel, open ServerSocketChannel through JDK NIO underlying SelectorProvider. AbstractNioChannel = ch.configureBlocking(false); The AbstractChannel constructor creates the ID, unsafe, and Pipeline contents. 6 through NioServerSocketChannelConfig access to TCP at the bottom of some parametersCopy the code
  1. Initialize a server Channel
1 AbstractBootstrap initAndRegister() initializes a channel with init(channel); 2 Set channelOptions and Attributes in the Init () method of ServerBootstrap. Serverbootstrap.handler () will be added to the pipeline if serverBootstrap.handler() is set. ServerBootstrapAcceptor: add childHandler to connect pipeline:Copy the code
ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast( new ServerBootstrapAcceptor(ch, currentChildGroup,currentChildHandler, currentChildOptions, currentChildAttrs)); }});Copy the code
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object MSG) {final Channel child = (Channel) MSG; // Add a custom childHandler to the connected pipeline child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { childGroup.register(child).addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future) throws Exception { if (! future.isSuccess()) { forceClose(child, future.cause()); }}}); } catch (Throwable t) { forceClose(child, t); }}Copy the code
  1. Register the selector
InitAndRegister () regFuture = config().group().register(channel); 2 in io.net ty. Channel. AbstractChannel. AbstractUnsafe# register () do the actual registered 2.1 AbstractChannel. Enclosing eventLoop = eventLoop; Perform the assignment operation of the eventLoop, and the subsequent IO events will be performed by the eventLoop. 2.2 call register0 (the promise) of doRegister () the actual registered 3 io.net ty. Channel. Nio. AbstractNioChannel# doRegister method implementationCopy the code
JavaChannel () -- channel created earlier //eventLoop().unwrappedSelector() -- get selector // Register the event of interest as 0, It says there's no event that I'm interested in, and then I'm going to re-register the event and I'm going to register this object as an attachment to the selector, SelectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);Copy the code
  1. Binding port
1 entry on the io.net ty. The bootstrap. AbstractBootstrap# doBind0 (), Start a thread to perform binding port operations 2 call io.net ty. Channel. AbstractChannelHandlerContext# bind (java.net.SocketAddress, Io.net ty. Channel. ChannelPromise) method, again start threads execute 3 final call io.net ty. Channel. Socket. Nio. NioServerSocketChannel# doBind () method to bind operationsCopy the code
@SuppressJava6Requirement(reason = "Usage Guarded by Java Version check") @override protected void  doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); }}Copy the code

When do I update the master/slave event of the selector? Finally at io.net ty. Channel. Nio. AbstractNioChannel# doBeginRead () method

protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (! selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); Public NioServerSocketChannel(ServerSocketChannel) {// Set the event of interest to OP_ACCEPT}} channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }Copy the code

3.2 Connection request process source code analysis

  1. New Connection access
Io.net ty. The entrance channel. Nio. NioEventLoop# processSelectedKey (Java. Nio. Channels. SelectionKey, Io.net ty. Channel. Nio. AbstractNioChannel) into NioMessageUnsafe the read () method Call io.net ty. Channel. Socket. Nio. NioServerSocketChannel# doReadMessages () method, to create the JDK at the bottom of the channel, and encapsulate NioSocketChannel added to the List containerCopy the code
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch ! = null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }Copy the code
Create NioSocketChannel object new NioSocketChannel(this, ch), The selectionkey. OP_READ event is passed to identify the creation ID, unsafe, and pipeline object setting ch.configureBlocking(false); Create a NioSocketChannelConfig objectCopy the code
  1. Register read events
On the io.net ty. Channel. Nio. AbstractNioMessageChannel. The NioMessageUnsafe: for (int I = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); // Propagate read events}Copy the code
At io.net ty. Channel. AbstractChannelHandlerContext# invokeChannelRead (java.lang.object) method in private void InvokeChannelRead (Object MSG) {if (invokeHandler()) {try { The second one is ServerBootstrapAcceptor // Registration and event binding with ServerBootstrapAcceptor entering the same logic as the new connection's registered Selector ((ChannelInboundHandler)) handler()).channelRead(this, msg); } catch (Throwable t) { invokeExceptionCaught(t); } } else { fireChannelRead(msg); }}Copy the code

Use Netty optimization points

4.1 zero copy

1 Bytebuf uses out-of-heap memory as a pooled Direct Buffer. There is no need for a second copy of the byte Buffer. If Bytebuf uses heap memory, the JVM copies it to the heap first and then writes it to the Socket. 2 CompositeByteBuf encapsulates multiple ByteBuFs into one ByteBuf without process copy when adding bytebuFs. 3 The transferTo method of Netty's file transfer class DefaultFileRegion sends files to the target channel without cyclic copying, improving performance.Copy the code

4.2 EventLoop task scheduling

channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
    	channel.writeAndFlush(data)
    }
});
Copy the code

Instead of using hannel.writeandFlush (data); EventLoop task scheduling is placed directly into the execution queue of the EventLoop corresponding to the channel, which causes thread switching. Note: At the bottom of writeAndFlush, a new thread is started if it is not executed via eventLoop.

4.3 Reduce ChannelPipline call length

public class YourHandler extends ChannelInboundHandlerAdapter { @Override public void ChannelActive (ChannelHandlerContext CTX) {// MSG goes through the entire ChannelPipline, all handlers go through it. ctx.channel().writeAndFlush(msg); // From the current handler to the end of the pipline, shorter calls. ctx.writeAndFlush(msg); }}Copy the code

4.4 Reducing ChannelHandler Creation (rarely configured)

If channelHandler is stateless (that is, no state parameters need to be saved), use Sharable annotations and create only one instance at bootstrap time to reduce GC. Otherwise, each connection will generate a new handler object.

@ChannelHandler.Shareable public class StatelessHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) {} } public class MyInitializer extends ChannelInitializer<Channel> { private static final ChannelHandler INSTANCE = new StatelessHandler(); @Override public void initChannel(Channel ch) { ch.pipeline().addLast(INSTANCE); }}Copy the code

Note: Codecs such as ByteToMessageDecoder are stateful and cannot use Sharable annotations.

4.5 Setting Parameters

The server bossGroup only needs to be set to 1, because ServerSocketChannel will only register with one eventLoop during initialization, and this eventLoop will only have one thread running, so there is no need to set it to multiple threads. For IO threads, the workGroup is usually set to twice the number of CPU cores, which is the default provided by Netty, in order to make full use of the CPU and reduce the overhead of line context switching. In scenarios with high response times, use. ChildOption (channeloption.tcp_nodelay, true) and. Option(channeloption.tcp_nodelay, true) to disable nagle without waiting. Send it immediately.Copy the code

Five ByteBuf API

  1. Sequential read API





2. Sequential write operations

Netty articles

Seven layer protocol and TCP/IP protocol, three handshake four wave, BIO, NIO(Netty front) one entry Netty (Netty a) this article in the follow-up in the project to use Netty problems and solutions, to be continued……