Message queues come in two broad models: point-to-point and publish-subscribe

Message queues are used for decoupling, peak shaving, and asynchronous processing

Core Kafka concepts

Broker: a Kafka service node

Topic: a logical category of messages

Partition: a topic is split into partitions

Replication: each partition has a leader and followers, which back up the partition

Group: within a consumer group, a message is consumed by only one consumer; different groups each receive the message

Installing Kafka with Docker

Install ZooKeeper

docker pull wurstmeister/zookeeper

docker run -d --name myZK -p 2181:2181 wurstmeister/zookeeper:latest

Install Kafka

docker pull wurstmeister/kafka

docker run -d \
  --name myKafka \
  -p 9092:9092 \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_ZOOKEEPER_CONNECT=182.42.113.36:2181 \
  -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://182.42.113.36:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  wurstmeister/kafka

# KAFKA_ADVERTISED_LISTENERS registers Kafka's address and port with ZooKeeper
# KAFKA_LISTENERS configures the address and port Kafka listens on

Installing Kafka on Windows

Configure config/server.properties

# Node id
broker.id=0
# Where data is stored
log.dir=/usr/local/kafka/data
# How long data is retained (hours)
log.retention.hours=168
# ZooKeeper host and port
zookeeper.connect=localhost:2181
# ZooKeeper connection timeout (ms)
zookeeper.connection.timeout.ms=6000

Start Kafka

# Start as a daemon using the server.properties config
./bin/kafka-server-start.sh -daemon ./config/server.properties

View the startup logs

tail -f -n 1000 ./logs/kafkaServer.out 

Basic use of Kafka

View Kafka's nodes in ZooKeeper

# Broker node information
ls /brokers/ids

# Controller (cluster leader) information
ls /controller

# Controller epoch, i.e. how many times the leader has changed
ls /controller_epoch

Topics

# Create a topic, specifying partitions and replicas
./kafka-topics.sh --zookeeper 182.42.113.36:2181 --create --topic topicName --partitions 1 --replication-factor 1

# List topics
./kafka-topics.sh --zookeeper 182.42.113.36:2181 --list

Producers

# The producer sends messages
./kafka-console-producer.sh --broker-list 182.42.113.36:9092 --topic myNewTopic

Consumers

# Start consuming from the latest message
./kafka-console-consumer.sh --bootstrap-server 182.42.113.36:9092 --topic myNewTopic

# Consume from the beginning
./kafka-console-consumer.sh --bootstrap-server 182.42.113.36:9092 --from-beginning --topic myNewTopic

Consumer groups

# Within the same group, a message is received by only one consumer
./kafka-console-consumer.sh --bootstrap-server 182.42.113.36:9092 --consumer-property group.id=myGroup --topic myNewTopic

# CURRENT-OFFSET: the group's current offset; LOG-END-OFFSET: total number of messages; LAG: number of messages not yet consumed
./kafka-consumer-groups.sh --bootstrap-server 182.42.113.36:9092 --describe --group myGroup

Logs

  • In the Docker container, a topic's partition data is stored under /kafka/kafka-logs-7d0690199775/
  • __consumer_offsets-n stores the offsets consumers have committed for their topics; the partition n is chosen by hashing the consumer group id (see the sketch below)
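A minimal sketch of that mapping, assuming the default of 50 partitions for __consumer_offsets (offsets.topic.num.partitions); the broker computes hash(group.id) % partitions:

// Sketch: which __consumer_offsets partition receives a group's commits,
// assuming the default 50 partitions for the offsets topic
int offsetsTopicPartitions = 50;
String groupId = "myGroup";
int partition = Math.abs(groupId.hashCode()) % offsetsTopicPartitions;
System.out.println("Commits go to __consumer_offsets-" + partition);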

Kafka cluster

Create multiple Kafka services

docker run -d \
  --name myKafka1 \
  -p 9093:9093 \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_ZOOKEEPER_CONNECT=182.42.113.36:2181 \
  -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://182.42.113.36:9093 \
  -e KAFKA_LISTENERS=PLAINTEXT://182.42.113.36:9093 \
  wurstmeister/kafka

Check the brokers in ZooKeeper

zkCli.cmd -server 182.42.113.36:2181
ls /brokers/ids

Check the topic

# Leader: the node that currently holds the leader partition
# Replicas: the nodes holding replicas of the partition
# Isr: the replicas that are alive and in sync
./kafka-topics.sh --zookeeper 182.42.113.36:2181 --describe --topic topicName

Send a message

./kafka-console-producer.sh --broker-list 182.42.113.36:9092,182.42.113.36:9093,182.42.113.36:9094 --topic topicName

Consume messages

./kafka-console-consumer.sh --bootstrap-server 182.42.113.36:9092,182.42.113.36:9093,182.42.113.36:9094 --from-beginning --topic topicName

Java client

Add dependencies

  • kafka-clients
  • slf4j-api

Producers

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "182.42.113.36:9092,182.42.113.36:9093,182.42.113.36:9094");

prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topicName", 0, "myKey", "myValue");

// Synchronous send: block until the broker acknowledges
RecordMetadata message = producer.send(producerRecord).get();

// Asynchronous send: the callback runs when the result is known
producer.send(producerRecord, (recordMetadata, e) -> {
    if (e != null) {
        System.out.println("Message sending failed");
    } else {
        System.out.println("Message sent successfully");
    }
});

System.out.println("== Message sent complete ==");
System.out.println(message);

The ack mechanism

  • With acks=0, the producer returns as soon as the message is sent, without waiting for the broker
  • With acks=1, the producer returns once the leader has written the message
  • With acks=-1 (all), the producer returns only after min.insync.replicas replicas have written the message
// Set the ack level
prop.put(ProducerConfig.ACKS_CONFIG, "1");

// Set the retry times and retry interval
prop.put(ProducerConfig.RETRIES_CONFIG, 3);
prop.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

Message buffering (the producer keeps a local buffer, and a separate sender thread ships the data)

// Set the send buffer size
prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 65535);

// Set the batch size for a single send
prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 65535);

// Set the maximum wait before a batch is sent
prop.put(ProducerConfig.LINGER_MS_CONFIG, 10);

Consumers

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "182.42.113.36:9092,182.42.113.36:9093,182.42.113.36:9094");

prop.put(ConsumerConfig.GROUP_ID_CONFIG, "myConsumer");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);

consumer.subscribe(Arrays.asList("topicName"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("[Received message] " + record.value());
    }
}

Manual and automatic offset commits

  • With automatic commit, the consumer commits the offset as soon as it polls the messages, before they are processed
  • With manual commit, the consumer processes the messages first and then commits the offset
// Enable automatic commit and set the commit interval
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

// Manual synchronous commit (automatic commit disabled)
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumer.commitSync();

// Manual asynchronous commit (automatic commit disabled)
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumer.commitAsync((offsets, e) -> {});

Long polling with poll

// Set the maximum number of records returned by one poll
prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

// Block for at most 1 second waiting for records
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

// Set the maximum interval between two polls; if it is exceeded, the consumer is kicked out of the group
prop.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000 * 30);

The heartbeat check

// Have the consumer send a heartbeat to Kafka every second
prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);

// If Kafka receives no heartbeat within 10 seconds, the consumer is kicked out of the group
prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

Consuming from a specific partition and offset

// Assign partition 1 for consumption
consumer.assign(Arrays.asList(new TopicPartition("topicName", 1)));

// Seek to the beginning (offset 0)
consumer.seekToBeginning(Arrays.asList(new TopicPartition("topicName", 1)));

// Start consuming from a specific offset
consumer.seek(new TopicPartition("topicName", 1), 3);

// When the group has no committed offset, start from the earliest offset
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Integrating with Spring Boot

Add the dependency

  • org.springframework.kafka:spring-kafka

The configuration file

spring:
  kafka:
    producer:
      retries: 3                  # number of retries on send failure
      batch-size: 65535           # size of a single send batch
      buffer-memory: 65534        # send buffer size
      acks: 1                     # ack mechanism
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group     # consumer group
      enable-auto-commit: false   # commit offsets manually
      auto-offset-reset: earliest # start from the earliest offset when the group has none committed
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500       # maximum number of records per poll
    listener:
      # MANUAL: commit the offset after a batch of polled records has been processed
      # MANUAL_IMMEDIATE: commit the offset immediately for each message
      ack-mode: MANUAL

Producers

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void send() {
    kafkaTemplate.send("topicName", 0, "myKey", "myValue");
}

Consumers

@KafkaListener(groupId = "myGroup", topicPartitions = {
        @TopicPartition(topic = "t1", partition = "0"),
        @TopicPartition(topic = "t2", partition = "1")
})
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String value = record.value();
    // Manually submit offset
    ack.acknowledge();
}

Kafka mechanisms

Rebalancing (triggered when the number of partitions or the consumers in a group change)

  • Range: a formula determines how many partitions each consumer gets (see the sketch after this list)

  • Round-robin: partitions are assigned to consumers in turn

  • Sticky: partitions are reassigned while disturbing the original assignment as little as possible
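A rough illustration of the range formula (an explanatory sketch, not Kafka's internal code): each consumer gets partitions / consumers partitions, and the first partitions % consumers consumers get one extra:

// Range assignment sketch: 7 partitions over 3 consumers -> 3, 2, 2
int numPartitions = 7, numConsumers = 3;
int base = numPartitions / numConsumers;       // 2 partitions each
int remainder = numPartitions % numConsumers;  // 1 extra for the first consumer
for (int c = 0; c < numConsumers; c++) {
    int count = base + (c < remainder ? 1 : 0);
    System.out.println("consumer-" + c + " consumes " + count + " partitions");
}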

HW and LEO

  • HW (HighWatermark): a message becomes visible to consumers only after it has been synchronized to all replicas
  • LEO (LogEndOffset): the offset of the last message in a replica's log; once every replica's LEO has caught up, the message can be consumed (see the sketch below)
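As a one-line illustration (not Kafka source code): the leader advances the HW to the smallest LEO among the replicas, and consumers can only read messages below the HW:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical LEOs of three replicas; the HW is the minimum of them
List<Long> replicaLeos = Arrays.asList(10L, 8L, 9L);
long highWatermark = Collections.min(replicaLeos); // 8: only offsets 0..7 are visible to consumers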

Preventing message loss

  • Set acks to 1 or -1 on the producer to prevent message loss
  • Set min.insync.replicas to match the number of replicas (a sketch follows)
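Putting the two settings together, a hedged sketch (the value 2 assumes a topic with 3 replicas):

// Producer side: wait until min.insync.replicas replicas have the message
prop.put(ProducerConfig.ACKS_CONFIG, "-1");

// Broker/topic side (server.properties or per-topic config, not producer code):
// min.insync.replicas=2   <- assumes a replication factor of 3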

Preventing duplicate consumption

  • Network jitter can cause the producer to miss the ack and resend the message, producing duplicates
  • Idempotence is best solved at the business level, e.g. by deduplicating on a unique key (see the sketch below)
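A business-level deduplication sketch, reusing the records loop from the consumer example; the unique id in the key and the handle() method are illustrative assumptions, and a durable store (database unique key, Redis) would replace the in-memory set in practice:

Set<String> processedIds = new HashSet<>(); // in-memory for illustration only
for (ConsumerRecord<String, String> record : records) {
    String messageId = record.key();  // assumes the key carries a unique business id
    if (!processedIds.add(messageId)) {
        continue;                     // duplicate: already processed, skip it
    }
    handle(record.value());           // hypothetical business handler
}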

Ensure sequential message consumption

  • Specify a key so that related messages land in the same partition and are consumed in order, as in the example below
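For example (topic name and ids are illustrative), reusing the producer from the Java client section:

// Messages with the same key are hashed to the same partition,
// so one order's events keep their order
String orderId = "order-1001";
producer.send(new ProducerRecord<>("orderTopic", orderId, "created"));
producer.send(new ProducerRecord<>("orderTopic", orderId, "paid"));
producer.send(new ProducerRecord<>("orderTopic", orderId, "shipped"));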

Resolving message backlog

  • Increase the number of consumers, and the number of partitions if needed (see the sketch below)
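A partition can be consumed by at most one consumer in a group, so adding consumers only helps up to the partition count. A hedged AdminClient sketch for raising the partition count (topic name and target count are illustrative):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

Properties conf = new Properties();
conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "182.42.113.36:9092");
try (AdminClient admin = AdminClient.create(conf)) {
    // Raise topicName to 6 partitions, then start more consumers in the group
    admin.createPartitions(Collections.singletonMap("topicName", NewPartitions.increaseTo(6))).all().get();
}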