Hello, this is AKA Baiyan, who loves coding, hip-hop, and a little wine.

Kafka is a distributed, multi-partition, multi-replica message streaming platform, traditionally coordinated through ZooKeeper. It is also an open source publish-subscribe messaging system.

How does Kafka ensure that messages are not lost, are consumed in order, and are not consumed repeatedly?

Do these three questions sound familiar? Do they come up often in job interviews and in your day-to-day work?

Making sure messages are not lost and not consumed repeatedly is relatively simple in practice, so this article will not cover it. If you are interested, leave me a message and I will write a separate article.

This article focuses on how to guarantee sequential consumption with Kafka when the consumer is single-threaded and when it is multithreaded.

My first Juejin booklet, "Simple DDD", is now available on Juejin. You are welcome to give it a read ~

1. Single-threaded sequential consumption

In case some readers are new to the concept, I will first explain what sequential consumption is.

On Double Eleven (the November 11 shopping festival), a huge number of users rush to place orders at midnight. To keep the user experience smooth, we wrap the order generation logic and the payment logic into MQ messages and send them to Kafka, letting Kafka buffer the backlog so that the burst of traffic does not overwhelm the service.

The catch is that both order generation and payment are now messages. The two messages are strictly ordered: order generation must be processed before payment.

So how does Kafka guarantee their order?

Different topic:

If payment and order generation go to different topics, ordering can only be handled on the consumer side. Since consumers are distributed, you would need an intermediary (such as a Redis queue) to maintain the message order and guarantee sequential consumption, which is costly and makes the logic ugly.

The same topic:

What if we send the messages to the same topic? A topic can have multiple partitions, and those partitions can be consumed by multiple consumers, so on its own this is not essentially different from using different topics.

Same topic, same partition:

Kafka’s messages are strictly ordered within the partition. That is, we can send all the messages of the same order to the same partition of the same topic in the order they were generated. Consumers can then sequentially consume messages for the same order.

When the producer sends a message, it takes the message's ID modulo the number of partitions, so messages with the same ID go to the same partition. Messages are ordered within a partition, and within a consumer group each partition is consumed by a single consumer, so the messages are consumed in order.
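The modulo routing described above can be sketched in a few lines. This is a minimal illustration only: `OrderPartitioner` and `partitionFor` are hypothetical names, not part of Kafka or of the demo project. With a real Kafka producer, a common way to get the same effect is to use the order ID as the record key, since the default partitioner sends records with the same key to the same partition as long as the partition count does not change.

```java
/** Hypothetical helper that picks a partition for an order ID. */
final class OrderPartitioner {
    private OrderPartitioner() {
    }

    /** Maps an order ID to a partition index in the range [0, numPartitions). */
    static int partitionFor(long orderId, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even if the ID is negative
        return (int) Math.floorMod(orderId, (long) numPartitions);
    }
}
```

Both the "create order" message and the "pay" message of one order carry the same ID, so they map to the same partition and keep their relative order.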

2. Multithreaded sequential consumption

Single-threaded sequential consumption solves the ordering problem, but it scales poorly. To speed up consumption while keeping the order, the only option is to add partitions and, with them, consumers.

That means adding machines to increase the processing power of your system.

Emmm, if that is your only answer, you won't be around much longer.

Keep adding machines and the boss will have it out with you.

Therefore, we must do concurrent processing after the consumer receives the Kafka message.

So let’s get this straight. What if we get a message and just throw it into the thread pool?

That does not work: threads run at different speeds, so the payment message may still be processed before the order message.

Can we borrow Kafka's partitioning idea? We hash the received Kafka messages into different in-memory queues and start one thread per queue to consume it.
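The idea can be tried out without Kafka at all. The sketch below is not the demo's implementation: instead of hand-managed queues and threads it uses one single-thread executor per "lane", which gives the same guarantee that tasks with the same ID run one after another. `KeyedSerialExecutor` and its methods are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: routes tasks by ID to single-threaded lanes. */
final class KeyedSerialExecutor<E> {
    private final List<ExecutorService> lanes = new ArrayList<>();
    private final Consumer<E> handler;

    KeyedSerialExecutor(int laneCount, Consumer<E> handler) {
        if (laneCount <= 0) {
            throw new IllegalArgumentException("laneCount must be positive");
        }
        this.handler = handler;
        for (int i = 0; i < laneCount; i++) {
            // Each lane has exactly one thread, so its tasks run in submission order
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Tasks with the same id always land in the same lane. */
    void submit(long id, E task) {
        int lane = (int) Math.floorMod(id, (long) lanes.size());
        lanes.get(lane).execute(() -> handler.accept(task));
    }

    /** Stops accepting tasks and waits for queued ones; returns true if all finished in time. */
    boolean shutdownAndWait(long timeoutMillis) {
        lanes.forEach(ExecutorService::shutdown);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            for (ExecutorService lane : lanes) {
                long remaining = Math.max(deadline - System.nanoTime(), 0L);
                if (!lane.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Different orders are processed in parallel across lanes, while the "create" and "pay" tasks of any single order stay in order because they share a lane.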

Wuhu, nice ~

3. Multithreaded consumption code implementation

Still there? Take a look at the code: kafka-sort-consume

You can pull the code and read along with the rest of this article to make it easier to follow.

Overall idea:

  1. At application startup, initialize the sequential consumption thread pool for the business in question (the order consumption thread pool in the demo)
  2. The order listener class pulls messages and submits each one as a task to the corresponding queue in the thread pool
  3. Each thread in the pool processes the tasks in the queue bound to it
  4. After processing a task, each thread increments the count of offsets pending commit
  5. The listener class checks whether the count of offsets pending commit equals the number of records pulled; if so, it commits the offset manually

3.1. Defining the sequential consumption thread pool

We can increase message processing capacity by specifying the number of consumer threads.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Consumer;

    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;

    // KafkaSortConsumerConfig and GsonUtil come from the demo project.

    /**
     * Kafka sequential consumption thread pool 1.0.
     * Supports stopping through the stopped hook.
     *
     * @author baiyan
     * @date 2022/01/19
     */
    @Slf4j
    @Data
    public class KafkaConsumerPool<E> {

        /** Thread concurrency level */
        private Integer concurrentSize;

        /** Worker threads */
        private List<Thread> workThreads;

        /** Task queues, one per worker thread */
        private List<ConcurrentLinkedQueue<E>> queues;

        /** Whether the pool has been stopped */
        private volatile Boolean stopped;

        /** Number of processed tasks whose offsets are waiting to be committed */
        private AtomicLong pendingOffsets;

        private static final String KAFKA_CONSUMER_WORK_THREAD_PREFIX = "kafka-sort-consumer-thread-";

        public KafkaConsumerPool(KafkaSortConsumerConfig<E> config) {
            this.concurrentSize = config.getConcurrentSize();
            // Initialize the task queues
            this.initQueue();
            this.workThreads = new ArrayList<>();
            this.stopped = false;
            this.pendingOffsets = new AtomicLong(0L);
            // Initialize the worker threads
            this.initWorkThread(config.getBizName(), config.getBizService());
        }

        /** Creates one queue per worker thread. */
        private void initQueue() {
            this.queues = new ArrayList<>();
            for (int i = 0; i < this.concurrentSize; i++) {
                this.queues.add(new ConcurrentLinkedQueue<>());
            }
        }

        /** Creates and starts the worker threads, each bound to the queue with the same index. */
        private void initWorkThread(String bizName, Consumer<E> bizService) {
            for (int i = 0; i < this.concurrentSize; i++) {
                String threadName = KAFKA_CONSUMER_WORK_THREAD_PREFIX + bizName + i;
                int num = i;
                Thread workThread = new Thread(() -> {
                    // Keep looping while the queue is not empty or the pool has not been stopped
                    while (!queues.get(num).isEmpty() || !stopped) {
                        try {
                            E task = pollTask(threadName, bizName);
                            if (Objects.nonNull(task)) {
                                // Run the business logic
                                bizService.accept(task);
                                log.info("thread: {}, execute task: {}, success", threadName, GsonUtil.beanToJson(task));
                                // One more task done, so one more offset pending commit
                                pendingOffsets.incrementAndGet();
                            }
                        } catch (Exception e) {
                            // A failed task does not increment pendingOffsets
                            log.error("thread: {}, task execution failed", threadName, e);
                        }
                    }
                    log.info("thread: {} exit", threadName);
                }, threadName);
                workThreads.add(workThread);
                workThread.start();
            }
        }

        /**
         * Submits a task to the queue selected by the ID.
         *
         * @param id   business ID used to choose the queue
         * @param task task to be processed
         */
        public void submitTask(Long id, E task) {
            ConcurrentLinkedQueue<E> taskQueue = queues.get((int) (id % this.concurrentSize));
            taskQueue.offer(task);
        }

        /** Takes the next task from the queue bound to the given thread, or null if it is empty. */
        private E pollTask(String threadName, String bizName) {
            int threadNum = Integer.valueOf(threadName.replace(KAFKA_CONSUMER_WORK_THREAD_PREFIX + bizName, ""));
            ConcurrentLinkedQueue<E> taskQueue = queues.get(threadNum);
            return taskQueue.poll();
        }
    }

The flow chart

3.2. The consumer side

A single consumer can consume multiple topics, so every listener class that needs multithreaded sequential processing must be bound to its own sequential consumption thread pool.

After receiving messages, the listener class submits them to the thread pool as tasks.

Here we need to turn off Kafka's automatic offset commit and commit the offset only after all pulled tasks have been processed.
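As a rough sketch of the consumer settings this implies, the helper below builds a property map with auto commit disabled. The property keys are standard Kafka consumer configuration names; the class name `ManualCommitConsumerProps`, the `build` method, and the batch size of 100 are assumptions made for illustration, and the article's own container factory definition is not shown. With Spring for Apache Kafka, the listener container would additionally need to be a batch listener with a manual ack mode so that an `Acknowledgment` can be passed to the listener method.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that assembles consumer properties for manual offset commits. */
final class ManualCommitConsumerProps {
    private ManualCommitConsumerProps() {
    }

    static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Disable periodic auto commit so offsets are committed only after processing
        props.put("enable.auto.commit", false);
        // Cap the number of records per poll (illustrative value, tune for your workload)
        props.put("max.poll.records", 100);
        return props;
    }
}
```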

    import java.util.List;

    // javax.annotation on Spring Boot 2.x; jakarta.annotation on Spring Boot 3
    import javax.annotation.PostConstruct;

    import lombok.Data;
    import lombok.EqualsAndHashCode;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.listener.AbstractConsumerSeekAware;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;

    // OrderService, OrderDTO, GsonUtil and KafkaSortConsumerConfig come from the demo project.

    /**
     * @author baiyan
     * @date 2022/01/19
     */
    @Component
    @Slf4j
    @ConfigurationProperties(prefix = "kafka.order")
    @Data
    @EqualsAndHashCode(callSuper = false)
    public class OrderKafkaListener extends AbstractConsumerSeekAware {

        @Autowired
        private OrderService orderService;

        /** Concurrency level for sequential consumption */
        private Integer concurrent;

        private KafkaConsumerPool<OrderDTO> kafkaConsumerPool;

        @PostConstruct
        public void init() {
            KafkaSortConsumerConfig<OrderDTO> config = new KafkaSortConsumerConfig<>();
            config.setBizName("order");
            config.setBizService(orderService::solveRetry);
            config.setConcurrentSize(concurrent);
            kafkaConsumerPool = new KafkaConsumerPool<>(config);
        }

        @KafkaListener(topics = {"${kafka.order.topic}"}, containerFactory = "baiyanCommonFactory")
        public void consumerMsg(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
            if (records.isEmpty()) {
                return;
            }
            records.forEach(consumerRecord -> {
                OrderDTO order = GsonUtil.gsonToBean(consumerRecord.value().toString(), OrderDTO.class);
                kafkaConsumerPool.submitTask(order.getId(), order);
            });
            // Commit the offset manually once the number of tasks completed by the pool
            // equals the number of records pulled.
            // Note: if a business call blocks for a long time, the offset commit is delayed,
            // so be sure to add some circuit-breaking measure.
            while (true) {
                if (records.size() == kafkaConsumerPool.getPendingOffsets().get()) {
                    ack.acknowledge();
                    log.info("offset submit: {}", records.get(records.size() - 1).offset());
                    kafkaConsumerPool.getPendingOffsets().set(0L);
                    break;
                }
            }
        }
    }
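The listener's comment asks for some circuit-breaking measure around the wait loop. One possible shape, shown here only as a sketch and not as the demo's code, is to replace the unbounded spin with a wait that gives up after a deadline. `OffsetWaiter` and `awaitProcessed` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical helper: waits for a counter to reach a target, with a deadline. */
final class OffsetWaiter {
    private OffsetWaiter() {
    }

    /**
     * Returns true once done has reached expected, or false if timeoutMillis
     * elapses first.
     */
    static boolean awaitProcessed(AtomicLong done, long expected, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (done.get() < expected) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // give up instead of spinning forever
            }
            // Sleep briefly so the waiting thread does not burn a CPU core
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
        }
        return true;
    }
}
```

In the listener, the offset would be acknowledged only when this returns true. What to do on a timeout (alert, skip the acknowledgment so the batch is redelivered, and so on) is a business decision. The timeout should also stay below the consumer's `max.poll.interval.ms`, because a listener that blocks longer than that is treated as failed and triggers a rebalance.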

Corresponding data processing flow chart

3.3. Extension points

The demo handles messages with a fixed, configured concurrency level.

Take ride-hailing apps as an example, though: traffic peaks during the morning and evening rush hours, so the message load is very high then and much lower outside the peaks.

Therefore, we should set a relatively high number of concurrency levels during peak times to process messages quickly, and a small number during off-peak times to free up system resources.

Do we need to constantly reboot the application to change the concurrency level? Too Muggle.

As I mentioned in my article on synchronizing global configuration in a distributed environment with Nacos, Meituan has described an approach in which a configuration center modifies configuration to set thread pool parameters dynamically.

We can borrow this idea to scale the sequential consumption thread pool up or down dynamically.

I did not implement this logic in the demo, to keep it easy to understand, but I did leave a hook for it.

KafkaConsumerPool has a property called stopped. Setting it to true stops the running pool, but the threads exit only after the queued tasks are finished.

So to implement dynamic scaling, we can use the configuration center to refresh the concurrent value configured in the OrderKafkaListener class. When concurrent is changed through its set method, first set stopped to true to stop the current thread pool. Once it has finished, create a new thread pool with the new concurrency level. This achieves dynamic scaling up and down.

While the pool is being resized, remember to block Kafka message consumption and offset commits; otherwise errors will occur.

Finally, here is the flow chart
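The stop, drain, and recreate sequence can be sketched as follows. This is not the demo's code: it is a self-contained illustration built on single-thread executors rather than on KafkaConsumerPool, and `ResizableSequentialPool` with its methods are hypothetical names. Submissions are blocked while a resize is in progress, which mirrors the advice to pause consumption during scaling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch of a sequential pool whose concurrency can be changed at runtime. */
final class ResizableSequentialPool<E> {
    private final Consumer<E> handler;
    private List<ExecutorService> lanes;

    ResizableSequentialPool(int concurrency, Consumer<E> handler) {
        this.handler = handler;
        this.lanes = newLanes(concurrency);
    }

    /** Routes a task to the lane chosen by id modulo the current lane count. */
    synchronized void submit(long id, E task) {
        int lane = (int) Math.floorMod(id, (long) lanes.size());
        lanes.get(lane).execute(() -> handler.accept(task));
    }

    /**
     * Drains the old lanes, then switches to newConcurrency lanes. Returns false if
     * an old lane was still busy after timeoutMillis (the timeout applies per lane),
     * in which case ordering across the switch is not guaranteed.
     */
    synchronized boolean resize(int newConcurrency, long timeoutMillis) {
        List<ExecutorService> replacement = newLanes(newConcurrency); // validates the size first
        boolean drained = drain(lanes, timeoutMillis);
        lanes = replacement;
        return drained;
    }

    /** Stops the pool and waits for queued tasks to finish. */
    synchronized boolean close(long timeoutMillis) {
        return drain(lanes, timeoutMillis);
    }

    synchronized int concurrency() {
        return lanes.size();
    }

    private static List<ExecutorService> newLanes(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("concurrency must be positive");
        }
        List<ExecutorService> created = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            created.add(Executors.newSingleThreadExecutor());
        }
        return created;
    }

    private static boolean drain(List<ExecutorService> lanes, long timeoutMillis) {
        lanes.forEach(ExecutorService::shutdown);
        boolean drained = true;
        try {
            for (ExecutorService lane : lanes) {
                drained &= lane.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            drained = false;
        }
        return drained;
    }
}
```

Draining before the switch matters because the same ID may map to a different lane once the lane count changes; if old tasks were still running, a later message for the same order could overtake an earlier one.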

4. Summary

In this article, we introduced the ideas behind single-threaded and multithreaded sequential consumption with Kafka. Both guarantee ordering by binding related messages to a specific partition or queue, and both increase consumption capacity by adding partitions or threads.

Finally, we also sketched how a configuration center could be used to scale the thread pool up and down dynamically. If you are interested in writing the corresponding logic, feel free to send me a merge request ~

5. Finally

If anything in this article is incorrect, corrections are welcome. Writing is not easy, so please give it a thumbs-up ~

WeChat: baiyan_lou

Public account: Uncle Baiyan

Booklet: Simple DDD

If you repost this article, please put the original link and the author in a prominent position at the beginning.