I. Kafka Cluster

Kafka uses ZooKeeper to maintain information about the brokers. Each broker has a unique identifier, broker.id, that identifies it within the cluster; it can be set in the server.properties configuration file or generated automatically. The broker cluster forms automatically as follows (a sketch for inspecting the resulting znodes follows the list):

  • Every time a broker starts, it creates an ephemeral node under the /brokers/ids path in ZooKeeper and writes its broker.id to it, thereby registering itself with the cluster;
  • When multiple brokers start, they all race to create the /controller node in ZooKeeper. Since znodes must be unique, only one broker succeeds; it is called the controller broker. Besides performing the functions of an ordinary broker, it is responsible for managing the state of topic partitions and their replicas;
  • When a broker crashes or exits and its ZooKeeper session times out, the watcher events registered in ZooKeeper fire and Kafka performs the corresponding failover. If the controller broker itself goes down, a new controller election is also triggered.
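
To make the registration concrete, here is a minimal sketch (not part of Kafka itself) that uses the ZooKeeper Java client to list the registered broker ids and read the current controller. The connection string localhost:2181 is an assumption:

import org.apache.zookeeper.ZooKeeper;

public class BrokerRegistryInspector {
    public static void main(String[] args) throws Exception {
        // Assumes ZooKeeper is reachable at localhost:2181
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10_000, event -> { });

        // Each live broker registers an ephemeral znode /brokers/ids/<broker.id>
        for (String id : zk.getChildren("/brokers/ids", false)) {
            byte[] data = zk.getData("/brokers/ids/" + id, false, null);
            System.out.println("broker " + id + " -> " + new String(data));
        }

        // The broker that won the race to create /controller is the current controller
        byte[] controller = zk.getData("/controller", false, null);
        System.out.println("controller -> " + new String(controller));

        zk.close();
    }
}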

II. Replica Mechanism

To ensure high availability, Kafka partitions are multi-replica: if one replica is lost, the partition data can still be obtained from the other replicas. This requires each replica's data to be complete, which is the basis of Kafka's data consistency and why the controller broker manages replicas specially. Kafka's replica mechanism is explained in detail below.

2.1 Partitions and Replicas

Kafka topics are divided into partitions, which are Kafka's basic unit of storage. Each partition can have multiple replicas (specified at topic creation time with the replication-factor parameter). One replica is the leader replica, and all events are sent directly to it. The other replicas are follower replicas, which replicate from the leader to keep their data consistent with it. When the leader replica becomes unavailable, one of the followers becomes the new leader.

2.2 ISR mechanism

Each partition has an in-sync replicas (ISR) list that tracks all synchronized, available replicas. The leader replica is by definition in sync; a follower replica must satisfy the following conditions to be considered in sync:

  • It has an active session with ZooKeeper, i.e. it sends heartbeats to ZooKeeper periodically;
  • It has fetched messages from the leader replica with low latency within the configured time window.

If the replica does not meet the above criteria, it will be removed from the ISR list and will not be added again until it meets the criteria.

Here is an example of topic creation: use --replication-factor to specify a replication factor of 3. After the topic is created, the --describe command shows that partition 0 has replicas 0, 1, and 2, all of which are in the ISR list, with 1 as the leader replica. The same flow can be reproduced programmatically, as in the sketch below.
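
The following is a minimal sketch of that flow using Kafka's Java AdminClient; the topic name my-topic and the bootstrap address are assumptions:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Collections;
import java.util.Properties;

public class CreateAndDescribeTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

        try (AdminClient admin = AdminClient.create(props)) {
            // One partition, replication factor 3 (the equivalent of --replication-factor 3)
            admin.createTopics(Collections.singleton(new NewTopic("my-topic", 1, (short) 3)))
                 .all().get();

            // The equivalent of --describe: print leader, replicas and ISR per partition
            TopicDescription desc = admin.describeTopics(Collections.singleton("my-topic"))
                                         .all().get().get("my-topic");
            desc.partitions().forEach(p ->
                    System.out.printf("partition %d leader=%d replicas=%s isr=%s%n",
                            p.partition(), p.leader().id(), p.replicas(), p.isr()));
        }
    }
}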

2.3 Unclean Leader Election

The replica mechanism has an optional broker-level configuration parameter, unclean.leader.election.enable, whose default value is false, forbidding unclean leader elections. It controls whether, when the leader replica goes down and there is no other available replica in the ISR, an out-of-sync replica may become the leader. Allowing this can cause data loss or inconsistency, which scenarios with strict consistency requirements (such as finance) may not tolerate, hence the default of false. Set it to true if some data inconsistency is acceptable.
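
If the trade-off is acceptable, the setting can also be overridden per topic. A sketch using the AdminClient's incrementalAlterConfigs, with the topic name and address assumed:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class UncleanElectionConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // Allow an out-of-sync replica to become leader for this topic only
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("unclean.leader.election.enable", "true"),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(op))).all().get();
        }
    }
}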

2.4 Minimum In-Sync Replicas

Another parameter related to the ISR mechanism is min.insync.replicas, which can be configured at the broker or topic level and specifies the minimum number of available replicas in the ISR list. Suppose its value is 2: when the number of in-sync replicas falls below 2, the whole partition is considered unavailable, and a client writing to it (with acks=all) receives org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
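
As an illustration, the setting can be supplied at topic creation time via NewTopic.configs; this is a sketch, with the topic name and sizing assumed:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class MinInsyncReplicasExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 replicas, of which at least 2 must be in sync for writes to succeed
            NewTopic topic = new NewTopic("my-topic", 1, (short) 3)
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}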

2.5 Send Acknowledgements

Kafka's producer has an optional acks parameter, which specifies how many partition replicas must receive a message before the producer considers the write successful (see the sketch after this list):

  • acks=0: the message is considered sent as soon as it leaves the producer, without waiting for any response from the server;
  • acks=1: the producer receives a success response from the server as soon as the leader replica receives the message;
  • acks=all: the producer receives a success response only after all in-sync replicas have received the message.
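
A minimal producer sketch with acks=all; the topic name, key/value and bootstrap address are assumptions:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class AcksExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, e) -> {
                // With acks=all, e is a NotEnoughReplicasException if the ISR
                // has fewer replicas than min.insync.replicas
                if (e != null) e.printStackTrace();
            });
        }
    }
}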

III. Data Requests

3.1 Metadata request mechanism

Of all the replicas, only the leader replica can serve reads and writes. Since the leader replicas of different partitions may reside on different brokers, if a broker receives a request for a partition whose leader replica is not on that broker, it returns a Not a Leader for Partition error response to the client. To solve this problem, Kafka provides a metadata request mechanism.

Every broker in the cluster caches the partition replica information of all topics; clients periodically send metadata requests and cache the responses. The refresh interval can be specified with the client configuration metadata.max.age.ms. With this metadata, the client knows which broker hosts the leader replica and sends read and write requests directly to it.

If a partition leader election happens within the refresh interval, the cached information becomes stale and the client may receive a Not a Leader for Partition error response. In that case, the client issues another metadata request, refreshes its local cache, and then retries the operation against the correct broker.
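
For illustration, here is a sketch that shortens the metadata refresh interval on a consumer so leader changes are picked up sooner; the address, group id and the 60-second interval are assumptions (the default interval is 5 minutes):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class MetadataAgeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "60000"); // refresh metadata every 60 s
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual; stale leader info is refreshed automatically
        }
    }
}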

3.2 Data visibility

Note that not all data stored on the partition leader can be read by clients. To ensure consistency, only messages that have been stored by all in-sync replicas (all replicas in the ISR) are readable by clients; this offset is known as the high watermark.

3.3 Zero Copy

Kafka uses zero copy when transferring log data over the network (for example, when sending messages to consumers). The differences between traditional copying and zero copy are as follows:

Traditional mode: four copies and four context switches

Take sending a disk file over the network as an example. In traditional mode, the file data is read into memory and then sent out through a socket, as the following pseudocode shows:

buffer = file.read()
socket.send(buffer)

This process actually involves four copies. The file data is first read into a kernel-space buffer via a system call (DMA copy); the application then reads the kernel buffer into a user-space buffer (CPU copy); when the application sends the data through the socket, it copies the user-space buffer back into a kernel-space socket buffer (CPU copy); finally the data is copied from the socket buffer to the NIC buffer (DMA copy). Four context switches occur along the way.
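
For reference, a self-contained sketch of this traditional path in Java; the host, port and file name are placeholders:

import java.io.FileInputStream;
import java.io.OutputStream;
import java.net.Socket;

public class TraditionalCopyClient {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket("localhost", 9999);       // placeholder host/port
             FileInputStream in = new FileInputStream("data.log"); // placeholder file
             OutputStream out = socket.getOutputStream()) {
            byte[] buffer = new byte[8192];
            int n;
            // Each read pulls data kernel -> user space and each write pushes it back:
            // two CPU copies per chunk on top of the two DMA copies
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
    }
}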

sendfile and transferTo implement zero copy

Linux kernels 2.4 and later provide zero copy through the sendfile system call. After the data is DMA-copied into the kernel buffer, it is DMA-copied directly to the NIC buffer without any CPU copy, which is where the term zero copy comes from. Besides reducing data copies, the entire read-file-and-send path is handled by a single sendfile call with only two context switches, so performance improves greatly.

In terms of implementation, Kafka transfers data through the TransportLayer interface. The transferFrom method of its implementation class PlaintextTransportLayer achieves zero copy by calling FileChannel's transferTo method from Java NIO, as shown below:

@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
    // Delegates to FileChannel#transferTo, which uses a sendfile-style
    // system call where the operating system supports one
    return fileChannel.transferTo(position, count, socketChannel);
}

Note: transferTo and transferFrom do not guarantee zero copy. Whether zero copy is actually used depends on the operating system: if it provides a zero-copy system call such as sendfile, these methods take full advantage of it through that call; otherwise they cannot achieve zero copy by themselves.
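
For a standalone illustration of the same technique outside Kafka, here is a minimal sketch that serves a file over a socket with FileChannel.transferTo; the port and file name are placeholders:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopyFileServer {
    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(9999)); // placeholder port
            try (SocketChannel client = server.accept();
                 FileChannel file = FileChannel.open(Path.of("data.log"), StandardOpenOption.READ)) {
                long position = 0, size = file.size();
                // transferTo may send fewer bytes than requested, so loop until done;
                // on Linux it maps to sendfile, avoiding CPU copies through user space
                while (position < size) {
                    position += file.transferTo(position, size - position, client);
                }
            }
        }
    }
}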

IV. Physical Storage

4.1 Partition Allocation

When a topic is created, Kafka first decides how to allocate the partition replicas among the brokers, following these principles:

  • Distribute partition replicas evenly across all brokers;
  • Ensure that the replicas of each partition are placed on different brokers;
  • If the broker.rack parameter is used to specify rack information for the brokers, the replicas of each partition are allocated to brokers on different racks as much as possible, so that the loss of one rack does not make an entire partition unavailable.

For the above reasons, creating a topic with 3 replicas on a single-node cluster usually throws the following exception:

Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.

4.2 Partition Data Retention Rules

Data retention is a fundamental feature of Kafka, but Kafka does not keep data forever, nor does it wait for all consumers to read a message before deleting it. Instead, Kafka configures a retention policy for each topic, specifying either how long data is kept before being deleted or how much data is kept before old data is purged. These correspond to the following four parameters (a configuration sketch follows the list):

  • log.retention.bytes: the maximum amount of data to retain before old data is deleted. The default value is -1, meaning no limit;
  • log.retention.ms: the number of milliseconds to retain data files. If not set, log.retention.minutes is used instead. The default value is null;
  • log.retention.minutes: the number of minutes to retain data files. If not set, log.retention.hours is used instead. The default value is null;
  • log.retention.hours: the number of hours to retain data files. The default value is 168, i.e. one week.
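
These four are broker-level settings; retention can also be overridden per topic, where the analogous configs are retention.ms and retention.bytes. A sketch with the Java AdminClient, in which the topic name, address and the 3-day value are assumptions:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class RetentionConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // Keep data for 3 days: a topic-level override of the broker default
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("retention.ms", "259200000"),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(op))).all().get();
        }
    }
}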

Because finding and deleting messages inside one large file is slow and error-prone, Kafka splits each partition into segments. The segment currently being written to is called the active segment, and the active segment is never deleted. With the default one-week retention, if a new segment is started every day, the oldest segment is deleted each day as a new one is opened, so the partition holds about 7 segments most of the time.

4.3 File Format

The format of the data stored on disk is the same as the format of the messages sent by the producer. If the producer sends compressed messages, all messages in the same batch are compressed together and sent as a single "wrapper message", which is then saved to disk as-is. The consumer later reads and decompresses the wrapper message to obtain the details of each message inside it.
