Original link: mp.weixin.qq.com/s/\_q0bI62i…

Nowadays, most Internet applications are built on distributed architectures, so message queues have gradually become the core means of internal communication in enterprise application systems.

A message queue offers low coupling, reliable delivery, broadcasting, flow control, and eventual consistency.

The most popular message queues include RabbitMQ, RocketMQ, ActiveMQ, Kafka, ZeroMQ, and MetaMQ. Some databases, such as Redis, MySQL, and PhxSQL, can also be made to work as message queues with some effort.

One might feel that the various open source MQ products are sufficient, so why implement MQ in Redis?

  • Some simple business scenarios may not require heavyweight MQ components (Kafka and RabbitMQ are heavyweight message queues compared to Redis)

Have you considered using Redis for message queues?

In this article, I will combine the characteristics of message queues, the ways Redis can be used as one, and practical project experience to discuss Redis-based message queue schemes with you.

Review the message queue

A message queue uses an efficient and reliable messaging mechanism to exchange data independently of the platform, and integrates distributed systems on top of that data communication.

Through its message passing and message queuing models, it provides application decoupling, elastic scaling, redundant storage, traffic peak shaving, asynchronous communication, data synchronization, and other functions in a distributed environment. It holds a pivotal position as a component of distributed system architecture.

[Figure: mq_overview]

Now let’s review the general characteristics of the message queues we use:

  • There are three roles: producer, consumer, message processing center
  • Asynchronous processing: A producer sends a message to a virtual channel (message queue) without waiting for a response. The consumer subscribes or listens to the channel to retrieve the message. They don’t interfere with each other, and they don’t even need to be online at the same time, which is what we call loosely coupled
  • Reliability: Messages must not be lost or consumed more than once, and sometimes their order must be preserved

Apart from the usual message-oriented middleware, which Redis data types do you think meet these general MQ requirements?

Redis implements message queuing

Only two data types, List and Streams, can meet these message queue requirements, and of course Redis also provides a publish/subscribe (Pub/Sub) mode.

Let’s take a look at the use and scenarios of each of these three methods.

2.1 List implements message queuing

A Redis list is a simple list of strings, sorted by insertion order. You can add an element to either the head (left) or the tail (right) of the list.

So it is often used for asynchronous queues. A task that should be processed later is serialized as a string and pushed onto a Redis list, and another thread polls the list and processes the data.

Redis provides several pairs of List commands. Here is a quick look at them so that they become familiar.

List Common commands

Combining a few push and pop commands gives several ways to use a list as a queue:

  • LPUSH and RPOP: push on the left, pop from the right

  • RPUSH and LPOP: push on the right, pop from the left

    127.0.0.1:6379> lpush mylist a a b c d e
    (integer) 6
    127.0.0.1:6379> rpop mylist
    "a"
    127.0.0.1:6379> rpop mylist
    "a"
    127.0.0.1:6379> rpop mylist
    "b"

[Figure: redis-RPOP]
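The two push/pop pairings can be modelled without a Redis server. The sketch below is only an in-memory stand-in built on Python's `collections.deque`; the helper names `lpush`, `rpop`, and `lpop` are illustrative and mimic the commands of the same name rather than call Redis.

```python
from collections import deque

# In-memory stand-in for a Redis list: index 0 is the head (left), -1 the tail (right).
mylist = deque()

def lpush(*values):
    """Like LPUSH: insert each value at the head, one after another."""
    for value in values:
        mylist.appendleft(value)
    return len(mylist)

def rpop():
    """Like RPOP: remove and return the tail element, or None if the list is empty."""
    return mylist.pop() if mylist else None

def lpop():
    """Like LPOP: remove and return the head element, or None if the list is empty."""
    return mylist.popleft() if mylist else None

size = lpush("a", "a", "b", "c", "d", "e")
first_three = [rpop(), rpop(), rpop()]  # opposite end from the push: oldest first (queue)
newest = lpop()                         # same end as the push: newest first (stack)
```

Pushing and popping at opposite ends gives first-in, first-out behaviour, which is what a message queue needs; popping at the same end as the push would turn the list into a stack.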

Immediate consumption problem

LPUSH and RPOP carry a performance risk: if consumers want to process data promptly, they have to write logic like while(true) and call RPOP or LPOP constantly, which costs the consumer program unnecessary CPU time.

Therefore, Redis also provides the blocking commands BLPOP and BRPOP. If the queue is empty, the client blocks until new data is written to the queue or the timeout expires. This saves unnecessary CPU overhead.

  • LPUSH and BRPOP: push on the left, blocking pop from the right

  • RPUSH and BLPOP: push on the right, blocking pop from the left

    127.0.0.1:6379> lpush yourlist a b c d
    (integer) 4
    127.0.0.1:6379> blpop yourlist 10
    1) "yourlist"
    2) "d"
    127.0.0.1:6379> blpop yourlist 10
    1) "yourlist"
    2) "c"
    127.0.0.1:6379> blpop yourlist 10
    1) "yourlist"
    2) "b"
    127.0.0.1:6379> blpop yourlist 10
    1) "yourlist"
    2) "a"
    127.0.0.1:6379> blpop yourlist 10
    (nil)
    (10.02s)

If the timeout is set to 0, the client waits indefinitely until a message arrives.

Because Redis executes commands on a single thread, the same message will not be delivered to more than one consumer at the same time. We still have to consider what happens when consumption fails, though.

Reliable queue model | ack mechanism

In the mode above, a message is deleted from the List as soon as it is popped. If the consumer does not receive the message because of a network problem, or crashes while processing it, the message can never be recovered. The cause is the lack of a message acknowledgement mechanism.

To ensure reliability, message queues normally have a complete acknowledgement (ACK) mechanism, through which consumers report to the queue that a message has been received or processed.

What about Redis List?

RPOPLPUSH and its blocking variant BRPOPLPUSH pop a message from one list and push it onto another list (as a backup), and the whole step is atomic.

This lets us delete the element from the backup list only after the business logic has completed safely, which gives us message acknowledgement.

    127.0.0.1:6379> rpush myqueue one
    (integer) 1
    127.0.0.1:6379> rpush myqueue two
    (integer) 2
    127.0.0.1:6379> rpush myqueue three
    (integer) 3
    127.0.0.1:6379> rpoplpush myqueue queuebak
    "three"
    127.0.0.1:6379> lrange myqueue 0 -1
    1) "one"
    2) "two"
    127.0.0.1:6379> lrange queuebak 0 -1
    1) "three"

[Figure: redis-rpoplpush]

I have processed data this way in earlier projects: a data identifier is popped from one List and pushed onto another, and once the business operation has completed safely, it is deleted from the second List.
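The take, process, acknowledge cycle described above can be sketched as an in-memory model. This is not Redis client code: `ReliableQueue` and its methods are hypothetical names, `take` stands in for RPOPLPUSH, `ack` for removing the backup copy (for example with LREM), and `recover` for a recovery pass you would have to write yourself. Unlike the transcript above, the model pushes on the left so that delivery is first-in, first-out.

```python
from collections import deque

class ReliableQueue:
    """In-memory model of the RPOPLPUSH pattern: a work list plus a backup list."""

    def __init__(self):
        self.queue = deque()   # waiting messages; producers push on the left
        self.backup = deque()  # handed to a consumer but not yet acknowledged

    def push(self, message):
        self.queue.appendleft(message)  # like LPUSH queue message

    def take(self):
        """Like RPOPLPUSH queue backup: move the oldest message in one step."""
        if not self.queue:
            return None
        message = self.queue.pop()
        self.backup.appendleft(message)
        return message

    def ack(self, message):
        """Drop the backup copy once processing succeeded; False if it was not pending."""
        try:
            self.backup.remove(message)
            return True
        except ValueError:
            return False

    def recover(self):
        """Move unacknowledged messages back to the work list, keeping their order."""
        moved = 0
        while self.backup:
            # Newest backup entry first, so the oldest ends up at the tail and is taken first.
            self.queue.append(self.backup.popleft())
            moved += 1
        return moved

q = ReliableQueue()
for m in ("one", "two", "three"):
    q.push(m)

job = q.take()          # "one" now lives only in the backup list
acked = q.ack(job)      # processing succeeded, so the backup copy is removed
crashed = q.take()      # "two" is taken, but the consumer dies before acknowledging
requeued = q.recover()  # a recovery pass puts it back for redelivery
```

A message that is taken but never acknowledged stays in the backup list, so nothing is lost; the price is that a recovered message may be processed twice, which the business logic has to tolerate.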

Of course, there are more specialized scenarios too. A delayed message queue can be built on a ZSET: when a message is added to the ZSET, its score is set to the timestamp at which it should be consumed. This works as long as the business data contains no duplicates, because ZSET members are unique.
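The delayed-queue idea can be made concrete with a small in-memory model. `DelayQueue` is a hypothetical name and the dictionary only imitates a sorted set: `add` plays the role of ZADD with the due timestamp as score, and `pop_due` plays the role of a range query by score followed by removal. The `now` parameter is an assumption added so that the example is deterministic.

```python
import time

class DelayQueue:
    """In-memory model of a ZSET-based delay queue: member -> score (due time)."""

    def __init__(self):
        self.scores = {}  # like a sorted set: unique members, one score each

    def add(self, message, delay_seconds, now=None):
        """Like ZADD key <due-timestamp> message; re-adding a member overwrites its score."""
        now = time.time() if now is None else now
        self.scores[message] = now + delay_seconds

    def pop_due(self, now=None):
        """Return and remove every message whose due time has passed, earliest first."""
        now = time.time() if now is None else now
        due = sorted((score, msg) for msg, score in self.scores.items() if score <= now)
        for _, msg in due:
            del self.scores[msg]
        return [msg for _, msg in due]

dq = DelayQueue()
dq.add("order:1 cancel", delay_seconds=30, now=1000)
dq.add("order:2 cancel", delay_seconds=10, now=1000)
dq.add("order:2 cancel", delay_seconds=20, now=1000)  # duplicate member: only the score changes

early = dq.pop_due(now=1015)  # nothing is due yet
ready = dq.pop_due(now=1025)  # order:2 became due at 1020
later = dq.pop_due(now=1030)  # order:1 becomes due at 1030
```

The duplicate `add` shows why the text requires non-repeating business data: a second message with the same content does not create a second entry. Against a real Redis server, several consumers polling the same ZSET would also need the fetch and the removal to be atomic, which this single-process model does not address.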

2.2 Publish/subscribe implements message queuing

We all know that there are two kinds of message models

  • Point-to-point (P2P)
  • Publish/Subscribe (Pub/Sub)

The List implementation is actually a point-to-point model. Now let's look at the Redis publish/subscribe model (message multicast), which is the messaging feature native to Redis.

[Figure: redis-pub_sub]

The publish/subscribe pattern also enables inter-process messaging, as follows:

The publish/subscribe pattern contains two roles, publisher and subscriber. A subscriber can subscribe to one or more channels, and a publisher can send a message to a specified channel, which is received by all subscribers to that channel.

Redis implements publish/subscribe through the PUBLISH and SUBSCRIBE commands, among others. The feature provides two messaging mechanisms: subscribing/publishing to a channel and subscribing/publishing to a pattern.

What is the difference between a channel and a pattern?

A channel can be thought of as a Redis key, while a pattern is like a key with wildcard matching: it selects every channel whose name matches the given pattern. Instead of explicitly subscribing to many channel names, you can subscribe to all of them at once with a pattern subscription.

Let’s start three Redis clients to see the effect:

[Figure: redis-subscribe]

First, two clients subscribe to a channel named framework, and then a third client publishes a message to framework. You can see that the first two clients receive the message:

[Figure: redis-publish]

We can see that each message a subscribing client receives has three parts:

  • Type of message
  • The name of the originating channel
  • Actual message

Next, subscribe to the channels that match a given pattern, this time with the PSUBSCRIBE command.

[Figure: redis-psubscribe]

We publish a message to the java.framework channel. Not only do Consumer1 and Consumer2, who subscribed to the channel, receive it, but so do Consumer3 and Consumer4, who subscribed to the pattern java.*.

[Figure: redis-psubscribe1]
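The difference between channel and pattern subscriptions can be sketched with an in-memory dispatcher. `MiniPubSub` is a hypothetical class, and Python's `fnmatch.fnmatchcase` is assumed to be a close enough stand-in for Redis glob patterns such as `java.*`; the two are not guaranteed to agree on every pattern. The tuples mirror the shape of what subscribers receive: message type, channel (preceded by the pattern for pattern subscriptions), and payload.

```python
from fnmatch import fnmatchcase

class MiniPubSub:
    """In-memory model of channel and pattern subscriptions (nothing is stored)."""

    def __init__(self):
        self.channels = {}  # channel name -> list of subscriber inboxes
        self.patterns = {}  # glob pattern -> list of subscriber inboxes

    def subscribe(self, channel, inbox):
        self.channels.setdefault(channel, []).append(inbox)

    def psubscribe(self, pattern, inbox):
        self.patterns.setdefault(pattern, []).append(inbox)

    def publish(self, channel, message):
        """Deliver to exact and pattern subscribers; return how many received it."""
        receivers = 0
        for inbox in self.channels.get(channel, []):
            inbox.append(("message", channel, message))
            receivers += 1
        for pattern, inboxes in self.patterns.items():
            if fnmatchcase(channel, pattern):
                for inbox in inboxes:
                    inbox.append(("pmessage", pattern, channel, message))
                    receivers += 1
        return receivers

bus = MiniPubSub()
c1, c2, c3, c4 = [], [], [], []
bus.subscribe("java.framework", c1)
bus.subscribe("java.framework", c2)
bus.psubscribe("java.*", c3)
bus.psubscribe("java.*", c4)

delivered = bus.publish("java.framework", "spring")
missed = bus.publish("golang.framework", "gin")  # no subscriber matches: the message is gone
```

The second publish illustrates the fire-and-forget nature of this model: a message that finds no subscriber at publish time is not kept anywhere.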

Pub/Sub common commands:

2.3 Streams implements message queuing

A disadvantage of Redis publish/subscribe (Pub/Sub) is that messages cannot be persisted: they are discarded if the network is disconnected or Redis goes down. There is also no ACK mechanism to ensure reliability, so if there are no consumers, the message is simply dropped.

Antirez, the creator of Redis, later started a separate project called Disque to address these issues, but it never took off, and its last GitHub update was five years ago, so we will not discuss it.

Redis 5.0 adds a more powerful data structure called Stream. It provides message persistence and master/slave replication, lets any client access the data at any time, remembers each client's read position, and ensures that messages are not lost.

It is like an append-only chain of messages in which every added message is strung together, each with a unique ID and its content. The messages are persistent.

[Figure: redis-stream]

Each Stream has a unique name, which is its Redis key. The Stream is created automatically the first time we append a message with the XADD command.

Streams is a Redis data type designed specifically for message queues, so it provides a rich set of message queue operation commands.

Stream common commands

Time for the CRUD engineer to get to work

Let's run through a round of add, delete, update and query

    127.0.0.1:6379> xadd mystream * f1 v1 f2 v2 f3 v3
    "1609404470049-0"
    # The ID has two parts: a millisecond timestamp and a sequence number
    # A message ID must be larger than the last ID in the stream
    127.0.0.1:6379> xadd mystream 123 f4 v4
    (error) ERR The ID specified in XADD is equal or smaller than the target stream top item
    # The ID can be specified explicitly
    127.0.0.1:6379> xadd mystream 1609404470049-1 f4 v4
    "1609404470049-1"
    # - means the minimum ID and + the maximum ID; explicit minimum or maximum message IDs can be given instead
    127.0.0.1:6379> xrange mystream - +
    1) 1) "1609404470049-0"
       2) 1) "f1"
          2) "v1"
          3) "f2"
          4) "v2"
          5) "f3"
          6) "v3"
    2) 1) "1609404470049-1"
       2) 1) "f4"
          2) "v4"
    127.0.0.1:6379> xdel mystream 1609404470049-1
    (integer) 1
    127.0.0.1:6379> xlen mystream
    (integer) 1
    # Delete the entire stream
    127.0.0.1:6379> del mystream
    (integer) 1
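The ID rules shown in the transcript (two-part IDs, strictly increasing order, rejection of an ID that is not larger than the last one) can be sketched as a small in-memory model. `MiniStream` is a hypothetical class, not a Redis client; the `now_ms` parameter is an assumption added to make auto-generated IDs deterministic, and `xrange` here accepts only `-`, `+`, or full `ms-seq` IDs.

```python
import time

class MiniStream:
    """In-memory model of stream IDs: (milliseconds, sequence), strictly increasing."""

    def __init__(self):
        self.entries = []      # list of ((ms, seq), fields), always in ID order
        self.last_id = (0, 0)

    @staticmethod
    def parse(id_text):
        """'1609404470049-1' -> (1609404470049, 1); a bare '123' is read as '123-0'."""
        ms, _, seq = id_text.partition("-")
        return int(ms), int(seq or 0)

    def xadd(self, id_text, fields, now_ms=None):
        if id_text == "*":
            ms = int(time.time() * 1000) if now_ms is None else now_ms
            last_ms, last_seq = self.last_id
            # Same (or earlier) millisecond as the last entry: bump the sequence instead.
            new_id = (ms, 0) if ms > last_ms else (last_ms, last_seq + 1)
        else:
            new_id = self.parse(id_text)
            if new_id <= self.last_id:
                raise ValueError("ID is equal or smaller than the stream top item")
        self.entries.append((new_id, dict(fields)))
        self.last_id = new_id
        return "%d-%d" % new_id

    def xrange(self, start="-", end="+"):
        low = (0, 0) if start == "-" else self.parse(start)
        high = (float("inf"), 0) if end == "+" else self.parse(end)
        return ["%d-%d" % entry_id for entry_id, _ in self.entries if low <= entry_id <= high]

s = MiniStream()
first = s.xadd("*", {"f1": "v1"}, now_ms=1609404470049)
try:
    s.xadd("123", {"f4": "v4"})
    rejected = False
except ValueError:
    rejected = True
second = s.xadd("1609404470049-1", {"f4": "v4"})
third = s.xadd("*", {"f5": "v5"}, now_ms=1609404470049)  # same millisecond again
```

Comparing IDs as `(ms, seq)` tuples is what keeps the log append-only: every new entry sorts after all existing ones, so a reader can resume from any ID it has remembered.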

Independent consumption

XREAD reads a list of messages in either blocking or non-blocking mode. The BLOCK option makes the call block, and a timeout of 0 ms means it never times out.

    127.0.0.1:6379> xread count 2 streams mystream 0
    1) 1) "mystream"
       2) 1) 1) "1609405178536-0"
             2) 1) "f5"
                2) "v5"
          2) 1) "1609405198676-0"
             2) 1) "f1"
                2) "v1"
                3) "f2"
                4) "v2"
    127.0.0.1:6379> xread block 0 streams mystream $
    1) 1) "mystream"
       2) 1) 1) "1609408791503-0"
             2) 1) "f6"
                2) "v6"
    (42.37s)

As you can see, instead of passing a regular ID for the stream mystream, I pass the special ID $. This special ID means that XREAD should use the largest ID already stored in mystream as the last ID, so that we receive only messages added after we started listening. This is somewhat similar to the Unix command tail -f.

Of course, you can specify any valid ID.

Furthermore, the blocking form of XREAD can listen to multiple Streams at the same time by specifying multiple key names.

    127.0.0.1:6379> xread block 0 streams mystream yourstream $ $

Create consumer Groups

XREAD can fan the same messages out to N clients. In some cases, however, we do not want to deliver the same message stream to many clients; we want to deliver different subsets of the messages in one stream to many clients. For example, three consumers take turns consuming one Stream.

[Figure: redis-stream-cg]

Redis Stream borrows a lot from Kafka’s design.

  • Consumer Group: Consumer groups are independent and do not affect each other. A consumer group can contain multiple consumers
  • last_delivered_id: Each consumer group has a cursor, last_delivered_id, which moves forward along the stream and marks which message the group has consumed up to
  • pending_ids: A per-consumer state variable that tracks the IDs of messages that have been read by the client but not yet acknowledged. If the client does not ACK, the number of IDs in this variable keeps growing; it shrinks as soon as a message is acknowledged. This variable is officially called the PEL (Pending Entries List) in Redis. It is a core data structure that ensures a client consumes each message at least once, so that a message is not lost in transit over the network.

[Figure: redis-group-strucure]

Streams are not partitioned the way Kafka topics are. To get partition-like behavior, you have to apply a policy on the client side that writes messages to different streams.
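One possible client-side policy is to hash a routing key onto a fixed number of stream names. The sketch below is only an illustration under assumptions: the function name `pick_stream`, the `base:index` naming scheme, and the choice of CRC32 are all hypothetical and are not something Redis prescribes.

```python
import zlib

def pick_stream(base_name, routing_key, partitions):
    """Map a routing key to one of `partitions` stream names, deterministically."""
    index = zlib.crc32(routing_key.encode("utf-8")) % partitions
    return "%s:%d" % (base_name, index)

# Every message for the same user lands in the same stream, which keeps per-user ordering.
target = pick_stream("orders", "user-42", 4)
names = {pick_stream("orders", "user-%d" % n, 4) for n in range(100)}
```

A producer would then append to the stream named by `target`, and each consumer (or consumer group) would read only the streams assigned to it.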

  • XGROUP CREATE: creates a consumer group
  • XREADGROUP: reads messages through a consumer group
  • XACK: acknowledges the specified messages

    # A start ID must be given when creating a consumer group: 0 means consume from the beginning, $ means consume only new messages
    127.0.0.1:6379> xgroup create mystream mygroup $
    OK
    127.0.0.1:6379> xinfo stream mystream
     1) "length"
     2) (integer)
     3) "radix-tree-keys"
     4) (integer) 1
     5) "radix-tree-nodes"
     6) (integer) 2
     7) "last-generated-id"
     8) "1609408943089-0"
     9) "groups"
    10) (integer) 1    # one consumer group
    11) "first-entry"    # first message
    12) 1) "1609405178536-0"
        2) 1) "f5"
           2) "v5"
    13) "last-entry"    # last message
    14) 1) "1609408943089-0"
        2) 1) "f6"
           2) "v6"

Consuming by consumer group

Stream provides the XREADGROUP command for consuming within a consumer group. It takes the consumer group name, the consumer name, and the start message ID. Like XREAD, it can block while waiting for new messages. When a new message is read, its ID is entered into the consumer's PEL (the list of messages being processed). After processing, the client notifies the server with the XACK command that the message has been handled, and the message ID is removed from the PEL.

    # > means start reading after the group's last_delivered_id; every message read moves last_delivered_id forward
    127.0.0.1:6379> xreadgroup group mygroup1 c1 count 1 streams mystream >
    1) 1) "mystream"
       2) 1) 1) "1609727806627-0"
             2) 1) "f1"
                2) "v1"
                3) "f2"
                4) "v2"
                5) "f3"
                6) "v3"
    127.0.0.1:6379> xreadgroup group mygroup1 c1 count 1 streams mystream >
    1) 1) "mystream"
       2) 1) 1) "1609727818650-0"
             2) 1) "f4"
                2) "v4"
    # No new messages are left
    127.0.0.1:6379> xreadgroup group mygroup1 c1 count 2 streams mystream >
    (nil)
    # Block and wait for new messages
    127.0.0.1:6379> xreadgroup group mygroup1 … block 0 streams mystream >
    1) 1) "mystream"
       2) 1) 1) "1609728270632-0"
             2) 1) "f5"
                2) "v5"
    (89.36s)
    127.0.0.1:6379> xinfo groups mystream
    1) 1) "name"
       2) "mygroup1"
       3) "consumers"
       4) (integer) 2    # 2 consumers
       5) "pending"
       6) (integer)
       7) "last-id"
       8) "1609728270632-0"
    # Acknowledge the specified message
    127.0.0.1:6379> xack mystream mygroup1 1609727806627-0
    (integer) 1
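The bookkeeping behind a consumer group (a shared cursor plus a list of pending, unacknowledged IDs) can be sketched in memory. `MiniGroup` is a hypothetical class and simplifies two things on purpose: the cursor is a list index rather than a message ID, and one dictionary stands in for the PEL. The short IDs such as `1-0` are made up for readability.

```python
class MiniGroup:
    """In-memory model of one consumer group reading one stream."""

    def __init__(self, entries, start_index=0):
        self.entries = entries             # list of (id, fields), oldest first
        self.last_delivered = start_index  # cursor: index of the next entry to hand out
        self.pel = {}                      # pending entries: id -> consumer name

    def read_new(self, consumer, count=1):
        """Like XREADGROUP ... > : hand out entries no consumer in the group has seen."""
        batch = self.entries[self.last_delivered:self.last_delivered + count]
        self.last_delivered += len(batch)
        for entry_id, _ in batch:
            self.pel[entry_id] = consumer  # pending until acknowledged
        return batch

    def ack(self, *entry_ids):
        """Like XACK: remove IDs from the PEL and return how many were pending."""
        return sum(1 for entry_id in entry_ids if self.pel.pop(entry_id, None) is not None)

    def pending(self, consumer):
        """IDs delivered to this consumer that are still unacknowledged."""
        return [entry_id for entry_id, owner in self.pel.items() if owner == consumer]

log = [("1-0", {"f1": "v1"}), ("2-0", {"f4": "v4"}), ("3-0", {"f5": "v5"})]
group = MiniGroup(log)

a = group.read_new("c1")           # c1 receives 1-0
b = group.read_new("c2", count=2)  # c2 receives 2-0 and 3-0, never 1-0 again
c = group.read_new("c1")           # nothing new is left
acked = group.ack("1-0")           # c1 finished its message
```

Because the cursor is shared by the whole group, each message goes to exactly one consumer, and anything left in the pending list after a crash is a candidate for redelivery.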

That is enough for a first taste; we will not go any deeper here.

In my opinion, Stream cannot yet serve as a mainstream MQ, and real-world use cases are still few and far between.