Summary: Developing directly against Java NIO is difficult. Netty can be understood as a layer of encapsulation on top of NIO that is more convenient to use. Underneath, NIO relies on operating-system multiplexing mechanisms such as select and epoll.

1. Overview

Netty is an asynchronous, event-driven network application framework for the rapid development of maintainable, high-performance network servers and clients.

1.1 Netty Advantages

  • Netty vs. raw NIO: using NIO directly means a lot of work and many bugs
    • You need to build your own protocols
    • You have to solve TCP transmission problems yourself, such as sticky packets and half packets
    • The epoll empty-polling bug can drive CPU usage to 100%
    • Netty enhances the API to make it easier to use, for example FastThreadLocal in place of ThreadLocal and ByteBuf in place of ByteBuffer
  • Netty vs. other network application frameworks
    • Mina is maintained by Apache, and a future 3.x release may bring a major refactoring that breaks API backwards compatibility. Netty iterates faster, with cleaner APIs and better documentation
    • Battle-tested over 16 years; Netty versions:
      • 2.x 2004
      • 3.x 2008
      • 4.x 2013
      • 5.x has been abandoned (no obvious performance improvement, high maintenance cost)
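
To make the sticky-packet and half-packet problem from the list above concrete, here is a minimal plain-Java sketch (no Netty involved) of length-prefixed framing. The class name LengthPrefixedFramer and the 4-byte length header are assumptions chosen for illustration; Netty ships its own frame decoders (for example LengthFieldBasedFrameDecoder) that solve the same problem, and this is not their implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Netty: splits a TCP byte stream into messages,
// where each message is a 4-byte big-endian length followed by the payload.
class LengthPrefixedFramer {
    // Bytes received so far that do not yet form a complete message.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Feed one chunk exactly as it arrived from the socket; returns every complete message.
    List<String> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();
        List<String> messages = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (merged.remaining() < length) {
                // Half packet: the payload is incomplete, so rewind and wait for more bytes.
                merged.reset();
                break;
            }
            byte[] payload = new byte[length];
            merged.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        pending = merged; // keep the unread tail for the next call
        return messages;
    }

    // Encodes one message as length header + payload.
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }
}
```

A chunk that contains one full message plus the start of the next (a sticky packet followed by a half packet) yields only the first message; the second is returned once the remaining bytes are fed in.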

2. Hello World

2.1 Goal

Develop a simple server and client

  • The client sends hello, world to the server
  • The server only receives the message and does not reply

Add the dependency

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>

2.2 Server

package netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class HelloServer {
    public static void main(String[] args) {
        // The bootstrap assembles and coordinates the Netty components
        new ServerBootstrap()
                // Add the boss and worker event loops: selector (listens for events) + threads
                .group(new NioEventLoopGroup()) // 1
                // Select the server socket channel implementation
                .channel(NioServerSocketChannel.class) // 2
                // The boss establishes connections; the worker (child) handles read and write events.
                // childHandler determines what the worker will do
                .childHandler(
                        // Initializer for the channel that reads from and writes to the client
                        new ChannelInitializer<NioSocketChannel>() { // 3
                            @Override
                            protected void initChannel(NioSocketChannel ch) {
                                // Add a concrete handler that converts ByteBuf to String
                                ch.pipeline().addLast(new StringDecoder()); // 5
                                // Custom handler
                                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 6
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        // Print the string produced by the previous handler
                                        System.out.println(msg);
                                    }
                                });
                            }
                        })
                // The server's listening port
                .bind(8080); // 4
    }
}

Code walkthrough

  • At 1, a NioEventLoopGroup is created. It can be understood simply as a thread pool plus a Selector; it is covered in more detail later

  • At 2, the server socket implementation class is selected. NioServerSocketChannel is the NIO-based server-side implementation; other implementations exist as well

  • At 3, the method is called childHandler because the handlers added next are for the SocketChannel, not the ServerSocketChannel. ChannelInitializer is a handler that runs only once: after a client SocketChannel has established a connection, its initChannel method is executed to add more handlers

  • At 4, the listening port of the ServerSocketChannel is bound

  • At 5, the SocketChannel's decoding handler converts ByteBuf => String

  • At 6, the SocketChannel's business handler uses the result of the previous handler

2.3 Client

package netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Date;

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        // The bootstrap class assembles the Netty components
        new Bootstrap()
                // Add the EventLoopGroup
                .group(new NioEventLoopGroup()) // 1
                // Select the client channel implementation
                .channel(NioSocketChannel.class) // 2
                // Add handlers
                .handler(new ChannelInitializer<Channel>() { // 3
                    @Override
                    protected void initChannel(Channel ch) {
                        // Called after the connection is established; encodes String => ByteBuf
                        ch.pipeline().addLast(new StringEncoder()); // 8
                    }
                })
                .connect("127.0.0.1", 8080) // 4
                .sync() // 5: blocks until the connection is established
                .channel() // 6: the connection object between client and server
                // Send data
                .writeAndFlush(new Date() + "******************: hello world!"); // 7
    }
}

Code walkthrough

  • At 1, a NioEventLoopGroup is created, same as on the server

  • At 2, the client socket implementation class is selected. NioSocketChannel is the NIO-based client implementation; other implementations exist as well

  • At 3, the ChannelInitializer handler (executed only once) adds more handlers to the SocketChannel after the connection is established

  • At 4, the server address and port to connect to are specified

  • At 5, many Netty methods are asynchronous, connect among them. Here sync is used to wait for connect to complete

  • At 6, the Channel object is obtained. It is the channel abstraction and can be used to read and write data

  • At 7, the message is written and the buffer is flushed

  • At 8, the message passes through the channel's handler, which encodes String => ByteBuf before it is sent

  • The data travels over the network to the server, where the handlers at 5 and 6 are triggered in turn, completing one pass

2.4 Process Walkthrough

It helps to start with the right mental model

  • Think of a channel as a data channel
  • Think of msg as the data flowing through it. The input starts as a ByteBuf, is turned into other object types as it passes through the pipeline, and becomes a ByteBuf again on output
  • Think of a handler as a processing step for the data
    • The pipeline is responsible for publishing events (read, read complete, ...) and propagating them to each handler; a handler processes the events it is interested in by overriding the corresponding event-handling method
    • Handlers come in two kinds: Inbound and Outbound
  • Think of an eventLoop as a worker that processes the data
    • A worker can manage the IO operations of multiple channels, and once a worker is responsible for a channel, it stays responsible to the end (binding)
    • Workers perform IO operations as well as task processing. Each worker has a task queue that can hold pending tasks from multiple channels; tasks are either ordinary tasks or scheduled tasks
    • Workers process data in pipeline order, following each handler's plan (code) in turn; a different worker can be assigned to each step
    • Each worker is a thread pool with a single thread
    • The collection of workers is the EventLoopGroup
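
The mental model above can be sketched in plain Java. MiniPipeline and its addLast/fireRead methods are hypothetical names made up for this illustration; this is not how Netty's ChannelPipeline is implemented, it only mimics how a message changes type as it passes through the handlers in order.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;

// Hypothetical stand-in for a pipeline: handlers run in the order they were added,
// and each one receives the output of the previous one.
class MiniPipeline {
    private final List<Function<Object, Object>> handlers = new ArrayList<>();

    MiniPipeline addLast(Function<Object, Object> handler) {
        handlers.add(handler);
        return this;
    }

    // Pushes one inbound message through every handler and returns the final result.
    Object fireRead(Object msg) {
        Object current = msg;
        for (Function<Object, Object> handler : handlers) {
            current = handler.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline()
                // Plays the role of a decoder: raw bytes -> String
                .addLast(msg -> new String((byte[]) msg, StandardCharsets.UTF_8))
                // Plays the role of a business handler working on the decoded String
                .addLast(msg -> ((String) msg).toUpperCase(Locale.ROOT));
        System.out.println(pipeline.fireRead("hello, world".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The second handler can cast to String only because the first one already converted the bytes, which is the same ordering dependency the Hello World server has between its decoder and its business handler.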

3. Components

3.1 EventLoop

Event loop object

An EventLoop is essentially a single-threaded executor (which also maintains a Selector) with a run method that handles a stream of I/O events on a Channel.

Its inheritance hierarchy is somewhat complicated:

  • One line inherits from j.u.c.ScheduledExecutorService, so it includes all the methods of a thread pool
  • The other line inherits from Netty's own OrderedEventExecutor, which
    • provides the boolean inEventLoop(Thread thread) method to determine whether a thread belongs to this EventLoop
    • provides the parent method to see which EventLoopGroup it belongs to

Event loop group

An EventLoopGroup is a group of EventLoops. A Channel generally calls the register method of the EventLoopGroup to bind itself to one of the EventLoops; all subsequent IO events on that Channel are handled by this EventLoop, which keeps IO event processing thread-safe.

  • It inherits from Netty's own EventExecutorGroup
    • It implements the Iterable interface, which makes it possible to traverse the EventLoops
    • The next method gets the next EventLoop in the collection

Take a simple implementation as an example:

// Creates two EventLoops internally; each EventLoop maintains one thread
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());

The output

io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6
io.netty.channel.DefaultEventLoop@60f82f98
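
The third call to next() in the output above returns the first EventLoop again, which suggests the loops are handed out in rotation. A minimal sketch of such a round-robin chooser in plain Java follows. RoundRobinChooser is a hypothetical name, and nothing here is taken from Netty's real chooser beyond the wrap-around behaviour visible in the output.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: hands out the elements of a fixed list in rotation.
class RoundRobinChooser<T> {
    private final List<T> items;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinChooser(List<T> items) {
        this.items = items;
    }

    // Thread-safe: each call takes the next index and wraps around at the end.
    // floorMod keeps the index non-negative even after the counter overflows.
    T next() {
        int index = Math.floorMod(counter.getAndIncrement(), items.size());
        return items.get(index);
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(Arrays.asList("loop-1", "loop-2"));
        System.out.println(chooser.next()); // loop-1
        System.out.println(chooser.next()); // loop-2
        System.out.println(chooser.next()); // loop-1
    }
}
```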

You can also use a for loop

DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
for (EventExecutor eventLoop : group) {
    System.out.println(eventLoop);
}

The output

io.netty.channel.DefaultEventLoop@60f82f98
io.netty.channel.DefaultEventLoop@35f983a6

Executing ordinary tasks and scheduled tasks

import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.NettyRuntime;

import java.util.concurrent.TimeUnit;

public class TestEventLoop {
    public static void main(String[] args) {
        // 1. Create an event loop group; it can handle IO events, ordinary tasks, and scheduled tasks
        NioEventLoopGroup group = new NioEventLoopGroup(2);
        System.out.println(NettyRuntime.availableProcessors());
        // 2. Get the next event loop; after the last one, next() wraps around to the first, which is how multiple channels are distributed
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println("* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *");
        // 3. Execute an ordinary task, just like submitting a task to a thread pool
        group.next().submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("I'm Eventloop:" + Thread.currentThread().getName());
        });
        System.out.println("I'm the main thread");
        System.out.println("* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *");
        // 4. Execute a scheduled task: initial delay 0, then once every second
        group.next().scheduleAtFixedRate(() -> {
            System.out.println("I'm deferred.");
        }, 0, 1, TimeUnit.SECONDS);
        System.out.println("I'm the main thread");
    }
}

💡 Graceful shutdown

Use the shutdownGracefully method to shut down gracefully. This method first switches the EventLoopGroup to the closed state so that it rejects new tasks, and then stops the threads once all tasks in the task queue have been processed. This ensures that the application as a whole exits in a normal and orderly manner
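
The behaviour described above (reject new tasks first, then finish what is already queued) can be observed with a plain JDK executor. The sketch below is an analogy only, under the assumption that the description above is the behaviour of interest: it uses ExecutorService.shutdown(), not Netty's shutdownGracefully, and GracefulShutdownSketch is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// JDK analogy, not Netty code: shutdown() rejects new tasks but still runs queued ones.
class GracefulShutdownSketch {
    // Submits `tasks` jobs, shuts down, and returns how many jobs actually ran.
    static int runThenShutdown(int tasks) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            worker.execute(completed::incrementAndGet);
        }
        worker.shutdown(); // closed state: nothing new is accepted from here on
        try {
            worker.execute(completed::incrementAndGet);
        } catch (RejectedExecutionException expected) {
            System.out.println("new task rejected after shutdown");
        }
        try {
            // Wait for the queued tasks to drain before the thread stops.
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runThenShutdown(3));
    }
}
```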

Demonstrate NioEventLoop handling IO events

The server side uses two NIO workers

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.Charset;

public class EventLoopServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                // 1 boss for accepting connections, 2 NIO workers for read and write events
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                // Server socket channel implementation
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    // Called when a connection is established
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            // We care about the read event
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                // Decode the bytes with the default character set and print them
                                System.out.println(buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                }).bind(8080);
    }
}

Start the client three times, changing the string it sends to zhangsan (first run), lisi (second run), and wangwu (third run)

public static void main(String[] args) throws InterruptedException {
    Channel channel = new Bootstrap()
            .group(new NioEventLoopGroup(1))
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    System.out.println("init...");
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                }
            })
            .channel(NioSocketChannel.class).connect("localhost", 8080)
            .sync()
            .channel();

    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
    Thread.sleep(2000);
    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
}

The final output

22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan
22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi
22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu

You can see that the two workers take turns picking up new channels, but once a channel is assigned it stays bound to its worker

Threading model

Details:

  1. The boss is only responsible for the accept event of a channel and assigns the channel to a NioEventLoop
  2. Each NioEventLoop is a thread pool containing only one thread, with a selector on it for polling
  3. Each channel is bound to one NioEventLoop, while one NioEventLoop can be bound to multiple channels
  4. A channel can be understood as a buffer in the network card, that is, a socket
  5. Both the server and the client can write to it and read from it, so that the other side receives the data
  6. When one side writes to a channel, the data goes through the Outbound handlers; when the other side reads, it goes through the Inbound handlers
  7. A pipeline can be understood as the chain of handlers bound to a channel; the two sides can bind different handlers

Add two more non-NIO workers

// A separate EventLoopGroup used to handle special events
DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);
new ServerBootstrap()
    // Boss and worker: the boss handles connection events, the workers handle read and write events
    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
    // Server socket implementation
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch)  {
            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            // This handler is executed by normalWorkers instead of the channel's NIO worker
            ch.pipeline().addLast(normalWorkers, "myhandler", new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
                    if (byteBuf != null) {
                        // Size the array to the readable bytes so longer messages do not overflow it
                        byte[] buf = new byte[byteBuf.readableBytes()];
                        byteBuf.readBytes(buf);
                        log.debug(new String(buf));
                    }
                }
            });
        }
    }).bind(8080).sync();

The client code remains the same. Start it three times, changing the string it sends to zhangsan (first run), lisi (second run), and wangwu (third run)

The output

22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] REGISTERED
22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] ACTIVE
22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7a 68 61 6e 67 73 61 6e                         |zhangsan        |
+--------+-------------------------------------------------+----------------+
22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ COMPLETE
22:19:48 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - zhangsan
22:19:50 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 7a 68 61 6e 67 73 61 6e                         |zhangsan        |
+--------+-------------------------------------------------+----------------+
22:19:50 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ COMPLETE
22:19:50 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - zhangsan
22:20:24 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] REGISTERED
22:20:24 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] ACTIVE
22:20:25 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ: 4B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6c 69 73 69                                     |lisi            |
+--------+-------------------------------------------------+----------------+
22:20:25 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ COMPLETE
22:20:25 [DEBUG] [defaultEventLoopGroup-2-2] c.i.o.EventLoopTest - lisi
22:20:27 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ: 4B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6c 69 73 69                                     |lisi            |
+--------+-------------------------------------------------+----------------+
22:20:27 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ COMPLETE
22:20:27 [DEBUG] [defaultEventLoopGroup-2-2] c.i.o.EventLoopTest - lisi
22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] REGISTERED
22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] ACTIVE
22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ: 6B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 61 6e 67 77 75                               |wangwu          |
+--------+-------------------------------------------------+----------------+
22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ COMPLETE
22:20:38 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - wangwu
22:20:40 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ: 6B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 61 6e 67 77 75                               |wangwu          |
+--------+-------------------------------------------------+----------------+
22:20:40 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ COMPLETE
22:20:40 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - wangwu

As you can see, both the NIO workers and the non-NIO workers are bound to channels: the LoggingHandler is executed by the NIO worker, while our own handler is executed by the non-NIO worker

💡 How does the worker change while handlers are executing?

The key code is io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

Starting from the head handler, each handler in effect passes the work on to the next one:

  1. When the next handler uses the same thread as the current handler, the call is made directly on the current thread
  2. When the next handler uses a different thread, the call is wrapped in executor.execute() and handed over to that other thread

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // Get the event loop (executor) of the next handler,
    // e.g. when head calls h1, this is the executor assigned to h1
    EventExecutor executor = next.executor();
    // If the current thread belongs to that event loop, call the next handler directly
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    }
    // Otherwise, submit the code to be executed as a task to the next handler's event loop (the hand-over)
    else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

  • If both handlers are bound to the same thread, the next handler is called directly
  • Otherwise, the code to be invoked is wrapped in a task object, which is then run by the next handler's thread
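
The same decision can be reproduced with plain JDK executors. The sketch below is only an illustration of the rule, not Netty code: HandOffSketch, Loop, and invoke are hypothetical names, and Loop is a single-threaded executor that remembers its own thread so it can answer inEventLoop().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the "same thread? call directly : submit as a task" rule.
class HandOffSketch {
    // A single-threaded executor that knows which thread it owns.
    static final class Loop {
        private volatile Thread thread;
        private final ExecutorService executor;

        Loop(String name) {
            executor = Executors.newSingleThreadExecutor(task -> {
                Thread t = new Thread(task, name);
                t.setDaemon(true); // do not keep the JVM alive for this demo
                thread = t;
                return t;
            });
        }

        boolean inEventLoop() {
            return Thread.currentThread() == thread;
        }

        void execute(Runnable task) {
            executor.execute(task);
        }
    }

    // Mirrors the branch in invokeChannelRead: run inline or hand over.
    static void invoke(Loop next, Runnable handler) {
        if (next.inEventLoop()) {
            handler.run();
        } else {
            next.execute(handler);
        }
    }

    // Returns the names of the threads that ran the two handlers.
    static String[] demo() {
        Loop io = new Loop("io-loop");
        Loop business = new Loop("business-loop");
        String[] ranOn = new String[2];
        CountDownLatch done = new CountDownLatch(1);
        // Start on the io loop, like the head of a pipeline.
        io.execute(() -> {
            // The next handler shares the io loop: called directly on this thread.
            invoke(io, () -> ranOn[0] = Thread.currentThread().getName());
            // The next handler belongs to another loop: submitted there as a task.
            invoke(business, () -> {
                ranOn[1] = Thread.currentThread().getName();
                done.countDown();
            });
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        String[] ranOn = demo();
        System.out.println("first handler ran on:  " + ranOn[0]);
        System.out.println("second handler ran on: " + ranOn[1]);
    }
}
```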

Demonstrate NioEventLoop handling ordinary tasks

Besides handling IO events, a NioEventLoop also accepts ordinary tasks submitted to it

NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);

log.debug("server start...");
Thread.sleep(2000);
nioWorkers.execute(()->{
    log.debug("normal task...");
});

The output

22:30:36 [DEBUG] [main] c.i.o.EventLoopTest2 - server start...
22:30:38 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - normal task...

It can be used to perform long tasks

Demonstrate NioEventLoop handling scheduled tasks

NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);

log.debug("server start...");
Thread.sleep(2000);
nioWorkers.scheduleAtFixedRate(() -> {
    log.debug("running...");
}, 0, 1, TimeUnit.SECONDS);

The output

22:35:15 [DEBUG] [main] c.i.o.EventLoopTest2 - server start...
22:35:17 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:18 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:19 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:20 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
...

It can be used to perform scheduled tasks

3.2 Channel

Main methods of a Channel

  • close() closes the channel
  • closeFuture() is used to handle the closing of the channel
    • The sync method waits synchronously for the channel to close
    • The addListener method handles the closing asynchronously through a callback
  • pipeline() is used to add handlers
  • write() writes data into the send buffer; the data is not necessarily sent out immediately
  • writeAndFlush() writes data and flushes it out immediately
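
The difference between write() and writeAndFlush() resembles the behaviour of buffered streams in the JDK. The sketch below is an analogy only: it uses BufferedOutputStream rather than a Netty Channel, Netty's outbound buffer works differently internally, and WriteVsFlushSketch is a hypothetical name. It assumes a message smaller than the stream's internal buffer.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// JDK analogy, not Netty code: buffered bytes reach the destination only on flush.
class WriteVsFlushSketch {
    // Returns {bytes delivered after write, bytes delivered after flush}.
    static int[] deliveredBytes(String message) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();      // stands in for the socket
        BufferedOutputStream channel = new BufferedOutputStream(wire); // stands in for the channel
        try {
            channel.write(message.getBytes(StandardCharsets.UTF_8));
            int afterWrite = wire.size(); // a short message still sits in the buffer here
            channel.flush();
            int afterFlush = wire.size(); // now the bytes have been pushed to the destination
            return new int[] {afterWrite, afterFlush};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] delivered = deliveredBytes("hello");
        System.out.println("after write: " + delivered[0] + " bytes, after flush: " + delivered[1] + " bytes");
    }
}
```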

ChannelFuture

Look at the client code again:

new Bootstrap() 
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080)
    .sync()
    .channel()
    .writeAndFlush(new Date() + ": hello world!");

Now let’s break it down

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080); // 1

channelFuture.sync().channel().writeAndFlush(new Date() + ": hello world!");

  • At 1, connect returns a ChannelFuture object; its channel() method retrieves the Channel object

Note that the connect method is asynchronous: it returns before the connection has been established, so at that moment the channelFuture object cannot yet provide a correctly connected Channel

The experiment is as follows:

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080);

System.out.println(channelFuture.channel()); // 1
// Block the current thread until the NIO thread has established the connection
channelFuture.sync(); // 2
System.out.println(channelFuture.channel()); // 3

  • At 1, the connection is not yet established, so it prints [id: 0x2e1884dd]

  • At 2, the sync method waits synchronously for the connection to be established

  • At 3, the connection is established, so it prints [id: 0x2e1884dd, L:/127.0.0.1:57191 - R:/127.0.0.1:8080]

    Methods that return a Future are usually asynchronous

Besides using the sync method to turn the asynchronous operation into a synchronous wait, you can also handle the result with a callback:

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel()); // 1
// The callback is handed to the NIO thread, which runs it once the connection is established; the main thread does not run it
channelFuture.addListener((ChannelFutureListener) future -> {
    System.out.println(future.channel()); // 2
});

  • At 1, the connection is not yet established, so it prints [id: 0x749124ba]
  • ChannelFutureListener is called back (through its operationComplete method) once the connection is established, so by the time 2 executes the connection must be established, and it prints [id: 0x749124ba, L:/127.0.0.1:57351 - R:/127.0.0.1:8080]
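
The two styles, blocking like sync() and registering a callback like addListener(), can be compared with the JDK's CompletableFuture. This is an analogy under stated assumptions, not Netty's ChannelFuture: connectAsync is a hypothetical method that only pretends to connect, and join()/thenAccept() stand in for sync()/addListener().

```java
import java.util.concurrent.CompletableFuture;

// JDK analogy, not Netty code: blocking wait versus callback on an asynchronous result.
class SyncVsListenerSketch {
    // Simulates an asynchronous connect that completes on another thread.
    static CompletableFuture<String> connectAsync() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100); // pretend the handshake takes a while
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "connected";
        });
    }

    public static void main(String[] args) {
        // Style 1, like sync(): the calling thread blocks until the result is ready.
        String result = connectAsync().join();
        System.out.println(Thread.currentThread().getName() + " got: " + result);

        // Style 2, like addListener(): register a callback and move on.
        CompletableFuture<Void> callbackDone = connectAsync().thenAccept(r ->
                System.out.println(Thread.currentThread().getName() + " got: " + r));
        System.out.println("main thread continues without waiting");
        callbackDone.join(); // only so the demo does not exit before the callback runs
    }
}
```

With the callback style, the thread that prints the result is normally the one that completed the future rather than the main thread, which matches the note above that the main thread does not run the listener.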

CloseFuture

@Slf4j
public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override // is called after the connection has been established
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        Channel channel = channelFuture.sync().channel();
        log.debug("{}", channel);
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String line = scanner.nextLine();
                if ("q".equals(line)) {
                    channel.close(); // close is an asynchronous operation; it completes about 1s later
                    // log.debug("Handle actions after shutdown"); // Cleanup cannot go here: the channel may not be closed yet
                    break;
                }
                channel.writeAndFlush(line);
            }
        }, "input").start();

        // Get the closeFuture object; closing can be handled 1) synchronously or 2) asynchronously
        ChannelFuture closeFuture = channel.closeFuture();
        /*
        // 1) Synchronous: sync() returns only after close has completed
        log.debug("waiting close...");
        closeFuture.sync();
        log.debug("Handle actions after shutdown");
        */
        // 2) Asynchronous callback: it is invoked by whichever thread performs the close
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                log.debug("Handling operations after closure");
                // The program can exit only after the threads in the group have been shut down
                group.shutdownGracefully();
            }
        });
    }
}

3.3 Future & Promise

These two interfaces are often used in asynchronous processing

Netty's Future has the same name as the JDK's Future, but they are two different interfaces. Netty's Future extends the JDK's Future, and Promise in turn extends Netty's Future

  • A JDK Future can only wait synchronously for the task to finish (whether it succeeded or failed) before the result can be obtained
  • A Netty Future can wait synchronously for the task to finish, or receive the result asynchronously through a callback; either way, the result is available only after the task has finished
  • A Netty Promise has all the functionality of a Netty Future and also exists independently of any task, serving purely as a container for passing a result between two threads

| Function/Name | JDK Future | Netty Future | Promise |
|---|---|---|---|
| cancel | Cancels the task | - | - |
| isCancelled | Whether the task was cancelled | - | - |
| isDone | Whether the task has finished; does not distinguish success from failure | - | - |
| get | Gets the task result, blocking until it is available | - | - |
| getNow | - | Gets the task result without blocking; returns null if no result has been produced yet | - |
| await | - | Waits for the task to finish; if the task failed, no exception is thrown and isSuccess is used to check | - |
| sync | - | Waits for the task to finish; throws an exception if the task failed | - |
| isSuccess | - | Whether the task succeeded | - |
| cause | - | Gets the failure cause without blocking; returns null if there was no failure | - |
| addListener | - | Adds a callback to receive the result asynchronously | - |
| setSuccess | - | - | Sets the successful result |
| setFailure | - | - | Sets the failure result |
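
As a rough JDK-only analogy for the table above (an illustration, not Netty code), `CompletableFuture` can play the role of a Promise: one thread fills the container, another reads it. The class name `PromiseAnalogy` and its methods are hypothetical; `complete` stands in for `setSuccess`, `completeExceptionally` for `setFailure`, and `getNow(null)` for Netty's `getNow()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class PromiseAnalogy {
    /** Success path: returns {value seen before completion, value seen after}. */
    static String[] successCase() {
        CompletableFuture<Integer> promise = new CompletableFuture<>();
        // Non-blocking read (compare getNow): nothing has been set yet
        String before = String.valueOf(promise.getNow(null));
        // Another thread fills the container (compare setSuccess)
        new Thread(() -> promise.complete(10)).start();
        // Blocking read (compare get)
        String after = blockingGet(promise);
        return new String[]{before, after};
    }

    /** Failure path: returns the text of the cause wrapped by get(). */
    static String failureCase() {
        CompletableFuture<Integer> promise = new CompletableFuture<>();
        // Another thread records a failure (compare setFailure)
        new Thread(() -> promise.completeExceptionally(new RuntimeException("error..."))).start();
        return blockingGet(promise);
    }

    /** Blocks on get(); on failure returns the unwrapped cause as text. */
    private static String blockingGet(CompletableFuture<Integer> promise) {
        try {
            return String.valueOf(promise.get());
        } catch (ExecutionException e) {
            // get() wraps the original exception in an ExecutionException
            return e.getCause().toString();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String[] result = successCase();
        System.out.println(result[0] + " -> " + result[1]);
        System.out.println(failureCase());
    }
}
```

The sketch only mirrors the container idea; Netty's Promise additionally offers `await`, `sync`, and listeners that run on its executor.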

Example 1

Synchronously handling a successful task

DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.execute(()->{
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.debug("set success, {}", 10);
    promise.setSuccess(10);
});

log.debug("start...");
log.debug("{}",promise.getNow()); // No results yet
log.debug("{}",promise.get());

The output

11:51:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
11:51:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - null
11:51:54 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set success, 10
11:51:54 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - 10

Example 2

Asynchronously handling a successful task

DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

// Set the callback to receive the result asynchronously
promise.addListener(future -> {
    // Future is the promise
    log.debug("{}",future.getNow());
});

// Set the successful result after 1000 ms
eventExecutors.execute(()->{
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.debug("set success, {}", 10);
    promise.setSuccess(10);
});

log.debug("start...");

The output

11:49:30 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
11:49:31 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set success, 10
11:49:31 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - 10

Example 3

Synchronously handling a failed task: sync & get

DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.execute(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    RuntimeException e = new RuntimeException("error...");
    log.debug("set failure, {}", e.toString());
    promise.setFailure(e);
});

log.debug("start...");
log.debug("{}", promise.getNow());
promise.get(); // sync() also throws, but get() additionally wraps the cause in an ExecutionException

The output

12:11:07 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
12:11:07 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - null
12:11:08 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error...
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: error...
	at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:41)
	at com.itcast.oio.DefaultPromiseTest2.main(DefaultPromiseTest2.java:34)
Caused by: java.lang.RuntimeException: error...
	at com.itcast.oio.DefaultPromiseTest2.lambda$main$0(DefaultPromiseTest2.java:27)
	at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)

Example 4

Synchronously handling a failed task: await

DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.execute(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    RuntimeException e = new RuntimeException("error...");
    log.debug("set failure, {}", e.toString());
    promise.setFailure(e);
});

log.debug("start...");
log.debug("{}", promise.getNow());
promise.await(); // Unlike sync and get, exceptions are not thrown
log.debug("result {}", (promise.isSuccess() ? promise.getNow() : promise.cause()).toString());

The output

12:18:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
12:18:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - null
12:18:54 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error...
12:18:54 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - result java.lang.RuntimeException: error...

Example 5

Asynchronously handling a failed task

DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

promise.addListener(future -> {
    log.debug("result {}", (promise.isSuccess() ? promise.getNow() : promise.cause()).toString());
});

eventExecutors.execute(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    RuntimeException e = new RuntimeException("error...");
    log.debug("set failure, {}", e.toString());
    promise.setFailure(e);
});

log.debug("start...");

The output

12:04:57 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
12:04:58 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error...
12:04:58 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - result java.lang.RuntimeException: error...

Example 6

The await deadlock check

DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.submit(()->{
    System.out.println("1");
    try {
        promise.await();
        // Note: catching only InterruptedException is not enough here,
        // otherwise the BlockingOperationException thrown by the deadlock check keeps propagating upward.
        // A submitted task is wrapped in a PromiseTask, whose run method catches every exception
        // and records it as the failure of its promise instead of rethrowing it
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("2");
});
eventExecutors.submit(()->{
    System.out.println("3");
    try {
        promise.await();
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("4");
});

The output

1
2
3
4
io.netty.util.concurrent.BlockingOperationException: DefaultPromise@47499c2a(incomplete)
	at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384)
	at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212)
	at com.itcast.oio.DefaultPromiseTest.lambda$main$0(DefaultPromiseTest.java:27)
	at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
	at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
	at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)
io.netty.util.concurrent.BlockingOperationException: DefaultPromise@47499c2a(incomplete)
	at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384)
	at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212)
	at com.itcast.oio.DefaultPromiseTest.lambda$main$1(DefaultPromiseTest.java:36)
	at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
	at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
	at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:745)

3.4 Handler & Pipeline

A ChannelHandler handles the various events on a Channel, which are divided into inbound and outbound events. All ChannelHandlers are linked into a chain, and that chain is the Pipeline

  • An inbound handler is usually a subclass of ChannelInboundHandlerAdapter; it is mainly used to read client data and write results back
  • An outbound handler is usually a subclass of ChannelOutboundHandlerAdapter; it is mainly used to process the results being written back

As an analogy, each Channel is a product workshop, the Pipeline is the assembly line in that workshop, and each ChannelHandler is one process step on the line. The ByteBuf is the raw material that passes through many steps: first through the inbound steps one by one, then through the outbound steps one by one, until it becomes the final product

Let's work out the execution order. The server:

new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println(1);
                    ctx.fireChannelRead(msg); // 1
                }
            });
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println(2);
                    ctx.fireChannelRead(msg); // 2
                }
            });
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    System.out.println(3);
                    ctx.channel().write(msg); // 3
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                    System.out.println(4);
                    ctx.write(msg, promise); // 4
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                    System.out.println(5);
                    ctx.write(msg, promise); // 5
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                    System.out.println(6);
                    ctx.write(msg, promise); // 6
                }
            });
        }
    })
    .bind(8080);

The client

new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080)
    .addListener((ChannelFutureListener) future -> {
        future.channel().writeAndFlush("hello,world");
    });

Server-side printing:

1
2
3
6
5
4

As you can see, the ChannelInboundHandlerAdapter instances run in the order in which they were added with addLast, while the ChannelOutboundHandlerAdapter instances run in the reverse of that order. The ChannelPipeline is implemented as a doubly linked list of ChannelHandlerContext objects (each of which wraps a ChannelHandler)

  • In an inbound handler, ctx.fireChannelRead(msg) calls the next inbound handler
    • If the code at 1 is commented out, only 1 is printed
    • If the code at 2 is commented out, only 1 2 is printed
  • ctx.channel().write(msg) at 3 starts from the tail and runs the outbound handlers one after another
    • If the code at 3 is commented out, only 1 2 3 is printed
  • Similarly, calling ctx.write(msg, promise) in an outbound handler triggers the previous outbound handler
    • If the code at 6 is commented out, only 1 2 3 6 is printed
  • ctx.channel().write(msg) vs ctx.write(msg)
    • Both trigger the execution of outbound handlers
    • ctx.channel().write(msg) searches for outbound handlers starting from the tail
    • ctx.write(msg) searches for the previous outbound handler starting from the current node
    • If ctx.channel().write(msg) at 3 is changed to ctx.write(msg), only 1 2 3 is printed, because there is no outbound handler before node 3
    • If ctx.write(msg, promise) at 6 is changed to ctx.channel().write(msg), the output is 1 2 3 6 6 6 ... because ctx.channel().write() searches from the tail again and ends up at node 6 itself every time
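
The traversal rules above can be imitated with a small JDK-only model. This is a simplified sketch and not Netty's implementation: the class `PipelineOrderSketch` and its methods are hypothetical, the pipeline is modelled as a plain list instead of a doubly linked list of contexts, and every handler is assumed to forward the event.

```java
import java.util.ArrayList;
import java.util.List;

final class PipelineOrderSketch {
    /** One node of the chain; a simplified stand-in for a handler context. */
    private static final class Node {
        final int id;
        final boolean inbound;

        Node(int id, boolean inbound) {
            this.id = id;
            this.inbound = inbound;
        }
    }

    private final List<Node> nodes = new ArrayList<>();

    PipelineOrderSketch addLast(int id, boolean inbound) {
        nodes.add(new Node(id, inbound));
        return this;
    }

    /**
     * Simulates a read event followed by a write issued by the last inbound node.
     * writeFromTail = true models ctx.channel().write(msg),
     * writeFromTail = false models ctx.write(msg).
     */
    List<Integer> fireReadThenWrite(boolean writeFromTail) {
        List<Integer> visited = new ArrayList<>();
        int lastInbound = -1;
        // Inbound: head -> tail, visiting inbound nodes in addLast order
        for (int i = 0; i < nodes.size(); i++) {
            if (nodes.get(i).inbound) {
                visited.add(nodes.get(i).id);
                lastInbound = i;
            }
        }
        // Outbound: walk towards the head, either from the tail or from the current node
        int start = writeFromTail ? nodes.size() : lastInbound;
        for (int i = start - 1; i >= 0; i--) {
            if (!nodes.get(i).inbound) {
                visited.add(nodes.get(i).id);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        PipelineOrderSketch pipeline = new PipelineOrderSketch()
                .addLast(1, true).addLast(2, true).addLast(3, true)
                .addLast(4, false).addLast(5, false).addLast(6, false);
        System.out.println(pipeline.fireReadThenWrite(true));
        System.out.println(pipeline.fireReadThenWrite(false));
    }
}
```

With three inbound nodes followed by three outbound nodes, writing from the tail visits 1 2 3 6 5 4, while writing from node 3 stops after 1 2 3 because no outbound node precedes it.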

3.5 ByteBuf

It encapsulates byte data and expands automatically

1) Creation

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
log(buffer);

The above code creates a default ByteBuf (pooled direct memory-based ByteBuf) with an initial capacity of 10

The output

read index:0 write index:0 capacity:10

The log method is as follows

import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;

private static void log(ByteBuf buffer) {
    int length = buffer.readableBytes();
    // Estimated number of output rows; only used to size the StringBuilder
    int rows = length / 16 + (length % 16 == 0 ? 0 : 1) + 4;
    StringBuilder buf = new StringBuilder(rows * 80 * 2)
        .append("read index:").append(buffer.readerIndex())
        .append(" write index:").append(buffer.writerIndex())
        .append(" capacity:").append(buffer.capacity())
        .append(NEWLINE);
    appendPrettyHexDump(buf, buffer);
    System.out.println(buf.toString());
}

2) Direct memory vs. heap memory

You can use the following code to create pooled heap-based ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

You can also use the following code to create pooled direct memory-based ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
  • Direct memory creation and destruction are expensive, but have high read/write performance (one less memory copy) and are suitable for use with pooling
  • Direct memory is less stressful to the GC because it is not managed by JVM garbage collection, but it is important to be proactive about releasing it in a timely manner

3) Pooled vs. unpooled

The biggest advantage of pooling is that you can reuse ByteBuf

  • Without pooling, a new instance of ByteBuf would have to be created each time, which is expensive for direct memory and, even for heap memory, increases GC stress
  • With pooling, ByteBuf instances in the pool can be reused, and a memory allocation algorithm similar to jemalloc is used to improve allocation efficiency
  • With high concurrency, pooling saves memory and reduces the possibility of memory overflow

Whether pooling is enabled can be set with the following JVM system property

-Dio.netty.allocator.type={unpooled|pooled}
  • After 4.1, pooled implementation is enabled by default on non-Android platforms and non-pooled implementation is enabled on Android platforms
  • Before 4.1, the pooling function was not mature and the default was non-pooling

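4) Composition

ByteBuf consists of four parts: the discarded bytes (already read), the readable bytes, the writable bytes, and the expandable bytes (up to max capacity)

Both the read index and the write index start at 0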
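
The four regions of a ByteBuf (already-read bytes, readable bytes, writable bytes, and the region that can still be added up to max capacity) are plain index arithmetic. The sketch below uses a hypothetical helper class `ByteBufRegions` rather than Netty itself, and the max capacity of 64 in `main` is an assumed value chosen only for illustration.

```java
final class ByteBufRegions {
    /** Bytes before the read index: already read, can be discarded. */
    static int discardable(int readerIndex) {
        return readerIndex;
    }

    /** Bytes between the read index and the write index. */
    static int readable(int readerIndex, int writerIndex) {
        return writerIndex - readerIndex;
    }

    /** Bytes between the write index and the current capacity. */
    static int writable(int writerIndex, int capacity) {
        return capacity - writerIndex;
    }

    /** Bytes that expansion could still add before hitting max capacity. */
    static int expandable(int capacity, int maxCapacity) {
        return maxCapacity - capacity;
    }

    public static void main(String[] args) {
        int readerIndex = 4, writerIndex = 12, capacity = 16, maxCapacity = 64; // maxCapacity is assumed
        System.out.println("discardable=" + discardable(readerIndex)
                + " readable=" + readable(readerIndex, writerIndex)
                + " writable=" + writable(writerIndex, capacity)
                + " expandable=" + expandable(capacity, maxCapacity));
    }
}
```

On a real ByteBuf the second and third values correspond to `readableBytes()` and `writableBytes()`.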

5) Writing

List of methods (some less important ones are omitted)

| Method signature | Meaning | Notes |
|---|---|---|
| writeBoolean(boolean value) | Writes a boolean value | One byte: 01 for true, 00 for false |
| writeByte(int value) | Writes a byte value | |
| writeShort(int value) | Writes a short value | |
| writeInt(int value) | Writes an int value | Big Endian: 0x250 is written as 00 00 02 50 |
| writeIntLE(int value) | Writes an int value | Little Endian: 0x250 is written as 50 02 00 00 |
| writeLong(long value) | Writes a long value | |
| writeChar(int value) | Writes a char value | |
| writeFloat(float value) | Writes a float value | |
| writeDouble(double value) | Writes a double value | |
| writeBytes(ByteBuf src) | Writes a Netty ByteBuf | |
| writeBytes(byte[] src) | Writes a byte[] | |
| writeBytes(ByteBuffer src) | Writes an NIO ByteBuffer | |
| int writeCharSequence(CharSequence sequence, Charset charset) | Writes a string | |

Note

  • Methods whose return type is not shown return ByteBuf, which means the calls can be chained
  • For network transmission, the conventional byte order is Big Endian
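
To see the two byte orders side by side without Netty, the sketch below encodes 0x250 with the JDK's `java.nio.ByteBuffer`. The class `EndianSketch` and its helper methods are hypothetical names used for illustration; the byte layout is the same one that writeInt and writeIntLE are described as producing.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class EndianSketch {
    /** Formats bytes as space-separated two-digit hex values. */
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    /** Encodes an int into 4 bytes using the given byte order. */
    static String encode(int value, ByteOrder order) {
        return hex(ByteBuffer.allocate(4).order(order).putInt(value).array());
    }

    public static void main(String[] args) {
        System.out.println(encode(0x250, ByteOrder.BIG_ENDIAN));    // 00 00 02 50
        System.out.println(encode(0x250, ByteOrder.LITTLE_ENDIAN)); // 50 02 00 00
    }
}
```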

Write four bytes first

buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);

Result

read index:0 write index:4 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04                                     |....            |
+--------+-------------------------------------------------+----------------+

Write an int integer, also 4 bytes

buffer.writeInt(5);
log(buffer);

Result

read index:0 write index:8 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05                         |........        |
+--------+-------------------------------------------------+----------------+

There is also a family of methods whose names start with set. They write data as well, but they do not change the position of the write index

6) Capacity expansion

When a second int is written, the capacity is insufficient (the initial capacity was 10), which causes expansion

buffer.writeInt(6);
log(buffer);

The expansion rules are

  • If the size after writing does not exceed 512, the next multiple of 16 is chosen. For example, if the size after writing is 12, the capacity after expansion is 16
  • If the size after writing exceeds 512, the next power of two (2^n) is chosen. For example, if the size after writing is 513, the capacity after expansion is 2^10 = 1024 (2^9 = 512 is not enough)
  • Expansion cannot go beyond max capacity; exceeding it raises an error
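
The three rules as listed can be written down as a small function. This is only a sketch of the rules as stated in this article, not a copy of Netty's allocator code (whose details may differ between versions); the class `ExpansionRuleSketch`, the method `nextCapacity`, and the choice of `IllegalArgumentException` are assumptions made for illustration.

```java
final class ExpansionRuleSketch {
    /**
     * Returns the capacity chosen for a buffer that must hold `required` bytes,
     * following the rules stated in the text.
     */
    static int nextCapacity(int required, int maxCapacity) {
        if (required < 0 || required > maxCapacity) {
            // Rule 3: the buffer cannot grow past its max capacity
            throw new IllegalArgumentException(
                    "required " + required + " is outside 0.." + maxCapacity);
        }
        long candidate;
        if (required <= 512) {
            // Rule 1: round up to the next multiple of 16
            candidate = (required + 15L) / 16 * 16;
        } else {
            // Rule 2: round up to the next power of two (long avoids int overflow)
            candidate = (long) Integer.highestOneBit(required - 1) << 1;
        }
        return (int) Math.min(candidate, maxCapacity);
    }

    public static void main(String[] args) {
        System.out.println(nextCapacity(12, Integer.MAX_VALUE));  // 16
        System.out.println(nextCapacity(513, Integer.MAX_VALUE)); // 1024
    }
}
```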

Result

read index:0 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 00 00 00 06             |............    |
+--------+-------------------------------------------------+----------------+

7) Reading

For example, read four times, one byte each time

System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
log(buffer);

Bytes that have been read become part of the discarded region; a further read only returns the bytes that have not been read yet

1
2
3
4
read index:4 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06                         |........        |
+--------+-------------------------------------------------+----------------+

What if I need to read the int integer 5 repeatedly?

You can set a mark before reading

buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);

Result

5
read index:8 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 06                                     |....            |
+--------+-------------------------------------------------+----------------+

To read the value again, reset the read index to the marked position

buffer.resetReaderIndex();
log(buffer);

Now

read index:4 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06                         |........        |
+--------+-------------------------------------------------+----------------+

Another option is the family of methods whose names start with get; they do not change the read index

8) Retain & Release

Since there is a ByteBuf implementation of out-of-heap memory in Netty, out-of-heap memory is best freed manually rather than waiting for GC garbage collection.

  • UnpooledHeapByteBuf uses JVM memory and just waits for GC to reclaim it
  • UnpooledDirectByteBuf uses direct memory and requires a special method to reclaim memory
  • PooledByteBuf and its subclasses use the pooling mechanism and require more complex rules to reclaim memory

To see how memory is reclaimed in the source code, look at the different implementations of the following method

protected abstract void deallocate()

Netty uses reference counting to control memory reclamation. Every ByteBuf implements the ReferenceCounted interface

  • Every ByteBuf object starts with a count of 1
  • Calling release decrements the count by 1; when the count reaches 0, the ByteBuf memory is reclaimed
  • Calling retain increments the count by 1, which means that until the caller is done with the buffer, it is not reclaimed even if another handler calls release
  • Once the count reaches 0, the underlying memory is reclaimed; even though the ByteBuf object still exists, its methods can no longer be used normally
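
The counting rules above can be sketched with a plain `AtomicInteger`. This is a simplified, hypothetical class (`RefCountSketch`), not Netty's implementation: it throws `IllegalStateException` where Netty uses its own exception type, and `deallocate()` only sets a flag instead of returning memory.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RefCountSketch {
    // Every object starts with a count of 1
    private final AtomicInteger refCnt = new AtomicInteger(1);
    private volatile boolean deallocated;

    int refCnt() {
        return refCnt.get();
    }

    /** Increments the count; fails if the object was already reclaimed. */
    RefCountSketch retain() {
        int old;
        do {
            old = refCnt.get();
            if (old == 0) {
                throw new IllegalStateException("refCnt: 0");
            }
        } while (!refCnt.compareAndSet(old, old + 1));
        return this;
    }

    /** Decrements the count; returns true only when this call reclaimed the object. */
    boolean release() {
        int old;
        do {
            old = refCnt.get();
            if (old == 0) {
                throw new IllegalStateException("refCnt: 0");
            }
        } while (!refCnt.compareAndSet(old, old - 1));
        if (old == 1) {
            deallocate();
            return true;
        }
        return false;
    }

    /** Stand-in for returning the memory; a real buffer would free or recycle it here. */
    private void deallocate() {
        deallocated = true;
    }

    boolean isDeallocated() {
        return deallocated;
    }

    public static void main(String[] args) {
        RefCountSketch buf = new RefCountSketch();
        buf.retain();                      // count 2
        System.out.println(buf.release()); // false, count 1
        System.out.println(buf.release()); // true, count 0 and reclaimed
    }
}
```

The extra `retain()` is what lets one holder keep the object alive while another holder calls `release()`.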

Who is responsible for release?

It is not as simple as we might (generally) assume:

ByteBuf buf = ...
try {
    ...
} finally {
    buf.release();
}

Keep in mind that, because of the pipeline, a ByteBuf is usually passed on to the next ChannelHandler. If it were released in finally, it could no longer be passed along (of course, if the ByteBuf has already served its purpose in this ChannelHandler, there is no need to pass it on)

The basic rule is: whoever uses the ByteBuf last is responsible for releasing it. In detail:

  • Starting point: for the NIO implementation, the ByteBuf is first created and put into the pipeline in io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read (line 163: pipeline.fireChannelRead(byteBuf))
  • Principles for inbound ByteBuf handling
    • If the original ByteBuf is not processed and is passed on by calling ctx.fireChannelRead(msg), no release is needed at this point
    • If the original ByteBuf is converted into some other type of Java object, the ByteBuf is no longer needed and must be released
    • If ctx.fireChannelRead(msg) is not called to pass it on, it must also be released
    • Watch out for exceptions: if the ByteBuf was not successfully passed to the next ChannelHandler, it must be released
    • Assuming the message is passed on all the way, TailContext is responsible for releasing the unprocessed message (the original ByteBuf)
  • Principles for outbound ByteBuf handling
    • Outbound messages are eventually converted to ByteBuf for output and passed all the way forward; HeadContext releases them after flush
  • Principles for exception handling
    • Sometimes it is unclear how many times a ByteBuf has been referenced but it must be released completely; in that case you can call release in a loop until it returns true

TailContext's logic for releasing unprocessed messages

// io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
            "Discarded inbound message {} that reached at the tail of the pipeline. " +
            "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

Specific code

// io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {
    if (msg instanceof ReferenceCounted) {
        return ((ReferenceCounted) msg).release();
    }
    return false;
}

9) slice

slice cuts the original ByteBuf into multiple ByteBufs. No memory is copied for the sliced ByteBufs; they still use the memory of the original ByteBuf, but each slice maintains its own independent read and write indices

10) duplicate

Another manifestation of zero copy: duplicate takes a view of the entire content of the original ByteBuf, with no max capacity restriction. It also shares the same underlying memory as the original ByteBuf, but its read and write indices are independent
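
The "shared memory, independent indices" behaviour of slice and duplicate can be observed with the JDK's `java.nio.ByteBuffer`, which offers methods of the same names. Note that this is an analogy using the JDK class rather than Netty's ByteBuf, and the class `SharedViewSketch` is a hypothetical name.

```java
import java.nio.ByteBuffer;

final class SharedViewSketch {
    /** Returns {original[4], duplicate[4], original position, duplicate position}. */
    static int[] demo() {
        ByteBuffer original = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
        original.position(4);

        ByteBuffer slice = original.slice();   // view of bytes 4..7, no copy
        ByteBuffer dup = original.duplicate(); // view of the whole content, no copy

        slice.put(0, (byte) 99); // write through the slice: lands at index 4 of the original
        dup.position(0);         // move only the duplicate's index

        return new int[]{original.get(4), dup.get(4), original.position(), dup.position()};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]);
    }
}
```

The write made through the slice is visible in both the original and the duplicate, while moving the duplicate's position leaves the original's position untouched.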

4. Two-way communication

4.1 Exercise

Implement an Echo Server

Write the server

new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf buffer = (ByteBuf) msg;
                    System.out.println(buffer.toString(Charset.defaultCharset()));

                    // It is recommended to create the ByteBuf with ctx.alloc()
                    ByteBuf response = ctx.alloc().buffer();
                    response.writeBytes(buffer);
                    ctx.writeAndFlush(response);

                    // Think: do we need to release buffer?
                    // Think: do we need to release response?
                }
            });
        }
    }).bind(8080);

Write the client

NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap()
    .group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringEncoder());
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf buffer = (ByteBuf) msg;
                    System.out.println(buffer.toString(Charset.defaultCharset()));

                    // Think: do we need to release buffer?
                }
            });
        }
    }).connect("127.0.0.1", 8080).sync().channel();

channel.closeFuture().addListener(future -> {
    group.shutdownGracefully();
});

new Thread(() -> {
    Scanner scanner = new Scanner(System.in);
    while (true) {
        String line = scanner.nextLine();
        if ("q".equals(line)) {
            channel.close();
            break;
        }
        channel.writeAndFlush(line);
    }
}).start();

💡 A misconception about reading and writing

I initially had the misconception that only multiplexed IO models such as Netty and NIO allow reads and writes to proceed without blocking each other and thus achieve efficient two-way communication. In fact, Java sockets are full duplex: at any moment, data can flow on the line in both directions, from A to B and from B to A. Even with blocking IO, reading and writing can happen at the same time, as long as separate threads are used for reading and for writing. Reads do not block writes, and writes do not block reads

For example,

public class TestServer {
    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(8888);
        Socket s = ss.accept();

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                // For example, set a thread-level breakpoint here: even though nothing is being written,
                // the thread above can still read the client's data
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

The client

public class TestClient {
    public static void main(String[] args) throws IOException {
        Socket s = new Socket("localhost", 8888);

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}