I can’t remember how many people asked the following question:

This question comes up very often and is a classic. Using Kafka as the main example, I will share my understanding of message ordering in Kafka. If I have misunderstood anything, please leave a comment.

Usually when we talk about sequential consumption we mean that producers send in order and consumers consume in order, which sounds simple, but it’s very difficult to do.

As we all know, in both Kafka and RocketMQ each topic has several partitions (RocketMQ calls them queues). Ordering is only guaranteed within a single partition, so if messages are spread across different partitions, which may be assigned to different consumers, Kafka cannot guarantee the order in which they are consumed. Therefore, if you want messages to be consumed in sequence, specify a key on the producer side: messages sent with the same key go to the same partition. Since a partition is consumed by only one consumer within a consumer group, those messages are consumed in order.

The producer side

However, this only guarantees message order under normal circumstances; once a fault occurs, the order can no longer be guaranteed. I summarize the following two cases:

1. When the producer sends messages asynchronously, some messages may fail to be sent. For example, if you send messages 1, 2, and 3 asynchronously and message 2 fails and is resent, it arrives after message 3.

2. When a broker goes down, the producer may send sequential messages to different partitions, which can cause transient message disorder.

To address the first point, the producer must send synchronously from a single thread, which is a good solution. For the second point, if you want strict message ordering, you must either make the cluster unavailable as soon as a fault occurs or give the topic a single partition. Both options greatly sacrifice the cluster's high availability, and a single partition also greatly reduces the cluster's performance.
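As a rough illustration of the single-threaded, synchronous sending described above, the producer settings might look like the following sketch. The keys are standard Kafka producer configuration names, but the values, the `OrderedProducerConfig` class, and the `localhost:9092` address are assumptions chosen for illustration. Limiting in-flight requests to 1 is an extra precaution not mentioned in the text, added so that a retried batch cannot overtake a later one.

```java
import java.util.Properties;

class OrderedProducerConfig {
    // Builds producer settings that favor ordering over throughput.
    // The values are illustrative assumptions, not recommendations from the article.
    static Properties orderedProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Wait until all in-sync replicas have acknowledged the write.
        props.put("acks", "all");
        // Retry transient failures instead of dropping the message.
        props.put("retries", "3");
        // Only one unacknowledged request at a time, so a retry cannot be overtaken.
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = orderedProducerProps("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties would be passed to a KafkaProducer, and the single sending thread would wait for each send to complete (for example by calling get() on the returned future) before sending the next message.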

With regard to the second point above, let’s take a look at some of the unexpected situations in a Kafka cluster that can throw messages out of order.

1. The partition count changes

Assume that topic A has two partitions in a cluster, and the producer needs to send three sequential messages to partition 1. We know that the producer decides which partition a message goes to by taking the hash of the message key modulo the number of partitions. If a partition is added to topic A before the producer sends the third message, the third message may be routed to a different partition. As a result, the three sequential messages are no longer in the same partition, and their consumption order cannot be guaranteed.
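The effect of adding a partition can be seen in a small sketch of key-based routing. It is a simplification: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch uses `String.hashCode()` as a stand-in, and the `KeyRouting` class and the key "A4" are hypothetical.

```java
class KeyRouting {
    // Simplified stand-in for the producer's partitioner:
    // hash of the key modulo the number of partitions.
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        String key = "A4"; // hypothetical message key

        // With 2 partitions, the first two messages go to the same partition.
        System.out.println(partitionFor(key, 2)); // prints 1
        System.out.println(partitionFor(key, 2)); // prints 1

        // After a third partition is added, the same key maps elsewhere.
        System.out.println(partitionFor(key, 3)); // prints 0
    }
}
```

The same key always maps to the same partition while the partition count is stable; the mapping only shifts when the modulus changes.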

2. The partition count does not change

2.1. Single-replica partition

The producer needs to send three sequential messages to partition 1. The first two messages have been sent to partition 1 successfully, and then the broker hosting partition 1 goes down (because the replication factor is only 1, partition 1 becomes unavailable). If partition 1 is unavailable when the producer sends the third message, the send fails and the producer retries. If partition 1 is still unavailable, the producer may route the message to another partition, so the three messages end up in different partitions.

2.2. Multi-replica partition

Given the single-replica problem, it is natural to think of using multiple replicas to avoid it. With multiple replicas, the producer sends synchronously with acks = all, which means a send succeeds only after all in-sync replicas have received the message. Every replica in the ISR list then holds the message, and the partition does not become unavailable if one broker goes down. This does seem to avoid messages being routed to other partitions because a single-replica partition is unavailable.

However, there is still an extreme case. When a broker goes down, the leader replicas on that broker become unavailable. The controller then elects a new leader for each affected partition, and during the election the partition has no leader. The producer briefly receives "no leader" warnings, and in this case it may also send messages to other partitions.

The consumer side

Of course, another reader asked:

The following analysis assumes that the production side has successfully sent sequential messages to the same partition.

Kafka

The KafkaConsumer class is not thread-safe, so a KafkaConsumer instance must not be shared across threads. KafkaConsumer also does not implement any multithreaded consumption logic itself; if you need multithreaded consumption, you have to implement it yourself. Here I will describe two multithreaded consumption models for Kafka.

1. Maintain a KafkaConsumer per thread

This is equivalent to having multiple consumers in one process; in other words, the members of the consumer group are KafkaConsumer instances running in multiple threads.

As the consumption model shows, each KafkaConsumer is responsible for a fixed set of partitions, so this model cannot increase the consumption capacity of a single partition. If a topic has a large number of partitions, we can only increase consumption capacity by adding KafkaConsumer instances. The number of threads then grows too large, and the socket connection overhead becomes huge. This thread model is generally not used in real projects.

2. Single KafkaConsumer instance + multiple worker threads

To address the shortcomings of the first thread model, we can decouple the KafkaConsumer instance from the message consumption logic and put the message consumption logic into a separate thread. The thread model is as follows:

As the thread model shows, once the KafkaConsumer instance is decoupled from the message consumption logic, we no longer need to create multiple KafkaConsumer instances for multithreaded consumption, and the number of worker threads can be adjusted dynamically according to the consumption load, so the two parts scale independently. The multithreaded consumption model used at my company is this single KafkaConsumer instance + multiple worker threads model.

However, because this model consumes messages with multiple threads, it cannot guarantee the consumption order. To fix this, we can introduce blocking queues: each worker thread has its own blocking queue and continuously polls it for messages to consume. Messages are assigned to a queue by taking the hash of the message key modulo the number of queues, so messages with the same key go into the same queue and are consumed sequentially. The consumption model is as follows:
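The blocking-queue idea can be sketched in plain Java as below. This is only a minimal illustration under assumptions: the `KeyedWorkerPool` class and its method names are hypothetical, messages are modeled as key/value string pairs, and in a real application the thread that calls poll() on the KafkaConsumer would be the one calling `dispatch`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

class KeyedWorkerPool {
    // Sentinel that tells a worker to stop (compared by reference).
    private static final String[] STOP = new String[0];

    private final List<BlockingQueue<String[]>> queues = new ArrayList<>();
    private final List<Thread> workers = new ArrayList<>();
    // Records "key:value" in the order messages were actually processed.
    private final List<String> processed = new CopyOnWriteArrayList<>();

    KeyedWorkerPool(int workerCount) {
        for (int i = 0; i < workerCount; i++) {
            BlockingQueue<String[]> queue = new LinkedBlockingQueue<>();
            Thread worker = new Thread(() -> drain(queue), "worker-" + i);
            queues.add(queue);
            workers.add(worker);
            worker.start();
        }
    }

    // Each worker consumes only from its own queue, one message at a time.
    private void drain(BlockingQueue<String[]> queue) {
        try {
            while (true) {
                String[] message = queue.take();
                if (message == STOP) {
                    return;
                }
                processed.add(message[0] + ":" + message[1]);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Messages with the same key always land in the same queue.
    void dispatch(String key, String value) {
        int index = Math.floorMod(key.hashCode(), queues.size());
        queues.get(index).add(new String[] {key, value});
    }

    // Stops all workers after they drain their queues and returns the processing log.
    List<String> shutdownAndGetProcessed() {
        for (BlockingQueue<String[]> queue : queues) {
            queue.add(STOP);
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        KeyedWorkerPool pool = new KeyedWorkerPool(3);
        for (int i = 1; i <= 3; i++) {
            pool.dispatch("order-A", "step-" + i);
            pool.dispatch("order-B", "step-" + i);
        }
        System.out.println(pool.shutdownAndGetProcessed());
    }
}
```

Messages for different keys may interleave in the output, but the steps for any single key always appear in the order they were dispatched, because one thread handles that key's queue.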

However, there is a problem with the above two consumption thread models:

During consumption, if the Kafka consumer group rebalances, a partition may be reassigned to another consumer in the group while the messages already pulled from it have not yet been consumed. Kafka does provide the ConsumerRebalanceListener interface, which lets you commit the consumed offsets before a new round of rebalancing, but that does not seem to solve the problem of unconsumed messages being processed out of order, does it?

Therefore, before consuming you need to actively check whether the partition has been reassigned to another consumer, and you also need to ensure that the partition is not reassigned to another consumer while it is being consumed (Kafka currently cannot do this).

Consider RocketMQ:

Before consuming, RocketMQ actively calls the ProcessQueue#isDropped method to check whether the queue has been dropped, and it also locks the queue (by asking the broker to lock it).

RocketMQ

RocketMQ is less "bare-bones" than Kafka: it already provides what you need out of the box. Its consumption model is a single consumer instance + multiple worker threads.

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

RocketMQ allocates a PullRequest to each queue and places it in pullRequestQueue. The PullMessageService thread keeps taking PullRequest objects from pullRequestQueue, pulls the corresponding messages, and hands them to ConsumeMessageService for processing. ConsumeMessageService has two implementation classes:

// Concurrent message consumption logic implementation class
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// Sequential message consumption logic implementation class
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;

ConsumeMessageConcurrentlyService uses a thread pool internally for concurrent consumption. Similarly, if you need ordered consumption, RocketMQ provides the ConsumeMessageOrderlyService class to handle ordered message consumption.

With Kafka's consumption thread models in mind, we can see from the ConsumeMessageOrderlyService source code how RocketMQ achieves partial (per-queue) ordered consumption. I think it mainly comes down to the following two points:

1) RocketMQ creates a lock object for each message queue. While messages from a queue are being processed in the thread pool, the next round of consumption for that queue has to wait until the current processing completes, which ensures that messages in the same queue are consumed serially within the current consumer.

2) The consumer asks the broker to lock the queue being consumed in order, which prevents the queue from being allocated to another consumer during consumption and disrupting the consumption order.
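The first point, one lock object per queue, can be illustrated with a small plain-Java sketch. It is not RocketMQ's actual code: the `QueueLockTable` class, its method names, and the string queue IDs are hypothetical, and the broker-side lock from the second point is not modeled at all.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class QueueLockTable {
    // One lock object per queue ID, created lazily on first use.
    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();

    Object lockFor(String queueId) {
        return locks.computeIfAbsent(queueId, id -> new Object());
    }

    // Runs the batch while holding the queue's lock, so two pool threads
    // never process the same queue at the same time.
    void consumeSerially(String queueId, Runnable batch) {
        synchronized (lockFor(queueId)) {
            batch.run();
        }
    }

    public static void main(String[] args) {
        QueueLockTable table = new QueueLockTable();
        int[] counter = {0}; // deliberately not thread-safe on its own

        Runnable task = () -> {
            for (int i = 0; i < 10_000; i++) {
                table.consumeSerially("queue-0", () -> counter[0]++);
            }
        };
        Thread first = new Thread(task);
        Thread second = new Thread(task);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // No increments are lost because the per-queue lock serializes them.
        System.out.println(counter[0]); // prints 20000
    }
}
```

Work on different queue IDs uses different lock objects, so queues are still processed in parallel with one another; only access to a single queue is serialized.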

Conclusion

Based on the analysis in this article, let me try to answer the readers' questions:

1. Producer side:

1) The producer must send synchronously from a single thread and send sequential messages to the same partition (of course, if one of the unexpected Kafka cluster situations described in this article occurs, the message order may still be disrupted, so neither Kafka nor RocketMQ can guarantee strictly ordered messages);

2. Consumer side:

1) With multiple partitions:

If you want to ensure that Kafka messages are consumed in order, you can maintain one KafkaConsumer instance per thread, pulling messages and consuming them one at a time (to prevent a rebalance from disrupting the consumption order). For businesses that can tolerate transient out-of-order messages (again, a Kafka cluster does not guarantee strict message order), the thread model of a single KafkaConsumer instance + multiple worker threads + one blocking queue per thread can be used.

2) With a single partition:

Since a single partition has no rebalancing problem, ordered consumption is guaranteed in both thread models.

In addition, for RocketMQ, registering a MessageListenerOrderly listener ensures that messages are consumed in order.

Many people also wonder: since neither Kafka nor RocketMQ can guarantee strict sequential messages, does sequential consumption make sense?

In general, ordinary ordered messages are sufficient for most business scenarios. Strict ordering is unnecessary if the business can tolerate brief out-of-order messages while the cluster is in an abnormal state.

If you have any questions about or additions to this article, or find mistakes in it, please follow my official account "back-end advanced" and send me a message so we can discuss them together.

Author’s brief introduction

The author, Zhang Chenghui, specializes in messaging middleware and is responsible for maintaining his company's Kafka cluster, which handles millions of TPS. He runs the official account "back-end advanced", where he occasionally shares hands-on summaries and detailed source code analysis of Kafka and RocketMQ rather than conceptual overviews. He is also a contributor to Seata, Alibaba's open-source distributed transaction framework, so he shares Seata knowledge as well. The account also covers web-related topics such as the Spring family of frameworks. The content may not be exhaustive, but it should show you that the author takes technology seriously!

Official account: back-end advanced

Tech blog: objcoding.com/

GitHub: github.com/objcoding/