Image credit: Libuv

Author: Xiao Siyuan

There is no way around libuv when learning Node.js. This article takes a look inside libuv, following the lines of its Linux implementation.

Why Linux

As an asynchronous event-driven JavaScript runtime, Node.js is designed to build scalable network applications

About Node.js

Node.js, as a handy way for front-end developers to step into server-side work, is designed to build scalable network applications. Today's server environments are mostly Linux, and the other major server family, Unix, exposes APIs very similar to Linux's, so choosing Linux as the starting point gives us the best of both worlds.

Libuv with Linux

Here is the architecture diagram from libuv’s official website:

From the perspective of the Linux platform alone, libuv’s main work can be divided into two parts:

  • The event loop, which handles the I/O operations that epoll supports
  • The thread pool, which handles the I/O operations that epoll does not support

Epoll in brief

To get to the bottom of it, we’ll start with epoll

Simply put, epoll is a system call provided by the Linux kernel through which our application can:

  • Tell the system to help us monitor multiple file descriptors simultaneously
  • When the I/O operational status of one or more of these file descriptors changes, our application receives an Event notification from the system.

Event loop

We use a little pseudocode to demonstrate the core steps when using epoll:

// Create an epoll instance
int epfd = epoll_create(MAX_EVENTS);
// Add the file descriptor we want to monitor to the epoll instance, in this case 'listen_sock'
epoll_ctl_add(epfd, listen_sock, EPOLLIN | EPOLLOUT | EPOLLET);

while (1) {
  // Wait for the system to notify us that the state of a monitored file descriptor has changed.
  //
  // The epoll_wait call does not return immediately; it returns once the state of
  // some file descriptor actually changes.
  //
  // After epoll_wait returns:
  //   nfds is the number of file descriptors whose state changed
  //   events holds the corresponding events, nfds of them
  int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);

  // Iterate over the events and respond to each one as the application sees fit
  for (int i = 0; i < nfds; i++) {
    // consume events[i]
  }
}

See epoll-echo-server for a complete example

The comments in the code above already explain the flow: we register the file descriptors we care about, then loop forever, waiting for events and consuming them.

So epoll, which sits underneath libuv, already has the notion of an "event loop" — the event loop is not something unique to libuv.

When talking about epoll, we have to mention its two trigger modes: level-triggered and edge-triggered. They matter because they define how epoll fires events, and the names themselves are somewhat obscure.

Level trigger

Both terms come from the field of electronics, and we understand them from their original meanings

First, level triggering:

Electrical Concepts

The diagram above is a timing chart of voltage changes, where VH is the high (peak) voltage and VL is the low (valley) voltage. Level triggering means that the corresponding circuit (the trigger) is activated for as long as the voltage stays at the high level.

Edge trigger

Electrical Concepts

The diagram above is again a timing chart of voltage changes, but here the condition that activates the circuit (the trigger) is a change in voltage, that is, a transition from VH -> VL or VL -> VH. Such a change shows up as a rising edge or a falling edge, hence the name edge-triggered.

With a general idea of what they look like and how they differ, let's get back to epoll.

In epoll

Back in epoll, level triggering and edge triggering are derived from those original terms and, unsurprisingly, carry meanings analogous to the ones in electronics.

To make this concrete, suppose we have an fd (file descriptor) representing a client connection we just accepted, and the client then sends us 5 bytes of content.

Under level triggering:

  • Our application will be woken up by the system because the fd state has become readable
  • We read 1 byte from the system buffer and did some business operations
  • Enter a new event loop, waiting for the system to wake up next time
  • The system continues to wake up our application because there are 4 bytes of unread content in the buffer

Under edge triggering:

  • Our application will be woken up by the system because the fd state has become readable
  • We read 1 byte from the system buffer and did some business operations
  • Enter a new event loop, waiting for the system to wake up next time
  • The system does not wake our application again until the next time the client sends something, say 2 bytes (under edge triggering, the fd's state does not change until the client sends again, so the system will not wake the application)
  • The system then wakes our application, and the buffer now holds 6 = 4 + 2 bytes

Taken literally, "level" and "edge" are hard to connect to the behavior above, which is why we looked at their meaning in electronics first.

Level triggering keeps firing as long as the fd is readable, i.e. until we have drained the buffer and the system buffer holds no more content from the client; edge triggering corresponds to a change of state: only when the client sends new content does the fd become readable again, so it fires only at that moment.

Level triggering is epoll's default mode and the one libuv uses. Now that we know the difference between the two modes, we can guess the reasoning behind libuv's choice of level triggering over edge triggering:

Under edge triggering, epoll itself does not force us to read the whole buffer in one go (we could wait until the client sends more), but in real business the client is usually waiting for our response (think of HTTP), while we would be waiting for the client's next write — a logical deadlock. So in edge-triggered mode, draining the buffer in one go is practically mandatory, which makes other callbacks wait longer and distributes CPU time unevenly across callbacks.
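
To make that concrete, here is a minimal sketch (not libuv code) of what an edge-triggered consumer is forced to do: keep reading until the kernel reports EAGAIN/EWOULDBLOCK, so no bytes are left stranded until the client's next write. handle_data and close_connection are hypothetical placeholders.

// Sketch only: drain a non-blocking fd completely under edge-triggered epoll
char buf[4096];
for (;;) {
  ssize_t n = read(fd, buf, sizeof(buf));
  if (n > 0) {
    handle_data(buf, n);            // hypothetical business callback
  } else if (n == 0) {
    close_connection(fd);           // hypothetical: peer closed the connection
    break;
  } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
    break;                          // buffer fully drained; safe to wait for the next edge
  } else {
    close_connection(fd);           // hypothetical: a real read error
    break;
  }
}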

limitations

Epoll cannot be used for all I/O operations. For example, read and write operations of files cannot enjoy the convenience of epoll

So Libuv’s work can be summarized as follows:

  • Abstract the epoll-like system calls of different operating systems (kqueue on BSD/macOS, IOCP on Windows, and so on) behind a unified internal API
  • For I/O operations that these system calls can handle, prefer that unified API
  • For I/O operations they do not support, or support poorly, simulate an asynchronous API with a thread pool
  • Finally, wrap all of the above and expose a single unified API to the outside

Back to libuv

Back to libuv: taking the event loop as the main thread of our exploration, we will dig into the details of libuv's Linux implementation, combining the epoll we just covered with the thread pool introduced below.

event-loop

We’ll review the basic concepts of Event-loop in conjunction with the source code

The following image, also taken from the Libuv website, describes the inner workings of event-loop:

From libuv-Design Overview

The flow chart alone may feel abstract, so here is the corresponding implementation inside libuv, uv_run:

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  int timeout;
  int r;
  int ran_pending;

  r = uv__loop_alive(loop);
  if (!r)
    uv__update_time(loop);

  // It's a loop
  while (r != 0 && loop->stop_flag == 0) {
    uv__update_time(loop);
    // Process the timer queue
    uv__run_timers(loop);
    // Process the pending queue
    ran_pending = uv__run_pending(loop);
    // Process the idle queue
    uv__run_idle(loop);
    // Process the prepare queue
    uv__run_prepare(loop);

    // Run io_poll
    uv__io_poll(loop, timeout);
    uv__metrics_update_idle_time(loop);

    // Process the check queue
    uv__run_check(loop);
    // Process the closing queue
    uv__run_closing_handles(loop);

    r = uv__loop_alive(loop);
    if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT) break;
  }

  return r;
}

The reason different kinds of callbacks (setTimeout, for example) have different priorities is that they live in different queues, and those queues are processed in a fixed order within each iteration of the event loop.

Handle and Request

As the official site describes, both are abstractions of the operations performed in the event loop: the former represents long-lived operations, the latter short-lived ones. The description alone is a bit dry, so let's look at how the two are used differently.

For the long-lived operations represented by Handle, the APIs look roughly like this:

// I/O operations
int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, uv_os_sock_t socket);
int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb);
int uv_poll_stop(uv_poll_t* poll);

// Timer
int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle);
int uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat);
int uv_timer_stop(uv_timer_t* handle);

There are generally (though not always) three steps: initialize -> start -> stop. This makes sense: a long-running operation keeps being processed once started, so the API must also provide a way to "stop" it, as sketched below.
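
A minimal sketch of that lifecycle with a timer handle (the on_timer callback and the 10-tick cutoff are made up for illustration):

#include <uv.h>

static int ticks;

static void on_timer(uv_timer_t* handle) {
  if (++ticks == 10)
    uv_timer_stop(handle);                    // stop: the long-lived operation ends here
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_timer_t timer;

  uv_timer_init(loop, &timer);                // init
  uv_timer_start(&timer, on_timer, 100, 100); // start: fire after 100ms, then every 100ms
  return uv_run(loop, UV_RUN_DEFAULT);
}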

For transient operations represented by Request, such as domain name resolution operations:

int uv_getaddrinfo(uv_loop_t* loop, uv_getaddrinfo_t* req, uv_getaddrinfo_cb getaddrinfo_cb, /* ... */);

The interaction model of a DNS resolution is: we submit the address to be resolved, and we get back the resolution result (a bit like an HTTP 1.0 request). With that picture in mind, calling this kind of operation a Request becomes quite vivid.

Handle and Request are not mutually exclusive concepts, though: a Handle's internal implementation may well use Requests. An operation that is long-lived at the macro level can be viewed, within each time slice, as a series of Requests — for example, handling a connection can be seen as a Handle, while the individual reads and writes we perform on it can be seen as Requests.
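
And a hedged sketch of the Request side for comparison, using the uv_getaddrinfo call above (example.com and the callback name are made up): submit the request once, receive one completion callback, and there is nothing to stop afterwards.

#include <netdb.h>
#include <stdio.h>
#include <string.h>
#include <uv.h>

static void on_resolved(uv_getaddrinfo_t* req, int status, struct addrinfo* res) {
  char addr[17] = {0};
  if (status == 0) {
    // Print the first resolved IPv4 address
    uv_ip4_name((struct sockaddr_in*) res->ai_addr, addr, 16);
    printf("resolved: %s\n", addr);
  }
  uv_freeaddrinfo(res);   // the request is finished; there is no "stop" step
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_getaddrinfo_t req;
  struct addrinfo hints;

  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;

  uv_getaddrinfo(loop, &req, on_resolved, "example.com", "80", &hints);
  return uv_run(loop, UV_RUN_DEFAULT);
}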

timer

We analyze its internal implementation by using the API exposed by Timer as a clue:

int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle);
int uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat);
int uv_timer_stop(uv_timer_t* handle);

uv_timer_init does nothing special: it just initializes the handle's state and adds it to loop->handle_queue

uv_timer_start does the following internally:

int uv_timer_start(uv_timer_t* handle,
                   uv_timer_cb cb,
                   uint64_t timeout,
                   uint64_t repeat) {
  uint64_t clamped_timeout;

  // loop->time is the loop's notion of "now"; the loop refreshes it at the start of
  // each iteration. clamped_timeout is the absolute time in the future at which this
  // timer fires; it is computed once here so it does not need to be recomputed every
  // time the timers are run
  clamped_timeout = handle->loop->time + timeout;
  if (clamped_timeout < timeout)
    clamped_timeout = (uint64_t) -1;

  handle->timer_cb = cb;
  handle->timeout = clamped_timeout;
  handle->repeat = repeat;

  // Besides the precomputed clamped_timeout, start_id is recorded as a tie-breaker:
  // when two handles end up with the same clamped_timeout, it decides their execution order
  handle->start_id = handle->loop->timer_counter++;

  // Insert the handle into timer_heap; the heap is a binary min heap, so the root node
  // is always the handle with the smallest clamped_timeout (or, on a tie, start_id)
  heap_insert(timer_heap(handle->loop),
              (struct heap_node*) &handle->heap_node,
              timer_less_than);
  // Mark the handle as started
  uv__handle_start(handle);

  return 0;
}

uv_timer_stop does the following internally:

int uv_timer_stop(uv_timer_t* handle) {
  if (!uv__is_active(handle))
    return 0;

  // Remove the handle from timer_heap. As with heap_insert, the heap is re-adjusted
  // afterwards so that it always remains a binary min heap
  heap_remove(timer_heap(handle->loop),
              (struct heap_node*) &handle->heap_node,
              timer_less_than);
  // Mark the handle as stopped
  uv__handle_stop(handle);

  return 0;
}

So far, start and stop can be roughly summarized as inserting the handle into, or removing it from, loop->timer_heap, a data structure called a binary min heap.

Then let’s go back to uv_run:

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  // ...
  while (r != 0 && loop->stop_flag == 0) {
    // ...
    uv__update_time(loop);
    uv__run_timers(loop);
    // ...
  }
  // ...
}

We have already seen uv__update_time: it refreshes loop->time with the current time at the beginning of each iteration.

We only need a final look at uv__run_timers to tie the whole flow together:

void uv__run_timers(uv_loop_t* loop) {
  struct heap_node* heap_node;
  uv_timer_t* handle;

  for (;;) {
    // Take the root node: it is always the handle that will time out first
    heap_node = heap_min(timer_heap(loop));
    if (heap_node == NULL)
      break;

    handle = container_of(heap_node, uv_timer_t, heap_node);
    if (handle->timeout > loop->time)
      break;

    // Stop the handle: remove it from timer_heap and keep the heap consistent
    uv_timer_stop(handle);
    // If the handle has a repeat, uv_timer_again inserts it back into timer_heap,
    // so it will run again in a later iteration of the event loop
    uv_timer_again(handle);
    // Run the callback of the timed-out handle
    handle->timer_cb(handle);
  }
}

The above is the approximate implementation of Timer in Libuv

min heap

As we will see later, all handles except timers are stored in a data structure named queue, while timer handles live in a min heap. Let's see what this different choice buys us.

A min heap is (see Binary Tree for a more thorough introduction):

  • complete binary tree
  • The root node is the smallest node in the tree

A binary tree is defined as:

  • All nodes have a maximum of two children

A complete binary tree is defined as:

  • Every level except the last is completely filled (each node has two children)
  • Nodes in the last level are filled from left to right (the left side is filled first)

Here are a few examples:

A complete binary tree:

          18
        /    \
      15      30
     /  \    /  \
   40    50 100   40
  /  \   /
 8    7 9

Not a complete binary tree, because the last level is not filled from the left first:

    18
   /  \
 40     30
       /  \
     100    40

A min heap example: the root node is the minimum, and every parent is always smaller than its children:

     18
    /  \
  40     30
 /  \
100   40

The required operations for the Timer Handle in Libuv are:

  • Add and remove a Timer Handle
  • Quickly obtain the timer handle with the smallest clamped_timeout

Min Heap addresses these requirements:

  • Compared with arrays, it is more efficient to insert and remove
  • More efficient to maintain extremums (here minimums) than linked lists

The heap is implemented in heap-inl.h; I have added some comments there for readers who want to dig deeper.
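
For reference, the comparison function passed to heap_insert/heap_remove above boils down to "smaller clamped timeout first, start_id as the tie-breaker"; a simplified sketch modeled on timer.c:

static int timer_less_than(const struct heap_node* ha, const struct heap_node* hb) {
  const uv_timer_t* a = container_of(ha, uv_timer_t, heap_node);
  const uv_timer_t* b = container_of(hb, uv_timer_t, heap_node);

  if (a->timeout < b->timeout)
    return 1;
  if (b->timeout < a->timeout)
    return 0;

  // Same timeout: the handle that was started first (smaller start_id) comes out first
  return a->start_id < b->start_id;
}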

pending

Having seen how the timers, the first stage of each event-loop iteration, are processed, let's look at the second stage, the pending queue:

static int uv__run_pending(uv_loop_t* loop) {
  QUEUE* q;
  QUEUE pq;
  uv__io_t* w;

  if (QUEUE_EMPTY(&loop->pending_queue))
    return 0;

  QUEUE_MOVE(&loop->pending_queue, &pq);

  // Continuously pop elements from the queue for operation
  while (!QUEUE_EMPTY(&pq)) {
    q = QUEUE_HEAD(&pq);
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);
    w = QUEUE_DATA(q, uv__io_t, pending_queue);
    w->cb(loop, w, POLLOUT);
  }

  return 1;
}

From the source, it simply pops elements off loop->pending_queue and runs them; the popped elements are embedded in uv__io_t structures, which by definition are I/O-related operations.

Also, the only place that inserts into loop->pending_queue is the uv__io_feed function, whose callers use it to do some I/O-related finishing work.

queue

Like the min heap above, queue is one of the main data structures used, so we introduce it the first time we meet it.

The min heap implementation is comparatively involved, which is why we only pointed interested readers to the commented heap-inl.h. The queue, by contrast, is fairly simple, and the macros that operate on it appear all over the source, so knowing what they do makes reading the code much more comfortable.

Let’s take a look at the queue and its macros for some common operations, starting with the initial state:

In libuv the queue is designed as a ring, so in the initial state both next and prev point back at the node itself.

Let’s see what happens when we add a new element to the queue:

The figure above is divided into two parts: the upper part is the existing queue, with h as its current head and q as the element to be inserted; the lower part is the result of the insertion. Red marks the prev path and purple marks the next path; following either path, we can see the structure always remains a ring.

QUEUE_INSERT_TAIL, as its name implies, inserts at the tail of the queue; because the queue is a ring, the pointers of the head, the old tail, and the new element all need to be rewired.

Now look at the form of removing an element:

Removing an element is easy by concatenating the element’s prev to next. Once concatenated, the element is skipped, leaving it in the removed state (not accessible in the path).

Moving on to the operation of connecting two queues:

It may look complicated, but it is really just a matter of untying the two rings and joining them end to end into one new ring. In the (rather freehand) diagram, the numbers 1 and 2 mark how each line of code corresponds to a link being made.

Finally, take a look at the operation that splits the queue:

In the figure above, the numbers 1 and 2 again mark, in the same freehand way, how the code corresponds to the links being made. The queue headed by h is cut at q: h together with the elements before q forms one queue, while n becomes the head of the other queue, which consists of q through the old tail.

The above shows some typical queue operations; those interested can check out queue.h.
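
The macros themselves are tiny. An abbreviated sketch based on queue.h: a QUEUE is just a pair of pointers, and insertion and removal are exactly the pointer rewiring described above.

typedef void *QUEUE[2];

#define QUEUE_NEXT(q)       (*(QUEUE **) &((*(q))[0]))
#define QUEUE_PREV(q)       (*(QUEUE **) &((*(q))[1]))
#define QUEUE_PREV_NEXT(q)  (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q)  (QUEUE_PREV(QUEUE_NEXT(q)))

// Initial state: next and prev both point back at the node itself
#define QUEUE_INIT(q)                                                         \
  do {                                                                        \
    QUEUE_NEXT(q) = (q);                                                      \
    QUEUE_PREV(q) = (q);                                                      \
  } while (0)

// Insert q in front of h, i.e. at the tail of the ring headed by h
#define QUEUE_INSERT_TAIL(h, q)                                               \
  do {                                                                        \
    QUEUE_NEXT(q) = (h);                                                      \
    QUEUE_PREV(q) = QUEUE_PREV(h);                                            \
    QUEUE_PREV_NEXT(q) = (q);                                                 \
    QUEUE_PREV(h) = (q);                                                      \
  } while (0)

// Remove q by linking its prev and next directly to each other
#define QUEUE_REMOVE(q)                                                       \
  do {                                                                        \
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q);                                       \
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q);                                       \
  } while (0)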

Idle, check, prepare

You may wonder why they are not introduced in the order in which they appear in the event loop, and why they are grouped together

It gets even stranger if you search the source for uv__run_idle or uv__run_check: you will only find their declarations, not their definitions.

In fact they are all generated by a macro in loop-watcher.c, because they all do the same thing: pull handles off their respective queue and run them; an abridged sketch follows.
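
An abridged sketch of that macro, modeled on loop-watcher.c — one template stamps out the idle, prepare and check variants, and the generated uv__run_* function just walks the matching queue and calls each callback:

#define UV_LOOP_WATCHER_DEFINE(name, type)                                    \
  int uv_##name##_start(uv_##name##_t* handle, uv_##name##_cb cb) {           \
    if (uv__is_active(handle)) return 0;                                      \
    QUEUE_INSERT_HEAD(&handle->loop->name##_handles, &handle->queue);         \
    handle->name##_cb = cb;                                                   \
    uv__handle_start(handle);                                                 \
    return 0;                                                                 \
  }                                                                           \
                                                                              \
  void uv__run_##name(uv_loop_t* loop) {                                      \
    uv_##name##_t* h;                                                         \
    QUEUE queue;                                                              \
    QUEUE* q;                                                                 \
    QUEUE_MOVE(&loop->name##_handles, &queue);                                \
    while (!QUEUE_EMPTY(&queue)) {                                            \
      q = QUEUE_HEAD(&queue);                                                 \
      h = QUEUE_DATA(q, uv_##name##_t, queue);                                \
      QUEUE_REMOVE(q);                                                        \
      QUEUE_INSERT_TAIL(&loop->name##_handles, q);                            \
      h->name##_cb(h);                                                        \
    }                                                                         \
  }

UV_LOOP_WATCHER_DEFINE(prepare, PREPARE)
UV_LOOP_WATCHER_DEFINE(check, CHECK)
UV_LOOP_WATCHER_DEFINE(idle, IDLE)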

Note that the idle queue is not something that runs only when the event loop is idle: it runs in every iteration of the event loop, and is not idle at all.

The only real difference between idle and prepare is execution order: the idle queue runs first, then the prepare queue:

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  // ...
  while (r != 0 && loop->stop_flag == 0) {
    // ...
    uv__run_idle(loop);
    uv__run_prepare(loop);
    uv__io_poll(loop, timeout);
    // ...
  }
  // ...
}

Now we have a handle that we want to execute before uv__io_poll. Do we add it to idle or prepare?

If the handle exists to prepare for the upcoming uv__io_poll, add it to the prepare queue; everything else can go on idle. I think the same reasoning applies to check, which runs after io_poll and lets the user verify the results of the I/O, making the task queues more semantic. A small sketch follows.
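
A small hedged sketch to make the ordering observable: register one handle on each of the three queues and run a single loop iteration; with libuv's public API the expected output is idle, then prepare (just before io_poll), then check (just after io_poll).

#include <stdio.h>
#include <uv.h>

static void on_idle(uv_idle_t* h)       { puts("idle"); }
static void on_prepare(uv_prepare_t* h) { puts("prepare (right before io_poll)"); }
static void on_check(uv_check_t* h)     { puts("check (right after io_poll)"); }

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_idle_t idle;
  uv_prepare_t prepare;
  uv_check_t check;

  uv_idle_init(loop, &idle);
  uv_idle_start(&idle, on_idle);
  uv_prepare_init(loop, &prepare);
  uv_prepare_start(&prepare, on_prepare);
  uv_check_init(loop, &check);
  uv_check_start(&check, on_check);

  // UV_RUN_ONCE: a single iteration is enough to observe the order
  return uv_run(loop, UV_RUN_ONCE);
}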

io poll

For io_poll, we start with the event loop

Start with the event loop

Here is a snippet of the event loop we have already seen:

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  // ...
  while (r != 0 && loop->stop_flag == 0) {
    // ...
    timeout = 0;
    if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
      timeout = uv_backend_timeout(loop);

    uv__io_poll(loop, timeout);
    // ...
  }
  // ...
}

The above code calculates a timeout for calling uv__io_poll(loop, timeout)

It is indeed epoll

uv__io_poll is defined in linux-core.c. Although the function, comments included, is close to 300 lines long, its core logic is the epoll usage we showed at the beginning:

void uv__io_poll(uv_loop_t* loop, int timeout) {
  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
    // ...
    // 'loop->backend_fd' is the epoll instance created with 'epoll_create'
    epoll_ctl(loop->backend_fd, op, w->fd, &e);
    // ...
  }

  // ...
  for (;;) {
    // ...
    if (/* ... */) {
      // ...
    } else {
      // ...
      // 'epoll_pwait' differs from 'epoll_wait' only slightly, so we only consider the latter here
      nfds = epoll_wait(loop->backend_fd,
                        events,
                        ARRAY_SIZE(events),
                        timeout);
      // ...
    }
  }
  // ...

  for (i = 0; i < nfds; i++) {
    // ...
    w = loop->watchers[fd];
    // ...
    w->cb(loop, w, pe->events);
  }
}

timeout

The timeout parameter of epoll_wait has the following meanings:

  • If it is -1, wait indefinitely until an event occurs
  • If it is 0, return immediately with whatever events are pending at the moment of the call
  • If it is any other integer, it is a duration in milliseconds: block for at most that long into the future

The timeout passed to uv__io_poll is computed by uv_backend_timeout:

int uv_backend_timeout(const uv_loop_t* loop) {
  // The event loop has been stopped externally, so have 'uv__io_poll' return immediately,
  // letting the whole loop wind down as quickly as possible
  if (loop->stop_flag != 0)
    return 0;

  // If there are no handles or requests to process, there is nothing to wait for,
  // so again let 'uv__io_poll' return as soon as possible
  if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop))
    return 0;

  // The idle queue is not empty, so 'uv__io_poll' should also return as soon as possible,
  // letting the next iteration start promptly; otherwise the idle handles would suffer
  // too much latency
  if (!QUEUE_EMPTY(&loop->idle_handles))
    return 0;

  // Same purpose as the previous check, but for the pending queue
  if (!QUEUE_EMPTY(&loop->pending_queue))
    return 0;

  // Same purpose again, but for handles that are being closed, so the closing queue
  // does not see too much latency either
  if (loop->closing_handles)
    return 0;

  return uv__next_timeout(loop);
}

int uv__next_timeout(const uv_loop_t* loop) {
  const struct heap_node* heap_node;
  const uv_timer_t* handle;
  uint64_t diff;

  heap_node = heap_min(timer_heap(loop));
  // No timers to process, so it is safe to block and wait for events to arrive
  if (heap_node == NULL)
    return -1; /* block indefinitely */

  handle = container_of(heap_node, uv_timer_t, heap_node);
  // There is a timer and it is already due, so 'uv__io_poll' must return as soon as
  // possible to let the next iteration of the event loop run the timed-out timer
  if (handle->timeout <= loop->time)
    return 0;

  // No timer is due yet. Use the earliest timeout minus the loop's current time as the
  // wait budget: within that window no timer can fire, so it is safe to block waiting
  // for epoll events
  diff = handle->timeout - loop->time;
  if (diff > INT_MAX)
    diff = INT_MAX;

  return (int) diff;
}

The uv__next_timeout implementation above is divided into three main parts:

  • It returns -1 only when there are no timers to process; recalling the meaning of epoll_wait's timeout parameter from the start of this section, -1 makes the subsequent uv__io_poll block and wait indefinitely for events
  • When there is a timer that has already timed out (handle->timeout <= loop->time), it returns 0, so uv__io_poll does not block the event loop and the next iteration can quickly run the timed-out timer
  • When there are timers but none has timed out, the smallest remaining time diff is computed and used as uv__io_poll's blocking time

You may have noticed that the guiding idea behind the timeout calculation is to spread CPU time as evenly as possible across event-loop iterations and across the different task queues, so that no single kind of task causes excessive latency.

An example

Now that we know how the io_poll queue is executed, let's look at it from the perspective of a small echo server:

uv_loop_t *loop;

void echo_write(uv_write_t *req, int status) {
  // ...
  // Nothing essential happens here; it is just the so-called finishing-up work, which may even be empty
}

void echo_read(uv_stream_t *client, ssize_t nread, uv_buf_t buf) {
  // ...
  // Create a write Request (the difference between Request and Handle has been described above).
  // Write the read client content back to the client and enter the callback 'echo_write' when the write is complete
  uv_write_t *write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
  uv_write(write_req, client, &buf, 1, echo_write);
}

void on_new_connection(uv_stream_t *server, int status) {
  // ...
  // Create a client instance and associate it with the event loop
  uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
  uv_tcp_init(loop, client);
  // Set up a connection with the client, read the client input, and then enter the 'echo_read' callback
  if (uv_accept(server, (uv_stream_t*) client) == 0) {
    uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
  }
  // ...
}

int main() {
  // Create the event loop
  loop = uv_default_loop();

  // Create a server instance and associate it with the event loop
  uv_tcp_t server;
  uv_tcp_init(loop, &server);
  // ...
  // Bind the server to a port and accept requests
  uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", 7000));
  // When a new client connection arrives, the 'on_new_connection' callback is entered
  uv_listen((uv_stream_t*) &server, 128, on_new_connection);
  // ...

  // Start the event loop
  return uv_run(loop, UV_RUN_DEFAULT);
}

Thread pool

So far, we have confirmed that io_poll is indeed implemented on top of epoll. At the beginning of this article we mentioned that epoll cannot currently handle every I/O operation; for the operations epoll does not support, libuv simulates asynchronous I/O with its internal thread pool. Let's take a look at how the thread pool works.

create

Since we already know epoll cannot be used to read or write regular files, we can follow that clue: the internal implementation of uv_fs_read leads to uv__work_submit, and inside it we find the thread pool being initialized:

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  // ...
  post(&w->wq, kind);
}

So the thread pool is created lazily, as a singleton. init_once internally calls init_threads to initialize it:

static uv_thread_t default_threads[4];

static void init_threads(void) {
  // ...
  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  // ...
  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();
  // ...
}

From the above implementation, we know that the default thread pool has 4 threads and can be respecified using the UV_THREADPOOL_SIZE environment variable

post(&w->wq, kind) is what submits a task to the thread pool:

static void post(QUEUE* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  // ...
  // Insert the task into the queue shared by the thread 'wq'
  QUEUE_INSERT_TAIL(&wq, q);
  // If there are idle threads, tell them to start working
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}

A submitted task is inserted into the queue wq shared by the worker threads, and the threads are only signaled when some of them are idle. If no thread is idle, is the task simply ignored? No: a busy worker, once it finishes its current task, checks wq again and keeps working as long as there is something in it; only when wq is empty does it go to sleep and wait to be woken up next time.

How to schedule tasks

The threads were created with uv_thread_create(threads + i, worker, &sem), so worker is their entry point. Let's see what worker does:

// The thread pool's shared queue; every submitted task is linked here first
static QUEUE wq;

static void worker(void* arg) {
  // ...
  // 'idle_threads' and 'run_slow_work_message' are shared between threads, so take the lock
  uv_mutex_lock(&mutex);
  for (;;) {
    // Loop while the "there is nothing to do" condition holds
    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      // Nothing for this thread to do right now, so count it as idle
      idle_threads += 1;

      // 'uv_cond_wait' internally calls 'pthread_cond_wait', which:
      // - puts the thread to sleep, waiting for the condition variable 'cond' to change
      // - unlocks 'mutex'
      //
      // Later, another thread can call 'uv_cond_signal' (internally 'pthread_cond_signal')
      // to announce a change of 'cond'; the operating system then wakes one of the threads
      // waiting on 'cond', and before that thread's 'uv_cond_wait' call returns, it
      // re-locks the 'mutex' that was passed in
      //
      // So after we break out of this loop, 'mutex' is held again
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }
    // ...
    // We hold the lock, so popping from the queue is safe
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);
    // ...
    // The pop is done, so unlock and let other threads operate on the queue
    uv_mutex_unlock(&mutex);

    // Use the struct field-offset trick to recover the 'uv__work' instance that 'q' is embedded in
    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    // Lock 'w->loop->wq', the queue that belongs to the event loop
    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;

    // Note this is the loop's wq, not the thread pool's wq
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);

    // Wake up the main thread's event loop
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    // Re-lock before the next iteration: the loop condition reads shared state again.
    // No deadlock here, because this lock pairs with the unlock performed inside
    // 'uv_cond_wait' on the next iteration
    uv_mutex_lock(&mutex);
    // ...
  }
}

Above, we kept the important parts and annotated them. The flow can be roughly summarized as:

  • Threads in the pool sleep in uv_cond_wait, waiting to be woken up
  • A woken thread takes a task from wq, completes it, and then wakes the main thread, because the completion callback has to run on the main thread
  • On the next iteration it keeps taking tasks as long as there are any; once there are none, it goes back to sleep via uv_cond_wait
  • Waking up is done from another thread via uv_cond_signal, which tells the operating system to do the scheduling
  • The thread pool is an elastic design: threads sleep when there are no tasks, and active threads wake the sleeping ones when more tasks come in (see the sketch below)
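
From user code, the usual doorway into this machinery is libuv's public uv_queue_work API; a minimal hedged sketch (the sleep is just a stand-in for a blocking job such as file I/O):

#include <stdio.h>
#include <unistd.h>
#include <uv.h>

// Runs on a thread-pool thread (the 'w->work(w)' call inside worker above)
static void blocking_work(uv_work_t* req) {
  sleep(1);   // stand-in for a blocking operation
}

// Runs back on the event-loop thread, after uv_async_send has woken it up
static void after_work(uv_work_t* req, int status) {
  printf("work finished, callback on the main thread, status = %d\n", status);
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_work_t req;

  uv_queue_work(loop, &req, blocking_work, after_work);
  return uv_run(loop, UV_RUN_DEFAULT);
}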

Wake up main thread

When a thread-pool task completes, the main thread must be notified so it can run the corresponding callback. The notification mechanism is quite interesting; let's start from the event-loop initialization in uv_loop_init:

int uv_loop_init(uv_loop_t* loop) {
  // ...
  // Initialize the min heap and various queues to hold various handles
  heap_init((struct heap*) &loop->timer_heap);
  QUEUE_INIT(&loop->wq);
  QUEUE_INIT(&loop->idle_handles);
  QUEUE_INIT(&loop->async_handles);
  QUEUE_INIT(&loop->check_handles);
  QUEUE_INIT(&loop->prepare_handles);
  QUEUE_INIT(&loop->handle_queue);

  // ...
  // Call 'epoll_create' to create an epoll instance
  err = uv__platform_loop_init(loop);
  if (err)
    goto fail_platform_init;

  // ...
  // For initialization of thread pool notifications
  err = uv_async_init(loop, &loop->wq_async, uv__work_done);
  // ...
}

In the code above, uv_async_init sets up the thread-pool notification machinery. Here is its signature:

int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb);

So the third argument, uv__work_done, is actually a callback function, and we can look at its contents:

void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  QUEUE* q;
  QUEUE wq;
  int err;

  loop = container_of(handle, uv_loop_t, wq_async);
  uv_mutex_lock(&loop->wq_mutex);
  // Move the current 'loop->wq' into the local variable 'wq'
  //
  // The contents of 'loop->wq' were added by the worker above, after a task
  // completed, via 'QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq)'
  //
  // Moving instead of iterating in place releases the lock as soon as possible,
  // so other threads can access the queue sooner
  QUEUE_MOVE(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  // Walk 'wq' and run the completion callback of each task
  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
  }
}

Now that we know what uv__work_done does, let's see how uv_async_init hooks it up:

int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
  // ...
  // We'll come back to this below
  err = uv__async_start(loop);
  // ...

  // Create an async Handle
  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
  // In the current context async_cb is' uv__work_done '
  handle->async_cb = async_cb;
  handle->pending = 0;

  // Add async Handle to queue 'loop-> Async_handles'
  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
  // ...
}

Let's continue with uv__async_start, which we flagged above:

static int uv__async_start(uv_loop_t* loop) {
  // ...
  // 'eventfd' creates a kernel-maintained fd which, like any real fd (a socket fd, say),
  // can be added to an epoll instance so we can listen for readable events on it, and
  // which can be written to; user code can use this seemingly "virtual" fd to implement
  // event subscription
  err = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (err < 0)
    return UV__ERR(errno);

  pipefd[0] = err;
  pipefd[1] = -1;
  // ...

  uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
  loop->async_wfd = pipefd[1];

  return 0;
}

We know epoll supports socket fds, and for supported fds its event scheduling is very efficient. For I/O operations epoll does not support, libuv creates a "virtual" fd with eventfd and keeps riding on epoll's event-scheduling capability. A standalone sketch of the trick follows.
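
A standalone hedged sketch of the same trick (plain epoll and pthreads, not libuv code): create an eventfd, register it with an epoll instance, and wake the blocked epoll_wait caller by writing 8 bytes to the fd from another thread.

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>

static void* notifier(void* arg) {
  int efd = *(int*) arg;
  uint64_t one = 1;
  sleep(1);
  write(efd, &one, sizeof(one));    // the "uv_async_send" side: just a write
  return NULL;
}

int main(void) {
  int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  int epfd = epoll_create1(0);

  struct epoll_event ev;
  ev.events = EPOLLIN;
  ev.data.fd = efd;
  epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev);

  pthread_t t;
  pthread_create(&t, NULL, notifier, &efd);

  struct epoll_event out;
  epoll_wait(epfd, &out, 1, -1);    // blocks until the other thread writes
  uint64_t val;
  read(efd, &val, sizeof(val));     // drain the counter, like uv__async_io does
  puts("woken up by eventfd");

  pthread_join(t, NULL);
  return 0;
}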

Let’s look at the details of uv__io_start above to confirm the steps for the event subscription:

void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  // ...

  // Looking back at 'uv__io_poll' above: every watcher on loop->watcher_queue has its fd
  // added to the epoll instance, i.e. its events get subscribed
  if (QUEUE_EMPTY(&w->watcher_queue))
    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);

  // Associate the fd with its watcher; also in 'uv__io_poll' above, when an event
  // notification arrives, the watcher is looked up in 'loop->watchers' by fd and its
  // callback is run. The check also makes sure a watcher is not added twice for the same fd
  if (loop->watchers[w->fd] == NULL) {
    loop->watchers[w->fd] = w;
    loop->nfds++;
  }
}

With the subscription step confirmed, let's look at the event callback. The w argument above is loop->async_io_watcher, which was initialized by uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]); here is uv__io_init's signature:

void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd);

So uv__async_io is the callback that receives events on the virtual fd; let's see what it does:

static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  // ...
  // Make sure 'w' is always 'loop->async_io_watcher'
  assert(w == &loop->async_io_watcher);

  for (;;) {
    // Read from 'w->fd', the virtual fd created with 'eventfd' above.
    // As we confirm below, the notifying side simply writes to this fd;
    // reading here drains those otherwise meaningless notification bytes
    // so they do not fill up the buffer
    r = read(w->fd, buf, sizeof(buf));
    // ...
  }

  // Walk the 'loop->async_handles' queue and run the actual callbacks of the tasks
  QUEUE_MOVE(&loop->async_handles, &queue);
  while (!QUEUE_EMPTY(&queue)) {
    q = QUEUE_HEAD(&queue);
    h = QUEUE_DATA(q, uv_async_t, queue);

    QUEUE_REMOVE(q);
    QUEUE_INSERT_TAIL(&loop->async_handles, q);

    // ...
    h->async_cb(h);
  }
}

We already know how events are subscribed to, and how events are responded to

Finally, let's confirm how the notification is triggered from the thread pool. uv_async_send is the public API for waking the main thread; internally it calls uv__async_send:

static void uv__async_send(uv_loop_t* loop) {
  const void* buf;
  ssize_t len;
  int fd;
 
  // ...
  fd = loop->async_io_watcher.fd; 

  do
    // Write to the virtual fd created with 'eventfd'; what remains is for epoll's
    // efficient event scheduling to wake up the subscriber on the other side
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);

  // ...
}

We wrap up with a (rather freehand) diagram summarizing the thread-pool flow above:

In the figure above, our task is submitted via uv__work_submit from a callback running in uv__run_idle(loop); in practice, for an application driven by the event loop, all of its time is spent inside the callbacks of the various queues, so tasks can just as well be submitted from callbacks in the other queues.

closing

As explained earlier, only Handles have a close API, because Requests are short-lived tasks. A Handle is closed with uv_close:

void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
  assert(!uv__is_closing(handle));

  handle->flags |= UV_HANDLE_CLOSING;
  handle->close_cb = close_cb;

  switch (handle->type) {
  // Perform resource reclamation according to the different Handle types
  case UV_NAMED_PIPE:
    uv__pipe_close((uv_pipe_t*)handle);
    break;

  case UV_TTY:
    uv__stream_close((uv_stream_t*)handle);
    break;

  case UV_TCP:
    uv__tcp_close((uv_tcp_t*)handle);
    break;
  // ...

  default:
    assert(0);
  }
  
  // Add to 'loop->closing_handles'
  uv__make_close_pending(handle);
}

void uv__make_close_pending(uv_handle_t* handle) {
  assert(handle->flags & UV_HANDLE_CLOSING);
  assert(!(handle->flags & UV_HANDLE_CLOSED));
  handle->next_closing = handle->loop->closing_handles;
  handle->loop->closing_handles = handle;
}

When uv_close is called, libuv first releases the resources held by the handle (closing its fd, for example) and then, via uv__make_close_pending, links the handle onto the closing_handles queue; that queue is processed by the uv__run_closing_handles(loop) call in the event loop.

Since closing_handles is the last queue to be processed, the close callbacks passed in when uv_close is executed from the callbacks of the remaining queues are executed in the next iteration.
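
For completeness, the run step for that queue is tiny; roughly, based on core.c, it walks the singly-linked closing_handles list and finishes each close (which is where close_cb finally runs):

static void uv__run_closing_handles(uv_loop_t* loop) {
  uv_handle_t* p;
  uv_handle_t* q;

  p = loop->closing_handles;
  loop->closing_handles = NULL;

  while (p) {
    q = p->next_closing;
    uv__finish_close(p);   // marks the handle closed and invokes its close_cb
    p = q;
  }
}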

summary

This article has taken a first pass through libuv's internals along the lines of its Linux implementation, trying to take some of the mystery out of libuv. Reading this article alone is obviously not enough, but hopefully it can serve as a starting point for understanding libuv more deeply. In a later article we will look at Node.js and see how the two are wired together internally.

This article was published by the NetEase Cloud Music front-end team; reproduction in any form without authorization is prohibited. Grp.music-fe (at) Corp.netease.com — we recruit front-end, iOS and Android engineers all year round. If you are ready for a change and you like Cloud Music, join us!