An overview of the

It took another three days or so to understand the whole framework of the Reactor model. It was fucking hard to learn.

Relationship between EventLoopGroup and EventLoop

Take a look at the inheritance of EventLoop and EventLoopGroup, and some key methods and objectsThere are several key pieces of information to note

  1. NioEventLoopGroup contains a Childern array that stores nioEventLoops

2. NioEventLoopGroup submit,execute and other thread pool methods actually select an EventLoop from Childern to execute

3. NioEventLoop is a single-threaded actuator. To hold aSelector, the thread just iterates over the Selector, handles the event on that Selector, and thenSee if there are any other tasks in the queue that need to be executed, such as a new Channel registration.The diagram above shows the code in action below

The source code to read

This source analysis is mainly based on the above figure to find the corresponding source

EventLoopGroup

Research on EventLoopGroup mainly starts from several aspects

  1. Loop array creation process
  2. How are tasks assigned to the internal loop after being submitted to the group

Loop array creation process

There are a lot of eventLoops in the EventLoopGroupNioEventLoopGroup newChild is implemented in the NioEventLoopGroup class, so all arrays are nioEventLoops

How are tasks assigned to the internal loop after being submitted to the group

The execute method is called when we’re looking for a thread pool to submit a task. Let’s look at the EventLoopGroup source code call

@Override
public void execute(Runnable command) {
    next().execute(command);
}
Copy the code

Call an abstract method and perform the execute, the realization of the abstract methods in MultithreadEventexecutorGroup

@Override
public EventExecutor next(a) {
    return chooser.next();
}
Copy the code

This Chooser is already initialized in the constructor and contains the entire EventLoop array

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {# check the length of the array2Multiple of PI, if it is, use another onechoose
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return newGenericEventExecutorChooser(executors); }}private static boolean isPowerOfTwo(int val) {# this bit operation is to determine whether2The number of powerreturn(val & -val) == val; } # these are all round arrays, just to increase efficiencyprivate static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    @Override
    public EventExecutor next(a) {    
        return executors[idx.getAndIncrement() & executors.length - 1]; }}private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    @Override
    public EventExecutor next(a) {
        returnexecutors[Math.abs(idx.getAndIncrement() % executors.length)]; }}Copy the code

EventLoopGroup summary

So we know that an EventLoopGroup is a bunch of EventLoopgroups, and then somebody calls the thread pool method, and they find an EventLoop in the array and throw it to them to execute. So the next logical focus is EventLoop

EventLoop

EventLoop is designed to understand the architecture again, but other details, such as channels, promises, etc., will be covered in other chapters.

  1. EventLoop listens for the event’s code logic for handling the event
  2. What is the process by which BossGroup registers new connections to WorkerGroup

EventLoop listens for the event’s code logic for handling the event

Before we start, let’s look at what happens when an EventLoop is created. Recall that the EventLoop creation is implemented in the newChild method of the NioEventLoopGroup constructor

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}
Copy the code

Here you can see that you put a provider and a selector, and then call the constructor of the parent class

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    tailTasks = newTaskQueue(maxPendingTasks);
}
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
Copy the code

The parent class creates a LinkedBlockingQueue and then calls the parent class’s constructor

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
Copy the code

You can see that the parent class holds an executor of ThreadPerTaskExecutor and then a LinkedBlockingQueue. Call the parent class to save which EventLoopGroup you belong to. At this point, we know what’s inside a NioEventLoop

To understand the logic for handling event notifications, start withexecuteMethods to obtain, in SingleThreadEventExecutor class implements the execute method

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task"); } # check whether the EventLoop thread called this methodboolean inEventLoop = inEventLoop();
    if(inEventLoop) {# If yes, add the task to the queue. }else{# otherwise start an EventLoop thread startThread(); Queue addTask(task);if(isShutdown() && removeTask(task)) { reject(); }}if(! addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); }} # add task to queueprotected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (!offerTask(task)) {
        reject(task);
    }
}

final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}
Copy the code

If the method is not called from an Executor thread, then try to open the thread pool

private void startThread(a) {# note that if the thread pool is not opened, it will not be opened, so this is a single thread poolif (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            try {
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); }}}} # I deleted a large number of statements irrelevant to this logicprivate void doStartThread(a) {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run(a) Thread = thread.currentThread (); # perform abstract methods run SingleThreadEventExecutor.this.run(); }}); }Copy the code

Can be seen from the above code, SingleThreadEventExecutor keep single-threaded logic is that to tasks in the queue, if the state of the thread pool is not open, open it. So executor’s execute method can only be executed once, on a single thread. So our logic comes into this abstract run method that I took the first screenshot of, so let’s take a closer look at the logic.

@Override
protected void run(a) {# create an infinite loop where the entire EventLoop thread is stuck. Why is that oneloop
    for (;;) {
        tryEventLoop () {EventLoop () {EventLoop () {EventLoop () {EventLoop ()Select
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                caseSELECT (wakenup.getandSet (wakenup.getandSet (wakenup.getandSet (wakenup.getandSet (wakenup.getandSet ())))false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }
            
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try{# handle selector generated by selectorselectorKey
                    processSelectedKeys(a);
                } finally {
                    // Ensure we always run tasks.RunAllTasks (); }}else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100- ioRatio) / ioRatio); }}}catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return; }}}catch(Throwable t) { handleLoopException(t); }}}Copy the code

When out of the select (wakenUp getAndSet (false)); So when we loop, we’re either saying that there’s a taskQueue task, or that the selector has received the data, and we’re doing both of those things in our code. So let’s do the Selector

private void processSelectedKeysOptimized(a) {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;
        finalObject a = k.attachment(); processSelectedKey(k, (AbstractNioChannel) a); }}Copy the code

Iterate over the key produced by this selector. Call the processSelectedKey method, in which the event is distributed

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    try {
        
        intreadyOps = k.readyOps(); OP_CONNECT event is raised if the client connects to the serverif((readyOps & SelectionKey.OP_CONNECT) ! =0) {
            intops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } # write events are not registered. If there is a buffer in the kernel, write events will be responded immediatelyif((readyOps & SelectionKey.OP_WRITE) ! =0) { ch.unsafe().forceFlush(); } # read events and accept client eventsif((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! =0 || readyOps == 0) { unsafe.read(); }}catch(CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); }}Copy the code

Even though OP_WRITE is rarely used, the unsafe.read () and OP_ACCEPT () methods are both used for reading events

What is the process by which BossGroup registers new connections to WorkerGroup

The above analysis stops at the safe.read method, which handles both read events and client connection events, which brings us to our second question, how Netty drops a client connection from the BossGroup into the WorkerGroup. First, analyze the process after the event from the perspective of NIO

  1. The BossGroup NioEventLoop accepts a Channel, which is the client.
  2. The BossGroup registers this Channel with the Selector of the EventLoop in the WorkerGroup.
  3. The EventLoop in the WorkerGroup then proceeds to iterate over its Selector

Because a large number of Channel and Pileline operations are involved in this process, it is necessary to skip this process and look directly at the results to sort out the overall process


Unbroadening. Read Already reads the client’s Channel, in readBufThe read data will come inServerBootStrapIn channelRead, Boos and Worker groups are bound at startupAnd we know that this childGroup is a workergroupThen we call next’s register, and we know we’re picking a register from our array, so the method goes to the register in the SingleThreadEventLoop. And wrapped a PromiseThis step is to register the Channel with the EventLoop, which has a Selector in it, so you need to find the Selector. Register in the NIO methodAbstractChannel. Note that many Netty methods will check the inEventLoop, but don’t really care, just know that it will end up in the queue of the Loop and wait for the Loop to pull the taskNow what’s going on in register0This doRegister lets different channels implement custom registration methods, so let’s go straight to AbstractNioChannelIt’s pretty clear up to this point that the boss is done assigning a channel to the worker’s selector, but there is one more point that the registered listening event is 0. Because there is a bug in the JDK,Netty author’s explanation. If 0 is registered successfully, the listener event will be registered again, which is shown by a breakpoint in AbstractNioChannel’s doBeginRead method

conclusion

So far the whole process is finished, review again

  1. BossGroup starts EventLoop, which is a single-thread loop that keeps listening for events generated by the selector. In the run method of NioEventLoop
  2. Process the event in the processSelectedKey method of the NioEventLoop when it is heard. Found the registration event, so go to the ServerBootStrap channelRead method,This completes the step of the boss passing channel to the worker
  3. The worker gets the channel and registers it with her ownselectorAnd then I’m going to go through my selector. Complete event listening