
Nowadays, Internet applications are mostly built on distributed system architectures, and one basic piece of software that many distributed systems need is a message queue.

A message queue must support fast reads and writes of the messages that components exchange, and Redis itself supports high-speed data access, so it can meet the read/write performance requirements of a message queue. However, message queues have requirements beyond performance, which is why many people ask the question, “Is Redis a good message queue?”

In fact, there are two core issues behind this question:

  • What are the message access requirements for message queues?
  • How does Redis meet these message queue requirements?

In this lesson, we will talk about the characteristics of message queues and the message queue schemes that Redis provides. Only by connecting these two aspects of knowledge with practical experience can you thoroughly understand how to implement a message queue on top of Redis. Later, when you need to choose a message queue for distributed system components, you will be able to pick a suitable Redis-based scheme according to the components’ traffic and message communication requirements.

Let’s start with the first question: what are the message access requirements for message queues?

Message access requirements for message queues

I’ll start by describing how messages move through a message queue. In a distributed system, when two components communicate through a message queue, one component packages its data as a message and sends it to the queue, after which it can move on to other work. The other component, running remotely, reads the message from the queue and processes it locally.

To make things easier for you, let me use an example.

Suppose component 1 needs to sum the data it collects and write the result to a database, but the data arrives so fast that component 1 cannot keep up with collecting, calculating, and writing at the same time. With message queue-based communication, component 1 can package data X and Y as JSON-formatted messages and send them to the message queue, then go on receiving new data. Component 2 asynchronously reads the messages from the queue, sums them up on server 2, and writes the result to the database. This process is shown below:

In message queue terminology, the component that sends messages is called the producer (component 1 in the example) and the component that receives messages is called the consumer (component 2 in the example). The following figure shows a generic message queue architecture:

With a message queue, consumers read the producer’s messages asynchronously and then process them. Even if the producer sends messages much faster than the consumer can process them, the messages already sent are buffered in the queue, so the producer is never blocked. This is a big advantage of using message queues for communication between distributed components.

However, when storing and delivering messages, a message queue must meet three requirements: message ordering, duplicate message handling, and message reliability.

Requirement 1: Message ordering

Although consumers process messages asynchronously, they still need to process them in the order in which the producer sent them, so that a message sent later is not processed first. In scenarios that require ordering, processing messages out of order can cause the business logic to execute incorrectly and cost the business side real losses.

Let’s look at a scenario that updates the inventory of goods.

Suppose the producer receives inventory update requests, the consumer actually updates the inventory, and the current inventory is 10. The producer sends message 1, which updates the inventory of item X to 5, and then message 2, which updates the inventory of item X to 3. If the message queue does not preserve the order of messages 1 and 2, and message 2 is processed before message 1, the inventory ends up wrong (it is left at 5 instead of 3). This is unacceptable for a business application.

Faced with this, you might think of a workaround: instead of putting the resulting inventory value in the message, put the deduction amount in it. Message 1 then means “deduct 5 from the inventory” and message 2 means “deduct 2 from the inventory”. As long as no inventory query arrives between messages 1 and 2, this scheme keeps the final inventory correct (10 - 5 - 2 = 3) even if the consumer processes message 2 before message 1.

However, we also need to consider the case where the consumer receives three messages: message 1 deducts 5 from the inventory, message 2 reads the current inventory, and message 3 deducts 2 from the inventory. Starting from an inventory of 10, if the consumer processes message 3 first (deducting 2), the inventory becomes 8, and when it then processes message 2 it reads an inventory of 8, which is a wrong query result. From the business application’s perspective, messages 1, 2, and 3 should be executed in order, so message 2 should read the inventory after the deduction of 5 (that is, 5), not after the deduction of 2. So in scenarios that mix reads and writes, the scheme of putting the deduction amount in the message still produces wrong reads. There is also another problem with this scheme: duplicate message processing.

Requirement 2: Duplicate message handling

When a consumer reads messages from a message queue, the queue may sometimes retransmit a message because of network congestion, so the consumer may receive the same message more than once. If the consumer processes a duplicate message again, the business logic is executed more than once, and if that logic happens to modify data, the data gets modified more than once as well.

Taking the inventory update as an example again, suppose the consumer receives message 1 (“deduct 5 from the inventory”) and then receives message 1 a second time. If the consumer cannot tell that the two messages are actually the same, it deducts 5 twice and the inventory ends up wrong. This, of course, is unacceptable.

Requirement 3: Message reliability assurance

In addition, a consumer may fail to finish processing a message because of a crash or outage. In this case, the message queue needs to guarantee message reliability: when the consumer restarts, it must be able to read the message again and process it, otherwise the message is effectively lost.

Redis’s List and Streams data types can satisfy these three message queue requirements. Let’s look at the List-based message queue solution first.

List-based message queue solution

The List itself accesses data in a first-in, first-out order, so if you use the List as a message queue to store messages, you can meet the requirements of message ordering.

Specifically, producers can use LPUSH to write messages to a List in sequence, while consumers can use RPOP to read messages from the other side of the List and process them in the order in which they were written.

As shown in the figure below, the producer uses LPUSH to write two inventory messages in turn, one setting the inventory to 5 and one setting it to 3. The consumer uses RPOP to read the two messages out in the same order and process them accordingly.
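
For instance, here is a minimal sketch of this flow in redis-cli, assuming the queue is a List key named mq (the same key name used later in this article); the output shown is roughly what these commands would return:

LPUSH mq "5"
(integer) 1
LPUSH mq "3"
(integer) 2
RPOP mq
"5"
RPOP mq
"3"

Because LPUSH inserts at the head and RPOP removes from the tail, the messages come out in the same order they went in.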

However, there is a potential performance risk in how the consumer reads the data.

When a producer writes to a List, the List does not actively notify the consumer that a new message is being written. If the consumer wants to process the message in a timely manner, it needs to call the RPOP command repeatedly in the program (such as using a while(1) loop). If a new message is written, the RPOP command returns the result; otherwise, the RPOP command returns a null value and the loop continues.

So even when no new messages are being written to the List, the consumer keeps calling RPOP, and the consumer program’s CPU is wasted on these fruitless calls, causing unnecessary performance loss.

To solve this problem, Redis provides the BRPOP command, also known as a blocking read: if there is no data in the queue, the client automatically blocks until new data is written and then reads it. Compared with having the consumer program call RPOP in a tight loop, this saves CPU overhead.
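
As a minimal sketch (again assuming a List key named mq), the consumer can block for up to 5 seconds waiting for a message; BRPOP returns the key name together with the popped value when a message is available, or nil once the timeout expires:

BRPOP mq 5
1) "mq"
2) "5"
BRPOP mq 5
(nil)
(5.00s)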

Now that the problem of message ordering is solved, we need to consider duplicate message handling, which requires that the consumer program itself be able to recognize duplicate messages.

On the one hand, the message queue needs to give each message a globally unique ID. On the other hand, the consumer program needs to record the IDs of messages it has already processed.

When a message arrives, the consumer program compares its ID with the recorded IDs to decide whether this message has already been processed; if it has, the consumer simply skips it. This property is known as idempotency: for the same message, processing it once and processing it multiple times produce the same result.

However, a List does not generate an ID for each message on its own, so the producer program has to generate the globally unique ID itself before sending the message, and then include that ID in the message when inserting it into the List with LPUSH.

For example, we insert a message with global ID 101030001 and inventory 5 into the message queue by executing the following command:

LPUSH mq "101030001:stock:5"
(integer) 1
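
One possible way for the consumer to record processed IDs is to keep them in a Redis Set (the key name processed_ids below is hypothetical, not part of the original example): SADD returns 1 the first time an ID is added and 0 when the ID is already present, so a return value of 0 tells the consumer it is looking at a duplicate and should skip it.

SADD processed_ids 101030001
(integer) 1
SADD processed_ids 101030001
(integer) 0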

Finally, let’s look at how the List type ensures message reliability.

Once a consumer program reads a message from a List, the List no longer stores it. So if the consumer program crashes while processing the message, the message never finishes processing, and the program cannot read it from the List again after restarting.

To keep messages around, the List type provides the BRPOPLPUSH command, which lets the consumer program read a message from one List while also inserting it into another List (a backup List) for safekeeping. This way, if the consumer program reads a message but fails to process it, it can re-read the message from the backup List after restarting and process it again.

I have drawn a diagram showing how the BRPOPLPUSH command saves a message and how the consumer reads it again.

The producer first inserts messages “5” and “3” into the message queue mq using LPUSH. The consumer program uses BRPOPLPUSH to read message “5”, and Redis also inserts that message into the backup queue mqback. If the consumer program goes down while processing message “5”, it can read message “5” again from mqback after restarting and continue processing.
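
A minimal sketch of this flow, using the queue names mq and mqback from the figure (the final timeout argument 0 means block indefinitely): BRPOPLPUSH pops the message from mq and, in the same step, inserts it into mqback, so a backup copy exists before the consumer starts processing.

BRPOPLPUSH mq mqback 0
"5"
LRANGE mqback 0 -1
1) "5"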

So, based on the List type, we can satisfy the three main requirements that distributed components place on a message queue. However, using a List as a message queue also raises a problem: when producers send messages faster than consumers can process them, messages pile up in the List and put pressure on Redis memory.

At this point, we would like to start multiple consumer programs that form a consumer group and share the work of processing the messages in the List. However, the List type does not support consumer groups. Is there a better solution? This brings us to the Streams data type that Redis has provided since version 5.0.

Like List, Streams meets the three main message queue requirements, and on top of that it supports reading messages as consumer groups. Let’s take a look at how Streams is used.

Streams-based message queue solution

Streams is a Redis data type designed specifically for message queues, which provides a rich set of message queue operation commands.

  • XADD: inserts a message, preserving order, and can automatically generate a globally unique ID;
  • XREAD: reads messages, and can read data starting from a given ID;
  • XREADGROUP: reads messages as a consumer group;
  • XPENDING and XACK: XPENDING queries the messages that have been read by consumers in each consumer group but not yet acknowledged, while XACK confirms to the message queue that a message has been processed.

First, let’s look at XADD, the Streams operation for inserting messages.

The XADD command inserts new messages in key-value pairs into the message queue. Streams can automatically generate a globally unique ID for each message that is inserted.

For example, we can insert a message with the key repo and the value 5 into the message queue named mqstream by executing the following command. The * after the queue name tells Redis to automatically generate a globally unique ID for the inserted data, such as “1599203861727-0”. Of course, we can also specify an ID ourselves in place of *, as long as it is globally unique; however, using * is more convenient and efficient.

XADD mqstream * repo 5
"1599203861727-0"

As you can see, the message’s globally unique ID consists of two parts. The first part, “1599203861727”, is the server time in milliseconds when the data was inserted. The second part is the sequence number of the message within that millisecond, starting from 0. For example, “1599203861727-0” is the first message inserted during millisecond 1599203861727.
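
For illustration only, here is what supplying an explicit ID instead of * would look like, on a hypothetical stream named mqstream2 (not part of the original example); the ID you supply must be larger than every ID already in the stream, otherwise XADD returns an error.

XADD mqstream2 1599203861727-1 repo 4
"1599203861727-1"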

When a consumer needs to read a message, it can read it directly from the message queue using the XREAD command.

When reading, XREAD can be given a message ID and will start reading from the first message after that ID.

For example, we could start with the message with ID 1599203861727-0 and read all subsequent messages (there are three in the example).

XREAD BLOCK 100 STREAMS  mqstream 1599203861727-0
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      2) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
      3) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

In addition, the consumer can set the BLOCK option when invoking XREAD to implement a blocking read similar to BRPOP. With BLOCK set, XREAD blocks when there are no messages in the queue, and the blocking duration is given as part of the BLOCK option.

For example, in the following command, the “$” at the end means “read the latest message”, and BLOCK 10000 (in milliseconds) means that if no new message arrives, XREAD blocks for 10,000 milliseconds (10 seconds) and then returns. No message was written to mqstream after this command was executed, so XREAD returned nil after 10 seconds.

XREAD BLOCK 10000 STREAMS mqstream $
(nil)
(10.00s)

The operations covered so far are similar to what List offers, so let’s now look at the features unique to Streams.

Streams lets you create a consumer group with the XGROUP command. Once a consumer group has been created, consumers within it can read messages with the XREADGROUP command.

For example, we execute the following command to create a consumer group named group1 that consumes the message queue mqstream.

XGROUP create mqstream group1 0
OK

We then execute a command to have consumer1, a consumer in group1, read all the messages from mqstream. The “>” at the end of the command means “read from the first message that has not yet been consumed”. Since no other consumer in group1 has read anything before consumer1, consumer1 gets all the messages in mqstream (there are four of them).

XREADGROUP group group1 consumer1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"
      2) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      3) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
      4) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

Note that once a message in the queue has been read by one consumer in a consumer group, it can no longer be read by any other consumer in that same group. For example, after the XREADGROUP command above, if we execute the following command to have consumer2 in group1 read the messages, consumer2 reads nothing, because the messages have already been read by consumer1:

XREADGROUP group group1 consumer2  streams mqstream 0
1) 1) "mqstream"
   2) (empty list or set)

The purpose of a consumer group is to let the consumers in the group share the work of reading messages, so we typically have each consumer read only part of the messages, spreading the read load evenly across them. For example, we execute the following commands to have consumer1, consumer2, and consumer3 in group2 each read one message.

XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"

XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"

XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"

To ensure that a consumer can still read messages that were not fully processed before a failure or restart, Streams automatically keeps the messages read by each consumer in a consumer group in an internal queue (the PENDING List) until the consumer uses the XACK command to tell Streams that a message has been processed. If the consumer does not successfully process a message, it never sends XACK and the message stays in the PENDING List. After restarting, the consumer can use the XPENDING command to view the messages it has read but not yet acknowledged.

For example, let’s check how many messages each consumer in group2 has read but not acknowledged. The second and third lines of the XPENDING result are the smallest and largest IDs among the messages read by all consumers in group2.

XPENDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"

If we need to take a closer look at what data is being read by a consumer, we can execute the following command:

XPENDING mqstream group2 - + 10 consumer2
1) 1) "1599274912765-0"
   2) "consumer2"
   3) (integer) 513336
   4) (integer) 1

You can see that the ID of the message read by consumer2 is 1599274912765-0.

Once consumer2 has processed message 1599274912765-0, it can notify Streams with the XACK command, and the message is removed from the PENDING List. When we check again with XPENDING, consumer2 no longer has any read-but-unacknowledged messages.

 XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)

Now that you know how to implement a message queue with Streams, I want to emphasize that Streams is a data type that Redis 5.0 introduced specifically for message queue scenarios. If you are running Redis 5.0 or later, you should consider using Streams for your message queue.

Summary

In this lesson, we studied the three requirements that distributed system components place on a message queue: message ordering, duplicate message handling, and message reliability. These can be translated into three requirements on the message queue itself: message data must be stored and read in order, each message must carry a globally unique ID, and message data must be removed only after its consumption has been confirmed.

I have drawn a table summarizing the features of, and differences between, implementing a message queue with List and with Streams. Of course, as you gain more hands-on experience, you can keep adding to and refining this table.

In fact, whether Redis is suitable as a message queue has long been debated. Many people believe that for message queues you should use software built specifically for that purpose, such as Kafka or RabbitMQ, and that Redis is better suited to caching.

Based on my years of Redis development experience, my view is that Redis is a very lightweight key-value database: deploying a Redis instance means starting a single process, and deploying a Redis cluster simply means deploying multiple Redis instances. Deploying Kafka or RabbitMQ involves additional components; for example, running Kafka requires deploying ZooKeeper as well. Compared with Redis, Kafka and RabbitMQ are generally considered heavyweight message queues.

Therefore, there is no one-size-fits-all answer to whether you should use Redis for message queuing; you need to consider the data volume of your business, as well as its requirements for performance, reliability, and scalability. If the message traffic between the components of your distributed system is not large, Redis needs only a limited amount of memory to store the messages, and its high performance supports fast message reads and writes, so it makes a good message queue solution.

One question per lesson

Well, as usual, let me give you a quick question. Suppose a producer sends a message to the message queue, and the message needs to be read and processed by more than one consumer (for example, a message carrying data collected from the business system needs to be read by consumer 1 for real-time computation and also by consumer 2, which saves it to the HDFS distributed file system for later historical queries). Which Redis data type would you use to solve this problem?

Feel free to leave your thoughts and answers in the comments section, and if you find today’s content helpful, please share it with more people. I’ll see you next time.