The first two sections covered the basics of Netty:

  • Netty programming practice: control NIO
  • Netty programming: Reactor model

If you haven’t seen these two episodes, I recommend you to do so, because these are the basics

Netty Introduction example

Netty (Netty) : Netty (Netty) : Netty (Netty) : Netty (Netty) : Netty

Case description:

The server receives the message sent by the client

Let’s start by creating the project and configuring the environment

Configure the environment

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.50. The Final</version>
</dependency>
Copy the code

Maven-based approach to Netty, needless to say

Server main program

public class IMNettyServer {
    public static final String HOST = "127.0.0.1";
    public static final int PORT = 45882;

    public static void main(String[] args) {
        new IMNettyServer().start_server();
    }

    private void start_server(a) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    // Reactor group
                    .group(boss, work)
                    // Bind ports
                    .localAddress(PORT)
                    // NIO type channel
                    .channel(NioServerSocketChannel.class)
                    // Channel parameters
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    // Assemble subchannel pipeline
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // Add the handler to the pipeline
                            socketChannel.pipeline().addLast(newIMNettyServerHandler()); }}); ChannelFuture future = bootstrap.bind().sync(); System.out.println("Server started successfully, listening port:" + future.channel().localAddress());

            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally{ work.shutdownGracefully(); boss.shutdownGracefully(); }}}Copy the code

Handler

@ChannelHandler.Sharable
public class IMNettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client channel registered successfully");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client listener:" + ctx.channel().localAddress());
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Read data");
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.getBytes(0, bytes);

        System.out.println("Client:" + new String(bytes));

        super.channelRead(ctx, msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel cache has been read.");
        super.channelReadComplete(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client" + ctx.channel().localAddress() + "Disconnect");
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Something abnormal has occurred.");
        super.exceptionCaught(ctx, cause); }}Copy the code

You see the above code, it is estimated that many will be confused, don’t worry, we look at the client side of the code, bit by bit to analyze them

The client

public class IMNettyClient {
    public static void main(String[] args) {
        new IMNettyClient().connect();
    }

    private void connect(a) {
        NioEventLoopGroup work = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap()
                    // Reactor group
                    .group(work)
                    / / channel
                    .channel(NioSocketChannel.class)
                    // Connect to the server
                    .remoteAddress(IMNettyServer.HOST, IMNettyServer.PORT)
                    / / set
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    // set the channel pipeline
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // Add the handler to the pipeline
							socketChannel.pipeline().addLast("read".newImNettyClientHandler()); }}); ChannelFuture sync = bootstrap .connect() .sync(); sendMsg(sync); }catch(InterruptedException e) { e.printStackTrace(); }}private void sendMsg(ChannelFuture sync) {
        Scanner scanner = new Scanner(System.in);

        while(scanner.hasNext()) { String msg = scanner.next(); ByteBuf buf = sync.channel().alloc().buffer(); buf.writeBytes(msg.getBytes()); sync.channel().writeAndFlush(buf); }}}Copy the code

From the code, we can see that the client side of the code and the server side of the code is much the same, the difference is Netty each component, we will analyze this small case

The embodiment of Reactor model in Netty

As mentioned above, Netty is a high-performance, highly scalable asynchronous event-driven infrastructure and tools designed for web servers and clients based on the Reactor model

Review the process flow of the reactor model

  • Event registration

The channel registers the selector and specifies the corresponding IO event

  • The polling event

A reactor is responsible for a thread, constantly polling for IO events in the selector

  • Dispatching events

Sends the queried I/O events to the Handler service processor bound to the I/O events

  • Complete I/O operations and service processing

Corresponding Channel component in Netty

Channels are a very important component of Netty, as you can see from the flow above:

  • The reactor pattern is closely related to the channel
  • The I/O events queried and distributed by the reactor originate from channels

So let’s start with the channel component in Netty

Instead of using NIO’s channel components, Netty encapsulates each communication protocol. Netty not only supports asynchronous communication, but also encapsulates standard blocking IO

Therefore, each protocol channel in Netty has two versions of asynchronous IO and synchronous IO.

In Netty 4.x, however, synchronous IO is classed as obsolete, so I won’t introduce them. You can look at the code. Synchronous IO starts with Oio

Common Netty channels are as follows:

  • NioServerSocketChannel

The asynchronous non-blocking socket server listens for the channel, which is already used in the server code above

  • NioSocketChannel

The asynchronous non-blocking socket client listens on the channel, which is already used in the client code above

It can be said that the above two channels are the most commonly used channels for developing TCP protocol in Netty

  • NioDatagramChannel

Asynchronous non-blocking UDP transport channel

use

As you can see, in Netty we pass in the specified channel class by calling the channel() method

// Specify the client channel
channel(NioSocketChannel.class);
// Specify the server channel
channel(NioServerSocketChannel.class);
Copy the code

The corresponding Reactor in Netty

In the NIO Reactor model, the Reactor is responsible for the event processing thread, constantly polling, and querying registered I/O events through the Selector Selector. In Netty, there is also an EventLoopGroup

This class is an interface class whose implementation class we use primarily in Netty: NioEventLoopGroup

The reactor mode in Netty must be the multithreaded version, so

NioEventLoopGroup NioEventLoopGroup NioEventLoopGroup NioEventLoopGroup NioEventLoopGroup NioEventLoopGroup NioEventLoopGroup NioEventLoopGroup NioEventLoopGroup NioEventLoopGroup NioEventLoopGroup Internal thread pool specifies the number of threads in the pool. The internal thread pool is the same as before:

A NioEventLoop has a Thread that polls for I/O events for a selector

use

In the code above, we can see that we are using the no-argument construction,

NioEventLoopGroup work = new NioEventLoopGroup();
Copy the code

Take a guess, what is the number of threads inside the no-parameter constructor?

The constructor with no arguments has twice the number of internal threads as the maximum number of available CPU processors

Server code reactor group resolution

Looking at the server code above, we provide two NioEventLoopGroups for the server

NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup work = new NioEventLoopGroup();

group(boss, work)
Copy the code

One way to think about it is,

  • In order for the server to receive new connections in time, the boss reactor is responsible for listening and receiving new connections
  • The WORK reactor is responsible for processing I/O events

With that out of the way, let’s talk about the processor

The Handler processor

As mentioned earlier, the types of IO events that selectors can monitor include:

  • Can be read

SelectionKey.OP_READ

  • Can write

SelectionKey.OP_WRITE

  • receive

SelectionKey.OP_CONNECT

  • The connection

SelectionKey.OP_ACCEPT

A selector inside the NioEventLoop reactor performs the query for the above events and then dispatches the events to the Handler Handler we defined

use

  • Through the serverchildHandler()To assemble the defined processor
childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // Custom processor
        socketChannel.pipeline().addLast(newIMNettyServerHandler()); }});Copy the code
  • Client passhandler()To assemble the defined processor
handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // Custom processor
        socketChannel.pipeline().addLast("read".newImNettyClientHandler()); }});Copy the code

Processor first, pipelining later

Processor type

All business processing is done in the processor. In Netty, the processor is divided into two main categories, one is inbound processor, the other is outbound processor

ChannelInboundHandler: inbound processor

In Netty, an inbound handler is a logical processing that is triggered by Netty from the bottom of a channel, passed through layers, and then called ChannelInboundHandler. Virtually all business processing is handled through the inbound processor

The process

Let’s take the OP_READ event as an example

  • It happens in the passageOP_READAfter the event, will beNioEventLoopQuery to
  • And then it’s distributed toChannelInboundHandlerInbound handler, calling its methodschannelRead()
  • Then, inchannelRead(), we can read data from the channel for business logic operations

As you can see from the above processing, Netty’s inbound processor triggers in the direction from the underlying channel to the ChannelInboundHandler inbound processor

The life cycle

ChannelInboundHandler is an interface class, in Netty, we usually use its subclasses ChannelInboundHandlerAdapter

Let’s take a look at some of the most important ones

  • channelRegistered

This method is triggered when a client connects

  • channelActive

When a client connection is successful, this method will be triggered. We can use this method to monitor the client connection address, online number and other service functions

  • channelRead

When the channel buffer is readable, a channel readable event is emitted. In this method, we get the data for the channel buffer

  • channelReadComplete

When the channel buffer becomes readable, Netty triggers a channel read complete event

  • channelInactive

When the connection is disconnected or unavailable, Netty will trigger this method. In this method, we can perform service functions such as user logout

  • exceptionCaught

When an exception occurs during channel processing, Netty triggers an exception capture event

Let’s take a look at the order in which each method is executed. Take a look at the code below and rewrite all the methods to see the order in the output

public class IMNettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client channel registered successfully");
        super.channelRegistered(ctx);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client listener:" + ctx.channel().localAddress());
        super.channelActive(ctx);
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Read data");
        super.channelRead(ctx, msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel cache has been read.");
        super.channelReadComplete(ctx);
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client" + ctx.channel().localAddress() + "Disconnect");
        super.channelInactive(ctx);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Something abnormal has occurred.");
        super.exceptionCaught(ctx, cause);
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Lifecycle method: add");
        super.handlerAdded(ctx);
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Lifecycle method: remove");
        super.handlerRemoved(ctx); }}Copy the code

You can take a guess

  1. When a client connects to a server
Lifecycle method: add client channel successfully registered client listener: /127.00.1:45882
Copy the code
  1. When a client sends a message to a server
Reading client: the AAA channel cache has been read. Reading client: the ASDF channel cache has been readCopy the code

These two methods are executed multiple times, as soon as data is sent

  1. The client disconnects from the server. Procedure
The channel cache has read the exception client /127.00.1:45882Remove the life cycleCopy the code

Because I’m forcing the client to shut down, an exception is raised

Now that we know about inbound handlers, let’s look at the station handlers

ChannelOutboundHandler: outbound processor

An outbound handler represents an IO operation by the ChannelOutboundHandler to a channel. That is, after the application has finished processing the business, the result of the processing can be written to the underlying channel and sent to the other end. One of the most common methods is write()

So the outbound processor is triggered in the direction of the Netty channel to operate on the underlying Java IO channel

Important method

ChannelOutboundHandler is an interface class, in Netty, we usually use its subclasses ChannelOutboundHandlerAdapter

Let’s take a look at some of the most important ones

  • bind

Listening address binding: The IO address binding of the underlying Java IO channel is completed. In the case of TCP transport, the method is used on the server side

  • connect

Connect to the server to complete the connection operation. In the case of TCP, the method is used by the client

  • write

Write data to the underlying layer to write data from the Netty channel to the underlying Java IO channel. This method only triggers the operation, but does not actually write the data. We will see what this means later when we learn about codecs: usually we can write the version number, magic number, message length, etc

  • flush

Flushes data and writes the data in the buffer to the peer end

  • read

Read data from the underlying layer

  • disConnect

Disconnect the server, if TCP protocol, method used for the client

  • close

Active channel closing

Note: to be honest, I use the outbound processor to use a little less, or I do not know about the use of outbound processor, I will not say more, the comments section is open for everyone to feel free to speak, we will discuss together

Let’s look at the special components that bind the relationship between the channel and the Handler Handler

Pipeline:

Let’s sort out the relationships among components in Netty’s reactor model:

  • NioEventLoopandNioChannelIs a one-to-many relationship: a reactor can query IO events of many channels
  • NioChannelandThe Handler processorThe relationship is many-to-many: IO events of a channel can be processed by multiple Handler handlers. A Handler processor can also bind multiple channels to handle IO events for multiple channels

In this case, Netty provides us with a special component, ChannelPipline, which we call pipeline, in order to coordinate the various components and ensure the normal operation of the application

Because it is like a pipeline that strings together multiple Handler handlers bound to a channel, all Handler handlers added are nodes in the pipeline, just like a pipelining process in a factory

Execution order

ChannelPipline is designed as a two-way linked list structure that can dynamically add and remove Handler business processors, such as

  • addLast

  • addFirst

  • remove

  • .

In pipelining, the order of execution of the inbound and outbound processors is different

  • Inbound processor

The order of the inbound processors is front-to-back, in the order in which we add processors to the pipeline, for example

socketChannel.pipeline().addLast(new InHandlerA());
socketChannel.pipeline().addLast(new InHandlerB());
socketChannel.pipeline().addLast(new InHandlerB());
Copy the code

So, their structure in the pipeline is A –> B –> C structure, the order of execution is A –> B –> C

  • Outbound processor

The order of the outbound processors is from back to front, in reverse order of the order in which we added the processors to the pipeline, for example, in the same way as above, but with the outbound processors

socketChannel.pipeline().addLast(new OutHandlerA());
socketChannel.pipeline().addLast(new OutHandlerB());
socketChannel.pipeline().addLast(new OutHandlerB());
Copy the code

Their structure in the pipeline is A –> B –> C, but they are executed from back to front, so the order of execution is C –> B –> A

You can check it out for yourself

Each IO event from the channel goes to the pipeline processor, and the processor encounters three conditions during its processing

  • If there are other Handler handlers following, the IO event is passed to the next Handler Handler
  • If no other Handler handlers follow, the processing of the IO event ends there
  • If termination is needed in the middle of the pipeline, the processor can also choose not to pass the IO event further down the pipeline and terminate processing at this point
Truncated pipeline processor

The third case above says that we can manually block the pipeline, so let’s see how to block it,

Using an inbound handler:

public class IMNettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Read data");
        // super.channelRead(ctx, msg);}}Copy the code

For every overridden method, we call the method of the parent class, and if we don’t call the method of the parent class, the pipeline stops passing down

assembly

So, if you remember the code above, how does Netty assemble processors into a pipeline

/ / the server
childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(newIMNettyServerHandler()); }});/ / the client
handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // This code is basically the same as the Handler on the server side
        socketChannel.pipeline().addLast("read".newImNettyClientHandler()); }});Copy the code

There is one class involved: ChannelInitializer, also known as the ChannelInitializer handler. As long as we implement the initChannel() method and get the newly received channel (parameter), we can assemble the handler into the pipeline

Start the class

Bootstrap is a convenient factory class provided by Netty. This class can be used to assemble components and initialize programs on the Netty client or server side.

Netty provides us with two launch classes and a very convenient way to chain calls

  • Bootstrap

Client startup class

  • ServerBootstrap

The server startup class

Channel Option Configuration

As you can see from the code above, we set a series of options for either the server channel or the client channel using option(). Let’s look at some of the common options

  • The SO_RCVBUF, SO_SNDBUF

This is a TCP parameter that sets the size of the send and receive buffers in the kernel for each TCP socket

  • SO_KEEPALIVE

This parameter is a TCP parameter and indicates the underlying TCP heartbeat mechanism. True indicates that the connection holds the heartbeat. The default value is false

  • SO_BACKLOG

This is a TCP parameter and indicates the length of the queue for the server to receive connections. If the queue is full, the client rejects the connection.

The default value is 200 for Windows and 128 for other operating systems.

  • SO_BROADCAST

This is a TCP parameter, which indicates setting the broadcast mode

  • ALLOCATOR

Define an instance of ByteBuf, which we’ll cover in the next section

To this, the basic knowledge of Netty is complete, the code is actually some fixed writing method, it is important to understand Netty mode, etc

The words in the back

About the above points, if there is any bad or problematic writing, welcome to point out, the above are the most basic knowledge points, more in-depth knowledge we study together, but also welcome to discuss with me.

The following is a new source of personal knowledge I built with Hexo + NexT. The latest content will be put here first. Welcome to visit

Most basic, nothing, O(∩_∩)O ha ha ~

Summary of my knowledge system