How much longing can one night hold? Even the ends of the earth are not so long. -- Zhang Zhongsu, The Swallow Tower

This article originally appeared on the author's CSDN blog (blog.csdn.net/zhanshenzhi…). Because Nuggets has removed the repost feature, it is uploaded here again; the images below keep the watermark of the original post (the CSDN watermark). (The original will be shared and published on multiple platforms as a newly created post.)

preface

The concept of Offset was roughly described in the last article. This article will talk about Offset in detail.

Producer Offset

  1. Every message a producer sends is assigned to a partition; each partition maintains its own offset, and the offset of the most recently produced message is both the producer's latest offset and the largest offset of that partition.
  2. When we build a ProducerRecord, we do not specify an offset; Kafka assigns it by itself (see the sketch below).
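To make this concrete, here is a minimal sketch (not from the original article) of sending a record without specifying any offset and then reading the offset Kafka assigned from the returned RecordMetadata; the broker address, topic name, key and value are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerOffsetDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // only (topic, key, value) are given; no offset is specified
            ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");
            RecordMetadata metadata = producer.send(record).get();
            // the offset Kafka assigned, i.e. the partition's latest (largest) offset
            System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
        }
    }
}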

Consumer Offset

  1. In the figure above, C1 and C2 come from different consumer groups (because within the same consumer group a partition can only be consumed by one consumer).

  2. The partition's latest produced offset is 4. Consumer C1 has consumed offsets 0 to 3 and consumer C2 has consumed offsets 0 to 4; the next time they consume, each starts from the offset where it last left off, or it can choose to start again from the beginning or from the most recent records.

auto.offset.reset: this property specifies what the consumer should do when it reads a partition that has no committed offset, or when the committed offset is invalid:

  • latest (default): the consumer starts reading from the latest records (records produced after the consumer started)
  • earliest: the consumer starts reading from the beginning of the partition's records

(The API methods for committing offsets, such as commitSync(), are covered in the sections below.)
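As a quick illustration (a fragment, not from the original article), the property is simply set on the consumer configuration:

// on the consumer's configuration Properties (a full configuration sketch follows in the auto-commit section below)
props.put("auto.offset.reset", "earliest");   // or "latest" (the default)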

Commit offset

Where are offsets stored? I'm sure you have been asked this question, right?

Kafka offsets used to be stored in ZooKeeper. However, ZooKeeper is not well suited to serving high write loads (such as offset updates), so offset storage was moved out of ZooKeeper and into a Kafka topic, for the following reason:

Zookeeper is not a good way to service a high-write load such as offset updates because zookeeper routes each write through every node and hence has no ability to partition or otherwise scale writes. We have always known this, but chose this implementation as a kind of “marriage of convenience” since we already depended on zk.

Offsets committed by consumers are now sent to an internal topic named __consumer_offsets, and an in-memory structure mapping group/topic/partition to the latest offset is kept for quick retrieval. You can also see directories similar to the following on disk:

__consumer_offsets-1
__consumer_offsets-2
__consumer_offsets-3


How to commit the offset

Automatic commit

enable.auto.commit = true

In Spring Boot 2.x, values of type Duration must follow a specific format, such as 1S, 1M, 2H, 5D

auto.commit.interval.ms = 100
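For context, a minimal consumer configuration sketch for automatic commit might look like the following (not from the original article; broker address, group id, and topic name are placeholders). The poll loop below then never needs to commit explicitly:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker
props.put("group.id", "demo-group");                // assumption: example group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "true");            // commit offsets automatically
props.put("auto.commit.interval.ms", "100");        // auto-commit interval in milliseconds
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("demo-topic"));  // assumption: example topic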

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        log.trace("ConsumerRecord={}", record.toString());
    }
}

disadvantages

  • Repeated consumption: the auto-commit interval can lead to messages being consumed more than once. For example, suppose the auto-commit interval is 100 seconds, the first commit records offset 20, and the consumer then pulls and processes 5 more messages; 50 seconds in, the broker suddenly goes down and a partition rebalance occurs. The partition is transferred from the original consumer to a new consumer, which reads the offset committed 50 seconds earlier (20), so those messages are consumed again.
  • Lost messages: when the consumer pulls a large batch of messages but has only processed 20 of them, the auto-commit may already have committed the offset at the end of the batch. If the broker goes down and a partition rebalance occurs, the new consumer resumes from that committed offset, so the remaining 80 messages of the batch are never processed, i.e. they are lost. An offset is also committed automatically before close() is called, but not when an exception occurs or when the poll loop is exited early; you need to define your own policy to cover those cases, for example manually calling commitSync() in a finally block.

Manual commit – Current offset

enable.auto.commit = false; API method: commitSync()

If the commit fails, an exception is thrown. Show me code:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        log.trace("ConsumerRecord={}", record.toString());
    }
    try {
        // blocks until the broker responds; commits the latest offsets returned by poll()
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // TODO compensation mechanism
        log.error("commitSync failed", e);
    }
}

disadvantages

  • The application is blocked until the broker responds to the commit request, which reduces throughput
  • After the poll loop has fetched records, make sure all of them have been processed before calling commitSync()
  • If an exception occurs, a corresponding compensation mechanism is required; otherwise messages will be lost
  • There may still be repeated consumption when partitions are rebalanced

Asynchronous commit – Current offset

enable.auto.commit = false; API methods: commitAsync() or commitAsync(OffsetCommitCallback callback)

Asynchronous, as the name suggests, is not blocking, so that’s its advantage.

Show me code:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        log.trace("ConsumerRecord={}", record.toString());
    }
    // Option 1: commit asynchronously without a callback
    try {
        consumer.commitAsync();
    } catch (CommitFailedException e) {
        // TODO compensation mechanism
        log.error("commitAsync failed", e);
    }
    // end of option 1 (no callback)

    // Option 2: commit asynchronously with a callback that records failures
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
            if (e != null) {
                System.out.println(offsets.toString());
                System.out.println(e.toString());
            }
        }
    });
    // end of option 2 (with callback)
}

disadvantages

  • If a commit fails, it is not retried, but an asynchronous callback is supported.

Q1: Why does commitAsync not retry?
A1: Because by the time a failed commit is noticed, a later commit with a larger offset may already have succeeded; blindly retrying the older commit would roll the offset back and cause repeated consumption after a rebalance.
Q2: How can this be handled?
A2: Use the asynchronous callback to record the committed offsets and the error message; refer to the “with callback” demo above.
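A pattern that is sometimes used on top of the callback (not from the original article, just a common sketch): keep a monotonically increasing sequence number, bump it before every commitAsync, and retry from the callback only if no newer commit has been issued in the meantime, so a retry can never roll the offset back:

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// inside the class that owns the consumer (commitWithSafeRetry is a hypothetical helper name)
private final AtomicInteger commitSeq = new AtomicInteger(0);

void commitWithSafeRetry(KafkaConsumer<String, String> consumer) {
    int seq = commitSeq.incrementAndGet();                 // sequence number of this commit attempt
    consumer.commitAsync((Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) -> {
        // retry only if no newer commit has been issued since this attempt
        if (e != null && seq == commitSeq.get()) {
            consumer.commitSync(offsets);                  // blocking re-commit of the same offsets
        }
    });
}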

Asynchronous + synchronous commit

enable.auto.commit = false; API methods: commitAsync() and commitSync()

If this is the last commit before the consumer is closed, or before a partition rebalance, the combined asynchronous + synchronous approach is appropriate to make sure the final offset is actually committed.

Show me the code:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            log.trace("ConsumerRecord={}", record.toString());
        }
        // fast, non-blocking commit on every iteration
        consumer.commitAsync();
    }
} catch (CommitFailedException e) {
    // TODO compensation mechanism
    log.error("commitAsync failed", e);
} finally {
    try {
        // one final blocking commit before shutting down, to make sure the last offset is committed
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // TODO compensation mechanism
        log.error("commitSync failed", e);
    } finally {
        consumer.close();
    }
}

Commit a specific offset

enable.auto.commit = false; API methods:

  1. commitSync() and commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
  2. commitAsync() and commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

If you want to commit in the middle of a batch, rather than only at the positions returned by poll, you can commit a specific offset. Because commitSync() and commitAsync() only commit the last offset of the batch returned by poll, committing a specific offset is appropriate when a poll returns a large number of ConsumerRecords and you do not want to re-process already-handled records after a partition rebalance (in this case you need to maintain the offsets of all partitions yourself).

Show me the code:

Map<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
int index = 0;
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            log.trace("ConsumerRecord={}", record.toString());
            // record the offset of the next message to read for this partition
            map.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1, "Welcome to CSDN ordinary gentleman https://blog.csdn.net/zhanshenzhi2008"));
            // commit once every 200 records
            if (index % 200 == 0) {
                consumer.commitSync(map);
            }
            index++;
        }
    }
} catch (CommitFailedException e) {
    // TODO compensation mechanism
    log.error("commitSync failed", e);
} finally {
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // TODO compensation mechanism
        log.error("commitSync failed", e);
    } finally {
        consumer.close();
    }
}

disadvantages

  • It requires a larger amount of code. If the offset of every partition needs to be recorded, you can create a database table to store the offset information.

How do I exit the poll loop

  1. You may not have to do anything, because Kafka can exit polling gracefully by itself: when poll() is pulling messages, it throws an exception automatically if the consumer is shut down or restarted, and the loop ends.

  2. wakeup(): if you need to exit the loop yourself, call consumer.wakeup() from another thread; the next poll() then throws a WakeupException. If the poll loop runs in the main thread, you can call wakeup() from a JVM shutdown hook (ShutdownHook). Show me code:

// mainThread is the thread running the poll loop; n lines of code omitted here
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();                // makes the next poll() throw a WakeupException
    try {
        mainThread.join();            // wait for the poll loop to finish cleanly
    } catch (InterruptedException e) { e.printStackTrace(); }
}));
  3. Before exiting, call consumer.close(). This is necessary because it commits anything that has not been committed yet, sends a message to the group coordinator saying that the consumer is leaving the group, and triggers a rebalance immediately instead of waiting for the session to time out.
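For completeness, here is a minimal sketch (not from the original article) of the other side of the shutdown: the poll loop catches the WakeupException raised after wakeup() is called, then commits and closes the consumer. It assumes the same consumer and log objects as the earlier snippets; WakeupException is org.apache.kafka.common.errors.WakeupException:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            log.trace("ConsumerRecord={}", record.toString());
        }
        consumer.commitAsync();
    }
} catch (WakeupException e) {
    // thrown by poll() after consumer.wakeup() is called; safe to ignore when shutting down
} finally {
    consumer.commitSync();   // synchronously commit the last offsets
    consumer.close();        // leave the group and trigger an immediate rebalance
}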

Afterword

(4) Message queue - Kafka partition rebalancing (optional)