Apache Kafka plays an important role in real-time and streaming data architectures thanks to its high throughput and high reliability, and it is favored by many enterprise users. With the advent of the cloud era, however, public cloud vendors have launched managed message queue services, and many users have gradually moved from self-built message clusters to message queue services in the cloud. This article takes Mogujie's migration of its Kafka service to the cloud as an example to explain how Tencent Cloud's message queue service, CKafka, delivers value to users. (Editor: Middleware Q sister)

Introduction to Apache Kafka

Apache Kafka is a distributed streaming platform. The official website describes it as follows:

Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.

Kafka, now at version 2.0 and later, has redefined itself as a streaming platform. In enterprise use, however, Kafka is most often used as a data pipeline that performs the basic functions of a message queue. Typical usage scenarios are as follows:

  • Data pipelines and system decoupling.
  • Asynchronous processing and event-driven architectures.
  • Traffic peak shaving.
  • Transactional messages and eventual consistency for distributed transactions.

Pain points of self-built Kafka clusters

Because Kafka is simple to set up and its performance is efficient and stable, many enterprises choose to build their own Kafka clusters. However, this seemingly perfect solution carries a hidden risk: once the volume of business message data reaches a certain level, a self-built message queue cluster runs into all kinds of problems. How can they be solved?

We all know that Kafka is easy to get started with but hard to master. To solve these problems, R&D staff need a solid computer science foundation (familiarity with computer networks, I/O, and so on) and a deep understanding of Kafka's underlying principles and configuration parameters, so that they can tune cluster parameters, handle sudden failures quickly, recover from cluster jitter, and scale the cluster out dynamically. This brings its own problems. On the one hand, enterprises must invest more people and resources. On the other hand, they must monitor the cluster's health and troubleshoot problems promptly to keep services running stably. So although a self-built Kafka cluster is simple to set up, it comes with ever-increasing R&D and operations costs.

Background of Mogujie's move to the cloud

Mogujie's business scenarios and software architecture make it strongly dependent on Kafka. As a leader in e-commerce, it handles on the order of 100 billion messages per day, with peak production bandwidth at the gigabit-per-second level. Its main business scenarios involve distributed big data processing, such as advertising, transactions, security, and offline processing.

After recognizing the pain points of a self-built Kafka cluster, Mogujie chose the cloud message queue service CKafka to ensure data security and cluster stability. CKafka not only supports disaster recovery (DR) across multiple availability zones (AZs), but also helps customers separate hot and cold data and removes the bottleneck of frequent disk I/O reads, keeping services running stably. Next, we analyze how CKafka achieves cross-AZ disaster recovery and high-performance I/O on the cluster servers.

Cross-availability-zone disaster recovery solution for clusters

In a Kafka messaging system, the core server-side operations that a client is aware of are production and consumption. Cross-AZ disaster recovery aims to ensure that when an availability zone fails (for example, because of a fire or power outage), clients can keep producing and consuming data without noticing the fault, so that services keep running stably. To achieve this, the following problems must be solved at the technical level (taking the diagram above as an example):

  • Cross-AZ distribution of partition replicas: the replicas of each partition must be distributed across different availability zones. For example, if the cluster spans two AZs, Shanghai Zone 2 and Shanghai Zone 4, and a partition has four replicas, two replicas must be placed in each AZ.
  • Kafka relies heavily on Apache ZooKeeper. When ZooKeeper cannot provide service properly, the Kafka cluster is affected. Therefore, cross-AZ disaster recovery must be implemented for ZooKeeper as well as for Kafka. Like Kafka, Apache ZooKeeper can be deployed for cross-AZ disaster recovery.
  • The IP addresses of Broker nodes need to be transparent to clients, that is, the client does not need to know the real address of a Broker. In this way, when a back-end Broker fails and the replacement machine has a different IP, the client keeps running normally without noticing.

To solve the above problems, the following four technical solutions are needed.

Transparent, drifting Broker node IP

Why do the Broker nodes' IP addresses and ports need to be transparent to clients? Let's start with the following code:

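import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
// The real broker addresses are hardcoded into the client configuration.
props.put("bootstrap.servers", "192.168.10.10:9092,192.168.10.11:9092,192.168.10.12:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
// Send 100 records whose key and value are both the loop index.
for (int i = 0; i < 100; i++) {
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
}
producer.close();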

This is the simplest Kafka producer code. 192.168.10.10:9092, 192.168.10.11:9092, and 192.168.10.12:9092 are the IP addresses and ports of three real Kafka brokers. Let's consider the following situation:

Suppose one of the machines, 192.168.10.10, fails and cannot be recovered, so we start another Broker, such as 192.168.10.13:9092, to provide service. At this point, all clients must be notified to change the Kafka address from "192.168.10.10:9092,192.168.10.11:9092,192.168.10.12:9092" to "192.168.10.13:9092,192.168.10.11:9092,192.168.10.12:9092". If the IP configuration is hardcoded into client code, the code must be modified, packaged, and released again. Having to change client configuration and restart clients because of a server-side adjustment is a disaster! So how do we solve this problem?

The solution is a virtual IP address. As shown in the figure below, we place a layer-4 virtual IP (VIP) and virtual port (VPORT) in front of each Broker that provides service. Users reach the actual Broker service through the VIP and VPORT. For example, 10.0.0.1:9092 corresponds to the real Broker service 192.168.10.10:9092. This makes the actual Broker IP transparent to the user.

So what is drift? The service needs cross-AZ disaster recovery, which means the virtual IP address we provide must be able to switch between availability zones. When an availability zone fails, the VIP quickly switches to another availability zone and continues to provide service. The VIP must therefore be reachable from all availability zones. As shown in the following figure, when AZ 2 fails, the virtual IP service automatically switches to an available broker instance in AZ 1 so that clients continue to work normally.
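The drifting behavior can be illustrated with a small, self-contained simulation. This is only a sketch under stated assumptions: the class `VipRouter`, its methods, the zone names, and the address 192.168.20.10 are hypothetical and are not part of CKafka or Kafka. A real VIP is implemented at the network layer, not in application code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of a virtual IP that fronts brokers in several availability zones. */
final class VipRouter {
    /** A real broker endpoint together with the zone it runs in. */
    static final class Backend {
        final String zone;
        final String address;

        Backend(String zone, String address) {
            this.zone = zone;
            this.address = address;
        }
    }

    private final String vip;
    private final List<Backend> backends = new ArrayList<>();
    private final Set<String> failedZones = new HashSet<>();

    VipRouter(String vip) {
        this.vip = vip;
    }

    String vip() {
        return vip;
    }

    void addBackend(String zone, String address) {
        backends.add(new Backend(zone, address));
    }

    void markZoneFailed(String zone) {
        failedZones.add(zone);
    }

    /** Returns the real address currently behind the VIP: the first backend in a healthy zone. */
    String resolve() {
        for (Backend backend : backends) {
            if (!failedZones.contains(backend.zone)) {
                return backend.address;
            }
        }
        throw new IllegalStateException("no healthy backend behind " + vip);
    }

    public static void main(String[] args) {
        VipRouter router = new VipRouter("10.0.0.1:9092");
        router.addBackend("az-2", "192.168.10.10:9092");
        router.addBackend("az-1", "192.168.20.10:9092"); // assumed standby address
        System.out.println(router.vip() + " -> " + router.resolve());
        router.markZoneFailed("az-2"); // simulate the loss of a whole zone
        System.out.println(router.vip() + " -> " + router.resolve());
    }
}
```

The client only ever sees `10.0.0.1:9092`; the address that `resolve()` returns changes after the zone failure, which is the property the VIP is meant to provide.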

Cross-availability-zone distribution of partition replicas

Native Kafka assigns replicas randomly, subject to the rule that replicas of the same partition cannot be placed on the same machine. The replica assignment logic is not aware of availability zones: as long as a broker in the cluster has free space, replicas may be placed on it. As a result, all replicas of the same partition may end up in the same availability zone.

As in the virtual IP switchover example above, when a partition is created with three replicas, it is quite possible that all replicas of the partition end up in Shanghai Zone 2. If Shanghai Zone 2 fails, the partition cannot provide service and the business is affected. How do we solve this problem?

CKafka adds availability zone labels to brokers. When it detects that a topic created by a client is a cross-AZ topic, it assigns the replicas of each partition to multiple availability zones, so that surviving replicas remain if one availability zone fails. By modifying the partition assignment logic in the Kafka source code and adding the availability zone labeling logic, different replicas can be assigned, as required, to brokers that belong to different availability zones. The implementation principle is as follows:

The node /brokers/topics/test-topic on ZooKeeper looks like this:

{"version": 1, "partitions": {"0": [10840, 10839], "1": [10838, 10840], "2": [10839, 10838]}}

The test-topic topic has three partitions: 0, 1, and 2. Partition 0 is located on brokers [10840, 10839], partition 1 on brokers [10838, 10840], and so on. Therefore, we only need to modify the logic that generates this content to control how partition replicas are distributed and implement cross-AZ placement.
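A zone-aware assignment can be sketched as follows. This is not CKafka's actual implementation, which the article does not show; `ZoneAwareAssigner` is a hypothetical class, and the mapping of broker IDs to zones is an assumption made for illustration. The sketch rotates over zones for each replica and, inside a zone, picks brokers round-robin, producing a map in the same shape as the "partitions" field above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical zone-aware replica assignment; not taken from the Kafka or CKafka source. */
final class ZoneAwareAssigner {
    /**
     * @param brokersByZone zone name -> broker ids in that zone (iteration order is kept)
     * @return partition id -> ordered replica list
     */
    static Map<Integer, List<Integer>> assign(Map<String, List<Integer>> brokersByZone,
                                              int partitions, int replicationFactor) {
        List<String> zones = new ArrayList<>(brokersByZone.keySet());
        // Largest number of replicas of one partition that a single zone has to host.
        int perZone = (replicationFactor + zones.size() - 1) / zones.size();
        Map<String, Integer> cursor = new HashMap<>();
        for (String zone : zones) {
            if (brokersByZone.get(zone).size() < perZone) {
                throw new IllegalArgumentException("not enough brokers in zone " + zone);
            }
            cursor.put(zone, 0);
        }

        Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                // Rotate over zones so that consecutive replicas land in different zones.
                String zone = zones.get((p + r) % zones.size());
                List<Integer> brokers = brokersByZone.get(zone);
                int next = cursor.get(zone);
                replicas.add(brokers.get(next % brokers.size()));
                cursor.put(zone, next + 1);
            }
            assignment.put(p, replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> brokersByZone = new LinkedHashMap<>();
        brokersByZone.put("shanghai-2", Arrays.asList(10838, 10840)); // assumed zone membership
        brokersByZone.put("shanghai-4", Arrays.asList(10839));        // assumed zone membership
        System.out.println(assign(brokersByZone, 3, 2));
    }
}
```

With two zones and a replication factor of 2, every partition in this sketch gets exactly one replica per zone, so losing either zone still leaves one replica of each partition.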

Cross-availability-zone deployment of ZooKeeper

ZooKeeper, which Kafka depends on heavily, also needs to be deployed across availability zones to ensure its availability. First, let's look at ZooKeeper's election strategy: a leader can be elected only when more than half of the nodes agree. With an even number of nodes the votes can split evenly, in which case no leader is elected and the cluster becomes unavailable. In addition, if half or more of the nodes in a ZooKeeper cluster fail, the cluster cannot work properly.

Based on ZooKeeper's distributed consensus algorithm, we can conclude that if one ZooKeeper node is deployed in each availability zone, 2n + 1 zones are required for ZooKeeper to remain available when n zones fail (that is, when the ZooKeeper nodes in those n zones are all down). For example, if n is 1, three availability zones must be deployed for the ZooKeeper cluster to survive the failure of a single availability zone.
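The majority rule and the 2n + 1 requirement can be checked with a few lines of arithmetic. The class `QuorumMath` below is a hypothetical helper written for this article, not a ZooKeeper API.

```java
/** Hypothetical helper that spells out the majority arithmetic described above. */
final class QuorumMath {
    /** Smallest number of votes that is strictly more than half of the ensemble. */
    static int majority(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    /** True if a majority is still alive after failedNodes members are lost. */
    static boolean hasQuorum(int ensembleSize, int failedNodes) {
        return ensembleSize - failedNodes >= majority(ensembleSize);
    }

    /** Zones needed, with one node per zone, to survive the loss of toleratedZones zones. */
    static int zonesNeeded(int toleratedZones) {
        return 2 * toleratedZones + 1;
    }

    public static void main(String[] args) {
        System.out.println("zones needed to tolerate 1 failure: " + zonesNeeded(1));
        System.out.println("3 nodes, 1 failed, quorum: " + hasQuorum(3, 1));
        System.out.println("2 nodes, 1 failed, quorum: " + hasQuorum(2, 1));
    }
}
```

The two-node case shows why two availability zones are not enough: losing either zone leaves one node, which is not a majority of two.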

Broker configuration optimization

By design, some parameters need to be adjusted because brokers are deployed in different availability zones. These parameters maximize service availability for cross-AZ disaster recovery. The following three configurations need to be modified:

unclean.leader.election.enable=true
min.insync.replicas=1
offsets.topic.replication.factor=3

What do these three configurations mean? Take a look at them in turn:

unclean.leader.election.enable

Official description: Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.

Explanation: This parameter defaults to false, meaning that a leader cannot be elected from replicas outside the ISR, because electing a leader from non-ISR replicas may lose some data. So why enable it? If the parameter is false and no ISR replica is available, the partition has no leader and the service simply stops. If it is true, a leader can be elected from non-ISR replicas, and the service can continue to be used despite the possible data loss. Whether to enable this parameter should therefore be decided according to the characteristics of the service.

min.insync.replicas

Official description: When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful.

Explanation: This parameter defaults to 1, which means that when acks=-1, at least one replica must confirm that the data was written successfully. It is often changed to 2 during cluster setup to protect data integrity. It is set to 1 here so that clients can still be served in the extreme case where only one replica is working and all the others are down. If the value were 2 and only one replica were working, every produce request would fail, affecting the business.

offsets.topic.replication.factor

Official description: The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

Explanation: This parameter specifies the number of replicas of Kafka's internal topic __consumer_offsets. The broker default is 3, but the value is often set to 1 (for example, in the sample server.properties shipped with Kafka). If __consumer_offsets has only one replica, then when the broker hosting that replica goes down, every consumer group that stores its offsets in that partition is affected: offsets cannot be committed, so producers can still send messages but consumers cannot work. Therefore, this value must be greater than 1.
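The trade-offs behind the first two parameters can be summarized as two simple predicates. The class `WriteAvailability` is a hypothetical model written for illustration. It restates the documented behavior of acks=all together with min.insync.replicas, and of unclean.leader.election.enable, but it is not broker code.

```java
/** Hypothetical model of the availability rules discussed above; not Kafka broker code. */
final class WriteAvailability {
    /** With acks=all, a write succeeds only if the ISR has at least min.insync.replicas members. */
    static boolean acceptsWrite(int inSyncReplicas, int minInsyncReplicas) {
        return inSyncReplicas >= minInsyncReplicas;
    }

    /**
     * A partition can elect a leader from a live in-sync replica, or, if unclean
     * election is enabled, from a live out-of-sync replica (risking data loss).
     */
    static boolean canElectLeader(int liveInSync, int liveOutOfSync, boolean uncleanElection) {
        return liveInSync > 0 || (uncleanElection && liveOutOfSync > 0);
    }

    public static void main(String[] args) {
        // One replica left after a zone failure:
        System.out.println("min.insync.replicas=1 accepts write: " + acceptsWrite(1, 1));
        System.out.println("min.insync.replicas=2 accepts write: " + acceptsWrite(1, 2));
        // Only an out-of-sync replica survived:
        System.out.println("unclean election on:  " + canElectLeader(0, 1, true));
        System.out.println("unclean election off: " + canElectLeader(0, 1, false));
    }
}
```

Both settings in the configuration above trade some durability for availability, which is why they should be chosen per service rather than applied blindly.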

Cluster I/O pressure optimization solution

Users of self-built messaging clusters often run into the same problem: during traffic peaks, cluster I/O pressure is heavy, and the only temporary fix is to expand capacity. This is a stopgap at best. To help users truly solve the problem, the Tencent Cloud CKafka team analyzed in depth the server-side metrics and business scenarios of its customers. We found that the largest share of cluster I/O pressure comes from disk reads. But why is disk read pressure so high? Let's start by looking at Kafka's underlying disk storage design.

Kafka disk storage design principle

Kafka's disk storage design can be summed up in three terms: sequential disk reads and writes, page cache, and zero copy.

  • Sequential disk reads and writes: Kafka data is written and read sequentially. Sequential access benefits from the principle of locality; in actual tests, sequential writes can be up to 6,000 times faster than random writes.
  • Page cache: this is the key technology that makes Kafka's sequential reads and writes efficient. It is a disk cache implemented by the operating system to reduce disk I/O. Data on disk is cached in memory, so that disk accesses become memory accesses. Data in the page cache is written back to disk according to certain policies.
  • Zero copy: data is copied directly from the disk file to the network adapter without passing through the application. This greatly improves application performance and reduces context switches between kernel mode and user mode. On Linux, zero copy relies on the underlying sendfile() system call. In Java, FileChannel.transferTo() is also implemented on top of sendfile().
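The zero-copy path can be exercised from Java with `FileChannel.transferTo()`. The sketch below is a minimal illustration, not Kafka's own code: the class `ZeroCopyTransfer` and the temporary file names are made up for the example, and a second file channel stands in for the network socket. Whether the JVM actually uses sendfile() underneath depends on the operating system and on the type of the target channel.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Illustrative helper that streams a file to a channel without copying it through user space. */
final class ZeroCopyTransfer {
    /** Sends the whole file to the target channel and returns the number of bytes sent. */
    static long send(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
            long position = 0;
            long size = source.size();
            // transferTo may move fewer bytes than requested, so loop until done.
            // (A non-blocking target could return 0 repeatedly; this sketch assumes a blocking one.)
            while (position < size) {
                position += source.transferTo(position, size - position, target);
            }
            return position;
        }
    }

    public static void main(String[] args) throws IOException {
        Path segment = Files.createTempFile("segment", ".log");
        Path sink = Files.createTempFile("sink", ".bin"); // stands in for a socket
        Files.write(segment, "hello kafka".getBytes(StandardCharsets.UTF_8));
        try (FileChannel target = FileChannel.open(sink, StandardOpenOption.WRITE)) {
            System.out.println(send(segment, target) + " bytes transferred");
        }
    }
}
```

The application never reads the file contents into a `byte[]`; it only tells the kernel which range to move, which is the point of the technique.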

Why is server-side read pressure so high?

In theory, cluster read pressure should not be this high, because most reads should hit the page cache rather than go to disk. In reality, however, there is a lot of disk read activity. Our analysis showed that customers often have multiple business scenarios consuming the same messages. Based on how timely the consumption is, consumer behavior falls into two categories: real-time consumers and offline consumers.

  • Real-time consumers: they have strict requirements on data freshness and need to consume messages in real time. In this scenario, Kafka takes advantage of the system's page cache: messages produced to the broker are forwarded directly from memory to the real-time consumer, with zero disk pressure. This is usually called a hot read. Common business scenarios include advertising and recommendation.
  • Offline consumers: also known as periodic consumers, they consume messages that are usually minutes or hours old. Such messages are usually stored on disk, so consuming them triggers disk I/O. This is usually called a cold read, and it suits periodic business scenarios such as report generation and batch computation.

In the case of very high message volumes, simultaneous consumption of a cluster by both real-time and offline consumers can cause two problems:

  • Real-time consumers are affected by offline consumers: as offline consumers consume, data loaded from disk and real-time data are frequently swapped in and out of memory, which directly hurts the timeliness of real-time services and increases their response latency.

  • Offline data causes heavy disk I/O: when offline tasks read a very large amount of data, disk I/O utilization reaches 100%, affecting cluster stability.
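The interference between cold and hot reads can be reproduced with a toy model. The sketch below treats the page cache as a small LRU cache built on `LinkedHashMap`; the class `PageCacheSim`, the page numbering, and the parameters are assumptions made for illustration, and the real Linux page cache uses a more elaborate replacement policy. It counts how often a real-time consumer, reading just behind the producer, has to go to disk.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy LRU model of a page cache; a deliberate simplification of the real kernel policy. */
final class PageCacheSim {
    private final LinkedHashMap<Long, Boolean> pages;

    PageCacheSim(final int capacity) {
        // accessOrder=true makes LinkedHashMap evict the least recently used entry.
        this.pages = new LinkedHashMap<Long, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** A produced page lands in the cache first. */
    void write(long page) {
        pages.put(page, Boolean.TRUE);
    }

    /** Returns true on a cache hit; on a miss the page is loaded from "disk" into the cache. */
    boolean read(long page) {
        if (pages.get(page) != null) {
            return true;
        }
        pages.put(page, Boolean.TRUE);
        return false;
    }

    /** Counts disk reads suffered by a real-time consumer that trails the producer by lag pages. */
    static long realTimeMisses(int capacity, int steps, int lag, int coldReadsPerStep) {
        PageCacheSim cache = new PageCacheSim(capacity);
        long misses = 0;
        long nextColdPage = -1; // negative ids stand for old data that is only on disk
        for (int i = 0; i < steps; i++) {
            cache.write(i);
            if (i >= lag && !cache.read(i - lag)) {
                misses++;
            }
            for (int c = 0; c < coldReadsPerStep; c++) {
                cache.read(nextColdPage--); // offline consumer scanning old data
            }
        }
        return misses;
    }

    public static void main(String[] args) {
        System.out.println("real-time misses, no offline reader:   " + realTimeMisses(16, 100, 2, 0));
        System.out.println("real-time misses, with offline reader: " + realTimeMisses(16, 100, 2, 8));
    }
}
```

In this model the real-time consumer never touches the disk on its own, but once the offline scan pushes enough cold pages through the cache, freshly written pages are evicted before the real-time consumer reaches them.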

Optimization method: separating cold and hot data

To address the coexistence of cold reads and hot reads in a user's cluster, we believe the best solution is to separate the cluster's cold data from its hot data. How can hot and cold data be separated without changing the behavior of producers? Tencent Cloud CKafka launched a data synchronization service based on the open source Kafka Connector to solve this problem. The architecture diagram is as follows:

The broker cluster is split into a real-time cluster and an offline cluster, which serve real-time services and offline services respectively; offline services are directed to consume from the offline cluster. CKafka adds a Connector cluster between the two clusters. The Connector cluster synchronizes, at the topic level, the messages that offline services subscribe to from the real-time cluster to the offline cluster. Because the Connector cluster reads data in the same way as a real-time consumer, this operation adds no disk I/O pressure and has no impact on other real-time consumers.
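The topic-level synchronization can be pictured with an in-memory model. This is only a sketch: `TopicSync` and its methods are hypothetical, plain lists stand in for topic logs, and nothing here reflects the actual Connector implementation, which the article does not describe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of syncing selected topics from a real-time to an offline cluster. */
final class TopicSync {
    private final Map<String, List<String>> realtime = new HashMap<>();
    private final Map<String, List<String>> offline = new HashMap<>();
    private final Set<String> syncedTopics = new HashSet<>();

    /** Registers a topic that offline services subscribe to. */
    void subscribeOffline(String topic) {
        syncedTopics.add(topic);
    }

    /** Producers keep writing to the real-time cluster only; their behavior is unchanged. */
    void produce(String topic, String message) {
        realtime.computeIfAbsent(topic, t -> new ArrayList<>()).add(message);
    }

    /** Copies not-yet-synced messages of subscribed topics; returns how many were copied. */
    int syncOnce() {
        int copied = 0;
        for (String topic : syncedTopics) {
            List<String> source = realtime.getOrDefault(topic, Collections.<String>emptyList());
            List<String> target = offline.computeIfAbsent(topic, t -> new ArrayList<>());
            // The target's size doubles as the connector's committed offset in this model.
            for (int offset = target.size(); offset < source.size(); offset++) {
                target.add(source.get(offset));
                copied++;
            }
        }
        return copied;
    }

    List<String> offlineLog(String topic) {
        return offline.getOrDefault(topic, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        TopicSync sync = new TopicSync();
        sync.subscribeOffline("orders");
        sync.produce("orders", "o-1");
        sync.produce("clicks", "c-1"); // not subscribed by offline services, so never copied
        System.out.println("copied: " + sync.syncOnce());
        System.out.println("offline orders: " + sync.offlineLog("orders"));
    }
}
```

Offline consumers in this model read only from the offline copy, so their cold reads never reach the real-time cluster's disks.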

CKafka’s value to the business

CKafka provides a message queue service with high throughput and high scalability. It has clear advantages in performance, scalability, service security, and operations, so users can enjoy powerful features at low cost without tedious operations and maintenance work.
