Overview

Streams is one of the most important features added in Redis 5.0 compared with Redis 4.0. It is a new basic data structure in Redis: a powerful, persistent message queue with multicast support whose design takes cues from Kafka. The Streams data type itself is very simple, somewhat similar to a hash structure, but its additional features are powerful and complex:

  • Persistence. Streams stores data persistently. Unlike the pub/sub mechanism and lists, where messages are gone once they have been consumed, data consumed from a stream is kept in its history.
  • Multicast. This is similar to pub/sub.
  • Consumer groups. Streams lets consumers within the same consumer group compete for messages, and provides mechanisms that let each consumer view its own consumption history. You can also monitor a stream's consumer groups, the consumers within each group, and the state of the messages in the stream.

Basics

Data ID

Streams provides a default ID scheme that uniquely identifies each entry in a stream. An ID consists of two parts: <millisecondsTime>-<sequenceNumber>

millisecondsTime is the local time of the Redis server in milliseconds, and sequenceNumber distinguishes entries created within the same millisecond. Note that stream IDs increase monotonically even if the Redis server's clock misbehaves: if the current millisecond time is smaller than the previous one, the largest millisecond time seen so far is used and the sequence number is incremented. The reason for this design is that Streams lets you look up data by time interval, by point in time, or by ID.
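The rule for keeping IDs monotonic can be sketched in a few lines of Python. This is a toy model of the behavior described above, not Redis's implementation; the class name `StreamIdGenerator`, the helper `id_key` and the injectable `clock` parameter are assumptions made for illustration.

```python
import time


class StreamIdGenerator:
    """Toy model: hands out <ms>-<seq> IDs that never go backwards."""

    def __init__(self, clock=None):
        # `clock` returns the current time in milliseconds (injectable for tests).
        self._clock = clock or (lambda: int(time.time() * 1000))
        self._last_ms = 0
        self._last_seq = 0

    def next_id(self):
        now_ms = self._clock()
        if now_ms > self._last_ms:
            # Normal case: a new millisecond starts a fresh sequence.
            self._last_ms, self._last_seq = now_ms, 0
        else:
            # Same millisecond, or the clock jumped backwards: keep the largest
            # millisecond seen so far and bump the sequence number instead.
            self._last_seq += 1
        return f"{self._last_ms}-{self._last_seq}"


def id_key(stream_id):
    """Turn "ms-seq" into a tuple so IDs compare numerically, not as strings."""
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


# A clock that stalls and then jumps backwards by half a second.
ticks = iter([1000, 1000, 500, 1001])
gen = StreamIdGenerator(clock=lambda: next(ticks))
ids = [gen.next_id() for _ in range(4)]
print(ids)  # ['1000-0', '1000-1', '1000-2', '1001-0']
```

Comparing IDs through `id_key` rather than as raw strings matters because "1000-10" sorts before "1000-2" lexicographically.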

Insert data into Streams

XADD key ID field value [field value…]

127.0.0.1:6379> XADD mystream * name dwj age 18
"1574925508730-0"

The example above uses XADD to add an entry to a stream named mystream. The ID * tells Redis to generate the ID using the default scheme. In this case, the 1574925508730-0 returned by Redis is the ID it generated for the entry we inserted.

The command to query the length of a stream is XLEN:

127.0.0.1:6379> XLEN mystream
(integer) 3

Read data from Streams

Log files make a good comparison. You can view historical logs and query logs by range; you can use the Unix tail -f command to follow new log lines; and multiple users can each view only the part of the log that they have permission to see.

Query by range: XRANGE and XREVRANGE

The syntax is XRANGE key start end [COUNT count]. To query by range we only need to provide a start ID and an end ID; the result is a closed interval that includes both start and end. The two special IDs - and + represent the minimum and maximum possible IDs, respectively.

127.0.0.1:6379> XRANGE mystream - +
1) 1) "1574835253335-0"
   2) 1) "name"
      2) "bob"
      3) "age"
      4) "23"
2) 1) "1574925508730-0"
   2) 1) "name"
      2) "dwj"
      3) "age"
      4) "18"

As mentioned earlier, each ID contains the time at which the entry was created, which means we can query data by time range. To do so, we omit the sequence-number part of the IDs: when it is omitted, the start ID defaults to sequence number 0 and the end ID defaults to the maximum sequence number. We can therefore query with two Unix timestamps (in milliseconds) and get all the entries in that time interval.

1) 1) "1574835253335-0"
   2) 1) "name"
      2) "bob"
      3) "age"
      4) "23"
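How incomplete IDs expand into full range bounds can be illustrated with a small Python sketch. It only models the defaulting rule described above over an in-memory list of IDs; the function names `parse_bound` and `xrange_sketch` are hypothetical, and nothing here talks to a Redis server.

```python
MAX_U64 = 2**64 - 1  # 18446744073709551615, the largest 64-bit value


def parse_bound(bound, is_start):
    """Expand an XRANGE-style bound into a full (ms, seq) pair."""
    if bound == "-":
        return (0, 1)                # smallest possible ID
    if bound == "+":
        return (MAX_U64, MAX_U64)    # largest possible ID
    if "-" in bound:
        ms, seq = bound.split("-")
        return (int(ms), int(seq))   # already a complete ID
    # Timestamp only: start defaults to sequence 0, end to the maximum sequence.
    return (int(bound), 0 if is_start else MAX_U64)


def xrange_sketch(ids, start, end):
    """Return the IDs inside the closed interval [start, end]."""
    lo, hi = parse_bound(start, True), parse_bound(end, False)
    return [i for i in ids if lo <= parse_bound(i, True) <= hi]


ids = ["1574835253335-0", "1574925508730-0"]
print(xrange_sketch(ids, "1574835253335", "1574835253335"))  # ['1574835253335-0']
print(xrange_sketch(ids, "-", "+") == ids)                    # True
```

Because the end bound defaults to the maximum sequence number, passing the same timestamp as start and end still matches every entry created in that millisecond.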

You may also have noticed the COUNT parameter at the end of the syntax. It lets us return a fixed number of entries at a time and then start the next query from the last ID returned, so a very large stream can be read in batches. XREVRANGE works the same way as XRANGE but returns the elements in reverse order, so it is not covered again here.
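The batch-by-batch iteration can be sketched as follows. This is a minimal in-memory model, assuming the next query starts at the last returned ID with its sequence number incremented by one so that the last entry is not returned twice; `xrange_page` is a hypothetical stand-in for an XRANGE call with COUNT, and sequence-number overflow is ignored.

```python
def id_key(stream_id):
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


def next_start(last_id):
    """Smallest ID strictly greater than last_id (bump the sequence part)."""
    ms, seq = id_key(last_id)
    return f"{ms}-{seq + 1}"


def xrange_page(ids, start, count):
    """Stand-in for XRANGE key start + COUNT count over a sorted list of IDs."""
    matching = [i for i in ids if id_key(i) >= id_key(start)]
    return matching[:count]


def iterate(ids, count):
    """Yield the whole stream in pages of at most `count` entries."""
    start = "0-0"
    while True:
        page = xrange_page(ids, start, count)
        if not page:
            return
        yield page
        start = next_start(page[-1])


ids = ["1000-0", "1000-1", "1000-2", "1001-0", "1002-0"]
pages = list(iterate(ids, 2))
print(pages)
# [['1000-0', '1000-1'], ['1000-2', '1001-0'], ['1002-0']]
```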

Read data through XREAD

The syntax is XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] id [id …]. Here we mainly use XREAD to subscribe to new data arriving in a stream. This may look similar to Redis pub/sub or to blocking queues, which wait on a key and then fetch new data, but there are essential differences from both:

  • Like pub/sub and blocking queues, Streams allows multiple clients to wait for data. By default, Streams delivers each message to every client waiting on the stream, which is similar to pub/sub, but Streams can also deliver each message to just one of the clients through a competition mechanism (this mode uses consumer groups, which are covered later).
  • Pub/sub messages are fire-and-forget and are never stored: you only receive messages published after you subscribed, each message is pushed to a client only once, and history cannot be viewed. With a blocking queue, an element is popped from the queue when a client receives it; in other words, the history of what a consumer has consumed cannot be viewed. In Streams, all messages are appended to the stream indefinitely (messages can be explicitly deleted, and there is a trimming mechanism), and a client needs to remember the ID of the last message it received in order to fetch newer messages from that point.
  • Consumer groups provide a level of control that pub/sub and blocking lists cannot: different groups for the same stream, explicit acknowledgement of processed items, the ability to inspect pending items and claim unprocessed messages, and a consistent view of history for each consumer, where a single client can see only its own message history.

    Read data from Streams:
    127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream 0
    1) 1) "mystream"
       2) 1) 1) "1574835253335-0"
             2) 1) "name"
                2) "bob"
                3) "age"
                4) "23"
          2) 1) "1574925508730-0"
             2) 1) "name"
                2) "dwj"
                3) "age"
                4) "18"

    Like the list structure, Streams also supports blocking reads:

    XREAD BLOCK 0 STREAMS mystream $

    The above command specifies the BLOCK option with a timeout of 0 milliseconds, meaning it never times out. It also uses the special ID $, which stands for the largest ID currently in the stream, so you will only read messages that arrive after you start listening. This is somewhat like tail -f in Unix. In addition, XREAD can listen to multiple streams at the same time.
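The difference between reading from ID 0 and reading from $, and the fact that each client has to remember the last ID it saw, can be shown with a small in-memory model. The `ToyStream` class below is an illustrative assumption, not a Redis client; blocking is left out, so `read_after` simply returns whatever is newer than the given ID.

```python
def id_key(stream_id):
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


class ToyStream:
    def __init__(self):
        self.entries = []  # list of (id, fields), kept in ID order

    def add(self, entry_id, fields):
        self.entries.append((entry_id, fields))

    def last_id(self):
        # What "$" resolves to at the moment a reader starts listening.
        return self.entries[-1][0] if self.entries else "0-0"

    def read_after(self, last_seen, count=None):
        # Entries are never removed by reading; every reader filters by ID.
        newer = [e for e in self.entries if id_key(e[0]) > id_key(last_seen)]
        return newer if count is None else newer[:count]


stream = ToyStream()
stream.add("1574835253335-0", {"name": "bob", "age": "23"})

seen_a = "0-0"             # reader A starts from the beginning
seen_b = stream.last_id()  # reader B starts from "$": new messages only

stream.add("1574925508730-0", {"name": "dwj", "age": "18"})

batch_a = stream.read_after(seen_a)
batch_b = stream.read_after(seen_b)
seen_a = batch_a[-1][0]    # each reader remembers its own position
seen_b = batch_b[-1][0]

print([e[0] for e in batch_a])  # ['1574835253335-0', '1574925508730-0']
print([e[0] for e in batch_b])  # ['1574925508730-0']
```

Both readers receive the new entry, which is the multicast behavior; only reader A also sees the history.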

Consumer groups

Often we do not want multiple clients to process the same message; instead, we want multiple clients to each take different messages from a stream and process them. This is the familiar producer-consumer model. Imagine we have two producers p1 and p2, three consumers c1, c2 and c3, and seven items. We want processing to proceed as follows:

p1 => item1 => c1
p2 => item2 => c2
p1 => item3 => c3
p2 => item4 => c1
p1 => item5 => c2
p2 => item6 => c3
p1 => item7 => c1

To address this scenario, Redis uses a concept called consumer groups, which resembles Kafka's consumer groups, but only in appearance. A consumer group is like a pseudo-consumer that reads data from the stream and distributes it to the consumers in the group. It records which data the group has consumed and which has been processed, and provides the following features:

  1. Each message is delivered to a different consumer, so the same message is never delivered to more than one consumer.
  2. Consumers are identified within a consumer group by a name, a case-sensitive string that the client implementing the consumer must choose. This means the consumer group retains all of its state even after a disconnection, because the client will identify itself again as the same consumer. However, it also means that it is up to the client to provide a unique identifier.
  3. Each consumer group tracks the first ID that has never been consumed, so that when a consumer asks for new messages, it is given only messages that have never been delivered before.
  4. Consuming a message requires an explicit acknowledgement with a specific command, which indicates that the message has been processed correctly and can therefore be evicted from the consumer group.
  5. The consumer group keeps track of all currently pending messages, that is, messages that were delivered to some consumer in the group but have not yet been acknowledged as processed. Thanks to this feature, when accessing a stream's message history, each consumer sees only the messages that were delivered to it.

Its model looks something like the following

| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |

From the model above we can see that the consumer group records the ID of the last message it delivered and distributes messages to different consumers, each of which can see only its own messages. If we think of consumer groups as auxiliary data structures attached to a stream, it follows that one stream can have multiple consumer groups, and each consumer group can have multiple consumers. In fact, on the same stream one client can read with XREAD while another client reads through a consumer group.
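The state a consumer group keeps can be approximated with a small Python model. This is a sketch under simplifying assumptions (one stream, no blocking, no timestamps); the class `ToyGroup` and its method names are hypothetical and only mirror the ideas of a last delivered ID, a pending list per consumer, and explicit acknowledgement.

```python
def id_key(stream_id):
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


class ToyGroup:
    def __init__(self, entries, start_id="0-0"):
        self.entries = entries            # shared list of (id, fields) in ID order
        self.last_delivered_id = start_id
        self.pending = {}                 # entry id -> name of the owning consumer

    def read_new(self, consumer, count=1):
        """Hand out entries that no consumer in the group has received yet."""
        fresh = [e for e in self.entries
                 if id_key(e[0]) > id_key(self.last_delivered_id)][:count]
        for entry_id, _ in fresh:
            self.pending[entry_id] = consumer
            self.last_delivered_id = entry_id
        return fresh

    def read_history(self, consumer):
        """Return only the entries still pending for this consumer."""
        return [e for e in self.entries if self.pending.get(e[0]) == consumer]

    def ack(self, entry_id):
        """Acknowledge an entry; returns 1 if it was pending, 0 otherwise."""
        return 1 if self.pending.pop(entry_id, None) is not None else 0


entries = [("1-0", {"message": "apple"}),
           ("2-0", {"message": "banana"}),
           ("3-0", {"message": "pomelo"})]
group = ToyGroup(entries)

to_alice = group.read_new("Alice")   # Alice gets apple
to_bob = group.read_new("Bob")       # Bob gets banana, never apple
acked = group.ack("1-0")             # Alice confirms apple

print(to_alice[0][1]["message"], to_bob[0][1]["message"], acked)  # apple banana 1
print(group.read_history("Alice"), group.last_delivered_id)       # [] 2-0
```

Note that reading changes `last_delivered_id` and `pending`, so in this model, too, a read is a state-changing operation on the group.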

Create a consumer group

Let's start by creating a stream that contains some data:

127.0.0.1:6379> XADD fruit * message apple
"1574935311149-0"
127.0.0.1:6379> XADD fruit * message banada
"1574935315886-0"
127.0.0.1:6379> XADD fruit * message pomelo
"1574935323628-0"

Then create a consumer group

127.0.0.1:6379> XGROUP CREATE fruit mygroup $
OK

Note that we need to specify an ID. Here we use the special ID $, so the group will only receive messages that arrive after it was created. We can also use 0 or a Unix timestamp, in which case the consumer group will read only messages after that point.

Now that the consumer group is created, we can immediately start reading messages through it with the XREADGROUP command. Its syntax is XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …], and like XREAD it provides the BLOCK option. Suppose the consumers are Alice and Bob, and let's see how the system returns different messages to each of them.

127.0.0.1:6379> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS fruit >
1) 1) "fruit"
   2) 1) 1) "1574936034258-0"
         2) 1) "message"
            2) "apple"

The command above says: I want to read data from the stream fruit through the group mygroup; I am Alice in mygroup; please give me one entry. The > operator is valid only in the context of consumer groups and means messages that have not been delivered to any other consumer so far. We can also pass a valid ID instead, in which case the consumer group returns our history of pending messages rather than new ones. This is useful too: when a consumer restarts for some reason, it can first read its own pending messages and then move on to new messages once those have been processed.

We can use the XACK command to tell the consumer group that a message has been processed correctly and should no longer appear in our pending history. Its syntax is XACK key group ID [ID…]:

127.0.0.1:6379> XACK fruit mygroup 1574936034258-0
(integer) 1

A few things to keep in mind:

  1. Consumers are created automatically the first time they are mentioned; they do not need to be created explicitly.
  2. XREADGROUP can also read from multiple keys at the same time, but for this to work you need to create a consumer group with the same name on every stream. This is not a common requirement, but it is worth noting that it is technically possible.
  3. XREADGROUP is a write command: reading messages from the stream modifies the consumer group, so the command can only be called on the master node.

Recovery from permanent failure

A consumer group may have several consumers consuming messages, but one of them may leave the group permanently. We then need a mechanism to reassign that consumer's pending messages to another consumer in the group. This requires two abilities: inspecting pending messages, and assigning a message to a given consumer. The former is provided by the XPENDING command, whose syntax is XPENDING key group [start end count] [consumer].

127.0.0.1:6379> XPENDING fruit mygroup
1) (integer) 1
2) "1574936042937-0"
3) "1574936042937-0"
4) 1) 1) "Alice"
      2) "1"

The result above means that the consumer group has one pending message, the smallest pending ID is 1574936042937-0 and the largest is 1574936042937-0, and the consumer named Alice has one pending message. You may wonder why: we put three fruits into the stream earlier and acknowledged one with XACK, so shouldn't two messages be pending? In fact, the group's pending list is the collection of the pending messages of its consumers, so it only contains messages that have actually been delivered to a consumer and not yet acknowledged. Each time a consumer fetches messages from the group, the state of the group changes, which is why XREADGROUP must be called on the master node, as mentioned earlier. We can pass the start, end and count arguments to see the status of the messages in a range:

127.0.0.1:6379> XPENDING fruit mygroup - + 10 Alice
1) 1) "1574936042937-0"
   2) "Alice"
   3) (integer) 903655
   4) (integer) 1
2) 1) "1574936052018-0"
   2) "Alice"
   3) (integer) 491035
   4) (integer) 1

Here we see the details of each message. The message with ID 1574936042937-0 belongs to the consumer Alice, has been idle for 903655 milliseconds, and has been delivered once. The first message has been pending for quite a while, so we suspect that Alice can no longer process it and want to assign it to Bob. For this we use the XCLAIM command, whose syntax is XCLAIM… In the command, min-idle-time is the minimum idle time of a message: a message is reassigned only if its idle time is greater than this value, and its idle time is reset when it is reassigned. This protects against two clients claiming the same message at the same time: because the idle time is reset when the message is assigned to the first client, the second client's command no longer matches and has no effect.

We could also use a separate process that continuously looks for timed-out messages and assigns them to active consumers. Note, however, that if the number of times a message has been delivered reaches a certain threshold, it should not be reassigned again but set aside somewhere else.
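The interplay of min-idle-time, the idle-time reset and the delivery counter can be sketched with a toy pending list. Everything here is an illustrative assumption: the `PendingEntry` class, the `claim` and `sweep` functions, the 600000 ms idle limit, and the `MAX_DELIVERIES` threshold are made up for the example, and time is passed in explicitly instead of being read from a clock. The sketch treats a message as claimable once it has been idle for at least the given time.

```python
MAX_DELIVERIES = 3  # hypothetical threshold before a message is set aside


class PendingEntry:
    """One row of a toy pending list: owner, last delivery time, delivery count."""

    def __init__(self, consumer, delivered_at_ms):
        self.consumer = consumer
        self.last_delivery_ms = delivered_at_ms
        self.delivery_count = 1


def claim(pending, entry_id, new_consumer, min_idle_ms, now_ms):
    """Reassign entry_id if it has been idle long enough; return True on success."""
    entry = pending.get(entry_id)
    if entry is None:
        return False
    if now_ms - entry.last_delivery_ms < min_idle_ms:
        return False  # not idle long enough, e.g. someone else just claimed it
    entry.consumer = new_consumer
    entry.last_delivery_ms = now_ms  # a successful claim resets the idle time
    entry.delivery_count += 1
    return True


def sweep(pending, rescuer, min_idle_ms, now_ms, dead_letters):
    """Reassign timed-out entries to `rescuer`; park the ones delivered too often."""
    for entry_id in list(pending):
        entry = pending[entry_id]
        if now_ms - entry.last_delivery_ms < min_idle_ms:
            continue
        if entry.delivery_count >= MAX_DELIVERIES:
            dead_letters.append(entry_id)
            del pending[entry_id]
        else:
            claim(pending, entry_id, rescuer, min_idle_ms, now_ms)


pending = {"1574936042937-0": PendingEntry("Alice", delivered_at_ms=0)}
now = 903655  # the idle time from the XPENDING output above, in ms

first = claim(pending, "1574936042937-0", "Bob", 600000, now)
second = claim(pending, "1574936042937-0", "Carol", 600000, now)
print(first, second, pending["1574936042937-0"].consumer)  # True False Bob
```

The second claim fails because the first one reset the idle time to zero, which is the race protection described in the text.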

The observability of Streams

Streams is fairly observable: the XPENDING command shown above lets you see the state of pending messages within a consumer group. But we may want to see more, such as how many groups a stream has and how many consumers each group has. For this, use the XINFO command:

127.0.0.1:6379> XINFO STREAM mystream
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 1
 9) "last-generated-id"
10) "1574925508730-0"
11) "first-entry"
12) 1) "1574835253335-0"
    2) 1) "name"
       2) "bob"
       3) "age"
       4) "23"
13) "last-entry"
14) 1) "1574925508730-0"
    2) 1) "name"
       2) "dwj"
       3) "age"
       4) "18"

The output tells us the length of the stream, the number of groups, and the details of the first and last messages. Next, let's look at the consumer groups of a stream:

127.0.0.1:6379> XINFO GROUPS fruit
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 2
   7) "last-delivered-id"
   8) "1574936052018-0"
2) 1) "name"
   2) "mygroup-1"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"

The output shows that fruit has two groups, along with each group's name, number of consumers, number of pending messages, and last delivered ID. We can also view the state of the consumers in a group in detail:

127.0.0.1:6379> XINFO CONSUMERS fruit mygroup
1) 1) "name"
   2) "Alice"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 1990242
2) 1) "name"
   2) "Bob"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 9178

From the output, you can see each consumer's number of pending messages and how long the consumer has been idle, in milliseconds.

Set an upper limit on Streams

Since history can be viewed in Streams, you might wonder whether memory will run out if messages keep being added indefinitely. Once the number of messages reaches a limit, old messages have to be deleted permanently or persisted to a database. Redis supports this scenario too. One way is to specify the maximum length of the stream when calling XADD, for example XADD mystream MAXLEN ~ 1000 followed by the ID and the field-value pairs. The ~ before the number is optional: it means the length does not have to be exactly 1000, and somewhat more than 1000 is acceptable. Without the ~, performance is worse. The other way is to use XTRIM, which also takes the MAXLEN option, for example XTRIM mystream MAXLEN ~ 10.
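One way to picture why the ~ form is cheaper is to assume that entries are stored in fixed-size nodes and that approximate trimming only ever drops whole nodes. The sketch below models that assumption in plain Python; the node size of 4, the helper `chunk`, and the functions `trim_exact` and `trim_approx` are invented for illustration and do not describe Redis's actual data layout.

```python
NODE_SIZE = 4  # hypothetical number of entries per node, chosen for the demo


def chunk(ids, size=NODE_SIZE):
    """Split a list of entry IDs into consecutive nodes of at most `size` entries."""
    return [ids[i:i + size] for i in range(0, len(ids), size)]


def trim_exact(nodes, maxlen):
    """Drop the oldest entries until exactly maxlen remain; may split a node."""
    excess = sum(len(n) for n in nodes) - maxlen
    while excess > 0 and nodes:
        if len(nodes[0]) <= excess:
            excess -= len(nodes.pop(0))
        else:
            del nodes[0][:excess]  # partial removal inside a node
            excess = 0
    return sum(len(n) for n in nodes)


def trim_approx(nodes, maxlen):
    """Drop whole nodes from the front only, never going below maxlen."""
    total = sum(len(n) for n in nodes)
    while nodes and total - len(nodes[0]) >= maxlen:
        total -= len(nodes.pop(0))
    return total


ids = list(range(10))
exact_len = trim_exact(chunk(ids), 5)
approx_len = trim_approx(chunk(ids), 5)
print(exact_len, approx_len)  # 5 6
```

In this model the approximate variant leaves 6 entries instead of 5 because removing the next whole node would drop the length below the limit.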

Special IDs

As mentioned earlier, the Streams API has several special IDs. The first are - and +, which are used in the XRANGE command to represent the minimum and maximum possible ID, respectively: - stands for 0-1 and + stands for 18446744073709551615-18446744073709551615, so the short forms are much more convenient to write. They can also be used in other range queries, such as XPENDING. $ represents the largest ID currently in the stream; in XREAD and XGROUP it means that only newly arriving messages are wanted. Note that $ and + do not mean the same thing. Finally, there is the special ID >, which can only be used in the XREADGROUP command and means messages in this consumer group that have never been delivered to any consumer; in effect, > always stands for the group's last delivered ID.

Persistence, replication, and message security

As with other Redis data structures, Streams are replicated asynchronously to slave nodes and persisted to AOF and RDB files, and the state of consumer groups is persisted through the same mechanisms. A few points to note:

  • If message persistence and consumer state are important, AOF must be used with a strong fsync policy. (AOF records every command that changes Redis data, and there are several fsync policies; the one meant here is appendfsync always.) This will seriously slow down Redis.
  • By default, asynchronous replication does not guarantee that a slave node holds the same data as the master; some content may be lost after a failover, depending on how well the slave was able to keep up with the data from the master.
  • The WAIT command can be used to force changes to be propagated to a set of slave nodes. While this makes data loss less likely, Redis Sentinel and Cluster do not necessarily promote the slave with the most recent data during a failover, and under certain failures may promote a slave that is missing some data.

    So when designing applications with Redis Streams and consumer groups, make sure you understand how your application behaves during failures, configure Redis accordingly, and assess whether that is safe enough for your application.

Delete data from Streams

To delete entries from a stream, use the XDEL command. The syntax is XDEL key ID. Note that in the current implementation, memory is not actually reclaimed until the macro node containing the entries is completely empty, so you should not overuse this feature.
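The reclaim behavior can be pictured with a toy model in which a deleted entry is only flagged, and its node is freed once every entry in it has been flagged. The `MacroNode` class and `xdel_sketch` function below are assumptions made for illustration; they mirror the behavior described above, not Redis's internal structures.

```python
class MacroNode:
    """A group of entries that is allocated and freed as one unit."""

    def __init__(self, ids):
        self.deleted = {entry_id: False for entry_id in ids}

    def live(self):
        return [i for i, gone in self.deleted.items() if not gone]


def xdel_sketch(nodes, entry_id):
    """Flag entry_id as deleted; free its node only when the node is fully empty."""
    for node in nodes:
        if entry_id in node.deleted and not node.deleted[entry_id]:
            node.deleted[entry_id] = True
            if not node.live():
                nodes.remove(node)  # every entry is gone: memory is reclaimed
            return 1
    return 0  # unknown or already deleted


nodes = [MacroNode(["1-0", "2-0"]), MacroNode(["3-0"])]

print(xdel_sketch(nodes, "1-0"), len(nodes))  # 1 2  (node kept, half empty)
print(xdel_sketch(nodes, "1-0"), len(nodes))  # 0 2  (already deleted)
print(xdel_sketch(nodes, "2-0"), len(nodes))  # 1 1  (node now empty, freed)
```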

Performance of Streams

Non-blocking Streams commands, such as XRANGE, or XREAD and XREADGROUP without the BLOCK option, behave like any other Redis command, so there is no need to discuss them; if you are interested, check the time complexity of each command in the Redis documentation. To some extent, Streams commands are as fast as set commands, and XADD is very fast, inserting 500,000 to 1,000,000 entries per second on a typical machine.

What we are interested in is the consumer-group blocking scenario: the latency from inserting an entry with XADD to a consumer reading that message through the group. To measure the delay between producing and consuming a message, we used a Ruby program that stores the production time as a field of the message and pushes the message to the stream. After receiving the message, the client compares the current time with the production time to compute the latency. The program is not optimized for performance and runs on a dual-core machine that also runs Redis, to simulate less-than-ideal conditions. Messages are produced at 10,000 per second, and 10 consumers in the group consume the data. The test results are as follows:

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 3 and 4 ms -> 0.01%

99.9% of requests had a latency of 2 milliseconds or less, and the outliers stayed very close to the average. Two other points to note:

  1. Consumers process 10,000 messages at a time, which adds some latency but lets slower consumers keep up with the message flow.
  2. The system used for the test is very slow compared to current systems.
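The measurement idea, storing the send time in the message and bucketing the observed delays per millisecond, can be sketched as follows. The original test used a Ruby program that is not shown here; this Python version is only an illustration of the bookkeeping, and the field name `sent_ms` and the sample latencies are made up.

```python
import math
from collections import Counter


def message_latency_ms(fields, now_ms):
    """Latency of one message, given the send time stored in its fields."""
    # "sent_ms" is a hypothetical field name written by the producer.
    return now_ms - int(fields["sent_ms"])


def latency_histogram(latencies_ms):
    """Group latencies into 1 ms buckets and return {(lo, hi): percentage}."""
    buckets = Counter(math.floor(latency) for latency in latencies_ms)
    total = len(latencies_ms)
    return {(b, b + 1): 100.0 * n / total for b, n in sorted(buckets.items())}


samples = [0.2, 0.7, 0.9, 1.4]  # made-up latencies in milliseconds
hist = latency_histogram(samples)
for (lo, hi), pct in hist.items():
    print(f"Processed between {lo} and {hi} ms -> {pct:.2f}%")
# Processed between 0 and 1 ms -> 75.00%
# Processed between 1 and 2 ms -> 25.00%
```

Producer and consumer ran on the same machine in the test described above, so comparing their clocks directly is reasonable; across machines, clock skew would have to be taken into account.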


redis.io/topics/stre…

Worktile’s website: Worktile.com

Translated by Worktile engineer Wenjie Du

This article was first published on Worktile’s official blog.