Redis is event-driven

Redis performs well while remaining single threaded. Thanks to asynchronous I/O and multiplexing, Redis uses the reactor pattern to process a large number of I/O operations in an event-driven way, which makes good use of CPU resources. Because the event handlers make no blocking calls, each event is processed very quickly, giving high concurrency when many clients access redis-server. So how exactly does Redis do this?

1 Redis multiplexing technology

Redis is a client/server (C/S) system, so it supports many clients accessing the server side over the network. To serve database operation requests from multiple clients simultaneously, redis-server uses I/O multiplexing.

Within a single thread, a system API provided by UNIX (select, poll, epoll, etc.) can monitor n file descriptors (a socket is also abstracted as a file descriptor) for read/write readiness at the same time. As soon as a readable/writable event occurs on one of the monitored fds, the same API call returns the corresponding fds, and the matching file events can be dispatched to their handlers.
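To make this concrete, here is a minimal, self-contained sketch of the pattern using the raw epoll API (illustrative only, not Redis code; listen_fd is assumed to be an already created, non-blocking listening socket):

#include <sys/epoll.h>

#define MAX_EVENTS 64

void event_loop(int listen_fd) {
    struct epoll_event ev, events[MAX_EVENTS];
    int epfd = epoll_create1(0);            /* create the epoll instance */

    ev.events = EPOLLIN;                    /* watch for readability */
    ev.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

    for (;;) {
        /* Block until the kernel reports that some registered fd is ready. */
        int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == listen_fd) {
                /* dispatch: accept the new connection, register its fd ... */
            } else if (events[i].events & EPOLLIN) {
                /* dispatch: read and handle this client's request ... */
            }
        }
    }
}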

It is similar to a teacher (redis-server) looking after a class of n students (n redis-cli sockets): as soon as a student raises a hand (a readable/writable event on a socket file descriptor), the teacher immediately handles that student's request (the file event dispatcher) and returns as soon as it is done, then scans the class for the next raised hand, over and over again.

epoll, kqueue, select, and evport are all UNIX-family multiplexing interfaces. Redis has good compatibility across Unix-like operating systems, so it supports all of them; the corresponding implementations are ae_epoll.c, ae_kqueue.c, ae_select.c, and ae_evport.c.
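Which one is compiled in is decided at build time in ae.c, ordered by performance, so on Linux the epoll implementation is picked automatically:

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif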

Since I am using Ubuntu, this article takes epoll as the example and looks at how the epoll event driver of Redis is implemented.

2 Redis epoll source code analysis

2.1 Initialization of the Redis event loop

When redis-server starts, it enters the initServer() function, which initializes the globally unique variable redisServer server. This server structure holds all state related to the entire server and is quite complex, but note one field in particular:

aeEventLoop *el; /* The registration structure for all Redis events: the event loop */
/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;


/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;
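The fired array holds the events that the multiplexing API reported as ready on the current iteration, as simple (fd, mask) pairs:

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;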


It is hard to see how these pieces relate from the code alone: events is an array indexed by file descriptor, each slot holding the callbacks registered for that fd, while fired collects the (fd, mask) pairs that the multiplexing API reports as ready on each iteration.

The specific initialization function aeCreateEventLoop is as follows:

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err; /* sets up eventLoop->apidata */
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}
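For reference, initServer creates this loop sized to the configured maximum number of clients plus some headroom for Redis's internal file descriptors; in this version of the source the call looks roughly like:

    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    if (server.el == NULL) {
        serverLog(LL_WARNING,
            "Failed creating the event loop. Error message: '%s'",
            strerror(errno));
        exit(1);
    }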

aeApiCreate is the epoll-specific constructor invoked by aeCreateEventLoop.
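The epoll backend keeps its per-loop state in a small aeApiState structure (defined in ae_epoll.c): the epoll instance's file descriptor, plus the buffer that epoll_wait fills in.

typedef struct aeApiState {
    int epfd;                   /* file descriptor of the epoll instance */
    struct epoll_event *events; /* buffer filled in by epoll_wait */
} aeApiState;

With that in mind, aeApiCreate itself: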

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}


Then, in the initServer function, Redis tries to listen on the port according to the configuration:

    /* Open the TCP listening socket for the user commands. */
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);


In the listenToPort function, Redis tries to bind and listen on multiple addresses, handling both the IPv4 and IPv6 scenarios.

int listenToPort(int port, int *fds, int *count) {
    int j;

    /* Force binding of 0.0.0.0 if no bind address is specified, always
     * entering the loop if j == 0. */
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        if (server.bindaddr[j] == NULL) {
            int unsupported = 0;
            /* Bind * for both IPv6 and IPv4, we enter here only if
             * server.bindaddr_count == 0. */
            fds[*count] = anetTcp6Server(server.neterr,port,NULL,
                server.tcp_backlog);
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]);
                (*count)++;
            } else if (errno == EAFNOSUPPORT) {
                unsupported++;
                serverLog(LL_WARNING,"Not listening to IPv6: unsupproted");
            }

            if (*count == 1 || unsupported) {
                /* Bind the IPv4 address as well. */
                fds[*count] = anetTcpServer(server.neterr,port,NULL,
                    server.tcp_backlog);
                if (fds[*count] != ANET_ERR) {
                    anetNonBlock(NULL,fds[*count]);
                    (*count)++;
                } else if (errno == EAFNOSUPPORT) {
                    unsupported++;
                    serverLog(LL_WARNING,"Not listening to IPv4: unsupproted");
                }
            }
            /* Exit the loop if we were able to bind * on IPv4 and IPv6,
             * otherwise fds[*count] will be ANET_ERR and we'll print an
             * error and return to the caller with an error. */
            if (*count + unsupported == 2) break;
        } else if (strchr(server.bindaddr[j],':')) {
            /* Bind IPv6 address. */
            fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        } else {
            /* Bind IPv4 address. */
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        }
        if (fds[*count] == ANET_ERR) {
            serverLog(LL_WARNING,
                "Creating Server TCP listening socket %s:%d: %s",
                server.bindaddr[j] ? server.bindaddr[j] : "*",
                port, server.neterr);
            return C_ERR;
        }
        anetNonBlock(NULL,fds[*count]);
        (*count)++;
    }
    return C_OK;
}

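anetTcp6Server and anetTcpServer (in anet.c) wrap the standard socket/bind/listen sequence. Greatly simplified, the real code also handles getaddrinfo, error strings, and more socket options, the IPv4 path boils down to a sketch like this:

/* Simplified sketch of what the anet.c helpers do; not the actual code. */
#include <sys/socket.h>
#include <netinet/in.h>

static int tcp_server_sketch(int port, int backlog) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    int yes = 1;
    struct sockaddr_in sa = {0};

    if (fd == -1) return -1;
    /* Allow fast restarts: reuse the address even in TIME_WAIT. */
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    sa.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(fd, (struct sockaddr*)&sa, sizeof(sa)) == -1) return -1;
    if (listen(fd, backlog) == -1) return -1;
    return fd;
}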

The listening sockets on the server side are stored as file descriptors in the server's ipfd array:

int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */


Later, initServer registers a file event for each listening descriptor in ipfd:

    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event."); }}Copy the code

Each ipfd[i] is associated with the AE_READABLE event and the acceptTcpHandler callback. This means that whenever a client sends a TCP connection request, the acceptTcpHandler function is triggered. Let's look at how aeCreateFileEvent ties these pieces together using the data structures shown above.

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}


As you can see from the source above, this function does two things: it stores the event callback functions in the eventLoop->events[fd] slot, and it calls aeApiAddEvent, which is essentially a wrapper around the epoll interface functions. The concrete implementation is as follows:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}


The code logic is clear: the core is the epoll_ctl call, which hands the server socket's fd to epoll for monitoring.
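For completeness, the symmetric removal path (aeApiDelEvent in ae_epoll.c) uses EPOLL_CTL_MOD when some interest remains on the fd and EPOLL_CTL_DEL when none does; it looks roughly like this:

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    int mask = eventLoop->events[fd].mask & (~delmask);

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}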

2.2 The epoll event loop of the Redis service

After initialization, Redis will enter a loop as follows:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
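For context, aeMain is invoked at the end of main() in server.c once initialization is done; roughly:

    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el);            /* blocks here for the server's lifetime */
    aeDeleteEventLoop(server.el);
    return 0;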

This loop keeps trying to handle events via the aeProcessEvents function, which handles all Redis events, both file events and time events. For file events, the core code is as follows:

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the fe->mask & mask & ... code: maybe an already
             * processed event removed an element that fired and we still
             * didn't process, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
        }

aeApiPoll, shown below, wraps epoll_wait and translates epoll's flags back into AE masks:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}



Each iteration calls aeApiPoll, a wrapper around epoll_wait, to check whether any monitored file descriptor has a pending event; if so, the loop above invokes the registered callback functions to handle it.

2.3 How a Redis client connection is established and processed

As mentioned in Section 2.1, the server socket's file descriptor is associated, for AE_READABLE events, with the acceptTcpHandler callback, which is fired when the server socket becomes readable:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while (max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0,cip);
    }
}

We can see that Redis uses the socket accept function to take pending TCP connection requests one by one, and hands each accepted connection to acceptCommonHandler for processing.
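anetTcpAccept delegates to a generic accept helper in anet.c that retries on EINTR; the core is roughly:

static int anetGenericAccept(char *err, int s, struct sockaddr *sa,
                             socklen_t *len)
{
    int fd;
    while(1) {
        fd = accept(s,sa,len);
        if (fd == -1) {
            if (errno == EINTR)
                continue;   /* interrupted by a signal, retry */
            else {
                anetSetError(err, "accept: %s", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    return fd;
}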

#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    ...
}

(The code that follows in this function does not affect the main flow, so it is skipped here.)

createClient allocates a client data area to represent the new client. The logic is as follows:

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)  
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    selectDb(c,0);
    uint64_t client_id;
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;
    c->fd = fd;
    c->name = NULL;
    c->bufpos = 0;
    c->qb_pos = 0;
    c->querybuf = sdsempty();
    c->pending_querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    c->authenticated = 0;
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->slave_ip[0] = '\0';
    c->slave_capa = SLAVE_CAPA_NONE;
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
    c->bpop.target = NULL;
    c->bpop.xread_group = NULL;
    c->bpop.xread_consumer = NULL;
    c->bpop.xread_group_noack = 0;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    c->client_list_node = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    if (fd != -1) linkClient(c);
    initClientMultiState(c);
    return c;
}

createClient does two things:

  • It initializes the client structure while allocating memory for it.
  • It associates the client socket's file descriptor fd with AE_READABLE in the reactor server.el, so that when the client sends data, the readQueryFromClient callback is invoked:
   if (aeCreateFileEvent(server.el,fd,AE_READABLE,
       readQueryFromClient, c) == AE_ERR)
   {
       close(fd);
       zfree(c);
       return NULL;
   }

When redis-server receives a database operation request from a client, it triggers the following callback function, which reads the data from the socket and begins processing it.

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen); /* read data from the client socket */
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Time to process the buffer. If the client is a master we need to
     * compute the difference between the applied offset before and after
     * processing the buffer, to understand how much of the replication stream
     * was actually applied to the master state: this quantity, and its
     * corresponding part of the replication stream, will be propagated to
     * the sub-slaves and to the replication backlog. */
    processInputBufferAndReplicate(c);
}

The function above makes room in the query buffer (up to a maximum read length), calls the socket read function to pull the client's data in, and finally calls processInputBufferAndReplicate(c), which performs the normal Redis command parsing and processing.
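For reference, in this version of the source processInputBufferAndReplicate is a thin wrapper: for a normal client it simply parses and executes the buffered commands, while for a master link it also measures how much of the replication stream was applied and feeds that portion to the sub-slaves and the replication backlog. Roughly:

void processInputBufferAndReplicate(client *c) {
    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c);
    } else {
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}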

At this point, the whole message flow is strung together: the server starts listening on a port, the event loop begins providing service, a client sends a connection request, and its database operation requests are received and processed end to end.