This article summarizes the Settings for submitting offset when consuming data using Springboot-kafka.

There are several Settings for the commit offset. If enable.auto.mit in the Consumer Property is set to true, Kafka automatically commits offsets based on its configuration.

@Bean
public Map<String, Object> consumerConfigs(a) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
    // highlight-start
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    // highlight-end
    return props;
}
Copy the code

If false, the Listener container supports multiple AckMode Settings. The list is as follows. The default AckMode is BATCH. In springboot-kafka version 2.3 and later, the default value of enable.auto.mit is false, and before this, the default value is true.

The consumer’s poll() method returns one or more ConsumerRecords. Each of the ConsumerRecords calls a MessageListener, and the following list describes the actions taken by the container on each AckMode (when no transactions are used) :

  • RECORD: Submits the offset when the listener returns from processing the RECORD.
  • BATCH: The offset is submitted after all the records returned by poll() have been processed.
  • TIME: Commit offset when all records returned by poll() have been processed and have exceeded the ackTime since the last commit.
  • COUNT: Commit offset when all records returned by poll() have been processed and ackCount records have been received since the last commit.
  • COUNT_TIME: Similar to TIME and COUNT, but commits if either condition is true.
  • MANUAL: the listener is responsible for calling Acknowledgment. Acknowledge (), after the call, submit a similar BATCH behavior.
  • MANUAL_IMMEDIATE: when the listener calls the Acknowledgment. Acknowledge () method, after submit immediately.
  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // highlight-start
    factory.getContainerProperties().setAckMode(AckMode.TIME);
    // highlight-end
    return factory;
  }
Copy the code

When using transactions, offsets are sent to the transaction, semantically equivalent to RECORD or BATCH, depending on the Listener type (RECORD or BATCH).

: : : tip MANUAL and MANUAL_IMMEDIATE pattern, which requires the listener is AcknowledgeingMessageListener or BatchAcknowledgeingMessageListener: : :

Consumers use commitSync() or commitAsync(), which is determined by the syncCommits attribute. The default is true. You can set setSyncCommit to this attribute. You can set the synchronization timeout with setSyncCommitTimeout.

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // highlight-start
    factory.getContainerProperties().setSyncCommits(true);
    factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(1));
    // highlight-end
    return factory;
  }
Copy the code

When asynchronous commit is used, you can set the callback method. The default callback is LoggingCommitCallback. If the log level is DEBUG, a success log is printed.

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.TIME);
    // highlight-start
    factory.getContainerProperties().setSyncCommits(false);
    factory.getContainerProperties().setCommitCallback(new OffsetCommitCallback(){

      @Override
      public void onComplete(Map
       
         map, Exception e)
       ,> {
        LOG.INFO(map.values());
        LOG.ERROR(e.printStackTrace(););
      }
    });
     // highlight-end
    return factory;
  }
Copy the code

This should be Acknowledgment in the following ways:

public interface Acknowledgment {
  // This method allows the listener to control when the offset is committed.
  void acknowledge(a);

  // Record listener is used
  default void nack(long sleep) {
    throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
  }

  // Batch listener is used
  default void nack(int index, long sleep) {
    throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment"); }}Copy the code

When using a Record listener, when nack() is called, all offsets pulled from the previous poll() that are being processed are committed, the rest are discarded, and failed or unprocessed records are passed on to the next poll. By setting the sleep parameter, the consumer thread can be paused before redelivery. This and throw an exception when the container configuration SeekToCurrentErrorHandler function similarly.

When using the Batch Listener, you can specify the failed index in the batch. When nack() is called, offsets are committed for the records that preceded the index, and a search for failed and discarded records is performed on the partition so that they can be repassed on the next poll(). This is for SeekToCurrentBatchErrorHandler improvement, the latter only for the whole batch to deliver.