This article was first published on my blog. The link is: blog.d77.xyz/archives/d1…

Preface

A recent project of mine used the Redis Stream data type as a message queue, which turned out to be more convenient than the other ways of building a message queue on Redis. Since this was my first time using it, I am recording the key points here for future reference.

The Stream type

There are many different ways to implement a message queue on top of Redis, but most of them have their own quirks and problems. After all, Redis is primarily a cache 😂.

The stream type provides almost all of the functionality needed for a message queue, including but not limited to:

  • Sequential generation of message IDs
  • Message traversal
  • Processing of unacknowledged messages
  • Blocking and non-blocking reads of messages
  • Consumption of messages by consumer groups
  • Message queue monitoring

And so on…

Here’s how to use the Stream data type.

Use of the stream type

Xadd command

The syntax format is:

XADD key ID field value [field value …]

  • key, the name of the stream
  • ID, the ID of the new message; the most common value is *
  • field value [field value …], the field-value pairs that make up the message

The xadd command appends a message to the specified key; the key is created automatically if it does not exist. A message is a set of field-value pairs, and a single message can carry several pairs.

The ID is most commonly given as *, in which case Redis generates it automatically in a format such as 1526919030474-55. A generated ID consists of a millisecond timestamp and a sequence number. The sequence number distinguishes messages generated within the same millisecond and ensures that IDs always increase. If the current timestamp is smaller than the largest one recorded in Redis, for example because the system clock was set back, Redis reuses the largest recorded timestamp and increments the sequence number so that IDs keep increasing.

127.0.0.1:6379> xadd message 1 key1 value1 key2 value2
"1-0"
127.0.0.1:6379> xadd message 1-2 key1 value1 key2 value2
"1-2"

IDs are normally assigned automatically by Redis, but they can also be specified manually, as in the example above. Because IDs must keep increasing, a manually specified ID has to be larger than every ID already in the stream. In practice this is rarely done.
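The rule for automatically generated IDs can be sketched in a few lines of Python. This is only a toy model, assuming the generator remembers the largest ID it has produced; the class name StreamIdGenerator and its fields are hypothetical and are not part of Redis or of any client library.

```python
import time


class StreamIdGenerator:
    """Toy model of how auto-generated stream IDs stay strictly increasing."""

    def __init__(self):
        self.last_ms = 0   # largest millisecond timestamp used so far
        self.last_seq = 0  # sequence number used with that timestamp

    def next_id(self, now_ms=None):
        if now_ms is None:
            now_ms = int(time.time() * 1000)
        if now_ms > self.last_ms:
            # The clock moved forward: start a new millisecond at sequence 0.
            self.last_ms, self.last_seq = now_ms, 0
        else:
            # Same millisecond, or the clock went backwards: keep the largest
            # timestamp seen so far and bump the sequence number instead.
            self.last_seq += 1
        return f"{self.last_ms}-{self.last_seq}"


gen = StreamIdGenerator()
ids = [gen.next_id(1000), gen.next_id(1000), gen.next_id(999), gen.next_id(1001)]
```

The third call passes a timestamp that is older than the previous one, which is the slow-clock case described above; the model still returns an ID larger than the one before it.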

127.0.0.1:6379> xadd message * key1 value1 key2 value2
"1604475735664-0"

The above command adds one message with two field-value pairs, key1-value1 and key2-value2, to the message key. The return value is the ID of the new message, which was generated automatically by Redis. At this point there is a message in the queue that can be read.

127.0.0.1:6379> xadd message maxlen 10 * key3 value3
"1604476672762-0"

The maxlen parameter limits the maximum number of messages in a key. Enforcing the limit exactly is inefficient, however, so you can add the ~ symbol to limit the number of messages approximately. With ~ the key may hold somewhat more messages than the limit, which is normal, but never fewer.

Xread command

The syntax format is:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] id [id …]

  • [COUNT count], the maximum number of messages to return
  • [BLOCK milliseconds], enables blocking mode and sets the blocking timeout in milliseconds. The default is non-blocking
  • key [key …], the names of the streams to read from
  • id [id …], one ID per key; only messages with an ID greater than the given ID are returned. In blocking mode you can pass $ to receive only messages that arrive after the call; in non-blocking mode $ is meaningless.

The xread command reads messages from one or more keys and returns only the messages whose ID is greater than the ID passed in for that key. It can be used in blocking or non-blocking mode, and it returns null if there are no matching messages.

127.0.0.1:6379> xread streams message 0
1) 1) "message"
   2) 1) 1) "1-0"
         2) 1) "key1"
            2) "value1"
            3) "key2"
            4) "value2"
      2) 1) "1-2"
         2) 1) "key1"
            2) "value1"
            3) "key2"
            4) "value2"
      3) 1) "1604476672762-0"
         2) 1) "key3"
            2) "value3"

The above command reads in non-blocking mode. Because every message ID is greater than 0, all messages are returned.
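The "greater than the given ID" rule compares IDs as pairs of numbers, not as strings. The following sketch models that rule on an in-memory list, assuming the entries are already sorted by ID. The helper names parse_id and xread_model are made up for this illustration and do not belong to Redis or to a client library.

```python
def parse_id(text):
    """Split 'ms-seq' into a tuple of ints; a bare 'ms' is treated as 'ms-0'."""
    ms, _, seq = text.partition("-")
    return int(ms), int(seq) if seq else 0


def xread_model(entries, last_id, count=None):
    """Return the entries whose ID is strictly greater than last_id."""
    threshold = parse_id(last_id)
    newer = [entry for entry in entries if parse_id(entry[0]) > threshold]
    return newer if count is None else newer[:count]


# The three messages from the transcript above, as (ID, fields) pairs.
stream = [
    ("1-0", {"key1": "value1", "key2": "value2"}),
    ("1-2", {"key1": "value1", "key2": "value2"}),
    ("1604476672762-0", {"key3": "value3"}),
]

everything = xread_model(stream, "0")      # same idea as: xread streams message 0
after_first = xread_model(stream, "1-0")   # skips the message with ID 1-0
```

A consumer that remembers the ID of the last message it handled can pass that ID on the next call and will only see newer messages.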

In blocking mode, you can use the $ sign to receive only the latest messages. If no new message arrives within the specified timeout, null is returned.

127.0.0.1:6379> xread count 10 block 10000 streams message $
(nil)
(10.02s)
127.0.0.1:6379> xread count 10 block 10000 streams message $
1) 1) "message"
   2) 1) 1) "1604478070071-0"
         2) 1) "keyblock2"
            2) "value"
(5.00s)

While the second command is blocking, run xadd message * keyblock2 value in another client to add a new message to the key. The blocked command then returns the newly added message along with the time it spent blocking.

Xlen command

Syntax format:

XLEN key

Returns the number of messages in the key, or 0 if the key does not exist. Even if the number of messages in a key is zero, the key is not automatically deleted because there may still be consumer groups associated with the key.

127.0.0.1:6379> xlen message
(integer) 5

Returns the number of all messages in message.

Xrange command

The syntax is as follows:

XRANGE key start end [COUNT count]

  • key, the stream name
  • start, the start ID
  • end, the end ID
  • [COUNT count], the maximum number of messages to return

This command returns the messages whose IDs fall within the given range. The range is specified by the start and end parameters, and both ends are inclusive.

127.0.0.1:6379> xrange message - + 
1) 1) "1-0"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
2) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
3) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"
4) 1) "1604478059261-0"
   2) 1) "keyblock"
      2) "value"
5) 1) "1604478070071-0"
   2) 1) "keyblock2"
      2) "value"

This command accepts two special IDs: - stands for the smallest possible ID and + for the largest possible ID. Using both queries all messages.

127.0.0.1:6379> xrange message 1 1
1) 1) "1-0"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
2) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"

An incomplete ID also works: if you pass only the timestamp part, the command returns all messages that share that timestamp, whatever their sequence numbers.

127.0.0.1:6379> xrange message - + count 3
1) 1) "1-0"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
2) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
3) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"

Use the count parameter to limit the number of output messages.

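With a simple loop you can iterate over all the messages in a key using only a small amount of memory: take the largest ID returned by the previous iteration, increment its sequence number by one so that the same message is not returned twice, and use the result as the start ID of the next iteration.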
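The paging loop can be sketched as follows. This is a pure-Python model working on an in-memory list rather than a real server; xrange_model, iterate_all, and parse_id are hypothetical helpers that only imitate the inclusive range semantics described above, and the entries are assumed to be sorted by ID.

```python
def parse_id(text, default_seq=0):
    """Split 'ms-seq' into a tuple; a bare 'ms' gets default_seq as its sequence."""
    ms, _, seq = text.partition("-")
    return int(ms), int(seq) if seq else default_seq


def xrange_model(entries, start, end, count):
    """Inclusive range query over (ID, fields) pairs, at most count results."""
    lowest = (0, 0) if start == "-" else parse_id(start)
    infinity = float("inf")
    # An incomplete end ID covers every sequence number of that timestamp.
    highest = (infinity, infinity) if end == "+" else parse_id(end, infinity)
    hits = [e for e in entries if lowest <= parse_id(e[0]) <= highest]
    return hits[:count]


def iterate_all(entries, page_size):
    """Yield every entry while holding only one page in memory at a time."""
    start = "-"
    while True:
        page = xrange_model(entries, start, "+", page_size)
        if not page:
            return
        yield from page
        ms, seq = parse_id(page[-1][0])
        start = f"{ms}-{seq + 1}"  # step just past the last ID already seen


stream = [("1-0", "a"), ("1-2", "b"), ("5-0", "c"), ("5-1", "d"), ("9-0", "e")]
all_entries = list(iterate_all(stream, page_size=2))
```

Incrementing the sequence number is what keeps the boundary message from appearing on two consecutive pages.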

Xrevrange command

Syntax:

XREVRANGE key end start [COUNT count]

The xrevrange command takes the same parameters as the xrange command, except that end comes before start and the messages are returned in reverse order.

127.0.0.1:6379> xrevrange message + -
1) 1) "1604478070071-0"
   2) 1) "keyblock2"
      2) "value"
2) 1) "1604478059261-0"
   2) 1) "keyblock"
      2) "value"
3) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"
4) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
5) 1) "1-0"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"

Xtrim command

Syntax:

XTRIM key MAXLEN [~] count

  • key, the stream name
  • MAXLEN, the trimming strategy, which was the only one implemented at the time of writing
  • [~], whether to trim approximately
  • count, the maximum number of messages to keep

The xtrim command trims the stream down to the given length, discarding the messages with the smallest IDs first.

127.0.0.1:6379> xread streams message 0
1) 1) "message"
   2) 1) 1) "1-0"
         2) 1) "key1"
            2) "value1"
            3) "key2"
            4) "value2"
      2) 1) "1-2"
         2) 1) "key1"
            2) "value1"
            3) "key2"
            4) "value2"
      3) 1) "1604476672762-0"
         2) 1) "key3"
            2) "value3"
      4) 1) "1604478059261-0"
         2) 1) "keyblock"
            2) "value"
      5) 1) "1604478070071-0"
         2) 1) "keyblock2"
            2) "value"
127.0.0.1:6379> xtrim message maxlen 4
(integer) 1

The return value of the xtrim command is the number of messages that were removed.

127.0.0.1:6379> xrange message - +
1) 1) "1-2"
   2) 1) "key1"
      2) "value1"
      3) "key2"
      4) "value2"
2) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"
3) 1) "1604478059261-0"
   2) 1) "keyblock"
      2) "value"
4) 1) "1604478070071-0"
   2) 1) "keyblock2"
      2) "value"

Querying again shows that one message has been trimmed, leaving four messages in the key.

127.0.0.1:6379> xtrim message maxlen ~ 2
(integer) 0

If the ~ parameter is used, trimming may not happen at all. This parameter tells Redis to trim only when an entire macro node can be removed, which is more efficient and guarantees that the number of messages never drops below the requested count.
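The difference between exact and approximate trimming can be illustrated with a small model in which the stream is a list of nodes and each node is a list of entries. This is a sketch under that assumption only; trim_exact and trim_approx are hypothetical names, and the node layout is invented for the example rather than taken from the Redis source.

```python
def trim_exact(nodes, maxlen):
    """Remove the oldest entries one by one until at most maxlen remain."""
    removed = 0
    while sum(len(node) for node in nodes) > maxlen:
        nodes[0].pop(0)          # drop the oldest entry of the oldest node
        removed += 1
        if not nodes[0]:
            nodes.pop(0)         # the node is empty now, so drop it too
    return removed


def trim_approx(nodes, maxlen):
    """Remove whole oldest nodes only while at least maxlen entries remain."""
    removed = 0
    total = sum(len(node) for node in nodes)
    while nodes and total - len(nodes[0]) >= maxlen:
        total -= len(nodes[0])
        removed += len(nodes.pop(0))
    return removed


# Five entries in one node, like the small stream used in this article.
single_node = [["m1", "m2", "m3", "m4", "m5"]]
exact_removed = trim_exact(single_node, 4)    # like: xtrim message maxlen 4
approx_removed = trim_approx(single_node, 2)  # like: xtrim message maxlen ~ 2
```

In this model the approximate trim removes nothing, because dropping the only node would leave fewer than two messages. That matches the 0 returned in the transcript above, assuming the whole stream fits in a single node.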

Xdel command

Syntax:

XDEL key ID [ID …]

  • key, the stream name
  • ID [ID …], the IDs of the messages to delete

The xdel command deletes the messages with the specified IDs from the key. The return value is the number of messages actually deleted, which may be smaller than the number of IDs passed in if some of them do not exist. When xdel is executed, Redis does not remove the message from memory immediately but only marks it as deleted. Once all the messages in a node have been marked as deleted, the whole node is destroyed and its memory is reclaimed.

127.0.0.1:6379> xdel message 1-2
(integer) 1
127.0.0.1:6379> xrange message - +
1) 1) "1604476672762-0"
   2) 1) "key3"
      2) "value3"
2) 1) "1604478059261-0"
   2) 1) "keyblock"
      2) "value"
3) 1) "1604478070071-0"
   2) 1) "keyblock2"
      2) "value"

This example deletes the message whose ID is 1-2.

Xgroup command

Syntax:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]

  • [CREATE key groupname id-or-$], creates a group on the specified key and sets the starting point from which the group reads messages. If 0 is specified, the group can read all historical messages in the key. If $ is specified, the group can read only new messages added to the key and cannot read historical messages. You can also specify any other start ID.
  • [SETID key groupname id-or-$], resets the starting point for reading messages for an existing group. For example, setting the starting point to 0 makes the group read all historical messages again.
  • [DESTROY key groupname], destroys a group in the specified key.
  • [CREATECONSUMER key groupname consumername], creates a consumer in the specified key and group. A new consumer is also created automatically when a command mentions a new consumer name.
  • [DELCONSUMER key groupname consumername], destroys a consumer in the specified key and group.

Xgroup is a command group that can execute different commands using different keywords.

127.0.0.1:6379> xgroup create message read_group $
OK
127.0.0.1:6379> xreadgroup group read_group read streams message >
(nil)
127.0.0.1:6379> xadd message * readkey readvalue
"1604494179721-0"
127.0.0.1:6379> xreadgroup group read_group read streams message >
1) 1) "message"
   2) 1) 1) "1604494179721-0"
         2) 1) "readkey"
            2) "readvalue"

Use the create command to create a group named read_group, with $ as the starting ID so that the group starts after the last message. Reading immediately returns null (the xreadgroup command is described below).

Use the xadd command to add a new message, and read the message again.

127.0.0.1:6379> xgroup setid message read_group 0
OK
127.0.0.1:6379> xreadgroup group read_group read streams message >
1) 1) "message"
   2) 1) 1) "1604476672762-0"
         2) 1) "key3"
            2) "value3"
      2) 1) "1604478059261-0"
         2) 1) "keyblock"
            2) "value"
      3) 1) "1604478070071-0"
         2) 1) "keyblock2"
            2) "value"
      4) 1) "1604494179721-0"
         2) 1) "readkey"
            2) "readvalue"

After using the setid command to reset the starting point to 0, the group can read all historical messages.

127.0.0.1:6379> xinfo groups message
1) 1) "name"
   2) "read_group"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 4
   7) "last-delivered-id"
   8) "1604494179721-0"

Use the xinfo command to query information about the group. You can see the group name, the number of consumers, the number of pending messages, and the ID of the last message delivered to the group (the xinfo command is described below).

127.0.0.1:6379> xinfo consumers message read_group
1) 1) "name"
   2) "read"
   3) "pending"
   4) (integer) 4
   5) "idle"
   6) (integer) 476449

Use the xinfo command to view information about the newly added consumer. You can see the consumer name and the number of pending messages.

127.0.0.1:6379> xgroup delconsumer message read_group read
(integer) 4
127.0.0.1:6379> xinfo consumers message read_group
(empty array)

Use the delconsumer command to delete the consumer from the group, then use the xinfo command to view the consumers in the group. An empty array is returned, indicating that the deletion succeeded. The return value of the delconsumer command is the number of pending messages that the deleted consumer had.

127.0.0.1:6379> xgroup destroy message read_group
(integer) 1
127.0.0.1:6379> xinfo groups message
(empty array)

Run the destroy command to destroy the group in message, then run the xinfo command to view the groups. An empty array indicates that the deletion succeeded. The return value of the destroy command is the number of groups that were deleted. Note: the group is removed even if it still has active consumers and pending messages, so make sure to run this command only when it is really necessary.

Xinfo command

Syntax:

XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

  • [CONSUMERS key groupname], queries information about the consumers in the specified key and group.
  • [GROUPS key], queries information about the groups in the specified key.
  • [STREAM key], queries general information about the specified key.

127.0.0.1:6379> xinfo consumers message read_group
1) 1) "name"
   2) "read"
   3) "pending"
   4) (integer) 4
   5) "idle"
   6) (integer) 206796

Read all consumer information in the read_group group of message.

127.0.0.1:6379> xinfo groups message
1) 1) "name"
   2) "read_group"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 4
   7) "last-delivered-id"
   8) "1604494179721-0"

Read information about all groups in message.

127.0.0.1:6379> xinfo stream message
 1) "length"
 2) (integer) 4
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1604494179721-0"
 9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1604476672762-0"
    2) 1) "key3"
       2) "value3"
13) "last-entry"
14) 1) "1604494179721-0"
    2) 1) "readkey"
       2) "readvalue"

Read general information about the message stream, including its length, the number of groups, and its first and last entries.

Xpending command

Syntax:

XPENDING key group [start end count] [consumer]

  • key, the stream name
  • group, the group name
  • [start end count], the start ID, the end ID, and the maximum number of entries to return
  • [consumer], the name of the consumer

127.0.0.1:6379> xpending message readgroup
1) (integer) 2
2) "1604496633846-0"
3) "1604496640734-0"
4) 1) 1) "read"
      2) "2"

Without extra arguments, the xpending command shows a summary for the group: the number of pending messages, the smallest and largest pending IDs, and the name of each consumer together with its number of pending messages.

127.0.0.1:6379> xpending message readgroup - + 10 read
1) 1) "1604496633846-0"
   2) "read"
   3) (integer) 513557
   4) (integer) 4
2) 1) "1604496640734-0"
   2) "read"
   3) (integer) 482927
   4) (integer) 1

You can use the xpending command to view the details of messages that are in the pending state.

Xreadgroup command

Syntax:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]

  • GROUP, a fixed keyword
  • group, the group name
  • consumer, the name of the consumer
  • [COUNT count], the maximum number of messages to fetch at a time
  • [BLOCK milliseconds], enables blocking mode and sets the timeout
  • [NOACK], messages do not need to be acknowledged; suitable for less important messages that may be lost
  • STREAMS, a fixed keyword
  • key [key …], the stream names
  • ID [ID …], one ID per key. The special ID > reads messages that have not yet been delivered to any consumer in the group; any other ID reads this consumer's own pending messages

The xreadgroup command refines message reading: combined with consumer groups and consumers, it supports both reading and acknowledging messages.

The xreadgroup command is syntactically almost identical to the xread command, except that it has one additional mandatory parameter: GROUP group consumer.

When multiple consumers read the same message queue with xread at the same time, every message is consumed repeatedly, once by each consumer. If you want multiple consumers to share the work of consuming one message queue, you need a consumer group.

For example, a push notification system must not push the same message twice, which means each message may be consumed only once. In this case several consumers in one group can consume the same push queue, which also reduces the load on each consumer.

127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> xadd message * key1 value1
"1604496633846-0"
127.0.0.1:6379> xadd message * key2 value2
"1604496640734-0"
127.0.0.1:6379> xgroup create message readgroup 0
OK
127.0.0.1:6379> xadd message * key3 value3
"1604496696501-0"
127.0.0.1:6379> xadd message * key4 value4
"1604496704823-0"
127.0.0.1:6379> xreadgroup group readgroup read count 1 streams message >
1) 1) "message"
   2) 1) 1) "1604496633846-0"
         2) 1) "key1"
            2) "value1"
127.0.0.1:6379> xreadgroup group readgroup read count 1 streams message >
1) 1) "message"
   2) 1) 1) "1604496640734-0"
         2) 1) "key2"
            2) "value2"

Start from scratch: empty the database, add messages to message again, create a group, and read messages with the xreadgroup command.

127.0.0.1:6379> xinfo consumers message readgroup
1) 1) "name"
   2) "read"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 53146
127.0.0.1:6379> xpending message readgroup - + 10 read
1) 1) "1604496633846-0"
   2) "read"
   3) (integer) 513557
   4) (integer) 4
2) 1) "1604496640734-0"
   2) "read"
   3) (integer) 482927
   4) (integer) 1

After reading two messages, run the xinfo command: the consumer now has two pending messages. The xpending command shows the details of those pending messages.
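How a group hands each message to exactly one consumer can be modelled in a few lines. The sketch below is an in-memory toy, not the Redis implementation; the class ConsumerGroupModel and its attributes are hypothetical, and IDs are written as (milliseconds, sequence) tuples to keep the comparison simple.

```python
class ConsumerGroupModel:
    """Toy consumer group: one shared cursor plus a pending entries list."""

    def __init__(self, entries, start_id=(0, 0)):
        self.entries = entries          # list of (id_tuple, fields), sorted by ID
        self.last_delivered = start_id  # plays the role of last-delivered-id
        self.pending = {}               # entry ID -> consumer that received it

    def read_new(self, consumer, count=1):
        """Model of reading with the special ID '>'."""
        batch = [e for e in self.entries if e[0] > self.last_delivered][:count]
        for entry_id, _fields in batch:
            self.pending[entry_id] = consumer  # wait here until acknowledged
            self.last_delivered = entry_id     # nobody else gets this entry
        return batch


entries = [
    ((1604496633846, 0), {"key1": "value1"}),
    ((1604496640734, 0), {"key2": "value2"}),
    ((1604496696501, 0), {"key3": "value3"}),
]
group = ConsumerGroupModel(entries)
first = group.read_new("read")     # consumer "read" gets key1
second = group.read_new("read2")   # consumer "read2" gets key2, not key1 again
```

Because the cursor belongs to the group rather than to each consumer, two consumers never receive the same new message, which is the behaviour a push system needs.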

Xack command

Syntax:

XACK key group ID [ID …]

  • key, the stream name
  • group, the group name
  • ID [ID …], the IDs of the messages to acknowledge

The xack command acknowledges messages by removing them from the pending entries list (PEL). When a message is read with the xreadgroup command, it is also stored in the PEL, where it waits to be acknowledged. Calling xack removes it from the PEL and frees the memory. Because a message stays in the PEL until it is acknowledged, it is not lost if a consumer fails before finishing it.

127.0.0.1:6379> xack message readgroup 1604496633846-0
(integer) 1
127.0.0.1:6379> xpending message readgroup 
1) (integer) 1
2) "1604496640734-0"
3) "1604496640734-0"
4) 1) 1) "read"
      2) "1"
127.0.0.1:6379> xinfo consumers message readgroup
1) 1) "name"
   2) "read"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 418489

Run the xack command to acknowledge one message, then run the xpending command: only one unacknowledged message remains. The xinfo command confirms that the consumer now has one pending message.
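The effect of acknowledging can be sketched with a plain dictionary standing in for the PEL. The function name xack_model is hypothetical, and the model ignores everything except the mapping from message ID to consumer.

```python
def xack_model(pel, ids):
    """Remove acknowledged IDs from the PEL and return how many were removed."""
    acknowledged = 0
    for entry_id in ids:
        # IDs that are not pending (unknown or already acknowledged) count as 0.
        if pel.pop(entry_id, None) is not None:
            acknowledged += 1
    return acknowledged


# The two pending messages of consumer "read" from the transcript above.
pel = {"1604496633846-0": "read", "1604496640734-0": "read"}
removed = xack_model(pel, ["1604496633846-0"])
```

Acknowledging the same ID a second time removes nothing in this model, so a consumer that retries an acknowledgement after a network error does no harm.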

Xclaim command

Syntax:

XCLAIM key group consumer min-idle-time ID [ID …] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]

  • key, the stream name
  • group, the group name
  • consumer, the consumer that takes over the messages
  • min-idle-time, the minimum idle time in milliseconds; only messages that have been idle for at least this long are claimed
  • ID [ID …], the message IDs
  • [IDLE ms], sets the idle time of the message. If this parameter is not provided, the idle time is reset to 0
  • [TIME ms-unix-time], the same as IDLE, but expressed as an absolute Unix timestamp in milliseconds
  • [RETRYCOUNT count], sets the retry counter to the given value. The counter is reported by the xpending command, which makes it possible to find messages that have still not been processed after a long time.
  • [FORCE], creates the pending entry in the PEL even if the specified ID is not already assigned to a consumer in the PEL.
  • [JUSTID], returns only the array of claimed message IDs, not the messages themselves.

The xclaim command changes the ownership of an unacknowledged message. If a consumer reads a message and crashes before processing it, the message stays in the pending queue and keeps occupying memory. In this case you can use the xclaim command to change the owner of the message so that another consumer can process it.

127.0.0.1:6379> xreadgroup group readgroup read2 count 1 streams message >
1) 1) "message"
   2) 1) 1) "1604496696501-0"
         2) 1) "key3"
            2) "value3"
127.0.0.1:6379> xack message readgroup 1604496696501-0
(integer) 1
127.0.0.1:6379> xpending message readgroup - + 10 read
1) 1) "1604496640734-0"
   2) "read"
   3) (integer) 1258517
   4) (integer) 2
127.0.0.1:6379> xpending message readgroup - + 10 read2
(empty array)

Create a new consumer, read2, have it read a message and acknowledge it, then check the unacknowledged messages of both consumers: read has one, and read2 has none.

127.0.0.1:6379> xclaim message readgroup read2 0 1604496640734-0
1) 1) "1604496640734-0"
   2) 1) "key2"
      2) "value2"
127.0.0.1:6379> xpending message readgroup - + 10 read2
1) 1) "1604496640734-0"
   2) "read2"
   3) (integer) 3724
   4) (integer) 4
127.0.0.1:6379> xpending message readgroup - + 10 read
(empty array)

After the xclaim command transfers ownership of the message to consumer read2, there is one unacknowledged message in the pending queue of read2 and none in the pending queue of read.
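The ownership transfer and the role of min-idle-time can be sketched as follows. This is a simplified model, assuming each pending entry records its owner, the time of its last delivery, and a delivery counter; xclaim_model and the dictionary keys are invented for the illustration, and options such as FORCE and JUSTID are left out.

```python
def xclaim_model(pel, new_consumer, min_idle_ms, ids, now_ms):
    """Move pending entries to new_consumer if they have been idle long enough."""
    claimed = []
    for entry_id in ids:
        entry = pel.get(entry_id)
        if entry is None:
            continue  # not pending, so there is nothing to claim
        if now_ms - entry["delivered_at"] < min_idle_ms:
            continue  # the current owner may still be working on it
        entry["consumer"] = new_consumer
        entry["delivered_at"] = now_ms  # the idle time starts again from zero
        entry["deliveries"] += 1        # this model counts a claim as a delivery
        claimed.append(entry_id)
    return claimed


pel = {
    "1604496640734-0": {"consumer": "read", "delivered_at": 0, "deliveries": 2},
}
too_early = xclaim_model(pel, "read2", 60_000, ["1604496640734-0"], now_ms=5_000)
taken_over = xclaim_model(pel, "read2", 60_000, ["1604496640734-0"], now_ms=90_000)
```

The transcript above passes 0 as min-idle-time, which claims the message unconditionally. A non-zero value, as in this sketch, helps prevent two recovering consumers from both taking over the same message at the same moment.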

Conclusion

This was my first time using Redis as a message queue, and the experience was very good. It was also my first time writing a command summary like this, so if you find omissions or errors in the content above, please point them out.

References

redis.io/commands#st…