This is day 15 of my participation in the November Gwen Challenge. For event details, see: The Last Gwen Challenge 2021.

1. Overview

ChannelHandler is used to handle various events on a Channel, including inbound and outbound events.

All ChannelHandlers are linked together into a chain, called a Pipeline.

  • An inbound handler is usually a subclass of ChannelInboundHandlerAdapter. It is mainly used to read data sent by the client and to write results back.
  • An outbound handler is usually a subclass of ChannelOutboundHandlerAdapter. It is mainly used to process the data being written back.

The various Netty components can be understood through an analogy, for example the following:

[Figure: analogy for the Netty components]

2. Code analysis

We use code to show the relationship between Handler and Pipeline, and how ChannelInboundHandlerAdapter and ChannelOutboundHandlerAdapter are used. The client code is the client from the previous articles, which supports console input:

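    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import java.util.Scanner;

    public static void main(String[] args) throws Exception {
        // Keep a reference to the group so it can be shut down gracefully later
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        // Encode outgoing Strings into bytes
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect("localhost", 8080);
        // Block until the connection is established
        Channel channel = channelFuture.sync().channel();

        // Read console input on a separate thread and send each line to the server
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String line = scanner.nextLine();
                if ("q".equals(line)) {
                    System.out.println("close channel");
                    // Close the channel and stop reading input
                    channel.close();
                    break;
                }
                channel.writeAndFlush(line);
            }
        }, "input").start();

        // Shut down the event loop group gracefully once the channel is closed
        ChannelFuture closeFuture = channel.closeFuture();
        closeFuture.addListener((ChannelFutureListener) future -> group.shutdownGracefully());
    }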

2.1 ChannelInboundHandlerAdapter

In the server code, we first add three inbound handlers named h1, h2, and h3, which print 1, 2, and 3 respectively:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;

    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioServerSocketChannel) {
                        ChannelPipeline pipeline = nioServerSocketChannel.pipeline();
                        // Three inbound handlers, each printing its number and passing the message on
                        pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("1");
                                super.channelRead(ctx, msg);
                            }
                        });
                        pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("2");
                                super.channelRead(ctx, msg);
                            }
                        });
                        pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("3");
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                })
                .bind(8080);
    }

Start the server, then the client. Type anything into the client console, and the server prints the following:

1
2
3

From the above we can conclude that the inbound handlers we added are executed in the order they were added.

A quick look at the source shows why:

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        // Get the node that currently sits just before the tail node
        AbstractChannelHandlerContext prev = tail.prev;
        // The new node's previous node is prev
        newCtx.prev = prev;
        // The new node's next node is tail
        newCtx.next = tail;
        // prev's next node is now the new node
        prev.next = newCtx;
        // tail's previous node is now the new node
        tail.prev = newCtx;
    }

We can conclude that handlers added through the addLast method are inserted into a linked list just before the tail node, and that each node holds a reference to both its previous node and its next node. The three handlers we added earlier therefore form the following list:

    head <-> h1 <-> h2 <-> h3 <-> tail
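The pointer updates in addLast0 can be tried out in isolation. The following is a minimal sketch in plain Java, not Netty code: the class PipelineListSketch, its Node type, and the method names are hypothetical stand-ins for the pipeline and AbstractChannelHandlerContext, assumed here only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the pipeline's linked list (not Netty code). */
public class PipelineListSketch {

    /** One node in the list; a stand-in for AbstractChannelHandlerContext. */
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    // Sentinel nodes, mirroring the pipeline's head and tail
    final Node head = new Node("head");
    final Node tail = new Node("tail");

    PipelineListSketch() {
        head.next = tail;
        tail.prev = head;
    }

    /** Performs the same four pointer updates as addLast0: insert just before tail. */
    void addLast(String name) {
        Node newCtx = new Node(name);
        Node prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

    /** Walks head -> tail via next pointers, the direction an inbound event travels. */
    List<String> namesHeadToTail() {
        List<String> names = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    /** Walks tail -> head via prev pointers, the direction an outbound event travels. */
    List<String> namesTailToHead() {
        List<String> names = new ArrayList<>();
        for (Node n = tail; n != null; n = n.prev) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        PipelineListSketch p = new PipelineListSketch();
        p.addLast("h1");
        p.addLast("h2");
        p.addLast("h3");
        System.out.println(p.namesHeadToTail()); // [head, h1, h2, h3, tail]
        System.out.println(p.namesTailToHead()); // [tail, h3, h2, h1, head]
    }
}
```

Because every node keeps both a prev and a next reference, the same list can be walked in either direction, which is why insertion order determines the order in which inbound handlers run.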

2.2 ChannelOutboundHandlerAdapter

Building on the above, we now add ChannelOutboundHandlerAdapter handlers. An outbound handler only runs when data is written to the channel. Without further ado, here is the code, with the write call added in h3 in the middle:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOutboundHandlerAdapter;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.ChannelPromise;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;

    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioServerSocketChannel) {
                        ChannelPipeline pipeline = nioServerSocketChannel.pipeline();
                        // Inbound handlers
                        pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("1");
                                super.channelRead(ctx, msg);
                            }
                        });
                        pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("2");
                                super.channelRead(ctx, msg);
                            }
                        });
                        pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("3");
                                super.channelRead(ctx, msg);
                                // Outbound handlers only run when the channel performs a write
                                nioServerSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("helloworld".getBytes()));
                            }
                        });
                        // Outbound handlers
                        pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println("4");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println("5");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println("6");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }

Results:

1
2
3
6
5
4

As shown above, the outbound handlers run in the order 6, 5, 4, that is, from back to front.

This works because ChannelPipeline is implemented as a doubly linked list: inbound events travel from head toward tail, and outbound events travel from tail toward head.

2.3 Analysis of the code flow

As the previous code shows, we can add any number of handlers to a Pipeline, and they run in a defined order. The key point is that each handler can apply a different kind of processing to the received data.

For example, h1 converts the received content into a string and passes it to h2, which then converts the string into a Java object, and so on.

So how do handlers pass processed data from one handler to another?

In the previous inbound handler, there is a line of code like this:

super.channelRead(ctx, msg);

This method looks like this:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

Internally it calls ctx.fireChannelRead(msg), which is where the processed message is passed on to the next handler.
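To make the hand-off concrete, here is a minimal sketch in plain Java, not Netty code. The InboundHandler and Context types are hypothetical stand-ins for an inbound handler and ChannelHandlerContext, and parsing the string into an Integer is just an assumed example of "converting to a Java object". Each handler transforms the message and passes the result on with fireChannelRead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of message hand-off between inbound handlers (not Netty code). */
public class FireChannelReadSketch {

    /** Stand-in for an inbound handler. */
    interface InboundHandler {
        void channelRead(Context ctx, Object msg);
    }

    /** Stand-in for ChannelHandlerContext: wraps a handler and knows the next context. */
    static final class Context {
        final InboundHandler handler;
        Context next;

        Context(InboundHandler handler) {
            this.handler = handler;
        }

        /** Hands msg to the next handler in the chain, if there is one. */
        void fireChannelRead(Object msg) {
            if (next != null) {
                next.handler.channelRead(next, msg);
            }
        }
    }

    /** Builds h1 -> h2 -> h3, feeds raw bytes into h1, and returns what each handler saw. */
    static List<String> run(byte[] input) {
        List<String> log = new ArrayList<>();

        // h1: bytes -> String
        Context h1 = new Context((ctx, msg) -> {
            String text = new String((byte[]) msg, StandardCharsets.UTF_8);
            log.add("h1 decoded String: " + text);
            ctx.fireChannelRead(text);
        });
        // h2: String -> Java object (an Integer in this sketch)
        Context h2 = new Context((ctx, msg) -> {
            Integer number = Integer.valueOf((String) msg);
            log.add("h2 parsed Integer: " + number);
            ctx.fireChannelRead(number);
        });
        // h3: consumes the object and does not call fireChannelRead, so propagation stops here
        Context h3 = new Context((ctx, msg) ->
                log.add("h3 received " + msg.getClass().getSimpleName()));

        h1.next = h2;
        h2.next = h3;
        h1.handler.channelRead(h1, input);
        return log;
    }

    public static void main(String[] args) {
        run("42".getBytes(StandardCharsets.UTF_8)).forEach(System.out::println);
    }
}
```

If a handler in this sketch skips the fireChannelRead call, the handlers after it never see the message, which mirrors what happens when super.channelRead(ctx, msg) is omitted.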

This method can only invoke the next inbound handler. In our earlier code, calling it in h3 cannot trigger h4, because h4 is an outbound handler. So we can delete that line in h3 and keep only the following:

nioServerSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("helloworld".getBytes()));

Note:

The code above calls writeAndFlush on the channel, the variable nioServerSocketChannel. Despite its name, this is the NioSocketChannel of the accepted connection.

Each handler method also receives a ctx (ChannelHandlerContext), which has its own writeAndFlush method.

If we call ctx.writeAndFlush in h3 instead, only the output of the first three inbound handlers is printed.

Why is that?

Because ctx.writeAndFlush searches for outbound handlers starting from the current handler and moving back toward the head. The handlers before h3 are h2 and h1, which are both inbound, so no outbound handler produces output.

Calling writeAndFlush on the channel (nioServerSocketChannel) starts the search at the tail and moves toward the head, so it finds h6, h5, and h4.
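The difference between the two starting points can be modeled without Netty. The sketch below is plain Java under stated assumptions: the class WriteStartPointSketch, its Node type, and the methods writeFromChannel and writeFromContext are hypothetical names that only imitate where the backward search begins, not how Netty implements it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of where an outbound search starts (not Netty code). */
public class WriteStartPointSketch {

    /** One node in the list, flagged as inbound or outbound. */
    static final class Node {
        final String name;
        final boolean outbound;
        Node prev;
        Node next;

        Node(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    // Sentinels; in this sketch they only mark the ends of the list
    private final Node head = new Node("head", false);
    private final Node tail = new Node("tail", false);

    WriteStartPointSketch() {
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts a handler node just before tail. */
    void addLast(String name, boolean outbound) {
        Node node = new Node(name, outbound);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
    }

    /** Models channel.writeAndFlush: the search starts at the tail. */
    List<String> writeFromChannel() {
        return collectOutbound(tail.prev);
    }

    /** Models ctx.writeAndFlush: the search starts just before the named handler. */
    List<String> writeFromContext(String name) {
        for (Node n = head.next; n != tail; n = n.next) {
            if (n.name.equals(name)) {
                return collectOutbound(n.prev);
            }
        }
        throw new IllegalArgumentException("no handler named " + name);
    }

    /** Walks toward head and records every outbound handler on the way. */
    private List<String> collectOutbound(Node start) {
        List<String> visited = new ArrayList<>();
        for (Node n = start; n != head; n = n.prev) {
            if (n.outbound) {
                visited.add(n.name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        WriteStartPointSketch p = new WriteStartPointSketch();
        p.addLast("h1", false);
        p.addLast("h2", false);
        p.addLast("h3", false);
        p.addLast("h4", true);
        p.addLast("h5", true);
        p.addLast("h6", true);
        System.out.println(p.writeFromChannel());     // [h6, h5, h4]
        System.out.println(p.writeFromContext("h3")); // []
    }
}
```

In this model the only thing that differs between the two calls is the starting node, which is enough to reproduce both observed results.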

Verification

In h3, call ctx.writeAndFlush, comment out the nioServerSocketChannel call, and move h3 so that it is added after h5, that is, between h5 and h6. According to the conclusion above, the output should be 1, 2, 3, 5, 4.

    pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("3");
            // h4 is an outbound handler, so this call has no effect on it
            // super.channelRead(ctx, msg);
            // Writing through the channel would start the search at the tail
            // nioServerSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("helloworld".getBytes()));
            // Writing through ctx starts the search at the current handler
            ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("helloworld".getBytes()));
        }
    });

The result matches the expectation:

1
2
3
5
4