Chapter 3 How RocketMQ works

4. Message consumption

Consumers can get messages from brokers in two ways: pull and push. A consumer group can consume messages in two modes: cluster consumption (Clustering) and broadcast consumption (Broadcasting).

1 Message acquisition types

Pull consumption

The Consumer takes the initiative to pull messages from the Broker. Once a batch of messages has been retrieved, the consumption process starts. However, this approach has weaker real-time behavior: the Consumer cannot promptly discover and consume new messages that arrive in the Broker.

Since the pull interval is specified by the user, it has to be chosen with care. If the interval is too short, the proportion of empty requests rises; if it is too long, messages are consumed with too much delay.

Push consumption

In this mode, the Broker actively pushes data to the Consumer as soon as the Broker receives it. This acquisition method generally has good real-time performance.

This approach is a typical publish-subscribe pattern: the Consumer registers a listener on its associated Queue, and a callback is triggered whenever a new message arrives. Inside the callback, the Consumer pulls the message from the Queue. All of this relies on a long connection between the Consumer and the Broker, and maintaining long connections consumes system resources.

Comparison

  • Pull: the application has to traverse the associated Queues itself and real-time performance is poor, but the application has full control over when and how messages are pulled
  • Push: the traversal of the associated Queues is encapsulated and real-time performance is good, but more system resources are consumed
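The relationship between the two styles can be illustrated with a minimal sketch. It assumes a hypothetical in-memory queue rather than a real Broker; `InMemoryQueue`, `PushStyleConsumer`, and `pollOnce` are names invented for this illustration and are not RocketMQ APIs. It shows that a push-style consumer can be built as a pull loop that hands each message to a registered listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for one Queue on a Broker (not a RocketMQ class). */
final class InMemoryQueue {
    private final List<String> messages = new ArrayList<>();

    void append(String message) {
        messages.add(message);
    }

    /** Pull style: the caller decides when to ask and how many messages to take. */
    List<String> pull(int fromOffset, int maxCount) {
        int end = Math.min(messages.size(), fromOffset + maxCount);
        if (fromOffset >= end) {
            return new ArrayList<>();   // an "empty request": nothing new yet
        }
        return new ArrayList<>(messages.subList(fromOffset, end));
    }
}

/** Push style: wraps the pull loop and passes each message to a registered listener. */
final class PushStyleConsumer {
    private final InMemoryQueue queue;
    private final Consumer<String> listener;
    private int nextOffset = 0;

    PushStyleConsumer(InMemoryQueue queue, Consumer<String> listener) {
        this.queue = queue;
        this.listener = listener;
    }

    /** One polling round; a real client would keep doing this over a long connection. */
    int pollOnce(int batchSize) {
        List<String> batch = queue.pull(nextOffset, batchSize);
        batch.forEach(listener);        // the callback fires once per message
        nextOffset += batch.size();
        return batch.size();
    }
}
```

With pull, the application calls `pull` itself and owns the offset; with push, the application only supplies the listener and the loop is hidden from it.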

2 Consumption modes

Broadcast consumption

In broadcast consumption mode, every Consumer instance in the same Consumer Group receives all messages of the same Topic. That is, each message is sent to every Consumer in the Consumer Group.

Cluster consumption

In cluster consumption mode, the Consumer instances in the same Consumer Group evenly share the messages of the same Topic. That is, each message is sent to only one Consumer in the Consumer Group.

Saving consumption progress

  • Broadcast mode: consumption progress is saved on the Consumer side. In broadcast mode every consumer in a consumer group consumes all messages, but each one progresses at its own pace, so each consumer keeps its own consumption progress.
  • Cluster mode: consumption progress is stored in the Broker. All consumers in a Consumer Group jointly consume the messages of the same Topic, and each message is consumed only once. Because consumption progress takes part in consumer load balancing, it has to be shared. The Broker therefore holds the consumption progress of every Queue of every Topic.
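The difference between the two modes can be sketched in a few lines. This is an illustration only: `ConsumptionModeSketch` and its `Mode` enum are hypothetical names, and the round-robin split used for the cluster case is a simplification chosen for the sketch, not RocketMQ's actual allocation logic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ConsumptionModeSketch {
    enum Mode { BROADCASTING, CLUSTERING }

    /** Returns, for each consumer in one group, the messages it receives. */
    static Map<String, List<String>> deliver(Mode mode, List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return received;            // nobody to deliver to
        }
        for (int i = 0; i < messages.size(); i++) {
            if (mode == Mode.BROADCASTING) {
                // every instance in the group gets every message
                for (String consumer : consumers) {
                    received.get(consumer).add(messages.get(i));
                }
            } else {
                // exactly one instance in the group gets each message
                // (round-robin is an assumption made for this sketch)
                received.get(consumers.get(i % consumers.size())).add(messages.get(i));
            }
        }
        return received;
    }
}
```

In the broadcast case the per-consumer lists are identical, which is why each consumer can track its own progress; in the cluster case the lists are disjoint, which is why progress has to live in a shared place.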

3 Rebalance mechanism

The Rebalance mechanism applies only to cluster consumption.

What is a Rebalance

Rebalance is the process of reallocating the queues of a Topic among the consumers of the same Consumer Group. Its purpose is to increase the capacity for consuming messages concurrently. For example, suppose a Topic has five queues. If there is only one consumer, that consumer is responsible for consuming messages from all five queues. If we then add a second consumer, 2 queues can be allocated to one consumer and 3 queues to the other, which increases the parallelism of message consumption.

Rebalance limitation

Since a queue can be assigned to at most one consumer, when the number of consumer instances under a consumer group is greater than the number of queues, the excess consumer instances will not be allocated to any queues.

Rebalance drawbacks

While Rebalance improves consumption capacity, it also brings some problems:

Consumption pause: When there is only one Consumer, it is responsible for consuming all queues. Adding a Consumer triggers a Rebalance, during which the original Consumer must suspend consumption of some of its queues. Those queues can be consumed again only after they have been allocated to the new Consumer.

Repeated consumption: When a Consumer starts consuming a queue newly assigned to it, it must continue from the offset that the previous Consumer had committed. By default, however, offsets are committed asynchronously, so the offset recorded in the Broker can lag behind the messages the previous Consumer actually consumed. The messages in that gap are the ones likely to be consumed again.

Synchronous commit: After the consumer submits the offset of the messages it has consumed to the broker, it waits for a successful ACK from the broker. After receiving an ACK, the consumer continues to fetch and consume the next batch of messages. The consumer is blocked while waiting for an ACK.

Asynchronous commit: The consumer does not need to wait for a successful ACK from the broker after submitting the offset of the messages it has consumed to the broker. Consumers can directly retrieve and consume the next batch of messages.

The number of messages read per batch should be balanced against the specific business scenario. A larger batch improves system performance but may increase the number of repeatedly consumed messages; a smaller batch lowers system performance but may reduce the number of repeatedly consumed messages.
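To make the repeat-consumption window concrete, here is a minimal arithmetic sketch. `RepeatConsumptionSketch` and its parameter names are hypothetical; it only models the gap between what the previous consumer had processed and what the Broker had recorded at the moment of the Rebalance.

```java
final class RepeatConsumptionSketch {
    /**
     * Number of messages a newly assigned consumer re-consumes after a Rebalance.
     *
     * @param consumedUpTo    next offset the previous consumer would have processed
     * @param committedOffset offset the Broker had actually recorded for the queue
     */
    static long repeatedMessages(long consumedUpTo, long committedOffset) {
        if (committedOffset > consumedUpTo) {
            throw new IllegalArgumentException("committed offset cannot be ahead of consumption");
        }
        // The new consumer resumes at committedOffset, so the range
        // [committedOffset, consumedUpTo) is processed a second time.
        return consumedUpTo - committedOffset;
    }
}
```

For example, if the previous consumer had processed everything below offset 130 but only offset 100 had reached the Broker, `repeatedMessages(130, 100)` gives 30 replayed messages. With larger batches, more messages can sit between two commits, so this gap tends to grow.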

Consumption spike: A Rebalance can cause repeated consumption. If too many messages need to be consumed again, or if messages pile up because the Rebalance pause lasted too long, the consumers may have to consume a large number of messages in the instant after the Rebalance ends.

Causes of Rebalance

A Rebalance is triggered either by a change in the number of queues of a Topic that consumers subscribe to, or by a change in the number of consumers in a consumer group.

1) Scenarios where the number of queues changes:

  • The Broker is scaled out or in
  • The Broker is upgraded or undergoes O&M
  • The network between the Broker and NameServer is abnormal
  • The number of queues is expanded or reduced

2) Scenarios where the number of consumers changes:

  • The Consumer Group is scaled out or in
  • A Consumer is upgraded or undergoes O&M
  • The network between the Consumer and NameServer is abnormal

Rebalance process

The Broker maintains several Map collections that dynamically store information about the queues of each Topic and the Consumer instances of each Consumer Group. When the Broker detects a change in the number of queues that consumers subscribe to, or in the number of consumers in a Consumer Group, it sends a Rebalance notification to every instance in that Consumer Group.

TopicConfigManager: the key is the topic name and the value is a TopicConfig. TopicConfig maintains data about all queues in the Topic.

ConsumerManager: the key is the Consumer Group ID and the value is a ConsumerGroupInfo. ConsumerGroupInfo maintains data about all Consumer instances in the Group.

ConsumerOffsetManager: The key is the combination of the Topic and the Group that subscribes to the Topic, i.e. topic@group, and the value is an inner Map. The key of the inner Map is QueueId, and the value of the inner Map is the consumption progress offset of the Queue.
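The two-level structure described for ConsumerOffsetManager can be sketched as a map of maps. This is not the Broker's real class: `OffsetTableSketch`, `commitOffset`, and `queryOffset` are hypothetical names, and returning -1 for an unknown queue is a choice made only for this illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the Broker's two-level offset map. */
final class OffsetTableSketch {
    // outer key: "topic@group"; inner key: queueId; inner value: consumption offset
    private final Map<String, Map<Integer, Long>> offsetTable = new ConcurrentHashMap<>();

    private static String key(String topic, String group) {
        return topic + "@" + group;
    }

    /** Records the consumption progress of one group on one queue of a topic. */
    void commitOffset(String topic, String group, int queueId, long offset) {
        offsetTable.computeIfAbsent(key(topic, group), k -> new ConcurrentHashMap<>())
                   .put(queueId, offset);
    }

    /** Returns the stored offset, or -1 if nothing has been committed for this queue. */
    long queryOffset(String topic, String group, int queueId) {
        Map<Integer, Long> queues = offsetTable.get(key(topic, group));
        if (queues == null) {
            return -1L;
        }
        return queues.getOrDefault(queueId, -1L);
    }
}
```

Because the outer key combines the topic and the group, two groups subscribing to the same Topic keep independent progress on the same queue.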

After receiving the notification, each Consumer instance uses the Queue allocation algorithm to work out which Queues belong to it. In other words, each Consumer instance performs the Rebalance itself.

Compared with Kafka

In Kafka, when a Rebalance condition occurs, the Broker calls the Group Coordinator to complete the Rebalance. The Coordinator is a process within a Broker. The Coordinator selects a Group Leader from the Consumer Group, and the Leader reassigns the partitions based on the state of its own group. The result of this reassignment is reported to the Coordinator, which then synchronizes it to all Consumer instances in the Group.

Rebalance in Kafka is done by the Consumer Leader. Rebalance in RocketMQ is done by each Consumer. There is no Leader in a Group.

4 Queue allocation algorithm

Each Queue in a Topic can be consumed by only one Consumer in the Consumer Group, while one Consumer can consume messages from several Queues at the same time. How Queues are paired with Consumers, that is, which Queue is allocated to which Consumer, is decided by an allocation strategy. There are four common strategies, and the chosen one is passed in through the constructor when the Consumer is created.

Average allocation strategy

The algorithm allocates according to avg = QueueCount / ConsumerCount. If the division is exact, each consumer in turn is assigned avg queues. If it is not exact, the leftover queues are assigned one by one in Consumer order.

The algorithm first calculates how many queues each Consumer should have, and then hands that many queues to each Consumer in turn.
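The calculation above can be sketched as follows. This is a minimal illustration under stated assumptions, not RocketMQ's own implementation: `AverageAllocationSketch` is a hypothetical name, queues are represented by their ids 0..QueueCount-1, and consumers by their position in a sorted list.

```java
import java.util.ArrayList;
import java.util.List;

final class AverageAllocationSketch {
    /** Returns the queue ids assigned to the consumer at position consumerIndex. */
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        if (consumerCount <= 0 || consumerIndex < 0 || consumerIndex >= consumerCount) {
            throw new IllegalArgumentException("invalid consumer count or index");
        }
        int avg = queueCount / consumerCount;     // queues every consumer gets
        int extra = queueCount % consumerCount;   // leftovers, one each to the first consumers
        int size = avg + (consumerIndex < extra ? 1 : 0);
        int start = consumerIndex * avg + Math.min(consumerIndex, extra);
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            assigned.add(start + i);              // a contiguous block of queue ids
        }
        return assigned;
    }
}
```

With 5 queues and 2 consumers, consumer 0 gets queues 0, 1, 2 and consumer 1 gets queues 3, 4. With 2 queues and 3 consumers, the third consumer gets an empty list, which is the Rebalance limitation at work: surplus consumers receive no queue.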

Circular averaging strategy

The circular averaging algorithm treats the queues as a ring and deals them out one at a time to the consumers in order.

This algorithm does not need to calculate in advance how many queues each Consumer should receive; the queues can simply be handed out one by one.
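A minimal sketch of the circular strategy, using the same hypothetical conventions as before (queue ids 0..QueueCount-1, consumers identified by their position); `CircularAllocationSketch` is an invented name, not a RocketMQ class.

```java
import java.util.ArrayList;
import java.util.List;

final class CircularAllocationSketch {
    /** Returns the queue ids assigned to the consumer at position consumerIndex. */
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        if (consumerCount <= 0 || consumerIndex < 0 || consumerIndex >= consumerCount) {
            throw new IllegalArgumentException("invalid consumer count or index");
        }
        List<Integer> assigned = new ArrayList<>();
        // Deal queues out like cards: queue q goes to consumer q % consumerCount.
        for (int q = consumerIndex; q < queueCount; q += consumerCount) {
            assigned.add(q);
        }
        return assigned;
    }
}
```

With 5 queues and 2 consumers, consumer 0 gets queues 0, 2, 4 and consumer 1 gets queues 1, 3. The per-consumer counts match the average strategy; only the grouping differs (interleaved rather than contiguous).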

Consistent hash strategy

The algorithm places the hash value of each consumer on a hash ring as a node, and then places the hash value of each queue on the same ring. Moving clockwise from the queue, the nearest consumer is the one the queue is assigned to.

The problem with this algorithm is that the allocation can be uneven.
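The ring lookup can be sketched with a sorted map. This is a simplified illustration, not RocketMQ's implementation: `ConsistentHashSketch` is a hypothetical name, the hash function (first 8 bytes of an MD5 digest) is an assumption of the sketch, and no virtual nodes are used.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ConsistentHashSketch {
    /** Hashes a name to a ring position using the first 8 bytes of its MD5 digest. */
    static long hash(String name) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(name.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** Maps each queue to the first consumer found clockwise from the queue's hash. */
    static Map<String, String> assign(List<String> queues, List<String> consumers) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String consumer : consumers) {
            ring.put(hash(consumer), consumer);
        }
        Map<String, String> owner = new LinkedHashMap<>();
        if (ring.isEmpty()) {
            return owner;
        }
        for (String queue : queues) {
            Map.Entry<Long, String> node = ring.ceilingEntry(hash(queue));
            if (node == null) {
                node = ring.firstEntry();   // wrap around the ring
            }
            owner.put(queue, node.getValue());
        }
        return owner;
    }
}
```

When a consumer leaves, only the queues that it owned move to another consumer; every other queue keeps its owner. Nothing in the lookup forces the consumers' ring positions to be evenly spaced, which is where the uneven allocation comes from.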

Same machine room strategy

The algorithm first selects the queues located in the same machine room as the current consumer, based on where the queues and the consumer are deployed. Those queues are then allocated using the average allocation strategy or the circular averaging strategy. If no queue is in the same machine room, all queues are allocated using the average allocation strategy or the circular averaging strategy.

Comparison

Problems with the consistent hash algorithm:

The two averaging strategies allocate efficiently, whereas the consistent hash strategy is slower because the algorithm is more complex. In addition, the allocation produced by the consistent hash strategy is likely to be uneven.

Why the consistent hash algorithm exists:

It can greatly reduce the amount of reallocation caused by scaling a consumer group out or in.

Application scenario of the consistent hash algorithm:

Scenarios where the number of consumers changes frequently.

At-least-once principle

RocketMQ follows the principle that every message must be successfully consumed at least once.

What counts as successful consumption? After consuming a message, the Consumer submits the message's offset to its consumption progress recorder. Once the recorder has stored the offset, the message has been successfully consumed.

What is a consumption progress recorder?

In broadcast consumption mode, the Consumer itself is the consumption progress recorder.

In cluster consumption mode, the Broker is the consumption progress recorder.

5. Consistency of subscription relationship

Subscription consistency means that all Consumer instances in the same Consumer Group (that is, with the same Group ID) must subscribe to exactly the same topics and tags and must process messages with the same logic. Otherwise, message consumption logic becomes confused and messages may even be lost.

1 Correct subscription relationship

Multiple consumer groups subscribe to multiple topics, and the subscription relationships for multiple consumer instances within each consumer group remain consistent.

2 Incorrect subscription relationship

A Consumer group subscribes to multiple topics, but the subscription relationships among the Consumer instances in that Consumer group are not consistent.

Subscribed to different topics

The error in this example is that two Consumer instances in the same Consumer group subscribe to different topics.

Consumer instance 1-1 (subscribes to topic jodie_test_A, all tags):

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
Consumer consumer = ONSFactory.createConsumer(properties);
// "*" subscribes to every tag of jodie_test_A.
consumer.subscribe("jodie_test_A", "*", new MessageListener() {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});

Consumer instance 1-2 (subscribes to topic jodie_test_B, all tags):

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
Consumer consumer = ONSFactory.createConsumer(properties);
// Same group as instance 1-1, but a different topic: inconsistent.
consumer.subscribe("jodie_test_B", "*", new MessageListener() {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});

Subscribed to different tags

The error in this example is that two consumers in the same Consumer group subscribe to different tags for the same Topic.

Consumer instance 2-1 (subscribes to topic jodie_test_A, tag TagA):

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
Consumer consumer = ONSFactory.createConsumer(properties);
// Only messages tagged TagA are delivered to this instance.
consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});

Consumer instance 2-2 (subscribes to topic jodie_test_A, all tags):

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
Consumer consumer = ONSFactory.createConsumer(properties);
// Same group and topic as instance 2-1, but a different tag: inconsistent.
consumer.subscribe("jodie_test_A", "*", new MessageListener() {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});

Subscribed to different number of topics

The error in this example is that two consumers in the same Consumer group subscribe to different numbers of topics.

Consumer instance 3-1 (subscribes to two topics):

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
Consumer consumer = ONSFactory.createConsumer(properties);
// First subscription: jodie_test_A with TagA.
consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
// Second subscription: jodie_test_B with TagB.
consumer.subscribe("jodie_test_B", "TagB", new MessageListener() {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});

Consumer instance 3-2 (subscribes to one topic):

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
Consumer consumer = ONSFactory.createConsumer(properties);
// Same group as instance 3-1, but only one subscription: inconsistent.
consumer.subscribe("jodie_test_A", "TagB", new MessageListener() {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
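The three incorrect cases share one property: the instances of a group do not hold identical topic-to-tag mappings. A minimal sketch of such a check is given below. `SubscriptionCheckSketch` is a hypothetical helper, not part of any RocketMQ or ONS API, and comparing tag expressions as plain strings is a simplification (it would treat "TagA||TagB" and "TagB||TagA" as different).

```java
import java.util.List;
import java.util.Map;

final class SubscriptionCheckSketch {
    /**
     * Each map describes one consumer instance: topic -> tag expression.
     * The group is consistent only if every instance holds exactly the same map.
     */
    static boolean isConsistent(List<Map<String, String>> instances) {
        for (Map<String, String> instance : instances) {
            if (!instance.equals(instances.get(0))) {
                return false;
            }
        }
        return true;
    }
}
```

For instance, a group whose instances hold `{jodie_test_A=TagA, jodie_test_B=TagB}` and `{jodie_test_A=TagB}` fails the check on both the number of topics and the tag.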

6. Offset management

Offset here refers to the consumption progress offset of the Consumer.

The consumption progress offset records how far each consumer group has consumed in each Queue. Depending on which consumption progress recorder is used, there are two modes: local mode and remote mode.

1 Local offset management mode

When the consumption mode is broadcast consumption, the offset is stored in local mode. Because every message is consumed by all consumers, each consumer manages its own consumption progress, and the progress of different consumers never overlaps.

In broadcast consumption mode, offset data is persisted as JSON to a file on the Consumer's local disk. The default file path is .rocketmq_offsets/${clientId}/${group}/offsets.json under the current user's home directory. ${clientId} is the ID of the current consumer, which defaults to ip@DEFAULT, and ${group} is the consumer group name.

2 Remote offset management mode

When the consumption mode is cluster consumption, the offset is managed in remote mode. Because all Consumer instances share the consumption of messages evenly, all consumers share the consumption progress of each Queue.

In cluster consumption mode, offset data is persisted as JSON to a file on the Broker's disk. The file path is store/config/consumerOffset.json under the current user's home directory.

This file is loaded when the Broker starts and is written into a two-level Map (ConsumerOffsetManager). The key of the outer map is topic@group and its value is the inner map. The key and value of the inner map are the queueId and the offset. When a Rebalance occurs, the new Consumer reads the offset from this Map and continues consuming from there.

In cluster mode, the offset is managed remotely so that the Rebalance mechanism can work.

3 Uses of offset

How does a Consumer decide where to start consuming? The starting position of the first message is specified by the user through the consumer.setConsumeFromWhere() method.

After a Consumer starts, there are three common starting positions for the first message it consumes. They are set through constants of the enumeration type ConsumeFromWhere.

CONSUME_FROM_LAST_OFFSET: start consuming from the current end of the queue (the latest message)

CONSUME_FROM_FIRST_OFFSET: start consuming from the first message in the queue

CONSUME_FROM_TIMESTAMP: start consuming from the message at the specified timestamp. The timestamp itself is set by a separate statement:

consumer.setConsumeTimestamp("20210701080000");  // format: yyyyMMddHHmmss
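How the three starting positions translate into a first offset can be sketched as follows. This is a simplified model, not the client's actual logic: `StartOffsetSketch` and its `StartFrom` enum are hypothetical, the queue is represented only by the store time of each message, and treating LAST_OFFSET as "skip everything already stored" is an assumption of the sketch.

```java
import java.util.List;

final class StartOffsetSketch {
    enum StartFrom { LAST_OFFSET, FIRST_OFFSET, TIMESTAMP }

    /**
     * @param storeTimes store time of each message in the queue, ascending;
     *                   the list index plays the role of the queue offset
     * @param timestamp  only used when policy is TIMESTAMP
     * @return the offset of the first message a newly started consumer reads
     */
    static int resolve(StartFrom policy, List<Long> storeTimes, long timestamp) {
        switch (policy) {
            case FIRST_OFFSET:
                return 0;
            case LAST_OFFSET:
                return storeTimes.size();       // assumption: only later messages are read
            case TIMESTAMP:
                for (int offset = 0; offset < storeTimes.size(); offset++) {
                    if (storeTimes.get(offset) >= timestamp) {
                        return offset;          // first message stored at or after the timestamp
                    }
                }
                return storeTimes.size();       // nothing that recent yet
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }
}
```

For a queue whose messages were stored at times 100, 200, and 300, a TIMESTAMP start at 150 resolves to offset 1, the message stored at 200.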

After consuming a batch of messages, the Consumer submits its consumption progress offset to the Broker, which updates the progress in the two-level Map (ConsumerOffsetManager) and in the consumerOffset.json file. The Broker then sends an ACK to the Consumer. The ACK contains three values: the minimum offset of the current consumption queue (minOffset), the maximum offset (maxOffset), and the start offset of the next consumption (nextBeginOffset).

4 Retry Queue

When RocketMQ fails to consume a message, the offset of the failed message is submitted to a retry queue in the Broker. The system creates this retry queue for the current topic@group when the consumption exception occurs, and its name starts with %RETRY%. The message is consumed again once its retry time arrives.

5 Synchronous and asynchronous submission of offset

In clustered consumption mode, the Consumer will submit the consumption progress offset to the Broker after consuming the message. There are two ways to submit the consumption progress offset:

Synchronous commit: After consuming a batch of messages, the consumer submits the offset of those messages to the broker and waits for a success response. If the response arrives before the wait times out, the consumer reads the next batch of messages (obtaining nextBeginOffset from the ACK). If no response arrives, the consumer resubmits until it gets one. The consumer is blocked while it waits, which severely reduces consumer throughput.

Asynchronous commit: After consuming a batch of messages, the consumer submits the offset to the broker but continues to read and consume the next batch without waiting for a success response. This approach increases consumer throughput. Note, however, that the broker still sends a response to the consumer after it receives the submitted offset. If the ACK has not yet arrived when the consumer needs the next batch, the Consumer obtains nextBeginOffset directly from the Broker.