How Redis evolved from version to version

Why Redis 3.x was still fast in the single-threaded era

  1. Memory-based operation: All data is stored in memory, so reads and writes are very fast. A memory access takes on the order of 100 nanoseconds, and every operation works at memory speed, so performance is high.
  2. Simple data structures: Several commonly used data structures are purpose-built. For example, the String object is backed by Redis's own Simple Dynamic String (SDS), which stores the length alongside the data and so reduces the cost of getting a string's length to O(1).
  3. I/O multiplexing: An I/O multiplexing function monitors many client socket connections at once, so a single thread can serve multiple requests and achieve the effect of handling them concurrently.
  4. No context switching: The single-threaded model avoids the time and performance costs of thread switching and thread contention, and it cannot deadlock.
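Point 2 above can be illustrated with a minimal sketch of a length-prefixed string. This is not Redis's actual SDS code: the names mini_sds, mini_sds_new, and mini_sds_len are hypothetical and only show why keeping the length in a header makes the length lookup O(1).

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical length-prefixed string, loosely modelled on the idea behind SDS. */
typedef struct {
    size_t len;    /* bytes in use, excluding the terminating '\0' */
    size_t alloc;  /* bytes allocated for buf, excluding the terminating '\0' */
    char buf[];    /* string bytes followed by '\0' */
} mini_sds;

/* Create a string from a C string. The O(n) strlen() is paid once, here. */
mini_sds *mini_sds_new(const char *init) {
    assert(init != NULL);
    size_t len = strlen(init);
    mini_sds *s = malloc(sizeof(*s) + len + 1);
    if (s == NULL) return NULL;
    s->len = len;
    s->alloc = len;
    memcpy(s->buf, init, len + 1);  /* copy the bytes and the terminator */
    return s;
}

/* O(1): the length is read from the header instead of scanning for '\0'. */
size_t mini_sds_len(const mini_sds *s) {
    assert(s != NULL);
    return s->len;
}
```

A plain C string would need strlen(), which scans the whole buffer, every time the length is required.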

There are three main reasons why Redis stayed single-threaded until 4.0:

  1. A single-threaded model is simple to develop and maintain.
  2. With a non-blocking I/O multiplexing model, requests from multiple clients can still be processed concurrently.
  3. The main performance bottleneck for Redis is memory or the network, not the CPU. (This is the primary and decisive reason.)

Redis 4.0 introduced multithreading for delete operations for the following reasons:

Normally, deleting a small object is no problem. But when the object being deleted is very large, for example tens of megabytes, its memory cannot be freed quickly, and a deletion running on the single thread blocks all other read and write operations in the meantime.

Therefore, the natural idea is to stop letting the main thread do the deletion and hand it to an asynchronous thread that frees the object in the background. Redis 4.0 introduced the commands UNLINK key, FLUSHALL ASYNC, and FLUSHDB ASYNC to process deletions lazily; they are mainly used to delete Redis data asynchronously. There is still only one thread handling read and write requests, so Redis is still called single-threaded. (The Redis author initially tried to optimize deletion within the single thread, but the performance degradation was greater when the system was busy, so asynchronous threads were used instead.)
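The idea of handing the expensive free to another thread can be sketched as follows. This is only an illustration under assumptions: lazy_unlink and free_in_background are hypothetical names, and creating one thread per deletion is done here purely for brevity, whereas a real server would more likely hand the job to a long-lived background thread.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Runs on the background thread: releasing the memory is the slow part. */
static void *free_in_background(void *value) {
    free(value);
    return NULL;
}

/* Hypothetical helper: detach the value from its slot immediately, then free
 * it on another thread. Returns 0 on success, or the pthread_create() error
 * code, in which case the value has been freed synchronously instead. */
int lazy_unlink(void **slot, pthread_t *worker) {
    assert(slot != NULL && worker != NULL);
    void *value = *slot;
    *slot = NULL;  /* from the caller's point of view the key is already gone */
    int rc = pthread_create(worker, NULL, free_in_background, value);
    if (rc != 0) free(value);
    return rc;
}
```

The calling thread only pays for unlinking the pointer; the time needed to release a large allocation is spent on the worker thread.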

Redis 6.0 introduces multithreading to improve network I/O performance

As the traffic of Internet services keeps growing, the network I/O bottleneck of Redis becomes more and more obvious. A single thread cannot take full advantage of the multi-core CPUs of modern machines, although the official recommendation was to run several instances to make use of multiple cores.

Therefore, multithreaded I/O reads and writes were introduced in Redis 6.0. The main idea is that the main thread no longer reads and writes socket data itself. Instead, a group of independent threads handles socket reads and writes and request parsing, so that reads and writes run in parallel and network I/O is processed more efficiently. Only the read/write stage is multithreaded; command execution is still single-threaded.

In Redis 6.0, multithreading is disabled by default. To use it, configure redis.conf:

  1. io-threads-do-reads yes enables multithreading for reads and request parsing.
  2. io-threads 3 sets the number of threads. The official recommendation is 2 or 3 threads for a 4-core CPU and 6 threads for an 8-core CPU. The number of threads should be smaller than the number of cores, and more threads are not always better.

Development of I/O multiplexing models

Unix network programming has five I/O models: blocking I/O, non-blocking I/O, I/O multiplexing, signal-driven I/O, and asynchronous I/O. Redis is implemented on the I/O multiplexing model, so this article focuses on the path from blocking I/O to non-blocking I/O and then to I/O multiplexing. Signal-driven I/O and asynchronous I/O are not covered here.

Basic concepts

User space and kernel space

First, the CPU instruction set is the set of instructions through which software drives the hardware. Improper use of CPU instructions can have a significant impact on the whole system, so instructions must be subject to privilege levels. For example, Intel divides privileges into Ring 0, Ring 1, Ring 2, and Ring 3, in descending order of privilege. Linux uses only Ring 0 and Ring 3. Ring 0 has the highest privilege and can use all CPU instructions. Ring 3 has the lowest privilege and can use only regular CPU instructions, with no direct access to hardware and no access to all of memory. (The user mode and kernel mode described below can be understood simply as running at Ring 3 and Ring 0 respectively.) So why can't Ring 3 code access all of memory?

In terms of memory, if applications could access arbitrary locations in the address space, a bug or a misbehaving application, for example one that wipes all data in memory, could easily crash the whole system. Letting applications access arbitrary memory is therefore dangerous, so the operating system divides memory into a region used by the kernel and a region accessible to applications. For example, on 32-bit Linux the virtual address space is 4 GB: the high 1 GB is reserved for the operating system kernel and is called kernel space, while the low 3 GB is used by applications and is called user space. This does not mean that the kernel cannot use the low 3 GB: kernel space can be used only by the kernel, whereas user space can be used by both the kernel and applications.

User mode and kernel mode

As the name implies, a process or thread running in user space is in user mode, and one running in kernel space is in kernel mode. There are three ways to switch from user mode to kernel mode: a system call, a software interrupt (such as an exception), and a hardware interrupt (such as the completion of an I/O operation).

Why is switching between user mode and kernel mode expensive?

For example, when switching from user mode to kernel mode, the system must save the user-mode state such as the register context, copy the user-mode parameters into kernel mode, perform consistency and security checks, and so on. When switching back to user mode, the data must be copied back to user space and the user context restored. This process takes time, so switching between the two modes is expensive.

Synchronous and asynchronous

Synchronous and asynchronous are defined from the point of view of the callee (the service provider). They describe how the callee delivers its result.

If the callee returns the result only after it has finished, and the caller waits the whole time for that result, the call is synchronous. If the callee returns immediately after receiving the request (possibly before any work has been done, so the returned value is not necessarily the result of the work) and then actively notifies the caller of the result once it has finished, for example through a callback, the call is asynchronous.

Blocking and non-blocking

Blocking and non-blocking are defined from the point of view of the caller. They describe the state of the caller while it waits for the result.

If the calling thread is suspended after making the call until the result comes back, the call is blocking. If the call returns immediately without waiting for the result and the calling thread carries on, the call is non-blocking.

Blocking IO (BIO)

The recvfrom() function is a system call that receives data from a socket and captures the address of the sender.

When a user process calls recvfrom(), the whole process blocks. Once the data is ready in the kernel, it is copied from the kernel buffer to user space (the user buffer) and the call returns. The user process is then unblocked and continues to run.

Why does the kernel need to prepare the data? With network I/O, the data has often not arrived yet, for example because the client has not sent its input or a complete UDP packet has not been received. The data must first arrive and be copied into the kernel buffer, and that takes time.

Disadvantages: Multiple socket connections cannot be served at once. Because the whole process is blocked, the next client can be served only after the current socket connection is released.

Solutions:

  1. Multithreading? Every socket connection needs its own thread, so 100,000 connections need 100,000 threads. This is obviously not feasible.
  2. A thread pool? This works when the number of connections is small and relieves resource usage to some extent. But when the number of users is large, the right pool size is hard to estimate and has to be tuned to the load. If the pool is too small, responsiveness is no better than without a pool; if it is too large, memory is wasted.
  3. Non-blocking I/O: The socket is set to non-blocking mode. If data is not ready, the call returns an error indicating that the data is not ready. Instead of opening multiple threads to handle multiple socket connections, one thread can poll the connections and ask each whether its data is ready. This is non-blocking I/O.

Non-blocking IO (NIO)

Whenever a client connects to a server, the socket is placed into an array, and the main process polls to make an IO call until data is ready.
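The polling described above can be sketched as follows. The helper names set_nonblocking and poll_once are hypothetical; the system calls fcntl() and recv() are the standard POSIX ones. The sketch makes one pass over an array of non-blocking sockets and reports the first one that has data.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Put fd into non-blocking mode. Returns 0 on success, -1 on error. */
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* One polling pass over non-blocking sockets.
 * Returns the index of the first socket that delivered data (the byte count
 * is stored in *n), or -1 if no socket had data on this pass. */
int poll_once(const int *fds, int count, char *buf, size_t cap, ssize_t *n) {
    assert(fds != NULL && buf != NULL && n != NULL);
    for (int i = 0; i < count; i++) {
        ssize_t r = recv(fds[i], buf, cap, 0);  /* one system call per socket */
        if (r > 0) {
            *n = r;
            return i;
        }
        /* r == -1 with errno EAGAIN/EWOULDBLOCK means "no data yet";
         * r == 0 (peer closed) is also skipped in this simplified sketch. */
    }
    return -1;
}
```

Every pass costs one system call per socket whether or not that socket has data, which is exactly the overhead discussed next.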

Disadvantages: The problem is the polling. Polling keeps asking the kernel, and each question is a system call that switches between user mode and kernel mode. The overhead is high, a lot of CPU time is consumed, and system resources are used poorly. With 100,000 connected sockets, all 100,000 are traversed on every pass even if only 10 of them have data, which wastes resources and reduces efficiency.

Solution: Move the polling into kernel mode. Since switching between the two modes is expensive and takes a lot of CPU time, let the kernel do the polling and return the results directly, so that a pass no longer costs a mode switch per socket. (This only solves the mode-switching problem caused by polling. It does not solve the inefficiency and wasted resources with a massive number of socket connections. Of the select, poll, and epoll implementations of I/O multiplexing introduced below, only epoll solves that problem.)

I/O multiplexing

I/O multiplexing uses one process to monitor multiple file descriptors (think of them as socket connections). Specifically, select, poll, or epoll watches for I/O events on multiple file descriptors, and as soon as one or more descriptors are ready, the process wakes from its blocked state. The program then iterates over the file descriptors (with epoll, only over those on which I/O events actually occurred) and handles the ready ones in turn. The defining feature of I/O multiplexing is therefore that a single process waits on many file descriptors at once, unblocks as soon as any of them becomes ready, and then processes the results.

How do select, poll, and epoll monitor multiple file descriptors at the same time? In essence, they move the polling described under NIO into the kernel. The following sections describe the three I/O multiplexing mechanisms: select, poll, and epoll.

A file descriptor is a computer science term, an abstraction used to refer to an open file. Formally, a file descriptor is a non-negative integer. In practice it is an index into the table of open files that the kernel maintains for each process. When a program opens an existing file or creates a new one, the kernel returns a file descriptor to the process. Low-level programs are often built around file descriptors, but the concept generally applies only to UNIX-like operating systems such as UNIX and Linux.

select

The set of file descriptors passed in by the user is copied into kernel space. select() monitors these descriptors until at least one is ready (readable, writable, or in an exceptional condition) or the timeout expires. After select() returns, the caller iterates over the descriptors to find the ready ones.

int select(
    int maxfdp1,             // Highest-numbered file descriptor to monitor, plus 1
    fd_set *readset,         // Descriptors monitored for read events
    fd_set *writeset,        // Descriptors monitored for write events
    fd_set *exceptset,       // Descriptors monitored for exceptional conditions
    struct timeval *timeout  // How long to wait for a descriptor to become ready: NULL waits forever, a zero timeval does not wait, anything else waits up to that long
);
// fd_set (a set of file descriptors) is manipulated with the following four macros. Underneath, fd_set is implemented as a bitmap
FD_ZERO(fd_set *fds)           // Clear the set
FD_SET(int fd, fd_set *fds)    // Add the descriptor to the set
FD_ISSET(int fd, fd_set *fds)  // Test whether the descriptor is in the set
FD_CLR(int fd, fd_set *fds)    // Remove the descriptor from the set

// An example (servaddr, clint, addrlen, SIZE and LISTENQ are assumed to be defined elsewhere)
int fds[SIZE];
int maxfd = -1;
int i, nfds;
char buf[1024];
fd_set readfds;
int fd = socket(AF_INET, SOCK_STREAM, 0);
bind(fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
listen(fd, LISTENQ);

for (i = 0; i < SIZE; i++) {  // Store each new connection descriptor in the array to simulate SIZE client connections
    fds[i] = accept(fd, (struct sockaddr *)&clint, &addrlen);
    if (fds[i] > maxfd) {  // Track the largest file descriptor
        maxfd = fds[i];
    }
}

while (1) {
    FD_ZERO(&readfds);  // Reset the bitmap; select() overwrites it on every call
    for (i = 0; i < SIZE; i++) {  // Add each file descriptor to the set
        FD_SET(fds[i], &readfds);
    }
    struct timeval timeout = {5, 0};  // 5 seconds, an arbitrary value for this example; reset each time because select() may modify it
    nfds = select(maxfd + 1, &readfds, NULL, NULL, &timeout);  // Blocks here while there is no data; when socket data is ready, the corresponding bits in readfds are set
    for (i = 0; i < SIZE; i++) {  // Iterate over all fds to see which bits are set, that is, which have read events
        if (FD_ISSET(fds[i], &readfds)) {
            read(fds[i], buf, sizeof(buf));  // Handle the read event here
        }
    }
}
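For readers who want something they can compile on its own, here is a small self-contained sketch of the same select() pattern. The helper name wait_readable_select is hypothetical, and it takes its descriptors as a parameter so that it can be tried with socket pairs instead of a listening server.

```c
#include <assert.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

/* Wait up to timeout_ms for any of fds[0..count-1] to become readable.
 * On return, ready[i] is 1 if fds[i] is readable and 0 otherwise.
 * Returns the value returned by select(). */
int wait_readable_select(const int *fds, int count, int timeout_ms, int *ready) {
    fd_set readfds;
    struct timeval tv;
    int maxfd = -1;

    FD_ZERO(&readfds);  /* the set has to be rebuilt before every call */
    for (int i = 0; i < count; i++) {
        assert(fds[i] >= 0 && fds[i] < FD_SETSIZE);  /* the bitmap limit */
        FD_SET(fds[i], &readfds);
        if (fds[i] > maxfd) maxfd = fds[i];
    }

    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;
    int n = select(maxfd + 1, &readfds, NULL, NULL, &tv);

    /* select() only says how many are ready; finding them is an O(n) scan. */
    for (int i = 0; i < count; i++)
        ready[i] = (n > 0 && FD_ISSET(fds[i], &readfds)) ? 1 : 0;
    return n;
}
```

The assert on FD_SETSIZE and the final scan correspond directly to the first and third disadvantages listed below.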

Advantages: select() moves the NIO polling into the kernel and lets the kernel do the traversal, which avoids frequent switches between the two modes and saves overhead.

Disadvantages:

  1. The underlying fd_set type is a bitmap with at most 1024 bits (FD_SETSIZE), so a process can monitor at most 1024 descriptors this way.
  2. Each call to select() copies the file descriptor set into kernel space, which can be expensive in high-concurrency scenarios.
  3. select() returns only the number of ready file descriptors. It does not tell the user which sockets have data, so an O(n) traversal is still required.
  4. With a massive number of file descriptors, every call to select() loops over all of them, which is inefficient and wastes resources.

poll

The main difference between poll() and select() is that poll() replaces the bitmap with an array of pollfd structures, each of which wraps a file descriptor. The read and write events of interest (POLLIN/POLLRDNORM, POLLOUT/POLLWRNORM) are registered in the events field of each pollfd. The array is passed to the kernel, which sets the revents field of every pollfd whose descriptor has an event, including error conditions such as POLLERR, and then returns. The caller then iterates over all pollfd entries and checks revents to see which descriptors have events.

int poll(
    struct pollfd *fdarray,  // Pointer to the first element of the pollfd array
    nfds_t nfds,             // Number of elements in the array
    int timeout              // Timeout in milliseconds
);

struct pollfd {
    int fd;         // The file descriptor to monitor
    short events;   // Events the kernel should monitor
    short revents;  // Events that actually occurred
};

// An example (servaddr, clint, addrlen, SIZE and LISTENQ are assumed to be defined elsewhere)
struct pollfd pollfds[SIZE];
nfds_t nfds = SIZE;
int timeout = 5000;  // 5 seconds, an arbitrary value for this example
int i;
char buf[1024];
int fd = socket(AF_INET, SOCK_STREAM, 0);
bind(fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
listen(fd, LISTENQ);

for (i = 0; i < SIZE; i++) {  // Store each new connection descriptor in the array to simulate SIZE client connections
    pollfds[i].fd = accept(fd, (struct sockaddr *)&clint, &addrlen);
    pollfds[i].events = POLLIN;
}

while (1) {
    poll(pollfds, nfds, timeout);  // Pass the pollfd array to the kernel, which sets revents for every descriptor that has an event
    for (i = 0; i < SIZE; i++) {  // Iterate over the array to see which pollfd entries were set
        if (pollfds[i].revents & POLLIN) {
            pollfds[i].revents = 0;  // Reset revents to 0
            read(pollfds[i].fd, buf, sizeof(buf));  // Handle the read event here
        }
    }
}
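The same pattern as a self-contained sketch that compiles on its own. The helper name wait_readable_poll and the MAX_FDS cap are hypothetical and exist only to keep the example short.

```c
#include <assert.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

#define MAX_FDS 64  /* hypothetical cap for this sketch, not a poll() limit */

/* Wait up to timeout_ms for any of fds[0..count-1] to become readable.
 * On return, ready[i] is 1 if fds[i] is readable and 0 otherwise.
 * Returns the value returned by poll(). */
int wait_readable_poll(const int *fds, int count, int timeout_ms, int *ready) {
    struct pollfd pfds[MAX_FDS];
    assert(count >= 0 && count <= MAX_FDS);

    for (int i = 0; i < count; i++) {
        pfds[i].fd = fds[i];
        pfds[i].events = POLLIN;  /* events we ask the kernel to watch */
        pfds[i].revents = 0;      /* filled in by the kernel */
    }

    int n = poll(pfds, (nfds_t)count, timeout_ms);

    /* As with select(), the caller still scans every entry: O(n). */
    for (int i = 0; i < count; i++)
        ready[i] = (n > 0 && (pfds[i].revents & POLLIN)) ? 1 : 0;
    return n;
}
```

Unlike fd_set, the pollfd array has no fixed bitmap size, which is why the 1024 limit disappears; the per-call copy and the linear scan remain.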

Advantages: poll() uses an array of pollfd structures instead of the bitmap used by select(), which removes the 1024 limit and allows more file descriptors to be handled at once.

Disadvantages:

  1. Copying the pollfd array into kernel space on every call is expensive in high-concurrency scenarios.
  2. poll() does not tell the user which sockets have data, so an O(n) traversal is still required.
  3. With a massive number of file descriptors, every call to poll() loops over all of them, which is inefficient and wastes resources.

epoll

epoll_create(int size) creates an eventpoll object. This object lives in the kernel, that is, both its red-black tree and its doubly linked list are in kernel space. size was originally a hint for the maximum number of fds the kernel should expect to handle. Since Linux 2.6.8 the parameter is ignored, as long as it is greater than 0, and the number of fds can grow dynamically.

epoll_ctl() adds, removes, or modifies a file descriptor in the kernel red-black tree. The fds in the red-black tree are the ones to be monitored. It also registers a callback with the kernel interrupt handler that places the file descriptor on a doubly linked list (the ready list) whenever an event occurs on it.

epoll_wait() waits for events on the monitored fds. When data arrives on a socket, it is copied into the kernel buffer and an interrupt is raised; the callback puts the file descriptor on the ready list, and epoll_wait() returns the ready events to the user-mode process.

int epoll_create(int size);  // The kernel initializes an eventpoll object
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  // Adds (EPOLL_CTL_ADD), removes (EPOLL_CTL_DEL), or modifies (EPOLL_CTL_MOD) a file descriptor in the kernel red-black tree
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);  // Wait for events

struct eventpoll {
    struct rb_root  rbr;      // Root of the red-black tree, which stores all file descriptors added for monitoring
    struct list_head rdlist;  // Doubly linked list of file descriptors on which events have occurred
};

// An example (servaddr, clint, addrlen, SIZE and LISTENQ are assumed to be defined elsewhere)
int i, nfds;
char buf[1024];
int fd = socket(AF_INET, SOCK_STREAM, 0);
bind(fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
listen(fd, LISTENQ);
struct epoll_event events[SIZE];  // epoll_event is similar to pollfd, except that it has no revents field
int epfd = epoll_create(SIZE);  // Create an eventpoll object in the kernel

for (i = 0; i < SIZE; i++) {  // Add each new file descriptor to the eventpoll object
    struct epoll_event ev;
    ev.data.fd = accept(fd, (struct sockaddr *)&clint, &addrlen);
    ev.events = EPOLLIN;
    epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev);
}

while (1) {
    nfds = epoll_wait(epfd, events, SIZE, -1);  // Block (timeout -1) until descriptors are ready; the kernel puts them on the ready list and returns the number of ready fds
    for (i = 0; i < nfds; i++) {  // Iterate over the ready events only
        read(events[i].data.fd, buf, sizeof(buf));  // Handle the read event here
    }
}
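A self-contained sketch of the same flow, for Linux only. The helper names watch_readable and collect_ready and the batch size of 16 are hypothetical; epoll_ctl() and epoll_wait() are the real system calls.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Register fd with the epoll instance for read events. Returns 0 on success. */
int watch_readable(int epfd, int fd) {
    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.fd = fd;  /* handed back to us by epoll_wait() */
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

/* Copy up to cap ready descriptors into out[].
 * Returns how many were ready, 0 on timeout, or -1 on error. */
int collect_ready(int epfd, int *out, int cap, int timeout_ms) {
    struct epoll_event events[16];
    assert(cap > 0);
    if (cap > 16) cap = 16;  /* never ask for more than the local array holds */

    int n = epoll_wait(epfd, events, cap, timeout_ms);
    for (int i = 0; i < n; i++)  /* only ready descriptors are visited */
        out[i] = events[i].data.fd;
    return n;
}
```

Registration happens once per descriptor, and each wait returns only the descriptors that are ready, so the caller never scans the full set.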

Advantages:

  1. File descriptors are copied into the kernel only once: epoll_ctl() copies a new fd into kernel space when it is added, and later calls to epoll_wait() do not copy it again, which saves overhead.
  2. The user process traverses only the ready list instead of all file descriptors, so the cost of finding ready descriptors drops from O(n) to O(1).
  3. I/O efficiency does not fall linearly as the number of file descriptors grows. The combination of interrupts, callbacks, and a doubly linked list solves the problem of handling socket connections under high concurrency.

Disadvantages: epoll is available only on Linux.

Summary

|                               | select | poll | epoll |
|-------------------------------|--------|------|-------|
| Data structure                | Bitmap | Array | Red-black tree |
| Maximum number of connections | 1024 | No upper limit | No upper limit |
| fd copying                    | The fd set is copied into kernel space on every call to select | The fd array is copied into kernel space on every call to poll | The fd is copied into the kernel and kept there when epoll_ctl is called; epoll_wait does not copy it again |
| Time complexity of processing | O(n) | O(n) | O(1) |
| Underlying mechanism          | Polling | Polling | Callback |

Redis threading model

Single-threaded message processing model

The core network model of Redis uses the Reactor pattern to implement its network event handler, which is called the file event handler. The file event handler uses I/O multiplexing to monitor multiple sockets and associates different event handlers with each socket according to the task the socket is currently performing. It consists of four parts: multiple sockets, the I/O multiplexer, the file event dispatcher, and the event handlers. Because the file event dispatcher runs in a single thread, Redis is described as using a single-threaded model.

What is a file event? A file event is an abstraction of a socket operation. Each socket operation corresponds to a file event. For example, accepting a connection, reading, writing, and closing a socket each correspond to a different file event.

The file event handler uses the I/O multiplexer to monitor multiple sockets at the same time. When a monitored socket is ready for an operation such as accepting a connection, reading, writing, or closing, a corresponding file event is generated. The I/O multiplexer places all sockets that generated events in a queue and delivers them to the file event dispatcher through that queue in order, synchronously, one socket at a time. Only after the event generated by the previous socket has been handled (that is, the event handler associated with the event has finished running for that socket) does the file event dispatcher move on to the next socket.
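The four parts described above (sockets, multiplexer, dispatcher, handlers) can be sketched in a few dozen lines. This is a simplified illustration and not Redis's ae library: mini_loop, mini_loop_add, mini_loop_process, and count_bytes are hypothetical names, and poll() stands in for the multiplexer so that the sketch stays portable.

```c
#include <assert.h>
#include <poll.h>
#include <stddef.h>
#include <sys/socket.h>
#include <unistd.h>

#define MAX_EVENTS 16  /* hypothetical capacity of this toy loop */

typedef void (*file_proc)(int fd, void *client_data);  /* an event handler */

typedef struct {
    struct pollfd pfds[MAX_EVENTS];   /* the sockets being monitored */
    file_proc     procs[MAX_EVENTS];  /* handler associated with each socket */
    void         *data[MAX_EVENTS];   /* per-socket state passed to the handler */
    int           count;
} mini_loop;

/* Associate a read handler with a socket. Returns 0, or -1 if the loop is full. */
int mini_loop_add(mini_loop *l, int fd, file_proc proc, void *client_data) {
    assert(l != NULL && proc != NULL);
    if (l->count == MAX_EVENTS) return -1;
    l->pfds[l->count].fd = fd;
    l->pfds[l->count].events = POLLIN;
    l->pfds[l->count].revents = 0;
    l->procs[l->count] = proc;
    l->data[l->count] = client_data;
    l->count++;
    return 0;
}

/* One loop iteration: the multiplexer waits, then the dispatcher runs the
 * handlers of the ready sockets one after another on the calling thread.
 * Returns the number of handlers that ran. */
int mini_loop_process(mini_loop *l, int timeout_ms) {
    int processed = 0;
    int n = poll(l->pfds, (nfds_t)l->count, timeout_ms);
    if (n <= 0) return 0;
    for (int i = 0; i < l->count; i++) {
        if (l->pfds[i].revents & POLLIN) {
            l->procs[i](l->pfds[i].fd, l->data[i]);  /* sequential dispatch */
            processed++;
        }
    }
    return processed;
}

/* Example handler: read what is available and add the byte count to an int. */
void count_bytes(int fd, void *client_data) {
    char buf[64];
    ssize_t n = read(fd, buf, sizeof(buf));
    if (n > 0) *(int *)client_data += (int)n;
}
```

Because mini_loop_process() calls the handlers one at a time on a single thread, a slow handler delays every socket behind it, which is the property that makes the model single-threaded.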

The following figure shows the Redis threading model as a flow through the source code. Note: solid arrows represent the sequential execution of functions at the same level, and dashed lines represent the internal flow within a function.

File event processing flow

  • The beforesleep() and aeProcessEvents() functions run in a loop for as long as eventLoop->stop is 0.

  • beforesleep phase

    • Traverse the clients_pending_write queue and write the response data back to the clients;
    • If writeToClient() could not write everything, register the write callback sendReplyToClient and wait for the next write;
  • The aeProcessEvents() function has three important phases: aeApiPoll, ready file event execution, and time event execution. They are described below in execution order:

    • aeApiPoll phase

      • Block until an event occurs or the timeout expires, then return. (The timeout passed to epoll_wait() is computed from the time events before beforesleep runs.)
    • Ready file event execution phase

      • After aeApiPoll returns, the ready file events are traversed and handled.
      • For a readable event, the callback readQueryFromClient() reads the socket data into the input buffer and executes the command. If the command produces response data to be written back, the client is added to the queue of clients waiting to be written.
      • If data remains in the write buffer, the write callback sendReplyToClient is registered for the client, which then waits for the write-back.
    • Time event execution phase

      • If a time event is due, it is executed.

Multithreaded message processing flow

The only difference from the single-threaded flow is that the main thread no longer reads and writes socket data itself. Instead, a group of independent threads handles socket reads and writes, request parsing, and so on. Command execution is still done by the main thread. So although threads were added for network I/O, this is still the single-threaded Reactor model; the difference between the single-threaded and multithreaded Reactor models is not covered here.

In the figure below, you can think of the part handled by InitServerLast as the multithreaded part, while the main thread runs sequentially in aeMain.

Note: solid arrows represent the sequential execution of functions at the same level, and dashed lines represent the internal flow within a function.

IO thread initialization and execution logic

  • Each I/O thread is created once at initialization, with IOThreadMain() as its entry function.

  • The IOThreadMain() function contains an infinite loop:

    • Inside the loop, the thread first spins up to 1 million times checking its task counter, and leaves the spin as soon as the counter is non-zero, that is, as soon as the main thread has assigned it I/O tasks.
    • If there is still no task after spinning, the thread blocks on its mutex and sleeps (this is also what happens when the thread first starts). It resumes once the main thread releases the mutex in order to assign tasks.
    • Once tasks have been assigned, the thread traverses its task queue io_threads_list[id]. For a write operation, the data in the client's write buffer is written back to the client. For a read operation, readQueryFromClient() reads the client's socket data into the input buffer and parses the first command but does not execute it;
    • When all tasks in io_threads_list[id] are done, the thread sets its task counter io_threads_pending to 0.
void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];   

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();

    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        if (getIOPendingCount(id) == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(getIOPendingCount(id) != 0);

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        setIOPendingCount(id, 0);
    }
}

File event processing flow

  • aeMain() is called and runs aeProcessEvents() in a loop for as long as eventLoop->stop is 0;

  • The aeProcessEvents() function has four important phases: beforesleep, aeApiPoll, ready file event execution, and time event execution. They are described below in execution order:

    • beforesleep phase

      • Traverse the queue of clients waiting to be read, clients_pending_read, and distribute the clients round-robin across the I/O threads and the main thread;
      • Set each thread's io_threads_pending task counter according to the number of clients assigned to it;
      • Spin until every thread's task counter is 0, which means all threads have finished their tasks (this waits for the result of the multithreaded reads above);
      • Traverse clients_pending_read again, remove the CLIENT_PENDING_READ flag, and execute the commands;
      • If a command produces response data to be written back, add the client to the queue of clients waiting to be written;
      • Traverse the queue of clients waiting to be written, clients_pending_write, and distribute the clients round-robin across the I/O threads and the main thread;
      • Spin until every thread's task counter is 0, which means all threads have finished their tasks;
      • Traverse clients_pending_write once more. If data remains in a client's write buffer, register the write callback sendReplyToClient for that client and wait for the write-back.
    • aeApiPoll phase

      • Block until an event occurs or the timeout expires, then return. (The timeout passed to epoll_wait() is computed from the time events before beforesleep runs.)
    • Ready file event execution phase

      • After aeApiPoll returns, the ready file events are traversed and handled.
      • If the event is readable, the callback readQueryFromClient() adds the client to the clients_pending_read task queue for the I/O threads to process later.
      • If the event is writable, the callback writeToClient() writes the buffered data back to the client.
    • Time event execution phase

      • If a time event is due, it is executed.
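The round-robin distribution mentioned in the beforesleep phase can be sketched as a simple modulo assignment. The helper names target_thread and distribute are hypothetical, and treating index 0 as the main thread is an assumption made for this illustration.

```c
#include <assert.h>

/* Hypothetical helper: which thread handles the item_id-th pending client?
 * Assumption: index 0 stands for the main thread, 1..n-1 for the I/O threads. */
int target_thread(int item_id, int io_threads_num) {
    assert(item_id >= 0 && io_threads_num > 0);
    return item_id % io_threads_num;
}

/* Count how many of `clients` pending clients each thread would receive.
 * per_thread must have room for io_threads_num entries. */
void distribute(int clients, int io_threads_num, int *per_thread) {
    for (int t = 0; t < io_threads_num; t++)
        per_thread[t] = 0;
    for (int item_id = 0; item_id < clients; item_id++)
        per_thread[target_thread(item_id, io_threads_num)]++;
}
```

With 10 pending clients and 4 threads, the per-thread counts come out as 3, 3, 2, 2, so no thread receives more than one client above any other.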
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}
/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
 *
 * The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want to call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        struct timeval tv, *tvp;
        int64_t usUntilTimer = -1;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            usUntilTimer = usUntilEarliestTimer(eventLoop);

        if (usUntilTimer >= 0) {
            tv.tv_sec = usUntilTimer / 1000000;
            tv.tv_usec = usUntilTimer % 1000000;
            tvp = &tv;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }

        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsyncing a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }

    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
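
The timeout selection inside aeProcessEvents() can be isolated as a small sketch. The helper name choose_timeout and its parameters are hypothetical and not part of Redis. It only mirrors the three cases in the listing above: AE_DONT_WAIT (from either the call flags or the event loop flags, merged here into one dont_wait argument) yields a zero timeout, a pending timer yields a bounded wait, and otherwise NULL tells the polling call to block until a file event fires.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/time.h>

/* Hypothetical helper, not a Redis function.
 * us_until_timer: microseconds until the earliest time event, or a
 *                 negative value when no time event is pending.
 * dont_wait:      non-zero when the caller must not block.
 * Returns tv filled in, or NULL meaning "block with no timeout". */
struct timeval *choose_timeout(int64_t us_until_timer, int dont_wait,
                               struct timeval *tv) {
    assert(tv != NULL);
    if (dont_wait) {
        /* Poll once and return immediately. */
        tv->tv_sec = 0;
        tv->tv_usec = 0;
        return tv;
    }
    if (us_until_timer >= 0) {
        /* Sleep at most until the next time event is due. */
        tv->tv_sec = us_until_timer / 1000000;
        tv->tv_usec = us_until_timer % 1000000;
        return tv;
    }
    /* No timer and blocking is allowed: wait forever. */
    return NULL;
}
```

For example, a timer due in 1.5 seconds gives tv_sec = 1 and tv_usec = 500000, which keeps the loop from oversleeping past the next time event.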
void beforeSleep(struct aeEventLoop *eventLoop) {
    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();
    ...
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
    ...
}

int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;    
    }

    /* Run the list of clients again to process the new buffers. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        serverAssert(!(c->flags & CLIENT_BLOCKED));
        if (processPendingCommandsAndResetClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }

        processInputBuffer(c);

        /* We may have pending replies if a thread readQueryFromClient() produced
         * replies and did not install a write handler (it can't). */
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            clientInstallWriteHandler(c);
    }

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}
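
Both functions above spread pending clients over the I/O threads with the same round-robin rule, item_id % server.io_threads_num, where index 0 is the main thread. The following is a minimal sketch of that rule alone. The names target_thread and distribute are hypothetical, and plain counters stand in for the per-thread client lists (io_threads_list) used by Redis.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper: index of the thread that receives the
 * item_id-th pending client. Index 0 stands for the main thread. */
int target_thread(int item_id, int num_threads) {
    assert(item_id >= 0 && num_threads > 0);
    return item_id % num_threads;
}

/* Hypothetical helper: fill counts[0..num_threads-1] with the number
 * of clients each thread would receive for num_clients pending clients. */
void distribute(int num_clients, int num_threads, int *counts) {
    assert(counts != NULL);
    for (int t = 0; t < num_threads; t++) counts[t] = 0;
    for (int id = 0; id < num_clients; id++)
        counts[target_thread(id, num_threads)]++;
}
```

With 10 pending clients and 4 threads, the counts come out as 3, 3, 2, 2, so per-thread load differs by at most one client. This balances the number of clients, not the number of bytes each client has to read or write.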
Copy the code
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, j, numevents = 0;

    memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
    memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));

    retval = select(eventLoop->maxfd+1,
                &state->_rfds,&state->_wfds,NULL,tvp);
    if (retval > 0) {
        for (j = 0; j <= eventLoop->maxfd; j++) {
            int mask = 0;
            aeFileEvent *fe = &eventLoop->events[j];

            if (fe->mask == AE_NONE) continue;
            if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                mask |= AE_WRITABLE;
            eventLoop->fired[numevents].fd = j;
            eventLoop->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}
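
To see the select() call from the aeApiPoll() listing above in isolation, here is a minimal sketch that checks a single descriptor for readability. The helpers wait_readable and pipe_demo are hypothetical names introduced for illustration, and a pipe stands in for a client socket. It is not how Redis registers descriptors, only the same system call and FD_ISSET check on a smaller scale.

```c
#include <assert.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/* Hypothetical helper: returns 1 if fd becomes readable within
 * timeout_us microseconds, 0 on timeout, -1 on error. */
int wait_readable(int fd, long timeout_us) {
    assert(fd >= 0 && fd < FD_SETSIZE);
    fd_set rfds;
    struct timeval tv;

    FD_ZERO(&rfds);
    FD_SET(fd, &rfds);
    tv.tv_sec = timeout_us / 1000000;
    tv.tv_usec = timeout_us % 1000000;

    /* The first argument is the highest descriptor plus one, as in
     * the eventLoop->maxfd+1 argument of the listing above. */
    int retval = select(fd + 1, &rfds, NULL, NULL, &tv);
    if (retval < 0) return -1;
    return (retval > 0 && FD_ISSET(fd, &rfds)) ? 1 : 0;
}

/* Hypothetical demo: an empty pipe is not readable, and becomes
 * readable after one byte is written. Returns 1 if both checks hold,
 * 0 if they do not, -1 on a system call failure. */
int pipe_demo(void) {
    int fds[2];
    if (pipe(fds) != 0) return -1;

    int before = wait_readable(fds[0], 0);
    if (write(fds[1], "x", 1) != 1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }
    int after = wait_readable(fds[0], 0);

    close(fds[0]);
    close(fds[1]);
    return (before == 0 && after == 1) ? 1 : 0;
}
```

A zero timeout makes select() return immediately, which corresponds to the AE_DONT_WAIT case, while passing NULL instead of a timeval would block until the descriptor is ready.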

References

Redis5 Design and Source Code Analysis

Redis Design and Implementation

UNIX Network Programming, Volume 1: The Sockets Networking API (3rd Edition)

Redis multithreaded network model full disclosure

Redis source code analysis and annotation technology blog column

Thoroughly understand the Reactor model and Proactor model

Redis 6.0 multithreaded IO processing process in detail
