This is the 21st day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021

EventLoop and EventLoopGroup

The EventLoop object is essentially a single-threaded executor (which also maintains a Selector) with a run method that handles a stream of I/O events on one or more channels.

An EventLoopGroup is a group of eventloops. A Channel will call the Register method of the EventLoopGroup to bind one of the eventloops. Subsequent I/O events on this Channel are handled by EventLoop (to ensure thread-safe I/O event processing).

The use of the EventLoop

Create EventLoopGroup:

EventLoopGroup group = new NioEventLoopGroup();
EventLoopGroup group = new DefaultEventLoopGroup( );
Copy the code

The first NioEventLoopGroup is used to process I/O events, normal and scheduled tasks, and the second DefaultEventLoopGroup can only process normal and scheduled tasks. When creating a new NioEventLoopGroup, you can pass in an int number to indicate the number of threads in the group. Otherwise, use the default number of threads.

In addition, you can use group.next() to get the next LoopGroup object, use the execute() method to perform normal tasks, and use the scheduleAtFixedRate() method to handle scheduled tasks

Use the shutdownGracefully() method to disable the EventLoopGroup. This method first switches the EventLoopGroup to the closed state to reject new tasks, and then stops the thread when all tasks in the task queue have been processed. This ensures that the overall application exits in a normal and orderly manner.

Here is the code to operate the EventLoopGroup:

public static void main(String[] args) {
        EventLoopGroup group1 = new NioEventLoopGroup(3);
        EventLoopGroup group2 = new DefaultEventLoopGroup(2);


        System.out.println(group1.next());
        System.out.println(group2.next());

        group1.next().execute(()->{
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName()+"hello "+i); }}); group2.next().scheduleAtFixedRate(()->{ System.out.println(Thread.currentThread().getName()+" hello2");
        },0.1, TimeUnit.SECONDS);

        group1.shutdownGracefully();
        group2.shutdownGracefully();

    }
Copy the code

Code result:

Processing I/O Events

You can use the EventLoopGroup to handle THE IO events. The code is similar to the basic code in Netty programming (1) — Netty+ super full annotation – mining gold (juejin. Cn)

Server code:

public class HelloServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup(3))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {/ / add the handler
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception 							{
                                System.out.println(Thread.currentThread().getName()+""+s); }}); } }).bind(8080); }}Copy the code

Client code:

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel channel) throws Exception {
                        channel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost".8080))
                .sync()
                .channel()
                .writeAndFlush("hello world"+""+Thread.currentThread().getName()); }}Copy the code

Note that: Once the client and server are connected, a channel is bound to one of the eventloops in the eventloop group. This eventloop is responsible for all subsequent read and write events for the channel, as shown in the figure below. All events for each channel are handled by the same EventLoop.

Division of refinement

Refinement of 1

You can divide the eventloops in an EventLoop group even more finely, with one EventLoop handling accept events and the others handling read and write events. Bootstrap’s group() method can pass in two EventLoopGroup parameters, each responsible for handling different events. The Boss is responsible for Accept events on the ServerSocketChannel, and the Worker is responsible for read and write events on the SocketChannel

public class MyServer {
        public static void main(String[] args) {
            new ServerBootstrap()
                    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))... }}Copy the code

Refine the 2

EventLoop can be multiplexed, but if one read/write event takes too long, other events in the EventLoop will be affected. Therefore, an independent EventLoopGroup can be created to process events that take a long time. A non-NIoEventLoopGroup can be used to prevent other channels in the same NioEventLoop from being unprocessed for an extended period of time.

The question then arises, how does the created event loop group relate to the longer handle? AddLast () takes three arguments: the event loop group (which defaults to the top if empty), the loop group name, and the handler.

If the server receives a message from a client and needs 6s to process it (sleep for 6s), then the dormant event can be processed in a new EventLoopGroup. In addition, open 10 consecutive clients to connect to the server to test the effect. The only difference is that the for loop is used to connect to the server 10 times in a row.

Here is the server code, adding two handlers. The first is the default EventLoopGroup (the EventLoopGroup of the current ServerBootstrap). And use ctx.FireChannelRead (MSG) to pass the MSG to the second handler, which uses the newly created EventLoopGroup to handle the longer event.

public class MyServer {
    public static void main(String[] args) {
        EventLoopGroup group = new DefaultEventLoopGroup(3);

        new ServerBootstrap()
                .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast("nioHandler".new ChannelInboundHandlerAdapter(){

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println("nioHandler"+Thread.currentThread().getName() + "" + buf.toString(StandardCharsets.UTF_8));
                                // Pass the message to the next handler
                                ctx.fireChannelRead(msg);
                            }
                        }).addLast(group,"myhandler".new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                Thread.sleep(5000);
                                System.out.println("myhandler"+Thread.currentThread().getName() + ""+ buf.toString(StandardCharsets.UTF_8)); }}); } }) .bind(8080); }}Copy the code

The second handler does not block the first handler:

How to switch

You can switch from one EventLoopGroup to another EventLoopGroup when processing events. Another EventLoopGroup is dedicated to processing events that take longer to complete, reducing the impact of other events. How does netty switch between eventLoopGroups?

Switch the EventLoopGroup. When the Handler binds different EventLoopGroups, switch the EventLoopGroup to perform different tasks. Specifically, netty uses the following method to switch the EventLoopGroup:

  • If both handlers are bound to the same EventLoopGroup, they are called directly; Otherwise, encapsulate the code to be invoked as a task object that is invoked by the EventLoopGroup of the next handler
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    
    EventExecutor executor = next.executor(); // Get the next EventLoop, excutor is the EventLoopGroup
    
    // If the next EventLoop is in the current EventLoopGroup
    if (executor.inEventLoop()) {// Whether the current handler thread is the same thread as eventLoop
        // Use the EventLoop in the current EventLoopGroup to process the task
        next.invokeChannelRead(m);
    } else {
        // Otherwise, let EventLoop in another EventLoopGroup create the task and execute it
        executor.execute(new Runnable() {// Execute in the next thread
            public void run(a) { next.invokeChannelRead(m); }}); }}Copy the code