Basic introduction to Kafka

Kafka is a distributed, partitioned, replicated, multi-subscriber log system coordinated by ZooKeeper (also commonly used as an MQ system). Kafka is typically used for web/Nginx logs, access logs, messaging, and more. LinkedIn contributed it to the Apache Foundation in 2010, and it became a top-level open-source project.

Its typical application scenarios are log collection systems and messaging systems.

The main design goals for Kafka are as follows:

  • Message persistence with O(1) time complexity, ensuring constant-time access performance even for data beyond the terabyte scale.
  • High throughput: even on cheap commodity machines, a single machine can support 100K messages per second.
  • Supports message partitioning across Kafka servers and distributed consumption, while preserving the order of messages within each partition.
  • Both offline data processing and real-time data processing are supported.

Kafka design principle analysis

A typical Kafka cluster contains producers, brokers, consumers, and a Zookeeper cluster. Kafka uses Zookeeper to manage cluster configuration, elect the leader, and rebalance when the Consumer group changes. Producer uses push mode to publish messages to the broker, and consumer uses pull mode to subscribe to and consume messages from the broker.

Kafka terms:

  • Broker: Message-oriented middleware processes nodes. A Kafka node is a Kafka Broker. Multiple Brokers can form a Kafka cluster.
  • Topic: A category of messages. A Kafka cluster can serve multiple topics at the same time.
  • Partition: A physical grouping of topics. A topic can be divided into multiple partitions, each of which is an ordered queue.
  • Segment: Physically, a partition consists of multiple segments.
  • Offset: Each partition consists of a series of ordered, immutable messages that are continuously appended to the partition. Each message in a partition has a sequential sequence number called offset, which uniquely identifies a message.
  • Producer: Publishes messages to Kafka Broker.
  • Consumer: Message consumer, the client that reads messages from the Kafka Broker.
  • Consumer Group: Each Consumer belongs to a specific Consumer Group.

Transaction characteristics of Kafka data transfer

  • At most once: a message is sent at most once. This is similar to a JMS “non-persistent” message: it is sent once and will not be resent, regardless of success or failure. The consumer fetches the message, saves the offset, and then processes the message; if an exception occurs during processing after the offset has already been saved, the messages that failed to be processed will not be fetched again. This is “at most once”.
  • At least once: a message is sent at least once; if it is not received successfully, it may be resent until it is. The consumer fetches the message, processes it, and then saves the offset. If the offset save fails (for example, because of a ZooKeeper exception) after the message has been processed successfully, the next fetch may return a message that has already been processed, because the offset was not committed to ZooKeeper in time and ZooKeeper was restored to its previous offset state. This is “at least once”.
  • Exactly once: the message is delivered exactly once. Kafka does not strictly implement this (it would require something like a two-phase commit), and we do not think this strategy is necessary in Kafka.

Usually, “at-least-once” is our first choice.
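The difference comes down to where the offset is saved relative to processing. Below is a minimal consumer sketch using the modern Java client (which commits offsets to Kafka rather than ZooKeeper, but the ordering argument is the same); the broker address, topic, and group names are illustrative assumptions.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // assumed local broker
        props.put("group.id", "demo-group");                  // illustrative group name
        props.put("enable.auto.commit", "false");             // we control when the offset is saved
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                // At most once: commit here, BEFORE processing, and a crash during
                // processing silently skips the unprocessed messages.
                // consumer.commitSync();
                for (ConsumerRecord<String, String> record : records) {
                    process(record);                           // may run again after a crash
                }
                consumer.commitSync();                         // at least once: commit AFTER processing
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}
```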

Kafka message storage format

Topic & Partition

A topic can be thought of as a category of messages. Each topic is divided into multiple partitions, and each partition is an append-only log file at the storage level.

In Kafka's file storage, a topic has multiple partitions, and each partition is a directory. The partition directory is named as the topic name plus an ordered number: the first partition's number is 0, and the largest number is the number of partitions minus 1.

  • Each partition is equivalent to a huge file that is divided into multiple equally sized segment data files. However, the number of messages in each segment file is not necessarily the same; this property allows old segment files to be deleted quickly.
  • Each partition only needs to support sequential reads and writes. The segment file life cycle is determined by server configuration parameters.

In this way, unnecessary files can be deleted quickly and disk utilization can be improved.

  • Segment file: consists of two main files, an index file and a data file, which come in matched pairs. The suffixes “.index” and “.log” denote the segment index file and data file respectively.
  • Each subsequent segment is named after the offset value of the last message in the preceding segment. The value is a 64-bit long, written as 19 digit characters and left-padded with zeros.

The physical structure of index and data file in the segment is as follows:

In the figure above, the index file stores metadata and the data file stores the messages; each metadata entry in the index file points to the physical offset of the corresponding message in the data file.

For example, the index entry (3, 497) means the 3rd message in this data file (the 368772nd message in the partition as a whole), whose physical offset within the data file is 497.
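A rough sketch of how a message is located under these rules: first binary-search for the segment whose base offset is not greater than the target offset, then use that segment's sparse index to jump close to the physical position. The file names and numbers below are illustrative, matching the example above.

```java
import java.util.TreeMap;

public class OffsetLookup {
    public static void main(String[] args) {
        // Segment base offsets -> segment file names (named as described above).
        TreeMap<Long, String> segments = new TreeMap<>();
        segments.put(0L,      "0000000000000000000.log");
        segments.put(368769L, "0000000000000368769.log");

        long target = 368772L;
        // Step 1: binary search for the segment that contains the target offset.
        long baseOffset = segments.floorKey(target);            // -> 368769
        String segmentFile = segments.get(baseOffset);

        // Step 2: the sparse .index file maps relative offsets to physical positions,
        // e.g. entry (3, 497) = 3rd message in this segment, at byte position 497.
        long relativeOffset = target - baseOffset;               // -> 3
        System.out.println(segmentFile + " : relative offset " + relativeOffset
                + ", then read from the physical position given by the index (497)");
    }
}
```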

The segment data file consists of a number of messages. The physical structure of the message is as follows:

Parameter Description:

  • 8 byte offset: each message within a partition has an ordered ID number called the offset, which uniquely identifies the message's position within the partition; i.e. the offset tells which message in the partition this is.
  • 4 byte message size: the size of the message.
  • 4 byte CRC32: a CRC32 checksum of the message.
  • 1 byte “magic”: the version of the Kafka protocol used to publish this message.
  • 1 byte “attributes”: indicates a standalone version, or identifies the compression type or encoding type.
  • 4 byte key length: the length of the key; when it is -1, the K byte key field is omitted.
  • K byte key: optional.
  • value bytes payload: the actual message data.
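A small sketch of this layout using a ByteBuffer, assuming the legacy message format described above; the CRC value is left as a placeholder rather than computed, and the numbers are illustrative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class MessageLayout {
    public static void main(String[] args) {
        byte[] payload = "hello kafka".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + 4 + 1 + 1 + 4 + payload.length);
        buf.putLong(368772L);                          // 8-byte offset: position within the partition
        buf.putInt(4 + 1 + 1 + 4 + payload.length);    // 4-byte message size (bytes after this field)
        buf.putInt(0);                                 // 4-byte CRC32 of the message (placeholder)
        buf.put((byte) 0);                             // 1-byte "magic": protocol version
        buf.put((byte) 0);                             // 1-byte "attributes": compression/encoding type
        buf.putInt(-1);                                // 4-byte key length: -1 means no key bytes follow
        buf.put(payload);                              // value bytes payload: the actual message data
        System.out.println("encoded " + buf.position() + " bytes");
    }
}
```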

Replication policy

The guarantee of Kafka’s high reliability comes from its robust replication policy.

  1. Data synchronization

Prior to version 0.8, Kafka did not provide replication for partitions: once a Broker went down, all partitions on it became unavailable, and because partitions had no backup data, availability was reduced. A replication mechanism was therefore introduced after 0.8 to provide failover when a Broker fails.

After Replication is introduced, there may be multiple replicas in the same Partition. In this case, a Leader needs to be selected among these replicas. The Producer and Consumer only interact with the Leader. Other replicas act as followers and copy data from the Leader.

  2. Replica placement strategy

For better load balancing, Kafka tries to evenly distribute all partitions across the entire cluster.

Kafka's algorithm for allocating replicas is as follows:

  • Sort all n surviving Brokers and the partitions to be allocated
  • Allocate the i-th Partition to the (i mod n)-th Broker. The first Replica of this Partition lives on this Broker and is the preferred replica of the Partition
  • Allocate the j-th Replica of the i-th Partition to the ((i + j) mod n)-th Broker

Suppose there are four brokers in the cluster, and a topic has four partitions, each with three replicas. The following is the replica allocation on each Broker.
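A minimal sketch of the simplified rule above applied to that scenario (the real Kafka implementation also randomizes the starting broker and the shift between replicas, which this sketch omits):

```java
public class ReplicaPlacement {
    public static void main(String[] args) {
        int brokers = 4, partitions = 4, replicas = 3;
        for (int i = 0; i < partitions; i++) {
            StringBuilder sb = new StringBuilder("partition " + i + " -> brokers ");
            for (int j = 0; j < replicas; j++) {
                sb.append((i + j) % brokers);    // j == 0 is the preferred (leader) replica
                if (j < replicas - 1) sb.append(", ");
            }
            System.out.println(sb);
        }
    }
}
```

Running it prints partition 0 -> brokers 0, 1, 2; partition 1 -> brokers 1, 2, 3; and so on, spreading each partition's replicas across different brokers.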

  3. Synchronization strategy

When publishing a message to a Partition, the Producer first uses ZooKeeper to find the Leader of the Partition and then, regardless of the Topic's Replication Factor, sends the message only to that Leader. The Leader writes the message to its local log. Each Follower pulls data from the Leader, so the followers store data in the same order as the Leader. A Follower sends an ACK to the Leader after it has received the message and written it to its own log. Once the Leader has received ACKs from all replicas in the ISR, the message is considered committed; the Leader advances the HW (high watermark) and sends an ACK to the Producer.

To improve performance, each Follower sends an ACK to the Leader as soon as it receives the data, rather than waiting until the data is written to its log. Therefore, Kafka can only guarantee that a committed message is stored in the memory of multiple replicas, not that it has been persisted to disk, so it cannot fully guarantee that the message will still be consumable after a failure.

The Consumer reads messages from the Leader, and only messages that have been committed are exposed to the Consumer.

The data flow for Kafka Replication is shown below:

For Kafka, defining whether a Broker is “alive” involves two conditions:

  • One is that it must maintain a session with ZooKeeper (this is done through ZooKeeper’s Heartbeat mechanism).
  • Second, followers must be able to copy the Leader’s messages in a timely manner and not “fall too far behind”.

The Leader keeps track of the list of replicas that are in sync with it; this list is called the ISR (in-sync replicas). If a Follower crashes or falls too far behind, the Leader removes it from the ISR. “Too far behind” here means that the number of messages the Follower has copied lags the Leader by more than a predetermined value, or that the Follower has not sent a fetch request to the Leader for a certain period of time.

Kafka only handles the fail/recover failure model. A message is not considered committed until all followers in the ISR have copied it from the Leader. This prevents the case where data has been written only to the Leader, which then crashes before any Follower has copied it, resulting in data loss (the Consumer cannot consume the data). The Producer, for its part, can choose whether to wait for the message to commit. This mechanism ensures that as long as the ISR contains one or more followers, a committed message will not be lost.
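On the producer side, this choice is exposed through the acks setting of the Java client. A hedged sketch follows; the broker address and topic name are illustrative assumptions.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;

public class AcksDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
        // acks=all : wait until the leader has received ACKs from every replica in the ISR
        // acks=1   : wait only for the leader's own write
        // acks=0   : do not wait for the commit at all
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            RecordMetadata md = producer.send(new ProducerRecord<>("demo-topic", "key", "value")).get();
            System.out.println("committed at offset " + md.offset());
        }
    }
}
```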

  4. Leader election

Leader election is essentially a distributed lock, and there are two ways to implement a ZooKeeper-based distributed lock:

  • Unique node name: When multiple clients create a node, only the client that successfully creates the node can obtain the lock
  • Temporary order nodes: All clients create their own temporary order nodes in a directory, and only the one with the smallest sequence number gets the lock

The Majority Vote election strategy is similar to ZooKeeper's Zab election; in fact, ZooKeeper itself implements a majority-rule election. Kafka uses the first method for electing the leader replica of a Partition: the replica that successfully creates the ZNode first becomes the Leader, and the other replicas register Watcher listeners on this ZNode. If the Leader fails, the corresponding ephemeral node is deleted automatically; at that point all followers registered on the node receive the listener event and try to create the node, and only the Follower that succeeds becomes the Leader (ZooKeeper guarantees that only one client can successfully create a given node). The other followers simply re-register their listeners.
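A minimal sketch of this ephemeral-znode pattern with the plain ZooKeeper Java API; the znode path and replica id here are illustrative, not Kafka's actual metadata layout.

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZnodeElection implements Watcher {
    private static final String LEADER_PATH = "/demo/partition-0/leader";  // illustrative path
    private final ZooKeeper zk;
    private final String replicaId;

    ZnodeElection(ZooKeeper zk, String replicaId) {
        this.zk = zk;
        this.replicaId = replicaId;
    }

    void tryToLead() throws KeeperException, InterruptedException {
        try {
            // Ephemeral: the node disappears automatically if this session dies.
            zk.create(LEADER_PATH, replicaId.getBytes(),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println(replicaId + " is now the leader");
        } catch (KeeperException.NodeExistsException e) {
            // Someone else won; watch the znode so we are notified when it is deleted.
            zk.exists(LEADER_PATH, this);
            System.out.println(replicaId + " is a follower, watching for leader failure");
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
            try { tryToLead(); } catch (Exception ignored) { }
        }
    }
}
```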

Kafka message grouping, message consumption principle

A message on the same Topic can only be consumed by one Consumer within the same Consumer Group, but multiple Consumer groups can consume the message at the same time.

This is how Kafka implements both broadcasting a Topic's messages (to all consumers) and unicasting them (to a single Consumer). A Topic can correspond to multiple Consumer Groups. To implement broadcasting, give each Consumer its own Group; to implement unicast, put all consumers in the same Group. Consumer Groups also allow consumers to group freely without having to send messages multiple times to different Topics.
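In the Java client the only knob involved is group.id. A small sketch under assumed broker addresses and group names:

```java
import java.util.Properties;
import java.util.UUID;

public class GroupIdDemo {
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Broadcast: give every consumer its own group, so each one receives every message.
        Properties broadcast = consumerProps("group-" + UUID.randomUUID());
        // Unicast: put all consumers in the same group, so each message is handled by only one of them.
        Properties unicast = consumerProps("shared-group");
        System.out.println(broadcast.getProperty("group.id") + " vs " + unicast.getProperty("group.id"));
    }
}
```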

Push vs. Pull

As a messaging system, Kafka follows the traditional pattern of having producers push messages to the broker and consumers pull messages from the broker.

The push pattern has difficulty accommodating consumers with different consumption rates, because the message sending rate is determined by the broker. The goal of the push mode is to deliver messages as fast as possible, but this can easily overwhelm consumers, typically showing up as denial of service and network congestion. The pull pattern lets the Consumer consume messages at an appropriate rate based on its own capacity.

For Kafka, the pull mode is more appropriate. The Pull pattern simplifies broker design and allows consumers to control the rate at which messages are consumed, as well as the way they consume them — either in bulk or on a piece-by-piece basis — and to choose different delivery methods to implement different transport semantics.
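A brief sketch of pull-side rate control with the Java client: the consumer decides the batch size and polling pace, and the broker never pushes faster than that. Configuration values and names below are illustrative assumptions.

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PullRateControl {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // assumed local broker
        props.put("group.id", "slow-group");
        props.put("max.poll.records", "100");                 // at most 100 messages per poll (bulk size)
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
                System.out.println("pulled " + batch.count() + " messages");
                Thread.sleep(200);   // process at this consumer's own pace; the broker never forces more
            }
        }
    }
}
```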

Kafka sequential writes and data reads

The producer is responsible for sending data to Kafka. Kafka writes every message it receives to its hard disk and never loses data. To optimize write speed, Kafka uses two techniques: sequential writes and memory-mapped files (MMAP).

Sequential writes

Because hard disks are mechanical, each read and write is addressed, and addressing is a “mechanical action” that is the most time consuming. So hard disks hate random I/O and prefer sequential I/O. To speed up reading and writing hard drives, Kafka uses sequential I/O.

Each message is appended to this Partition and is sequentially written to the disk, which is highly efficient.

Kafka does not delete data when it is consumed; it keeps all data, and each Consumer maintains, for each Topic partition, an offset indicating how far it has read.

Even with sequential writes, a hard drive cannot catch up with memory. So instead of writing data to the hard disk in real time, Kafka takes advantage of the modern operating system's paged storage (the page cache) to use memory to improve I/O efficiency.

Starting with Linux kernel 2.2, a “zero-copy” system call mechanism appeared that skips the copy through the “user buffer” by establishing a direct mapping between disk space and memory space. Because data is no longer copied into the user buffer, two system context switches are eliminated, which can roughly double performance.

With MMAP, a process reads and writes memory (virtual memory, of course) as if it were reading and writing the hard disk. This gives a big I/O boost by eliminating the overhead of copying between user space and kernel space (a read() on a file puts the data into kernel memory and then copies it into user-space memory).
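A minimal sketch of writing through a memory-mapped file in Java, the same OS facility (mmap plus the page cache) described above; the file name and mapping size are illustrative.

```java
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class MmapWrite {
    public static void main(String[] args) throws Exception {
        try (RandomAccessFile raf = new RandomAccessFile("0000000000000000000.log", "rw");
             FileChannel channel = raf.getChannel()) {
            MappedByteBuffer mmap = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024);
            mmap.put("a message appended to the log".getBytes(StandardCharsets.UTF_8));
            // The write above only touches the page cache; force() asks the OS to flush
            // dirty pages to disk, which is the speed-vs-durability trade-off discussed here.
            mmap.force();
        }
    }
}
```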

Consumer (reading data)

Imagine a web server sending a static file: how can it be optimized? The answer is zero copy. In the traditional mode, we read a file from the hard disk like this.

The data is first copied into kernel space (read() is a system call, so this happens via DMA into a kernel buffer), then copied into user space (copies 1 and 2); it is then copied from user space back into kernel space (the socket you write to is used through a system call, so it has its own kernel buffer), and finally sent to the NIC (copies 3 and 4).

Zero copy goes directly from kernel space (the DMA buffer) to kernel space (the socket buffer) and then to the NIC. This technique is very common; Nginx also uses it.

In effect, Kafka stores all messages in files and sends the “file” directly to consumers when they need data. When the entire file does not need to be sent, Kafka calls the zero-copy sendfile function, whose parameters are:

  • out_fd: the output descriptor (usually the socket handle)
  • in_fd: the input file handle
  • off_t: the offset into in_fd (where to start reading)
  • size_t: how many bytes to read
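In Java, the same path is reached through FileChannel.transferTo, which delegates to sendfile on Linux. A sketch follows; the file name, host, and byte range are illustrative assumptions.

```java
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {
    public static void main(String[] args) throws Exception {
        try (FileChannel log = FileChannel.open(
                     Paths.get("0000000000000000000.log"), StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("consumer-host", 9000))) {
            long offset = 0;       // corresponds to off_t: where to start reading
            long count = 4096;     // corresponds to size_t: how many bytes to send
            long sent = log.transferTo(offset, count, socket);   // in_fd -> out_fd, no user-space copy
            System.out.println("sent " + sent + " bytes without copying through user space");
        }
    }
}
```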