What did Netty do?

Netty is a network communication framework built on NIO; it adds many improvements and abstractions on top of the NIO API provided by the JDK. Without Netty, we would have to handle all of the following ourselves:

  • Design the framework so that it stays maintainable and extensible
  • Define the thread model: which threads handle Accept events, which threads do the IO reads and writes, and which threads run the business logic
  • Work around JDK NIO bugs, such as the notorious Selector empty-polling bug
  • Implement serialization and deserialization
  • Handle TCP packet sticking and splitting (message framing)
  • Implement timeout detection, reconnection after failures, and so on

Netty abstracts the entire communication flow between client and server: it uses a reactor thread model to receive a channel's IO events such as accept, read, and write, and abstracts a processing chain, the pipeline, composed of ChannelHandlers to process those IO events. Below is the processing flow diagram for a Netty Server (a Client simply has no Boss NioEventLoopGroup). Combined with the figure, the Server's core IO handling can be summarized as follows:

  • The Boss NioEventLoopGroup is responsible for the channel's Accept events; it hands each accepted channel over to one of the NioEventLoops in the Worker NioEventLoopGroup. Every Worker NioEventLoop has its own Selector, and the channel registers its read and write events on that Selector
  • The Worker NioEventLoops keep polling the events on their Selector and call the processSelectedKeys method to handle IO events; depending on whether the event is inbound or outbound, it is finally handed to the ChannelHandler chain of the pipeline

In this design, the developer's actual work is to write ChannelHandlers that fit the characteristics of the business, implement the corresponding event-handling methods, and register them on the pipeline in order. For example, Dubbo extends its own serialization and deserialization handlers, as well as AllChannelHandler, which contains the business thread-pool logic. This section looks at one of Netty's core components, the ChannelHandler interface, which defines the basic IO operations and is split into two sub-interfaces representing the inbound (data read) and outbound (data write) operations respectively. The class diagram is as follows:

The ChannelInboundHandler interface defines the events of the inbound process:

The ChannelOutboundHandler interface defines the operations of the outbound process:
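
To make the two directions concrete, here is a minimal, hypothetical handler (not part of the demo analyzed below) that overrides one inbound callback and one outbound operation; ChannelDuplexHandler implements both interfaces. Treat it as a sketch rather than anything from the analyzed source:

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

// Hypothetical example: logs inbound reads and outbound writes,
// then passes each event on to the next handler in the pipeline.
public class LoggingDuplexHandler extends ChannelDuplexHandler {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Inbound: fired when data has been read from the channel
        System.out.println("inbound read: " + msg);
        ctx.fireChannelRead(msg); // propagate towards the tail
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // Outbound: invoked when write() is called further along the pipeline
        System.out.println("outbound write: " + msg);
        ctx.write(msg, promise); // propagate towards the head
    }
}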

Before a ChannelHandler is added to the pipeline, it is wrapped into an AbstractChannelHandlerContext. These context objects form a doubly linked list, and the pipeline maintains a head node and a tail node as the entry points of the processing chain. The final internal structure of the pipeline looks like this:
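
As a reference for how that linked list is built, the linking logic of addLast in DefaultChannelPipeline looks roughly like this in 4.1.x (abridged sketch; synchronization and name checks omitted): the new context is spliced in just before the tail node.

// Abridged sketch of DefaultChannelPipeline.addLast0 (details omitted)
private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;   // link the new node after the current last handler
    newCtx.next = tail;   // and before the tail node
    prev.next = newCtx;
    tail.prev = newCtx;
}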

Now that we have a general picture of Netty's overall flow, let's analyze the source code.

Source code analysis

This analysis follows the official documentation; the Netty version I debugged is 4.1.25.Final. The official guide walks you step by step from the simplest possible demo to one that involves serialization, deserialization, and frame-splitting logic. I have put the demo from the official site on GitHub; if you are interested, you can download it and debug it yourself. First, a quick overview of the classes in the demo, then the source-code analysis:

  • TimerServer: starts a Netty Server
  • TimeServerHandler: custom Server-side handler that writes a UnixTime object when a client connects
  • TimeEncoder: custom encoding handler the Server uses to encode UnixTime objects into bytes
  • TimeClient: starts a Netty Client
  • TimeClientHandler: custom Client-side handler that reads the IO data and prints it
  • TimeDecoder: custom decoder the Client uses to deserialize the bytes read from IO into a UnixTime object
  • UnixTime: the time POJO

The project is quite simple: when the Server accepts a connection from the Client, it writes a UnixTime object to the channel via TimeServerHandler; TimeEncoder then encodes the UnixTime object into bytes, which are sent through the channel. The Client reads the data from the channel, decodes it back into a UnixTime object, and prints it.

The source code will be analyzed in three steps: the Server startup process, the Server-side write process, and the Client-side read process.

Server startup process

TimerServer

        // boss
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        // worker
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Netty Server bootstrap
            ServerBootstrap b = new ServerBootstrap(); // (2)
            // set the boss and worker groups
            b.group(bossGroup, workerGroup)
                    // Set the current Server Channel type to NioServerSocketChannel
                    .channel(NioServerSocketChannel.class) // (3)
                    // Set the pipeline handler chain for all accepted channels
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler());
                        }
                    });

            // Bind the port, initialize NioServerSocketChannel's pipeline, and register event listeners
            ChannelFuture f = b.bind(port).sync(); // (7)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            // shut down both groups
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

You can see that the following things are done during Server startup:

  • Configure the Boss and Worker NioEventLoopGroups
  • Set up the pipeline's ChannelHandler chain for accepted channels
  • Complete the binding logic through the bind(port) method; this method is the core, and its main logic is as follows:
    • Instantiate NioServerSocketChannel
    • In the init method, initialize the NioServerSocketChannel's pipeline configuration, including the ChannelInitializer that builds the ChannelHandler chain (this class adds the important ServerBootstrapAcceptor, which is mainly responsible for registering accepted channels with the Selector of a NioEventLoop in the Worker group)
    • Choose one of the Boss NioEventLoops and register the NioServerSocketChannel with its Selector
    • Once registration with the Selector succeeds, trigger the ChannelInitializer configured in init to complete the ChannelHandler chain
    • Bind the port for the NioServerSocketChannel

The most important step, port binding, is left until last: it is performed only after the initialization and Selector registration have succeeded. This process makes heavy use of asynchronous operations (executed by the NioEventLoop) and event notifications.

Following the source of b.bind(port), it eventually calls the doBind method:


private ChannelFuture doBind(final SocketAddress localAddress) {
    // Initialize NioServerSocketChannel and register it with a Selector
    final ChannelFuture regFuture = initAndRegister();
    ...
    // Wait for the result asynchronously: add a listener and act on the eventual success or failure
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    // Add a listener for the regFuture's operationComplete event. The port is bound, based on
    // the result, only after the Selector registration and initialization have completed
    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause != null) {
                // Mark the promise as failed
                promise.setFailure(cause);
            } else {
                promise.registered();
                // Bind the port
                doBind0(regFuture, channel, localAddress, promise);
            }
        }
    });
    return promise;
}

initAndRegister() is divided into two steps, init and register:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    ...
    // Instantiate NioServerSocketChannel via the reflective channel factory
    channel = channelFactory.newChannel();
    // Initialize the pipeline handler chain for NioServerSocketChannel
    init(channel);
    ...
    ChannelFuture regFuture = config().group().register(channel);
    ...
    return regFuture;
}


The init(channel) code looks like this. The key logic of this method is to add the ServerBootstrapAcceptor to the NioServerSocketChannel's pipeline; that class registers accepted channels with the Selector of a NioEventLoop in the Worker group, effectively dispatching work to the Worker.

@Override
void init(Channel channel) throws Exception {
    ...
    ChannelPipeline p = channel.pipeline();
    ...
    // Add a ChannelInitializer to NioServerSocketChannel's pipeline; it will add the ServerBootstrapAcceptor
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            // Add ServerBootstrapAcceptor asynchronously
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler,
                            currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
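
For context, the core of ServerBootstrapAcceptor.channelRead looks roughly like the sketch below (abridged; option and attribute handling omitted). When the boss-side NioServerSocketChannel "reads", the msg is the freshly accepted child Channel, and the acceptor hands it over to the Worker group:

// Abridged sketch of ServerBootstrapAcceptor.channelRead (not the complete source)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // For a server channel, the "message" that was read is an accepted child Channel
    final Channel child = (Channel) msg;
    // Add the user's childHandler (the ChannelInitializer passed to .childHandler(...)) to the child's pipeline
    child.pipeline().addLast(childHandler);
    ...
    // Register the accepted channel with one of the Worker group's NioEventLoops (and thus its Selector)
    childGroup.register(child).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                forceClose(child, future.cause());
            }
        }
    });
}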

The register(channel) code is as follows:

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}


private void register0(ChannelPromise promise) {
    ...
    // Call the Java channel API to register the channel with the NioEventLoop's Selector
    doRegister();
    ...
    // Invoke ChannelInitializer's initChannel (via handlerAdded), which adds the ServerBootstrapAcceptor
    pipeline.invokeHandlerAddedIfNeeded();
    ...
    // Mark the promise as successful, which fires the ChannelFutureListeners,
    // including the one added in the b.bind(port) code above
    safeSetSuccess(promise);
    // Fire the channelRegistered event down the handler chain
    pipeline.fireChannelRegistered();
    ...
}

Port binding: the binding logic is executed asynchronously on the channel's event loop:

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

That is the end of the Server startup process. The whole flow is quite simple and only requires a little configuration of your own plus some custom ChannelHandlers. To summarize the core logic:

  • Instantiate NioServerSocketChannel
  • Register the NioServerSocketChannel with the Selector of a NioEventLoop in the Boss group
  • Complete the pipeline's ChannelHandler chain, including the most important handler, ServerBootstrapAcceptor
  • Bind the port for the NioServerSocketChannel

The source code uses a large number of asynchronous, event-driven implementations to link these core processes together.

IO write process

The read/write process is ultimately handled by the ChannelHandler chain maintained by the pipeline. In this project, the Server writes a UnixTime object to the channel after accepting the Client's connection; the code lives in the custom TimeServerHandler, whose channelActive method is triggered once the connection to the Client is established. Let's use this simple logic to see how the write process works.


/**
 * @author HJ
 * @date 2021-06-05
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        // Write the current time to the channel once the client connection is established
        ChannelFuture f = ctx.writeAndFlush(new UnixTime());
        f.addListener(ChannelFutureListener.CLOSE);
    }
}

As the code shows, a UnixTime object is written; it has to be serialized into bytes before it can be sent through the socket. That is why the TimeEncoder class, responsible for serialization and encoding, was added in the Server startup code:

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler());
    }
});

UnixTime is a timestamp wrapper; the code is as follows:

public class UnixTime {
    private final long value;
    ...
}
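
For reference, the full UnixTime class in the official guide looks roughly like the following (reproduced from memory as a sketch; the value is seconds since 1900-01-01, the offset used by the TIME protocol):

import java.util.Date;

public class UnixTime {

    private final long value;

    public UnixTime() {
        // Current time, in seconds since 1900-01-01
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}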

The accepted channel's pipeline structure is as follows:

Let's look at the flow of ctx.writeAndFlush(new UnixTime()). ctx is a linked-list node in the pipeline's ChannelHandler chain (see the illustration above); the invokeWriteAndFlush method is eventually split into two parts, write and flush:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

invokeWrite0(msg, promise) writes the data into the to-be-flushed area, where it waits for the final flush to send it through the socket. It eventually reaches the following method:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    // Get the next outbound ChannelHandler, source code below
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    // executor is the bound NioEventLoop
    EventExecutor executor = next.executor();
    // If the current thread is the NioEventLoop itself, execute directly;
    // otherwise wrap the work in a task to be executed asynchronously by the NioEventLoop
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        // Wrap the work in a task to be executed asynchronously by the NioEventLoop
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        } else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

// Walk the list from tail towards head to find the next outbound ChannelHandler
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

The main logic of this method is to find the next outbound ChannelHandler in the chain and hand the message over to it. According to the pipeline diagram, the next outbound handler is TimeEncoder (responsible for converting the POJO into bytes). The debug screenshot is as follows:

The TimeEncoder code is as follows; it converts the UnixTime POJO into bytes and writes them into the buffer:

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {

    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception {
        out.writeInt((int) msg.value());
    }
}

Finally, the write method of the head node is called. The debug diagram is as follows:

Rather than actually sending the message through the socket, the write wraps the message into an Entry and appends it to the linked list of data waiting to be flushed. The debug screenshot is as follows:
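
To relate this to the classes involved: the head context delegates to the channel's Unsafe, which maintains a ChannelOutboundBuffer. A rough, comment-only sketch of that relationship (a simplification, not the real source):

// write(msg)  -> outboundBuffer.addMessage(msg, size, promise);
//                // wraps msg in an Entry and appends it to the unflushed linked list
// flush()     -> outboundBuffer.addFlush();
//                // marks the pending entries as "flushed", i.e. ready to be written out
//             -> flush0() -> doWrite(outboundBuffer);
//                // the loop shown below drains the flushed entries into the SocketChannel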

Now for invokeFlush0(): the flush method of the head node is eventually called:

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

Netty's Unsafe class, which interacts directly with the SocketChannel, eventually calls the following code:


protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    int writeSpinCount = config().getWriteSpinCount();
    do {
        ...
        // Gather all the data waiting to be flushed into this array
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();
        ...
        // Send the data through the SocketChannel
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                ...
                final int localWrittenBytes = ch.write(buffer);
                ...
                break;
            }
            default: {
                ...
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                ...
                break;
            }
        }
    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}


The write process mainly runs through the outbound ChannelHandler chain maintained by the pipeline (traversing the outbound handlers towards the head node) and is divided into write and flush steps. The write step appends the data to the to-be-flushed linked list; the flush step then goes through the write and flush methods on the Unsafe class, which sends all the data through the SocketChannel. Again, a lot of asynchronous, event-driven processing is used along the way.

IO read process

The read process, like the write process, is handled by the pipeline's ChannelHandler chain. The difference is that reading is an inbound operation, flowing from the head node towards the tail. In this project, it is the Client that reads the data.
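
Although the detailed walk-through is omitted here, the two Client-side handlers from the demo give a feel for the inbound chain. Roughly, following the official guide (reproduced as a sketch; each class lives in its own source file, and imports from io.netty.buffer, io.netty.channel, io.netty.handler.codec and java.util.List are omitted to match the other snippets):

// Inbound step 1: accumulate bytes until a whole 4-byte message has arrived, then decode it
// into a UnixTime POJO. This is also what deals with TCP packet splitting/sticking.
public class TimeDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return; // not enough data yet; wait for the next read event
        }
        out.add(new UnixTime(in.readUnsignedInt()));
    }
}

// Inbound step 2: receive the decoded UnixTime, print it, and close the connection.
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        UnixTime m = (UnixTime) msg;
        System.out.println(m);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}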

Conclusion

From the above analysis we can see how Netty's asynchronous, event-driven design works. The core components include the Boss thread pool, the Worker thread pool, NioEventLoop, the pipeline, and ChannelHandler, among which NioEventLoop is the heart of the event-driven design and the engine of the whole framework. It has two core methods (sketched after the list below):

  • processSelectedKeys handles the accept, read, and write IO events
  • runAllTasks handles the other asynchronous tasks, such as the registration, activation, connection, binding, and write tasks, as well as custom tasks
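
Putting these together, the body of NioEventLoop's run loop can be sketched roughly as follows (heavily simplified; the real method also deals with wakeups, the IO ratio, and shutdown):

// Heavily simplified sketch of the loop inside NioEventLoop.run() (not the real source)
for (;;) {
    // 1. Poll the Selector for ready IO events
    //    (with the counter-based workaround for the JDK empty-polling bug)
    select();
    // 2. Dispatch the accept/read/write events of the registered channels
    processSelectedKeys();
    // 3. Drain the task queue: register/bind/connect/write tasks submitted from
    //    other threads, scheduled tasks, and user-defined tasks
    runAllTasks();
}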

The architecture is clean, and the responsibilities of each component are clear.