Advanced RocketMQ: principles of message consumption

Don’t complain when you run into difficulties. You can’t change the past, but you can try to change the future.

“This is the 12th day of my participation in the First Challenge 2022.”

Ways of getting messages

Consumers can get messages from Brokers in two ways: pull and push.

Pull consumption

The Consumer actively pulls messages from the Broker and starts consuming each batch once it has been fetched. The drawback is weak real-time performance: consumers cannot discover and consume new messages on the Broker promptly. Because the pull interval is set by the user, it needs careful tuning: if the interval is too short, the proportion of empty requests rises; if it is too long, messages are consumed late.
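To make the interval trade-off concrete, here is a minimal Python simulation (not the RocketMQ client API). It assumes message arrival times on the Broker are known in advance as integer ticks and that each pull fetches everything that has arrived so far; `simulate_pull` and its parameters are hypothetical names.

```python
from typing import List, Tuple


def simulate_pull(arrivals: List[int], interval: int, horizon: int) -> Tuple[int, int]:
    """Pull every `interval` ticks up to `horizon`.

    Returns (number of empty pulls, worst delay between a message's arrival
    and the pull that fetched it). `arrivals` holds hypothetical arrival ticks.
    """
    pending = sorted(arrivals)
    empty_pulls = 0
    worst_delay = 0
    for now in range(interval, horizon + 1, interval):
        batch = [t for t in pending if t <= now]
        if not batch:
            empty_pulls += 1  # the request came back with nothing
            continue
        worst_delay = max(worst_delay, max(now - t for t in batch))
        pending = [t for t in pending if t > now]
    return empty_pulls, worst_delay


arrivals = [3, 50, 97]
print(simulate_pull(arrivals, interval=1, horizon=100))   # (97, 0): many empty pulls, no delay
print(simulate_pull(arrivals, interval=50, horizon=100))  # (0, 47): no empty pulls, up to 47 ticks late
```

The two calls show the same three messages fetched under a short and a long interval: the first wastes almost every request, the second delivers some messages much later.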

Push consumption

In this mode, the Broker actively pushes messages to the Consumer as soon as it receives them. This is a typical publish-subscribe pattern: the Consumer registers a listener on its associated Queue, and whenever a new message is detected, the callback is triggered; inside the callback, the Consumer pulls the message from the Queue. All of this relies on a long-lived connection between the Consumer and the Broker, and maintaining long connections consumes system resources.
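The listener-and-callback flow can be sketched in Python as a simple observer. `SimulatedQueue`, `register_listener`, and `on_new_message` are hypothetical names; the real push consumer sits on a long connection to the Broker, which this in-memory sketch does not model.

```python
from collections import deque
from typing import Callable, Deque, List


class SimulatedQueue:
    """Hypothetical in-memory queue with listeners; not the RocketMQ API."""

    def __init__(self) -> None:
        self._messages: Deque[str] = deque()
        self._listeners: List[Callable[["SimulatedQueue"], None]] = []

    def register_listener(self, listener: Callable[["SimulatedQueue"], None]) -> None:
        self._listeners.append(listener)

    def put(self, message: str) -> None:
        self._messages.append(message)
        for listener in self._listeners:  # a new message triggers every callback
            listener(self)

    def pull_all(self) -> List[str]:
        batch = list(self._messages)
        self._messages.clear()
        return batch


consumed: List[str] = []


def on_new_message(queue: SimulatedQueue) -> None:
    # Inside the callback, the consumer pulls the new data from the queue.
    consumed.extend(queue.pull_all())


queue = SimulatedQueue()
queue.register_listener(on_new_message)
queue.put("order-1")
queue.put("order-2")
```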


Message consumption modes

Consumer groups have two message consumption modes: Clustering and Broadcasting.

Broadcast consumption

What is broadcast consumption

In broadcast mode, every consumer in the consumer group consumes all messages of the subscribed Topic. For example, if there are 10 messages under the Topic and 10 consumers in the consumer group, each consumer consumes all 10 messages.

Consumption progress

In broadcast mode, consumption progress is saved on the Consumer side. Every consumer in the group consumes all messages, but each proceeds at its own pace, so each consumer keeps its own consumption progress.
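A minimal Python sketch of broadcast mode, assuming a single queue whose messages are addressed by list index: every consumer reads all messages, and each keeps its own offset locally. `BroadcastConsumer` is a hypothetical class, not part of any RocketMQ client.

```python
from typing import List


class BroadcastConsumer:
    """Hypothetical broadcast-mode consumer: progress lives on the consumer side."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.local_offset = 0          # next message index to read, kept locally
        self.consumed: List[str] = []

    def poll(self, messages: List[str], batch_size: int) -> List[str]:
        batch = messages[self.local_offset:self.local_offset + batch_size]
        self.consumed.extend(batch)
        self.local_offset += len(batch)  # advance only this consumer's progress
        return batch


messages = [f"msg-{i}" for i in range(10)]
fast, slow = BroadcastConsumer("c1"), BroadcastConsumer("c2")
fast.poll(messages, 10)  # c1 has read all 10 messages
slow.poll(messages, 4)   # c2 is slower and has read only 4 so far
```

Because the two offsets are independent, there is nothing to share between consumers, which is why the progress can stay on the Consumer side.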

Cluster consumption

What is cluster consumption

In cluster mode, the messages under a Topic are divided among the consumers in the consumer group, and each message is consumed by only one of them. For example, if there are 10 messages under a Topic and 10 consumers in the group, the messages are ideally spread evenly, so each consumer consumes only one message.

Consumption progress

In cluster mode, consumption progress is saved on the Broker. All consumers in a Consumer Group jointly consume the messages of the same Topic, and each message is consumed only once. Because consumption progress is involved in load balancing (a queue can move from one consumer to another), the progress must be shared, so the Broker stores it in its store/config/consumerOffset.json file.
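Why the progress must be shared can be shown with a small Python sketch. `SimulatedBroker` is a hypothetical stand-in that stores committed offsets keyed by (group, topic, queue id); it is not the Broker's real storage format. When a queue moves from one consumer to another, the new owner resumes from the offset the previous owner committed.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class SimulatedBroker:
    """Hypothetical broker holding queues and the group's shared offsets."""

    def __init__(self, queues: Dict[Tuple[str, int], List[str]]) -> None:
        self.queues = queues  # (topic, queue_id) -> messages
        self.offsets: Dict[Tuple[str, str, int], int] = defaultdict(int)

    def pull(self, group: str, topic: str, queue_id: int, batch_size: int) -> List[str]:
        start = self.offsets[(group, topic, queue_id)]
        return self.queues[(topic, queue_id)][start:start + batch_size]

    def commit(self, group: str, topic: str, queue_id: int, next_offset: int) -> None:
        self.offsets[(group, topic, queue_id)] = next_offset


broker = SimulatedBroker({("TopicTest", 0): ["m0", "m1", "m2", "m3", "m4"]})
got_a = broker.pull("group-1", "TopicTest", 0, 3)  # consumer A reads m0..m2
broker.commit("group-1", "TopicTest", 0, 3)        # and commits offset 3
# A Rebalance hands queue 0 to consumer B, which resumes from the shared offset.
got_b = broker.pull("group-1", "TopicTest", 0, 10)
```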



The figure shows the consumption progress of the four queues in the TopicTest topic after 500 messages were consumed.

Rebalance mechanism

Rebalance applies only to cluster consumption.

What is a Rebalance

Rebalance is the process of reallocating the queues of a Topic among the consumers in the same Consumer Group.


The purpose of Rebalance is to increase the parallelism of message consumption. For example, suppose a Topic has five queues and only one consumer, which is then responsible for all five. If we add a second consumer, we can assign two queues to one consumer and three to the other, so the messages are consumed in parallel.

Rebalance limit

Because each queue is assigned to only one consumer, if the consumer group has more consumers than the topic has queues, the extra consumers sit idle and do nothing.

Problems caused by Rebalance

1. Consumption pause (similar to a stop-the-world pause in JVM garbage collection)

When there is only one Consumer, it consumes all queues. Adding a Consumer triggers a Rebalance, during which the original Consumer must stop consuming the queues being reassigned; consumption of those queues resumes only after they have been allocated to the new Consumer.
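Which queues are affected can be computed by comparing ownership before and after the Rebalance. The Python sketch below is hypothetical (`queues_to_pause` is not a RocketMQ function) and uses the five-queue example from the Rebalance section, split 3/2 after a second consumer joins: only the queues whose owner changes are paused.

```python
from typing import Dict, List


def queues_to_pause(old_owner: Dict[str, str], new_owner: Dict[str, str]) -> List[str]:
    """Queues whose owner changes; their consumption stops until the hand-over completes."""
    return sorted(q for q, c in old_owner.items() if new_owner.get(q) != c)


before = {f"q{i}": "c1" for i in range(5)}  # one consumer owns all five queues
after = {"q0": "c1", "q1": "c1", "q2": "c1", "q3": "c2", "q4": "c2"}  # c2 joins
paused = queues_to_pause(before, after)
```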

2. Repeated consumption

When a Consumer starts consuming a newly assigned queue, it resumes from the offset last committed by the previous Consumer. By default, however, offsets are committed asynchronously, so the offset recorded on the Broker can lag behind the messages the previous Consumer actually consumed. The messages in that gap are likely to be consumed again.

  1. Synchronous commit

After committing the offset of the messages it has consumed, the consumer waits for a success ACK from the Broker; only after receiving it does the consumer fetch and consume the next batch. The consumer is blocked while waiting for the ACK.

  2. Asynchronous commit

After committing the offset of a consumed batch, the consumer does not wait for the Broker's ACK; it can immediately fetch and consume the next batch. Note: the number of messages read at a time should be balanced for the specific business scenario. A larger batch improves system performance but can increase the number of messages that are consumed repeatedly; a smaller batch degrades performance but can reduce repeated consumption.
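The link between commit mode, batch size, and repeated consumption can be expressed as a toy model in Python. It assumes the Rebalance happens right after the old owner finishes a batch, that a synchronous commit has always been acknowledged by then, and that an asynchronous commit of the most recent batch is still in flight. `reconsumed_after_rebalance` is a hypothetical helper.

```python
def reconsumed_after_rebalance(consumed: int, batch_size: int, async_commit: bool) -> int:
    """Return how many messages the queue's next owner will consume again.

    Toy model: `consumed` messages (whole batches) were processed before the
    Rebalance. With synchronous commits every batch was ACKed before the next
    pull; with asynchronous commits the last batch's offset has not reached
    the Broker yet.
    """
    if consumed % batch_size:
        raise ValueError("this model assumes whole batches")
    committed = consumed
    if async_commit and consumed > 0:
        committed -= batch_size  # latest commit still in flight
    return consumed - committed


for size in (10, 50):
    # A larger batch leaves a larger uncommitted gap under asynchronous commit.
    print(size, reconsumed_after_rebalance(100, size, async_commit=True))
```

The model ignores the ACK wait that synchronous commit pays for; it only captures why the batch size bounds the duplicate window.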

3. Consumption spikes

There are two reasons for this:

  1. Rebalance causes repeated consumption, and the number of messages to be re-consumed may be large
  2. The consumption pause during Rebalance lasts too long, so messages pile up

Either cause can lead to a burst of messages being consumed right after the Rebalance finishes.

Causes of Rebalance

  1. The number of queues in the Topic that the consumers subscribe to changes

Typical scenarios:

  1. Brokers are scaled out or in
  2. Brokers are upgraded or undergo other O&M operations
  3. The network between a Broker and the NameServer becomes abnormal
  4. Queues are added or removed

  2. The number of consumers in the consumer group changes

Typical scenarios:

  1. The Consumer Group is scaled out or in
  2. Consumers are upgraded or undergo other O&M operations
  3. The network between a Consumer and the NameServer becomes abnormal

Rebalance process

The Broker maintains several Map collections that dynamically store the queues of each Topic and the Consumer instances in each Consumer Group. When it detects a change in the number of queues the Consumers subscribe to, or in the number of Consumers in the Consumer Group, it notifies every instance in the Consumer Group to Rebalance. After receiving the notification, each Consumer runs the queue allocation algorithm to determine which queues it should consume. In other words, the Rebalance itself is carried out by each Consumer instance.
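A hypothetical Python sketch of this flow: `BrokerMetadata` plays the role of the Broker-side Maps, and `rebalance` is what each Consumer runs after the notification. The allocation inside `rebalance` is only a placeholder (queue i goes to consumer i mod n); the point is that, because every Consumer sorts the same two lists, the instances reach non-overlapping answers without coordinating with each other.

```python
from typing import Dict, List, Tuple


class BrokerMetadata:
    """Hypothetical stand-in for the Broker's Maps of queues and consumers."""

    def __init__(self) -> None:
        self.topic_queues: Dict[str, List[int]] = {}
        self.group_consumers: Dict[str, List[str]] = {}

    def snapshot(self, topic: str, group: str) -> Tuple[List[int], List[str]]:
        # Sorted copies, so every consumer sees exactly the same order.
        return sorted(self.topic_queues[topic]), sorted(self.group_consumers[group])


def rebalance(consumer_id: str, queues: List[int], consumers: List[str]) -> List[int]:
    """Each consumer computes only its own share (placeholder allocation rule)."""
    index = consumers.index(consumer_id)
    return queues[index::len(consumers)]


meta = BrokerMetadata()
meta.topic_queues["TopicTest"] = [0, 1, 2, 3]
meta.group_consumers["group-1"] = ["c2", "c1"]
# After the notification, every instance rebalances independently.
shares = {c: rebalance(c, *meta.snapshot("TopicTest", "group-1"))
          for c in meta.group_consumers["group-1"]}
```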

Queue allocation algorithm

A Queue in a Topic can be consumed by only one Consumer in the Consumer Group, while one Consumer can consume messages from several Queues at the same time. Which Consumer a given Queue is paired with is decided by an allocation strategy. There are four common strategies:

1. Average allocation strategy (default)

The algorithm allocates queues according to avg = number of queues / number of consumers. If the division is exact, each consumer in turn receives avg consecutive queues. If it is not, the leftover queues are handed out one at a time in Consumer order, so the first few consumers each receive one extra queue.

In short, it first calculates how many queues each Consumer should get, then assigns the queues to the Consumers in turn.
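A minimal Python sketch of the averaging rule, assuming every consumer sees the queue list and the consumer list in the same sorted order. `allocate_averagely` is a hypothetical name; the arithmetic follows the description above (avg = number of queues / number of consumers, leftovers to the first consumers), not RocketMQ's source code.

```python
from typing import List


def allocate_averagely(queues: List[str], consumers: List[str], consumer_id: str) -> List[str]:
    """Return the contiguous block of queues owned by `consumer_id`."""
    index = consumers.index(consumer_id)
    base, extra = divmod(len(queues), len(consumers))
    # The first `extra` consumers each take one additional queue.
    size = base + 1 if index < extra else base
    start = index * base + min(index, extra)
    return queues[start:start + size]


queues = [f"q{i}" for i in range(5)]
for cid in ["c0", "c1"]:
    print(cid, allocate_averagely(queues, ["c0", "c1"], cid))
# c0 ['q0', 'q1', 'q2']
# c1 ['q3', 'q4']
```

When there are more consumers than queues, the extra consumers get an empty list, which matches the Rebalance limit described earlier.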

2. Circular averaging strategy

The circular averaging algorithm walks around the ring formed by the queues and assigns them one by one to the consumers in order. It does not need to calculate in advance how many queues each Consumer should receive; it simply deals them out one at a time.
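In Python, dealing the queues around the ring reduces to a stride over the sorted queue list. `allocate_by_circle` is a hypothetical name, and the sketch again assumes all consumers share the same sorted lists. With five queues and two consumers, c0 receives q0, q2, q4 instead of a contiguous block.

```python
from typing import List


def allocate_by_circle(queues: List[str], consumers: List[str], consumer_id: str) -> List[str]:
    """Deal queues around the ring: queue i goes to consumer i % len(consumers)."""
    index = consumers.index(consumer_id)
    return queues[index::len(consumers)]  # start at this consumer, step by group size


queues = [f"q{i}" for i in range(5)]
for cid in ["c0", "c1"]:
    print(cid, allocate_by_circle(queues, ["c0", "c1"], cid))
# c0 ['q0', 'q2', 'q4']
# c1 ['q1', 'q3']
```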



3. Consistent hash strategy

The algorithm places the hash of each consumer on a hash ring as a node, then places the hash of each queue on the same ring. Moving clockwise from a queue, the first consumer it reaches is the one the queue is assigned to. The resulting distribution can be uneven.
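A compact Python sketch of the hash ring follows. The choice of MD5 as the hash, 10 virtual nodes per consumer, and the `consumer#i` node keys are assumptions made for illustration; virtual nodes are a common way to soften the unevenness mentioned above. The last lines check the property that makes consistent hashing attractive for Rebalance: when a consumer joins, each queue either keeps its owner or moves to the newcomer.

```python
import bisect
import hashlib
from typing import Dict, List, Tuple


def ring_hash(key: str) -> int:
    # Any stable hash works; MD5 is just an assumed choice here.
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")


def build_ring(consumers: List[str], virtual_nodes: int = 10) -> List[Tuple[int, str]]:
    # Each consumer appears several times on the ring to even out the load.
    return sorted((ring_hash(f"{c}#{i}"), c) for c in consumers for i in range(virtual_nodes))


def assign(queues: List[str], consumers: List[str], virtual_nodes: int = 10) -> Dict[str, str]:
    ring = build_ring(consumers, virtual_nodes)
    points = [point for point, _ in ring]
    result = {}
    for q in queues:
        # First node clockwise from the queue's hash, wrapping past the end of the ring.
        i = bisect.bisect_left(points, ring_hash(q)) % len(ring)
        result[q] = ring[i][1]
    return result


queues = [f"TopicTest-q{i}" for i in range(8)]
before = assign(queues, ["c0", "c1", "c2"])
after = assign(queues, ["c0", "c1", "c2", "c3"])
moved = [q for q in queues if before[q] != after[q]]
# Every moved queue now belongs to the newly added consumer c3.
```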



4. Same-machine-room strategy

Based on the machine room where each queue and consumer is located, the algorithm first selects the queues in the same machine room as the current consumer, then allocates them using the average allocation strategy or the circular averaging strategy. If there are no queues in the consumer's machine room, all queues are allocated using the average allocation strategy or the circular averaging strategy.
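The machine-room strategy can be sketched as a filter placed in front of one of the simpler strategies. In this hypothetical Python version, room labels are plain strings, queues are split among same-room consumers with the circular rule, and queues whose room has no consumer at all are spread over every consumer. That fallback is an assumption chosen so that no queue ends up with two owners; it is one reading of the description above, not a statement of RocketMQ's exact behavior.

```python
from collections import defaultdict
from typing import Dict, List


def allocate_by_room(queue_rooms: Dict[str, str],
                     consumer_rooms: Dict[str, str]) -> Dict[str, List[str]]:
    """Hypothetical room-aware allocation: same-room queues go to same-room consumers."""
    consumers_by_room: Dict[str, List[str]] = defaultdict(list)
    for consumer in sorted(consumer_rooms):
        consumers_by_room[consumer_rooms[consumer]].append(consumer)
    queues_by_room: Dict[str, List[str]] = defaultdict(list)
    for queue in sorted(queue_rooms):
        queues_by_room[queue_rooms[queue]].append(queue)

    all_consumers = sorted(consumer_rooms)
    assignment: Dict[str, List[str]] = {c: [] for c in all_consumers}
    orphans: List[str] = []
    for room in sorted(queues_by_room):
        local = consumers_by_room.get(room)
        if not local:
            orphans.extend(queues_by_room[room])  # no consumer in this room
            continue
        for i, queue in enumerate(queues_by_room[room]):
            assignment[local[i % len(local)]].append(queue)  # circular within the room
    for i, queue in enumerate(orphans):
        assignment[all_consumers[i % len(all_consumers)]].append(queue)  # assumed fallback
    return assignment
```

For example, with queues q0 and q1 in room A, q2 in room B, q3 in room C, and consumers c1 in A and c2 in B, c1 gets q0, q1 and the orphaned q3, while c2 gets q2.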

Comparing the four algorithms

  1. The consistent hash algorithm is complex and may produce uneven allocations
  2. The average and circular averaging strategies are more efficient

So what is the point of the consistent hash algorithm? It can greatly reduce the number of queues that change owner during a Rebalance caused by scaling the consumer group out or in.

Figure: Rebalance example

Figure: Rebalance with the consistent hash algorithm



The consistent hash algorithm therefore suits scenarios with frequent Rebalances, for example when consumers join and leave the group often.

At-least-once principle

RocketMQ follows the principle that every message must be successfully consumed at least once.

What is successful consumption

After consuming a message, the Consumer commits the message's offset to its consumption progress logger. Once the logger has successfully recorded the offset, the message counts as successfully consumed.

What is a consumption progress logger

  1. In broadcast mode, the Consumer itself is the consumption progress logger, because offsets are maintained locally
  2. In cluster mode, the Broker is the consumption progress logger, because offsets are maintained on the Broker
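The at-least-once guarantee follows from committing the offset only after a message has been handled. The Python sketch below is hypothetical: `ProgressLogger` stands for whichever side records offsets (the Consumer in broadcast mode, the Broker in cluster mode), and a crash is simulated between handling a message and committing its offset.

```python
from typing import Callable, Dict, List, Optional


class ProgressLogger:
    """Hypothetical offset store: local in broadcast mode, on the Broker in cluster mode."""

    def __init__(self) -> None:
        self._offsets: Dict[int, int] = {}

    def load(self, queue_id: int) -> int:
        return self._offsets.get(queue_id, 0)

    def commit(self, queue_id: int, next_offset: int) -> None:
        self._offsets[queue_id] = next_offset


def consume(messages: List[str], logger: ProgressLogger, queue_id: int,
            handle: Callable[[str], None], crash_at: Optional[int] = None) -> None:
    """Handle messages from the last committed offset, committing after each one."""
    for offset in range(logger.load(queue_id), len(messages)):
        handle(messages[offset])
        if offset == crash_at:
            return  # simulated crash: handled, but the offset was never recorded
        logger.commit(queue_id, offset + 1)  # only now does it count as consumed


handled: List[str] = []
logger = ProgressLogger()
consume(["m0", "m1", "m2"], logger, 0, handled.append, crash_at=1)
consume(["m0", "m1", "m2"], logger, 0, handled.append)  # restart after the crash
```

After the restart, m1 is handled a second time: the guarantee is at least once, not exactly once.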