Following on from last week’s Use and Principles of Kafka Producers, this week we’ll look at consumers, again starting with a consumer Hello World:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {

    public static void main(String[] args) {
        // 1. Set parameters
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "group.demo");
        // 2. Create the KafkaConsumer instance
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // 3. Subscribe to topics
        consumer.subscribe(Collections.singletonList("topic-demo"));
        try {
            // 4. Poll for messages in a loop
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } finally {
            // 5. Close the consumer
            consumer.close();
        }
    }
}

The first two steps are similar to the producer: configure the parameters, then create an instance from them. The differences are that the consumer uses deserializers rather than serializers, and that it has a required parameter, group.id, which specifies the consumer group the consumer belongs to. Consumer groups were introduced in the earlier illustrated article on Kafka's basic concepts: they let consumption capacity scale horizontally by adding consumers. Here we introduce a related concept, "rebalancing", which is the redistribution of partition ownership among the consumers in a group. It happens when a new consumer joins the group or an existing consumer goes down. For now we only introduce the concept and won't go into how it is implemented.
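To make the idea of rebalancing concrete, here is a minimal sketch in plain Java. It is not Kafka's actual assignment logic: the class `RebalanceSketch`, the method `assign`, and the simple round-robin rule are all assumptions made for illustration. It only shows that partition ownership is recomputed whenever group membership changes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; Kafka's real assignors are more involved.
class RebalanceSketch {

    // Distributes partitions 0..partitionCount-1 over the consumers in round-robin order.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers share six partitions.
        System.out.println(assign(Arrays.asList("c1", "c2"), 6));
        // A third consumer joins: ownership is redistributed.
        System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 6));
        // c1 goes down: the remaining consumers take over its partitions.
        System.out.println(assign(Arrays.asList("c2", "c3"), 6));
    }
}
```

Each call to `assign` stands in for one rebalance: the same six partitions are always fully covered, but who owns which partition depends on the current members of the group.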

Continuing with the code above: in step 3 we subscribe to the topic we want to consume, and in step 4 we call the poll method in a loop to pull messages from the Kafka server. The Duration passed to poll is its timeout: how long poll blocks when there is no data available in the consumer's buffer, which avoids polling too frequently. poll returns a ConsumerRecords object, which wraps the ConsumerRecord lists of multiple partitions and has the following structure:

public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {

    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
    // ...

}

ConsumerRecord, on the other hand, is similar to ProducerRecord and encapsulates the relevant attributes of the message:

public class ConsumerRecord<K, V> {
    private final String topic;  // topic
    private final int partition;  // partition number
    private final long offset;  // offset
    private final long timestamp;  // timestamp
    private final TimestampType timestampType;  // timestamp type
    private final int serializedKeySize;  // serialized size of the key
    private final int serializedValueSize;  // serialized size of the value
    private final Headers headers;  // message headers
    private final K key;  // key
    private final V value;  // value
    private final Optional<Integer> leaderEpoch;  // leader epoch
    // ...
}

ConsumerRecord has more properties than ProducerRecord. The one to focus on is the offset, which uniquely identifies a message within a partition. Each time the consumer calls poll, it pulls messages from each partition based on that partition's offset. When a consumer goes down, a rebalance occurs and the partitions it was responsible for are handed over to other consumers, which use the offset to continue from where the failed consumer left off.

So that they survive a consumer outage, offsets are not kept only in the consumer's memory; they are persisted to an internal Kafka topic, __consumer_offsets. In Kafka, persisting an offset is called committing it. After consuming messages, the consumer commits an offset, and the committed offset is the position of the next message to consume. For example, if the offset of the message just consumed is x, then x+1 is committed.

There is no explicit commit in the code above, so what is Kafka's default commit mode? By default, the consumer commits automatically at a regular interval set by auto.commit.interval.ms (5 seconds by default). The commit happens inside the poll method: before pulling, the consumer checks whether an offset commit is due, and if so commits the offsets of the previous pull.
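As a small illustration of the settings involved, the sketch below builds the auto-commit part of a consumer configuration. The keys enable.auto.commit and auto.commit.interval.ms are Kafka consumer settings; the class `AutoCommitConfigSketch` and the helper `autoCommit` are hypothetical names used only for this example.

```java
import java.util.Properties;

// Hypothetical helper; only the two property keys come from Kafka.
class AutoCommitConfigSketch {

    // Builds the auto-commit related entries of a consumer configuration.
    static Properties autoCommit(boolean enabled, long intervalMs) {
        Properties props = new Properties();
        props.put("enable.auto.commit", String.valueOf(enabled));
        // Only takes effect while auto-commit is enabled.
        props.put("auto.commit.interval.ms", String.valueOf(intervalMs));
        return props;
    }

    public static void main(String[] args) {
        // The defaults described above: auto-commit on, every 5 seconds.
        Properties defaults = autoCommit(true, 5000);
        // A shorter interval shrinks the window between commits.
        Properties frequent = autoCommit(true, 1000);
        // Auto-commit switched off, as needed for the manual commits shown later in this article.
        Properties manual = autoCommit(false, 5000);

        System.out.println(defaults);
        System.out.println(frequent);
        System.out.println(manual);
    }
}
```

These entries would be merged into the same Properties object that holds the deserializers, bootstrap.servers and group.id.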

Consider a scenario: the last committed offset is 2, and the current consumer has processed messages 2, 3, and 4 and is about to commit 5 when it goes down. After the rebalance, another consumer resumes from the committed offset 2, so messages 2, 3, and 4 are consumed again. This is duplicate consumption.

We can shrink the window for duplicate consumption by reducing the auto-commit interval, but duplicates still cannot be ruled out.

In a single-threaded program, auto-commit is deferred, that is, offsets are committed after the messages have been processed, so messages should not be lost: the committed offset never gets ahead of the offset being processed. In a multi-threaded environment, however, message loss is possible. For example, thread A calls poll to pull messages and puts them in a queue, and thread B processes them. If an outage occurs after thread A has committed offset 5 but before thread B has finished processing messages 2, 3, and 4, those messages are lost.
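Both scenarios come down to the gap between the committed offset and the point the application had actually reached. The sketch below models that with plain numbers; `OffsetGapSketch` and its methods are hypothetical names, and no Kafka API is involved.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical model of the two failure scenarios; not Kafka code.
class OffsetGapSketch {

    // Offsets in [from, to); empty when from >= to.
    private static List<Long> range(long from, long to) {
        return LongStream.range(from, to).boxed().collect(Collectors.toList());
    }

    // committed: the offset a restarted consumer resumes from.
    // nextToProcess: the first offset the application had NOT finished before the crash.
    // Offsets that were already processed but will be delivered again.
    static List<Long> reprocessed(long committed, long nextToProcess) {
        return range(committed, nextToProcess);
    }

    // Offsets that were never processed and will be skipped after the restart.
    static List<Long> lost(long committed, long nextToProcess) {
        return range(nextToProcess, committed);
    }

    public static void main(String[] args) {
        // Scenario 1: committed 2, processed 2..4, crashed before committing 5.
        System.out.println(reprocessed(2, 5)); // [2, 3, 4]
        System.out.println(lost(2, 5));        // []

        // Scenario 2: thread A committed 5, thread B had not finished 2..4.
        System.out.println(reprocessed(5, 2)); // []
        System.out.println(lost(5, 2));        // [2, 3, 4]
    }
}
```

When the commit lags behind processing, the gap shows up as duplicates; when the commit runs ahead of processing, the same gap shows up as lost messages.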

As these scenarios show, automatic commit carries risk. Besides automatic commit, Kafka also provides manual commit, which requires setting enable.auto.commit to false. Manual commit comes in two forms, synchronous and asynchronous, corresponding to the commitSync and commitAsync methods of KafkaConsumer. Let's first try synchronous commit:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
    // Blocks until the commit succeeds or fails
    consumer.commitSync();
}

Offsets are committed after each batch of messages is processed, which shrinks the window for duplicate consumption. But because the commit is synchronous, the program blocks until the commit completes before it can pull the next batch, which limits throughput. Let's use asynchronous commit instead:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
    // Returns immediately without waiting for the commit result
    consumer.commitAsync();
}

With an asynchronous commit the program does not block, but a failed asynchronous commit is not retried, so success is not guaranteed. We can therefore combine the two: use asynchronous commits inside the polling loop, and a final synchronous commit when the consumer is closed to make sure the last offsets are committed.

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
        consumer.commitAsync();
    }
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

Both of the no-argument commits above commit the offsets of the batch returned by the last poll. To narrow the window for duplicate consumption further, you can pass explicit partitions and offsets to commitAsync or commitSync inside the for loop for finer-grained commits. For example, to commit every 1000 messages:

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());

        // Commit the offset of the next message to consume: current offset + 1
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                           new OffsetAndMetadata(record.offset() + 1));
        if (count % 1000 == 0) {
            consumer.commitAsync(currentOffsets, null);
        }
        count++;
    }
}
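The bookkeeping in that loop can be isolated from Kafka and tried on its own. The sketch below is a hypothetical stand-in: `OffsetTrackerSketch` keeps, per partition, the offset to commit (last processed offset plus one) and reports when a commit is due. Partitions are keyed by a plain "topic-partition" string instead of TopicPartition, and, unlike the loop above, the counter is incremented before the check so the first commit is due only after a full batch of N records.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the currentOffsets map and counter; not Kafka code.
class OffsetTrackerSketch {
    private final Map<String, Long> nextOffsets = new HashMap<>();
    private final int commitEvery;
    private long processed = 0;

    OffsetTrackerSketch(int commitEvery) {
        this.commitEvery = commitEvery;
    }

    // Records that the message at (topic, partition, offset) has been processed.
    // Returns true when a commit is due, i.e. after every commitEvery records.
    boolean track(String topic, int partition, long offset) {
        // The offset to commit is the position of the NEXT message to consume.
        nextOffsets.put(topic + "-" + partition, offset + 1);
        processed++;
        return processed % commitEvery == 0;
    }

    // Snapshot of what would be handed to the commit call.
    Map<String, Long> offsetsToCommit() {
        return new HashMap<>(nextOffsets);
    }

    public static void main(String[] args) {
        OffsetTrackerSketch tracker = new OffsetTrackerSketch(3);
        tracker.track("topic-demo", 0, 41);
        tracker.track("topic-demo", 1, 7);
        boolean due = tracker.track("topic-demo", 0, 42);
        System.out.println(due);                                           // true
        System.out.println(tracker.offsetsToCommit().get("topic-demo-0")); // 43
        System.out.println(tracker.offsetsToCommit().get("topic-demo-1")); // 8
    }
}
```

Because the map holds one entry per partition, a later record from the same partition simply overwrites the earlier one, so a commit always carries the most recent position for each partition.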

That's all for commits. In the code that uses the consumer, we can see that poll is the core method: it pulls the messages we need to consume. So let's go behind the consumer API and see what happens inside poll, which is implemented as follows:

public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(time.timer(timeout), true);
}

The poll method that takes a timeout delegates to an overloaded method. The second parameter, includeMetadataInTimeout, indicates whether the time spent fetching metadata counts toward the timeout. The value passed here is true, meaning it does count. Here is the implementation of the overloaded poll method:

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    // 1. Acquire the lock and ensure the consumer is not closed
    acquireAndEnsureOpen();
    try {
        // 2. Record the start of the poll
        this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

        // 3. Check that there is a subscription or assignment
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        do {
            // 4. Check for a pending wakeup request
            client.maybeTriggerWakeup();

            // 5. Update assignment metadata and offsets (if necessary)
            if (includeMetadataInTimeout) {
                // try to update assignment metadata BUT do not need to block on the timer for join group
                updateAssignmentMetadataIfNeeded(timer, false);
            } else {
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
                    log.warn("Still waiting for metadata");
                }
            }

            // 6. Pull messages
            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                // 7. Messages were pulled. While the user processes them, send the next
                //    fetch requests (asynchronously) to improve efficiency
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.transmitSends();
                }

                // 8. Run the consumer interceptors and return
                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        } while (timer.notExpired());

        return ConsumerRecords.empty();
    } finally {
        // 9. Release the lock
        release();
        // 10. Record the end of the poll
        this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
    }
}

We walk through the above code step by step, starting with the acquireAndEnsureOpen method in step 1, which acquires the lock and ensures that the consumer is not closed:

private void acquireAndEnsureOpen() {
    acquire();
    if (this.closed) {
        release();
        throw new IllegalStateException("This consumer has already been closed.");
    }
}

The acquire method acquires the lock. Why is a lock needed here? KafkaConsumer is not thread-safe, so it must guarantee that only one thread at a time uses it to pull messages. acquire is implemented as follows:

private static final long NO_CURRENT_THREAD = -1L;
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
private final AtomicInteger refcount = new AtomicInteger(0);

private void acquire() {
    long threadId = Thread.currentThread().getId();
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    refcount.incrementAndGet();
}

An atomic variable, currentThread, serves as the lock, and it is acquired with a CAS operation. If the CAS fails, the lock could not be acquired, which means there is contention: more than one thread is using the KafkaConsumer, so a ConcurrentModificationException is thrown. If the CAS succeeds, refcount is incremented, which makes the lock reentrant.
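The locking scheme described above can be tried in isolation. The following is a standalone sketch, not the KafkaConsumer source: the class `LightLockSketch` and the helper `otherThreadCanAcquire` are hypothetical, and the `release` method is an assumption about the natural counterpart of `acquire` (decrement the count, and free the lock when it reaches zero).

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Standalone sketch of a CAS-based, reentrant, fail-fast lock.
class LightLockSketch {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refcount = new AtomicInteger(0);

    void acquire() {
        long threadId = Thread.currentThread().getId();
        // Either this thread already owns the lock, or it must win the CAS.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException("already in use by another thread");
        }
        refcount.incrementAndGet();
    }

    // Assumed counterpart of acquire: the last release frees the lock.
    void release() {
        if (refcount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }

    // Tries to acquire the lock from a fresh thread and reports whether it succeeded.
    static boolean otherThreadCanAcquire(LightLockSketch lock) {
        AtomicBoolean acquired = new AtomicBoolean(false);
        Thread other = new Thread(() -> {
            try {
                lock.acquire();
                acquired.set(true);
                lock.release();
            } catch (ConcurrentModificationException e) {
                // The lock is owned by a different thread.
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acquired.get();
    }

    public static void main(String[] args) {
        LightLockSketch lock = new LightLockSketch();
        lock.acquire();
        lock.acquire(); // reentrant: the owning thread may acquire again
        System.out.println(otherThreadCanAcquire(lock)); // false
        lock.release();
        lock.release(); // count back to zero, lock is free
        System.out.println(otherThreadCanAcquire(lock)); // true
    }
}
```

Note that this kind of lock never makes a second thread wait. It fails immediately, which turns accidental concurrent use into a visible error rather than a silent race.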

Steps 2 and 3 record the start of the poll and check whether there are any subscribed topics. Execution then enters the do-while loop: if no messages are pulled, the loop repeats until the timeout expires.
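The shape of that loop, try at least once and keep trying until something arrives or the timeout expires, can be sketched without Kafka. Everything here is hypothetical: `PollLoopSketch`, `pollUntil`, and the use of a Supplier in place of the real fetch. The sketch also spins in a tight loop, whereas the real consumer blocks on network I/O between attempts.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of a do-while loop bounded by a timeout.
class PollLoopSketch {

    // Calls fetch at least once; returns the first non-empty result,
    // or an empty list once timeoutMs has elapsed.
    static List<String> pollUntil(Supplier<List<String>> fetch, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        do {
            List<String> records = fetch.get();
            if (!records.isEmpty()) {
                return records;
            }
        } while (System.nanoTime() - deadline < 0);
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // The fake fetch returns nothing twice, then one record.
        List<String> records = pollUntil(
                () -> attempts.incrementAndGet() < 3
                        ? Collections.<String>emptyList()
                        : Arrays.asList("m1"),
                5000);
        System.out.println(records + " after " + attempts.get() + " attempts");

        // With a zero timeout the body still runs once, because it is a do-while loop.
        System.out.println(pollUntil(Collections::emptyList, 0));
    }
}
```

Using do-while rather than while is what guarantees one fetch attempt even when the timeout is zero.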

Step 4, maybeTriggerWakeup, does not actually wake up the consumer. It checks whether a wakeup has been requested: if wakeup() has been called and the consumer is not in the middle of an uninterruptible operation, it throws a WakeupException. I don't fully understand this part yet, so I'll set it aside for now.
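One way to picture this check is as a pair of flags: one set when another thread asks the consumer to stop, one set while the consumer must not be interrupted. The sketch below is my own simplified model under that assumption. `WakeupSketch`, `WakeupRequested`, and all method names are hypothetical and are not taken from the Kafka source.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical two-flag model of a "check for wakeup" step.
class WakeupSketch {

    // Stand-in for the exception the consumer would throw.
    static class WakeupRequested extends RuntimeException {
    }

    private final AtomicBoolean wakeup = new AtomicBoolean(false);
    private final AtomicBoolean wakeupDisabled = new AtomicBoolean(false);

    // Called from another thread to ask the polling thread to stop.
    void wakeup() {
        wakeup.set(true);
    }

    // Bracket an operation that must not be interrupted.
    void disableWakeups() {
        wakeupDisabled.set(true);
    }

    void enableWakeups() {
        wakeupDisabled.set(false);
    }

    // Throws only if a wakeup was requested and wakeups are currently allowed.
    void maybeTriggerWakeup() {
        if (!wakeupDisabled.get() && wakeup.get()) {
            wakeup.set(false); // the request is consumed by this throw
            throw new WakeupRequested();
        }
    }

    // Convenience for the demo: reports whether the check threw.
    static boolean throwsOnCheck(WakeupSketch sketch) {
        try {
            sketch.maybeTriggerWakeup();
            return false;
        } catch (WakeupRequested e) {
            return true;
        }
    }

    public static void main(String[] args) {
        WakeupSketch sketch = new WakeupSketch();
        System.out.println(throwsOnCheck(sketch)); // false: nothing requested
        sketch.wakeup();
        sketch.disableWakeups();
        System.out.println(throwsOnCheck(sketch)); // false: request is deferred
        sketch.enableWakeups();
        System.out.println(throwsOnCheck(sketch)); // true: request is honored
        System.out.println(throwsOnCheck(sketch)); // false: request was consumed
    }
}
```

In this model the check at the top of each loop iteration is the point where a stop request from another thread takes effect.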

Step 5 updates the assignment metadata and offsets. This is where, as mentioned earlier, the consumer checks before pulling whether an offset commit is due.

Step 6, the pollForFetches method pulls the message, which is implemented as follows:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
    long pollTimeout = coordinator == null ? timer.remainingMs() :
            Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

    // 1. If records are already available, return them immediately
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty()) {
        return records;
    }

    // 2. Prepare the fetch requests
    fetcher.sendFetches();

    if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
        pollTimeout = retryBackoffMs;
    }

    Timer pollTimer = time.timer(pollTimeout);
    // 3. Send the fetch requests and wait for responses
    client.poll(pollTimer, () -> {
        return !fetcher.hasAvailableFetches();
    });
    timer.update(pollTimer.currentTimeMs());

    // 4. Return the fetched records
    return fetcher.fetchedRecords();
}

If the fetcher already holds records, pollForFetches returns them immediately; these are the records requested ahead of time in step 7 of poll. If there are none, the Fetcher prepares fetch requests, the ConsumerNetworkClient sends them, and the fetched records are returned.

Why would records already be there? Go back to step 7 of poll: once messages have been pulled, the user needs time to process them, so the consumer uses that time to send the next fetch request asynchronously. Pulling data ahead of time reduces the time spent waiting on network I/O and improves the program's efficiency.
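The prefetching idea can be illustrated with a small pipeline in plain Java. This is only an analogy built on CompletableFuture: `PrefetchSketch`, `fetch`, and `consumeAll` are hypothetical names, and the single-threaded executor stands in for the consumer's network layer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: request the next batch before processing the current one.
class PrefetchSketch {

    // Stand-in for a network fetch.
    static String fetch(int batchNumber) {
        return "batch-" + batchNumber;
    }

    static List<String> consumeAll(int batches) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            List<String> processed = new ArrayList<>();
            // The first request has nothing to overlap with.
            CompletableFuture<String> inFlight = CompletableFuture.supplyAsync(() -> fetch(0), io);
            for (int i = 0; i < batches; i++) {
                // Wait for the batch that was requested earlier.
                String batch = inFlight.join();
                if (i + 1 < batches) {
                    final int next = i + 1;
                    // Send the next request BEFORE processing, so I/O and processing overlap.
                    inFlight = CompletableFuture.supplyAsync(() -> fetch(next), io);
                }
                processed.add("processed " + batch);
            }
            return processed;
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(3));
        // [processed batch-0, processed batch-1, processed batch-2]
    }
}
```

By the time processing of one batch finishes, the next one is often already waiting, which is the situation the first step of pollForFetches checks for.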

In step 8, the consumer interceptors are called. Just as KafkaProducer has ProducerInterceptor, KafkaConsumer has ConsumerInterceptor, which processes the pulled messages before they are returned to the user.
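Conceptually, interceptors form a chain that each batch passes through on its way to the user. The sketch below models that with plain functions over a list of strings. It does not use Kafka's ConsumerInterceptor interface; `InterceptorChainSketch`, `dropEmpty`, `upperCase`, and `defaultChain` are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Hypothetical sketch of an interceptor chain applied before records reach the user.
class InterceptorChainSketch {

    // Passes the records through every interceptor in order.
    static List<String> onConsume(List<UnaryOperator<List<String>>> interceptors, List<String> records) {
        List<String> current = records;
        for (UnaryOperator<List<String>> interceptor : interceptors) {
            current = interceptor.apply(current);
        }
        return current;
    }

    // Example interceptor: filters out empty messages.
    static List<String> dropEmpty(List<String> records) {
        return records.stream().filter(r -> !r.isEmpty()).collect(Collectors.toList());
    }

    // Example interceptor: transforms each message.
    static List<String> upperCase(List<String> records) {
        return records.stream().map(r -> r.toUpperCase(Locale.ROOT)).collect(Collectors.toList());
    }

    static List<UnaryOperator<List<String>>> defaultChain() {
        List<UnaryOperator<List<String>>> chain = new ArrayList<>();
        chain.add(InterceptorChainSketch::dropEmpty);
        chain.add(InterceptorChainSketch::upperCase);
        return chain;
    }

    public static void main(String[] args) {
        System.out.println(onConsume(defaultChain(), Arrays.asList("a", "", "b"))); // [A, B]
    }
}
```

The user only ever sees the output of the last interceptor, so interceptors are a natural place for filtering, enrichment, or metrics.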

Steps 9 and 10 release the lock and record the end of the poll, mirroring steps 1 and 2.

So much for the poll method for KafkaConsumer. Finally, use a mind map to review the important points below:
