Kafka Study Notes

I had dabbled in Kafka before but never studied it systematically. These notes follow a video course and focus on how to use Kafka, the overall picture of the framework, and the related interview topics, mainly in preparation for the spring recruitment season — and to keep learning more. Keep going!

1. Introduction to Kafka

Kafka is a message queue used to process large amounts of data, and it is typically used for log processing. Being a message queue, Kafka has the usual characteristics of one.

Benefits of message queues

  • decoupling

    • Without a message queue, a feature is coupled directly to the interface it calls. With a message queue, the feature just sends a message to the queue, so if something goes wrong with the downstream interface, the current feature is not affected.
  • Asynchronous processing

    • Asynchronous processing replaces the earlier synchronous approach. Instead of waiting for the whole process to finish before returning a result, the caller can send a message to the queue and return immediately, leaving another business interface to pull the message from the queue and process it.
  • Traffic peak clipping

    • When traffic spikes, the message queue acts as middleware that buffers the peak traffic, preventing the burst of requests from hitting the system directly and reducing the processing pressure on the server.

1.1 Kafka consumption Patterns

Kafka supports two main consumption models: one-to-one (point-to-point) consumption, where one side sends and one side receives, and one-to-many consumption, where a message is published to a topic and consumers pull messages based on their subscription to that topic.

One-to-one

The message producer publishes a message to a Queue and notifies the consumer to pull it from the Queue for consumption. After a message is consumed, it is deleted. Queues support multiple consumers, but a given message can only be consumed by one consumer.

One-to-many

This model is also known as the publish/subscribe pattern. Messages are stored in Topics: producers publish messages to a Topic, and multiple consumers subscribe to that Topic and consume its messages. Note that a message published to a Topic can be consumed by multiple consumers, and consuming the data does not delete it; by default Kafka retains it for a period of time and then removes it.

1.2 Kafka infrastructure

Kafka, like other MQs, has its own infrastructure, consisting of Producers, the Kafka cluster (Brokers), Consumers, and ZooKeeper.

  • Producer: the message producer, the client that publishes messages to Kafka.
  • Consumer: the message consumer, the client that pulls messages from Kafka and consumes them.
  • Consumer Group: a group containing multiple consumers. Consumers in the group consume messages from different partitions of the same Topic on the Broker without affecting one another. Every consumer belongs to a consumer group, i.e. a consumer group is a logical subscriber. A message in a partition can only be consumed by one consumer within a consumer group.
  • Broker: a Kafka server is a Broker. A cluster consists of multiple Brokers, and a Broker can hold multiple Topics.
  • Topic: a Topic can be understood as a queue; both producers and consumers are oriented toward a Topic.
  • Partition: for scalability, a very large Topic can be spread across multiple Brokers. A Topic can be divided into multiple partitions, each of which is an ordered queue. (Each partition is ordered internally, but global order across partitions is not guaranteed.)
  • Replica: a copy of a partition. To keep Kafka working and ensure that partition data on a node is not lost when the node fails, Kafka provides a replication mechanism.
  • Leader: the primary replica among a partition's replicas; it is the target that producers send data to and that consumers consume data from.
  • Follower: one of the non-leader replicas of a partition. Followers synchronize data from the Leader in real time. If the Leader fails, a Follower becomes the new Leader.

A Topic thus has multiple partitions, and each partition's replicas are divided into a Leader and Followers. Messages are sent to the Leader, and the Followers stay consistent with the Leader through data synchronization. Consumption also happens on the Leader. When the Leader fails, a Follower becomes the new Leader and the message offsets are realigned.

1.3 Install and use Kafka

For installing Docker itself, see this article: Docker & Docker command learning

# Pull the Kafka and ZooKeeper images
docker pull wurstmeister/kafka
docker pull wurstmeister/zookeeper
# Start ZooKeeper first; without it, Kafka has nowhere to register its metadata
docker run -it --name zookeeper -p 12181:2181 -d wurstmeister/zookeeper:latest
# Start three Kafka containers; note that the port mappings all map to container port 9092
# The first one
docker run -it --name kafka01 -p 19092:9092 -d -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.233.129:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.233.129:19092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
# The second one
docker run -it --name kafka02 -p 19093:9092 -d -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=192.168.233.129:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.233.129:19093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
# The third one
docker run -it --name kafka03 -p 19094:9092 -d -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=192.168.233.129:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.233.129:19094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest

Note that all the port mappings above map to port 9092 inside the Kafka container! Otherwise the brokers will not be reachable!

Basic command-line operations

# Create a topic named "first" with three partitions and one replica
./kafka-topics.sh --zookeeper 192.168.233.129:12181 --create --topic first --replication-factor 1 --partitions 3
# Describe the topic "first"
./kafka-topics.sh --zookeeper 192.168.233.129:12181 --describe --topic first
Topic: first  PartitionCount: 3  ReplicationFactor: 1  Configs:
    Topic: first  Partition: 0  Leader: 2  Replicas: 2  Isr: 2
    Topic: first  Partition: 1  Leader: 0  Replicas: 0  Isr: 0
    Topic: first  Partition: 2  Leader: 1  Replicas: 1  Isr: 1
# Start a console producer
./kafka-console-producer.sh --broker-list 192.168.233.129:19092,192.168.233.129:19093,192.168.233.129:19094 --topic first
# Start a console consumer; --from-beginning reads all messages from the start
./kafka-console-consumer.sh --bootstrap-server 192.168.233.129:19092,192.168.233.129:19093,192.168.233.129:19094 --topic first --from-beginning

Delete the topic

./kafka-topics.sh --zookeeper 192.168.233.129:12181 --delete --topic second

As the output shows, the topic is only marked for deletion, not actually removed. To really delete it, set delete.topic.enable=true in config/server.properties.

Changing the number of partitions

./kafka-topics.sh --zookeeper 192.168.233.129:12181 --alter --topic test2 --partitions 3

2. Kafka Advanced

2.1 Workflow

Messages in Kafka are classified by topic. Producers produce messages and consumers consume messages, both of which are topic-oriented.

A Topic is a logical concept, while a Partition is a physical concept. Each Partition corresponds to a log file that stores the data produced by producers (partition = log).

The data produced by the Producer is continuously appended to the end of the log file, and each record has its own offset. Each consumer in a consumer group records in real time which offset it has consumed up to, so that after recovering from an error it can resume from where it left off. Flow: Producer => Topic (log with offsets) => Consumer.

2.2 File Storage

Kafka stores its data on local disk, mainly in log and index files that hold the actual messages.

Kafka log files are split at 1 GB: when a .log file grows beyond 1 GB, a new .log file is created. Kafka uses ==sharding== and ==indexing== to speed up message lookup.

The Kafka log directory contains consumer offsets and partition data; each partition's data consists of .index and .log files.

Replicas are used for backup, so replicas of the same partition are stored on different brokers: if kafka01 holds a partition, kafka03 also holds a copy (replica) of it. A partition can have multiple replicas, one of which is the leader; the rest are followers.

If a .log file exceeds the size limit, a new .log file is generated, as shown below.

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

In this case, how to quickly locate the data?

The .index file stores each message's offset together with its physical starting position in the .log file; the .log file stores the real data.

  • First, binary-search the segment files by their base offsets (the numbers in the .index/.log file names) to find which segment the target offset falls into.
  • Then seek into that segment's .index file to the entry for the relative offset (target offset minus the segment's base offset), which gives the message's physical position in the .log file.
  • Finally, seek directly to that position in the .log file and read the message. (A sketch of this lookup follows.)

2.3 Producer Partitioning Strategy

Reasons for partitioning

  • Easier to scale in a cluster: each partition can be sized to fit the machine it lives on, and a Topic can be made up of multiple partitions, so the cluster can accommodate data of any size
  • Higher concurrency: data is read and written at the granularity of a Partition, so multiple partitions can be read and written in parallel

Partitioning rules

  • When a partition is specified, the specified value is used directly as the partition value
  • When no partition is specified but there is a key, the partition value is the hash of the key mod the total number of partitions of the topic
  • When neither a partition nor a key is given, the first call generates a random integer that is incremented on each subsequent call; the partition value is this integer mod the total number of available partitions of the topic, i.e. the round-robin algorithm (see the sketch below)
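
A minimal sketch of the three rules above. The helper and class names are made up for illustration; Kafka's real DefaultPartitioner hashes the serialized key with murmur2, so this only mirrors the logic, not the exact implementation:

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionRuleSketch {
    // Starts at a random value and is incremented per send (round-robin behaviour)
    private static final AtomicInteger COUNTER =
            new AtomicInteger(ThreadLocalRandom.current().nextInt(1000));

    static int choosePartition(Integer specifiedPartition, Object key, int numPartitions) {
        if (specifiedPartition != null) {
            return specifiedPartition;                                   // rule 1: partition given explicitly
        }
        if (key != null) {
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions; // rule 2: hash(key) mod partition count
        }
        return (COUNTER.getAndIncrement() & Integer.MAX_VALUE) % numPartitions; // rule 3: round-robin
    }

    public static void main(String[] args) {
        System.out.println(choosePartition(null, "order-42", 3)); // key-based
        System.out.println(choosePartition(null, null, 3));       // round-robin
    }
}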

2.4 Producer ISR

To ensure that the data sent by a producer reliably reaches the specified topic, each partition of the topic sends an ack (acknowledgement) back to the producer after receiving the data. If the producer receives the ack, it sends the next round of data; otherwise it resends the current data.

When to send the ACK

The leader sends the ACK only after ensuring that the followers have finished synchronizing with it; this guarantees that a new leader can be elected after the current leader fails (mainly so that data held by the followers is not lost).

How many followers must finish synchronizing before the ACK is sent?

  • The ACK can be sent after more than half of the followers have synchronized
  • The ACK can be sent only after all followers have synchronized

2.4.1 Replica data synchronization Policy

Send the ACK once more than half of the followers have synchronized

The advantage is low latency

The disadvantage is that to tolerate the failure of n nodes when electing a new leader, 2n+1 replicas are required (since more than half of the replicas must agree, electing a new leader requires that more than half of the replicas are still available). The fault tolerance is 1/2

Send the ACK only after all followers have synchronized

To tolerate the failure of n nodes when electing a new leader, only n+1 replicas are required, because every replica in the ISR already holds the full data, so any surviving replica can become the leader

The disadvantage is high latency, because all replicas need to finish synchronizing

Kafka chooses the second option: it has better fault tolerance, and since each partition holds a large amount of data, the first option would create a lot of data redundancy. Although the second option has higher network latency, the impact of that latency on Kafka is small.

2.4.2 ISR (In-Sync Replica set)

Question

With the second ACK strategy, after the leader receives data all the followers start synchronizing it; but if one follower cannot synchronize with the leader because of some fault, the leader would have to wait indefinitely for it to finish before sending the ACK. How is this problem solved?

To solve

The leader maintains a dynamic in-sync replica set (ISR), that is, the follower set that is synchronized with the leader. When the followers in the ISR complete data synchronization, an ACK is sent to the leader. If a follower does not synchronize data to the leader for a long time, the follower will be kicked out of the ISR. The threshold for this is set by the replica.lag.time.max.ms parameter. When the leader fails, a new leader is elected from the ISR.

2.5 Producer ACK mechanism

For some unimportant data, the reliability requirement is not very high and a small amount of loss can be tolerated, so there is no need to wait for all followers in the ISR to receive the data successfully.

Kafka provides users with three levels of reliability, and users choose different configurations based on the tradeoff between reliability and latency requirements.

Ack Parameter Configuration

  • 0: the producer does not wait for the broker's ack. This gives the lowest latency: the broker returns before the data has been written to disk, so data may be lost if the broker fails

  • 1: the producer waits for the broker's ack; the partition leader returns the ack after writing the data to disk. If the leader fails before the followers finish synchronizing, data is lost. (Only the leader has persisted the data.)

  • -1 (all): the producer waits for the broker's ack, which is returned only after the partition leader and the followers in the ISR have all persisted the data. However, if the leader fails after the followers finish synchronizing but before the broker sends the ack, the producer resends and the data is duplicated. (The duplication happens because the ack was never received, so the data is sent again.)

In short: with acks=0 the producer gets a response without anything being written to disk; with acks=1 it gets a response once the leader has written the data; with acks=-1 it gets a response once all ISR replicas have written the data.
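
In code this is just the acks entry of the producer configuration (the full producer example appears in section 6.1); a minimal sketch, keeping only one of the three lines in practice:

// Pick exactly one of the three reliability levels for the producer:
props.put(ProducerConfig.ACKS_CONFIG, "0");   // no wait: fastest, data may be lost
props.put(ProducerConfig.ACKS_CONFIG, "1");   // leader has written: lost if the leader fails before followers sync
props.put(ProducerConfig.ACKS_CONFIG, "all"); // leader and all ISR followers have written: data may be duplicated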

2.6 Data Consistency problems

  • LEO (Log End Offset): the last offset in each replica
  • HW (High Watermark): the largest offset visible to consumers, i.e. the smallest LEO in the ISR. For example, if the ISR replicas have LEOs of 10, 9, and 8, the HW is 8.

Follower and leader failures

  • Follower failure: when a follower fails, it is temporarily removed from the ISR. After it recovers, the follower reads the last HW recorded on its local disk, truncates the part of its log above the HW, and synchronizes from the leader starting at the HW. Once the follower's LEO is greater than or equal to the partition's HW — i.e. it has caught up with the leader — it can rejoin the ISR.
  • Leader failure: if the leader fails, a new leader is elected from the ISR. To keep the data consistent across replicas, the remaining followers first truncate the parts of their log files above the HW and then synchronize from the new leader.

This only guarantees data consistency between replicas; it does not guarantee that data is neither lost nor duplicated.

2.7 Exactly Once

Setting the server-side ACK level to -1 (all) ensures that no data is lost between the producer and the server, i.e. At Least Once semantics. Setting the ACK level to 0 ensures that every message from the producer is sent at most once, i.e. At Most Once semantics.

At Least Once guarantees that data is not lost but not that it is free of duplicates; At Most Once guarantees no duplicates but cannot guarantee that data is not lost. For important data we need both — the data must be neither lost nor duplicated — which is Exactly Once semantics.

Before Kafka 0.11, one could only guarantee that data was not lost and then deduplicate it downstream; with many downstream applications, global deduplication has a large impact on performance.

Kafka 0.11 introduced an important feature: idempotence. ==Idempotence means that no matter how many times a Producer sends duplicate data to the Server, the Server persists only one copy==. Idempotence combined with At Least Once semantics constitutes Kafka's Exactly Once semantics.

Set enable.idempotence=true in the producer configuration. A producer with idempotence enabled is assigned a PID when it is initialized, and the broker caches <PID, Partition, SeqNumber>. When a message arrives with the same primary key, the broker persists only one copy.

However, the PID changes after a restart, and different partitions have different primary keys, so idempotence cannot guarantee Exactly Once across partitions or across sessions.
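
Enabling it is a single producer setting; a minimal sketch on top of the producer configuration from section 6.1:

// Enable idempotence; the broker then deduplicates by <PID, Partition, SeqNumber>
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all"); // idempotence requires acks=all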

3. Consumer partition allocation strategy

Consumption patterns

The Consumer reads data from the broker in a pull manner.

The push model is hard to adapt to consumers with different consumption rates, because the send rate is determined by the broker, whose goal is to deliver messages as fast as possible; this can leave the consumer unable to keep up, typically resulting in denial of service and network congestion. The pull approach lets the consumer consume messages at a rate appropriate to its own processing capacity.

The downside of the pull mode is that if Kafka has no data, the consumer may spin in a loop (it keeps polling for data to consume) and always get back empty results. For this reason, the consumer passes a timeout parameter when pulling data: if there is no data, it waits up to the timeout before returning. (See the poll sketch below.)
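
A minimal sketch of that timeout in the consumer API (the full consumer example appears in section 7.1): poll blocks for at most the given duration when no data is available.

// Wait up to 100 ms for data; an empty record set is returned if nothing arrives in time
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));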

3.1 Partition Allocation Policy

There are more than one consumer in a consumer group and more than one partition in a topic. Therefore, the problem of partition allocation is inevitably involved, that is, the problem of determining which partition is consumed by the consumer.

Two allocation strategies in Kafka:

  • Round-robin (polling)
  • Range

Round-Robin

All partitions are allocated by polling. The main steps of this policy are as follows:

Assume there are three topics T0/T1/T2 with 1, 2, and 3 partitions respectively, so there are six partitions in total: T0-0, T1-0, T1-1, T2-0, T2-1, T2-2. Assume there are three consumers: C0 subscribes to T0, C1 subscribes to T0 and T1, and C2 subscribes to T0, T1, and T2.

In this case, round-robin sorts the partitions and consumers in lexicographical order and then traverses the partitions in a polling fashion: each partition is assigned to the next consumer in the cycle that subscribes to it. In other words, consumers are polled partition by partition and consume the messages of the partitions assigned to them.

For each partition, the cycle moves over the consumers: if the current consumer subscribes to the partition, the partition (and its messages) goes to that consumer; otherwise the cycle moves on to the next consumer. Polling this way can leave consumers with different numbers of partitions, so the load across consumers is uneven — in the example above, C2 carries more pressure because it subscribes to the most topics.

Range

Range's allocation strategy first computes the number of partitions each Consumer should carry and then assigns that many consecutive partitions to it. Suppose there are two consumers C0 and C1 and two topics T0 and T1, each with three partitions, so there are six partitions in total: T0-0, T0-1, T0-2, T1-0, T1-1, T1-2. The allocation works as follows:

  • Range allocates topic by topic: for each topic (e.g. T0 with three partitions and two subscribing consumers), the partitions and consumers are arranged in lexicographical order.
  • Each Consumer gets an equal share of the partitions; if they do not divide evenly, the extra partitions go to the consumers that come first in lexicographical order. This is repeated for every topic, so each topic's partitions end up allocated to its subscribing consumers.

Allocating by range essentially iterates over each topic and divides its partitions evenly among the subscribing consumers, with the remainder going to the consumers earliest in lexicographical order. As a result, the first consumers get more partitions, so the load across consumers is uneven. (A configuration sketch for choosing the assignment strategy follows.)
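
The strategy is chosen per consumer through the partition.assignment.strategy property; a minimal sketch using the assignor classes shipped with the standard client:

// Use round-robin assignment instead of range assignment
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");
// or the range assignor:
// props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
//         "org.apache.kafka.clients.consumer.RangeAssignor");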

3.2 Storage of consumer offset

In the process of consumption, there may be some faults such as power failure and downtime. After the Consumer recovers, he needs to continue consumption from the position before the fault. Therefore, the Consumer needs to record the offset in real time so that he can continue consumption after the fault recovers.

Before Kafka 0.9, consumers kept offsets in ZooKeeper by default; since then, offsets are kept in a built-in Kafka topic named __consumer_offsets

# Read the __consumer_offsets topic
./kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 192.168.233.129:19092,192.168.233.129:19093,192.168.233.129:19094 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config ../config/consumer.properties --from-beginning

3.3 Consumer Group cases

Test that within the same consumer group, only one consumer consumes a given message at a time.

# First modify config/consumer.properties to set the group id (a temporary copy of the file also works)
group.id=xxxx
# Start a consumer
./kafka-console-consumer.sh --bootstrap-server 192.168.233.129:19093 --topic test --consumer.config ../config/consumer.properties
# Start a producer
./kafka-console-producer.sh --broker-list 192.168.233.129:19092 --topic test
# Send messages

Result:

You can see that within a group, a message is consumed by only one consumer; only when one of the consumers is logged out with Ctrl+C does the other consumer get a chance to consume.

4. Efficient reads/writes & the role of ZooKeeper

4.1 Efficient read and write for Kafka

Sequential write to disk

Kafka's producer writes data to the log file by appending to the end of the file. According to the official figures, the same disk can reach about 600 MB/s for sequential writes but only about 200 KB/s for random writes. This is related to the mechanical structure of the disk: sequential writes save a great deal of head-seek time.

Zero copy technique

NIC: Network Interface Controller, i.e. the network card

Kafka’s zero-copy technology

This is a normal read operation:

  • The operating system reads data from disk files into a page cache in kernel space
  • The application reads data from kernel space into the user-space buffer
  • The application writes the read data back into the kernel space and into the socket buffer
  • The operating system copies the data from the socket buffer to the network interface, at which point the data is sent over the network to the consumer

  • Zero-copy technology only copies the data from the disk file to the page cache once, and then sends the data directly from the page cache to the network (the same page cache can be used for sending to different subscribers), thus avoiding duplication.

If there are 10 consumers, the traditional method of data replication is 4*10=40 times. With the zero-copy technique, the data is replicated 1+10=11 times. One is copied from disk to the page cache, and 10 times is read from the page cache by each of the 10 consumers.

4.2 Functions of ZooKeeper in Kafka

In a Kafka cluster, one broker is elected as the Controller. The Controller manages brokers going online and offline in the cluster, the allocation of partition replicas for all topics, and leader election. The Controller relies on ZooKeeper to do this management.

The process of electing a Partition Leader

5. Transactions

Kafka introduced transaction support in version 0.11. Transactions guarantee that production and consumption across partitions and sessions either all succeed or all fail, on the basis of Exactly Once semantics.

5.1 Producer transaction

To support transactions across partitions and sessions, a globally unique Transaction ID is introduced, and the Producer's PID (which can be understood as the Producer ID) is bound to this Transaction ID. That way, when the Producer restarts, it can recover the original PID from the ongoing Transaction ID.

To manage transactions, Kafka introduces a new component, the Transaction Coordinator. The Producer interacts with the Transaction Coordinator to obtain the task state corresponding to the Transaction ID. A Transaction Coordinator is also responsible for writing Transaction information to an internal Topic. In this way, even if the entire service is restarted, the ongoing Transaction state can be recovered and can continue.

5.2 Consumer transactions

For consumers, the transactional guarantee is weaker than for producers. In particular, it cannot guarantee that committed messages are consumed exactly once, because consumers can access arbitrary positions via offsets and different segment files have different life cycles, so messages belonging to the same transaction may already have been deleted after a restart.

6. Producer API workflow

Kafka's Producer sends messages asynchronously. Two threads are involved in the sending process — the main thread and the Sender thread — which share one variable, the RecordAccumulator. The main thread puts messages into the RecordAccumulator, and the Sender thread keeps pulling messages from the RecordAccumulator and sending them to the Kafka Broker.

6.1 A simple asynchronous producer

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author caoduanxi
 * @Date 2021/1/13
 * @Motto Keep thinking, keep coding!
 * Kafka producer
 */
public class CustomProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Keys can be plain strings or the static constants defined in ProducerConfig
        // Cluster address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.129:19092");
        // ack mechanism
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retry count
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // Batch size: a batch is sent once the accumulated messages reach 16384 bytes
        props.put("batch.size", 16384);
        // Linger time: if batch.size is not reached, send anyway after linger.ms
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator buffer size
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Producer
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Produce messages
        for (int i = 1; i <= 10; i++) {
            // Construct the message body
            producer.send(new ProducerRecord<>("test", "test-" + i, "test-" + i));
        }
        producer.close();
    }
}
  • BATCH_SIZE_CONFIG = "batch.size": the producer sends a batch once the accumulated messages reach batch.size bytes
  • LINGER_MS_CONFIG = "linger.ms": if the accumulated messages do not reach batch.size, they are sent anyway after linger.ms

At this point we can start a consumer listener in the shell.

Testing shows that Kafka is very fast, even with 1,000,000 records.

6.2 Asynchronous send with a callback

The callback function is invoked asynchronously when the producer receives the ack. It has two parameters, RecordMetadata and Exception; if Exception is null the message was sent successfully, otherwise the send failed.

A failed send triggers the automatic retry mechanism; beyond that, retries have to be handled manually in the callback function.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;

/**
 * @author caoduanxi
 * @Date 2021/1/13 11:08
 * @Motto Keep thinking, keep coding!
 * Kafka producer with a callback
 */
public class CallBackProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.129:19092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        props.put("batch.size", 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 1; i <= 100; i++) {
            // Construct the message body and pass a Callback for the ack
            producer.send(new ProducerRecord<>("test", "test-" + i, "test-" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println(recordMetadata.partition() + "-" + recordMetadata.offset());
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}

6.3 Testing the producer partitioning strategy

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {};
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {};
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {};
public ProducerRecord(String topic, Integer partition, K key, V value) {};
public ProducerRecord(String topic, K key, V value) {};
public ProducerRecord(String topic, V value) {};

In the ProducerRecord constructors above, the partition parameter explicitly specifies the target partition.

Note that if you specify a particular partition, the message is sent to that partition number — but if your Topic was created with only the single default partition and you send to partition 1, the send will fail, because the topic only has partition 0. So be careful when creating topics.

Specifying the partition in the constructor

// Construct the message body and specify the partition; here 2 is the target partition number
producer.send(new ProducerRecord<>("aaroncao", 2, "test-" + i, "test-" + i), new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            System.out.println(recordMetadata.partition() + "-" + recordMetadata.offset());
        } else {
            e.printStackTrace();
        }
    }
});

Custom partitioner

A custom partitioner needs to implement the Partitioner interface and its methods.

You also need to register your Partitioner implementation in the Producer's configuration; otherwise the DefaultPartitioner is used. Use Ctrl+H on the Partitioner interface in IDEA to see its concrete implementation classes.

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.duanxi.kafka.partition.CustomPartitioner");
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @author caoduanxi
 * @Date 2021/1/13
 * @Motto Keep thinking, keep coding!
 * A custom partitioner for the Kafka producer
 */
public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Integer nums = cluster.partitionCountForTopic(topic);
        // return key.hashCode() % nums;
        return 1;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

By default, a Producer sends messages asynchronously. To send synchronously, call send() to get the returned Future and then call Future.get(), which blocks until the send completes — this achieves a synchronous send. (A sketch follows.)
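
A minimal sketch of a synchronous send, reusing the producer from the CustomProducer example above; get() blocks until the broker acknowledges the record (exception handling kept simple):

try {
    // Block on the Future returned by send() to make the send synchronous
    RecordMetadata metadata = producer.send(new ProducerRecord<>("test", "key", "value")).get();
    System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}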

7. Consumer API

7.1 Simple Consumers

With automatic offset commits enabled (enable.auto.commit=true):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author caoduanxi
 * @Date 2021/1/13 12:32
 * @Motto Keep thinking, keep coding!
 * Kafka consumer
 */
public class CustomConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.129:19092");
        // Set the consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "abc");
        // Enable automatic offset commits
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Interval between automatic offset commits
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // The producer serializes, so the consumer must deserialize
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic to consume
        consumer.subscribe(Collections.singletonList("customconsumer"));
        // Keep polling
        while (true) {
            // The consumer pulls messages; poll waits up to 100 ms when there is no data
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
            // Iterate over the returned records
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.printf("offset=%d, key=%s, value=%s\n", consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
            }
        }
    }
}

Output results:

offset=0, key=test-1, value=test-1
offset=1, key=test-2, value=test-2
offset=2, key=test-3, value=test-3
offset=3, key=test-4, value=test-4
offset=4, key=test-5, value=test-5
offset=5, key=test-6, value=test-6
offset=6, key=test-7, value=test-7
offset=7, key=test-8, value=test-8
offset=8, key=test-9, value=test-9
offset=9, key=test-10, value=test-10

7.2 Consumer resets offset

It is easy for consumers to ensure reliability when consuming data because Kafka is persistent and there is no fear of data loss. However, the Consumer may encounter faults such as power failure or downtime during the consumption process. After recovering, the Consumer needs to continue consumption from the position before the fault. Therefore, the Consumer needs to record the offset position of his consumption in real time so that he can continue consumption after the fault is recovered.

The maintenance of offset is an issue that consumers must consider when consuming data.

// Offset reset, you need to set automatic reset to earliest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

Also change the consumer group id. Otherwise, since a message can only be consumed once by a consumer group, the messages consumed earlier will not be consumed again, and even with the offset reset configured the reset has no visible effect.

Note that auto.offset.reset="earliest" here corresponds to adding the --from-beginning option when starting a console consumer on Linux.

Values of auto.offset.reset

  • Earliest: Reset offset to the earliest position
  • Latest: Resets offset to the latest position. == Default value ==
  • None: Throws an exception if the previous offset is not found in the consumer group
  • Anything else: Throws an exception to the consumer

7.3 Consumer offset storage and reading

enable.auto.commit=true means offsets are committed automatically, and automatic commit is the default.

7.4 Consumers manually submit offset

Automatic offset commits are convenient, but because they are time-based it is hard for the developer to control exactly when offsets are committed, so Kafka also provides an API for committing offsets manually.

There are two methods to manually submit the offset:

  • commitSync: synchronous commit
  • commitAsync: asynchronous commit

Similarity: both commit the highest offset of the batch of data returned by the current poll.

Differences: commitSync blocks the current thread until the commit succeeds and automatically retries on failure (failures can still happen due to uncontrollable factors); commitAsync has no retry mechanism, so the commit may fail.

Synchronous commit

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author caoduanxi
 * @Date 2021/1/13
 * @Motto Keep thinking, keep coding!
 * Kafka consumer that commits offsets synchronously
 */
public class SyncCommitOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.129:19092");
        // Set the consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
        // Disable automatic offset commits
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("customconsumer"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
            // Iterate over the returned records
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.printf("offset=%d, key=%s, value=%s\n", consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
            }
            // Synchronous commit: blocks until the commit succeeds; a timeout can also be passed
            consumer.commitSync();
        }
    }
}

Asynchronous commit

Asynchronous commit additionally takes an offset-commit callback function.

consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            System.out.println("Commit failed, offset = " + offsets);
        }
    }
});

7.5 Missed consumption and repeated consumption

Both synchronous and asynchronous offset commits can lead to missed or repeated consumption. Committing the offset before processing the data can cause missed consumption (a crash after the commit loses the unprocessed messages), while processing the data before committing the offset can cause repeated consumption (a crash before the commit replays the processed messages).

7.6 Custom offset storage

Before Kafka 0.9, offsets were stored in ZooKeeper; since then they are stored by default in a built-in Kafka topic. Kafka also lets you store offsets yourself. Maintaining offsets manually is fairly cumbersome because the consumer's rebalance process must be taken into account:

A Rebalance is triggered when a new consumer joins the consumer group, an existing consumer leaves the group, or the partitions of the subscribed topics change.

After a rebalance, the partitions each consumer consumes change, so the consumer must first obtain the partitions it has been reassigned and then continue consuming each partition from its most recently committed offset (the high watermark).

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * @author caoduanxi
 * @Date 2021/1/13 13:41
 * @Motto Keep thinking, keep coding!
 * Kafka custom offset commit
 */
public class CustomOffsetCommit {
    private static Map<TopicPartition, Long> currentOffset = new HashMap<>();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.129:19092");
        // Set the consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
        // Disable automatic offset commits
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe and register a listener for partition reassignment
        consumer.subscribe(Collections.singletonList("customconsumer"), new ConsumerRebalanceListener() {
            // Called before the rebalance takes place
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                commitOffset(currentOffset);
            }

            // Called after the rebalance has taken place
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                currentOffset.clear();
                for (TopicPartition partition : partitions) {
                    // Seek to the latest committed offset of each assigned partition
                    consumer.seek(partition, getOffset(partition));
                }
            }
        });
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.printf("offset=%d, key=%s, value=%s\n", consumerRecord.offset(), consumerRecord.key(), consumerRecord.value());
                // Record the current offset
                currentOffset.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
            }
        }
    }

    // Obtain the latest committed offset of a partition (left as a stub here)
    private static long getOffset(TopicPartition topicPartition) {
        return 0;
    }

    // Commit the offsets of all partitions assigned to this consumer (left as a stub here)
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
    }
}

Offsets have to be recorded and committed by ourselves. A ConsumerRebalanceListener is used to listen for rebalance events: when a rebalance happens, the offsets are committed first, and after the new partitions are assigned, the consumer seeks to the latest committed offset of each partition and continues consuming from there.

8. Custom interceptors

Interceptor principle

Producer interceptors were introduced in Kafka 0.10 and are mainly used to implement custom control logic on the Clients side. For the Producer, an Interceptor gives the user a chance to customize messages — for example, to modify how they are presented — before they are sent and before the producer callback logic runs. A Producer also allows multiple interceptors to be specified; they act on the same message in order, forming an interceptor chain. The interface to implement is ProducerInterceptor, which has four main methods:

  • configure(Map<String, ?> configs): called to fetch configuration information and initialize data

  • onSend(ProducerRecord record): this method is wrapped inside KafkaProducer.send() and runs in the user's main thread. The Producer is guaranteed to call it before the message is serialized and before the partition is computed, and usually before the Producer's callback logic runs.

  • onAcknowledgement(RecordMetadata metadata, Exception exception): runs in the Producer's I/O thread, so do not put heavy logic in this method, otherwise it will slow down the Producer's message sending.

  • close(): called when the interceptor is shut down, used to clean up resources.

Interceptors may run in multiple threads, so thread safety must be ensured when using them. Also, if multiple interceptors are specified, the producer calls them in the given order, and any exception thrown by an Interceptor is caught and written to the error log rather than being propagated up.

Custom interceptor that adds a timestamp

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @author caoduanxi
 * @Date 2021/1/13
 * @Motto Keep thinking, keep coding!
 * Interceptor that prepends a timestamp to each message value
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(),
                "TimeInterceptor:" + System.currentTimeMillis() + "," + record.value());
    }
    // Other methods omitted
}

Custom interceptor that counts send results

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

/**
 * @author caoduanxi
 * @Date 2021/1/13
 * @Motto Keep thinking, keep coding!
 * Interceptor that counts successful and failed sends
 */
public class CounterInterceptor implements ProducerInterceptor<String, String> {
    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        // Print the totals when the interceptor is closed
        System.out.println("Sent successful:" + successCounter);
        System.out.println("Sent failed:" + errorCounter);
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}

Add an interceptor to CustomProducer

// Add interceptors
List<Object> interceptors = new ArrayList<>();
interceptors.add(TimeInterceptor.class);
interceptors.add(CounterInterceptor.class);
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

Note: the interceptor's close() method is where the final cleanup and output happen, so be sure to call Producer.close(); otherwise the interceptor's close() method will not be invoked.

9. Kafka monitoring with Eagle

Eagle

Eagle is open source visualization and management software that allows you to query, visualize, alert, and explore metrics stored anywhere. In short, Eagle gives you the tools to transform Kafka cluster data into beautiful graphics and visualizations.

It is a Web application running on Tomcat.

For detailed installation, refer to the installation of Kafka Monitoring tool Eagle (with high-speed download address).

10. Common interview questions

For details see: Common Kafka interview questions (with my own answers, continuously updated)

These questions came up during the learning process; actual interview questions are surely not limited to these.

What does ISR(InSyncRepli), OSR(OutSyncRepli), AR(AllRepli) stand for in Kafka?

ISR: the set of followers whose synchronization lags the leader by less than 10 seconds (in-sync replicas)

OSR: followers whose synchronization lags the leader by more than 10 seconds (out-of-sync replicas)

AR: all replicas of a partition

What do HW, LEO, etc. in Kafka stand for?

HW: High Watermark, determined by the smallest LEO (Log End Offset) among the replicas of the same partition

LEO: Log End Offset, the largest offset in each replica

How is message sequentiality represented in Kafka?

Within each partition, each message has an offset, so messages are ordered within the same partition, not global order

What about partitions, serializers, and interceptors in Kafka? What is the order of processing between them?

The Partitioner handles partitioning, i.e. deciding which partition a message is sent to. The Serializer is the tool that serializes and deserializes data. Interceptors provide pre-processing of messages before sending and wrap-up work afterwards. The processing order is ==interceptors => serializer => partitioner==.

What is the overall structure of the Kafka producer client? How many threads are used for processing? What are they?

Two threads are used: the main thread and the Sender thread. The main thread passes data through the interceptor, serializer, and partitioner into the shared RecordAccumulator, and the Sender thread pulls data from the shared variable and sends it to the Kafka Broker.

batch.size means a batch is sent only once it reaches that size; linger.ms means that if batch.size has not been reached, the batch is sent anyway after linger.ms.

If the number of consumers in a consumer group exceeds the number of partitions of a topic, some consumers will not be able to consume data. Is this correct?

This is correct: the consumers in excess of the partition count will not receive data, because a message in a partition can only be consumed by one consumer within the consumer group.

When the consumer submits the consumption shift, does it submit the offset or offset+1 of the latest message that is currently consumed?

Offset+1. The offsets of produced data start at 0, while the committed consumer position points to the next message to be consumed, so what gets committed is the latest consumed message's offset + 1. (See the sketch below.)
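
A minimal sketch of such a manual commit with the consumer API; the topic, partition, and lastConsumedOffset values are placeholders for illustration:

// Commit the position of the next message to consume: last consumed offset + 1
TopicPartition tp = new TopicPartition("test", 0);
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastConsumedOffset + 1)));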

What are the situations that cause repeat consumption?

Consuming first and then committing the offset: if the machine crashes after the data has been consumed but before the offset is committed, the same data will be consumed again, causing repeated consumption.

What are the scenarios that cause leakage consumption?

Committing the offset first and then consuming: if the consumer crashes after the commit but before actually consuming, those messages are skipped, causing missed consumption.

After you create (delete) a topic using kafka-topics. Sh, what logic does Kafka execute behind it?

A topic node is created in ZooKeeper under /brokers/topics, e.g. /brokers/topics/first. This triggers the Controller's listener, and the Kafka Controller then creates the topic (assigning partitions and replicas) and updates the metadata cache.

Can the number of partitions for a topic be increased? How, if any, can you increase it? If not, why?

Yes, the number of partitions can be increased, using the --alter option of kafka-topics.sh to change the partition count.

Can the number of partitions for a topic be reduced? How, if any, can it be reduced? If not, why?

No, the number of partitions cannot be reduced, because the data already in the removed partitions would be hard to handle.

Does Kafka have an internal topic? What, if any, is it? What does it do?

Yes, __consumer_offsets are used to save consumer offsets after version 0.9

The concept of partition allocation in Kafka?

For a Kafka cluster, partitioning provides load balancing, improves concurrency and reads efficiency for consumers

What is the log directory structure in Kafka?

Each partition corresponds to a folder named topic-0/topic-1… Each folder contains.index and.log files.

If I specify an offset, how will Kafka Controller find the corresponding message?

The offset is the number of the current message. First, binary-search to find which .index file (segment) the offset belongs to, then seek to the offset's entry in that .index file to get the message's starting position, and finally seek to that position in the .log file to read the message.

What does Kafka Controller do?

In a Kafka cluster, one broker is elected as the Controller. It manages brokers going online and offline, the partition replica allocation for all topics, and leader election, relying on ZooKeeper for this management.

Where in Kafka do elections take place? What are the electoral strategies in these places?

The Leader needs to be elected from the ISR, using a first-come-first-served policy; within a partition, this election determines which replica is the Leader and which are the Followers.

What is a dead copy? What are the countermeasures?

A follower whose synchronization lags the leader by more than 10 seconds is considered failed and is kicked out of the ISR; once its progress catches up to within 10 seconds of the leader, it rejoins the ISR.

What is Kafka’s design that gives it such high performance?

  • Kafka is inherently distributed
  • The log file is segmented and indexed
  • Sequential read and write is used for single nodes. Sequential read and write refers to the sequential append of files, which reduces the disk addressing overhead and is much faster than random write
  • The zero-copy technology is used, and the read and write operations can be completed in kernel mode without switching to user mode, and the data copy times are less.

For message-queue interview questions, studying them together with Zhonghua Shishan's (中华石杉) Internet Java Advanced Interview Training Camp works even better!

11. Summary

This pass through Kafka took 3 days and covered the basics: installing Kafka and ZooKeeper with Docker, operating Kafka from the command line, and then Kafka's internals — file storage, partitioning strategy, efficient read/write strategy, disk persistence strategy, consumer offset strategy, transactions, and the at-least-once / at-most-once / exactly-once delivery semantics. At the end I integrated Kafka with Spring Boot and ran a few demos, mainly producer and consumer examples plus an interceptor example, and finished with some common interview questions. That wraps up Kafka, but I only roughly understand the principles; really digesting them will take a few more passes. Keep going!

Reference video: Shang Silicon Valley (尚硅谷) Kafka tutorial — a quick start to the Kafka framework

Keep thinking, keep coding! Written in Nanjing, January 13, 2021, 20:06. Keep going!