
Last week I gave an internal knowledge-sharing talk at the company introducing Kafka. Here is a summary of the content:

What is Kafka?

An open-source messaging engine and stream processing platform. In practice, we mostly talk about it as a "message queue".

What is stream processing?

Streams are the data; processing is the action. Stream processing is the act of continuously computing results from data. It is best suited to scenarios such as:

  • Monitoring and alerting
  • Log stream processing
  • BI / model training

What is the "MQ" we always talk about?

The message queue.

Messages are data; a queue is a container for messages: the first-in, first-out data structure you are all familiar with.

So what is its essence?

Send – store – receive

How does Kafka stack up against other MQs?

Adapted from MQ contrast selection [1]:

| Feature | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
| --- | --- | --- | --- | --- |
| Single-machine throughput | An order of magnitude lower than RocketMQ and Kafka | An order of magnitude lower than RocketMQ and Kafka | 100k-level; RocketMQ can also support very high throughput | 100k-level; throughput is Kafka's biggest advantage, typically paired with big-data systems for real-time computation, log collection, etc. |
| Effect of topic count on throughput | – | – | Topics can reach the hundreds or thousands with only a small drop in throughput; a major advantage of RocketMQ, which supports a large number of topics on the same hardware | As topics grow from dozens to hundreds, throughput drops sharply, so with Kafka try to keep the topic count under control on the same hardware; supporting large-scale topics requires more machines |
| Latency | ms-level | Microsecond-level, a major RabbitMQ feature; the lowest latency | ms-level | Within ms-level |
| Availability | High; master-slave architecture | High; master-slave architecture | Very high; distributed architecture | Very high; Kafka is distributed with multiple replicas per partition, so a few machines going down causes no data loss and no unavailability |
| Message reliability | Low probability of losing data | – | Zero loss achievable after parameter tuning | Zero loss achievable after parameter tuning |
| Feature support | Extremely complete feature set in the MQ domain | Built on Erlang, so concurrency is very strong, performance excellent, latency low | Fairly complete MQ features; distributed, with good scalability | Relatively simple feature set, mainly supporting plain MQ functionality |
| Advantages | Very mature and powerful; used in many companies and projects | Erlang-based, excellent performance, very low latency; 10k-level throughput; complete MQ features; very good management UI; active community; widely used by internet companies | Simple, easy-to-use interface; backed by Alibaba; large throughput; easy distributed scaling; active community; supports large-scale topics and complex business scenarios; can be customized from source | High throughput, ms-level latency, high availability and reliability, easy distributed scaling |
| Disadvantages | Occasional low-probability message loss; community activity is low | Lower throughput; Erlang makes customization hard; dynamic cluster scaling is troublesome | Interface does not follow the standard JMS spec; some migrations require major code changes; some risk the technology gets abandoned | Messages may be consumed repeatedly |
| Typical applications | Mainly decoupling and async processing; rarely used in large-throughput scenarios | Used across all kinds of companies and scenarios | Large-throughput and complex business scenarios | Widely used for real-time computation and log collection in big data; the de facto industry standard |

Why did XXX choose Kafka as a unified queue? (omitted)

  • Maintenance costs
  • High availability
  • Technology stack

What are the performance benefits of Kafka?

Zero copy – for reads

(Diagram: zero copy in Kafka, omitted)

So what is zero copy?

With zero-copy technology, the data is not copied at the user-space memory level, i.e., it is not moved through the CPU; instead it is transferred via DMA (for Kafka reads, from the page cache straight to the NIC buffer).
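In Java, this primitive is exposed as FileChannel.transferTo, which is what Kafka's log layer relies on to push file bytes straight to a socket. A minimal sketch, assuming a local file and a reachable socket endpoint (both names are made up for illustration):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {
    public static void main(String[] args) throws IOException {
        // Hypothetical log file and destination address, for illustration only.
        try (FileChannel file = FileChannel.open(Path.of("00000000000000000000.log"),
                                                 StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9092))) {
            long position = 0;
            long remaining = file.size();
            // transferTo lets the kernel move bytes from the page cache to the
            // socket via DMA, without copying them into user-space buffers.
            while (remaining > 0) {
                long sent = file.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}
```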

Batch compression

For logging-type scenarios, consider enabling compression; in other scenarios it is generally not recommended, because compression costs extra CPU.

There is no batching when messages are sent synchronously one at a time. When sent in batches, the messages are compressed together; when sent individually, each message is compressed separately. As we know, gzip compresses poorly when the input is very small, and the output may even be larger than the source.
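A minimal producer sketch, assuming the localhost broker and the liuli-test topic from the demo later in this post, showing how compression and batching are typically enabled together (batch.size and linger.ms come up again in the batching section below):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CompressingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Compress whole batches rather than single messages.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        // Accumulate up to 32 KB per partition before sending...
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
        // ...or wait at most 10 ms, whichever comes first.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("liuli-test", "msg-" + i));
            }
        }
    }
}
```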

Sequential write to disk

Under sequential access, disk read/write speed can approach that of memory.

Because hard disks are mechanical, every read or write involves a seek followed by the actual transfer, and seeking is a "mechanical action", the most time-consuming part. So hard disks hate random I/O and love sequential I/O. Kafka uses sequential I/O to speed up reads and writes on disk.
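As a toy illustration of the append-only pattern (the file name is hypothetical): the channel is opened in append mode, so every write lands sequentially at the tail of the log with no seeks in between.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class AppendOnlyLog {
    public static void main(String[] args) throws IOException {
        // Hypothetical segment file; opened for sequential, append-only writes.
        try (FileChannel log = FileChannel.open(Path.of("00000000000000000000.log"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            for (int i = 0; i < 3; i++) {
                ByteBuffer msg = ByteBuffer.wrap(("msg-" + i + "\n").getBytes(StandardCharsets.UTF_8));
                log.write(msg); // always appended at the tail: no seeks between writes
            }
        }
    }
}
```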

Batched reads and writes

A Kafka Consumer can pull multiple messages at a time and commit the offset once for the whole batch.

A Kafka producer can likewise accumulate multiple messages and send them at once, using batch.size to set the batch size. However, a batch only applies to a single partition; that is, the batched messages are all sent to the same partition.

Both the consumer and the producer expose two parameters to control the batching policy: one bounds the batch size, the other bounds the waiting time, and whichever threshold is hit first triggers the action.

At the moment we mostly use batching on the consumer side.
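A minimal consumer-side sketch, reusing the hypothetical broker/topic/group names from above: max.poll.records bounds the batch size, while fetch.min.bytes / fetch.max.wait.ms trade a little latency for bigger fetches.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BatchingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "liuli-test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Size bound: at most 500 records per poll().
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        // Time/size trade-off on the fetch: wait up to 500 ms for at least 16 KB.
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 16 * 1024);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("liuli-test"));
            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : batch) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
                consumer.commitSync(); // one commit for the whole batch
            }
        }
    }
}
```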

Partition segment + index

This relates to kafka’s storage model.

Let's start with a simple hands-on Kafka walkthrough:

Step 1: Download + start ZK + Start Server + Create topic

Step 2: Send a few messages

Step 3: Consume the message

Step 4: View log files

Step 5: View the index file

Step 6: View the time index file

From the hands-on observations above, we find that:

  • Each partition gets its own log directory.

  • Each directory contains segments; a segment is a logical group of (at least) three files:

    • The .index file: the offset index, mapping message offsets to physical positions so that a message can be located quickly in the log file.
    • The .log file: the message log itself.
    • The .timeindex file: the time index, mapping a timestamp to the corresponding offset, i.e., it points into the .index file.
  • The offsets in the .index file and the timestamps in the .timeindex file are monotonically increasing. Why? Because Kafka's indexing mechanism uses a sparse index: Kafka does not store an index entry for every message, only one entry every fixed interval. To find a message with a known offset, binary-search the index for the entries that bracket the target offset, take the physical position of the lower entry, and scan the log file forward from there until the message is found.

So why does Kafka use sparse indexes?

To keep the index file from growing too large, which would make searching slow and laborious.

So why does Kafka split the log into segments?

To keep any single log file from growing too large, which makes searching easier.

So how do we quickly locate the segment in which a given offset lives?

Again, by binary search: each segment file is named after the base offset of the first message it contains.
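To make the lookup concrete, here is a plain-Java sketch of a sparse offset index; the entries are made up, and TreeMap.floorEntry plays the role of the binary search described above.

```java
import java.util.Map;
import java.util.TreeMap;

public class SparseIndexSketch {
    public static void main(String[] args) {
        // Sparse index: one entry every N messages, mapping offset -> byte position.
        // The entries below are invented for illustration.
        TreeMap<Long, Long> index = new TreeMap<>();
        index.put(0L, 0L);
        index.put(100L, 41_532L);
        index.put(200L, 83_017L);

        long target = 173;
        // floorEntry is the library form of the binary search described above:
        // the largest indexed offset that is <= the target offset.
        Map.Entry<Long, Long> entry = index.floorEntry(target);
        System.out.printf("offset %d: start scanning the .log file at byte %d (indexed offset %d)%n",
                target, entry.getValue(), entry.getKey());
        // From that byte position, scan the log forward until offset 173 is found.
    }
}
```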

A diagram would further illustrate the text above (image omitted).

Here are the commands to execute:

```bash
# Download Kafka
wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0
mkdir logs

# Point the log directory at ./logs
vim config/server.properties   # set log.dirs=./logs

# Start ZooKeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

# Start the Kafka server
bin/kafka-server-start.sh -daemon config/server.properties

# Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic liuli-test

# List topics
bin/kafka-topics.sh --list --zookeeper localhost:2181

# Start a console producer
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic liuli-test

# Inspect the offset index file (lookup by offset)
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./logs/liuli-test-4/00000000000000000000.index

# Inspect the log file
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./logs/liuli-test-4/00000000000000000000.log --print-data-log

# Inspect the time index file (lookup by timestamp)
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./logs/liuli-test-4/00000000000000000000.timeindex

# Consume messages from the beginning
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic liuli-test --from-beginning
```

A few classic interview questions about Kafka

How does Kafka guarantee sequentiality?

How did this problem arise?

The default partitioning strategy in Kafka is round-robin when neither a partition nor a key (hash) is specified: messages are spread across partitions according to this default strategy. Data with the same primary key can therefore end up in different partition queues. A consumer guarantees serial processing of the data within a single partition, but not across partitions, so a message sent later may be consumed earlier.

How to solve it?

Use a single partition; or, more practically, set a message key so that all messages with the same business key are hashed to the same partition.
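A sketch of the key-based variant, assuming the same broker and topic as earlier (the order id is a made-up business key): messages carrying the same key always hash to the same partition and are therefore consumed in order.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String orderId = "order-42"; // hypothetical business key
            // Same key => same partition (hash of the key), so these three
            // state changes are guaranteed to be read back in order.
            producer.send(new ProducerRecord<>("liuli-test", orderId, "created"));
            producer.send(new ProducerRecord<>("liuli-test", orderId, "paid"));
            producer.send(new ProducerRecord<>("liuli-test", orderId, "shipped"));
        }
    }
}
```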

How to ensure kafka does not lose data?

The replica mechanism, synchronous sending (acks=all), and manual offset commits (manual "ack") on the consumer side.
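A minimal sketch of those knobs with the same hypothetical names as before; note the broker side additionally needs enough replicas and a sensible min.insync.replicas, which is omitted here.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class NoLossSketch {
    static void produce() throws Exception {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.ACKS_CONFIG, "all");               // wait for all in-sync replicas
        p.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // retry transient failures
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            // Synchronous send: block until the broker acknowledges.
            producer.send(new ProducerRecord<>("liuli-test", "payload")).get();
        }
    }

    static void consume() {
        Properties c = new Properties();
        c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        c.put(ConsumerConfig.GROUP_ID_CONFIG, "liuli-test-group");
        c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        c.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // manual "ack"
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(List.of("liuli-test"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> process(r.value()));
            consumer.commitSync(); // commit only after processing succeeded
        }
    }

    static void process(String v) { System.out.println(v); }
    public static void main(String[] args) throws Exception { produce(); consume(); }
}
```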

How does Kafka guarantee idempotence?

Message delivery reliability is the commitment Kafka makes to producers and consumers about the messages being processed. There are three common levels of commitment:

  • At most once: messages may be lost, but are never delivered more than once.
  • At least once: messages are never lost, but may be delivered more than once.
  • Exactly once: messages are neither lost nor delivered more than once.

How does Kafka do it exactly once? In short, this is done through two mechanisms: Idempotence and Transaction.

Specifying idempotence for the Producer is easy: set a single parameter, enable.idempotence. Once enable.idempotence is set to true, the Producer is automatically upgraded to an idempotent Producer. The underlying principle is the classic space-for-time trade-off: the Broker stores extra fields per producer. When a Producer sends messages carrying the same field values, the Broker knows they are duplicates and silently "discards" them in the background. However, this only guarantees idempotence within a single partition and a single session.
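A sketch of both mechanisms (the transactional id is made up); the transactional producer is what extends the guarantee beyond a single partition and session.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExactlyOnceProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Idempotence: the broker de-duplicates per (producer id, partition, sequence number).
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Transactions extend the guarantee across partitions and sessions.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "liuli-txn-1"); // hypothetical id

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("liuli-test", "msg-1"));
                producer.send(new ProducerRecord<>("liuli-test", "msg-2"));
                producer.commitTransaction(); // both messages become visible atomically
            } catch (Exception e) {
                producer.abortTransaction();  // neither message is exposed to consumers
                throw e;
            }
        }
    }
}
```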

Core principles of Kafka

Kafka architecture

Several concepts need to be understood to understand Kafka

  • Topic. Used to categorize messages; every message that enters Kafka is placed under a Topic.
  • Broker. The host server that implements data storage. Each Broker is one instance of the Kafka service; multiple Brokers form a Kafka cluster. Messages published by producers are stored in the Brokers, and consumers pull messages from the Brokers to consume.
  • Partition. The messages in each Topic are split into several Partitions to improve processing efficiency. Each Partition is an ordered queue, and each message in a Partition has a monotonically increasing offset.
  • Offset. The message offset: the position of each message within its partition, a monotonically increasing, immutable value.
  • Replica. In Kafka the same message can be copied to multiple places to provide data redundancy; these copies are called replicas. Replicas are divided into leader replicas and follower replicas, each with a different role. Replicas exist at the partition level: each partition can be configured with multiple replicas to achieve high availability.
  • Consumer Offset. The consumer offset, representing a consumer's consumption progress; every consumer has its own consumer offset.
  • Producer. The producer of messages.
  • Consumer. The consumer of messages.
  • Consumer Group. A group of Consumer instances that consume multiple partitions in parallel to achieve high throughput.
  • Coordinator. The Broker-side component that manages consumer-group membership and offsets and coordinates the Rebalance process (see below).
  • ISR (In-Sync Replicas). The set of replicas that are in sync with the leader. The ISR exists so that when a broker goes down, a new partition leader can be elected from the ISR list.
  • AR (Assigned Replicas). All replicas of a partition.
  • OSR (Out-of-Sync Replicas). AR minus ISR.
  • Rebalance. The process by which, when a consumer instance in a group dies (or membership otherwise changes), the subscribed topic partitions are automatically reassigned among the remaining instances. Rebalance is an important means of achieving high availability on Kafka's consumer side.

The controller

  • Topic management
  • Partition management
  • Leader election
  • Cluster membership management
  • Data services (serving cluster metadata)

Kafka elections

  1. Controller election
  2. Partition replica (leader) election
  3. Consumer leader election

Controller election. The controller is itself a broker; a Kafka cluster has multiple broker nodes, and one of them is elected controller. The controller listens for information about the other brokers, including partition state, ISR lists, and replicas. If the controller fails, the remaining brokers compete for the controller role. If an ordinary broker dies, the controller reads the dead broker's state from ZooKeeper and notifies the other brokers; if the dead broker hosted any partition leaders, the controller triggers a partition leader election.

In summary: a controller is selected among all brokers, and the leader election of every partition is decided by the controller. That is, the controller is elected first; once elected, it governs the partition elections.

The controller notifies the affected Brokers of leader changes directly via RPC, which is more efficient than a ZooKeeper queue.

What are the advantages?

  • Prevent split-brain
  • Prevent the herd effect

Partition replica election. A partition has multiple replicas distributed across multiple brokers. One leader replica is elected to serve external requests; the other replicas forward any requests they receive to the leader replica.

Consumer leader election. A leader is selected within a consumer group to help coordinate the partition assignment. When a consumer exits, its partitions are reassigned to the other consumers in the group.

kafka consumer group

The relationship of consumer groups and consumers to topics and partitions:

1. A partition in a single topic can only be subscribed by one consumer in the same consumer group.

2. A consumer group can subscribe to multiple topics. But not recommended.

Ideally, the number of Consumer instances equals the total number of partitions the Group subscribes to; a single consumer may also consume several partitions.

Where are consumer offsets stored?

  • ZooKeeper (older versions)
  • An internal Kafka topic, __consumer_offsets (newer versions)

Kafka producer

kafka rebalance

Essentially a protocol that specifies how all the consumers in a consumer group agree on dividing up the partitions of the topics they subscribe to. It rematches consumers to partition queues to achieve load balancing.
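For visibility into when a rebalance happens, the consumer API lets you hook both phases with a ConsumerRebalanceListener; a minimal sketch with the same hypothetical names as before (the callbacks fire inside poll()):

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RebalanceWatcher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "liuli-test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("liuli-test"), new ConsumerRebalanceListener() {
                @Override // partitions taken away before reassignment: commit/clean up here
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("revoked: " + partitions);
                }
                @Override // partitions handed to this instance after the rebalance
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("assigned: " + partitions);
                }
            });
            while (true) {
                consumer.poll(Duration.ofSeconds(1)); // rebalance callbacks run inside poll()
            }
        }
    }
}
```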

When does a Consumer Group Rebalance? There are three triggers for Rebalance.

  1. The number of group members changed
  2. The number of subscribed topics changed
  3. The number of partitions of a subscribed topic changed

During a Rebalance, all Consumer instances stop consuming and wait for the Rebalance to complete.

All brokers create and enable the Coordinator component when they start up; that is, every Broker has its own Coordinator.

When a Consumer application commits an offset, it commits it to the Broker where its Coordinator resides. Similarly, when a Consumer application starts, it sends requests to the Broker where the Coordinator resides, and the Coordinator performs metadata management such as registering the consumer group and managing member records.

What’s the downside of Rebalance? In summary, there are three points as follows:

  1. Stop-the-world: all consumption pauses, which hurts the consumption rate
  2. Slow
  3. Inefficient: every member has to participate. Couldn't it borrow from consistent hashing?

To avoid group members being kicked out or leaving unexpectedly because of unreasonable parameters or processing logic, the main related parameters are (a configuration sketch follows the list):

  • session.timeout.ms (session timeout)
  • heartbeat.interval.ms (heartbeat interval)
  • max.poll.interval.ms (maximum interval between two polls)
  • JVM GC parameters (a long GC pause can cause missed heartbeats)
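A minimal sketch of these parameters (the values are illustrative starting points, not recommendations):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class RebalanceTuning {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "liuli-test-group");
        // The broker marks the consumer dead if no heartbeat arrives within this window.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10_000);
        // Send heartbeats well within the session timeout (commonly ~1/3 of it).
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3_000);
        // If poll() is not called again within this interval, the consumer is
        // considered stuck and is kicked out, triggering a rebalance.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps());
    }
}
```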

Why doesn't Kafka allow follower replicas to serve reads alongside the leader, i.e., support read/write separation the way Redis and MySQL do?

  • Keeping reads on the leader keeps reads consistent with writes
  • It avoids the master-slave replication lag problem

How does the I/O model relate to Kafka?

In fact, the Kafka client uses Java NIO selectors under the hood. On Linux, the selector is implemented with epoll; on Windows, with select. Deploying Kafka on Linux therefore has an advantage here, thanks to the more efficient I/O model.

Why is Kafka designed around partitions? Wouldn't multiple topics achieve the same goals?

  • Provides load balancing capabilities
  • In order to achieve high scalability of the system
  • Implement business-level message order

Unclean Leader Election

This can be understood as a "dirty replica" election. A normal election picks the leader from the ISR replicas, because the replicas in the ISR set are in sync with the leader. There are always replicas that perform poorly and lag too far behind the leader; those replicas cannot enter the ISR set. But what if all the ISR replicas are down and nobody is left? To keep the cluster available, a leader then has to be elected from the lagging replicas. This is the Unclean leader election. Its downside is that it can lead to data loss.
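The switch itself is the broker/topic config unclean.leader.election.enable, which modern Kafka defaults to false. A sketch of pinning it per topic with the admin client, reusing the hypothetical topic name from earlier:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DisableUncleanElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "liuli-test");
            AlterConfigOp disable = new AlterConfigOp(
                    new ConfigEntry("unclean.leader.election.enable", "false"),
                    AlterConfigOp.OpType.SET);
            // Trade availability for safety: never elect an out-of-sync replica.
            admin.incrementalAlterConfigs(Map.of(topic, List.of(disable))).all().get();
        }
    }
}
```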

What is the high watermark? (to be studied further)

It is a special offset value used to mark message visibility.

  1. It defines message visibility, marking which messages in the partition are available for consumers to consume
  2. It helps Kafka complete replica synchronization

Application of Kafka in XXX (Omitted)

References

[1] MQ contrast selection: note.dolyw.com/mq/00-MQ-Se…

[2] Kafka zero copy: blog.csdn.net/qq_37865420…

[3] acecodeinterview.com/kafka/

[4] Geek Time column "Kafka Core Technology and Practice"