In the previous article, we briefly described some of the elements of Kafka consumers. Their use can be summarized in one sentence: if an application needs to read all the messages of a topic, give the application its own consumer group; if the application cannot keep up with the message rate, add more consumers to that group.

1. Kafka consumption

1.1 Creating consumers

Creating a Kafka consumer is very similar to creating a producer: simply create a KafkaConsumer object, for example:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("group.id", "NameGroup");
// Consumers read records, so they are configured with deserializers rather than serializers
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProps);

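Compared with the producer configuration, the only new property is group.id, which names the consumer group that the consumer belongs to. It is not strictly required to create a consumer, but it is needed for group management through subscribe().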

1.2 Subscribing to topics

Once you have created a consumer, you can subscribe to topics by calling subscribe(), which accepts either a list of topic names or a regular expression (a java.util.regex.Pattern) that matches multiple topics. If a newly created topic matches the pattern, a rebalance is triggered and the consumer group starts consuming it:

// Subscribe to a single topic by name
kafkaConsumer.subscribe(Collections.singletonList("name"));
// Or subscribe to every topic whose name matches a regular expression
kafkaConsumer.subscribe(Pattern.compile("test.*"));

1.3 Pull loop

Retrieving data is also easy. Because consumers do not know when producers generate data, KafkaConsumer polls the Kafka broker for data in a loop. If data is available, it is consumed; if not, the consumer keeps polling and waiting.

try {
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100)); // {1}
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value()); // {2}
        }
    }
} finally {
    kafkaConsumer.close(); // {3}
}

The numbered markers above are explained below:

  1. This is the core line of code. We call poll() over and over to pull data; if polling stops, Kafka considers the consumer dead and rebalances. The parameter is a timeout that specifies how long the call blocks when no data is available; 0 means it returns immediately without waiting.
  2. The poll() method returns a list of records, each containing the key and value together with the topic, partition, and offset.
  3. Closing the consumer explicitly lets Kafka rebalance immediately, without waiting for the session to expire.

2. Consumer configuration

Like producers, consumers have many custom parameters that can be configured according to actual conditions. The following describes some important parameters.

  • fetch.min.bytes

This parameter lets the consumer specify the minimum amount of data it wants to receive from the broker. When the consumer fetches messages and the available data is below this threshold, the broker waits until there is enough data (or until fetch.max.wait.ms elapses) before responding. The default is 1 byte.

  • fetch.max.wait.ms

This parameter specifies the maximum time the broker waits before answering a fetch request when there is not enough data to satisfy fetch.min.bytes, thus avoiding long blocks. The default value is 500 ms.

  • max.partition.fetch.bytes

This parameter specifies the maximum number of bytes returned per partition. The default is 1 MB. If a topic has 10 partitions and 5 consumers, each consumer reads 2 partitions and therefore needs 2 MB of memory to hold fetched messages. In practice we should allow for more, so that when a consumer fails the remaining consumers can take over additional partitions.

Note that max.partition.fetch.bytes must be larger than the largest message the broker will accept (set by message.max.bytes on the broker); otherwise the consumer may be unable to consume such a message. In addition, as the example above shows, we usually call poll() in a loop to read messages. If max.partition.fetch.bytes is set too large, the consumer needs more time to process each batch, which may delay the next poll() and cause the session to expire. In that case, either reduce max.partition.fetch.bytes or extend the session timeout.
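The sizing argument for max.partition.fetch.bytes is simple arithmetic. The following is a minimal sketch of it, assuming partitions are spread as evenly as possible across consumers; the class and method names are illustrative and not part of the Kafka API.

```java
final class FetchMemoryEstimate {
    /** Worst-case bytes one consumer may receive per fetch when partitions are spread evenly. */
    static long bytesPerConsumer(int partitions, int consumers, long maxPartitionFetchBytes) {
        // The busiest consumer owns ceil(partitions / consumers) partitions.
        int partitionsPerConsumer = (partitions + consumers - 1) / consumers;
        return partitionsPerConsumer * maxPartitionFetchBytes;
    }

    public static void main(String[] args) {
        long oneMb = 1024L * 1024L;
        // 10 partitions, 5 consumers: 2 partitions each, so 2 MB.
        System.out.println(bytesPerConsumer(10, 5, oneMb)); // prints 2097152
        // One consumer fails and 4 remain: the busiest now owns 3 partitions, so 3 MB.
        System.out.println(bytesPerConsumer(10, 4, oneMb)); // prints 3145728
    }
}
```

The second call shows why some headroom is useful: losing a single consumer raises the worst-case requirement from 2 MB to 3 MB.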

  • session.timeout.ms

This parameter sets the consumer session timeout, which defaults to 10 seconds in Kafka 2.x clients (45 seconds since Kafka 3.0). If the consumer does not send a heartbeat within this time, the broker considers the session expired and rebalances the partitions. The smaller session.timeout.ms is, the more quickly Kafka can detect failures and rebalance, but the higher the probability of a false alarm (for example, a consumer may simply be processing messages slowly rather than having crashed).

  • heartbeat.interval.ms

heartbeat.interval.ms controls how often the consumer sends heartbeats to the group coordinator. This value must be smaller than session.timeout.ms and is usually set to one third of it.

  • auto.offset.reset

This property specifies what the consumer should do when it reads a partition for which it has no committed offset, or when the committed offset is invalid. The value can be “latest” (start from the newest messages) or “earliest” (start from the oldest messages). The default is “latest”.

  • enable.auto.commit

Specifies whether the consumer commits offsets automatically. The default is true. To reduce duplicate consumption or data loss, you can set it to false and commit manually. If it is true, pay attention to the frequency of automatic commits, which is set by auto.commit.interval.ms.

  • partition.assignment.strategy

We already know that when a consumer group contains multiple consumers, the partitions of a topic are assigned to them according to some policy. The policy is determined by the PartitionAssignor class, and there are two built-in policies:

  • Range: for each topic, each consumer is responsible for a contiguous range of partitions. If consumers C1 and C2 subscribe to two topics with three partitions each, this policy gives C1 partitions 0 and 1 (zero-based) of each topic and C2 partition 2 of each topic. As you can see, if the number of partitions is not evenly divisible by the number of consumers, the first consumers receive extra partitions (one per topic).
  • RoundRobin: the partitions of all subscribed topics are assigned to consumers one by one, in order. In the example above, C1 is responsible for partitions 0 and 2 of the first topic and partition 1 of the second topic; C2 is responsible for the remaining partitions. This strategy is more balanced: the number of partitions held by any two consumers differs by at most 1.

partition.assignment.strategy sets the assignment strategy. The default is org.apache.kafka.clients.consumer.RangeAssignor (the range strategy). You can set it to org.apache.kafka.clients.consumer.RoundRobinAssignor (the round-robin strategy), or implement your own strategy and set partition.assignment.strategy to the implementing class.
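To make the difference between the two policies concrete, here is a small simulation of the C1/C2 example. It is a sketch of the two assignment ideas only, not Kafka's actual assignor code; the class name, method names, and the "topic-partition" string format are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AssignmentSketch {
    /** Range: per topic, each consumer gets a contiguous block; earlier consumers absorb the remainder. */
    static Map<String, List<String>> range(List<String> consumers, List<String> topics, int partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        for (String topic : topics) {
            int base = partitionsPerTopic / consumers.size();
            int extra = partitionsPerTopic % consumers.size();
            int next = 0;
            for (int c = 0; c < consumers.size(); c++) {
                int count = base + (c < extra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    result.get(consumers.get(c)).add(topic + "-" + next++);
                }
            }
        }
        return result;
    }

    /** RoundRobin: the partitions of all topics are dealt out to consumers one by one. */
    static Map<String, List<String>> roundRobin(List<String> consumers, List<String> topics, int partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int turn = 0;
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                result.get(consumers.get(turn++ % consumers.size())).add(topic + "-" + p);
            }
        }
        return result;
    }

    private static Map<String, List<String>> emptyAssignment(List<String> consumers) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("C1", "C2");
        List<String> topics = Arrays.asList("T1", "T2");
        // prints {C1=[T1-0, T1-1, T2-0, T2-1], C2=[T1-2, T2-2]}
        System.out.println(range(consumers, topics, 3));
        // prints {C1=[T1-0, T1-2, T2-1], C2=[T1-1, T2-0, T2-2]}
        System.out.println(roundRobin(consumers, topics, 3));
    }
}
```

With the range policy C1 ends up with four partitions and C2 with two, while round-robin gives each consumer three.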

  • client.id

This parameter can be any string and identifies the client that sends requests to the broker. It is typically used in logs, metrics, and quotas.

  • max.poll.records

This parameter controls the maximum number of records returned by a single poll() call, which lets you limit the amount of data the application must process in each iteration of the poll loop.

  • receive.buffer.bytes, send.buffer.bytes

These parameters set the sizes of the TCP receive and send buffers used by sockets when reading and writing data. If they are set to -1, the operating system defaults are used. If the producer or consumer is in a different data center than the broker, you can increase these values appropriately, because networks across data centers tend to have higher latency and lower bandwidth.
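As a recap of this section, the parameters can be collected in one Properties object. This is a sketch only: the broker addresses, client.id, and most numeric values are illustrative assumptions rather than recommendations, and the helper class is hypothetical. The one rule it encodes is the heartbeat interval being one third of the session timeout.

```java
import java.util.Properties;

final class ConsumerConfigSketch {
    /** Builds example consumer properties; heartbeat.interval.ms is derived as one third of the session timeout. */
    static Properties build(String groupId, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("group.id", groupId);
        props.put("client.id", "example-client"); // assumed name, any string works
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("fetch.min.bytes", "1");
        props.put("fetch.max.wait.ms", "500");
        props.put("max.partition.fetch.bytes", String.valueOf(1024 * 1024));
        props.put("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        props.put("heartbeat.interval.ms", String.valueOf(sessionTimeoutMs / 3));
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", "200"); // illustrative value
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("NameGroup", 30000);
        System.out.println(props.getProperty("heartbeat.interval.ms")); // prints 10000
    }
}
```

The resulting object can be passed to the KafkaConsumer constructor in the same way as kafkaProps in section 1.1.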

3. Commit and offset

When we call poll(), it returns messages that the consumer group has not yet consumed. When the broker returns messages to a consumer, it does not track whether the consumer actually received them. Instead, Kafka lets consumers manage their own consumption offsets and provides an interface for updating them; this update is called a commit.

To commit, the consumer sends a message containing the offset of each partition to a special topic called __consumer_offsets. The main purpose of this topic is to record the offsets from which consumption resumes after a rebalance. Consumers write to this topic on every commit, but as long as no rebalance is triggered the stored offsets are not used. When a rebalance is triggered, consumers stop working and each consumer may be assigned a different set of partitions; the offsets stored in this topic let each consumer continue processing from where the previous owner left off.

If the committed offset is smaller than the offset of the last message the client processed, the messages between the two offsets are processed twice.

If the committed offset is larger than the offset of the last message the client actually processed, the messages between the two offsets are lost.
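The two cases can be worked through with concrete numbers. The sketch below assumes the usual convention that a committed offset is the offset of the next message to read; the class and method names are hypothetical.

```java
final class OffsetGapSketch {
    /**
     * Describes what happens after a rebalance, given the committed offset (the next offset
     * to read) and the offset of the last record the previous owner actually processed.
     */
    static String outcome(long committedOffset, long lastProcessedOffset) {
        long nextUnprocessed = lastProcessedOffset + 1;
        if (committedOffset < nextUnprocessed) {
            return "reprocessed offsets " + committedOffset + ".." + lastProcessedOffset;
        }
        if (committedOffset > nextUnprocessed) {
            return "skipped offsets " + nextUnprocessed + ".." + (committedOffset - 1);
        }
        return "no duplicates, no loss";
    }

    public static void main(String[] args) {
        System.out.println(outcome(2, 10));  // prints reprocessed offsets 2..10
        System.out.println(outcome(11, 4));  // prints skipped offsets 5..10
        System.out.println(outcome(11, 10)); // prints no duplicates, no loss
    }
}
```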

The way offsets are committed therefore has a large impact on the application. The different commit modes are described below:

Automatic commit

The simplest approach is automatic commit: we set enable.auto.commit to true, and the consumer commits the offsets returned by poll() every 5 seconds (the interval is set by auto.commit.interval.ms). Be aware, however, that this approach can lead to duplicate consumption: if a rebalance occurs between two commits, the messages processed since the last commit are consumed again.

Synchronous commit

To reduce duplicate consumption or avoid message loss, offsets are usually committed manually, with enable.auto.commit set to false. Manual commits can be synchronous or asynchronous.

For a synchronous commit, the application commits the offsets itself by calling commitSync(), which commits the latest offsets returned by poll().

try {
    kafkaConsumer.commitSync();
} catch (CommitFailedException e) {
    e.printStackTrace();
}

Asynchronous commit

The drawback of a synchronous commit is that the application blocks until the commit call returns, so it is natural to commit asynchronously instead. The application calls commitAsync() to send the commit request and returns immediately. Asynchronous commits have a drawback of their own: if the server reports a failure, the commit is not retried, because with several asynchronous commits in flight a retry could overwrite a newer offset with an older one. In contrast, a synchronous commit retries until it succeeds or finally throws an exception to the application.

Therefore, in this case, it is common to record the commit result with a callback for an asynchronous commit, for example:

kafkaConsumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    }
});

Alternatively, if you want to retry while preserving commit order, a simple approach is to use a monotonically increasing sequence number, similar to optimistic locking. Increment the sequence number each time an asynchronous commit is started and pass its value to the callback. When the callback reports a failed commit, compare the value it was given with the current global sequence number: if they are equal, no newer commit has been issued and it is safe to retry; otherwise, give up.
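The sequence-number check can be sketched independently of the Kafka client. The class below is a hypothetical helper, not part of the Kafka API; in a real consumer, nextSequence() would be called just before commitAsync(...), and shouldRetry(...) inside the OffsetCommitCallback when its exception argument is non-null.

```java
import java.util.concurrent.atomic.AtomicLong;

final class CommitRetryGuard {
    // Sequence number of the most recently started asynchronous commit.
    private final AtomicLong latestSequence = new AtomicLong();

    /** Call when starting an asynchronous commit; hand the returned value to that commit's callback. */
    long nextSequence() {
        return latestSequence.incrementAndGet();
    }

    /** Call from a failure callback: retry only if no newer commit has been started since. */
    boolean shouldRetry(long sequenceOfFailedCommit) {
        return sequenceOfFailedCommit == latestSequence.get();
    }

    public static void main(String[] args) {
        CommitRetryGuard guard = new CommitRetryGuard();
        long first = guard.nextSequence();  // an earlier commit is sent
        long second = guard.nextSequence(); // a later commit is sent before the first one fails
        System.out.println(guard.shouldRetry(first));  // prints false: a newer commit exists
        System.out.println(guard.shouldRetry(second)); // prints true: this is still the latest commit
    }
}
```

Retrying the first commit here would risk overwriting the offset of the second one, which is exactly the case the guard rejects.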

Mixing synchronous and asynchronous commits

In general, for the occasional failed commit, not retrying is not too much of a problem, as long as subsequent commits succeed. But in cases where you need to ensure a successful commit (e.g., program exit, rebalance), a very common approach is to mix asynchronous commit and synchronous commit, as shown below:

try {
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
        kafkaConsumer.commitAsync();
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    try {
        kafkaConsumer.commitSync();
    } finally {
        kafkaConsumer.close();
    }
}

We use asynchronous commits in the loop for performance, and a final synchronous commit to make sure the last offsets are committed successfully.

Committing specific offsets

If poll() returns a very large batch of messages, we may want to commit offsets while still processing the batch, so that a rebalance does not force the whole batch to be consumed and processed again. commitSync() and commitAsync() accept a map from partitions to offsets for this purpose. For example, the following code commits asynchronously after every 1000 messages processed:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
...
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    // Commit offset + 1: the committed offset is the next message to read
    currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1, "no metadata"));
    // Count the record first so that the commit happens after every 1000th record
    count++;
    if (count % 1000 == 0) {
        kafkaConsumer.commitAsync(currentOffsets, null);
    }
}

4. Rebalance Listener

Before a partition rebalance, if a consumer knows it is about to lose ownership of a partition, it may want to commit the offsets of the messages it has already processed. Kafka lets us run code when partitions are assigned to or revoked from a consumer: we only need to pass a ConsumerRebalanceListener object when calling subscribe(). The listener has two main methods:

  • public void onPartitionsRevoked(Collection<TopicPartition> partitions): called after the consumer has stopped consuming and before the rebalance begins.
  • public void onPartitionsAssigned(Collection<TopicPartition> partitions): called after partitions have been assigned to the consumer and before the consumer begins reading messages.

Let’s look at an example that commits offsets when a consumer loses a partition, so that the consumer that takes over the partition knows where to continue:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit the offsets of processed records before losing the partitions
        kafkaConsumer.commitSync(currentOffsets);
    }
}
...
try {
    kafkaConsumer.subscribe(Collections.singletonList("name"), new HandleRebalance());
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1, "no metadata"));
        }
        // Commit once per batch, covering only the records tracked as processed
        kafkaConsumer.commitAsync(currentOffsets, null);
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    try {
        kafkaConsumer.commitSync(currentOffsets);
    } finally {
        kafkaConsumer.close();
    }
}

5. Consuming from a specified offset

In addition to letting poll() start from the last committed offset, we can also start consuming from an offset that we specify:

  • seekToBeginning(Collection<TopicPartition> partitions): starts consuming from the beginning of the given partitions.
  • seekToEnd(Collection<TopicPartition> partitions): starts consuming from the end of the given partitions, reading only new messages.
  • seek(TopicPartition partition, long offset): starts consuming from the specified offset.

There are many use cases for consuming from a specified offset. The most typical is when offsets are stored in another system (such as a database) and that system’s offsets are authoritative.

Consider the following scenario: we read messages from Kafka, process them, and write the results to a database; we want neither to lose messages nor to store duplicate data in the database. For such a scenario, we might write the following logic:

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        processRecord(record);
        storeRecordInDB(record);
        kafkaConsumer.commitAsync(currentOffsets, null);
    }
}

This logic looks fine, but note that after the record has been persisted to the database, the offset commit to Kafka may still fail, which leads to the message being processed again. The first idea is to make persisting to the database and committing the offset a single atomic operation that either succeeds or fails as a whole, but unfortunately this is not possible because the two writes go to different systems. Instead, we can store the offset in the database in the same transaction as the record, and use the offset stored in the database when the consumer starts consuming. To do this, we only need to call seek() to specify the offset at which each partition starts. Here is the improved sample code:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit the database transaction before the partitions are revoked, saving the records and offsets consumed so far
        commitDBTransaction();
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Before starting consumption, get each partition's offset from the database and use seek() to set the starting position
        for (TopicPartition partition : partitions)
            kafkaConsumer.seek(partition, getOffsetFromDB(partition));
    }
}

    kafkaConsumer.subscribe(topics, new SaveOffsetsOnRebalance());
    // Poll once after subscribe() so that partitions are assigned, then seek() each partition to the offset stored in the database
    kafkaConsumer.poll(0);
    for (TopicPartition partition : kafkaConsumer.assignment()) {
        kafkaConsumer.seek(partition, getOffsetFromDB(partition));
    }

    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
            // Save the processing result
            storeRecordInDB(record);
            // Save the offset of the next record to read, so that seek() resumes after this record
            storeOffsetInDB(record.topic(), record.partition(), record.offset() + 1);
        }
        // Commit the database transaction, saving results and offsets together
        commitDBTransaction();
    }
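The idea of saving results and offsets in one transaction can be illustrated without Kafka or a real database. The following sketch uses an in-memory stand-in for the database; the class, its methods, and the single-partition scenario are all assumptions made for illustration. It shows that after a crash before the database commit, restarting from the stored offset neither loses nor duplicates a record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TransactionalOffsetStore {
    // Committed state: results and offsets change together or not at all.
    private final List<String> results = new ArrayList<>();
    private final Map<Integer, Long> nextOffsets = new HashMap<>();
    // Work staged in the currently open transaction.
    private final List<String> pendingResults = new ArrayList<>();
    private final Map<Integer, Long> pendingOffsets = new HashMap<>();

    /** Stages a result together with the offset of the next record to read. */
    void store(int partition, long offset, String result) {
        pendingResults.add(result);
        pendingOffsets.put(partition, offset + 1);
    }

    /** Makes staged results and offsets visible in one step. */
    void commit() {
        results.addAll(pendingResults);
        nextOffsets.putAll(pendingOffsets);
        rollback();
    }

    /** Discards staged work, as a crash before commit would. */
    void rollback() {
        pendingResults.clear();
        pendingOffsets.clear();
    }

    /** The position a restarted consumer would pass to seek(). */
    long offsetToSeek(int partition) {
        return nextOffsets.getOrDefault(partition, 0L);
    }

    List<String> results() {
        return results;
    }

    public static void main(String[] args) {
        String[] log = {"a", "b", "c", "d"}; // partition 0, offsets 0..3
        TransactionalOffsetStore db = new TransactionalOffsetStore();

        // First run: offsets 0 and 1 are processed and committed.
        db.store(0, 0, log[0].toUpperCase());
        db.store(0, 1, log[1].toUpperCase());
        db.commit();
        // Offset 2 is processed, but the process crashes before the commit.
        db.store(0, 2, log[2].toUpperCase());
        db.rollback();

        // Restart: resume from the offset stored in the database.
        for (long offset = db.offsetToSeek(0); offset < log.length; offset++) {
            db.store(0, offset, log[(int) offset].toUpperCase());
        }
        db.commit();

        System.out.println(db.results());       // prints [A, B, C, D]
        System.out.println(db.offsetToSeek(0)); // prints 4
    }
}
```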

6. Exit gracefully

In general, we poll for messages and process them in a loop on the main thread. When we need to exit the poll loop, another thread can call consumer.wakeup(), which causes poll() to throw a WakeupException. If wakeup() is called while the main thread is processing messages, the exception is thrown the next time the main thread calls poll(). After catching the WakeupException, the main thread calls consumer.close(), which commits offsets if needed and sends a message to Kafka’s group coordinator to leave the consumer group. The group coordinator then rebalances immediately, without waiting for this consumer’s session to expire.

Thread mainThread = Thread.currentThread();
...
// Register a shutdown hook that is called when the JVM shuts down
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        System.out.println("Starting exit...");
        kafkaConsumer.wakeup();
        try {
            // Wait for the main thread to finish so that the consumer can be closed and the offsets committed
            mainThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
...
try {
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("topic = " + record.topic() + ", partition = " + record.partition()
                               + ", offset = " + record.offset());
        }
        kafkaConsumer.commitSync();
    }
} catch (WakeupException e) {
    // Expected during shutdown; nothing to handle
} finally {
    // close() must be called before the thread exits: it commits anything not yet committed and tells the
    // group coordinator that this consumer is leaving the group, so a rebalance starts without waiting for the session to time out
    kafkaConsumer.commitSync();
    kafkaConsumer.close();
    System.out.println("Closed consumer");
}