Overview

How do you deal with a message backlog? It is a common interview question, and it also comes up in real applications. At my company we do traffic monitoring for cyber attacks, so we have to process and risk-filter all of the traffic passing through the routers. Imagine the traffic of an entire company: every request employees make from their computers, inbound and outbound, has to be processed. Below I also share some common mistakes I have seen in how message backlogs are handled.

Add partitions and consumer instances

In Kafka, each partition of a topic is consumed by at most one consumer thread within a consumer group. So adding consumer threads alone does nothing for a backlog: threads beyond the number of partitions sit idle and are simply wasted. The extra threads only get work once partitions are added. Here is a case for reference.

Case Background:

1. The topic is quickstart-events

2. The topic has 3 partitions

3. There are 200 messages waiting to be consumed

4. The consumer group is test-consumer-group-2

Case code and results:

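```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component // class name is illustrative; the original shows only the listener method
public class QuickstartConsumer {

    // concurrency = "10" starts 10 listener threads, but the topic has only 3 partitions
    @KafkaListener(topics = "quickstart-events", groupId = "test-consumer-group-2", concurrency = "10")
    public void test4(String msg) {
        try {
            Thread.sleep(10); // simulate 10 ms of processing per message
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        System.out.println("Received message :" + msg + "--" + System.currentTimeMillis()
                + "-" + Thread.currentThread().hashCode());
    }
}
```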
    
Run results (each line shows the message, the consumption timestamp in ms, and the hashCode of the consumer thread):

```
Received message :hello1--1623552780784---1714576524    <- first message consumed
Received message :hello2--1623552780784---1194736805
Received message :hello0--1623552780784---1755757420
...
Received message :hello183--1623552781479---1714576524
Received message :hello198--1623552781490---1194736805
Received message :hello186--1623552781490---1714576524
Received message :hello188--1623552781500---1714576524
Received message :hello191--1623552781510---1714576524
Received message :hello199--1623552781521---1714576524  <- last message consumed
```

1. Each message takes 10 ms to process, so 200 messages would take 2000 ms one at a time. With three threads working, consumption took 737 ms (from 1623552780784 to 1623552781521).
2. With concurrency = "1", consumption takes 2000+ ms.
3. Although concurrency = "10", only three thread hashCodes appear: 1714576524, 1194736805 and 1755757420.
4. So setting concurrency higher than the number of partitions is a common mistake.
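
To see why only three threads did any work, here is a minimal sketch of range-style partition assignment for a single topic. It is a simplified model written for illustration (the class and method names are hypothetical), not Kafka's actual assignor code, but it follows the same rule: within a group, each partition goes to at most one consumer, so consumers beyond the partition count receive nothing.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeAssignmentSketch {

    // Simplified model of range-style assignment for one topic:
    // partitions are split into contiguous ranges, one range per consumer.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = partitions / consumers;
        int extra = partitions % consumers;
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            // The first `extra` consumers each take one additional partition.
            int count = perConsumer + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions, 10 consumer threads, as in the case above
        List<List<Integer>> assignment = assign(3, 10);
        for (int c = 0; c < assignment.size(); c++) {
            System.out.println("consumer-" + c + " -> " + assignment.get(c));
        }
        long busy = assignment.stream().filter(p -> !p.isEmpty()).count();
        System.out.println("busy consumers: " + busy);
    }
}
```

Running `main` prints `consumer-0 -> [0]` through `consumer-2 -> [2]`, then `[]` for consumers 3 to 9, and finally `busy consumers: 3`, which matches the three thread hashCodes in the run results.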

Single consumer thread using asynchronous consumption (thread pool)

When the number of partitions is fixed, we can still work through a backlog by making each consumer thread consume faster. The most common approach is a thread pool, which puts otherwise idle CPU cores (and time spent waiting on I/O) to use and so raises consumption throughput. In essence this is the same idea as adding partitions and consumer instances: both raise concurrency. Adding partitions is simpler to implement, whereas asynchronous processing requires us to manage offsets ourselves to avoid losing messages. Here is a case for reference.

Case Background:

1. The topic is quickstart-events

2. The topic has 3 partitions

3. There are 200 messages waiting to be consumed

4. The consumer group is test-consumer-group-2

5. Auto-commit is disabled: spring.kafka.consumer.enable-auto-commit=false

6. Manual acknowledgment mode: spring.kafka.listener.ack-mode=manual

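```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component // class name is illustrative; the original shows only the listener method
public class QuickstartAsyncConsumer {

    // Pool size 3 is an assumption; the run results show three pool threads
    private final ExecutorService executorService = Executors.newFixedThreadPool(3);

    @KafkaListener(topics = "quickstart-events", groupId = "test-consumer-group-2", concurrency = "1")
    public void test5(String msg, Acknowledgment ack) {
        // Hand the record to the pool so the single listener thread can keep polling
        executorService.submit(() -> {
            try {
                Thread.sleep(10); // simulate 10 ms of processing per message
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Received message :" + msg + "--" + System.currentTimeMillis()
                    + "-" + Thread.currentThread().hashCode());
            ack.acknowledge(); // commit manually, only after the message has been processed
        });
    }
}
```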
    
Run results (each line shows the message, the consumption timestamp in ms, and the hashCode of the thread-pool thread that processed it):

```
Received message :hello5--1623554862457---996771787     <- earliest message consumed
Received message :hello1--1623554862457---1477599004
Received message :hello3--1623554862457---1621154349
Received message :hello14--1623554862473---1621154349
...
Received message :hello188--1623554863146---1621154349
Received message :hello191--1623554863148---996771787
Received message :hello194--1623554863148---1477599004
Received message :hello196--1623554863156---1621154349
Received message :hello198--1623554863159---1477599004  <- latest message consumed
```

1. Each message takes 10 ms to process, so 200 messages would take 2000 ms one at a time. Here the three pool threads finished in 702 ms (from 1623554862457 to 1623554863159).
2. concurrency = "1", yet the messages are processed by multiple threads inside the listener.
3. To prevent message loss, offsets are committed manually with ack.acknowledge(), and only after a message has been processed.
4. So handing messages straight to a thread pool without managing offsets manually is also a common mistake: the listener returns as soon as the task is submitted, so an automatic commit can record the offset before the message is actually processed, and a crash in between loses that message.
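
Manual acknowledgment still leaves one gap when a thread pool is involved. Kafka keeps only one committed offset per partition, and committing offset N marks every earlier record in that partition as consumed. Pool threads can finish out of order (the output above already shows messages completing out of sequence), so if a later record's offset is committed while an earlier record in the same partition is still being processed, a crash at that moment would skip the earlier record. One way to close the gap is to track finished offsets per partition and only commit up to the first unfinished one. The sketch below shows that idea without any Kafka dependency; the class and method names are hypothetical, and wiring it to a real consumer's commit call is left out.

```java
import java.util.TreeSet;

public class ContiguousOffsetTracker {

    // Next offset that is safe to commit for one partition
    // (Kafka commits the offset of the next record to read).
    private long committable;
    // Offsets finished by worker threads but not yet contiguous with `committable`.
    private final TreeSet<Long> completed = new TreeSet<>();

    public ContiguousOffsetTracker(long startOffset) {
        this.committable = startOffset;
    }

    // Called by a worker thread after it has fully processed the record at `offset`.
    public synchronized void markDone(long offset) {
        if (offset < committable) {
            return; // already covered by the committable offset
        }
        completed.add(offset);
        // Advance only across an unbroken run of finished offsets.
        while (!completed.isEmpty() && completed.first() == committable) {
            completed.pollFirst();
            committable++;
        }
    }

    // Every record below this offset has been processed.
    public synchronized long safeCommitOffset() {
        return committable;
    }

    public static void main(String[] args) {
        ContiguousOffsetTracker tracker = new ContiguousOffsetTracker(0);
        tracker.markDone(2); // a later record finishes first
        tracker.markDone(0);
        System.out.println(tracker.safeCommitOffset()); // 1: offset 1 is still in flight
        tracker.markDone(1);
        System.out.println(tracker.safeCommitOffset()); // 3
    }
}
```

The trade-off of this design is that a single slow record holds back the commit for its whole partition, so after a crash some already-processed records may be delivered again. That means duplicates rather than losses, so processing should be idempotent.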