The environment

  • springcloudAlibaba
  • docker
  • zookeeper
  • kafka

reference

Blog.csdn.net/yuanlong122…

The installation

Mirror pull

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
Copy the code

They are running

docker run -itd --name zookeeper -p 2181:2181 wurstmeister/zookeeper
Copy the code

Note: Many web sites run with run-i, which may cause an error when you connect later

Run the kafka

docker run -itd --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME = 127.0.0.1 - env KAFKA_ADVERTISED_PORT = 9092 wurstmeister/kafka: latest parameters KAFKA_ZOOKEEPER_CONNECT= Configure the path for ZooKeeper to manage kafka. 10.9.44.11:2181/kafka KAFKA_ADVERTISED_HOST_NAME= Register the address port of Kafka with ZooKeeper KAFKA_ADVERTISED_PORT= ZooKeeper port numberCopy the code

Note: -itd is also used here, otherwise the Kafka container may fail

To successfully start Kafka, run the two containers in turn, and then use Docker exec to enter the Kafka container

Go to the Kafka folder and create the theme using the following command

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 2 --topic topic1
Copy the code

Note: If Kafka has only one server, then the –replication-factor configuration must be less than 2

Instead of creating a topic manually, kafka automatically creates a topic for us when we send a message in kafkatemplate-send (“topic1”, normalMessage), but in this case we create a topic with only one partition by default. Partitions also have no copies.

Introduction of depend on

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>
Copy the code

Configuration instructions

Kafka: bootstrap-Servers: 127.0.0.1:9092 Producer: # Number of times a message was resent after an error occurred. Retries: 0 # When multiple messages need to be sent to the same partition, the producer puts them in the same batch. This parameter specifies the amount of memory, in bytes, that can be used by a batch. Batch-size: 16384 # Sets the size of the producer memory buffer. Buffer-memory: 33554432 # Key serializer: Org.apache.kafka.com mon. Serialization. StringSerializer # value way of serializing the value - serializer: Org.apache.kafka.com mon. Serialization. # StringSerializer acks = 0: producers in successful writing messages before will not wait for any response from the server. # acks=1: The producer will receive a successful response from the server as soon as the cluster leader node receives the message. # acks=all: The producer receives a successful response from the server only when all participating nodes have received the message. Acks: 1 # properties: use a custom partition selector # # {partitioner. Class: org. Example. Config. CustomizePartitioner} consumer: < span style = "font-size: 14px; line-height: 14px;" 1S # This property specifies what the consumer should do if it reads a partition with no offset or if the offset is invalid: # latest (default) If the offset is invalid, the consumer will start reading data from the latest record (records generated after the consumer started) # earliest: In the case of invalid offsets, the consumer will read the partition's record from the starting position auto-offset-reset: The default value is true. In order to avoid duplicate data and data loss, it can be set to false, and then manually submit the offset. Enable-auto-commit: False enable-auto-commit: False Enable-auto-commit: false Enable-auto-commit: false Org.apache.kafka.com mon. Serialization. StringDeserializer # value way of deserialized value - deserializer: Org.apache.kafka.com mon. Serialization. StringDeserializer group - id: # defaultConsumerGroup most mass consumption 50 # Max - poll - records: 50 Listener: # Number of threads running in the listener container. Concurrency: 5 # Listner commit ACK-mode: manual_immediate missing-topics-fatal: false every time you call ackCopy the code

practice

General Message Push

producers
@RestController @RequestMapping("kafka") public class KafkaController { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; @getMapping ("pushMessage") public void pushMessage(@requestParam (required = false) String message) { kafkaTemplate.send("topic1", message); }}Copy the code
consumers
@Component public class KafkaConsumer { /** * @description: TODO consumption listener * errorHandler can specify methods to consume error calls * containerFactory can use custom message filters * @sendto ("topic2") message forwarding * @Author Shock Wave 2 * @date * @version 1.0 */ @kafkalistener (Topics = {"topic1"},containerFactory = "filterContainerFactory") public void @date * @version 1.0 */ @Kafkalistener (topics = {"topic1"},containerFactory = "filterContainerFactory" onMessage(ConsumerRecord<? ,? > Record, Acknowledgment ACK){// The second parameter is manual · ACK parameter // Which topic and partition message to consume, and print the message content system.out.println (" Simple consumption: "+record.topic()+"-"+record.partition()+"-"+record.value()); Ack.acknowledge (); }}Copy the code

Bring back notification push

/** * @description: * @author Shock wave 2 * @date * @version 1.0 */ @getMapping ("pushCallBackMessageV1") public void pushCallBackMessage(@RequestParam(required = false) String message) { kafkaTemplate.send("topic1", Message). The addCallback (success - > {/ / message is sent to the topic of String topic = success. GetRecordMetadata (). The topic (); / / message is sent to the partition int partition = success. GetRecordMetadata (). The partition (); / / the message within the partition of the offset long offset = success. GetRecordMetadata (). The offset (); Println (" send message successfully :" + topic + "-" + partition + "-" + offset); }, failure -> {system.out.println (" failed to send message :" + failure.getMessage()); }); }Copy the code

Transaction message

/** * @description: * @author Shock Wave 2 * @date * @version 1.0 */ @getMapping ("pushTransactionMessage") public void PushTransactionMessage (@requestParam (required = false) String Message){ Behind the error message not sent kafkaTemplate. ExecuteInTransaction (operations - > {operations. The send (" topic1 ", message); throw new RuntimeException("fail"); }); // Kafkatemplate-send ("topic1",message); // throw new RuntimeException("fail"); }Copy the code

Custom message filters

/** * @author shock wave 2 * @version 1.0 * @description: TODO Kafka message filter * @date 2021/8/3 16:19 */ @Component public class KafkaConfigConsumer {@autowired private ConsumerFactory consumerFactory; / / message filter @ Bean public ConcurrentKafkaListenerContainerFactory filterContainerFactory () { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); // The filtered messages will be discarded factory.setackdiscarded (true); / / message filtering strategy factory. SetRecordFilterStrategy (consumerRecord - > {System. Out. Println (consumerRecord); if (consumerRecord.value().toString().contains("22")) { return false; } // If a message returns true, it will be filtered. }); return factory; }}Copy the code

The consumer uses the following comment to specify the filter

@KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
Copy the code

Custom partitioning

/** * @author shock wave 2 * @version 1.0 * @description: TODO (); /** * @author shock wave 2 * @version 1.0 * @description: TODO (); * * ② If a message is sent without a partition but a key is specified (Kafka allows one key for each message), hash the key value and route it to the specified partition according to the calculation result. In this case, all messages with the same key can be guaranteed to go to the same partition. * * (3) If neither patition nor key is specified, a patition is selected using the default kafka partitioning policy. * @date 2021/8/3 16:07 */ public class CustomizePartitioner implements Partitioner { @Override public int Partition (String s, Object O, byte[] bytes, Object o1, byte[] bytes1, Cluster Cluster) {// User-defined partition rule return 0; } @Override public void close() { } @Override public void configure(Map<String, ? > map) { } }Copy the code

Custom exception handling

@Bean public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() { return (message, exception, User) -> {system.out.println (" user: "+message.getPayload()); return null; }; }Copy the code

The consumer uses the following comments to customize the exception handling of a consumption failure

@KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory",errorHandler="consumerAwareErrorHandler")
Copy the code

Mass consumption

The configuration file

# Enable batch processing listener: type: batch # Batch processing a maximum of 50 data consumer: max-poll-records: 50 For details, see the preceding configuration fileCopy the code

consumers

/**
 * @description: TODO 使用list接收批量消息
 * @author shock wave 2
 * @date
 * @version 1.0
 */
@KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {
    System.out.println(">>>批量消费一次,records.size()="+records.size());
    for (ConsumerRecord<?, ?> record : records) {
        System.out.println(record.value());
    }
}
Copy the code

other

@KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {@TopicPartition(topic = "topic1", partitions = { "0" }),@TopicPartition(topic = "topic2", partitions = "0", PartitionOffsets = @partitionOffset (partition = "1", initialOffset = "8"))}) * 1 * ② groupId: consumer groupId; * ③ Topics: topics that can be listened on * (4) topicPartitions: more detailed listener information can be configured, and topic, parition, offset listener can be specified.Copy the code