This is the 19th day of my participation in the August More Text Challenge

Recommended: Logi-KafkaManager, a one-stop Kafka monitoring and management platform open-sourced by DiDi.


Notes

  • Starting with version 2.2.4, you can specify Kafka consumer properties directly on the annotation; they override any properties with the same name configured in the consumer factory. You cannot specify the group.id and client.id properties this way; they are ignored.

  • You can use SpEL expressions (#{…}) or property placeholders (${…}) for most attributes of the annotation.

For example:

   @KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}", clientIdPrefix = "myClientId")

The concurrency attribute takes the value of the listen.concurrency property from the environment. If that property is not set, the default of 3 is used.
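The fallback behaviour of a placeholder such as ${listen.concurrency:3} can be pictured with a small plain-Java model. This is only a sketch of the "key:default" idea, not Spring's actual resolver; the class name PlaceholderSketch and the method resolve are made up for illustration.

```java
import java.util.Map;

/** Toy model of "${key:default}" resolution; NOT Spring's real resolver. */
final class PlaceholderSketch {

    /** Resolves one "${key}" or "${key:default}" expression against the given properties. */
    static String resolve(String expression, Map<String, String> properties) {
        if (!expression.startsWith("${") || !expression.endsWith("}")) {
            return expression; // a literal value, nothing to resolve
        }
        String body = expression.substring(2, expression.length() - 1);
        int colon = body.indexOf(':');
        String key = colon < 0 ? body : body.substring(0, colon);
        String fallback = colon < 0 ? null : body.substring(colon + 1);

        String value = properties.get(key);
        if (value != null) {
            return value; // the configured value wins over the default
        }
        if (fallback == null) {
            throw new IllegalArgumentException("No value for placeholder: " + key);
        }
        return fallback;
    }

    public static void main(String[] args) {
        // Property missing: the default after the colon is used.
        System.out.println(resolve("${listen.concurrency:3}", Map.of()));
        // Property present: the configured value is used.
        System.out.println(resolve("${listen.concurrency:3}", Map.of("listen.concurrency", "5")));
    }
}
```

Running main prints 3 and then 5.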


@KafkaListener in detail

id: the listener ID

①. Naming rule for consumer threads

With an id:

120 [INFO] Thread: Thread[consumer-id5-1-C-1,5,main]-groupId: base-demo consumer-id5 consumed

Without an id:

2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] Thread: Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7

② Listener ids in the same container must be unique

Otherwise, an error will be reported

Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id

③. The id overrides the consumer group (group.id) of the consumer factory

If the configuration file sets a consumer group, for example kafka.consumer.group-id=base-demo, that group is the default consumer group in the container. However, if you declare @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"}), the consumer group of that listener is consumer-id7.

If you do not want the id to be used as the group ID, set idIsGroup = false; the default group ID is then used.

④ If the groupId attribute is configured, it has the highest priority

 @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")

For example, in the code above, the consumer group of the resulting consumer is "groupId-test".

The id attribute (if present) is used as the Kafka consumer group.id property and overrides the value configured in the consumer factory (if any). You can also set groupId explicitly, or set idIsGroup to false to restore the previous behavior of using the group.id from the consumer factory.
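The precedence described in ③ and ④ can be summed up in a few lines of plain Java. This is a sketch of the rule as described above, not Spring's source code; GroupIdSketch and resolveGroupId are hypothetical names used only for illustration.

```java
/** Toy model of how the effective group.id is chosen; NOT Spring's real implementation. */
final class GroupIdSketch {

    /**
     * @param groupId        the groupId attribute on the annotation (null or empty if unset)
     * @param id             the id attribute on the annotation (null or empty if unset)
     * @param idIsGroup      the idIsGroup attribute on the annotation
     * @param factoryGroupId the group.id configured in the consumer factory
     */
    static String resolveGroupId(String groupId, String id, boolean idIsGroup, String factoryGroupId) {
        if (groupId != null && !groupId.isEmpty()) {
            return groupId;        // ④ an explicit groupId has the highest priority
        }
        if (idIsGroup && id != null && !id.isEmpty()) {
            return id;             // ③ the id is used as the group.id
        }
        return factoryGroupId;     // otherwise fall back to the consumer factory
    }

    public static void main(String[] args) {
        System.out.println(resolveGroupId(null, "consumer-id7", true, "base-demo"));
        System.out.println(resolveGroupId(null, "consumer-id7", false, "base-demo"));
        System.out.println(resolveGroupId("groupId-test", "consumer-id5", false, "base-demo"));
    }
}
```

Running main prints consumer-id7, base-demo and groupId-test, matching the three cases discussed above.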

groupId: the consumer group name

Specifies the consumer group for this listener. For how the group name is resolved, see the id section above.

How to get the consumer group.id

Inside the listener, call KafkaUtils.getConsumerGroupId() to get the current group ID. You can print it in the log to tell which client consumed a message.

topics: the topics to listen on (use only one of topics, topicPattern, or topicPartitions)

You can listen on several topics at once, e.g. topics = {"SHI_TOPIC3", "SHI_TOPIC4"}

topicPattern: a pattern matching the topics to listen on (use only one of topics, topicPattern, or topicPartitions)

topicPartitions: explicit partition assignment

Listeners can be configured with explicit topics and partitions (and, optionally, their initial offsets).

@KafkaListener(id = "thing2", topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
        @TopicPartition(topic = "topic2", partitions = "0",
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

This listens on partitions 0 and 1 of topic1 and on partition 0 of topic2; partition 1 of topic2 is consumed starting from offset 100.

errorHandler: exception handling

Implement KafkaListenerErrorHandler and do your exception handling there.

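import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception,
                              Consumer<?, ?> consumer) {
        // do something with the failed message here
        return null;
    }
}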

To use it, reference the bean name, for example errorHandler = "kafkaDefaultListenerErrorHandler".

containerFactory: the listener container factory

Specifies the factory bean used to create the listener container.

For example, here is a factory for batch consumption:

    /**
     * Listener container factory for batch consumption.
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        // The number of records per batch is set with ConsumerConfig.MAX_POLL_RECORDS_CONFIG in the Kafka consumer configuration
        factory.setBatchListener(true);
        return factory;
    }

Then reference it with containerFactory = "batchFactory".

clientIdPrefix: the client ID prefix

Overrides the client ID (kafka.consumer.client-id) configured in the consumer factory. The prefix is followed by -n, where n is a number.

concurrency: the number of concurrent consumers

Overrides the concurrency configured on the container factory. For example, on a single machine with concurrency set to 3, this is equivalent to starting three clients that share the partitions to consume. Total number of consumers = concurrency × number of machines. For details, see the discussion of the concurrency property and "RoundRobinAssignor".
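The arithmetic above is easy to check with a small plain-Java sketch. It assumes a single topic and relies on the Kafka rule that, within one consumer group, a partition is assigned to at most one consumer, so consumers beyond the partition count stay idle. ConcurrencySketch and its method names are hypothetical.

```java
/** Back-of-the-envelope model of consumer counts; not part of any Kafka or Spring API. */
final class ConcurrencySketch {

    /** Total consumers in the group = concurrency per instance × number of instances (machines). */
    static int totalConsumers(int concurrencyPerInstance, int instances) {
        return concurrencyPerInstance * instances;
    }

    /** Consumers that receive no partition when the group has more consumers than the topic has partitions. */
    static int idleConsumers(int totalConsumers, int partitions) {
        return Math.max(0, totalConsumers - partitions);
    }

    public static void main(String[] args) {
        int total = totalConsumers(3, 2);             // concurrency = 3 on 2 machines
        System.out.println(total);                    // 6 consumers in the group
        System.out.println(idleConsumers(total, 4));  // a 4-partition topic leaves 2 consumers idle
    }
}
```

In other words, setting concurrency higher than the number of partitions divided by the number of machines only adds idle consumers.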

    /**
     * Listener container factory.
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(6);
        return factory;
    }
    @KafkaListener(id = "consumer-id5", idIsGroup = false, topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory", concurrency = "1")

Although the factory used is concurrencyFactory (configured with a concurrency of 6), the value on the annotation wins, so this listener ends up with a concurrency of 1.

properties: other consumer properties

For the available Kafka properties, see org.apache.kafka.clients.consumer.ConsumerConfig. A property set here overrides the property with the same name in the consumer factory.

Usage:

    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1" , clientIdPrefix = "myClientId5",groupId = "groupId-test", properties = { "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")
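As far as I understand the Spring for Apache Kafka reference, each string in properties is written in the normal Java Properties file format, which is why "key:value" works (as would "key=value"). The plain-Java sketch below only shows how java.util.Properties reads such strings; ListenerPropertiesSketch and parse are hypothetical names, and this is not how Spring itself is implemented.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Shows how "key:value" strings are read by java.util.Properties. */
final class ListenerPropertiesSketch {

    /** Loads each entry as one line of a Java properties file. */
    static Properties parse(String... entries) {
        Properties props = new Properties();
        for (String entry : entries) {
            try {
                props.load(new StringReader(entry));
            } catch (IOException e) {
                // StringReader does not fail in practice; rethrow unchecked to keep the sketch simple
                throw new UncheckedIOException(e);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("enable.auto.commit:false", "max.poll.interval.ms:6000");
        System.out.println(props.getProperty("enable.auto.commit"));   // false
        System.out.println(props.getProperty("max.poll.interval.ms")); // 6000
    }
}
```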

Using @KafkaListener

KafkaListenerEndpointRegistry

    @Autowired
    private KafkaListenerEndpointRegistry registry;
        // ... get all registered listener containers
        registry.getAllListenerContainers();


Setting a validator for listener arguments

When you use Spring Boot together with the validation starter, a LocalValidatorFactoryBean is auto-configured. You can register it as follows:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setValidator(this.validator);
    }
}

Usage:

@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler", containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}