Redis Stream is basically an Append Only log data structure similar to Kafka. It has two main features: 1. BLOCK API and 2. Consumer Group

Redis Stream is available on the official website: Intro to Redis Stream Intro to Redis Stream Intro to Redis Stream Intro to Redis Stream

A: Entry IDs

composition

Create a record using the XADD command

>XADD mystream * sensor-id 1234 temperature 19.8
1592836847842-0
Copy the code

This returns an Entry ID that uniquely identifies a record. It consists of two parts:

<millisecondsTime>-<sequenceNumber>
Copy the code

Milliseconds indicates the unique ID generated by the Redis node based on the current timestamp.

  1. The current time is smaller than the previous time
  2. Multiple records exist for this millisecond

In the second case, the sequence number represents the number of records at this point in time. The default value starts at 0 and is 0 even if there is no sequenceNumber. In the first case, the documentation says, if the current time is less than the previous time, increment the previous time instead of using the present time. So we have order.

The data in order

The data written must be larger than the current maximum ID.

Compare the id size, also compare the timestamp, then the sequence number.

Data consumption

Redis Stream supports three consumption methods:

  1. Multiple consumers can see the arrival of new messages simultaneously
  2. Look at the data generated during that time in chronological terms.
  3. Multiple nodes consume the same data, and a message can only be consumed by one node.

These three commands correspond to XREAD XRANGE, XREVRANGE, and XREADGROUP commands respectively. Here’s a brief overview:

Unlike Pub/Sub, Redis Stream stores message history. Even if the client is offline, the client can pull and fetch the required data. XREAD can read or listen to data after an entry ID. Which read support COUNT parameter, continuous listening support BLOCK way.

127.0.0.1:6379> XREAD BLOCK 0 STREAMS mystream $1) 1) "mystream" 2) 1) 1) 1) "1592840681758-0" 2) 1) 1) "message" 2) "ABC" (22.72s) 127.0.0.1:6379> XREAD BLOCK 0 STREAMS mystream $1) 1) "mystream" 2) 1) 1) 1) "1592840705832-0" 2) 1) 1) "message" 2) "ABC" (3.56 s)Copy the code

The data in a certain range is XRANGE and XREVRANGE, where COUNT and – + are supported to indicate the smallest and largest Entry IDS respectively. XREVRANGE and XRANGE are the same.

In XRANGE start and End, if sequenceNumber is not specified, start defaults to 0 and end defaults to the maximum value

Consumer Group

It is very important to understand that Redis consumer groups have nothing to do from the point of view of the implementation with Kafka (TM) consumer groups, but they are only similar from the point of view of the concept they implement, so I decided to do not change terminology compared to the software product that initially popularized such idea.

Basic concepts and processes:

  1. Each message is consumed by a different Consumer; there is no single message sent to more than one Consumer.
  2. Consumers of the same Consumer Group are case-sensitive by name. The Consumer Group holds all states
  3. Every consumer group has itfirst ID never consumedWhen the consumer requests data, only the data that is not sent can be retrieved (XREADGROUP >).
  4. Use consumer group consumption, need to useACKCommand to verify that this message is properly consumed and can be excluded from the Consumer Group.
  5. When the message is sent to the consumer, it has not beenACKWhen marked, the message is maintained inPENDINGList. But each consumer can only see messages addressed to himself.(PEL and XREADGROUP)

XREADGROUP

127.0.0.1:6379> xreadgroup group mygroup Alice count 1 STREAMS mystream > 1) 1) 1) 1) 1) 1) 2) 1) "key" 2) "value1" 127.0.0.1:6379> xreadGroup group mygroup Alice count 1 STREAMS mystream 0 1) 1) 1) "1592842069466-0" 2) 1) "key" 2) "value1" 127.0.0.1:6379> xack myStream myGroup 1592842069466-0 (integer) 1 127.0.0.1:6379> xreadGroup group mygroup Alice count 1 STREAMS mystream 0 1) 1) 2) (empty array)Copy the code
  1. ifIDfor>, will only accept data that is not sent to other consumers, and will be updatedlast ID
  2. ifIDFor other valid numeric IDS, we will gethistory of pending messages. Sent to the consumer, but executedACKThe data of

XPENDING and XCLAIM

Some consumers may never recover, but some data is still in PEL. At this time, we can send it to the PEL of other consumers through XCLAIM. See the documentation for specific syntax.

The overall process of Consumer Group

So what does a complete Consumer Group process look like today?

  1. When initializing the system, check whether Stream and Group exist. If not, passXGROUP CREATE xxx $ MKSTREAMTo create.
  2. At project startup, the messages in the PEL in this Consumer are detected and processed. The message is guaranteed to be ACK.(Retry should be implemented in code)
  3. Main process: Set the ID to>Listen for the arrival of new data, process and then ACK.
  4. BackUp: periodic scan PEL(no Consumer specified), reprocess data outside a certain processing time range (via XREAD) or assign it to another Consumer via XCLAIM (which requires step 2 to be a timed task).

Subsequent task

  1. Id generation and order guarantee in Redis distributed system
  2. Why is Redis Stream designed as a time series?
  3. Tests clock callback and associated ID generation