Redis is the big brother of the cache world, the recent things to do rely on Redis more, using the inside of the publish and subscribe function, transaction function and SortedSet data structure, ready to learn a good summary of some knowledge points of Redis.

Let’s take a look at the redis publish-subscribe structure:

Redis publishes a subscription structure

Publishers and subscribers interact with each other through channels, which can be divided into two modes.

Redis publishes subscription commands

Six commands are provided in Redis for the publish subscribe (pub/sub) function, divided into two modes.

  1. Subscribe consists of subscribe and unsubscribe, which are responsible for subscribing to a channel with a certain name. For example, subscribe test means subscribing to a channel named test.
  2. It consists of psubscribe, punsubscribe, which is responsible for subscribing to channels with vague names. For example, psubscribe test* means to subscribe to all channels starting with test.

Finally, a publish command and a pubsub command to view subscription information are added.

Second, Redis release and subscription source analysis

All redis commands and their handlers are placed at the beginning of the server.c file, where you can find command information related to the publish and subscribe function.

    {"subscribe",subscribeCommand,-2,"pslt",0,0,0,0, 0, NULL, 0}, {"unsubscribe",unsubscribeCommand,-1,"pslt",0,0,0,0, 0, NULL, 0}, {"psubscribe",psubscribeCommand,-2,"pslt",0,0,0,0, 0, NULL, 0}, {"punsubscribe",punsubscribeCommand,-1,"pslt",0,0,0,0, 0, NULL, 0}, {"publish",publishCommand,3,"pltF",0,0,0,0, 0, NULL, 0}, {"pubsub",pubsubCommand,-2,"pltR",0,0,0,0, 0, NULL, 0},Copy the code

As you can see, creating a command requires many parameters. We only need to focus on the first two parameters. The first parameter represents the content of the command, and the second parameter represents the corresponding handler function of the command.

Normal mode subscribe function: This command supports multiple arguments, i.e. Subscribe channel1,channel2…

void subscribeCommand(client *c) { int j; // The arguments to subscribe are processed one by one, since the command itself is treated as argument 0, the following arguments are processed from 1for(j = 1; j < c->argc; J ++) // Subscribe every channel pubsubSubscribeChannel(c,c->argv); / / here sets the client's status, the following will explain the role of the state c - > flags | = CLIENT_PUBSUB; }Copy the code

In the server.c file, the processCommand function is the judgment logic before calling the specific command function, which contains the following paragraph:

/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if(c->flags & CLIENT_PUBSUB && c->cmd->proc ! = pingCommand && c->cmd->proc ! = subscribeCommand && c->cmd->proc ! = unsubscribeCommand && c->cmd->proc ! = psubscribeCommand && c->cmd->proc ! = punsubscribeCommand) { addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }Copy the code

When a client is in the pub/sub context, it only receives subscription commands and a ping command. This explains why the client flag field is set in the subscribeCommand function.

Let’s look at the logic of the subscription:

int pubsubSubscribeChannel(client *c, robj *channel) { dictEntry *de; list *clients = NULL; int retval = 0; // Add the specified channel to the client's pubsub_channels hash table // failed to subscribe to the channelif(dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { retval = 1; // Add the channel to the client's hash table by referring to incrRefCount(channel); Pubsub_channels = dictFind(server.pubsub_channels,channel); // If the channel does not already exist, create itif(de == NULL) {// Create an empty list clients = listCreate(); DictAdd (server.pubsub_channels,channel,clients); // Add channel to the server hash table dictAdd(server.pubsub_channels,channel,clients); // Add 1 incrRefCount(channel) to the channel reference; }else{ clients = dictGetVal(de); } // Add client to the channel's subscription list. ListAddNodeTail (Clients,c); } // a list of operations to notify the client addReply(c,shared.mbulkhdr[3]); addReply(c,shared.subscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,clientSubscriptionsCount(c));return retval;
}Copy the code

Add the specified channel to the pub/sub table of the client and server respectively, and save the list of clients subscribed to the channel on the server side.

Normal mode publishes subscription data structures

Consider the publish publish command: for example, publish channelName MSG

Void publishCommand(client *c) {// Receivers = pubsubPublishMessage(c->argv[1], C ->argv[2]); // This is about clustering or AOF operationsif (server.cluster_enabled)
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    elseforceCommandPropagation(c,PROPAGATE_REPL); // Returns the number of subscribers notified by client addReplyLongLong(c, Receivers); }Copy the code

Focus on the source of the distribution function:

int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; dictEntry *de; listNode *ln; listIter li; Clients de = dictFind(server.pubsub_channels,channel); // Clients de = dictFind(server.pubsub_channels,channel);ifList *list = dictGetVal(de); listNode *ln; listIter li; ListRewind (list,&li); listRewind(list,&li); // Iterate over all clients and send messageswhile((ln = listNext(&li)) ! = NULL) { client *c = ln->value; addReply(c,shared.mbulkhdr[3]); addReply(c,shared.messagebulk); addReplyBulk(c,channel); addReplyBulk(c,message); receivers++; }} // Start the logical processing of fuzzy matching. Fuzzy patterns use linked lists instead of hash tables, as discussed laterif(listLength(server.pubsub_patterns)) {// Create an iterator for fuzzy rules li listRewind(server.pubsub_patterns,& Li); channel = getDecodedObject(channel); // Iterate over all fuzzy patterns and send a message if a match is successfulwhile((ln = listNext(&li)) ! = NULL) { pubsubPattern *pat = ln->value; // Determine whether the current channel can match the fuzzy ruleif (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}Copy the code

Publish: publish: publish: publish: publish: publish: publish: publish: publish: publish: publish: publish: publish: publish: publish: publish: publish: publish: publish

Fuzzy mode subscription pSUBSCRIBE function:

Void psubscribe (client *c) {int j; // Subscribe the pattern specified by the client one by onefor(j = 1; j < c->argc; j++) pubsubSubscribePattern(c,c->argv[j]); / / modify client state c - > flags | = CLIENT_PUBSUB; } int pubsubSubscribePattern(client *c, robj *pattern) { int retval = 0; // Check whether the client has subscribed to this patternif(listSearchKey(c->pubsub_patterns,pattern) == NULL) { retval = 1; pubsubPattern *pat; ListAddNodeTail (c-> pubSub_Patterns,pattern); // Add the specified pattern to the client's pattern list. +1 incrRefCount(pattern); If multiple clients subscribe to the same pattern, multiple patters will be created. If multiple clients subscribe to the same pattern, multiple patters will be created. Different from normal mode pat = zmalloc(sizeof(*pat)); pat->pattern = getDecodedObject(pattern); pat->client = c; listAddNodeTail(server.pubsub_patterns,pat); } // Notify the client of addReply(c,shared.mbulkhdr[3]); addReply(c,shared.psubscribebulk); addReplyBulk(c,pattern); addReplyLongLong(c,clientSubscriptionsCount(c));return retval;
}Copy the code

By analyzing the above source code, we can summarize the data structure of fuzzy subscription, as shown below:

Fuzzy publish subscribe schema data structures

Note: As mentioned above, in fuzzy mode, a PAT object contains a pattern rule and a client pointer, that is, when multiple clients ambiguously subscribe to the same pattern, a node is also created for each client.

Normal mode unsubscribe function: Unsubscribe is relatively simple, in other words, the above lock is stored in the server and client data deletion.

Void unsubscribeCommand(client *c) {// If the command has no parameters, cancel all channelsif (c->argc == 1) {
        pubsubUnsubscribeAllChannels(c,1);
    } else{ int j; // Iteration cancels the top channelfor(j = 1; j < c->argc; j++) pubsubUnsubscribeChannel(c,c->argv[j],1); } // If channels are cancelled altogether, change the client state so that clients can send other commandsif(clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; } / / disposable unsubscribe all channel int pubsubUnsubscribeAllChannels (client * c, DictIterator *di = dictGetSafeIterator(c->pubsub_channels); dictEntry *de; int count = 0;while((de = dictNext(di)) ! = NULL) { robj *channel = dictGetKey(de); Channel count += pubsubUnsubscribeChannel(c,channel,notify); } // If there is no subscription on the client, the response is still returnedif(notify && count == 0) { addReply(c,shared.mbulkhdr[3]); addReply(c,shared.unsubscribebulk); addReply(c,shared.nullbulk); addReplyLongLong(c,dictSize(c->pubsub_channels)+ listLength(c->pubsub_patterns)); } // free space dictReleaseIterator(di);returncount; } // unsubscribe specified channel int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {dictEntry *de; list *clients; listNode *ln; int retval = 0; // Remove the specified channel from the clientif(dictDelete(c->pubsub_channels,channel) == DICT_OK) { retval = 1; Client de = dictFind(server.pubsub_channels,channel); serverAssertWithInfo(c,NULL,de ! = NULL); clients = dictGetVal(de); ln = listSearchKey(clients,c); serverAssertWithInfo(c,NULL,ln ! = NULL); listDelNode(clients,ln);ifDictDelete (server.pubsub_channels,channel); listLength(clients) == 0) {// dictDelete(server.pubsub_channels,channel); }} // Returns the client responseif(notify) { addReply(c,shared.mbulkhdr[3]); addReply(c,shared.unsubscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,dictSize(c->pubsub_channels)+ listLength(c->pubsub_patterns)); } // reference count -1 decrRefCount(channel);return retval;
}Copy the code

Since unsubscribe in fuzzy mode is similar to normal mode, I will not post the code here.

3. Summary of Redis publication and subscription

The whole release subscription code is simple and clear, when a problem worth thinking about in the normal mode with fuzzy pattern respectively using the hash table with linked list two kinds of structure, rather than a uniform, the reason is that the fuzzy model can’t exactly match, need to traverse each judgment, and hash table has the advantage of fast locating, Not applicable in scenarios where traversal and fuzzy matching are required.