Author: bai_nian_min_guo www.cnblogs.com/bainianming…

1. Overview of Kafka

1.1 Definition

Kafka is a distributed publish/subscribe message queue that is mainly used for real-time processing of big data

1.2. Message queues

1.2.1 Traditional message queue vs. the newer message queue model

In the traditional message queue model, when a user registers, the user information is written to the database and then other steps, such as sending an SMS, must all finish before a response is returned to the user

In the newer model, the registration is written to the database and success is returned to the user immediately; the remaining steps (such as sending the SMS) are handled asynchronously through the message queue

1.2.2 Benefits of using message queues

A, decoupling

B. Recoverability

C, the buffer

D. Flexibility & peak processing capability

E. Asynchronous communication

1.2.3 Message queue modes

A. Point-to-point mode

The producer sends a message to the queue, and the consumer retrieves and consumes it from the queue. Once a message has been consumed it is removed from the queue, so a consumer cannot consume a message that has already been consumed. A queue supports multiple consumers, but each message can be consumed by only one of them; to deliver the same message to multiple consumers, the producer must send it multiple times

B. Publish/subscribe (one-to-many; messages are not deleted after consumers consume them)

A producer publishes a message to a topic, and multiple consumers (subscribers) consume it. Unlike point-to-point, a message published to a topic is consumed by all subscribers. The data is only retained for a period of time (7 days by default), because Kafka is not a storage system. Kafka follows this model. There are two delivery styles: one is that consumers actively pull messages (this is what Kafka does), rather than producers pushing messages to consumers; the other is that producers actively push messages to consumers, similar to a WeChat public account

1.3 Kafka's infrastructure

Kafka's infrastructure consists of brokers, producers, consumer groups, and (for now) ZooKeeper

The producer is responsible for sending messages

The broker is responsible for buffering messages; topics can be created on the broker, and each topic has the concepts of partitions and replicas

The consumer group is responsible for consuming messages. Consumers in the same consumer group cannot consume data from the same partition. Consumer groups exist mainly to increase consumption capacity: for example, where one consumer used to consume 100 messages, two consumers can now share those 100 messages. Therefore, the number of consumers in a consumer group should not exceed the number of partitions; otherwise some consumers will have no partition to consume and resources are wasted

Note: however, consumers in different consumer groups can consume data from the same partition

To form a Kafka cluster, the nodes only need to register with the same ZooKeeper, which also keeps track of the progress (offset/position) of message consumption

Offsets are stored in ZK prior to version 0.9

After version 0.9, offsets are stored in Kafka. Kafka defines a topic dedicated to storing offsets.

Why the change? Mainly because the very frequent offset updates put pressure on ZooKeeper, and it also made Kafka itself more complex

1.4 Kafka installation

A. Installing Kafka only requires extracting the archive:

tar -zxvf kafka_2.11-2.1.1.tgz -C /usr/local/

B. View the configuration file

[root@es1 config]# pwd
/usr/local/kafka/config
[root@es1 config]# ll
total 84
-rw-r--r--. 1 root root  906 Feb  8  2019 connect-console-sink.properties
-rw-r--r--. 1 root root  909 Feb  8  2019 connect-console-source.properties
-rw-r--r--. 1 root root 5321 Feb  8  2019 connect-distributed.properties
-rw-r--r--. 1 root root  883 Feb  8  2019 connect-file-sink.properties
-rw-r--r--. 1 root root  881 Feb  8  2019 connect-file-source.properties
-rw-r--r--. 1 root root 1111 Feb  8  2019 connect-log4j.properties
-rw-r--r--. 1 root root 2262 Feb  8  2019 connect-standalone.properties
-rw-r--r--. 1 root root 1221 Feb  8  2019 consumer.properties
-rw-r--r--. 1 root root 4727 Feb  8  2019 log4j.properties
-rw-r--r--. 1 root root 1925 Feb  8  2019 producer.properties
-rw-r--r--. 1 root root 6865 Jan 16 22:00 server-1.properties
-rw-r--r--. 1 root root 6865 Jan 16 22:00 server-2.properties
-rw-r--r--. 1 root root 6873 Jan 16 03:57 server.properties
-rw-r--r--. 1 root root 1032 Feb  8  2019 tools-log4j.properties
-rw-r--r--. 1 root root 1169 Feb  8  2019 trogdor.conf
-rw-r--r--. 1 root root 1023 Feb  8  2019 zookeeper.properties

C. Modify the configuration file server.properties

Set broker.id; this is the unique identifier that distinguishes each node in a Kafka cluster

D. Set the Kafka data storage path

Note: this directory must not contain other, non-Kafka directories, otherwise the Kafka cluster will fail to start

E. Set whether Kafka topics can be deleted; by default Kafka topics are not allowed to be deleted

F. The retention period of Kafka data, 7 days by default

G. The maximum size of a log segment file; if a log file exceeds 1 GB, a new file is created

H. The ZooKeeper addresses Kafka connects to, and the connection timeout

J. The default number of partitions
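A sketch of the corresponding server.properties entries (the property names are standard Kafka broker configs; the values, hostnames, and paths below are only illustrative assumptions):

# server.properties (excerpt, example values)
broker.id=1                                   # unique id of this broker in the cluster
log.dirs=/usr/local/kafka/data                # data directory (keep non-Kafka directories out of it)
delete.topic.enable=true                      # allow topic deletion
log.retention.hours=168                       # keep data for 7 days
log.segment.bytes=1073741824                  # roll a new segment file after 1 GB
zookeeper.connect=es1:2181,es2:2181,es3:2181  # ZooKeeper addresses
zookeeper.connection.timeout.ms=6000          # ZooKeeper connection timeout
num.partitions=1                              # default number of partitions for new topics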


1.5 Starting Kafka

A. Startup mode 1: the start script starts only a single node, so each Kafka node must be started manually

B. Startup mode 2: daemon mode (recommended)
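A minimal sketch of both startup modes (assuming Kafka is installed under /usr/local/kafka as above):

cd /usr/local/kafka
# startup mode 1: run in the foreground
bin/kafka-server-start.sh config/server.properties
# startup mode 2 (recommended): run as a background daemon
bin/kafka-server-start.sh -daemon config/server.properties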

1.6 Kafka operations

A. Query the existing topics in the Kafka cluster

Note: this command connects to ZooKeeper, not to Kafka

B. Create a topic, specifying the number of partitions and replicas

Note:

replication-factor: the number of replicas

partitions: the number of partitions

topic: the topic name

If the current Kafka cluster has only three broker nodes, the replication factor can be at most 3; in the example below, specifying a replication factor of 4 reports an error

C. Delete a topic

D. View topic details
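The commands for these operations might look like the following (the hostname es1, the port, and the topic name "test" are assumptions; in this Kafka version kafka-topics.sh talks to ZooKeeper):

# A. list existing topics (connects to ZooKeeper, not to Kafka)
bin/kafka-topics.sh --zookeeper es1:2181 --list
# B. create a topic with 3 partitions and 2 replicas
bin/kafka-topics.sh --zookeeper es1:2181 --create --topic test --partitions 3 --replication-factor 2
# C. delete a topic (only effective when topic deletion is enabled)
bin/kafka-topics.sh --zookeeper es1:2181 --delete --topic test
# D. view topic details
bin/kafka-topics.sh --zookeeper es1:2181 --describe --topic test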

Producing and consuming messages: Kafka ships with a console producer and a console consumer client

A. Start a producer. Note that it connects to the Kafka cluster on port 9092

B. Start a consumer. Note that it also connects on port 9092; before version 0.9 consumers connected to ZooKeeper on port 2181

Here we start two consumers to test it

Note: if no consumer group configuration file is specified, each consumer belongs to a different consumer group by default

C. Send a message; you can see that every consumer receives it

D. The actual data stored in Kafka
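A sketch of the two console clients (hostname and topic name are assumptions):

# A. console producer, connecting to the Kafka cluster on port 9092
bin/kafka-console-producer.sh --broker-list es1:9092 --topic test
# B. console consumer, also on port 9092 (before 0.9 it connected to ZooKeeper on 2181)
bin/kafka-console-consumer.sh --bootstrap-server es1:9092 --topic test --from-beginning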

2. Kafka architecture in depth

Kafka does not guarantee global ordering of messages, only ordering within a partition, because consumers read from the different partitions independently

2.1 Kafka workflow

Messages in Kafka are organized by topic; producers produce messages and consumers consume messages, and both are topic-oriented

Topic is a logical concept, while partition is a physical concept

Each partition has the concept of replicas

Each partition corresponds to a log file, which stores the data produced by producers. Newly produced data is continuously appended to the end of the log file, and every record has its own offset. Each consumer records in real time the offset it has consumed up to (in ZooKeeper or in Kafka's internal offsets topic, as described above), so that after a failure it can resume from the last position

Kafka's offsets are ordered within a partition but not across partitions; Kafka does not guarantee global order

2.2 Kafka principles

Since messages produced by producers are continuously appended to the end of the log file, Kafka uses a sharding and indexing mechanism: each partition is divided into multiple segments, and each segment corresponds to two files, a .index file and a .log file. Both files sit in the same directory, which is named after the topic name plus the partition number

The .index and .log files are named after the smallest offset of the data they contain (the segment's base offset)

How does Kafka locate data so quickly?

The .index file stores index entries for the data: the first column is the message offset, and the second column is the byte position of that message in the corresponding .log file. This works like reading a file with seek(): setting the position directly makes the data much faster to find

To consume the record at offset 3, Kafka first uses binary search to find which index file (segment) the offset falls into, and then uses the index entry to find the record's position in the log file. In this way the data can be located and consumed quickly

So although Kafka stores data on disk, reads are still very fast.
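As an illustration, a partition directory might look like the following (the topic name, data path, and offsets are made up; the file names follow the base-offset naming rule described above):

ls /usr/local/kafka/data/test-0/
# 00000000000000000000.index  00000000000000000000.log
# 00000000000000170410.index  00000000000000170410.log
# 00000000000000239430.index  00000000000000239430.log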


3. Kafka producers and consumers

3.1 Kafka producers

Kafka partitioning

The main reason for Kafka partitioning is to provide concurrency and improve performance, because reads and writes are done at the partition level.

To which partition does the producer send the message?

A. The partition is explicitly specified on the client

B. Round-robin (recommended): message 1 to P1, message 2 to P2, message 3 to P3, message 4 to P1, message 5 to P2, message 6 to P3, ...

3.2 How does Kafka ensure data reliability? Through acks

To ensure that the data sent by a producer reliably reaches the specified topic, each partition of the topic must send an ACK (acknowledgement) back to the producer after receiving the data. If the producer receives the ACK, it proceeds to the next round of sending; otherwise it resends the data

So when does Kafka send an ACK to the producer?

After making sure the followers and the leader are in sync: only then does the leader send the ACK to the producer, which ensures that if the leader dies and a new leader is elected from the followers, no data is lost

Then after how many followers have finished synchronizing should the ACK be sent?

Option 1: send the ACK once half of the followers have completed synchronization

Option 2: send the ACK only after all followers have completed synchronization.

Kafka adopts the second option. Now imagine the following scenario: the leader receives the data and all the followers start synchronizing it, but one follower cannot finish because of some fault. The leader would have to wait for it before sending the ACK, which greatly hurts efficiency. How is this solved?

The leader maintains a dynamic ISR (in-sync replica) list; only the followers in this list need to stay in sync with the leader. Once the followers in the ISR have synchronized the data, the leader sends the ACK to the producer. If a follower does not synchronize with the leader for too long, it is removed from the ISR; this time threshold is user-configurable. If the leader fails, a new leader is elected from the ISR

How are the ISR members chosen?

First, by communication time: the follower must be able to complete communication with the leader quickly; the default threshold is 10 s

Second, by how far the follower's data lags behind the leader, which defaulted to 10,000 messages (this criterion was removed later)

Why was it removed? Because Kafka sends messages in batches, the leader receives a batch immediately while the followers have not yet pulled it, so followers were frequently kicked out of and re-added to the ISR. This information is stored in ZooKeeper and in memory, so ZooKeeper and memory were being updated constantly.

However, for some less important data, the reliability requirement is not that strict and a small amount of loss can be tolerated, so there is no need to wait for all the followers in the ISR to receive the data successfully

Kafka therefore offers the user three reliability levels, which can be chosen based on the trade-off between reliability and latency. This is configured on the producer side through the acks parameter

A. acks = 0

The producer does not wait for an ACK; it just hands the data over. There is a high probability of losing data

B. acks = 1

The ACK is returned after the leader has written the data to disk. Data can be lost: if the leader fails before the followers have synchronized the data, that data is lost

C. acks = -1 (all)

The ACK is returned only after the leader and the followers in the ISR have all written the data to disk. This can cause data duplication: if a fault occurs after the data has been written but before the ACK is returned, the producer resends it and the data is duplicated. In an extreme case data can still be lost: for example, if the followers communicate with the leader very slowly, the ISR may contain only the leader; the leader writes the data to disk and returns the ACK, and if the leader then fails, the data is lost
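The level is chosen through the producer's acks setting; with the console producer it can be passed as a producer property, for example (a sketch, hostname and topic assumed):

# acks=0: do not wait; acks=1: leader only; acks=-1 or all: leader plus the ISR followers
bin/kafka-console-producer.sh --broker-list es1:9092 --topic test --producer-property acks=all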

3.3 How does Kafka keep the data consumers see consistent? Through the HW

LEO: the latest offset in each replica (log end offset)

HW (high watermark): the largest offset visible to consumers, i.e. the smallest LEO in the ISR. In other words, in the example consumers can only see records 1 through 6; the records after that can neither be seen nor consumed

This avoids problems when the leader fails. For example, suppose the current consumer has consumed 8 records and the leader then fails; if F2 becomes the new leader but does not have record 9 at all, the consumer would hit an error. The HW is designed to expose to consumers only the data that every replica holds, avoiding this problem

3.3.1 The HW ensures consistency of stored data

A. Follower failure

After a follower recovers, it reads the last HW recorded on its local disk, truncates the part of its log file above the HW, and synchronizes from the leader starting at the HW. Once the follower's LEO is greater than or equal to the partition's HW, i.e. the follower has caught up with the leader, it can rejoin the ISR

B. Leader failure

If the leader fails, a new leader is elected from the ISR. To keep the data consistent across replicas, the remaining followers truncate the part of their log files above the HW (the new leader does not truncate its own log) and then synchronize from the new leader

Note: this only guarantees consistency of the stored data across replicas; it does not guarantee that data is neither lost nor duplicated

3.3.2 Exactly once (idempotence) ensures that data is not duplicated

With acks set to -1, data is not lost, but it may be duplicated (at least once).

With acks set to 0, data is not duplicated, but it may be lost (at most once).

But what if you want both? That is where exactly once comes in.

Since version 0.11, idempotence has been available to address data duplication inside the Kafka cluster; before 0.11 it had to be handled by the consumers themselves

When idempotence is enabled, acks defaults to -1. Kafka assigns each producer a PID and each message a sequence number. If the PID, partition, and sequence number of a message are all the same as one already received, Kafka treats it as a duplicate and does not write it to disk again. However, if the producer dies and restarts, duplicates can still appear. So idempotence solves data duplication within a single partition in a single session; it does not solve duplication across partitions or across sessions
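Enabling idempotence is a single producer-side setting; a sketch with the console producer (the property name is the standard producer config; hostname and topic are assumptions):

# enable.idempotence=true implies acks=all; the broker de-duplicates by (PID, partition, sequence number)
bin/kafka-console-producer.sh --broker-list es1:9092 --topic test --producer-property enable.idempotence=true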

3.4 Kafka consumers

3.4.1 Consumption model

There are two ways a message queue can deliver messages to consumers: push (like a WeChat public account) and pull (what Kafka uses). Push cannot adapt to the different consumption rates of different consumers, because the send rate is decided by the broker, whose goal is to deliver messages as fast as possible; this easily leaves consumers unable to keep up, typically showing up as denial of service or network congestion. Pull lets each consumer consume at a rate matching its own capacity

The drawback of pull is that if Kafka has no data, the consumer may spin in a loop that keeps returning empty results. To avoid this, the Kafka consumer passes a timeout when polling for data: if no data is available, it waits up to that long before returning
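The console consumer exposes a related knob: if no message arrives within a given interval it gives up instead of polling forever (a sketch; hostname and topic are assumptions):

# exit if no message is available for 10 seconds
bin/kafka-console-consumer.sh --bootstrap-server es1:9092 --topic test --timeout-ms 10000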

3.4.2 Partition assignment strategies

A consumer group has multiple consumers and a topic has multiple partitions, so partition assignment is unavoidable: it determines which partition is consumed by which consumer

Kafka provides two strategies: RoundRobin, which assigns across all subscribed topics as a whole, and Range, which assigns per individual topic

RoundRobin: the prerequisite is that all consumers in the consumer group subscribe to the same topics, otherwise problems arise; it is not the default

Consumers in the same consumer group cannot consume the same partition at the same time

For example, three consumers consuming the nine partitions of one topic

Suppose a consumer group has two consumers and consumes two topics at the same time, each topic having three partitions

First, the two topics are treated as one: every topic-partition pair is hashed, the pairs are sorted by that hash, and then they are assigned round-robin to the two consumers in the group

What if the subscriptions look like the following?

For example, there are 3 topics, each with 3 partitions, and the consumer group has 2 consumers; consumer 1 subscribes to topic1 and topic2, and consumer 2 subscribes to topic2 and topic3. In such a scenario, round-robin assignment causes problems

What if the subscriptions look like this instead?

For example, there are two topics, each with three partitions, and the consumer group has two consumers, with consumer 1 subscribing to topic1 and consumer 2 subscribing to topic2. Round-robin assignment also causes problems here

That is why we keep emphasizing that the prerequisite for round-robin assignment is that all consumers in the group subscribe to the same topics;

This is also why round-robin is not Kafka's default

Range: assigns partitions per topic; it is the default

Range's drawback is that it can lead to unbalanced load across consumers

For example, if a consumer group subscribes to 2 topics, consumer 1 may consume 4 partitions while the other consumer consumes only 2

When is the assignment strategy triggered? Whenever the number of consumers in a consumer group changes, for example when a consumer is added to or removed from the group, a reassignment is triggered
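The strategy is selected on the consumer side through partition.assignment.strategy (the assignor class names below are the standard Kafka client ones; treat this as a sketch):

# default: range assignment (org.apache.kafka.clients.consumer.RangeAssignor)
# switch the consumers of a group to round-robin assignment
echo "partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor" >> config/consumer.properties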

3.4.3 Maintaining offsets

During consumption, a consumer may hit failures such as power loss or a crash. After recovering, it needs to continue consuming from where it left off, so the consumer must record which offset it has consumed up to, in order to resume after the failure

Offsets can be stored in two places: ZooKeeper and Kafka

Let's first look at storing offsets in ZooKeeper

An offset is uniquely determined by the combination of consumer group, topic, and partition

So even if a consumer in the group dies, the other consumers can still retrieve the offset
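For the pre-0.9 layout, the committed offset can be inspected directly in ZooKeeper (a sketch; the path follows the old consumer convention, and group/topic/partition are placeholders):

# read a group's committed offset for one partition of one topic
zkCli.sh -server es1:2181
get /consumers/<group-id>/offsets/<topic>/<partition>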

The controller node is the broker that communicates with ZooKeeper and synchronizes data; whichever broker starts up and registers itself first becomes the controller, and the controller information is then synchronized to the other nodes

3.4.4 Consumer group example

Change the consumer group ID

Start a producer and send 3 messages

Start three consumers with the consumer group specified; you can see that each consumer consumed one of the messages

Next, to demonstrate that different consumer groups can consume the same topic, we will see that consumers in the two consumer groups consume the same data

Start one more consumer that belongs to a different consumer group
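A sketch of the steps (the group name, hostname, and topic are assumptions):

# give the consumers an explicit group id
echo "group.id=group1" >> config/consumer.properties
# start each consumer with that group configuration; within the group each message goes to only one consumer
bin/kafka-console-consumer.sh --bootstrap-server es1:9092 --topic test --consumer.config config/consumer.properties
# a consumer started without this file (or with a different group.id) forms another group
# and will also receive every message sent to the topic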

4. Kafka's efficient read and write mechanisms

4.1 Distributed deployment

Multiple nodes operate in parallel

4.2 Sequential disk writes

Kafka's producers write data to log files by continuously appending to the end of the file. Sequential writes to the same disk can reach about 600 MB/s, while random writes only reach about 100 KB/s. This is due to the mechanical structure of the disk: sequential writes are fast because they avoid a lot of head-seek time

4.3 Zero-copy technology

Normally, data is first read into kernel space, then copied from kernel space into user space, then written back into kernel space by calling the operating system's I/O interface, and finally written out to the hardware

With zero copy, Kafka lets the I/O happen directly in kernel space, skipping the copy into user space, which is why Kafka's performance is so high

5. ZooKeeper in Kafka

One broker in the Kafka cluster is elected as the controller, which is responsible for managing brokers going online and offline, assigning partition replicas for all topics, and leader election.


Life is good. See you tomorrow