The figure shows a Topic named foo with 3 partitions. Using this setup, we discuss schemes for developing a multithreaded Consumer.

Worker thread pool mode

In this mode, a ConsumerGroup contains one or more Consumer instances that continuously poll messages from the partitions they subscribe to and hand the retrieved messages to a worker thread pool (workerThreadPool) for processing.

advantages

  1. The task is split into message fetching and message processing, each handled by different threads. The biggest advantage is high scalability: the number of threads that fetch messages and the number of threads that process them can be adjusted independently, without one affecting the other. If fetching is slow, increase the number of fetch threads. If processing is slow, increase the number of threads in the Worker thread pool.
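The independent sizing described above can be sketched with two plain thread pools. This is a simplified simulation that does not use Kafka at all: the class name FetchProcessSplit, the run method, and its parameters are hypothetical, and the "fetch" step is just a loop standing in for a poll loop.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: fetch threads and worker threads are sized separately.
public class FetchProcessSplit {

    /**
     * Starts fetchThreads simulated fetchers; each hands messagesPerFetcher
     * messages to a pool of workerThreads workers.
     * Returns the total number of messages processed.
     */
    public static int run(int fetchThreads, int workerThreads, int messagesPerFetcher) {
        ExecutorService fetchers = Executors.newFixedThreadPool(fetchThreads);
        ExecutorService workers = Executors.newFixedThreadPool(workerThreads);
        AtomicInteger processed = new AtomicInteger();

        for (int f = 0; f < fetchThreads; f++) {
            fetchers.execute(() -> {
                // Stand-in for a poll loop: every "fetched" message is handed off.
                for (int m = 0; m < messagesPerFetcher; m++) {
                    workers.execute(() -> processed.incrementAndGet());
                }
            });
        }

        try {
            // Drain the fetchers first so nothing is submitted after the workers shut down.
            fetchers.shutdown();
            fetchers.awaitTermination(10, TimeUnit.SECONDS);
            workers.shutdown();
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pools", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        // Same fetch side, different processing capacity.
        System.out.println(run(1, 4, 100));
        System.out.println(run(1, 8, 100));
    }
}
```

Both calls process the same 100 messages. Only the second argument changes, which is the point: processing capacity is tuned without touching the fetch side.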

disadvantages

  1. It is harder to implement, because it effectively requires maintaining two groups of threads: one for message fetching and one for message processing.
  2. This scheme separates message fetching from message processing, meaning that the thread that fetches a message is not the thread that processes it, so consumption order within a partition cannot be guaranteed. This is also its fatal flaw.
  3. Because of disadvantage 2, offsets may be committed without regard to which messages have actually finished processing, which can lead to repeated consumption or message loss.
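The ordering problem in disadvantage 2 can be reproduced without Kafka. The sketch below is a simulation under stated assumptions: the class OutOfOrderDemo and its method are hypothetical, two tasks stand in for the records at offsets 0 and 1 of one partition, and a latch forces offset 0 to be the slow one so that the result is deterministic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: records from one partition finish out of order in a pool.
public class OutOfOrderDemo {

    /** Returns the offsets in the order in which their processing finished. */
    public static List<Long> completionOrder() {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        List<Long> finished = new CopyOnWriteArrayList<>();
        CountDownLatch offset1Done = new CountDownLatch(1);

        // Offset 0 is submitted first but is "slow": it waits until offset 1 is done.
        workers.execute(() -> {
            try {
                offset1Done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finished.add(0L);
        });
        // Offset 1 is submitted second and completes right away on the other thread.
        workers.execute(() -> {
            finished.add(1L);
            offset1Done.countDown();
        });

        workers.shutdown();
        try {
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished;
    }

    public static void main(String[] args) {
        System.out.println(completionOrder()); // prints [1, 0]
    }
}
```

Offset 1 finishes before offset 0 even though it was submitted later. If the consumer committed its position once offset 1 was done and processing of offset 0 then failed, offset 0 would not be delivered again, which is the message loss case in disadvantage 3.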

Multiple Consumer instance mode

The consumer program starts multiple threads, and each thread maintains its own KafkaConsumer instance, which handles the whole flow of fetching and processing messages.

advantages

  1. It is easy to implement: just create multiple Consumer instances.
  2. Threads are isolated from one another, so no extra handling of inter-thread interaction is needed.
  3. Because each thread consumes a fixed set of partitions, offset commits stay correct, and as long as messages are processed properly there is no message duplication or message loss.

disadvantages

  1. Each Consumer instance holds its own TCP connections, so the corresponding resource consumption is relatively large.
  2. The number of Consumer instances is limited by the number of partitions: within a group, instances beyond the partition count are assigned no partitions and sit idle, so the number of instances should be less than or equal to the number of partitions.
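The partition limit in disadvantage 2 can be illustrated with a small simulation. This is not Kafka's actual assignor: the class PartitionLimitDemo and the assign method are hypothetical, and partitions are simply spread round-robin over the consumers of one group.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: more consumers than partitions leaves some consumers idle.
public class PartitionLimitDemo {

    /**
     * Spreads partition ids 0..partitions-1 over the given number of consumers,
     * round-robin. Element i of the result lists the partitions of consumer i.
     */
    public static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % consumers).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Topic foo has 3 partitions; try 4 consumers in the same group.
        System.out.println(assign(3, 4)); // prints [[0], [1], [2], []]
    }
}
```

With 3 partitions and 4 consumers, the fourth consumer ends up with an empty list: it still costs a thread and its connections but consumes nothing.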

Because the Worker thread pool mode cannot guarantee in-order, loss-free consumption of messages, the multi-consumer mode is strongly recommended.


Code

  1. Worker thread pool mode
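private KafkaConsumer<String, Object> kafkaConsumer;

// Pool of worker threads that process records handed over by the polling thread
private final ExecutorService workers = Executors.newFixedThreadPool(4);

public void run() {
    kafkaConsumer.subscribe(Arrays.asList("foo", "bar"));
    // Keep polling; a single poll would fetch only one batch
    while (true) {
        ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(100));
        // Hand each record to the pool; processing runs asynchronously to polling
        records.forEach(record -> workers.submit(() -> handlerRecord(record)));
    }
}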
  2. Multiple Consumer instance mode
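import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaConsumerWorker implements Runnable {

    private final KafkaConsumer<String, Object> consumer;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public KafkaConsumerWorker(KafkaConsumer<String, Object> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(100));
                // TODO: handle records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            // Close once, after the poll loop has exited
            consumer.close();
        }
    }

    // Called from another thread; wakeup() interrupts a blocking poll()
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}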