The data structure

In the Redis source file server.h you can find the redisServer data structure. It carries many responsibilities and is correspondingly complex, so I will break it down into smaller pieces here, some of which will be dissected separately later. I have commented the fields in detail elsewhere and will not post them here. The structure can be roughly divided into the following parts.

  • General parameters
  • Modules
  • Networking
  • Common command callback functions
  • Statistics (STAT)
  • Configuration information
  • AOF persistence (including the pipes that the parent and child processes use to reconcile data differences during an AOF rewrite)
  • RDB persistence (including the pipes that the parent and child processes use to communicate during an RDB save)
  • Command propagation for AOF and master-slave replication
  • Logging
  • Replication (master) and replication (slave)
  • Replication script cache for master-slave replication
  • Synchronous replication
  • System limits
  • Blocked clients
  • SORT command parameters
  • Data structure conversion parameters
  • Time cache
  • Publish and subscribe (Pubsub)
  • Cluster
  • Lazy free (whether deleted and expired keys are freed asynchronously by a background thread)
  • Lua scripting
  • Latency monitoring
  • Mutexes protecting server state
  • System hardware information

The boot process

int main(int argc, char **argv);

Let’s analyze the startup process of redis-server.

General Settings

In the main function, the general settings include the locale, the time zone, and the random and hash seeds.

// Locale and time zone settings
setlocale(LC_COLLATE,"");
tzset(); /* Populates 'timezone' global. */
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
srandom(time(NULL)^getpid());
gettimeofday(&tv,NULL);
init_genrand64(((long long) tv.tv_sec * 1000000 + tv.tv_usec) ^ getpid());
crc64_init();
  • Generating hash seeds
void dictSetHashFunctionSeed(uint8_t *seed) {
    memcpy(dict_hash_function_seed,seed,sizeof(dict_hash_function_seed));
}

/* The default hashing function uses SipHash implementation
 * in siphash.c. */
uint64_t siphash(const uint8_t *in, const size_t inlen, const uint8_t *k);
uint64_t siphash_nocase(const uint8_t *in, const size_t inlen, const uint8_t *k);

uint64_t dictGenHashFunction(const void *key, int len) {
    return siphash(key,len,dict_hash_function_seed);
}
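To make the role of the seed concrete, here is a minimal sketch of a seeded hash function. It is not SipHash: it swaps in a toy FNV-1a variant purely for illustration, and the names set_hash_seed and seeded_hash are hypothetical. The point it shows is that the same key hashes differently under different seeds, which is what makes hash values hard to predict from outside the process.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Process-wide seed, analogous in spirit to dict_hash_function_seed. */
static uint8_t hash_seed[16];

/* Hypothetical helper: install a 16-byte seed supplied by the caller. */
void set_hash_seed(const uint8_t *seed) {
    memcpy(hash_seed, seed, sizeof(hash_seed));
}

/* Toy seeded hash (FNV-1a over the seed bytes, then over the key bytes).
 * This is an illustration only and is NOT the SipHash used by Redis. */
uint64_t seeded_hash(const void *key, size_t len) {
    const uint8_t *p = key;
    uint64_t h = 0xcbf29ce484222325ULL;      /* FNV-1a 64-bit offset basis */
    size_t i;

    for (i = 0; i < sizeof(hash_seed); i++) {
        h ^= hash_seed[i];
        h *= 0x100000001b3ULL;               /* FNV-1a 64-bit prime */
    }
    for (i = 0; i < len; i++) {
        h ^= p[i];
        h *= 0x100000001b3ULL;
    }
    return h;
}
```

A caller would fill a 16-byte buffer with random data once at startup, pass it to set_hash_seed, and then use seeded_hash for every lookup.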

Mode selection

Check, based on the arguments, whether to run in Sentinel mode

server.sentinel_mode = checkForSentinelMode(argc,argv);

/* Returns 1 if there is --sentinel among the arguments or if
 * argv[0] contains "redis-sentinel". */
int checkForSentinelMode(int argc, char **argv) {
    int j;

    if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
    for (j = 1; j < argc; j++)
        if (!strcmp(argv[j],"--sentinel")) return 1;
    return 0;
}

Initialize the server

Server-side initialization is done mainly by the function initServerConfig. Its primary responsibility is to initialize the individual member variables of the redisServer data structure.

void initServerConfig(void) {
    int j;

    updateCachedTime(1);
    getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
    server.runid[CONFIG_RUN_ID_SIZE] = '\0';
    changeReplicationId();
    clearReplicationId2();
    server.hz = CONFIG_DEFAULT_HZ; /* Initialize it ASAP, even if it may get updated later after loading the config. This value may be used before the server is initialized. */
    server.timezone = getTimeZone(); /* Initialized by tzset(). */
    server.configfile = NULL;
    server.executable = NULL;
    server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
    server.bindaddr_count = 0;
    server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
    server.ipfd.count = 0;
    server.tlsfd.count = 0;
    server.sofd = -1;
    server.active_expire_enabled = 1;
    server.skip_checksum_validation = 0;
    server.saveparams = NULL;
    server.loading = 0;
    server.loading_rdb_used_mem = 0;
    server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE);
    server.aof_state = AOF_OFF;
    server.aof_rewrite_base_size = 0;
    server.aof_rewrite_scheduled = 0;
    server.aof_flush_sleep = 0;
    server.aof_last_fsync = time(NULL);
    atomicSet(server.aof_bio_fsync_status,C_OK);
    server.aof_rewrite_time_last = -1;
    server.aof_rewrite_time_start = -1;
    server.aof_lastbgrewrite_status = C_OK;
    server.aof_delayed_fsync = 0;
    server.aof_fd = -1;
    server.aof_selected_db = -1; /* Make sure the first time will not match */
    server.aof_flush_postponed_start = 0;
    server.pidfile = NULL;
    server.active_defrag_running = 0;
    server.notify_keyspace_events = 0;
    server.blocked_clients = 0;
    memset(server.blocked_clients_by_type,0,sizeof(server.blocked_clients_by_type));
    server.shutdown_asap = 0;
    server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
    server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE;
    server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
    server.next_client_id = 1; /* Client IDs, start from 1 .*/
    server.loading_process_events_interval_bytes = (1024*1024*2);

    unsigned int lruclock = getLRUClock();
    atomicSet(server.lruclock,lruclock);
    resetServerSaveParams();

    appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
    appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
    appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */

    /* Replication related */
    server.masterauth = NULL;
    server.masterhost = NULL;
    server.masterport = 6379;
    server.master = NULL;
    server.cached_master = NULL;
    server.master_initial_offset = -1;
    server.repl_state = REPL_STATE_NONE;
    server.repl_transfer_tmpfile = NULL;
    server.repl_transfer_fd = -1;
    server.repl_transfer_s = NULL;
    server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
    server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
    server.master_repl_offset = 0;

    /* Replication partial resync backlog */
    server.repl_backlog = NULL;
    server.repl_backlog_histlen = 0;
    server.repl_backlog_idx = 0;
    server.repl_backlog_off = 0;
    server.repl_no_slaves_since = time(NULL);

    /* Failover related */
    server.failover_end_time = 0;
    server.force_failover = 0;
    server.target_replica_host = NULL;
    server.target_replica_port = 0;
    server.failover_state = NO_FAILOVER;

    /* Client output buffer limits */
    for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
        server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];

    /* Linux OOM Score config */
    for (j = 0; j < CONFIG_OOM_COUNT; j++)
        server.oom_score_adj_values[j] = configOOMScoreAdjValuesDefaults[j];

    /* Double constants initialization */
    R_Zero = 0.0;
    R_PosInf = 1.0/R_Zero;
    R_NegInf = -1.0/R_Zero;
    R_Nan = R_Zero/R_Zero;

    /* Command table -- we initialize it here as it is part of the
     * initial configuration, since command names may be changed via
     * redis.conf using the rename-command directive. */
    server.commands = dictCreate(&commandTableDictType,NULL);
    server.orig_commands = dictCreate(&commandTableDictType,NULL);
    populateCommandTable();
    server.delCommand = lookupCommandByCString("del");
    server.multiCommand = lookupCommandByCString("multi");
    server.lpushCommand = lookupCommandByCString("lpush");
    server.lpopCommand = lookupCommandByCString("lpop");
    server.rpopCommand = lookupCommandByCString("rpop");
    server.zpopminCommand = lookupCommandByCString("zpopmin");
    server.zpopmaxCommand = lookupCommandByCString("zpopmax");
    server.sremCommand = lookupCommandByCString("srem");
    server.execCommand = lookupCommandByCString("exec");
    server.expireCommand = lookupCommandByCString("expire");
    server.pexpireCommand = lookupCommandByCString("pexpire");
    server.xclaimCommand = lookupCommandByCString("xclaim");
    server.xgroupCommand = lookupCommandByCString("xgroup");
    server.rpoplpushCommand = lookupCommandByCString("rpoplpush");
    server.lmoveCommand = lookupCommandByCString("lmove");

    /* Debugging */
    server.watchdog_period = 0;

    /* By default we want scripts to be always replicated by effects
     * (single commands executed by the script), and not by sending the
     * script to the slave / AOF. This is the new way starting from
     * Redis 5. However it is possible to revert it via redis.conf. */
    server.lua_always_replicate_commands = 1;

    /* Client Pause related */
    server.client_pause_type = CLIENT_PAUSE_OFF;
    server.client_pause_end_time = 0;   

    initConfigValues();
}
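The three appendServerSaveParams calls above register the default save points as (seconds, changes) pairs. As a rough sketch of how such pairs can be evaluated, assume a simplified struct and a hypothetical should_save helper (neither is the Redis implementation): a snapshot is due when, for at least one pair, enough changes have accumulated and enough time has passed since the last save.

```c
#include <time.h>

/* Simplified save point: "save after `seconds` if at least `changes` edits". */
struct save_param {
    time_t seconds;
    int changes;
};

/* Same values as the defaults registered in initServerConfig. */
static const struct save_param default_save_params[] = {
    {60 * 60, 1},   /* 1 hour and 1 change */
    {300, 100},     /* 5 minutes and 100 changes */
    {60, 10000},    /* 1 minute and 10000 changes */
};

/* Hypothetical helper: returns 1 if any save point is satisfied given the
 * number of unsaved changes and the seconds elapsed since the last save. */
int should_save(const struct save_param *params, int count,
                long long dirty, time_t elapsed) {
    int i;

    for (i = 0; i < count; i++) {
        if (dirty >= params[i].changes && elapsed > params[i].seconds)
            return 1;
    }
    return 0;
}
```

With the defaults, a single change triggers a save only after more than an hour, while 10000 changes trigger one after just over a minute.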

Sentinel settings

Based on the mode selected above, if Sentinel mode was chosen, the Sentinel initialization process is entered here.

Check whether to run in RDB/AOF check mode

/* Check if we need to start in redis-check-rdb/aof mode. We just execute
 * the program main. However the program is part of the Redis executable
 * so that we can easily execute an RDB check on loading errors. */
if (strstr(argv[0],"redis-check-rdb") != NULL)
    redis_check_rdb_main(argc,argv,NULL);
else if (strstr(argv[0],"redis-check-aof") != NULL)
    redis_check_aof_main(argc,argv);

Parses command line arguments

Command line arguments override the default settings, including values already set in the configuration file.

if (argc >= 2) {
    j = 1; /* First option to parse in argv[] */
    sds options = sdsempty();

    /* Handle special options --help and --version */
    if (strcmp(argv[1], "-v") == 0 ||
        strcmp(argv[1], "--version") == 0) version();
    if (strcmp(argv[1], "--help") == 0 ||
        strcmp(argv[1], "-h") == 0) usage();
    if (strcmp(argv[1], "--test-memory") == 0) {
        if (argc == 3) {
            memtest(atoi(argv[2]),50);
            exit(0);
        } else {
            fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
            fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
            exit(1);
        }
    }

    /* Parse command line options
     * Precedence wise, File, stdin, explicit options -- last config is the one that matters.
     *
     * First argument is the config file name? */
    if (argv[1][0] != '-') {
        /* Replace the config file in server.exec_argv with its absolute path. */
        server.configfile = getAbsolutePath(argv[1]);
        zfree(server.exec_argv[1]);
        server.exec_argv[1] = zstrdup(server.configfile);
        j = 2; /* Skip this arg when parsing options */
    }
    while(j < argc) {
        /* Either first or last argument - Should we read config from stdin? */
        if (argv[j][0] == '-' && argv[j][1] == '\0' && (j == 1 || j == argc-1)) {
            config_from_stdin = 1;
        }
        /* All the other options are parsed and conceptually appended to the
         * configuration file. For instance --port 6380 will generate the
         * string "port 6380\n" to be parsed after the actual config file
         * and stdin input are parsed (if they exist). */
        else if (argv[j][0] == '-' && argv[j][1] == '-') {
            /* Option name */
            if (sdslen(options)) options = sdscat(options,"\n");
            options = sdscat(options,argv[j]+2);
            options = sdscat(options," ");
        } else {
            /* Option argument */
            options = sdscatrepr(options,argv[j],strlen(argv[j]));
            options = sdscat(options," ");
        }
        j++;
    }

    /* Read the config file */
    loadServerConfig(server.configfile, config_from_stdin, options);
    if (server.sentinel_mode) loadSentinelConfigFromQueue();
    sdsfree(options);
}
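The way "--name value" arguments are turned into configuration lines can be sketched with plain C strings. This is a simplified illustration, not the Redis code: build_options and append are hypothetical helpers, a fixed-size buffer replaces the sds string, and the quoting that sdscatrepr applies to option arguments is left out.

```c
#include <string.h>

/* Append src to dst (capacity cap, including the terminator).
 * Returns 0 on success, -1 if the result would not fit. */
static int append(char *dst, size_t cap, const char *src) {
    size_t used = strlen(dst), need = strlen(src);

    if (used + need + 1 > cap) return -1;
    memcpy(dst + used, src, need + 1);
    return 0;
}

/* Hypothetical helper: convert the arguments starting at argv[first] into
 * config-file style text, one "name value ..." line per "--name" option. */
int build_options(int argc, char **argv, int first, char *out, size_t cap) {
    int j;

    if (cap == 0) return -1;
    out[0] = '\0';
    for (j = first; j < argc; j++) {
        if (argv[j][0] == '-' && argv[j][1] == '-') {
            /* Option name: start a new line unless it is the first option. */
            if (out[0] != '\0' && append(out, cap, "\n") != 0) return -1;
            if (append(out, cap, argv[j] + 2) != 0) return -1;
        } else {
            /* Option argument (unquoted in this sketch). */
            if (append(out, cap, argv[j]) != 0) return -1;
        }
        if (append(out, cap, " ") != 0) return -1;
    }
    return 0;
}
```

For the arguments --port 6380 --appendonly yes this produces the two lines "port 6380 " and "appendonly yes ", which can then be parsed exactly like text read from a configuration file.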

Reading configuration Files

Read the redis.conf configuration parameters.

/* Load the server configuration from the specified filename.
 * The function appends the additional configuration directives stored
 * in the 'options' string to the config file before loading.
 *
 * Both filename and options can be NULL, in such a case are considered
 * empty. This way loadServerConfig can be used to just load a file or
 * just load a string. */
void loadServerConfig(char *filename, char config_from_stdin, char *options) {
    sds config = sdsempty();
    char buf[CONFIG_MAX_LINE+1];
    FILE *fp;

    /* Load the file content */
    if (filename) {
        if ((fp = fopen(filename,"r")) == NULL) {
            serverLog(LL_WARNING,
                    "Fatal error, can't open config file '%s': %s",
                    filename, strerror(errno));
            exit(1);
        }
        while(fgets(buf,CONFIG_MAX_LINE+1,fp) != NULL)
            config = sdscat(config,buf);
        fclose(fp);
    }
    /* Append content from stdin */
    if (config_from_stdin) {
        serverLog(LL_WARNING,"Reading config from stdin");
        fp = stdin;
        while(fgets(buf,CONFIG_MAX_LINE+1,fp) != NULL)
            config = sdscat(config,buf);
    }

    /* Append the additional options */
    if (options) {
        config = sdscat(config,"\n");
        config = sdscat(config,options);
    }
    loadServerConfigFromString(config);
    sdsfree(config);
}

Background Running

int background = server.daemonize && !server.supervised; // Whether to run in the background
if (background) daemonize();

// The method that executes the background run
void daemonize(void) {
    int fd;

    if (fork() != 0) exit(0); /* parent exits */
    setsid(); /* create a new session */

    /* Every output goes to /dev/null. If Redis is daemonized but
     * the 'logfile' is set to 'stdout' in the configuration file
     * it will not log at all. */
    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        if (fd > STDERR_FILENO) close(fd);
    }
}

This deserves a closer look because it comes up often in interviews. The Redis version above is fairly simple; a more complete, textbook daemon initialization looks like this:

int daemon_init(const char *pname, int facility)
{
    int i;
    pid_t pid;

    if ((pid = Fork()) < 0)     /* create a child process */
        return (-1);
    else if (pid)
        _exit(0);               /* parent terminates */

    /* child 1 continues... */

    if (setsid() < 0)           /* become session leader */
        return (-1);

    Signal(SIGHUP, SIG_IGN);
    if ((pid = Fork()) < 0)     /* create a child process */
        return (-1);
    else if (pid)
        _exit(0);               /* child 1 terminates */

    /* child 2 continues... */

    daemon_proc = 1;            /* for err_XXX() functions */

    chdir("/");                 /* change working directory to the root */

    /* close off file descriptors */
    for (i = 0; i < MAXFD; i++)
        close(i);

    /* redirect stdin, stdout, and stderr to /dev/null */
    open("/dev/null", O_RDONLY);
    open("/dev/null", O_RDWR);
    open("/dev/null", O_RDWR);

    openlog(pname, LOG_PID, facility);

    return (0);                 /* success */
}
  • setsid(): makes the current process the session leader and process group leader of a new session, so that it is no longer attached to a controlling terminal.
  • Ignoring SIGHUP and forking a second time: the purpose of the second fork is to ensure that the daemon cannot automatically acquire a controlling terminal if it later opens a terminal device. When a session leader without a controlling terminal opens a terminal device, that terminal automatically becomes the controlling terminal of the session. After forking again, the new child is no longer a session leader and therefore cannot acquire a controlling terminal this way. SIGHUP must be ignored here because when the session leader (the child of the first fork) terminates, all processes in the session (the child of the second fork) receive SIGHUP.
  • Change the working directory.
  • Close all inherited file descriptors.
  • Redirect stdin, stdout and stderr; otherwise output would still go to the screen.

Initializing the service

Signals

The signals a server encounters most often are SIGHUP and SIGPIPE, and the default action for both is to terminate the process. A server cannot simply terminate like that, so we ignore them.

    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);

Other signals, such as SIGINT, are caught so that Redis can finish its cleanup work and shut down gracefully instead of being killed abruptly.

void setupSignalHandlers(void) {
    struct sigaction act;

    /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
     * Otherwise, sa_handler is used. */
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    act.sa_handler = sigShutdownHandler;
    sigaction(SIGTERM, &act, NULL);
    sigaction(SIGINT, &act, NULL);

    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;
    act.sa_sigaction = sigsegvHandler;
    if(server.crashlog_enabled) {
        sigaction(SIGSEGV, &act, NULL);
        sigaction(SIGBUS, &act, NULL);
        sigaction(SIGFPE, &act, NULL);
        sigaction(SIGILL, &act, NULL);
        sigaction(SIGABRT, &act, NULL);
    }
    return;
}
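The "catch the signal, then shut down in the main flow" pattern can be sketched as follows. This is a minimal illustration rather than the Redis handler: the names install_signal_handlers, on_shutdown_signal and shutdown_was_requested are hypothetical, and the handler only sets a flag that the main loop is expected to poll.

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <stddef.h>

/* Set from the signal handler, read from the main loop. */
static volatile sig_atomic_t shutdown_requested = 0;

/* Handler: do only async-signal-safe work, here just setting a flag. */
static void on_shutdown_signal(int sig) {
    (void)sig;
    shutdown_requested = 1;
}

/* Hypothetical helper: ignore SIGHUP/SIGPIPE and catch SIGTERM/SIGINT.
 * Returns 0 on success, -1 if any sigaction call fails. */
int install_signal_handlers(void) {
    struct sigaction act;

    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;

    act.sa_handler = SIG_IGN;
    if (sigaction(SIGHUP, &act, NULL) == -1) return -1;
    if (sigaction(SIGPIPE, &act, NULL) == -1) return -1;

    act.sa_handler = on_shutdown_signal;
    if (sigaction(SIGTERM, &act, NULL) == -1) return -1;
    if (sigaction(SIGINT, &act, NULL) == -1) return -1;
    return 0;
}

/* The main loop would check this periodically and then clean up and exit. */
int shutdown_was_requested(void) {
    return shutdown_requested != 0;
}
```

Keeping the handler this small avoids calling functions that are not async-signal-safe; the actual shutdown work happens later in normal program context.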

Syslog settings

Syslog ships with Linux and is used for logging by daemon processes, as discussed earlier. A daemon process cannot print to the terminal, so how can its log output be collected conveniently? Linux provides the syslog service, which supports logging at various levels and to various destinations (local or remote).

    if (server.syslog_enabled) {
        openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
            server.syslog_facility);
    }
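As a small illustration of logging at different levels through syslog, the sketch below maps application log levels to syslog priorities and writes one message. The numeric values of the LL_* levels, the choice of LOG_LOCAL0, and the helper names to_syslog_priority and log_to_syslog are assumptions made for this example.

```c
#include <syslog.h>

/* Application log levels; the numeric values are assumed for this sketch. */
enum { LL_DEBUG = 0, LL_VERBOSE = 1, LL_NOTICE = 2, LL_WARNING = 3 };

/* One syslog priority per application level, indexed by level. */
static const int level_map[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING };

/* Hypothetical helper: translate a level, falling back to LOG_NOTICE
 * for values outside the known range. */
int to_syslog_priority(int level) {
    if (level < LL_DEBUG || level > LL_WARNING) return LOG_NOTICE;
    return level_map[level];
}

/* Hypothetical helper: open the log, emit one message, close it again.
 * A long-running server would call openlog once at startup instead. */
void log_to_syslog(const char *ident, int level, const char *msg) {
    openlog(ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT, LOG_LOCAL0);
    syslog(to_syslog_priority(level), "%s", msg);
    closelog();
}
```

Passing the message through a "%s" format keeps any percent signs in the text from being interpreted as format specifiers.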

Set other common parameters

    /* Initialization after setting defaults from the config system. */
    server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
    server.hz = server.config_hz;
    server.pid = getpid();
    server.in_fork_child = CHILD_TYPE_NONE;
    server.main_thread_id = pthread_self();
    server.current_client = NULL;
    server.errors = raxNew();
    server.fixed_time_expire = 0;
    server.clients = listCreate();
    server.clients_index = raxNew();
    server.clients_to_close = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.clients_pending_write = listCreate();
    server.clients_pending_read = listCreate();
    server.clients_timeout_table = raxNew();
    server.replication_allowed = 1;
    server.slaveseldb = -1; /* Force to emit the first SELECT command. */
    server.unblocked_clients = listCreate();
    server.ready_keys = listCreate();
    server.clients_waiting_acks = listCreate();
    server.get_ack_from_slaves = 0;
    server.client_pause_type = 0;
    server.paused_clients = listCreate();
    server.events_processed_while_blocked = 0;
    server.system_memory_size = zmalloc_get_memory_size();
    server.blocked_last_cron = 0;
    server.blocking_op_nesting = 0;

Creating a Shared Object

Redis pays close attention to memory consumption, so some commonly used objects are created once and shared, managed by reference counting.

    createSharedObjects();

    /* Shared command responses */
    shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
    shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
    shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
    shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
    shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
    shared.emptyarray = createObject(OBJ_STRING,sdsnew("*0\r\n"));
    shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
    shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
    shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
    shared.space = createObject(OBJ_STRING,sdsnew(" "));
    shared.colon = createObject(OBJ_STRING,sdsnew(":"));
    shared.plus = createObject(OBJ_STRING,sdsnew("+"));
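Reference counting itself can be sketched in a few lines. The shared_obj type and the obj_create, obj_retain and obj_release helpers below are hypothetical and much simpler than the Redis object system; they only show how one allocation can be handed to several users and freed when the last one lets go.

```c
#include <stdlib.h>
#include <string.h>

/* A string payload plus the number of users currently holding it. */
typedef struct shared_obj {
    int refcount;
    char *data;
} shared_obj;

/* Create an object holding a private copy of s, with one reference. */
shared_obj *obj_create(const char *s) {
    size_t len = strlen(s) + 1;
    shared_obj *o = malloc(sizeof(*o));

    if (o == NULL) return NULL;
    o->data = malloc(len);
    if (o->data == NULL) {
        free(o);
        return NULL;
    }
    memcpy(o->data, s, len);
    o->refcount = 1;
    return o;
}

/* A new user takes a reference instead of copying the payload. */
void obj_retain(shared_obj *o) {
    o->refcount++;
}

/* Drop one reference; frees the object and returns 1 when it was the last. */
int obj_release(shared_obj *o) {
    if (--o->refcount > 0) return 0;
    free(o->data);
    free(o);
    return 1;
}
```

A reply such as "+OK\r\n" can then be created once and retained by every client that needs it, rather than being allocated per request.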

Adjust the open-file limit according to system limits

    adjustOpenFilesLimit();
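The underlying mechanism is the RLIMIT_NOFILE resource limit. The sketch below is a simplified illustration, not the logic of adjustOpenFilesLimit: the hypothetical ensure_open_files helper raises the soft limit to a wanted value when it is lower, never going above the hard limit, and reports the limit that ends up in effect.

```c
#include <sys/resource.h>

/* Hypothetical helper: make sure the soft open-file limit is at least
 * `wanted`, capped by the hard limit. Stores the resulting soft limit in
 * *result and returns 0, or returns -1 if a system call fails. */
int ensure_open_files(rlim_t wanted, rlim_t *result) {
    struct rlimit lim;

    if (getrlimit(RLIMIT_NOFILE, &lim) == -1) return -1;

    if (lim.rlim_cur < wanted) {
        rlim_t target = wanted;

        /* The soft limit can never exceed the hard limit. */
        if (lim.rlim_max != RLIM_INFINITY && target > lim.rlim_max)
            target = lim.rlim_max;
        lim.rlim_cur = target;
        if (setrlimit(RLIMIT_NOFILE, &lim) == -1) return -1;
    }
    *result = lim.rlim_cur;
    return 0;
}
```

A server would typically ask for its maximum number of clients plus some descriptors reserved for internal use, and lower the client limit if the result is smaller than requested.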

Initializing the Reactor network model

Redis can listen on both Unix domain sockets and TCP sockets. When the server and the client run on the same host, Unix domain sockets are faster because no network protocol headers need to be processed.

    const char *clk_msg = monotonicInit();
    serverLog(LL_NOTICE, "monotonic clock: %s", clk_msg);
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    if (server.el == NULL) {
        serverLog(LL_WARNING,
            "Failed creating the event loop. Error message: '%s'",
            strerror(errno));
        exit(1);
    }
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);

    /* Open the TCP listening socket for the user commands. */
    if (server.port != 0 &&
        listenToPort(server.port,&server.ipfd) == C_ERR) {
        serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
        exit(1);
    }
    if (server.tls_port != 0 &&
        listenToPort(server.tls_port,&server.tlsfd) == C_ERR) {
        serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port);
        exit(1);
    }

    /* Open the listening Unix domain socket. */
    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,
            server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {
            serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL,server.sofd);
        anetCloexec(server.sofd);
    }
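For readers who have not used Unix domain sockets directly, here is a minimal sketch of what opening such a listening socket involves at the system call level. It is an illustration under simplifying assumptions, not the anetUnixServer implementation: unix_listen is a hypothetical helper and it does not set file permissions on the socket path.

```c
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/* Hypothetical helper: create a non-blocking, close-on-exec Unix domain
 * listening socket bound to `path`. Returns the fd, or -1 on error. */
int unix_listen(const char *path, int backlog) {
    struct sockaddr_un sa;
    int fd, flags;

    if (strlen(path) >= sizeof(sa.sun_path)) return -1;
    if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) return -1;

    memset(&sa, 0, sizeof(sa));
    sa.sun_family = AF_UNIX;
    strcpy(sa.sun_path, path);      /* length was checked above */

    unlink(path);                   /* remove a stale socket file, if any */
    if (bind(fd, (struct sockaddr *)&sa, sizeof(sa)) == -1) goto fail;
    if (listen(fd, backlog) == -1) goto fail;

    /* The event loop must never block on accept(). */
    if ((flags = fcntl(fd, F_GETFL)) == -1) goto fail;
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) goto fail;
    /* Do not leak the listening socket into forked-and-exec'ed children. */
    if (fcntl(fd, F_SETFD, FD_CLOEXEC) == -1) goto fail;
    return fd;

fail:
    close(fd);
    return -1;
}
```

The address here is a filesystem path rather than an IP and port, which is why the connection never leaves the host.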

Eviction pool initialization for the LRU policy

/* Create a new eviction pool. */
void evictionPoolAlloc(void) {
    struct evictionPoolEntry *ep;
    int j;

    ep = zmalloc(sizeof(*ep)*EVPOOL_SIZE);
    for (j = 0; j < EVPOOL_SIZE; j++) {
        ep[j].idle = 0;
        ep[j].key = NULL;
        ep[j].cached = sdsnewlen(NULL,EVPOOL_CACHED_SDS_SIZE);
        ep[j].dbid = 0;
    }
    EvictionPoolLRU = ep;
}
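The pool allocated above is later filled with eviction candidates. How a small, fixed-size pool can keep the best candidates is sketched below under simplifying assumptions: the pool is kept sorted by idle time in ascending order, empty slots sit on the right, and the names pool_entry, pool_init and pool_insert are hypothetical. It is not the Redis population routine.

```c
#include <string.h>

#define POOL_SIZE 4

/* One candidate: how long the key has been idle, and its name. */
struct pool_entry {
    unsigned long long idle;
    const char *key;            /* NULL marks an empty slot */
};

void pool_init(struct pool_entry *pool) {
    int j;

    for (j = 0; j < POOL_SIZE; j++) {
        pool[j].idle = 0;
        pool[j].key = NULL;
    }
}

/* Insert a candidate, keeping the pool sorted by ascending idle time.
 * Returns 1 if the candidate was stored, 0 if it was discarded because
 * the pool is full and every stored entry is at least as idle. */
int pool_insert(struct pool_entry *pool, const char *key,
                unsigned long long idle) {
    int k = 0;

    /* Find the first slot that is empty or holds an entry at least as idle. */
    while (k < POOL_SIZE && pool[k].key != NULL && pool[k].idle < idle) k++;

    if (pool[POOL_SIZE - 1].key == NULL) {
        /* Free space on the right: shift entries k.. one slot right. */
        memmove(pool + k + 1, pool + k, sizeof(pool[0]) * (POOL_SIZE - k - 1));
    } else {
        /* Pool is full: drop the least idle entry (slot 0) to make room. */
        if (k == 0) return 0;
        k--;
        memmove(pool, pool + 1, sizeof(pool[0]) * k);
    }
    pool[k].key = key;
    pool[k].idle = idle;
    return 1;
}
```

With this layout the best eviction candidate is always the rightmost non-empty slot, so picking a victim is a short scan from the end of the array.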

Initialize RDB and AOF information

    server.rdb_child_type = RDB_CHILD_TYPE_NONE;
    server.rdb_pipe_conns = NULL;
    server.rdb_pipe_numconns = 0;
    server.rdb_pipe_numconns_writing = 0;
    server.rdb_pipe_buff = NULL;
    server.rdb_pipe_bufflen = 0;
    server.rdb_bgsave_scheduled = 0;
    server.child_info_pipe[0] = -1;
    server.child_info_pipe[1] = -1;
    server.child_info_nread = 0;
    aofRewriteBufferReset();
    server.aof_buf = sdsempty();
    server.lastsave = time(NULL); /* At startup we consider the DB saved. */
    server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
    server.rdb_save_time_last = -1;
    server.rdb_save_time_start = -1;
    server.dirty = 0;

Initialize the statistics

    resetServerStats();
    /* A few stats we don't want to reset: server startup time, and peak mem. */
    server.stat_starttime = time(NULL);
    server.stat_peak_memory = 0;
    server.stat_current_cow_bytes = 0;
    server.stat_current_cow_updated = 0;
    server.stat_current_save_keys_processed = 0;
    server.stat_current_save_keys_total = 0;
    server.stat_rdb_cow_bytes = 0;
    server.stat_aof_cow_bytes = 0;
    server.stat_module_cow_bytes = 0;
    server.stat_module_progress = 0;
    for (int j = 0; j < CLIENT_TYPE_COUNT; j++)
        server.stat_clients_type_memory[j] = 0;
    server.cron_malloc_stats.zmalloc_used = 0;
    server.cron_malloc_stats.process_rss = 0;
    server.cron_malloc_stats.allocator_allocated = 0;
    server.cron_malloc_stats.allocator_active = 0;
    server.cron_malloc_stats.allocator_resident = 0;
    server.lastbgsave_status = C_OK;
    server.aof_last_write_status = C_OK;
    server.aof_last_write_errno = 0;
    server.repl_good_slaves_count = 0;

Registering events

In network programming there are three kinds of events: file events, timer events and signal events.

    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }

    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TCP socket accept handler.");
    }
    if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TLS socket accept handler.");
    }
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");


    /* Register a readable event for the pipe used to awake the event loop * when a blocked client in a module needs attention. */
    if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
        moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
            serverPanic(
                "Error registering the readable event for the module "
                "blocked clients subsystem.");
    }

Events are registered together with their callbacks; combined with I/O multiplexing, this gives an efficient network model.
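The callback-plus-multiplexing idea can be shown with a very small reactor. This sketch uses poll() for simplicity, and every name in it (event_loop, loop_add_readable, loop_process_once, count_bytes) is hypothetical; it only demonstrates registering a callback for a descriptor and dispatching it when the descriptor becomes readable.

```c
#include <poll.h>
#include <unistd.h>

#define MAX_EVENTS 16

/* Callback invoked when a registered descriptor becomes readable. */
typedef void file_proc(int fd, void *client_data);

struct file_event {
    int fd;
    file_proc *on_readable;
    void *client_data;
};

struct event_loop {
    struct file_event events[MAX_EVENTS];
    int count;
};

/* Register interest in readability of fd. Returns 0, or -1 if full. */
int loop_add_readable(struct event_loop *el, int fd, file_proc *proc,
                      void *client_data) {
    if (el->count == MAX_EVENTS) return -1;
    el->events[el->count++] = (struct file_event){fd, proc, client_data};
    return 0;
}

/* Wait up to timeout_ms for activity, then run the callbacks of all ready
 * descriptors. Returns the number of callbacks fired, or -1 on error. */
int loop_process_once(struct event_loop *el, int timeout_ms) {
    struct pollfd pfds[MAX_EVENTS];
    int i, ready, fired = 0;

    for (i = 0; i < el->count; i++) {
        pfds[i].fd = el->events[i].fd;
        pfds[i].events = POLLIN;
        pfds[i].revents = 0;
    }
    ready = poll(pfds, (nfds_t)el->count, timeout_ms);
    if (ready <= 0) return ready;

    for (i = 0; i < el->count; i++) {
        if (pfds[i].revents & (POLLIN | POLLHUP)) {
            el->events[i].on_readable(el->events[i].fd,
                                      el->events[i].client_data);
            fired++;
        }
    }
    return fired;
}

/* Example callback: read once and add the byte count to an int counter. */
static void count_bytes(int fd, void *client_data) {
    char buf[64];
    ssize_t n = read(fd, buf, sizeof(buf));

    if (n > 0) *(int *)client_data += (int)n;
}
```

The single thread blocks in one place only, the poll() call, and every descriptor is served from there; that is the essence of the Reactor pattern.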

  • Replication initialization and so on
  • Lua virtual machine (scripting) initialization

Slow log initialization

/* Initialize the slow log. This function should be called a single time * at server startup. */
void slowlogInit(void) {
    server.slowlog = listCreate();
    server.slowlog_entry_id = 0;
    listSetFreeMethod(server.slowlog,slowlogFreeEntry);
}

Background task thread creation

There are three main types of background threads:

  • Closing file descriptors asynchronously
  • Flushing the AOF to disk asynchronously
  • Deleting expired keys asynchronously (lazy free)
/* Initialize the background system, spawning the thread. */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}
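What each of these threads does once started can be sketched as a worker that sleeps on a condition variable until a job is queued. The sketch is a simplified illustration, not the bio.c implementation: it uses a single worker with a hand-rolled linked list, and the names worker, worker_start, worker_submit and worker_stop are hypothetical.

```c
#include <pthread.h>
#include <stdlib.h>

/* One queued unit of work. */
struct job {
    void (*fn)(void *);
    void *arg;
    struct job *next;
};

/* A background thread with its own job queue, mutex and condition. */
struct worker {
    pthread_t thread;
    pthread_mutex_t mutex;
    pthread_cond_t newjob;
    struct job *head, *tail;
    int stop;
};

static void *worker_main(void *p) {
    struct worker *w = p;

    pthread_mutex_lock(&w->mutex);
    for (;;) {
        /* Sleep until there is a job or a stop request. */
        while (w->head == NULL && !w->stop)
            pthread_cond_wait(&w->newjob, &w->mutex);
        if (w->head == NULL) break;     /* stop requested, queue drained */

        struct job *j = w->head;
        w->head = j->next;
        if (w->head == NULL) w->tail = NULL;

        /* Run the job without holding the lock so producers are not blocked. */
        pthread_mutex_unlock(&w->mutex);
        j->fn(j->arg);
        free(j);
        pthread_mutex_lock(&w->mutex);
    }
    pthread_mutex_unlock(&w->mutex);
    return NULL;
}

/* Start the worker thread. Returns 0 on success (as pthread_create does). */
int worker_start(struct worker *w) {
    pthread_mutex_init(&w->mutex, NULL);
    pthread_cond_init(&w->newjob, NULL);
    w->head = w->tail = NULL;
    w->stop = 0;
    return pthread_create(&w->thread, NULL, worker_main, w);
}

/* Queue a job for the worker. Returns 0, or -1 if out of memory. */
int worker_submit(struct worker *w, void (*fn)(void *), void *arg) {
    struct job *j = malloc(sizeof(*j));

    if (j == NULL) return -1;
    j->fn = fn;
    j->arg = arg;
    j->next = NULL;
    pthread_mutex_lock(&w->mutex);
    if (w->tail) w->tail->next = j; else w->head = j;
    w->tail = j;
    pthread_cond_signal(&w->newjob);
    pthread_mutex_unlock(&w->mutex);
    return 0;
}

/* Ask the worker to finish the remaining jobs and wait for it to exit. */
void worker_stop(struct worker *w) {
    pthread_mutex_lock(&w->mutex);
    w->stop = 1;
    pthread_cond_signal(&w->newjob);
    pthread_mutex_unlock(&w->mutex);
    pthread_join(w->thread, NULL);
    pthread_cond_destroy(&w->newjob);
    pthread_mutex_destroy(&w->mutex);
}

/* Example job: increment the int the argument points to. */
static void add_one(void *arg) {
    ++*(int *)arg;
}
```

The main thread only pays the cost of enqueueing; slow operations such as close or fsync run in the worker, which is the reason these tasks are moved to the background in the first place.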

Lua scripts

Other

External module loading

void moduleLoadFromQueue(void) {
    listIter li;
    listNode *ln;

    listRewind(server.loadmodule_queue,&li);
    while((ln = listNext(&li))) {
        struct moduleLoadQueueEntry *loadmod = ln->value;
        if (moduleLoad(loadmod->path,(void **)loadmod->argv,loadmod->argc)
            == C_ERR)
        {
            serverLog(LL_WARNING,
                "Can't load module from %s: server aborting",
                loadmod->path);
            exit(1);
        }
    }
}

Loading data from disk

/* Function called at startup to load RDB or AOF file in memory. */
void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == AOF_ON) {
        if (loadAppendOnlyFile(server.aof_filename) == C_OK)
            serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds", (float)(ustime()-start)/1000000);
    } else {
        rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
        errno = 0; /* Prevent a stale value from affecting error checking */
        if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
            serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);

            /* Restore the replication ID / offset from the RDB file. */
            if ((server.masterhost ||
                (server.cluster_enabled &&
                nodeIsSlave(server.cluster->myself))) &&
                rsi.repl_id_is_set &&
                rsi.repl_offset != -1 &&
                /* Note that older implementations may save a repl_stream_db
                 * of -1 inside the RDB file in a wrong way, see more
                 * information in function rdbPopulateSaveInfo. */
                rsi.repl_stream_db != -1)
            {
                memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
                server.master_repl_offset = rsi.repl_offset;
                /* If we are a slave, create a cached master from this
                 * information, in order to allow partial resynchronizations
                 * with masters. */
                replicationCacheMasterUsingMyself();
                selectDb(server.cached_master,rsi.repl_stream_db);
            }
        } else if (errno != ENOENT) {
            serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
    }
}

Preparation before the event loop

    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);

beforeSleep

beforeSleep consists of several steps:

  • Run a fast expire cycle to process expired keys
  • Send ACK requests to all slave nodes
  • Unblock clients blocked on synchronous replication (WAIT)
  • Handle blocked clients
  • Flush the AOF buffer to disk
  • Process pending requests

The code is as follows:

void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    size_t zmalloc_used = zmalloc_used_memory();
    if (zmalloc_used > server.stat_peak_memory)
        server.stat_peak_memory = zmalloc_used;

    /* Just call a subset of vital functions in case we are re-entering * the event loop from processEventsWhileBlocked(). Note that in this * case we keep track of the number of events we are processing, since * processEventsWhileBlocked() wants to stop ASAP if there are no longer * events to handle. */
    if (ProcessingEventsWhileBlocked) {
        uint64_t processed = 0;
        processed += handleClientsWithPendingReadsUsingThreads();
        processed += tlsProcessPendingData();
        processed += handleClientsWithPendingWrites();
        processed += freeClientsInAsyncFreeQueue();
        server.events_processed_while_blocked += processed;
        return;
    }

    /* Handle precise timeouts of blocked clients. */
    handleBlockedClientsTimeout();

    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();

    /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
    tlsProcessPendingData();

    /* If tls still has pending unread data don't sleep at all. */
    aeSetDontWait(server.el, tlsHasPendingData());

    /* Call the Redis Cluster before sleep function. Note that this function * may change the state of Redis Cluster (from ok to fail or vice versa), * so it's a good idea to call it before serving the unblocked clients * later in this function. */
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Unblock all the clients blocked for synchronous replication * in WAIT. */
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* Check if there are clients unblocked by modules that implement * blocking commands. */
    if (moduleCount()) moduleHandleBlockedClients();

    /* Try to process pending commands for clients that were just unblocked. */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    /* Send all the slaves an ACK request if at least one client blocked * during the previous event loop iteration. Note that we do this after * processUnblockedClients(), so if there are multiple pipelined WAITs * and the just unblocked WAIT gets blocked again, we don't have to wait * a server cron cycle in absence of other event loop events. See #6623. * * We also don't send the  ACKs while clients are paused, since it can * increment the replication backlog, they'll be sent after the pause * if we are still the master. */
    if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) {
        robj *argv[3];

        argv[0] = shared.replconf;
        argv[1] = shared.getack;
        argv[2] = shared.special_asterick; /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        server.get_ack_from_slaves = 0;
    }

    /* We may have recieved updates from clients about their current offset. NOTE:
     * this can't be done where the ACK is recieved since failover will disconnect 
     * our clients. */
    updateFailoverStatus();

    /* Send the invalidation messages to clients participating to the * client side caching protocol in broadcasting (BCAST)  mode. */
    trackingBroadcastInvalidationMessages();

    /* Write the AOF buffer on disk */
    if (server.aof_state == AOF_ON)
        flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();

    /* Close clients that need to be closed asynchronous */
    freeClientsInAsyncFreeQueue();

    /* Try to process blocked clients every once in while. Example: A module * calls RM_SignalKeyAsReady from within a timer callback (So we don't * visit processCommand() at all). */
    handleClientsBlockedOnKeys();

    /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this * time. */
    if (moduleCount()) moduleReleaseGIL();

    /* Do NOT add anything below moduleReleaseGIL !!! */
}

Enter the event main loop

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
    eventLoop->aftersleep(eventLoop);

// write: sendReplyToClient
// read: readQueryFromClient

aeProcessEvents handles both network (file) events and timer events, which are integrated through I/O multiplexing.
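How a timer and a before-sleep hook fit into one loop can be sketched as follows. This is a deliberately reduced illustration with a single periodic timer and no file descriptors; mini_loop, mini_loop_run and the two example callbacks are hypothetical names, and the blocking step is a poll() call with an empty descriptor set so that its timeout acts as the timer.

```c
#include <poll.h>
#include <stddef.h>

/* A loop with one periodic timer and an optional before-sleep hook. */
struct mini_loop {
    int stop;                                   /* set to 1 to leave the loop */
    int period_ms;                              /* timer period */
    void (*before_sleep)(struct mini_loop *);   /* runs before each wait */
    void (*on_timer)(struct mini_loop *);       /* runs after each wait */
    int sleeps;                                 /* times before_sleep ran */
    int ticks;                                  /* times the timer fired */
};

/* Run until some callback sets stop. Each iteration: hook, wait, timer. */
void mini_loop_run(struct mini_loop *el) {
    el->stop = 0;
    while (!el->stop) {
        if (el->before_sleep) el->before_sleep(el);
        /* No descriptors are registered, so this only waits for the period. */
        (void)poll(NULL, 0, el->period_ms);
        if (el->on_timer) el->on_timer(el);
    }
}

/* Example hook: count how often the loop was about to sleep. */
static void count_sleep(struct mini_loop *el) {
    el->sleeps++;
}

/* Example timer: stop the loop after three ticks. */
static void tick_three_times(struct mini_loop *el) {
    if (++el->ticks == 3) el->stop = 1;
}
```

In a full event loop the wait would also watch file descriptors, and its timeout would be the time remaining until the nearest timer, so that one blocking call serves both kinds of events.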

Overall request process

Reference and citation

  • Redis Design and Implementation, Huang Jianhong
  • Images from the Internet