This is the 9th day of my participation in the August More Text Challenge. For details, see: August More Text Challenge

Preface

The previous article gave a preliminary introduction to the ChannelPipeline interface:

Netty source code analysis series (2) Netty architecture design

ChannelPipeline interface

The ChannelPipeline interface uses the chain-of-responsibility design pattern. Underneath, it uses a doubly linked list to chain the handlers together. Every time a client request comes in, every handler in the ChannelPipeline has a chance to process it.

Each newly created Channel is assigned a new ChannelPipeline. The association is permanent: a Channel can neither attach another ChannelPipeline nor detach its current one.

Create a ChannelPipeline

A ChannelPipeline is a data pipeline bound to a Channel. Each Channel corresponds to exactly one ChannelPipeline, and the ChannelPipeline is created when the Channel is initialized.

Consider the following example:

public void run() throws Exception {
    EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap(); // (2)
        b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class) // (3)
            .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                @Override
                public void initChannel(SocketChannel ch) throws Exception {

                    // Add ChannelHandler to ChannelPipeline
                    ch.pipeline().addLast(new DiscardServerHandler());
                }
            })
            .option(ChannelOption.SO_BACKLOG, 128)          // (5)
            .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

        // Bind the port and start receiving incoming connections
        ChannelFuture f = b.bind(port).sync(); // (7)

        System.out.println("DiscardServer started, port:" + port);

        // Wait for the server socket to close.
        // In this example, this won't happen, but you can gracefully shut down your server.
        f.channel().closeFuture().sync();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}

Once ServerBootstrap has initialized the connection, the pipeline of the SocketChannel can be obtained directly, without instantiating it manually, because Netty creates a ChannelPipeline for every Channel.

Most subclasses of Channel inherit from AbstractChannel, and the AbstractChannel constructor is also called when an instance is created. The AbstractChannel constructor creates a ChannelPipeline instance with the following core code:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    this.id = this.newId();
    this.unsafe = this.newUnsafe();
    this.pipeline = this.newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

As you can see from the code above, when a Channel is created, an instance of the DefaultChannelPipeline class is created by the Channel. DefaultChannelPipeline is the default implementation of ChannelPipeline.
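The one-to-one, permanent binding described above can be sketched in plain Java. This is a simplified model under stated assumptions, not Netty source: SketchChannel and SketchPipeline are hypothetical names, and only the constructor wiring is reproduced.

```java
import java.util.Objects;

// Simplified model, not Netty source: both class names are hypothetical.
final class SketchPipeline {
    private final SketchChannel channel;

    SketchPipeline(SketchChannel channel) {
        this.channel = Objects.requireNonNull(channel, "channel");
    }

    SketchChannel channel() {
        return channel;
    }
}

class SketchChannel {
    // final: assigned once in the constructor and never replaced afterwards
    private final SketchPipeline pipeline;

    SketchChannel() {
        this.pipeline = newChannelPipeline();
    }

    // Factory method, so a subclass could supply a different pipeline type
    protected SketchPipeline newChannelPipeline() {
        return new SketchPipeline(this);
    }

    SketchPipeline pipeline() {
        return pipeline;
    }
}

class ChannelBindingDemo {
    public static void main(String[] args) {
        SketchChannel a = new SketchChannel();
        SketchChannel b = new SketchChannel();
        System.out.println(a.pipeline().channel() == a);  // true
        System.out.println(a.pipeline() == a.pipeline()); // true
        System.out.println(a.pipeline() != b.pipeline()); // true
    }
}
```

Because the field is final and there is no setter, the sketch has no way to attach a second pipeline or detach the current one, which mirrors the permanent association.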

pipeline is a field of AbstractChannel. Internally, DefaultChannelPipeline maintains a doubly linked list whose nodes are AbstractChannelHandlerContext instances, and it creates a head node and a tail node that mark the two ends of the list. The source code is as follows:

public class DefaultChannelPipeline implements ChannelPipeline {

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
        this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
        this.voidPromise = new VoidChannelPromise(channel, true);
        this.tail = new DefaultChannelPipeline.TailContext(this);
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }

    // ...

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, DefaultChannelPipeline.TailContext.class);
            this.setAddComplete();
        }

        // ...
    }

    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, DefaultChannelPipeline.HeadContext.class);
            this.unsafe = pipeline.channel().unsafe();
            this.setAddComplete();
        }

        // ...
    }

    // ...
}

As the source code above shows, TailContext and HeadContext both extend AbstractChannelHandlerContext and also implement the ChannelHandler interface. AbstractChannelHandlerContext maintains the next and prev pointers of the linked list, along with the inbound and outbound direction of each node. TailContext implements ChannelInboundHandler, while HeadContext implements both ChannelOutboundHandler and ChannelInboundHandler.
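The head and tail nodes act as sentinels, so inserting a node never has to handle an empty list. The following is a minimal sketch of that pointer manipulation in plain Java. Node and SentinelList are hypothetical names, and the nodes carry only a name instead of a handler.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the pipeline's linked list; names are hypothetical.
final class Node {
    final String name;
    Node prev;
    Node next;

    Node(String name) {
        this.name = name;
    }
}

final class SentinelList {
    final Node head = new Node("head");
    final Node tail = new Node("tail");

    SentinelList() {
        // Same wiring as the constructor above: the two sentinels point at each other
        head.next = tail;
        tail.prev = head;
    }

    // Insert directly after head
    void addFirst(Node n) {
        Node oldFirst = head.next;
        n.prev = head;
        n.next = oldFirst;
        head.next = n;
        oldFirst.prev = n;
    }

    // Insert directly before tail
    void addLast(Node n) {
        Node oldLast = tail.prev;
        n.prev = oldLast;
        n.next = tail;
        oldLast.next = n;
        tail.prev = n;
    }

    // Names from head to tail, following the next pointers
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }
}

class SentinelListDemo {
    public static void main(String[] args) {
        SentinelList list = new SentinelList();
        list.addLast(new Node("decoder"));
        list.addLast(new Node("business"));
        list.addFirst(new Node("logger"));
        System.out.println(list.names()); // [head, logger, decoder, business, tail]
    }
}
```

User-added nodes always sit between the two sentinels, so every real node has a non-null prev and next.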

ChannelPipeline event propagation mechanism

Adding a ChannelHandler through the addFirst() method of ChannelPipeline creates a DefaultChannelHandlerContext instance that corresponds to that ChannelHandler.

public class DefaultChannelPipeline implements ChannelPipeline {
    // ...

    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
        AbstractChannelHandlerContext newCtx;
        synchronized(this) {
            checkMultiplicity(handler);
            name = this.filterName(name, handler);
            newCtx = this.newContext(group, name, handler);
            this.addFirst0(newCtx);
            if (!this.registered) {
                newCtx.setAddPending();
                this.callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                this.callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }

        this.callHandlerAdded0(newCtx);
        return this;
    }

    // ...

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);
    }

    // ...

}
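One detail in addFirst() is easy to miss: when the Channel is not yet registered, the handlerAdded callback is not run immediately but remembered for later. The sketch below models only that deferral in plain Java. DeferredAddPipeline is a hypothetical name, handlers are reduced to strings, and the event-loop hand-off of the real code is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, not Netty source: the class name is hypothetical.
final class DeferredAddPipeline {
    private final List<String> handlers = new ArrayList<>();
    private final List<Runnable> pendingCallbacks = new ArrayList<>();
    final List<String> log = new ArrayList<>();
    private boolean registered;

    synchronized DeferredAddPipeline addFirst(String name) {
        handlers.add(0, name);
        Runnable callback = () -> log.add("handlerAdded:" + name);
        if (!registered) {
            // Not registered yet: remember the callback instead of running it
            pendingCallbacks.add(callback);
        } else {
            callback.run();
        }
        return this;
    }

    // Called once when the channel is registered: run everything that was deferred
    synchronized void register() {
        registered = true;
        for (Runnable callback : pendingCallbacks) {
            callback.run();
        }
        pendingCallbacks.clear();
    }

    synchronized List<String> handlers() {
        return new ArrayList<>(handlers);
    }
}

class DeferredAddDemo {
    public static void main(String[] args) {
        DeferredAddPipeline p = new DeferredAddPipeline();
        p.addFirst("a").addFirst("b");
        System.out.println(p.log); // []
        p.register();
        System.out.println(p.log); // [handlerAdded:a, handlerAdded:b]
        p.addFirst("c");
        System.out.println(p.log); // [handlerAdded:a, handlerAdded:b, handlerAdded:c]
    }
}
```

The node is linked into the list right away in both cases. Only the notification to the handler is delayed until registration.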

Handle outbound events

The following example triggers an outbound event from inside the channelRead() method:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(ctx.channel().remoteAddress() + " -> Server :" + msg);

        // Write the message to the pipe
        ctx.write(msg); // write message
    }

    // ...
}

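The write() method in the code above fires an outbound event. Note that ctx.write() starts from the current ChannelHandlerContext; the equivalent entry point for a write issued on the Channel or the ChannelPipeline is the write() method of DefaultChannelPipeline: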

public final ChannelFuture write(Object msg) {
    return this.tail.write(msg);
}

As the source code above shows, this calls the write() method of the pipeline's tail node.

That call ends up in the write() method that every context inherits from AbstractChannelHandlerContext:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");

    try {
        if (this.isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            return;
        }
    } catch (RuntimeException var8) {
        ReferenceCountUtil.release(msg);
        throw var8;
    }

    // 32768 is the write mask; 98304 is the write mask combined with the flush mask
    AbstractChannelHandlerContext next = this.findContextOutbound(flush ? 98304 : 32768);
    Object m = this.pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractChannelHandlerContext.WriteTask task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }

}

The write() method above looks for the next outbound node, that is, the nearest context in the direction of the head whose ChannelHandler is of the outbound type, and then calls the invokeWrite() method on that node.

void invokeWrite(Object msg, ChannelPromise promise) {
    if (this.invokeHandler()) {
        this.invokeWrite0(msg, promise);
    } else {
        this.write(msg, promise);
    }
}

The invokeWrite0() method is then called, which finally calls the write() method of the ChannelOutboundHandler.

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler)this.handler()).write(this, msg, promise);
    } catch (Throwable var4) {
        notifyOutboundHandlerException(var4, promise);
    }
}

At this point the first node has finished its part. If its handler passes the write on, the same lookup runs again for the next node, and so on along the chain.

Therefore, when handling outbound events, data is transferred from the tail node to the head node.
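How the walk from tail to head picks its next stop can be sketched with a bit mask per node. This is a simplified model, not Netty source: MaskCtx is a hypothetical name, and the bit positions are an assumption based on the layout of Netty 4.1's ChannelHandlerMask, under which the constant 98304 in the listing above is the write bit combined with the flush bit.

```java
// Simplified model, not Netty source. MaskCtx is a hypothetical name; the bit
// positions assume the layout of Netty 4.1's ChannelHandlerMask.
final class MaskCtx {
    static final int MASK_WRITE = 1 << 15; // 32768
    static final int MASK_FLUSH = 1 << 16; // 65536

    final String name;
    final int executionMask; // operations that this context's handler implements
    MaskCtx prev;

    MaskCtx(String name, int executionMask) {
        this.name = name;
        this.executionMask = executionMask;
    }

    // Walk toward the head until a context implements one of the requested operations.
    // The head implements every outbound operation here, so the loop always terminates.
    MaskCtx findContextOutbound(int mask) {
        MaskCtx ctx = this;
        do {
            ctx = ctx.prev;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }
}

class MaskDemo {
    // Builds head <- encoder <- inboundOnly <- tail and returns the tail
    static MaskCtx buildTail() {
        MaskCtx head = new MaskCtx("head", MaskCtx.MASK_WRITE | MaskCtx.MASK_FLUSH);
        MaskCtx encoder = new MaskCtx("encoder", MaskCtx.MASK_WRITE);
        MaskCtx inboundOnly = new MaskCtx("inboundOnly", 0);
        MaskCtx tail = new MaskCtx("tail", 0);
        encoder.prev = head;
        inboundOnly.prev = encoder;
        tail.prev = inboundOnly;
        return tail;
    }

    public static void main(String[] args) {
        MaskCtx tail = buildTail();
        System.out.println(MaskCtx.MASK_WRITE | MaskCtx.MASK_FLUSH);             // 98304
        MaskCtx first = tail.findContextOutbound(MaskCtx.MASK_WRITE);
        System.out.println(first.name);                                           // encoder
        System.out.println(first.findContextOutbound(MaskCtx.MASK_WRITE).name);  // head
    }
}
```

The inbound-only node is skipped without its handler ever being called, which is why a write costs nothing for handlers that do not implement it.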

Handle inbound events

Inbound event processing starts when one of the fireXxx() methods of ChannelPipeline is called, such as fireChannelActive() in the following example:

public class DefaultChannelPipeline implements ChannelPipeline {
    // ...

    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(this.head);
        return this;
    }

    // ...
}

As the source code above shows, the node being processed is the head node. The AbstractChannelHandlerContext.invokeChannelActive() method is defined as follows:

static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

This method eventually calls the channelActive method of the ChannelInboundHandler.

private void invokeChannelActive() {
    if (this.invokeHandler()) {
        try {
            ((ChannelInboundHandler)this.handler()).channelActive(this);
        } catch (Throwable var2) {
            this.invokeExceptionCaught(var2);
        }
    } else {
        this.fireChannelActive();
    }
}
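The inEventLoop() check in the static invokeChannelActive() method follows a common pattern: run the task inline when the caller is already on the owning thread, otherwise hand it over to that thread. The sketch below shows the pattern with a JDK single-thread executor. SingleThreadLoop is a hypothetical name and only approximates what a Netty EventExecutor does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified model, not Netty source: the class name is hypothetical.
final class SingleThreadLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-loop");
        loopThread = t; // remember which thread owns this loop
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run inline when already on the loop thread, otherwise submit to it
    Future<?> runInLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
            return CompletableFuture.completedFuture(null);
        }
        return executor.submit(task);
    }

    void shutdown() {
        executor.shutdown();
    }
}

class InEventLoopDemo {
    static List<String> run() {
        SingleThreadLoop loop = new SingleThreadLoop();
        List<String> order = new ArrayList<>();
        try {
            loop.runInLoop(() -> {
                order.add("outer-start");
                // Already on the loop thread, so this nested task runs inline
                loop.runInLoop(() -> order.add("nested"));
                order.add("outer-end");
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [outer-start, nested, outer-end]
    }
}
```

If the nested task had been queued instead of run inline, it would have appeared after outer-end. Keeping all handler calls on one thread is what lets handler code run without its own locking.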

At this point the first node has been processed. If its handler forwards the event, the same steps repeat for the next node, and so on down the chain.

Therefore, when handling inbound events, data is transferred from the head node to the tail node.
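Inbound propagation is not an automatic loop: each handler decides whether to forward the event to the next node. The following plain-Java sketch illustrates that. InboundSketchHandler and InboundCtx are hypothetical names, and the model has only one event type.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, not Netty source: all names are hypothetical.
interface InboundSketchHandler {
    void channelActive(InboundCtx ctx);
}

final class InboundCtx {
    final InboundSketchHandler handler;
    InboundCtx next;

    InboundCtx(InboundSketchHandler handler) {
        this.handler = handler;
    }

    // Run this node's handler
    void invokeChannelActive() {
        handler.channelActive(this);
    }

    // Forward the event to the next node, if there is one
    void fireChannelActive() {
        if (next != null) {
            next.invokeChannelActive();
        }
    }
}

class InboundDemo {
    static List<String> run(boolean secondForwards) {
        List<String> visited = new ArrayList<>();
        InboundCtx first = new InboundCtx(ctx -> {
            visited.add("first");
            ctx.fireChannelActive();
        });
        InboundCtx second = new InboundCtx(ctx -> {
            visited.add("second");
            if (secondForwards) {
                ctx.fireChannelActive();
            }
        });
        InboundCtx third = new InboundCtx(ctx -> visited.add("third"));
        first.next = second;
        second.next = third;
        first.invokeChannelActive(); // start at the first node, like the head of a pipeline
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [first, second, third]
        System.out.println(run(false)); // [first, second]
    }
}
```

In the second run the event never reaches the third node, because the second handler does not call fireChannelActive().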

ChannelHandlers in a ChannelPipeline

As the ChannelPipeline interface shows, a ChannelHandler can be added to a ChannelPipeline dynamically through the addXxx methods or removed from it through the removeXxx methods. So how does ChannelPipeline guarantee safety under concurrent access?

Take the addLast method as an example. The source code of DefaultChannelPipeline is as follows:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    AbstractChannelHandlerContext newCtx;
    // synchronized guarantees thread safety
    synchronized(this) {
        checkMultiplicity(handler);
        newCtx = this.newContext(group, this.filterName(name, handler), handler);
        this.addLast0(newCtx);
        if (!this.registered) {
            newCtx.setAddPending();
            this.callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            this.callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }

    this.callHandlerAdded0(newCtx);
    return this;
}

As the source code above shows, the synchronized keyword makes modifications of the pipeline thread-safe. The other addXxx and removeXxx methods are implemented in a similar way.
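Why the lock matters can be checked with a small experiment: several threads append to the same linked list, and the list stays consistent because the four pointer updates happen inside one synchronized block. This is a sketch in plain Java with hypothetical names (SyncList, SyncDemo), not the pipeline itself.

```java
// Simplified model, not Netty source: all names are hypothetical.
final class SyncList {
    private static final class N {
        N prev;
        N next;
    }

    private final N head = new N();
    private final N tail = new N();

    SyncList() {
        head.next = tail;
        tail.prev = head;
    }

    void addLast() {
        N n = new N();
        // The four pointer updates must be seen as one atomic step
        synchronized (this) {
            N last = tail.prev;
            n.prev = last;
            n.next = tail;
            last.next = n;
            tail.prev = n;
        }
    }

    synchronized int size() {
        int count = 0;
        for (N n = head.next; n != tail; n = n.next) {
            count++;
        }
        return count;
    }
}

class SyncDemo {
    static int run(int threads, int perThread) {
        SyncList list = new SyncList();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    list.addLast();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return list.size();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10000)); // 40000
    }
}
```

Without the synchronized block, two threads could read the same tail.prev and one of the new nodes would be lost from the chain.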

ChannelHandlerContext interface

The ChannelHandlerContext interface is the link between ChannelHandler and ChannelPipeline.

A ChannelHandlerContext is created every time a ChannelHandler is added to the ChannelPipeline. The main function of a ChannelHandlerContext is to manage the interaction between its associated ChannelHandler and the other ChannelHandlers in the same ChannelPipeline.

For example, a ChannelHandlerContext can notify the next ChannelHandler in a ChannelPipeline to start executing and dynamically modify its own ChannelPipeline.

The ChannelHandlerContext has many methods, some of which also exist on Channel and ChannelPipeline. If these methods are called on a Channel or ChannelPipeline instance, they propagate through the entire ChannelPipeline. In contrast, the same method called on a ChannelHandlerContext instance starts at the current ChannelHandler and propagates only to the next ChannelHandler in the pipeline that is able to handle the event. As a result, an event triggered through ChannelHandlerContext takes a shorter path than the same method called on the other classes, which can be exploited to gain performance.

The relationship between ChannelHandlerContext and other components

The following diagram shows the relationship between ChannelPipeline, Channel, ChannelHandler and ChannelHandlerContext.

(1) The Channel is bound to ChannelPipeline.

(2) The ChannelPipeline bound to a Channel contains all ChannelHandlers.

(3) ChannelHandler.

(4) When adding ChannelHandler to ChannelPipeline, ChannelHandlerContext is created.

Skip certain ChannelHandlers

The following code shows how to get a reference to a Channel from the ChannelHandlerContext and trigger a write event into the stream by calling the write() method on the Channel.

ChannelHandlerContext ctx = context;
Channel channel = ctx.channel(); // Get the Channel in the ChannelHandlerContext
channel.write(msg);

The following code shows how to get ChannelPipeline from ChannelHandlerContext.

ChannelHandlerContext ctx = context;
ChannelPipeline pipeline = ctx.pipeline(); // Get ChannelPipeline on ChannelHandlerContext
pipeline.write(msg);

The flow of events is the same in both examples above. Although calling write() on the Channel or on the ChannelPipeline always propagates the event through the entire ChannelPipeline, at the ChannelHandler level the movement of an event from one ChannelHandler to the next is carried out by calls on the ChannelHandlerContext.

The following figure shows the event propagation mechanism for a Channel or ChannelPipeline.

As can be seen in the figure above:

(1) The event is passed to the first ChannelHandler of ChannelPipeline.

(2) The ChannelHandler passes the event to the next ChannelHandler in ChannelPipeline through the associated ChannelHandlerContext.

(3) The ChannelHandler passes the event to the next ChannelHandler in ChannelPipeline through the associated ChannelHandlerContext.

As you can see from the above process, if these methods are called through a Channel or an instance of ChannelPipeline, they will certainly propagate throughout the ChannelPipeline.

Is it possible to skip some handlers? The answer is yes.

There are two reasons to do so: to reduce the overhead of passing an event through ChannelHandlers that are not interested in it, and to prevent the event from being processed by handlers that would be interested in it. To start processing from a particular ChannelHandler, you must obtain the ChannelHandlerContext associated with the ChannelHandler just before it. That ChannelHandlerContext invokes the ChannelHandler that follows the one it is associated with, as follows:

ChannelHandlerContext ctx = context;
ctx.write(msg);

Calling the write() method of ChannelHandlerContext directly sends the message to the next ChannelHandler.
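The difference between writing through the pipeline and writing through a context can be made visible by recording which nodes a write passes. This is a minimal sketch in plain Java, assuming every node is an outbound node that simply forwards. OutCtx and SkipDemo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, not Netty source: all names are hypothetical.
final class OutCtx {
    final String name;
    final List<String> visited;
    OutCtx prev;

    OutCtx(String name, List<String> visited) {
        this.name = name;
        this.visited = visited;
    }

    // Start a write from this context: this context's own handler is not involved
    void write(String msg) {
        if (prev != null) {
            prev.invokeWrite(msg);
        }
    }

    // Run this context's handler, which here records the visit and forwards the write
    private void invokeWrite(String msg) {
        visited.add(name);
        write(msg);
    }
}

class SkipDemo {
    static List<String> run(boolean viaPipeline) {
        List<String> visited = new ArrayList<>();
        OutCtx head = new OutCtx("head", visited);
        OutCtx a = new OutCtx("A", visited);
        OutCtx b = new OutCtx("B", visited);
        OutCtx c = new OutCtx("C", visited);
        OutCtx tail = new OutCtx("tail", visited);
        a.prev = head;
        b.prev = a;
        c.prev = b;
        tail.prev = c;
        if (viaPipeline) {
            tail.write("msg"); // what a write on the pipeline does: start at the tail
        } else {
            b.write("msg");    // a write on B's context: start at B
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [C, B, A, head]
        System.out.println(run(false)); // [A, head]
    }
}
```

Writing through B's context skips C and B itself, so only the handlers between B and the head see the message.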

As shown in the figure below, the message flows through the ChannelPipeline starting from the next ChannelHandler, bypassing all the ChannelHandlers before it.

(1) Execute the ChannelHandlerContext method call.

(2) Events are sent to the next ChannelHandler.

(3) After the last ChannelHandler, the event is removed from ChannelPipeline.

This is especially useful when invoking operations on a particular ChannelHandler.

For example:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(ctx.channel().remoteAddress() + " -> Server :" + msg);

        // Write the message to the pipe
        ctx.write(msg); // write message
        ctx.flush(); // Flush the message

        // The above two methods are equivalent to ctx.writeAndFlush(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

        // Close the connection when an exception occurs
        cause.printStackTrace();
        ctx.close();
    }
}
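The handler above calls write() and then flush() because a write only queues the message, while a flush pushes everything queued so far out to the transport. The sketch below is a conceptual model of that split in plain Java. BufferedOutbound is a hypothetical name, and the "wire" is just a list.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual model, not Netty source: the class name is hypothetical.
final class BufferedOutbound {
    private final List<String> pending = new ArrayList<>();
    private final List<String> wire = new ArrayList<>();

    // Queue the message; nothing reaches the wire yet
    void write(String msg) {
        pending.add(msg);
    }

    // Send everything queued so far, in order
    void flush() {
        wire.addAll(pending);
        pending.clear();
    }

    // Convenience: queue one message and send immediately
    void writeAndFlush(String msg) {
        write(msg);
        flush();
    }

    List<String> wire() {
        return new ArrayList<>(wire);
    }

    int pendingCount() {
        return pending.size();
    }
}

class WriteFlushDemo {
    public static void main(String[] args) {
        BufferedOutbound out = new BufferedOutbound();
        out.write("a");
        out.write("b");
        System.out.println(out.wire()); // []
        out.flush();
        System.out.println(out.wire()); // [a, b]
        out.writeAndFlush("c");
        System.out.println(out.wire()); // [a, b, c]
    }
}
```

Batching several writes before one flush is a common way to reduce the number of system calls, at the cost of slightly delaying the first message.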

Conclusion

That concludes the source code analysis of ChannelPipeline. After reading it carefully, you should understand the relationship between ChannelPipeline, Channel, ChannelHandler and ChannelHandlerContext. The next article continues to analyze the Netty source code.
