This is the 16th day of my participation in the August More Text Challenge.

preface

In the last article, we looked at the Reactor model in detail.

If you haven't read it yet, see Netty Source Analysis Series (10): The Reactor Model.

Links to the other articles in the Netty source analysis series are as follows:

Netty Source Analysis Series (9): The Netty Bootstrap Classes

Netty Source Analysis Series (8): How Netty Achieves Zero Copy

Netty Source Analysis Series (7): The Byte Buffer ByteBuf (Part 2)

Netty Source Analysis Series (6): The ByteBuf Buffer

Netty Source Analysis Series (5): ChannelPipeline Source Analysis

Netty Source Analysis Series (4): ChannelHandler

Netty Source Analysis Series (3): Channel Overview

Netty Source Analysis Series (2): Netty Architecture Design

Netty Source Analysis Series (1): Netty Overview

Next, we will explore the Netty model. Netty uses the master-slave Reactor multi-threaded model.

Netty model

Here is how Netty works. The execution process is as follows:

  • Netty abstracts two groups of thread pools: the BossGroup, which accepts connections from clients, and the WorkerGroup, which handles network reads and writes.

  • Both the BossGroup and the WorkerGroup are of type NioEventLoopGroup.

  • A NioEventLoopGroup is an event loop group. Each group contains multiple event loops, and each event loop is a NioEventLoop.

  • A NioEventLoop represents a thread that loops continuously to process tasks. Each NioEventLoop has a Selector that listens for events on the sockets registered with it.

  • A NioEventLoopGroup can have multiple threads, that is, it can contain multiple NioEventLoops.

  • Each Boss NioEventLoop loops over three steps:

    • 1. Poll for accept events;
    • 2. Handle the accept event: establish the connection with the client, generate a NioSocketChannel, and register it with the Selector of a Worker NioEventLoop;
    • 3. Process the tasks in the task queue, i.e. runAllTasks.
  • Each Worker NioEventLoop loops over three steps:

    • 1. Poll for read and write events;
    • 2. Handle the I/O (read and write) events on the corresponding NioSocketChannel;
    • 3. Process the tasks in the task queue, i.e. runAllTasks.
  • When a Worker NioEventLoop processes business logic, it uses a pipeline. The pipeline is bound to a channel (the channel can be obtained through the pipeline), and the pipeline maintains many handlers (ChannelHandlers); a sketch of how handlers are added to a pipeline follows this list.
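To make that last point concrete, here is a minimal sketch (not taken from this article's example) of how several handlers might be lined up in a channel's pipeline. The initializer name, the handler names and the use of Netty's StringDecoder/StringEncoder are illustrative assumptions:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class EchoChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // Inbound events flow through the handlers in the order they were added;
        // outbound operations flow through them in reverse order.
        pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast("business", new SimpleChannelInboundHandler<String>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                ctx.writeAndFlush("echo: " + msg); // passes through the encoder on the way out
            }
        });
    }
}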

Code sample

Introducing Maven dependencies

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.49.Final</version>
</dependency>

Server side pipe handler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    // Read the actual data (here we can read the message sent by the client)
    // 1. ChannelHandlerContext ctx: the context object, which contains the pipeline, the channel and the address
    // 2. Object msg: the data sent by the client, passed as an Object by default
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server ctx =" + ctx);
        Channel channel = ctx.channel();
        // Convert MSG to a ByteBuf
        //ByteBuf is provided by Netty, not NIO's ByteBuffer.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("The message sent by the client is :" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("Client address :" + channel.remoteAddress());
    }


    // The data is read
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //writeAndFlush is write + Flush
        // Write data to the cache and refresh
        // In general, we encode the data we send
        ctx.writeAndFlush(Unpooled.copiedBuffer("The company's recent account has no money, wait a few days!", CharsetUtil.UTF_8));
    }

    // Handle the exception, which usually requires closing the channel
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
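One detail worth noting: ChannelInboundHandlerAdapter does not release the received ByteBuf for you. In production code you would typically release it yourself (for example with ReferenceCountUtil.release(msg)) or extend SimpleChannelInboundHandler, which releases the message automatically. A minimal sketch of the latter, purely as an illustration and not part of the original example:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class AutoReleaseServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        // msg is released automatically after this method returns
        System.out.println("The message sent by the client is :" + msg.toString(CharsetUtil.UTF_8));
    }
}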

The server main program

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public static void main(String[] args) throws Exception {
        // Create BossGroup and WorkerGroup
        // Explanation:
        //1. Create two thread groups bossGroup and workerGroup
        //2. The bossGroup only handles connection requests. The workerGroup handles the real business with the client
        //3. Both are infinite loops
        //4. The number of child threads (NioEventLoops) in bossGroup and workerGroup
        //   defaults to the number of available CPU cores x 2
        //
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // defaults to CPU cores x 2, 8 here
        try {
            // Create a startup object on the server and set parameters
            ServerBootstrap bootstrap = new ServerBootstrap();
            // Use chained programming to set this up
            bootstrap.group(bossGroup, workerGroup) // Set two thread groups
                    .channel(NioServerSocketChannel.class) // use NioServerSocketChannel as the server's channel implementation
                    .option(ChannelOption.SO_BACKLOG, 128) // set the backlog of pending connections waiting to be accepted
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // keep accepted child connections alive
                    .childHandler(new ChannelInitializer<SocketChannel>() { // initializer (anonymous object) for each accepted SocketChannel
                        // Set processor for pipeline
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // The SocketChannel could be kept in a collection; to push messages later, tasks can be
                            // submitted to the taskQueue or scheduleTaskQueue of the NioEventLoop bound to that channel
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    }); // set the handler for the pipeline of the workerGroup's EventLoops

            System.out.println("... The server is ready...");
            // Bind a port and synchronize, generating a ChannelFuture object
            // Start the server (and bind the port)
            ChannelFuture cf = bootstrap.bind(7788).sync();
            // Register a listener for cf to monitor the events we care about
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("Service started on port 7788...");
                    } else {
                        System.out.println("Service startup failed...");
                    }
                }
            });
            // Listen for the channel being closed
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

NioEventLoopGroup is a multithreaded event loop group that handles I/O operations. Netty provides a number of different EventLoopGroup implementations for different transports.

In the server application above, two NioEventLoopGroups are used. The first, called bossGroup, accepts incoming connections. The second, called workerGroup, handles the connections that have already been accepted: once the bossGroup accepts a connection, it registers that connection with the workerGroup.
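As an example of another transport, on Linux you can swap in Netty's native epoll transport. This is a minimal sketch under the assumption that the epoll classes and the matching netty-transport-native-epoll native library are on your classpath; only the transport classes change compared with the NIO server above:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;

public class EpollTransportSketch {

    public static void main(String[] args) {
        // Same bootstrap shape as the NIO server above; only the transport classes differ
        EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
        EventLoopGroup workerGroup = new EpollEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                 .channel(EpollServerSocketChannel.class);
        // ... the rest of the configuration is identical to the NIO example
    }
}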

ServerBootstrap is the bootstrap class for setting up an NIO server. (You could configure a Channel directly instead, but the bootstrap makes it much simpler.)

  • The group method sets the EventLoopGroups.
  • The channel method specifies the Channel type used for new connections, in this case NioServerSocketChannel.
  • childHandler specifies the ChannelHandler for accepted connections, which is the NettyServerHandler we implemented above.
  • option sets parameters on the specified Channel, here the parent NioServerSocketChannel; see the sketch after this list.
  • childOption sets ChannelOptions on the child SocketChannels accepted by the server.
  • bind binds the server to a port and starts the service.
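To make the difference between option and childOption concrete, here is a minimal sketch; the class name and the particular option values are illustrative assumptions, not recommendations:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class ServerOptionsSketch {

    public static void main(String[] args) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup())
                 .channel(NioServerSocketChannel.class)
                 // option(...) applies to the parent (listening) channel
                 .option(ChannelOption.SO_BACKLOG, 128)
                 // childOption(...) applies to each accepted child SocketChannel
                 .childOption(ChannelOption.SO_KEEPALIVE, true)
                 .childOption(ChannelOption.TCP_NODELAY, true);
    }
}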

Client pipe processor

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    // This method is triggered when the channel is ready
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client ctx =" + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("Boss, when do I get paid?", CharsetUtil.UTF_8));
    }

    // Triggers when the channel has a read event
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("The server replied to the message :" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("Server address:"+ ctx.channel().remoteAddress());
    }

    // Handle the exception, which usually requires closing the channel
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Client main program

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {

    public static void main(String[] args) throws Exception {
        // The client needs an event loop group
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Create the client startup object
            // Note that the client uses Bootstrap instead of ServerBootstrap
            Bootstrap bootstrap = new Bootstrap();
            // Set related parameters
            bootstrap.group(group) // Set the thread group
                    .channel(NioSocketChannel.class) // Set the client channel implementation class (reflection)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler()); // add our own handler
                        }
                    });
            System.out.println("Client OK..");
            // Start the client to connect to the server
            // Netty's asynchronous model is used to describe the future
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7788).sync();
            // Listen for the closing channel
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
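The comment above mentions Netty's asynchronous model: connect returns a ChannelFuture immediately, so instead of blocking with sync() you could react to the result with a listener. A minimal sketch of that style, for illustration only (the class name and log messages are assumptions):

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class AsyncConnectSketch {

    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new NettyClientHandler());
                    }
                });
        // connect() returns at once; the listener fires when the connection attempt completes
        bootstrap.connect("127.0.0.1", 7788).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                System.out.println("Connected to the server asynchronously");
            } else {
                future.cause().printStackTrace();
                group.shutdownGracefully();
            }
        });
    }
}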

The client only needs a single NioEventLoopGroup, since it does not need a separate group to accept connections.

A test run

Start the NettyServer program and then the NettyClient program.

Server console output:

... The server is ready...
Service started on port 7788...
server ctx =ChannelHandlerContext(NettyServerHandler#0, [id: 0xa1b2233c, L:/127.0.0.1:7788 - R:/127.0.0.1:63239])
Client address :/127.0.0.1:63239

Client console output:

Client OK..
client ctx =ChannelHandlerContext(NettyClientHandler#0, [id: 0x21d6f98e, L:/127.0.0.1:63239 - R:/127.0.0.1:7788])
Server address: /127.0.0.1:7788

With that, a simple Netty-based server and client are complete.

conclusion

This article mainly explains the working principle and simple application of Netty. In the next section, we will explain the Netty codec.

At the end

I am a coder who keeps getting knocked down but keeps pushing forward. If this article helped you, remember to like and follow. Thank you!