This is the 7th day of my participation in the August Text Challenge.

The Redis server is event-driven, and it needs to handle two types of events:

  • File events: the Redis server connects to clients through sockets, and file events are the server’s abstraction of socket operations.
  • Time events: some operations in the Redis server need to be performed at given points in time, and time events are the server’s abstraction of such timed operations.

File events

Redis developed its own network event handler based on the Reactor pattern, called the file event handler:

  • The file event handler uses an I/O multiplexing program to listen on multiple sockets at the same time, and associates each socket with a different event handler according to the task the socket is currently performing.
  • When a monitored socket becomes ready to perform an accept, read, write, or close operation, a file event corresponding to that operation is generated, and the file event handler calls the event handler associated with the socket to process it.
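The reactor described above can be sketched with POSIX poll(): a small table pairs each watched descriptor with its handler, a single poll() call waits on all of them, and every descriptor that comes back ready is passed to its handler. This is a minimal illustration under our own assumptions, not Redis's ae code; `register_handler`, `reactor_run_once`, and `on_readable` are hypothetical names, and in a quick test a pipe can stand in for a client socket.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <poll.h>
#include <string.h>
#include <unistd.h>

#define MAX_WATCHED 16

/* Hypothetical handler type: receives the descriptor that became ready. */
typedef void (*fd_handler)(int fd);

/* Registration table pairing each watched descriptor with its handler. */
static struct pollfd watched[MAX_WATCHED];
static fd_handler handlers[MAX_WATCHED];
static int nwatched = 0;

/* Associate fd with a handler for the given poll() events (e.g. POLLIN).
 * Returns the slot index, or -1 if the table is full. */
int register_handler(int fd, short events, fd_handler h) {
    assert(h != NULL);
    if (nwatched == MAX_WATCHED) return -1;
    watched[nwatched].fd = fd;
    watched[nwatched].events = events;
    watched[nwatched].revents = 0;
    handlers[nwatched] = h;
    return nwatched++;
}

/* One reactor iteration: wait in poll() until some descriptor is ready
 * (or timeout_ms expires), then call the handler of every ready descriptor.
 * Returns the number of handlers called, 0 on timeout, -1 on poll() error. */
int reactor_run_once(int timeout_ms) {
    int n = poll(watched, (nfds_t)nwatched, timeout_ms);
    if (n <= 0) return n;
    int called = 0;
    for (int i = 0; i < nwatched; i++) {
        if (watched[i].revents & watched[i].events) {
            handlers[i](watched[i].fd);
            called++;
        }
    }
    return called;
}

/* Example read handler: stores whatever bytes are currently available. */
static char last_read[64];
void on_readable(int fd) {
    ssize_t r = read(fd, last_read, sizeof(last_read) - 1);
    last_read[r > 0 ? r : 0] = '\0';
}
```

For example, registering the read end of a pipe with `register_handler(p[0], POLLIN, on_readable)`, writing "PING" into the write end, and calling `reactor_run_once(1000)` invokes `on_readable` exactly once and leaves "PING" in `last_read`.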

File event handler composition

The file event handler consists of four parts: sockets, the I/O multiplexing program, the file event dispatcher, and the event handlers.

Although multiple file events may occur concurrently, the I/O multiplexing program always places every socket that generated an event into a queue and delivers the sockets from that queue to the file event dispatcher in an ordered, synchronous fashion, one socket at a time. The next socket is delivered only after the event on the previous socket has been processed.
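The queue between the multiplexer and the dispatcher can be modeled as a FIFO of ready descriptors: the multiplexer appends every descriptor that produced an event, and the dispatcher removes them one by one, finishing each handler call before taking the next. In Redis this role is effectively played by the `fired` array that `aeApiPoll` fills and `aeProcessEvents` walks; the ring buffer and the names `ready_queue`, `enqueue_ready`, `dispatch_all`, and `record_fd` here are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

#define QUEUE_CAP 8

/* A fired event as the multiplexer reports it: descriptor plus event mask. */
typedef struct { int fd; int mask; } fired_event;

/* Hypothetical FIFO between the I/O multiplexer and the dispatcher. */
typedef struct {
    fired_event items[QUEUE_CAP];
    size_t head, len;
} ready_queue;

/* Multiplexer side: append a ready descriptor; returns 0, or -1 if full. */
int enqueue_ready(ready_queue *q, int fd, int mask) {
    if (q->len == QUEUE_CAP) return -1;
    q->items[(q->head + q->len) % QUEUE_CAP] = (fired_event){fd, mask};
    q->len++;
    return 0;
}

/* Dispatcher side: hand events to the handler strictly one at a time, in
 * arrival order; the next event is taken only after handle() returns. */
size_t dispatch_all(ready_queue *q, void (*handle)(int fd, int mask)) {
    assert(handle != NULL);
    size_t done = 0;
    while (q->len > 0) {
        fired_event ev = q->items[q->head];
        q->head = (q->head + 1) % QUEUE_CAP;
        q->len--;
        handle(ev.fd, ev.mask);   /* synchronous call */
        done++;
    }
    return done;
}

/* Example handler that records the order in which descriptors were served. */
static int served[QUEUE_CAP];
static size_t nserved = 0;
void record_fd(int fd, int mask) {
    (void)mask;
    if (nserved < QUEUE_CAP) served[nserved++] = fd;
}
```

Because `dispatch_all` calls each handler synchronously, a slow handler delays every descriptor queued behind it, which is why Redis handlers are expected to return quickly.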

I/O multiplexing program implementation

All of Redis’s I/O multiplexing functions are implemented by wrapping common I/O multiplexing libraries such as select, epoll, evport, and kqueue; each wrapper lives in its own file in the Redis source code (ae_select.c, ae_epoll.c, ae_evport.c, and ae_kqueue.c).

Because the source code defines the selection rules with preprocessor macros, the program automatically picks the highest-performance I/O multiplexing library available on the system at compile time and uses it as Redis’s underlying implementation:

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
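The same preprocessor pattern can be reproduced in a standalone file to see which backend a given platform would get. In Redis the `HAVE_*` macros come from config.h; the platform tests below are a simplified assumption modeled on it (epoll on Linux, kqueue on macOS and the BSDs, evport on Solaris), and `selected_backend()` and `backend_is()` are hypothetical helpers.

```c
#include <assert.h>
#include <string.h>

/* Simplified, assumed versions of the platform checks in Redis's config.h. */
#if defined(__sun)
#define HAVE_EVPORT 1
#endif
#if defined(__linux__)
#define HAVE_EPOLL 1
#endif
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
#define HAVE_KQUEUE 1
#endif

/* Same preference order as the ae.c #ifdef chain: evport, epoll, kqueue, select. */
const char *selected_backend(void) {
#ifdef HAVE_EVPORT
    return "evport";
#elif defined(HAVE_EPOLL)
    return "epoll";
#elif defined(HAVE_KQUEUE)
    return "kqueue";
#else
    return "select";
#endif
}

/* Convenience check: is `name` the backend chosen at build time? */
int backend_is(const char *name) {
    assert(name != NULL);
    return strcmp(selected_backend(), name) == 0;
}
```

Under these assumed macros, a Linux build returns "epoll" and a macOS build returns "kqueue"; select is only the fallback when none of the faster libraries is detected.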

File event handler

Redis implements several file event handlers, each serving a different networking need:

  • To respond to clients connecting to the server, the server associates the listening socket with the connection accept handler.
  • To receive command requests from a client, the server associates the client socket with the command request handler.
  • To return the results of commands to a client, the server associates the client socket with the command reply handler.
  • During master-slave replication, both the master and the slave associate their sockets with the replication handler written specifically for replication.
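The hand-off between the command request handler and the command reply handler can be sketched with a Unix socketpair and poll(): the server end is first watched for readability; once a request has been read, the sketch watches for writability instead so the reply handler can send the answer, then switches back. In Redis these roles roughly correspond to functions such as readQueryFromClient and sendReplyToClient in networking.c; everything below, including the `client` struct, the handler names, and the PING/+PONG exchange, is a simplified hypothetical stand-in, and the connection accept handler is omitted because a socketpair is already connected.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical per-client state shared by the two handlers. */
typedef struct {
    int fd;
    short watching;      /* POLLIN while waiting for a request, POLLOUT to reply */
    char query[64];      /* last request read from the client */
    const char *reply;   /* pending reply, NULL when nothing to send */
} client;

/* Command request handler: read the request and prepare a reply. */
static void command_request_handler(client *c) {
    ssize_t n = read(c->fd, c->query, sizeof(c->query) - 1);
    c->query[n > 0 ? n : 0] = '\0';
    c->reply = (strncmp(c->query, "PING", 4) == 0) ? "+PONG\r\n" : "-ERR unknown command\r\n";
    c->watching = POLLOUT;   /* switch from the read handler to the write handler */
}

/* Command reply handler: send the reply, then wait for input again. */
static int command_reply_handler(client *c) {
    size_t len = strlen(c->reply);
    /* Sketch assumption: the short reply is written in a single call;
     * Redis instead keeps unsent bytes in the client's output buffer. */
    if (write(c->fd, c->reply, len) != (ssize_t)len) return -1;
    c->reply = NULL;
    c->watching = POLLIN;    /* back to waiting for the next request */
    return 0;
}

/* Run event-loop iterations for one client until one reply has been sent.
 * Returns 0 on success, -1 on error, timeout, or hang-up. */
int serve_one_request(client *c, int timeout_ms) {
    int sent = 0;
    while (!sent) {
        struct pollfd p = { .fd = c->fd, .events = c->watching, .revents = 0 };
        if (poll(&p, 1, timeout_ms) <= 0) return -1;
        if (c->watching == POLLIN && (p.revents & POLLIN)) {
            command_request_handler(c);
        } else if (c->watching == POLLOUT && (p.revents & POLLOUT)) {
            if (command_reply_handler(c) != 0) return -1;
            sent = 1;
        } else {
            return -1;   /* hang-up or error condition */
        }
    }
    return 0;
}
```

Writing "PING\r\n" into the other end of the socketpair and calling `serve_one_request` makes the server side read the request, switch to write interest, and send "+PONG\r\n" back.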

Time events

Redis time events fall into two categories:

  • Timed events: make a program execute once after a specified amount of time.
  • Periodic events: make a program execute repeatedly at a specified interval.
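In Redis, which category an event belongs to is decided by the value its handler returns: returning AE_NOMORE (-1) deletes the event after one run, while returning a non-negative number of milliseconds reschedules it that far in the future. The sketch below reproduces that convention against a simulated millisecond clock; the `sim_timer` type, `fire_if_due`, and the two handlers are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

#define AE_NOMORE -1   /* same value and meaning as in Redis's ae.h */

/* Hypothetical handler type: returns AE_NOMORE or the next delay in ms. */
typedef int (*sim_time_proc)(void *data);

typedef struct {
    long long when_ms;      /* absolute fire time on the simulated clock */
    sim_time_proc proc;
    void *data;
    bool deleted;
} sim_timer;

/* Fire the timer if it is due at now_ms, then apply the return-value rule. */
void fire_if_due(sim_timer *t, long long now_ms) {
    if (t->deleted || t->when_ms > now_ms) return;
    int ret = t->proc(t->data);
    if (ret == AE_NOMORE)
        t->deleted = true;              /* timed event: runs exactly once */
    else
        t->when_ms = now_ms + ret;      /* periodic event: rescheduled */
}

/* Timed-event handler: count the call and ask not to run again. */
int once_proc(void *data) { ++*(int *)data; return AE_NOMORE; }

/* Periodic-event handler: count the call and ask to run again in 100 ms. */
int every_100ms_proc(void *data) { ++*(int *)data; return 100; }
```

Stepping the simulated clock from 0 to 1000 ms in 10 ms increments fires a one-shot event due at 50 ms once, and a periodic event first due at 100 ms ten times.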

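A time event has three main attributes:

  • id: a globally unique ID that the server assigns to the time event; IDs increase in creation order, so newer events have larger IDs.
  • when: the time at which the event is due, with millisecond precision (stored as when_sec and when_ms in the structure below).
  • timeProc: the time event handler, which the server calls when the event is due.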

Implementation

The server keeps all time events in an unordered linked list (not sorted by arrival time). Whenever the time event executor runs, it traverses the entire list, finds every time event that has arrived, and calls the corresponding handler.

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
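    aeTimeProc *timeProc; /* handler called when the event is due */
    aeEventFinalizerProc *finalizerProc; /* called when the event is deleted */
    void *clientData; /* private data passed to the handlers */
    struct aeTimeEvent *prev; /* previous node in the time event list */
    struct aeTimeEvent *next; /* next node in the time event list */
    int refcount; /* refcount to prevent timer events from being
                   * freed in recursive time event calls. */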
} aeTimeEvent;
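The executor described above can be sketched as a doubly linked list where new events are pushed at the head (so the list is not sorted by arrival time) and every run walks the entire list. Finding the next event to wait for is therefore an O(N) scan, which is the job aeSearchNearestTimer does in aeProcessEvents. The types and functions here (`tevent`, `tloop`, `add_event`, `nearest`, `process_due`, `count_once`) are hypothetical simplifications of aeTimeEvent that use a single millisecond field instead of when_sec/when_ms and leave out finalizerProc and refcount.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, simplified time event (cf. aeTimeEvent). */
typedef struct tevent {
    long long id;
    long long when_ms;                       /* absolute arrival time in ms */
    int (*proc)(long long id, void *data);   /* -1 deletes, else next delay */
    void *data;
    int deleted;
    struct tevent *prev, *next;
} tevent;

typedef struct { tevent *head; long long next_id; } tloop;

/* New events go to the head of the list, so it is not ordered by when_ms. */
tevent *add_event(tloop *l, long long when_ms, int (*proc)(long long, void *), void *data) {
    tevent *te = calloc(1, sizeof(*te));
    if (te == NULL) return NULL;
    te->id = l->next_id++;
    te->when_ms = when_ms;
    te->proc = proc;
    te->data = data;
    te->next = l->head;
    if (l->head) l->head->prev = te;
    l->head = te;
    return te;
}

/* O(N) scan for the event that arrives soonest (cf. aeSearchNearestTimer). */
tevent *nearest(tloop *l) {
    tevent *best = NULL;
    for (tevent *te = l->head; te; te = te->next)
        if (!te->deleted && (best == NULL || te->when_ms < best->when_ms)) best = te;
    return best;
}

/* Walk the whole list, run every event that has arrived, then free deleted ones. */
int process_due(tloop *l, long long now_ms) {
    int processed = 0;
    for (tevent *te = l->head; te; te = te->next) {
        if (te->deleted || te->when_ms > now_ms) continue;
        int ret = te->proc(te->id, te->data);
        processed++;
        if (ret == -1) te->deleted = 1;
        else te->when_ms = now_ms + ret;
    }
    tevent *te = l->head;
    while (te) {
        tevent *next = te->next;
        if (te->deleted) {
            if (te->prev) te->prev->next = next; else l->head = next;
            if (next) next->prev = te->prev;
            free(te);
        }
        te = next;
    }
    return processed;
}

/* Example handler: counts its calls and runs only once. */
int count_once(long long id, void *data) { (void)id; ++*(int *)data; return -1; }
```

The linear scan is acceptable because a normally running Redis server has very few time events; serverCron is usually the only one.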

Example: The serverCron function

A continuously running Redis server needs to check and adjust its resources and state periodically to keep operating stably over the long term. These periodic operations are performed by the server.c/serverCron function, and mainly include:

  • Updating the server’s statistics
  • Deleting expired key-value pairs from the database
  • Closing and cleaning up clients whose connections have become invalid
  • Attempting AOF or RDB persistence operations
  • If the server is a master, periodically synchronizing its slaves
  • If the server runs in cluster mode, periodically synchronizing with and testing connections to the rest of the cluster
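serverCron itself is registered as a periodic time event: it returns 1000/server.hz, so with the default hz of 10 it runs roughly every 100 ms. Tasks that should run less often are gated by counting loops, which is what Redis's run_with_period macro does with server.cronloops. The sketch below imitates that gating; the `cron_state` struct, `due_every`, `cron_tick`, and the two illustrative task counters are hypothetical.

```c
#include <assert.h>

/* Hypothetical stand-ins for server.hz and server.cronloops. */
typedef struct {
    int hz;                 /* cron invocations per second */
    long long cronloops;    /* number of completed cron runs */
    int fast_task_runs;     /* illustrative task wanted every 100 ms */
    int slow_task_runs;     /* illustrative task wanted every 1000 ms */
} cron_state;

/* Same test as Redis's run_with_period(ms) macro, written as a function. */
static int due_every(const cron_state *s, int period_ms) {
    int tick_ms = 1000 / s->hz;
    return period_ms <= tick_ms || s->cronloops % (period_ms / tick_ms) == 0;
}

/* One cron run; returns the delay in ms until the next run, as serverCron does. */
int cron_tick(cron_state *s) {
    assert(s->hz > 0 && s->hz <= 1000);
    if (due_every(s, 100)) s->fast_task_runs++;
    if (due_every(s, 1000)) s->slow_task_runs++;
    s->cronloops++;
    return 1000 / s->hz;
}
```

With hz = 10, twenty calls to `cron_tick` cover about two seconds: the 100 ms task runs on every call (20 times) and the 1000 ms task runs twice, on loops 0 and 10.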

Scheduling and execution of events

Because the server has both file events and time events, it must schedule the two kinds of events, deciding when to process file events, when to process time events, and how long to spend on each.

The ae.c/aeProcessEvents function is responsible for scheduling and executing events:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // Find the time event whose arrival time is closest to now
            shortest = aeSearchNearestTimer(eventLoop);
        
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }

        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsyncing a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
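Stripped of the AE_BARRIER handling and the before/after-sleep hooks, the scheduling policy in aeProcessEvents reduces to: compute how long until the nearest time event, block in the multiplexer for at most that long (never a negative time), handle whatever file events arrived, then run the time events that are due. The sketch below applies that policy with poll() and a monotonic clock; `now_ms`, `ms_until`, `loop_state`, `loop_once`, and the single one-shot timer are hypothetical simplifications, not Redis code.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <poll.h>
#include <time.h>
#include <unistd.h>

/* Milliseconds on a monotonic clock. */
static long long now_ms(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (long long)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/* Same clamp as aeProcessEvents: a timer that is already due gives 0. */
long long ms_until(long long when_ms, long long now) {
    long long ms = when_ms - now;
    return ms > 0 ? ms : 0;
}

/* Hypothetical loop state: one watched fd and one one-shot timer. */
typedef struct {
    struct pollfd pfd;
    long long timer_when_ms;   /* -1 when no timer is pending */
    int file_events, time_events;
} loop_state;

/* One scheduling round: sleep until I/O or the nearest timer, file events first. */
void loop_once(loop_state *l) {
    int timeout = -1;                         /* no timer: block indefinitely */
    if (l->timer_when_ms >= 0)
        timeout = (int)ms_until(l->timer_when_ms, now_ms());
    l->pfd.revents = 0;
    if (poll(&l->pfd, 1, timeout) > 0 && (l->pfd.revents & POLLIN)) {
        char buf[64];
        if (read(l->pfd.fd, buf, sizeof(buf)) > 0) l->file_events++;
    }
    if (l->timer_when_ms >= 0 && now_ms() >= l->timer_when_ms) {
        l->time_events++;                     /* the timer's handler would run here */
        l->timer_when_ms = -1;                /* one-shot: remove after firing */
    }
}
```

Passing the remaining time as the poll() timeout is what lets the loop sleep while idle yet still wake up in time for the next time event; when no timer is pending the sketch blocks indefinitely, which matches the `tvp = NULL` branch above.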