1. Requirement: read messages from Kafka with multiple threads and collect them into a list for counting
2. Initial approach: start one thread per partition, have each thread read its partition asynchronously with its own consumer, and finally merge all the results
3. Problems encountered:

I. Each consumer reads a fixed set of messages, and no new messages arrive while it is reading


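import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// factory, topicList and pool are defined elsewhere in the original code
CompletableFuture<List<Object>> future = CompletableFuture.supplyAsync(() -> {
    List<Object> dataList = new ArrayList<>();
    Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    try (Consumer<Object, Object> consumer = factory.createConsumer()) {
        consumer.subscribe(topicList);
        int count = 0;
        // Nothing ever breaks out of this loop, so the return below is
        // only reached if an exception is thrown.
        while (true) {
            ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<Object, Object> record : records) {
                dataList.add(record.value());
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1, "no metadata"));
                count++;
                // Commit the tracked offsets once every 10000 records
                if (count % 10000 == 0) {
                    consumer.commitAsync(currentOffsets, null);
                }
            }
            consumer.commitAsync();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return dataList;
}, pool);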

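The return dataList statement is never executed: nothing breaks out of while (true), so the task only ends if an exception is thrown. What we want is for each consumer to read the messages in its partition, stop, and return its list, and then to combine the lists from all consumers into a single result.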
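One way to make each task finish is to decide up front where it stops. The sketch below assumes each task owns exactly one partition and stops at the partition's end offset captured when the task starts; records appended later are left for the next run. To keep it runnable without a broker, `FakePartition` is a hypothetical in-memory stand-in for a partition: its `endOffset()` and `fetch()` play the roles of `KafkaConsumer.endOffsets(...)` and `KafkaConsumer.poll(Duration)`. Each partition gets its own `CompletableFuture`, and the lists are merged with `join()` only after all tasks have been started.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class BoundedPartitionRead {

    // Hypothetical stand-in for one Kafka partition: an append-only log addressed by offset.
    static final class FakePartition {
        private final List<String> log = new ArrayList<>();

        synchronized void append(String value) { log.add(value); }

        // Plays the role of endOffsets(): the offset the next appended record will get.
        synchronized long endOffset() { return log.size(); }

        // Plays the role of poll(): up to maxRecords records starting at offset 'from'.
        synchronized List<String> fetch(long from, int maxRecords) {
            int start = (int) from;
            int end = Math.min(log.size(), start + maxRecords);
            return new ArrayList<>(log.subList(start, end));
        }
    }

    // Reads one partition up to the end offset captured when the task starts,
    // then returns, so the future wrapping it actually completes.
    static List<String> readUntilSnapshot(FakePartition partition) {
        long stopAt = partition.endOffset(); // snapshot the end before reading
        long position = 0;                   // assumption: read from the beginning
        List<String> out = new ArrayList<>();
        while (position < stopAt) {
            for (String value : partition.fetch(position, 500)) {
                if (position >= stopAt) break; // ignore records appended after the snapshot
                out.add(value);
                position++;
            }
        }
        return out;
    }

    // One task per partition; merge the per-partition lists once every task has finished.
    static List<String> readAll(List<FakePartition> partitions, ExecutorService pool) {
        List<CompletableFuture<List<String>>> futures = partitions.stream()
                .map(p -> CompletableFuture.supplyAsync(() -> readUntilSnapshot(p), pool))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join) // all tasks are already running at this point
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<FakePartition> partitions = new ArrayList<>();
        for (int p = 0; p < 3; p++) {
            FakePartition fp = new FakePartition();
            for (int i = 0; i < 4; i++) fp.append("p" + p + "-m" + i);
            partitions.add(fp);
        }
        ExecutorService pool = Executors.newFixedThreadPool(partitions.size());
        try {
            System.out.println(readAll(partitions, pool).size());
        } finally {
            pool.shutdown();
        }
    }
}
```

With a real `KafkaConsumer`, the same idea would use `assign(...)` with the single partition (rather than `subscribe`, which lets the group coordinator decide the assignment), read `endOffsets(...)` once, and keep calling `poll(...)` until `position(tp)` reaches that value. Comparing `position(tp)` instead of counting records is the safer stop condition, because offsets can have gaps (compacted topics, transaction markers).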

II. New messages keep arriving while each consumer is reading. How should the statistics be computed in that case?
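When messages never stop arriving, "the total" only makes sense at a chosen moment, so there are roughly two options: stop each partition at an end offset captured at that moment (a point-in-time count), or let the consumers run indefinitely and maintain a running count instead of a list. The sketch below illustrates the second option under stated assumptions: each `BlockingQueue` is a hypothetical stand-in for one partition (its `poll(timeout)` plays the role of `consumer.poll(Duration)`), all threads add to one shared `LongAdder`, and `snapshot()` can be read at any time. The `running` flag is a simplification; in a real Kafka loop, calling `KafkaConsumer.wakeup()` from another thread is the usual way to break out of a blocking `poll`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;

public class RunningCount {
    private final LongAdder total = new LongAdder();       // shared by all consumer threads
    private final AtomicBoolean running = new AtomicBoolean(true);

    // One loop per partition: count each message as it arrives instead of storing it.
    void consume(BlockingQueue<String> partition) {
        try {
            // Keep polling while running; after stop(), drain what is left and exit.
            while (running.get() || !partition.isEmpty()) {
                String value = partition.poll(10, TimeUnit.MILLISECONDS);
                if (value != null) {
                    total.increment();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Can be read at any time; while writers are active the value is approximate.
    long snapshot() { return total.sum(); }

    void stop() { running.set(false); }

    public static void main(String[] args) throws Exception {
        int partitions = 3, perPartition = 1000;
        RunningCount counter = new RunningCount();
        List<BlockingQueue<String>> queues = new ArrayList<>();
        for (int p = 0; p < partitions; p++) queues.add(new LinkedBlockingQueue<>());

        ExecutorService pool = Executors.newFixedThreadPool(partitions);
        for (BlockingQueue<String> q : queues) pool.submit(() -> counter.consume(q));

        // New messages arrive while the consumers are already running.
        for (int i = 0; i < perPartition; i++) {
            for (BlockingQueue<String> q : queues) q.put("m" + i);
            if (i == perPartition / 2) System.out.println("mid-run: " + counter.snapshot());
        }
        counter.stop();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("final: " + counter.snapshot());
    }
}
```

`LongAdder.sum()` is not an atomic snapshot while other threads are still incrementing, so the mid-run value is approximate; the final value, read after all consumer threads have finished, is exact. If only the number of messages is needed and the topic is neither compacted nor transactional, summing `endOffsets(...)` minus `beginningOffsets(...)` over all partitions gives the count without reading any records; with compaction or transaction markers that difference is only an upper bound.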