preface

This article will build the REDis source debug environment, through the analysis of the main function process, to understand the redis network model and event-driven model, will clearly show the Redis architecture.

The body of the

The source code to build

Source code: Redis-Github

Download Clion and import the project, switch to the Redis6 branch and create the following files:

File name: cmakelists.txt

  • With the directory:
cmake_minimum_required(VERSION 3.0 FATAL_ERROR)
project(redis VERSION 4.0)
set(CMAKE_BUILD_TYPE "Debug")

get_filename_component(REDIS_ROOT "${CMAKE_CURRENT_SOURCE_DIR}" ABSOLUTE)

add_subdirectory(deps)
add_subdirectory(src/modules)

set(SRC_SERVER_TMP
        src/acl.c
        src/adlist.c
        src/ae.c
        src/anet.c
        src/ae_kqueue.c
        src/dict.c
        src/sds.c
        src/zmalloc.c
        src/lzf_c.c
        src/lzf_d.c
        src/pqsort.c
        src/zipmap.c
        src/sha1.c
        src/ziplist.c
        src/release.c
        src/networking.c
        src/util.c
        src/object.c
        src/db.c
        src/replication.c
        src/rdb.c
        src/t_string.c
        src/t_list.c
        src/t_set.c
        src/t_zset.c
        src/evict.c
        src/defrag.c
        src/module.c src/quicklist.c src/expire.c src/childinfo.c src/redis-check-aof.c src/redis-check-rdb.c src/lazyfree.c src/geohash.c  src/rax.c src/geohash_helper.c src/siphash.c src/geo.c src/t_hash.c src/config.c src/aof.c src/pubsub.c src/multi.c src/debug.c src/sort.c src/intset.c src/syncio.c src/cluster.c src/crc16.c src/endianconv.c src/slowlog.c src/scripting.c src/bio.c src/rio.c src/rand.c src/memtest.c src/crc64.c src/bitops.c src/sentinel.c src/notify.c src/setproctitle.c src/crcspeed.c src/blocked.c src/hyperloglog.c src/latency.c src/sparkline.c src/t_stream.c src/lolwut.c src/lolwut5.c src/listpack.c src/localtime.c src/timeout.c src/connection.c src/tls.c src/tracking.c src/lolwut6.c src/gopher.c src/sha256.c )set(SRC_SERVER src/server.c ${SRC_SERVER_TMP})

set(SRC_CLI
        src/anet.c
        src/sds.c
        src/adlist.c
        src/redis-cli.c
        src/zmalloc.c
        src/release.c
        src/anet.c
        src/ae.c
        src/crc64.c
        src/crc16.c
        src/dict.c
        src/siphash.c
        src/crcspeed.c
        )


set(EXECUTABLE_OUTPUT_PATH src)
link_directories(deps/linenoise/ deps/lua/src deps/hiredis)

add_executable(redis-server ${SRC_SERVER})
target_include_directories(redis-server
        PRIVATE ${REDIS_ROOT}/deps/linenoise
        PRIVATE ${REDIS_ROOT}/deps/hiredis
        PRIVATE ${REDIS_ROOT}/deps/lua/src)
target_link_libraries(redis-server
        PRIVATE pthread
        PRIVATE m
        PRIVATE lua
        PRIVATE linenoise
        PRIVATE hiredis)


add_executable(redis-cli ${SRC_CLI})
target_include_directories(redis-cli
        PRIVATE ${REDIS_ROOT}/deps/linenoise
        PRIVATE ${REDIS_ROOT}/deps/hiredis
        PRIVATE ${REDIS_ROOT}/deps/lua/src)

target_link_libraries(redis-cli
        PRIVATE pthread
        PRIVATE m
        PRIVATE linenoise
        PRIVATE hiredis
        )
Copy the code
  • deps/CMakeLists.txt
add_subdirectory(hiredis)
add_subdirectory(linenoise)
add_subdirectory(lua)
Copy the code
  • deps/hiredis/CMakeLists.txt
add_library(hiredis STATIC
        hiredis.c
        net.c
        dict.c
        sds.c
        async.c
        read.c
        )
Copy the code
  • deps/linenoise/CMakeLists.txt
add_library(linenoise linenoise.c)
Copy the code
  • deps/lua/CMakeLists.txt
set(LUA_SRC
        src/lapi.c src/lcode.c src/ldebug.c src/ldo.c src/ldump.c src/lfunc.c
        src/lgc.c src/llex.c src/lmem.c
        src/lobject.c src/lopcodes.c src/lparser.c src/lstate.c src/lstring.c
        src/ltable.c src/ltm.c
        src/lundump.c src/lvm.c src/lzio.c src/strbuf.c src/fpconv.c
        src/lauxlib.c src/lbaselib.c src/ldblib.c src/liolib.c src/lmathlib.c
        src/loslib.c src/ltablib.c
        src/lstrlib.c src/loadlib.c src/linit.c src/lua_cjson.c
        src/lua_struct.c
        src/lua_cmsgpack.c
        src/lua_bit.c
        )

add_library(lua STATIC ${LUA_SRC})
Copy the code
  • src/modules/CMakeLists.txt
cmake_minimum_required(VERSION 3.9)
set(CMAKE_BUILD_TYPE "Debug")
add_library(helloworld SHARED helloworld.c)
set_target_properties(helloworld PROPERTIES PREFIX "" SUFFIX ".so")


add_library(hellotype SHARED hellotype.c)
set_target_properties(hellotype PROPERTIES PREFIX "" SUFFIX ".so")


add_library(helloblock SHARED helloblock.c)
set_target_properties(helloblock PROPERTIES PREFIX "" SUFFIX ".so")


add_library(testmodule SHARED testmodule.c)
set_target_properties(testmodule PROPERTIES PREFIX "" SUFFIX ".so")

Copy the code

Change the file ae_kqueue.c to include:

#include "unistd.h"
#include "ae.h"
#include "zmalloc.h"
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
Copy the code

Run the cmake command. (Install cmake by yourself.) Run the make command

The main method

int main(int argc, char **argv) {
    struct timeval tv;
    int j;

#ifdef REDIS_TEST
    if (argc == 3 && !strcasecmp(argv[1]."test")) {
        if(! strcasecmp(argv[2]."ziplist")) {
            return ziplistTest(argc, argv);
        } else if(! strcasecmp(argv[2]."quicklist")) {
            quicklistTest(argc, argv);
        } else if(! strcasecmp(argv[2]."intset")) {
            return intsetTest(argc, argv);
        } else if(! strcasecmp(argv[2]."zipmap")) {
            return zipmapTest(argc, argv);
        } else if(! strcasecmp(argv[2]."sha1test")) {
            return sha1Test(argc, argv);
        } else if(! strcasecmp(argv[2]."util")) {
            return utilTest(argc, argv);
        } else if(! strcasecmp(argv[2]."endianconv")) {
            return endianconvTest(argc, argv);
        } else if(! strcasecmp(argv[2]."crc64")) {
            return crc64Test(argc, argv);
        } else if(! strcasecmp(argv[2]."zmalloc")) {
            return zmalloc_test(argc, argv);
        }

        return - 1; /* test not found */
    }
#endif

    /* We need to initialize our libraries, and the server configuration. */
#ifdef INIT_SETPROCTITLE_REPLACEMENT
    spt_init(argc, argv);
#endif
    setlocale(LC_COLLATE,"");
    // Set the time environment variable
    tzset();
    // Memory outage handler(log)
    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
    // Random number initialization
    srand(time(NULL)^getpid());
    // Exact time
    gettimeofday(&tv,NULL);
    crc64_init();

    // Hash algorithm seed
    uint8_t hashseed[16];
    getRandomBytes(hashseed,sizeof(hashseed));
    dictSetHashFunctionSeed(hashseed);
    // Whether to start in Sentinel mode
    server.sentinel_mode = checkForSentinelMode(argc,argv);

    // Initialize the configuration file
    initServerConfig();
    ACLInit();
    moduleInitModulesSystem();
    tlsInit();

    // Execute the command and save the parameters
    server.executable = getAbsolutePath(argv[0]);
    server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
    server.exec_argv[argc] = NULL;
    for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);

    // Initialize the sentinel configuration
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }

    // Check for aOF and RDB errors
    if (strstr(argv[0]."redis-check-rdb") != NULL)
        redis_check_rdb_main(argc,argv,NULL);
    else if (strstr(argv[0]."redis-check-aof") != NULL)
        redis_check_aof_main(argc,argv);
Copy the code

Basically, the initServerConfig method is executed to set the default value for the service configuration

if (argc >= 2) {
        j = 1;
        sds options = sdsempty();
        char *configfile = NULL;

        // Output the version number and help, etc
        if (strcmp(argv[1]."-v") = =0 ||
            strcmp(argv[1]."--version") = =0) version();
        if (strcmp(argv[1]."--help") = =0 ||
            strcmp(argv[1]."-h") = =0) usage();
        if (strcmp(argv[1]."--test-memory") = =0) {
            if (argc == 3) {
                memtest(atoi(argv[2]),50);
                exit(0);
            } else {
                fprintf(stderr."Please specify the amount of memory to test in megabytes.\n");
                fprintf(stderr."Example: ./redis-server --test-memory 4096\n\n");
                exit(1); }}// Get the configuration file address
        if (argv[j][0] != The '-' || argv[j][1] != The '-') {
            configfile = argv[j];
            server.configfile = getAbsolutePath(configfile);
            zfree(server.exec_argv[j]);
            server.exec_argv[j] = zstrdup(server.configfile);
            j++;
        }

        // Parses parameters such as --port 6379 and other configurations
        while(j ! = argc) {if (argv[j][0] = =The '-' && argv[j][1] = =The '-') {
                // skip --check-rdb has no arguments
                if (!strcmp(argv[j], "--check-rdb")) {
                    j++;
                    continue;
                }
                if (sdslen(options)) options = sdscat(options,"\n");
                options = sdscat(options,argv[j]+2);
                options = sdscat(options,"");
            } else {
                /* Option argument */
                options = sdscatrepr(options,argv[j],strlen(argv[j]));
                options = sdscat(options,"");
            }
            j++;
        }
        // Sentry configuration can only be read from file with no arguments allowed
        if (server.sentinel_mode && configfile && *configfile == The '-') {
            serverLog(LL_WARNING,
                "Sentinel config from STDIN not allowed.");
            serverLog(LL_WARNING,
                "Sentinel needs config file on disk to save state. Exiting...");
            exit(1);
        }
        resetServerSaveParams();
        // Load the configuration file and the parameters read from the command line into the server configuration
        loadServerConfig(configfile,options);
        sdsfree(options);
    }
Copy the code

The input parameters can then be parsed through:

redis-server .. /redis.conf --port 6379Copy the code

To specify the address of the configuration file, and you can fill in some parameters, and finally through the loadServerConfig method to the configuration file and their parameters to parse

 / / Redis information
    serverLog(LL_WARNING, "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo");
    serverLog(LL_WARNING,
        "Redis version=%s, bits=%d, commit=%s, modified=%d, pid=%d, just started",
            REDIS_VERSION,
            (sizeof(long) = =8)?64 : 32,
            redisGitSHA1(),
            strtol(redisGitDirty(),NULL.10) > 0,
            (int)getpid());

    if (argc == 1) {
        serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
    } else {
        serverLog(LL_WARNING, "Configuration loaded");
    }

    // Background mode mode
    server.supervised = redisIsSupervised(server.supervised_mode);
    intbackground = server.daemonize && ! server.supervised;if (background) daemonize();

    // Initialize the server
    initServer();
Copy the code

It’s mainly the initServer method, so inintServer first initializes some list, so I’m not going to show you this, so I’m going to keep going

// Common strings and other shared objects
    createSharedObjects();
    // Increase the maximum number of open files
    adjustOpenFilesLimit();
    // Create an event loop
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    if (server.el == NULL) {
        serverLog(LL_WARNING,
            "Failed creating the event loop. Error message: '%s'",
            strerror(errno));
        exit(1);
    }
Copy the code

As you can see, the call to aeCreateEventLoop creates the event loop for the service, which is the cornerstone of the entire service

aeEventLoop

typedef struct aeEventLoop {
    int maxfd;   /* The current highest file descriptor */
    int setsize; / / the event number
    long long timeEventNextId; // The id of the scheduled task is automatically added
    time_t lastTime;
    aeFileEvent *events; // Event array
    aeFiredEvent *fired; /* Ready event */
    aeTimeEvent *timeEventHead; // Head of scheduled tasks
    int stop;
    void *apidata;
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;
Copy the code

This is the structure of the event loop, mainly *events to store the corresponding events

typedef struct aeFileEvent {
    int mask; / * three flags AE_ (READABLE | WRITABLE | BARRIER) * /
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;
Copy the code

AeFileEvent is a read event and a write event. According to the call methods of the event, the aeFileEvent should be read before write. However, if the mask is AE_BARRIER, the event is read before read

typedef struct aeTimeEvent {
    long long id;
    long when_sec; / / SEC.
    long when_ms;  / / ms
    aeTimeProc *timeProc; // Corresponding execution method
    aeEventFinalizerProc *finalizerProc; // Recycle method
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
    int refcount;
} aeTimeEvent;
Copy the code

This is a list of timed events in an event loop

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = - 1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    if (aeApiCreate(eventLoop) == - 1) goto err;
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}
Copy the code

Create event loop code, you can see that the main call aeApiCreate, and the setsize event initialization, setsize indicates the maximum number of clients

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
Copy the code

AeApiCreate invokes the underlying IO implementation, which supports four types:

  • Epoll: used in Linux
  • Kqueue: used in MAC environment
  • Evport: Solaris 10
  • Select: Almost all can be used

About what is select, what is epoll old topic, here will not explain….

++ The following uses epoll as the main IO mode ++

In Java, epoll can also be used as an I/O method. If you have written javaNio, you can register the event you want to listen on, such as Accept and read. When the event is coming, you can process it according to the event type.

Epoll_create Creates an epoll object. Epoll_ctl Adds or deletes an event from a stream to an epoll object. For example, epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);Epoll_wait returns if there is data in the buffer
epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);Epoll_wait returns epoll_wait(epollfd,...) Wait until the registration event occurs
Copy the code
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if(! state)return - 1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if(! state->events) { zfree(state);return - 1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == - 1) {
        zfree(state->events);
        zfree(state);
        return - 1;
    }
    eventLoop->apidata = state;
    return 0;
}

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    // Merge old mask
    mask |= eventLoop->events[fd].mask;
    // Set the event read/write
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    // Add event
    if (epoll_ctl(state->epfd,op,fd,&ee) == - 1) return - 1;
    return 0;
}
Copy the code

Create epoll and add events

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    / / core
    if (aeApiAddEvent(eventLoop, fd, mask) == - 1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
Copy the code

This is the aeCreateFileEvent method, and it’s used in a lot of places, like overwriting aof. Register a corresponding event with ePoll via aeApiAddEvent

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : - 1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if(e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; }}return numevents;
}
Copy the code

Call epoll_wait to wait for the event, and then put the event into the Fired array

Conclusion: Redis uses different I/O models to process events, and can freely process the addition and deletion of events. Finally, aeApiPoll is called to obtain the reached events, and the corresponding methods will be processed in other places later.

Create a default of 16 databases
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);

    // Bind ports
    if(server.port ! =0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);
    if(server.tls_port ! =0 &&
        listenToPort(server.tls_port,server.tlsfd,&server.tlsfd_count) == C_ERR)
        exit(1);

// Unixsocket is not configured with port
    if(server.unixsocket ! =NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,
            server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {
            serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL,server.sofd);
    }

    // There is no listen for any port
    if (server.ipfd_count == 0 && server.tlsfd_count == 0 && server.sofd < 0) {
        serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
        exit(1);
    }

Copy the code

Continue with bound ports and Unixsockets

// Create 16 databases
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&dbDictType,NULL);
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].expires_cursor = 0;
        server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
        server.db[j].defrag_later = listCreate();
        listSetFreeMethod(server.db[j].defrag_later,(void(*) (void*))sdsfree);
    }
Copy the code

Initializing the database

 // Create a timer
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL.NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
Copy the code

The serverCron timer is created. The serverCron timer is the scheduled task processing for the entire system

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    // Get the self-added ID
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}
Copy the code

Adding a scheduled task does not use IO, but creates a scheduled task to put in the head of the linked list

static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;

    while(te) {
        if(! nearest || te->when_sec < nearest->when_sec || (te->when_sec == nearest->when_sec && te->when_ms < nearest->when_ms)) nearest = te; te = te->next; }return nearest;
}
Copy the code

Go through the list and find the fastest task

 // Create event handlers for handling TCP or Unix connections
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event."); }}Copy the code

A number of event handlers are created to handle redis client connections

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if(errno ! = EWOULDBLOCK) serverLog(LL_WARNING,"Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); }}Copy the code

Continue to see acceptCommonHandler

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    client *c;
    UNUSED(ip);

    // There are too many clients
    if (listLength(server.clients) + getClusterConnectionsCount()
        >= server.maxclients)
    {
        char *err;
        if (server.cluster_enabled)
            err = "-ERR max number of clients + cluster "
                  "connections reached\r\n";
        else
            err = "-ERR max number of clients reached\r\n";

        // Write an error message
        if (connWrite(conn,err,strlen(err)) == - 1) {

        }
        server.stat_rejected_conn++;
        connClose(conn);
        return;
    }

    /* Create client */
    if ((c = createClient(conn)) == NULL) {
        char conninfo[100];
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (conn: %s)",
            connGetLastError(conn),
            connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }
    
    c->flags |= flags;
    
    if (connAccept(conn, clientAcceptHandler) == C_ERR) {
        char conninfo[100];
        if (connGetState(conn) == CONN_STATE_ERROR)
            serverLog(LL_WARNING,
                    "Error accepting a client connection: %s (conn: %s)",
                    connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
        freeClient(connGetPrivateData(conn));
        return; }}Copy the code

The maximum number of clients was determined and a failure was returned if the number exceeded, and the createClient was called to create the client

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    
    if (conn) {
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }
Copy the code

Just looking at this paragraph, I set up a read handler for the client connection, readQueryFromClient

static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    return conn->type->set_read_handler(conn, func);
}

.set_read_handler = connSocketSetReadHandler,
Copy the code

Set_read_handler is the connSocketSetReadHandler method

static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    if(! conn->read_handler) aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);else
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}
Copy the code

As you can see here, Redis creates multiple TCP events, creates client instances when the client first connects, and sets up ReadHandler, which actually registers read events with epoll, ReadQueryFromClient is called when a command is available to read

ReadQueryFromClient ends up calling processCommand, and redis6 supports multithreading, but dict does not lock. See next time to break down the Redis multithreaded model

conclusion