
NioEventLoopGroup initialization source code

One, tracing the source code

As mentioned earlier, NioEventLoopGroup can be thought of, roughly, as a thread pool that executes tasks one by one. We usually construct it in one of two ways: NioEventLoopGroup(int nThreads) or NioEventLoopGroup(). Here we take the no-argument constructor as the entry point for the analysis!

EventLoopGroup work = new NioEventLoopGroup();

public NioEventLoopGroup() {
    this(0);
}

When we use the default constructor, it passes 0 as the thread count. Let's keep going!

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

Note that the arguments passed on from here are 0 and null.

public NioEventLoopGroup(int nThreads, Executor executor) {
    // Each group maintains a SelectorProvider, which is used primarily to obtain Selectors
    this(nThreads, executor, SelectorProvider.provider());
}

Here one more argument is passed: SelectorProvider.provider(). This method is an API provided by JDK NIO, and through it you can obtain an NIO Selector or Channel, as shown in the figure:
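To make this concrete, here is a minimal sketch that uses only the JDK (no Netty) to obtain the default provider and open a Selector and a Channel from it. The class name SelectorProviderDemo and its helper methods are made up for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

class SelectorProviderDemo {
    // Opens a Selector from the default provider and reports whether it was open.
    static boolean canOpenSelector() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector()) {
            return selector.isOpen();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Opens a ServerSocketChannel from the same provider.
    static boolean canOpenChannel() {
        SelectorProvider provider = SelectorProvider.provider();
        try (ServerSocketChannel channel = provider.openServerSocketChannel()) {
            return channel.isOpen();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("selector open: " + canOpenSelector());
        System.out.println("channel open: " + canOpenChannel());
        // provider() hands back the same system-wide default instance on every call
        System.out.println("same provider: " + (SelectorProvider.provider() == SelectorProvider.provider()));
    }
}
```

Since the provider is the factory for both Selectors and Channels, it makes sense that the group passes it down to every event loop it creates.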

Let’s go back to the main story:

public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

One more argument is passed here: DefaultSelectStrategyFactory.INSTANCE, a select strategy factory. It will be explained later, together with NioEventLoop.

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

Here we find that a default rejection policy, RejectedExecutionHandlers.reject(), is passed along. What does this rejection policy do?

@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
    throw new RejectedExecutionException();
}

So we can conclude that whenever certain conditions trigger this rejection policy, it throws a RejectedExecutionException. Exactly when it is triggered will be covered in detail later; for now it is enough to remember that it exists!
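To get a feel for what "throwing on rejection" means, here is a minimal sketch that does not use Netty at all. The RejectHandler interface, the RejectSketch class, and above all the trigger condition (a full bounded queue) are assumptions made for this illustration; the real trigger conditions are not shown in the source above.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

class RejectSketch {
    // Hypothetical stand-in for Netty's RejectedExecutionHandler
    interface RejectHandler {
        void rejected(Runnable task);
    }

    // Mirrors the behavior shown above: always throw
    static final RejectHandler REJECT = task -> {
        throw new RejectedExecutionException();
    };

    private final Queue<Runnable> queue;
    private final RejectHandler handler;

    RejectSketch(int capacity, RejectHandler handler) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.handler = handler;
    }

    // Assumption for illustration: a task is rejected when the queue cannot accept it
    void submit(Runnable task) {
        if (!queue.offer(task)) {
            handler.rejected(task);
        }
    }

    // Fills a queue of capacity 1, then checks that the next task is rejected.
    static boolean secondTaskRejected() {
        RejectSketch executor = new RejectSketch(1, REJECT);
        executor.submit(() -> { });
        try {
            executor.submit(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second task rejected: " + secondTaskRejected());
    }
}
```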

Let's go back to the main line. Here the parent class constructor is called. Remember the parent class of NioEventLoopGroup that we analyzed last time? That's right: MultithreadEventLoopGroup. So we step into MultithreadEventLoopGroup:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    // Use the default CPU * 2 when the number of threads is 0
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

Do you remember what nThreads is? It is 0, and when the number of threads is 0, DEFAULT_EVENT_LOOP_THREADS is used as the number of threads instead. So what is DEFAULT_EVENT_LOOP_THREADS?

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

Unless the io.netty.eventLoopThreads system property says otherwise, the default is twice the number of CPUs (and never less than 1). So we now have a conclusion: when we use the default NioEventLoopGroup, the number of CPU cores * 2 is used as the number of threads!
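The same calculation can be tried out with plain JDK calls. This is only a sketch: it substitutes Runtime.getRuntime().availableProcessors() for NettyRuntime.availableProcessors() and Integer.getInteger for SystemPropertyUtil.getInt, and it re-reads the property on every call, whereas the constant in the source above is assigned once. The class and method names are made up.

```java
class DefaultThreadsSketch {
    // JDK-only approximation of the DEFAULT_EVENT_LOOP_THREADS expression above
    static int defaultEventLoopThreads() {
        int cores = Runtime.getRuntime().availableProcessors();
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", cores * 2));
    }

    // Same rule as the MultithreadEventLoopGroup constructor: 0 means "use the default"
    static int resolve(int nThreads) {
        return nThreads == 0 ? defaultEventLoopThreads() : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("resolve(0) = " + resolve(0));
        System.out.println("resolve(3) = " + resolve(3));
    }
}
```

Running it with -Dio.netty.eventLoopThreads=4 should change the value returned by resolve(0), while resolve(3) stays at 3.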

The selectorProvider, selectStrategyFactory, and rejection policy we passed in the previous step are wrapped into an array by the Object... args parameter and end up in args[0], args[1], and args[2] respectively!

Let's continue along the main line. Here the parent class constructor is called again, this time in MultithreadEventExecutorGroup:
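If the way Object... args collects the extra arguments is unfamiliar, the following small sketch shows the mechanism. The strings are only placeholders for the real objects, listed in the order they appear in the super(...) call of the NioEventLoopGroup constructor shown earlier; the class and method names are made up.

```java
class VarargsSketch {
    // A varargs parameter receives the trailing arguments as one Object[]
    static Object[] capture(Object... args) {
        return args;
    }

    // Placeholders for the three extra arguments, in the order of the super(...) call
    static Object[] forward() {
        return capture("selectorProvider", "selectStrategyFactory", "rejectedExecutionHandler");
    }

    public static void main(String[] args) {
        Object[] captured = forward();
        for (int i = 0; i < captured.length; i++) {
            System.out.println("args[" + i + "] = " + captured[i]);
        }
    }
}
```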

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

Note that one more parameter is passed here: DefaultEventExecutorChooserFactory.INSTANCE, a chooser factory of type DefaultEventExecutorChooserFactory that returns a chooser. It is analyzed in detail later! Let's go back to the main line:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    // ... body analyzed step by step in the following sections ...
}

At this point we finally reach a large piece of code, which is the main logic of the EventLoopGroup. We analyze it piece by piece:

Two, building the thread executor

1. Source code analysis

// newDefaultThreadFactory builds the thread factory
if (executor == null) {
    // By default, the executor runs tasks on threads created by a DefaultThreadFactory
    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

The code checks whether the executor we passed in is null, and if it is, creates a new one. Do we remember what executor is? It is null, right? So execution enters this branch. Let's go into the source code of newDefaultThreadFactory and take a look:

newDefaultThreadFactory() main logic:

protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(getClass());
}

As you can see, what gets passed into the executor is a DefaultThreadFactory, Netty's default thread factory!

ThreadPerTaskExecutor main logic:

/**
 * io.netty.util.concurrent.DefaultThreadFactory#newThread(java.lang.Runnable)
 * A thread entity object is created every time a task is executed.
 *
 * @param command the task to run on the new thread
 */
@Override
public void execute(Runnable command) {
    // Execute the task
    threadFactory.newThread(command).start();
}

We can see that it calls the thread factory we passed in to create a new thread, and starts it by calling the start method. So how is the thread created? The factory we passed in is a DefaultThreadFactory, so we go to DefaultThreadFactory#newThread:

@Override
public Thread newThread(Runnable r) {
    // Each time a task is executed, a thread entity is created
    Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
    try {
        if (t.isDaemon() != daemon) {
            t.setDaemon(daemon);
        }
        if (t.getPriority() != priority) {
            t.setPriority(priority);
        }
    } catch (Exception ignored) {
        // Doesn't matter even if failed to set.
    }
    return t;
}

Not much happens here: a Runnable is wrapped into a Thread, which is then returned. Let's focus on this Thread. Is it the same as the Thread we traditionally use? Let's follow into the two-argument newThread method:

protected Thread newThread(Runnable r, String name) {
    //Netty encapsulates its own thread
    return new FastThreadLocalThread(threadGroup, r, name);
}

The logic is simple: the thread that gets created is not a plain Thread but Netty's custom FastThreadLocalThread. We won't explain why for now; it will be covered in more detail in later sections.

2. Summary of thread executor

Here we create a thread executor called ThreadPerTaskExecutor, backed by the default thread factory DefaultThreadFactory. For every task, the thread executor wraps the task into a FastThreadLocalThread object and then calls the start method, so a new thread is started to execute the task!
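The thread-per-task idea can be reproduced with nothing but the JDK. The sketch below is a simplified stand-in, not Netty's class: it uses a plain Thread instead of FastThreadLocalThread, and the names ThreadPerTaskSketch, namedFactory, and the "demo-" prefix are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadPerTaskSketch implements Executor {
    private final ThreadFactory threadFactory;

    ThreadPerTaskSketch(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    // Every call creates a brand-new thread and starts it; nothing is pooled
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }

    // Simplified factory: names threads prefix + counter, like prefix + nextId above
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger nextId = new AtomicInteger();
        return r -> new Thread(r, prefix + nextId.incrementAndGet());
    }

    // Runs one task and returns the name of the thread that executed it
    static String runOnce(String prefix) {
        Executor executor = new ThreadPerTaskSketch(namedFactory(prefix));
        CompletableFuture<String> threadName = new CompletableFuture<>();
        executor.execute(() -> threadName.complete(Thread.currentThread().getName()));
        return threadName.join();
    }

    public static void main(String[] args) {
        System.out.println("task ran on: " + runOnce("demo-"));
    }
}
```

The point to notice is that the executor itself keeps no threads around; it only knows how to make one when asked.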

Three, creating the corresponding number of executors

// Create the executor array; its size equals the configured number of threads
children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
    boolean success = false;
    try {
        // Create an executor; here the executor will be a NioEventLoop
        children[i] = newChild(executor, args);
        success = true;
    } catch (Exception e) {
        // ... unnecessary code omitted ...
    } finally {
        // ... unnecessary code omitted ...
    }
}

1. Source code analysis

children = new EventExecutor[nThreads];

First it creates an empty array of EventExecutor, and then it walks through the array and fills it!

Remember what nThreads is? By default it is the number of CPU cores * 2, so that many executors are created! We can see that the main logic that fills the array in the for loop is newChild, so we go into the newChild method. Here's a hint: what type is the Group object that we are creating? It is a NioEventLoopGroup, so we go to NioEventLoopGroup#newChild:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

The args we passed has length 3, as analyzed earlier:

args[0] is the selectorProvider, args[1] is the selectStrategyFactory, and args[2] is the rejection policy.

So queueFactory is null. Next, we focus on NioEventLoop: newChild returns a NioEventLoop, which means the objects stored in the EventExecutor array are NioEventLoop objects! For now we won't dig into the source code of NioEventLoop initialization; I will analyze it in the next article. Here we can be sure of one thing: the EventExecutor array holds NioEventLoop objects! Let's go back to the main line:

2. Summary of the executor array

After the for loop completes, every element of the EventExecutor[nThreads] array is a NioEventLoop object, and each NioEventLoop object holds a reference to the ThreadPerTaskExecutor thread executor!
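The shape of this step can be sketched without Netty. In the sketch below, Loop is a hypothetical stand-in for NioEventLoop and createChildren for the for loop around newChild; the only thing it tries to show is that one executor instance is handed to every element of the array.

```java
import java.util.concurrent.Executor;

class ChildrenArraySketch {
    // Hypothetical stand-in for NioEventLoop: remembers its index and the shared executor
    static final class Loop {
        final int index;
        final Executor executor;

        Loop(int index, Executor executor) {
            this.index = index;
            this.executor = executor;
        }
    }

    // Mirrors the fill loop: one child per configured thread, all given the same executor
    static Loop[] createChildren(int nThreads, Executor executor) {
        Loop[] children = new Loop[nThreads];
        for (int i = 0; i < nThreads; i++) {
            children[i] = new Loop(i, executor);
        }
        return children;
    }

    public static void main(String[] args) {
        Executor threadPerTask = command -> new Thread(command).start();
        Loop[] children = createChildren(4, threadPerTask);
        System.out.println("children: " + children.length);
        System.out.println("shared executor: " + (children[0].executor == children[3].executor));
    }
}
```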

Four, creating the executor chooser

1. Source code analysis

chooser = chooserFactory.newChooser(children);

Remember what type chooserFactory is? It is a DefaultEventExecutorChooserFactory. If you have forgotten, look back through the source code above or check it while debugging!

We step into the DefaultEventExecutorChooserFactory#newChooser source, passing in the array we just filled in the loop:

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    // Check whether the length is a power of 2 (isPowerOfTwo)
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        // Generic chooser
        return new GenericEventExecutorChooser(executors);
    }
}

Here you can see that there are two cases, each returning a different strategy object: when the array length is a power of 2, a PowerOfTwoEventExecutorChooser is returned; otherwise a GenericEventExecutorChooser is returned. Let's analyze both cases:
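Before looking at the two choosers, it may help to see how a power-of-two check can be written. The body of isPowerOfTwo is not shown in the source above, so treat the following as one common bit trick rather than a quote: a positive power of 2 has exactly one bit set, and val & -val isolates the lowest set bit. The class name is made up.

```java
class PowerOfTwoCheck {
    // For positive val: true exactly when val has a single bit set
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 2, 6, 8, 12, 16}) {
            System.out.println(n + " -> " + isPowerOfTwo(n));
        }
    }
}
```

For example, 8 is 1000 in binary and passes the check, while 6 is 0110 and fails, because 6 & -6 is 2.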

I, PowerOfTwoEventExecutorChooser

PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
    this.executors = executors;
}

@Override
public EventExecutor next() {
    // The length is a power of 2, so a bitwise AND with (length - 1) replaces the modulo
    // executors is the NioEventLoop array
    return executors[idx.getAndIncrement() & executors.length - 1];
}

The main logic of this code is to take an atomically incremented counter (a CAS-based class) and AND it with the array length minus 1.

As the figure shows, this implements a cyclic fetch: each time the index reaches the end of the array, it wraps around and starts again from the head!

Code examples:

public static void main(String[] args) {
    String[] strings = {"The first", "The second", "The third", "The fourth"};
    AtomicInteger idx = new AtomicInteger();
    for (int i = 0; i < 9; i++) {
        // length is 4 (a power of 2), so & (length - 1) cycles through 0, 1, 2, 3, 0, ...
        System.out.println(strings[idx.getAndIncrement() & strings.length - 1]);
    }
}

The result set

The first
The second
The third
The fourth
The first
The second
The third
The fourth
The first

II, GenericEventExecutorChooser

When the number of threads is not a power of 2, a generic chooser is used. The implementation source is as follows:

GenericEventExecutorChooser(EventExecutor[] executors) {
    this.executors = executors;
}

@Override
public EventExecutor next() {
    // Increment and take the modulus to achieve the cycle
    // If the length of executors is 5, the index keeps cycling: 0, 1, 2, 3, 4, 0, 1, 2, 3, 4...
    return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}

I won't demonstrate this code; it does the same thing as the one above and also implements a cyclic fetch.
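For readers who would still like to see the cycle, here is a small sketch of the same index expression applied to a length that is not a power of 2. Only the index arithmetic is taken from the source above; the class and method names are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class GenericChooserSketch {
    // Returns the first `count` indexes produced by the modulo-based expression
    static List<Integer> picks(int length, int count) {
        AtomicInteger idx = new AtomicInteger();
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(Math.abs(idx.getAndIncrement() % length));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [0, 1, 2, 3, 4, 0, 1]
        System.out.println(picks(5, 7));
    }
}
```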

Thinking

Why does Netty implement two strategy classes instead of using only the second?

Netty pushes performance to the extreme. Bit operations are generally faster than a direct modulo operation, so Netty optimizes even this small detail!

2. Summary of the executor chooser

As shown above, a chooser is created by the chooser factory and stored in the NioEventLoopGroup. Calling the chooser's next method returns a NioEventLoop object; successive calls walk the array in an endless cycle, returning the NioEventLoop objects in turn. This is also the basis of the one-to-many relationship between a NioEventLoop and SocketChannels. More on that later!
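The reason the bitwise version is only used for power-of-two lengths can be checked directly: for a non-negative index i, i & (length - 1) equals i % length only when length is a power of 2. A small sketch, with a made-up class name:

```java
class MaskVsModulo {
    // True if masking and modulo give the same index for every i in [0, upTo)
    static boolean agree(int length, int upTo) {
        for (int i = 0; i < upTo; i++) {
            if ((i & length - 1) != i % length) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("length 8: " + agree(8, 1000)); // true
        System.out.println("length 6: " + agree(6, 1000)); // false
    }
}
```

There is one more difference worth noting: once the counter overflows and turns negative, the mask still yields a non-negative index, whereas % in Java keeps the sign of the dividend, which is why the generic chooser wraps its result in Math.abs.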
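To picture the one-to-many relationship, the sketch below hands out loops round-robin to a few channels. Everything here is a simplified stand-in: loops and channels are plain strings, and assign is a made-up method that reuses the modulo-based index expression from the generic chooser.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinAssignment {
    // Assigns each channel to the next loop in turn and groups the channels by loop
    static Map<String, List<String>> assign(String[] loops, List<String> channels) {
        AtomicInteger idx = new AtomicInteger();
        Map<String, List<String>> byLoop = new LinkedHashMap<>();
        for (String channel : channels) {
            String loop = loops[Math.abs(idx.getAndIncrement() % loops.length)];
            byLoop.computeIfAbsent(loop, k -> new ArrayList<>()).add(channel);
        }
        return byLoop;
    }

    public static void main(String[] args) {
        String[] loops = {"loop-0", "loop-1"};
        List<String> channels = List.of("ch-0", "ch-1", "ch-2", "ch-3", "ch-4");
        // prints {loop-0=[ch-0, ch-2, ch-4], loop-1=[ch-1, ch-3]}
        System.out.println(assign(loops, channels));
    }
}
```

With two loops and five channels, each loop ends up serving several channels, while every channel belongs to exactly one loop.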

NioEventLoopGroup source summary

  1. Create a thread executor. When the thread executor's execute method is called, the Runnable object is wrapped into a thread object of type FastThreadLocalThread, which is then started! Simply put, every call to the execute method creates and starts a new thread to execute the task!
  2. Create an array of executors. Its length is the number we pass in, defaulting to CPU * 2, and the empty array is then filled in a loop. Each element in the array is a NioEventLoop object, and each NioEventLoop object holds a reference to the thread executor!
  3. Create an executor chooser. Calling its next method returns a NioEventLoop object, and each NioEventLoop can be returned multiple times!