The previous article, “Redis command Execution Process (PART 1)”, gave an overview of how Redis executes a command and then analyzed each stage in detail: Redis startup, establishing the socket connection, reading socket data into the input buffer, parsing the command, and executing it. This article looks at the implementation details of the set and get commands and at how the command result is sent back to the Redis client through the output buffer and the socket.

Implementation of the set and get commands

The processCommand method parses the corresponding redisCommand from the input buffer and then calls the call method, which executes the proc method of the parsed redisCommand. The proc method differs from command to command: for the redisCommand named set it is setCommand, and for get it is getCommand. This is effectively polymorphism implemented with function pointers, similar to the strategy pattern that is common in Java.

void call(client *c, int flags) {
    ...
    c->cmd->proc(c);
    ...
}

// redisCommand structure
struct redisCommand {
    char *name;
    // Function pointer to the command's implementation
    redisCommandProc *proc;
    ... // Other definitions
};
// Use a typedef to define an alias for the function type
typedef void redisCommandProc(client *c);
// Each command maps to its own implementation function.
struct redisCommand redisCommandTable[] = {
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
    ... // All Redis commands are listed here
};
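
The dispatch idea above can be sketched in a few lines of standalone C. This is a simplified illustration, not Redis source: demo_client, demo_command, demo_lookup, demo_call, and the two handlers are hypothetical names, and the table holds only a name and a handler. The linear scan is also a simplification chosen for brevity.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a client: it only records the last handler run. */
typedef struct demo_client {
    const char *last_handler;
} demo_client;

/* One function type shared by every command, like redisCommandProc. */
typedef void demo_proc(demo_client *c);

typedef struct demo_command {
    const char *name;
    demo_proc *proc;
} demo_command;

void demo_get(demo_client *c) { c->last_handler = "get"; }
void demo_set(demo_client *c) { c->last_handler = "set"; }

static const demo_command demo_table[] = {
    {"get", demo_get},
    {"set", demo_set},
};

/* Linear lookup by name; returns NULL for an unknown command. */
const demo_command *demo_lookup(const char *name) {
    assert(name != NULL);
    for (size_t i = 0; i < sizeof(demo_table) / sizeof(demo_table[0]); i++) {
        if (strcmp(demo_table[i].name, name) == 0) return &demo_table[i];
    }
    return NULL;
}

/* Same shape as call(): run whatever handler the command carries.
 * Returns 0 on success, -1 if the command is unknown. */
int demo_call(demo_client *c, const char *name) {
    const demo_command *cmd = demo_lookup(name);
    if (cmd == NULL) return -1;
    cmd->proc(c);
    return 0;
}
```

The caller never needs to know which handler runs; adding a command means adding one row to the table.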

setCommand checks whether the set command carries optional arguments such as nx, xx, ex, or px and then calls setGenericCommand. Let’s go straight to the setGenericCommand method.

The processing logic of the setGenericCommand method is as follows:

  • Check whether the set type is NX or XX. With NX, return immediately if the key already exists; with XX, return immediately if the key does not exist.
  • Call the setKey method to add the key-value pair to the corresponding Redis database.
  • If an expiration time is given, call setExpire to set it.
  • Send the keyspace notifications.
  • Return the corresponding reply to the client.

// t_string.c
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0;
    /* Parse the expiration time: expire is a robj, so extract its integer value */
    if (expire) {
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
            return;
        if (milliseconds <= 0) {
            addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
            return;
        }
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }
    /* NX: abort if the key already exists in the database;
     * XX: abort if the key does not exist.
     * lookupKeyWrite performs the lookup in both cases. */
    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }
    /* Add the key-value pair to the data dictionary */
    setKey(c->db,key,val);
    server.dirty++;
    /* Add the expiration time to the expires dictionary */
    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
    /* Keyspace notifications */
    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
        "expire",key,c->db->id);
    /* Send the reply to the client */
    addReply(c, ok_reply ? ok_reply : shared.ok);
}
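
The two decisions setGenericCommand makes before touching the database can be isolated as pure functions. The following is a minimal sketch under stated assumptions: the DEMO_* constants and both function names are hypothetical and do not match the real OBJ_SET_* or UNIT_* values.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical flag and unit values, for illustration only. */
enum { DEMO_SET_NX = 1 << 0, DEMO_SET_XX = 1 << 1 };
enum { DEMO_UNIT_SECONDS = 0, DEMO_UNIT_MILLISECONDS = 1 };

/* True when SET must abort: NX with an existing key, or XX with a missing key. */
bool demo_set_must_abort(int flags, bool key_exists) {
    if ((flags & DEMO_SET_NX) && key_exists) return true;
    if ((flags & DEMO_SET_XX) && !key_exists) return true;
    return false;
}

/* Converts a relative expire argument into an absolute deadline in
 * milliseconds. Returns -1 for a non-positive expire, which is rejected. */
long long demo_expire_deadline(long long now_ms, long long expire, int unit) {
    assert(now_ms >= 0);
    if (expire <= 0) return -1;
    if (unit == DEMO_UNIT_SECONDS) expire *= 1000;
    return now_ms + expire;
}
```

For example, with a clock of 1000 ms, an expire of 5 seconds yields a deadline of 6000 ms, while an expire of 5 milliseconds yields 1005 ms.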

The implementation of the setKey and setExpire methods is not covered in detail here. In short, the key-value pair is added to the db's dict hash table, and the key with its expiration time is added to the expires hash table, as shown in the figure below.

Let’s look at the implementation of getCommand, which, again, calls the getGenericCommand method underneath.

The getGenericCommand method calls lookupKeyReadOrReply to look up the key in the dict hash table. If the key is not found, lookupKeyReadOrReply has already replied with shared.nullbulk, and the method returns C_OK directly. If the key is found but its value is not a string, addReply sends a wrong-type error and the method returns C_ERR; otherwise addReplyBulk adds the value to the output buffer.

int getGenericCommand(client *c) {
    robj *o;
    // Look up the key in the data dictionary; on a miss,
    // lookupKeyReadOrReply has already replied with shared.nullbulk
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return C_OK;
    // A non-string value gets a wrong-type error; a string is sent as a bulk reply
    if (o->type != OBJ_STRING) {
        addReply(c,shared.wrongtypeerr);
        return C_ERR;
    } else {
        addReplyBulk(c,o);
        return C_OK;
    }
}

lookupKeyReadWithFlags looks up the key-value pair in redisDb. It first calls expireIfNeeded to determine whether the key has expired and needs to be deleted. If the key has not expired, lookupKey is called to find it in the dict hash table and return it. See the comments in the code for details.

/*
 * Look up a key for a read operation. Returns NULL if the key is not found
 * or is logically expired. Side effects:
 * 1. If the key has reached its expiration time, it is treated as expired and deleted.
 * 2. The key's last access time is updated.
 * 3. The global keyspace hit/miss statistics are updated.
 * flags takes one of two values: LOOKUP_NONE (the usual case) or
 * LOOKUP_NOTOUCH (do not change the last access time).
 */
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { // db.c
    robj *val;
    // Check whether the key is expired
    if (expireIfNeeded(db,key) == 1) {
        ... // Special handling of this case on master and slave
    }
    // Look up the key in the key-value dictionary
    val = lookupKey(db,key,flags);
    // Update the global hit/miss statistics
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
}

Before calling any of the lookupKey* methods, Redis calls expireIfNeeded to determine whether the key has expired, and then deletes it synchronously or asynchronously depending on whether lazy freeing is configured. See Redis Memory Management mechanism and Implementation for more details on key deletion.

There are two special cases in the logic that determines whether a key has expired:

  • If the current Redis instance is a slave in a master-slave setup, it only determines whether the key has expired and does not delete it. Instead, it waits for the delete command sent by the master. If the current instance is the master, propagateExpire is called to propagate the expiration.
  • If a Lua script is currently executing, then because script execution is atomic and transactional, expiration is evaluated against the time at which the script started. A key that has not expired when the script starts will therefore not expire during its execution.

/*
 * Called before the lookupKey* family of functions.
 * On a slave: the key is not actively deleted on expiry, but the return
 * value still reports that it has expired.
 * On a master: an expired key is deleted, and the deletion is propagated
 * to the AOF and to the slaves.
 * Returns 0 if the key is still valid, otherwise 1.
 */
int expireIfNeeded(redisDb *db, robj *key) { // db.c
    // Get the expiration time of the key
    mstime_t when = getExpire(db,key);
    mstime_t now;

    if (when < 0) return 0; // No expiration time is set
    /*
     * While a Lua script is executing, expiration is evaluated against the
     * script's start time, so a key that is valid when the script starts
     * stays valid for the whole execution.
     */
    now = server.lua_caller ? server.lua_time_start : mstime();
    // A slave only reports whether the key has expired
    if (server.masterhost != NULL) return now > when;
    // On a master, return 0 if the key has not expired
    if (now <= when) return 0;
    // Delete the key
    server.stat_expiredkeys++;
    // Propagate the expiration
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    // Send the keyspace event
    notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",key,db->id);
    // Delete asynchronously or synchronously depending on the lazy-free setting
    return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
                                         dbSyncDelete(db,key);
}
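
The branching in expireIfNeeded is easier to check once it is separated from the server state. Below is a minimal sketch that models only the decision; the enum, the function name, and the convention that a negative script_start_ms means "no script running" are assumptions made for this example.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum {
    DEMO_KEY_VALID = 0,       /* key is still valid, or has no expire */
    DEMO_KEY_EXPIRED_KEPT,    /* slave: report expired, do not delete */
    DEMO_KEY_EXPIRED_DELETED  /* master: delete and propagate */
} demo_expire_result;

/* when_ms < 0 means the key has no expiration time.
 * script_start_ms >= 0 means a script is running; its start time is then
 * used as "now" instead of the wall clock. */
demo_expire_result demo_expire_if_needed(long long when_ms, long long clock_ms,
                                         long long script_start_ms,
                                         bool is_slave) {
    assert(clock_ms >= 0);
    if (when_ms < 0) return DEMO_KEY_VALID;
    long long now = (script_start_ms >= 0) ? script_start_ms : clock_ms;
    if (is_slave) return (now > when_ms) ? DEMO_KEY_EXPIRED_KEPT : DEMO_KEY_VALID;
    if (now <= when_ms) return DEMO_KEY_VALID;
    return DEMO_KEY_EXPIRED_DELETED;
}
```

A key with a deadline of 50 ms is deleted on a master at clock 100 ms, only reported as expired on a slave, and still valid if a script that started at 40 ms is running.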

The lookupKey method uses dictFind to look up the key in redisDb's dict hash table. If the key is found, then depending on Redis's maxmemory_policy it either updates the key's LRU last access time or calls updateLFU to update its LFU data. These metrics are used later to choose keys to evict when memory runs short.

robj *lookupKey(redisDb *db, robj *key, int flags) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        // Get the value
        robj *val = dictGetVal(de);
        // Update the access data only when no RDB or AOF child process is
        // running and flags does not contain LOOKUP_NOTOUCH
        if (server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            !(flags & LOOKUP_NOTOUCH))
        {
            // If an LFU policy is in use
            if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
                updateLFU(val);
            } else {
                val->lru = LRU_CLOCK();
            }
        }
        return val;
    } else {
        return NULL;
    }
}

Writing the command result to the output buffer

At the end of every redisCommand's execution, the addReply method is usually called to return the result. Our analysis now reaches the data-return phase of Redis command execution.

The addReply method does two things:

  • Call prepareClientToWrite to determine whether data needs to be returned, and add the current client to the queue of clients waiting to have their replies written.
  • Call the _addReplyToBuffer and _addReplyObjectToList methods to write the return value to the output buffer, where it waits to be written to the socket.

void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;
    if (sdsEncodedObject(obj)) {
        // Add the response to the output buffer: first try the fixed buffer,
        // and fall back to the response list if that fails
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c,obj);
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        .... // Special case optimization
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

prepareClientToWrite first determines whether the current client needs to be sent data:

  • A client used for Lua script (or module) execution always needs the return value;
  • If the client has sent CLIENT REPLY OFF or SKIP, no return value is needed;
  • If the client is the master instance in master-slave replication, no return value is needed (unless CLIENT_MASTER_FORCE_REPLY is set);
  • If the client is the fake client used while loading the AOF, no return value is needed.

Then, if the client is not already marked as pending write (CLIENT_PENDING_WRITE), it is flagged as such and added to the clients_pending_write queue, Redis's queue of clients whose return values are waiting to be written.

int prepareClientToWrite(client *c) {
    // A Lua or module client always gets the reply
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
    // The client sent CLIENT REPLY OFF or SKIP: do not send the return value
    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;

    // A master connected as a client sends us commands but gets no replies,
    // unless CLIENT_MASTER_FORCE_REPLY is set
    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
    // The fake client used for AOF loading does not need a return value
    if (c->fd <= 0) return C_ERR;

    // Schedule the client so that its reply is written in the next event loop iteration
    if (!clientHasPendingReplies(c) &&
        !(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        // Set the flag and add the client to the clients_pending_write queue
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write,c);
    }
    // The caller may now queue reply data for this client
    return C_OK;
}
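
The important property of prepareClientToWrite is that a client is queued at most once, no matter how many replies it produces in one event loop iteration. Here is a toy model of that guard, with hypothetical flags, a fixed-capacity array standing in for the clients_pending_write list, and the replication checks left out.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical flags for illustration; not the real CLIENT_* values. */
enum {
    DEMO_REPLY_OFF     = 1 << 0,
    DEMO_PENDING_WRITE = 1 << 1
};

typedef struct demo_client {
    int flags;
} demo_client;

#define DEMO_QUEUE_CAP 8

typedef struct demo_queue {
    demo_client *items[DEMO_QUEUE_CAP];
    size_t count;
} demo_queue;

/* Returns 0 if the caller may buffer a reply for c, -1 if no reply should
 * be sent. The client is queued at most once, guarded by DEMO_PENDING_WRITE. */
int demo_prepare_to_write(demo_queue *q, demo_client *c) {
    assert(q != NULL && c != NULL);
    if (c->flags & DEMO_REPLY_OFF) return -1;
    if (!(c->flags & DEMO_PENDING_WRITE)) {
        if (q->count == DEMO_QUEUE_CAP) return -1; /* toy model: queue is full */
        c->flags |= DEMO_PENDING_WRITE;
        q->items[q->count++] = c;
    }
    return 0;
}
```

Calling demo_prepare_to_write twice for the same client leaves exactly one entry in the queue, while a client with DEMO_REPLY_OFF is never queued.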

Redis divides the output buffer, the space that stores response data waiting to be returned, into two parts: a fixed-size buffer and a linked list of response data. When the linked list is empty and the buffer has enough space, the response is added to the buffer. Otherwise, a node is created and appended to the list. _addReplyToBuffer and _addReplyObjectToList are the methods that write data to these two spaces, respectively.

The fixed buffer and response linked list constitute a queue as a whole. The advantage of this organization is that it saves memory, does not need to pre-allocate large chunks of memory, and avoids frequent allocation and reclamation of memory.
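
A minimal sketch of this two-part layout follows. It is not the Redis implementation: demo_out, demo_node, the function names, and the deliberately tiny 16-byte buffer are all assumptions chosen so the overflow path is easy to trigger.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define DEMO_BUF_SIZE 16  /* tiny on purpose; an assumed value for the demo */

typedef struct demo_node {
    struct demo_node *next;
    size_t len;
    char data[];          /* flexible array member holding the payload */
} demo_node;

typedef struct demo_out {
    char buf[DEMO_BUF_SIZE];
    size_t bufpos;        /* bytes used in buf */
    demo_node *head, *tail;
    size_t list_len;      /* number of nodes in the overflow list */
} demo_out;

/* Try the fixed buffer first. Fails if the list is already in use (this
 * keeps replies ordered) or if the data does not fit. */
int demo_add_to_buffer(demo_out *o, const char *s, size_t len) {
    if (o->list_len > 0) return -1;
    if (len > DEMO_BUF_SIZE - o->bufpos) return -1;
    memcpy(o->buf + o->bufpos, s, len);
    o->bufpos += len;
    return 0;
}

/* Fallback: append a new node to the overflow list. */
int demo_add_to_list(demo_out *o, const char *s, size_t len) {
    demo_node *n = malloc(sizeof(*n) + len);
    if (n == NULL) return -1;
    n->next = NULL;
    n->len = len;
    memcpy(n->data, s, len);
    if (o->tail) o->tail->next = n; else o->head = n;
    o->tail = n;
    o->list_len++;
    return 0;
}

/* Buffer first, list second. Returns 0 on success, -1 on allocation failure. */
int demo_add_reply(demo_out *o, const char *s) {
    assert(o != NULL && s != NULL);
    size_t len = strlen(s);
    if (demo_add_to_buffer(o, s, len) == 0) return 0;
    return demo_add_to_list(o, s, len);
}

/* Release every list node and reset the list fields. */
void demo_free(demo_out *o) {
    demo_node *n = o->head;
    while (n != NULL) {
        demo_node *next = n->next;
        free(n);
        n = next;
    }
    o->head = o->tail = NULL;
    o->list_len = 0;
}
```

Once any reply has gone to the list, later replies must follow it there even if they would fit in the buffer; otherwise they would be sent out of order.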

This is the process of writing the response to the output buffer. Let’s look at the process of writing data from the output buffer to the socket.

The prepareClientToWrite function adds the client to the clients_pending_write queue, the queue of clients whose return values are waiting to be written. At this point the event handling for the request is finished; Redis writes the response from the output buffer to the socket the next time the event loop runs.

Writing the command return value from the output buffer to the socket

As we saw in The Redis event mechanism, Redis calls the beforeSleep method between event loop iterations to do some housekeeping, including processing the clients_pending_write list.

The aeMain method below is the main logic of the Redis event loop; you can see that the beforesleep method is called on every iteration.

void aeMain(aeEventLoop *eventLoop) { // ae.c
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        /* If there is a function that needs to run before event processing, run it */
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        /* Start processing events */
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
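
The hook-before-processing shape of aeMain can be reproduced with a toy loop. Everything in this sketch is hypothetical (demo_loop, demo_count_hook, demo_process_events, demo_run_loop); in particular, event processing is simulated by a counter rather than by waiting on file descriptors.

```c
#include <assert.h>
#include <stddef.h>

typedef struct demo_loop {
    int stop;
    int events_left;                        /* simulated amount of work */
    int hook_calls;                         /* how often the hook ran */
    void (*beforesleep)(struct demo_loop *);
} demo_loop;

/* Example hook: just counts its invocations. */
void demo_count_hook(demo_loop *loop) { loop->hook_calls++; }

/* Stand-in for event processing: consumes one event, stops when none are left. */
void demo_process_events(demo_loop *loop) {
    if (loop->events_left > 0) loop->events_left--;
    if (loop->events_left == 0) loop->stop = 1;
}

/* Run the hook, then process events, until the loop is stopped. */
void demo_run_loop(demo_loop *loop) {
    assert(loop != NULL);
    loop->stop = 0;
    while (!loop->stop) {
        if (loop->beforesleep != NULL) loop->beforesleep(loop);
        demo_process_events(loop);
    }
}
```

Because the hook runs once per iteration, any work queued during one round of event processing is picked up before the loop blocks again.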

The beforeSleep function calls the handleClientsWithPendingWrites function to process the clients_pending_write list.

The handleClientsWithPendingWrites method traverses the clients_pending_write list. For each client, it first calls writeToClient to try to write the reply data from the output buffer to the socket. If not all of the data can be written, it calls aeCreateFileEvent to register sendReplyToClient as a write event handler and waits for the Redis event mechanism to invoke it.

The advantage of this approach is that clients with small replies do not need to register a write event and wait for it to fire. Their data is written to the socket directly in the next event loop iteration, which speeds up the response.

However, if the clients_pending_write queue is too long, the processing time will be too long, blocking normal event response processing and resulting in increased latency for subsequent Redis commands.

// Called just before entering the event loop: try to write replies straight
// to the client sockets, without first having to register a write event handler
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    // Get the length of the pending-write queue
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    // Process each client in turn
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);

        // Try to write the buffered data to the client's socket; C_ERR means
        // the client was freed, so skip the rest
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        // Data remains unwritten: a write event handler has to be registered
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_flags |= AE_BARRIER;
            }
            // Register the write event handler sendReplyToClient and wait for it to run
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                freeClientAsync(c);
            }
        }
    }
    return processed;
}
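
The "write directly first, register a handler only if needed" strategy can be modelled without sockets. In this sketch the socket is simulated by a byte budget (sock_room), and setting handler_installed stands in for the aeCreateFileEvent call; all names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

typedef struct demo_client {
    size_t pending;        /* bytes still waiting in the output buffer */
    size_t sock_room;      /* bytes the socket accepts right now (simulated) */
    int handler_installed; /* 1 once a write handler has been registered */
} demo_client;

/* Simulated non-blocking write: sends what fits, leaves the rest pending. */
void demo_write_to_client(demo_client *c) {
    size_t n = c->pending < c->sock_room ? c->pending : c->sock_room;
    c->pending -= n;
    c->sock_room -= n;
}

/* Try a direct write for every queued client; register a handler only for
 * clients that still have data left. Returns how many handlers were installed. */
size_t demo_handle_pending_writes(demo_client **queue, size_t count) {
    assert(queue != NULL || count == 0);
    size_t installed = 0;
    for (size_t i = 0; i < count; i++) {
        demo_client *c = queue[i];
        demo_write_to_client(c);
        if (c->pending > 0) {
            c->handler_installed = 1; /* stands in for registering a write event */
            installed++;
        }
    }
    return installed;
}
```

A client with a 10-byte reply finishes without ever registering a handler, while a client with 500 bytes and room for only 100 keeps 400 bytes pending and gets a handler.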

The sendReplyToClient method also calls writeToClient, which writes as much data as possible from the buf and the reply list of the output buffer to the corresponding socket.

// Write the data in the output buffer to the socket. Returns C_OK if the
// client is still valid afterwards, or C_ERR if it was freed
int writeToClient(int fd, client *c, int handler_installed) {
    ssize_t nwritten = 0, totwritten = 0;
    size_t objlen;
    sds o;
    // Loop while there is still data to write
    while (clientHasPendingReplies(c)) {
        // If the fixed buffer has data
        if (c->bufpos > 0) {
            // Write to the socket represented by fd
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            // Count how many bytes have been written
            totwritten += nwritten;

            // If the whole buffer has been sent, reset the positions so that
            // later response data can be written to the buffer again
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            // The buffer is empty, so take data from the reply list
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o);

            if (objlen == 0) {
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }
            // Write the list node's data to the socket
            nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;
            // If the node has been sent completely, remove it from the list
            if (c->sentlen == objlen) {
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                c->reply_bytes -= objlen;
                if (listLength(c->reply) == 0)
                    serverAssert(c->reply_bytes == 0);
            }
        }
        // Stop once more than NET_MAX_WRITES_PER_EVENT bytes have been written,
        // unless the memory limit is reached or the client is a slave
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory) &&
            !(c->flags & CLIENT_SLAVE)) break;
    }
    server.stat_net_output_bytes += totwritten;
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return C_ERR;
        }
    }
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        // Delete the event handler if all the content has been output
        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
        // If the client is flagged to be closed after the reply, free it now
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClient(c);
            return C_ERR;
        }
    }
    return C_OK;
}
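
The core of writeToClient is the sentlen bookkeeping that lets a reply survive partial writes. The sketch below isolates that for the fixed buffer only. demo_sink is a hypothetical stand-in for a non-blocking socket that accepts a limited number of bytes per call and reports "would block" with -1; demo_reply and demo_flush are likewise assumed names.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct demo_sink {
    char data[64];        /* everything received so far */
    size_t len;
    size_t max_per_call;  /* at most this many bytes per write */
    size_t room;          /* bytes accepted before the sink "would block" */
} demo_sink;

/* Behaves like a write on a non-blocking socket: may accept fewer bytes
 * than requested, and returns -1 when it cannot accept anything. */
long demo_write(demo_sink *s, const char *p, size_t n) {
    size_t space = sizeof(s->data) - s->len;
    if (s->room == 0 || space == 0) return -1;
    if (n > s->max_per_call) n = s->max_per_call;
    if (n > s->room) n = s->room;
    if (n > space) n = space;
    memcpy(s->data + s->len, p, n);
    s->len += n;
    s->room -= n;
    return (long)n;
}

typedef struct demo_reply {
    const char *buf;
    size_t bufpos;   /* bytes of buf that hold reply data */
    size_t sentlen;  /* bytes of buf already sent */
} demo_reply;

/* Sends as much as possible, resuming at sentlen. Returns 1 when the
 * reply is fully sent (and resets the positions), 0 when data remains. */
int demo_flush(demo_reply *r, demo_sink *s) {
    assert(r != NULL && s != NULL);
    while (r->sentlen < r->bufpos) {
        long nw = demo_write(s, r->buf + r->sentlen, r->bufpos - r->sentlen);
        if (nw <= 0) break;
        r->sentlen += (size_t)nw;
    }
    if (r->sentlen == r->bufpos) {
        r->bufpos = 0;
        r->sentlen = 0;
        return 1;
    }
    return 0;
}
```

If the sink blocks part-way through, demo_flush returns 0 with sentlen pointing at the first unsent byte, so a later call continues exactly where the previous one stopped.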
