List

The most common way to implement a queue with Redis is to LPUSH messages onto one end of a list and RPOP them off the other.

Since the queue may be empty, the consumer polls in a for loop with a short sleep between attempts; the pseudo-code is as follows:

for {
  if msg := redis.rpop(); msg != nil {
    handle(msg)
  } else {
    // the queue is empty, back off briefly before polling again
    time.sleep(1000)
  }
}

This polling introduces some processing delay (though in most scenarios the impact is small).

However, Redis has blocking operations: BRPOP blocks until data is available, so messages can be picked up immediately. The pseudo-code is as follows:

for {
  // a timeout of 0 means wait indefinitely
  if msg := redis.brpop(0); msg != nil {
    handle(msg)
  }
}

However, if the client receives no messages for a long time, the Redis server may treat the connection as idle and drop it. So when messages are sparse, a timeout of 0 still carries some risk; it is better to use a small non-zero wait (e.g. 1s). The pseudo-code is as follows:

for {
  if msg := redis.brpop(1000); msg != nil {
    handle(msg)
  }
}

This keeps latency low while also avoiding dropped connections.
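For concreteness, here is a minimal sketch of such a consumer loop using the go-redis client; the queue name "queue" and the handle function are placeholder assumptions, and the producer side would simply LPUSH onto the same list:

package main

import (
    "context"
    "log"
    "time"

    "github.com/redis/go-redis/v9"
)

func handle(msg string) {
    log.Printf("got message: %s", msg)
}

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})

    // the producer side would be: rdb.LPush(ctx, "queue", "some payload")

    for {
        // BRPOP with a 1s timeout: block until a message arrives or the timeout expires
        res, err := rdb.BRPop(ctx, time.Second, "queue").Result()
        if err == redis.Nil {
            continue // timed out with nothing to consume, just block again
        }
        if err != nil {
            log.Printf("brpop failed: %v", err)
            time.Sleep(time.Second)
            continue
        }
        // res[0] is the key name, res[1] is the popped value
        handle(res[1])
    }
}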

This is the simplest possible queue, but note that it is not a reliable one. The main problem is message loss: since there is no ACK mechanism, if the consumer crashes (or a new version is deployed) after pulling a message with RPOP but before finishing with it, that message is most likely gone.

Next, let's check it against some common message queue features:

  • Multiple consumer groups: because a list element disappears once it is popped, each message can be consumed by exactly one consumer, once. This cannot satisfy consumer-group scenarios where several different businesses need to consume the same queue
  • Message replay: when messages need to be re-read (tracing historical data for an online bug, testing, etc.), a Redis list cannot provide it

conclusion

  • Easy to implement, which is why it is so common
  • There is no ACK mechanism and it is unreliable
  • Multiple consumer groups are not supported
  • Message replay is not supported

Pub/Sub

PUBLISH & SUBSCRIBE are commands that Redis designed specifically for the publish/subscribe pattern:

#Producer
127.0.0.1:6379> publish queue 1
(integer) 1

#Consumer 1
127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
1) "message"
2) "queue"
3) "1"

#Consumer 2
127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
1) "message"
2) "queue"
3) "1"

Pub/Sub mainly solves the problem of supporting multiple groups of consumers: every subscriber gets its own copy of each message.

However, Pub/Sub itself has no persistence; it only forwards data in real time, which causes the following problems:

  • When a consumer goes offline and later reconnects, the messages published in the meantime are lost; since nothing is stored, it can only receive new messages
  • When all consumers are offline and nothing is consuming, every published message is simply dropped

Therefore consumers must be online before producers start publishing, or messages will be lost.

In addition, each subscriber has an output buffer on the Redis server. The buffer is limited (client-output-buffer-limit); once the limit is exceeded, Redis forcibly disconnects the consumer, again causing message loss.

Here we can see that the list-based queue is a pull model, while Pub/Sub is a push model: messages are pushed into each subscriber's buffer and the consumer reads them from there.
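As a rough illustration of the push model, a subscriber written with go-redis simply ranges over a Go channel that the client fills as messages are pushed to it; the channel name "queue" mirrors the CLI example above, and this is only a sketch:

package main

import (
    "context"
    "log"

    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})

    // SUBSCRIBE queue; received messages are pushed into a client-side Go channel
    sub := rdb.Subscribe(ctx, "queue")
    defer sub.Close()

    for msg := range sub.Channel() {
        // if processing falls behind, the server-side output buffer grows and
        // the connection can be force-closed, as described above
        log.Printf("channel=%s payload=%s", msg.Channel, msg.Payload)
    }
}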

conclusion

  • Support publish/subscribe, support multiple groups of consumers
  • Consumers lose data when they go offline
  • Message pile-up can kick consumers off, causing data loss
  • There is no ACK mechanism and it is unreliable

Overall it is quite weak as a message queue, so it is usually not worth considering for this purpose.

Stream

Note that this feature is only available in Redis 5.0 and later.

Stream produces and consumes messages with XADD and XREADGROUP (the consumer group itself is created beforehand with XGROUP CREATE):

#Producer
127.0.0.1:6379> xadd queue * k1 v1
1636289446933-0
127.0.0.1:6379> xadd queue * k2 v2
1636295029388-0
127.0.0.1:6379> xadd queue * k3 v3
1636291571597-0

#Consumer 1
127.0.0.1:6379> xreadgroup group g1 c1 COUNT 1 streams queue >
1) 1) "queue"
   2) 1) 1) "1636289446933-0"
         2) 1) "k1"
            2) "v1"
127.0.0.1:6379> xreadgroup group g1 c1 COUNT 1 streams queue >
1) 1) "queue"
   2) 1) 1) "1636295029388-0"
         2) 1) "k2"
            2) "v2"

#Consumer 2
127.0.0.1:6379> xreadgroup group g1 c2 COUNT 1 streams queue >
1) 1) "queue"
   2) 1) 1) "1636291571597-0"
         2) 1) "k3"
            2) "v3"
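The same flow in Go with go-redis might look roughly like the sketch below; stream "queue", group "g1" and consumer "c1" mirror the CLI example above:

package main

import (
    "context"
    "log"
    "time"

    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})

    // create the consumer group once; MKSTREAM also creates the stream if missing
    // (a BUSYGROUP error just means the group already exists, so it is ignored here)
    _ = rdb.XGroupCreateMkStream(ctx, "queue", "g1", "0").Err()

    // producer: XADD queue * k1 v1
    id, err := rdb.XAdd(ctx, &redis.XAddArgs{
        Stream: "queue",
        Values: map[string]interface{}{"k1": "v1"},
    }).Result()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("produced %s", id)

    // consumer: XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS queue >
    streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
        Group:    "g1",
        Consumer: "c1",
        Streams:  []string{"queue", ">"},
        Count:    1,
        Block:    time.Second,
    }).Result()
    if err != nil && err != redis.Nil {
        log.Fatal(err)
    }
    for _, s := range streams {
        for _, m := range s.Messages {
            log.Printf("consumed %s: %v", m.ID, m.Values)
            // acknowledge after successful processing: XACK queue g1 <id>
            rdb.XAck(ctx, "queue", "g1", m.ID)
        }
    }
}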

XACK acknowledges a message, and XREADGROUP with an explicit ID re-reads messages that were delivered but not yet acknowledged:

#Manual ack
127.0.0.1:6379> xack queue g1 1636289446933-0
(integer) 1

#Query messages that are still pending (delivered but not yet acked)
127.0.0.1:6379> xreadgroup group g1 c1 COUNT 1 streams queue 0
1) 1) "queue"
   2) 1) 1) "1636295029388-0"
         2) 1) "k2"
            2) "v2"

Recovery of consumption after a restart is mostly implemented along the lines of the following pseudo-code:

// start from ID "0" to re-consume messages that were delivered but never acked
id := "0"
for {
  msg := xreadgroup(id)
  if msg == nil {
    // the pending backlog is drained, switch to ">" and consume new messages
    id = ">"
    continue
  }
  handle(msg)
  xack(msg)
}

As long as the set of consumers stays fixed (count, unique IDs, and so on), the above is a very convenient scheme; but if consumers can change, an extra periodic-polling scheme or something like ZooKeeper has to be introduced to keep consumer membership consistent, which is more complicated. That won't be expanded on here; a dedicated Stream article will follow.
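Assuming consumer IDs are stable as described above, the recovery loop could be sketched in Go roughly as follows: on startup the consumer drains its own pending entries (ID "0"), then switches to ">" for new messages. Stream, group and consumer names again mirror the CLI example; the processing step is just a log line standing in for real business logic:

package main

import (
    "context"
    "log"
    "time"

    "github.com/redis/go-redis/v9"
)

// recoverAndConsume drains this consumer's pending entries first, then reads new messages.
func recoverAndConsume(ctx context.Context, rdb *redis.Client) {
    id := "0" // "0" = re-read messages delivered to us but never XACKed
    for {
        streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    "g1",
            Consumer: "c1",
            Streams:  []string{"queue", id},
            Count:    10,
            Block:    time.Second, // blocking only applies once id == ">"
        }).Result()
        if err != nil && err != redis.Nil {
            log.Printf("xreadgroup failed: %v", err)
            continue
        }
        if len(streams) == 0 || len(streams[0].Messages) == 0 {
            // pending backlog is drained; from now on read only new messages
            id = ">"
            continue
        }
        for _, m := range streams[0].Messages {
            log.Printf("processing %s: %v", m.ID, m.Values) // business handling goes here
            rdb.XAck(ctx, "queue", "g1", m.ID)
        }
    }
}

func main() {
    rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
    recoverAndConsume(context.Background(), rdb)
}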

Comparison with professional message queues

Let's compare the Redis-based queues above with professional queues such as RabbitMQ and Kafka on two points:

  • Messages are not lost
  • Message backlog costs are low

Messages are not lost

This is a big topic in itself: it takes cooperation among producer, consumer, and middleware to achieve.

1. Message loss on the producer side

  • The message was not sent out successfully: retry
  • The message was sent, but the reply was lost: the producer can still only retry, which may result in duplicate messages downstream

So the only thing a producer can do to avoid losing messages is to retry, and correspondingly the consumer must make its message handling idempotent to tolerate the resulting duplicates.

From this point of view, whether the producer loses messages has little to do with the middleware; it comes down entirely to whether the business implementation handles the exceptions above.
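As a hypothetical sketch of that producer-side retry (the msg_id field is an assumption introduced here so the consumer can deduplicate; it is not part of the original example):

package main

import (
    "context"
    "time"

    "github.com/redis/go-redis/v9"
)

// produceWithRetry retries the send a bounded number of times. Because a lost
// reply is also retried, duplicates are possible, so a business-generated
// msgID is included for the consumer to deduplicate on.
func produceWithRetry(ctx context.Context, rdb *redis.Client, msgID, payload string) error {
    var err error
    for attempt := 0; attempt < 3; attempt++ {
        err = rdb.XAdd(ctx, &redis.XAddArgs{
            Stream: "queue",
            Values: map[string]interface{}{"msg_id": msgID, "payload": payload},
        }).Err()
        if err == nil {
            return nil
        }
        time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond)
    }
    return err
}

func main() {
    rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
    _ = produceWithRetry(context.Background(), rdb, "order-1001", "hello")
}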

2. Message loss on the consumer side

The main risk is the consumer crashing after pulling a message but before acknowledging it.

For this scenario the middleware needs to provide an ACK mechanism, so it knows which messages have actually been consumed and none are lost.

Redis Stream provides such an ACK mechanism, just like Kafka and RabbitMQ.

3. Message loss in the middleware itself

This comes down to how the middleware itself is implemented.

According to @Kaito, Redis has two risks:

  • AOF flushes to disk periodically and asynchronously, so there is a window in which writes can be lost (see the config sketch after this list)
  • Master/replica failover: data not yet synchronized to the replica is lost when the replica is promoted (there are doubts here, since the unsynchronized data could arguably still be retrieved from the old master 🤔️)
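For reference, the AOF flush behaviour mentioned in the first point is controlled by the appendfsync setting in redis.conf; the default everysec is what leaves that small loss window:

# redis.conf (sketch)
appendonly yes
# always   - fsync on every write: safest, slowest
# everysec - fsync once per second (the default): can lose up to ~1 second of writes
# no       - leave flushing to the OS: fastest, least safe
appendfsync everysec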

Kafka and RabbitMQ, by contrast, can require multiple nodes to acknowledge a write before it is considered successful, which further improves message reliability.

Message backlog

  • Redis: memory-based, so storage space is limited; once the backlog grows past a certain point, messages are discarded
  • Kafka, RabbitMQ: disk-based, so storage space can be treated as practically unlimited

conclusion

  • ACK is supported on top of publish/subscribe (consumer groups)
  • Message replay is supported
  • When the set of consumers changes, extra coding is needed to keep consumption reliable, which adds complexity
  • There is still some risk of message loss when the Redis component itself fails

Reference:

How does Redis do message queues?

Redis Stream features