Hi, I’m Howie. Nice to meet you again.

A reader recently asked whether I could summarize and share some Kafka-related interview questions.

Today we will go through a set of core Kafka interview questions from three directions: basic knowledge, advanced topics, and architecture tuning. I hope it helps you during the peak spring hiring season.

Because there is a lot of content, I plan to split it into three parts. This is the first part of the series, covering the basic interview questions.

This article is packed with practical material. I hope you can read it patiently.

Kafka Basics: 15 Questions

What is Kafka and what are its use cases?

Kafka is a distributed streaming platform for building streaming applications in real time. It is mainly used in the field of real-time processing of big data. Kafka is the first choice for message queues due to its high performance, high throughput, high availability, low latency and scalability.

Its main design objectives are as follows:

1) High performance: it provides message persistence capability with time complexity of O(1), ensuring constant time access performance even for data above TB level.

2) High throughput and low latency: even a single, inexpensive machine can transmit hundreds of thousands of messages per second while maintaining millisecond-level latency.

3) Persistence and reliability: messages are persisted to disk eventually, and a data backup mechanism is provided to prevent data loss.

4) Fault tolerance: supports failover for cluster nodes. Even if a Kafka service node in the cluster goes down, the system as a whole keeps working (with N replicas, up to N-1 nodes are allowed to fail).

5) High concurrency: it can support thousands of clients to conduct read and write operations at the same time.

Its adaptation scenarios mainly include:

1) Log collection direction: Kafka can be used to collect logs of various services and output them uniformly. For example, the log system ELK uses Kafka for data transfer.

2) Message system direction: Kafka offers system decoupling, replica redundancy, traffic peak shaving, message buffering, scalability, fault tolerance and more, and also provides message ordering guarantees and a message-replay (backtracking) capability.

3) Big data real-time computing direction: Kafka provides a complete stream-processing framework and is widely used in big data processing, for example integrated with Flink, Spark, Storm, etc.

What are the core components of Kafka and what do they do?

Basic concepts for Kafka’s core components:

1) Producer: a message Producer, a client that sends messages to Kafka Broker.

2) Consumer: message consumers, clients that read messages from Kafka Broker.

3) Consumer Group: a consumer group made up of multiple consumers. Each consumer in the group is responsible for consuming data from different partitions to improve consumption throughput. A partition can only be consumed by one consumer within the group, and different consumer groups do not affect each other.

4) Broker: A Kafka service node is a Broker. A cluster is composed of one or more brokers, and a Broker can hold multiple topics.

5) Topic: a logical concept. Topic classifies messages. Producers and consumers are oriented to the same Topic, and the message contents of the partitions under the same Topic are different.

6) Partition: to make Topics scalable and improve concurrency, a very large Topic can be spread across multiple Brokers; a Topic can be divided into multiple partitions for storage, and the messages within each Partition are ordered.

7) Replica: to implement data backup, Kafka provides a replication mechanism, which ensures that when a node fails, the Partition data on that node is not lost and Kafka can keep working. Each partition has one Leader replica and several Follower replicas.

8) Leader: the primary replica among a partition's replicas; it is the replica producers send data to and the replica consumers read data from.

9) Follower: a secondary replica of a partition; it synchronizes data from the Leader replica in real time and stays consistent with the Leader. When the Leader fails, one Follower is elected as the new Leader. A Follower is never placed on the same Broker as its Leader, otherwise a single crash could make the data unrecoverable.

10) Offset: the consumer's consumption position. It records how far the consumer has consumed, so that after a consumer crashes and recovers, it can continue consuming from that position.

What does Zookeeper do in Kafka?

ZooKeeper is responsible for Kafka cluster metadata management and cluster coordination; each Kafka server connects to ZooKeeper when it starts.

Kafka uses ZooKeeper for storing cluster metadata, cluster membership management, Controller election, and other administrative tasks. Once the KRaft proposal is fully implemented, Kafka will be completely independent of ZooKeeper.

1) Cluster metadata: the metadata of all Topic partitions is stored in ZooKeeper, and the copy saved in ZooKeeper is the source of truth.

2) Cluster member management: Broker node registration, removal, and property changes. This covers two aspects: managing the set of members, mainly adding new members and removing existing ones; and managing individual members, such as changing the data of a single Broker.

3) Controller election: electing the Controller of the Broker cluster. The Controller, in addition to acting as an ordinary Broker, is responsible for electing topic partition Leaders. When the Kafka system starts, one of the Brokers is elected as the Controller, responsible for managing topic partitions and replica state and for performing administrative tasks such as partition reassignment. If the current Controller fails while the system is running, Kafka elects a new Controller from the remaining healthy Brokers.

4) Other management tasks: including but not limited to Topic management and parameter configuration, etc.

There are two reasons for removing the ZooKeeper dependency (started with KRaft in Kafka 2.8 and completed in the 3.x line):

1) Cluster operation and maintenance: Kafka itself is a distributed system, and if it relies heavily on Zookeeper, cluster operation and maintenance costs and system complexity are very high.

2) Cluster performance: ZooKeeper's architecture is not suited to high-frequency read/write operations. In earlier versions, consumer offset commits were stored in ZooKeeper, which could seriously hurt the performance of the ZooKeeper cluster.

What are the modes for producers to send messages?

Kafka producers send messages in three main modes:

Fire-and-forget mode

Fire-and-forget mode simply sends a message and does not care whether it was delivered successfully. In essence, messages are sent asynchronously: they are first stored in a buffer and then sent in batches once certain conditions are met. This is the highest-throughput mode in Kafka, but also the least reliable, because nothing is done about messages that fail to be sent, and messages can be lost under certain exceptions.

ProducerRecord<String, String> record = new ProducerRecord<>("this-topic", key, value);
try {
    // fire-and-forget: send the record and do not check the result
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}

Synchronous send mode

In sync mode, calling the send() method returns a Future object; you then call the Future's get() method and wait for the result. From the returned result you can determine whether the message was sent successfully. The next message is sent only after get() returns data for the current one.

ProducerRecord<String, String> record = new ProducerRecord<>("this-topic", key, value);
try {
    // synchronous send: block on future.get() until the broker responds
    Future<RecordMetadata> future = producer.send(record);
    RecordMetadata metadata = future.get();
} catch (Exception e) {
    e.printStackTrace();
}
producer.flush();
producer.close();

Asynchronous sending mode

Async mode: a callback function is passed when send() is called, and the callback is triggered when the Broker returns a response. The producer keeps sending without blocking; success or failure is handled inside the callback.

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {        
   //intercept the record, which can be potentially modified; 
   //this method does not throw exceptions        
   ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);        
   return doSend(interceptedRecord, callback);
}
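For reference, here is a minimal sketch of how the async + callback mode is typically used from application code (the topic name and the error handling are illustrative, not from the original article):

producer.send(new ProducerRecord<>("this-topic", key, value), (metadata, exception) -> {
    if (exception != null) {
        // the send failed: log it, alert, or route the record to a retry/compensation path
        exception.printStackTrace();
    } else {
        // the send succeeded: metadata carries the partition and offset assigned by the broker
        System.out.printf("sent to partition %d at offset %d%n", metadata.partition(), metadata.offset());
    }
});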

Each of the three modes above has its own characteristics; which one fits depends on the business scenario:

1) Scenario 1: If the service only cares about the throughput of messages and allows a small number of messages to fail to be sent, and does not care about the order in which the messages are sent, then the fire-and-forget mode can be used with the parameter acks = 0. In this way, the producer does not need to wait for a response from the server and sends the message at the maximum speed supported by the network.

2) Scenario 2: if the business requires messages to be sent in order and the data can go to a single Partition, use the sync mode and set retries to a non-zero value so that failed sends are retried, combined with acks = all & max.in.flight.requests.per.connection = 1. The producer then sends only one message at a time before receiving a successful response from the server and flushes immediately after each success, which keeps the send order under control (a config sketch follows this list).

3) Scenario 3: if the business needs to know whether a message was sent successfully but does not care about ordering, messages can be sent asynchronously with a callback and retries = 0; when a send fails, the failed message is written to a log file for later processing.
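To make the parameter names concrete, here is a small configuration sketch for scenario 2 (the bootstrap address is a placeholder, and the alternative values for the other scenarios are shown as comments):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder address
// Scenario 2: ordered sending on a single partition
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
// Scenario 1 would instead use acks=0; scenario 3 would use retries=0 plus a send callback.
KafkaProducer<String, String> producer = new KafkaProducer<>(props);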

Why did Kafka design partitions?

A Kafka server may need to store a huge volume of messages for a Topic. If every write went to a single log on a single node, heavy write traffic would eventually create a throughput bottleneck on that server. Kafka therefore designed partitions, which also bring load balancing and horizontal scaling capabilities.

1) Load balancing: when sending messages, data can be distributed evenly according to the number of partitions, so that writes land on different partitions, which improves concurrent write performance; multiple subscribers can consume data from one or more partitions at the same time, which supports massive data processing and improves read performance.

2) Horizontal scaling: a Topic can be divided into multiple partitions, and different partitions can be placed on different physical nodes as much as possible, which makes scaling out very convenient. A single consumer can consume data from multiple partitions, but that alone does not fully exploit horizontal scaling on the consumption side. This is where consumer groups come in: a consumer group consumes the whole Topic together, with each consumer consuming a subset of its partitions.

How do producers select partitions when sending messages?

When sending messages, the producer selects a partition policy in the following four ways:

1) Round-robin strategy: messages are distributed to partitions in turn, which ensures they are spread evenly across all partitions in the order they are sent. Kafka uses this strategy by default when the message key is null.

2) Key-based partitioning strategy: Kafka allows each message to carry a key. When the key is not empty, Kafka hashes the key and takes the hash modulo the number of partitions to route the message to a specific partition. The benefit is that messages with the same key always go to the same partition. Kafka cannot guarantee global ordering, but it does guarantee ordering within each partition; this per-partition ordering is used in scenarios such as orders and payments, where sending messages with the order ID as the key achieves ordered processing per order.

3) Random strategy: messages are sent to a random partition. This appears to spread messages evenly, but in practice it performs worse than round-robin; if an even distribution is the goal, round-robin is the better choice.

4) Custom strategy: implement the org.apache.kafka.clients.producer.Partitioner interface and override its partition() method to get custom partitioning behavior, as sketched below.
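A minimal sketch of a custom Partitioner, assuming the standard Kafka Java client dependency; the routing rule (hashing only the first byte of the key) is purely illustrative:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class FirstCharPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null || keyBytes.length == 0) {
            return 0; // no key: always use partition 0 (illustrative choice)
        }
        // route by the first byte of the key; the same prefix always maps to the same partition
        return Math.abs(keyBytes[0]) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

The class is then registered on the producer via the partitioner.class configuration property.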

How should the number of partitions be set in Kafka? Is more always better?

First, how to set the number of partitions in Kafka

First, we need to understand that achieving load balancing at the Partition level is the key to achieving high throughput. The appropriate number of partitions can achieve the purpose of parallel read/write and load balancing, which needs to be estimated according to the target throughput of producers and consumers of each Partition.

At this point, we can follow certain steps to calculate and determine the partition number:

1) First determine the initial value of partitions based on experience such as the amount of data currently received by a Topic.

2) Then, the throughput of Producer end and Consumer end are tested for this Topic.

3) From the test results, assume the measured per-partition throughput on the Producer side is Tp and on the Consumer side is Tc, and let the total target throughput be Tt, all in MB/s. Then numPartitions = max(Tt/Tp, Tt/Tc), i.e. enough partitions for whichever side is slower (see the worked example after this list).

4) Special note: testing Tp is usually easy because the logic is very simple: just send messages to Kafka as fast as possible. Testing Tc is usually more complicated, because it depends on what else the application does after consuming each message.
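A worked example with purely illustrative numbers: if the target total throughput is Tt = 100 MB/s, and a single partition sustains Tp = 20 MB/s on the producer side and Tc = 25 MB/s on the consumer side, then numPartitions = max(100/20, 100/25) = max(5, 4) = 5, so 5 partitions would be a reasonable starting point.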

Second, is it true that the more partitions, the better?

One of the reasons for Kafka’s high throughput in the first place is that messages in a Topic are balanced across different brokers in a Kafka cluster through partitions.

Theoretically, the more partitions a Topic has, the more throughput the entire cluster can achieve. But is it really true that the number of Kafka Topic partitions configured in actual production is as good as possible? Apparently not! What are the drawbacks and problems caused by too many partitions? We can conduct in-depth analysis from the following four directions:

Memory usage analysis

1) Broker side: There are many components that maintain partition level caches in memory, such as Controller, FetcherManager, etc. Therefore, the more partitions, the higher the cost of such caches.

2) Producer side: For example, the parameter batch.size is 16KB by default. It caches messages for each partition, and when the data has accumulated to a certain size or enough time, the accumulated messages are removed from the cache and sent to Broker nodes. This feature is designed to improve performance, but as the number of partitions increases, the memory footprint required for this part of the cache increases.

3) Consumer end: The number of consumers is directly linked to the number of partitions. The memory usage for message consumption and the number of consumers that need to be enabled for higher throughput performance will also increase with the number of partitions.

File handle analysis

In Kafka’s Broker, each Partition corresponds to a directory in the disk file system. In Kafka’s log file directory, each log data segment is allocated three files, two index files and one data file. Each Broker opens two index file handles and one log data file handle for each log segment file. Therefore, as the number of partitions increases, the number of file handles that need to be kept open increases and may eventually exceed the limit configured by the underlying operating system.

End-to-end delay analysis

What is end-to-end latency in Kafka? It is the time from when the Producer publishes a message to when the Consumer receives it, i.e. the time the Consumer receives the message minus the time the Producer published it.

In Kafka, only committed messages are guaranteed against loss, so Kafka does not expose a message to consumers until it has been committed, that is, until it has been synchronized to the replicas in the ISR list. The more partitions there are, the more data has to be synchronized between replicas, so the time spent replicating data within the ISR replica set becomes the most significant part of Kafka's end-to-end latency.

This can be mitigated by scaling out the Kafka cluster. For example, placing 100 partition leaders on a single Broker node versus spreading them across 10 nodes gives very different latency: in a cluster of 10 Broker nodes, each Broker handles data replication for an average of 10 partitions, and the end-to-end delay becomes roughly one tenth of what it was.

As a rule of thumb, if you are particularly concerned with message latency, it is a good idea to limit the number of partitions per Broker node: for a Kafka cluster with N Broker nodes and replication factor F, the total number of partitions in the cluster should not exceed 100 * N * F, that is, the number of partition leaders on a single Broker node should not exceed 100.

Analysis of high availability

We know that Kafka achieves high availability and stability of clusters through multi-copy replication technology. Each Partition has multiple copies of data, each on a different Broker. Among all data copies, one data copy is Leader and the other data copies are followers.

Within a Kafka cluster, all data replicas are managed automatically and kept in sync. When a Broker fails, the partitions whose Leader replica resides on that Broker become temporarily unavailable. Kafka automatically elects a new Leader from the other replicas to serve client requests; this election is performed automatically by the Broker acting as the Kafka Controller.

Normally, when a Broker is shut down in a planned way, the Controller moves all Leader replicas off that Broker one by one before the service stops. Moving a single Leader replica is very fast, so from the client's perspective a planned shutdown causes only a very short window of unavailability.

However, when a Broker goes down unexpectedly, the unavailability window depends on the number of affected partitions. If the failed Broker happens to be the Controller, a new Controller will be elected automatically, but the new Controller has to read the metadata of every Partition from ZooKeeper during initialization. If the Kafka cluster has 10,000 partitions and recovering the metadata of each Partition from ZooKeeper takes about 2 ms, Controller recovery alone adds about 20 seconds to the unavailability window.

In general, more partitions in a Kafka cluster results in higher throughput. However, if the total number of partitions in a Kafka cluster is too large or a single Broker node has too many partitions, it may have a potentially negative impact on system availability and message latency, which needs our attention.

How do I guarantee that messages in Kafka are ordered?

We know that Kafka does not guarantee global message ordering: messages are ordered within a partition, but not across partitions. So how do you guarantee that messages in Kafka are ordered? It can be analyzed from the following three aspects:

The Producer side

This was touched on briefly in the scenario analysis of the earlier question about producer send modes; here it is analyzed in more detail:

With default settings and no network jitter, Kafka's Producer sends messages in batches to the Kafka Broker in the order they were produced. However, once network fluctuation occurs, messages may arrive out of order.

Therefore, to ensure that Kafka sends messages in an orderly manner, first consider using synchronous mode to send messages. Two synchronous modes are as follows:

1) Set acks = all & max.in.flight.requests.per.connection = 1. After sending a message, the sender must receive a response satisfying the acks setting before it sends the next one. Although the API is still used asynchronously, the underlying messages are effectively sent one by one.

2) Call KafkaProducer's send() and block on the returned Future's get() while waiting for the result. The result tells you whether the message was sent successfully; because the send blocks, the next message is sent only after get() returns for the current one.

Message retransmission and idempotent problems can also occur with the above methods:

1) Message retransmission: when a message fails to be sent, Kafka decides whether it can be retried automatically; automatic retries happen when retries > 0.

2) Idempotence: since Kafka 0.11, the Producer supports an idempotence feature, enabled with enable.idempotence = true. The idempotence feature attaches a sequence number to each message, incremented by 1 for each send. With idempotence enabled we can keep max.in.flight.requests.per.connection at its default value of 5: if a send fails, Kafka compares the sequence numbers of the last few batches on the server with the sequence number of the batch being resent, and as long as the sequence numbers are consecutive the resend preserves the message order.
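A minimal configuration sketch for an idempotent, order-preserving producer (the bootstrap address and serializers are placeholder assumptions):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // placeholder address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");   // adds a producer id and per-batch sequence numbers
props.put(ProducerConfig.ACKS_CONFIG, "all");                  // required by idempotence
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);   // retries are safe: duplicate batches are rejected
// max.in.flight.requests.per.connection may stay at its default of 5 when idempotence is on
KafkaProducer<String, String> producer = new KafkaProducer<>(props);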

The Broker side

In Kafka, a Topic is only a logical concept, and the Partition that makes up the Topic is where the messages are actually stored.

Kafka only ensures that messages within a single partition are ordered, so if you want to ensure strict global order, you need to set Topic to a single partition. However, for businesses, global order is generally not needed. It is only necessary to ensure that different types of messages in the business are in order.

However, one issue must be taken into account: when the number of partitions is changed, messages that previously hashed to the same Partition may be routed to a different one, so message ordering is no longer guaranteed. In that case you need to weigh the business impact before dynamically changing partitions, and you may need to re-plan how messages are classified across partitions based on the business and the new partition count.

The Consumer side

On the Consumer side, according to Kafka’s model, each partition within a Topic can belong to only one Consumer within that Topic’s Consumer group.

When messages are sent to the same Partition, consumers must retrieve data from the Partition in order and without disorder.

However, a consumer may use multiple threads to process messages concurrently: a single thread gives too little throughput, but with multiple threads processing concurrently the order can be broken.

In that case, you can create multiple in-memory queues and write messages with the same key to the same queue; each worker thread then consumes from exactly one queue, which preserves per-key message order, as sketched below.
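A minimal sketch of this in-memory queue approach (the queue count, queue capacity, and class name are illustrative assumptions):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KeyedDispatcher {
    private final BlockingQueue<ConsumerRecord<String, String>>[] queues;

    @SuppressWarnings("unchecked")
    public KeyedDispatcher(int workerCount) {
        queues = new BlockingQueue[workerCount];
        for (int i = 0; i < workerCount; i++) {
            // one worker thread consumes each queue, so per-key ordering is preserved
            queues[i] = new ArrayBlockingQueue<>(1024);
        }
    }

    // records with the same key always land in the same queue, hence on the same worker thread
    public void dispatch(ConsumerRecord<String, String> record) throws InterruptedException {
        int hash = record.key() == null ? 0 : record.key().hashCode();
        queues[Math.abs(hash) % queues.length].put(record);
    }
}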

Why doesn’t Kafka support read/write separation?

In many master-slave systems, slave nodes are allowed to serve reads, but Kafka has the Leader replica serve all requests, which avoids data-inconsistency problems.

Read/write separation is not supported for two reasons:

1) Mismatched scenario: a read/write-separation architecture suits workloads with heavy reads and relatively infrequent writes, which is not Kafka's typical workload.

2) Latency: Kafka synchronizes data in PULL mode, so there is inconsistency between the Leader copy and the Follower copy. If the Follower copy is allowed to provide read services, messages will lag.

What are the two types of Kafka replicas and what do they do?

In Kafka, to implement data backup and ensure that when a node in the cluster fails the Partition data on that node is not lost and Kafka can keep working, each Partition of a Topic has several replicas: one Leader replica and several Follower replicas.

1) Leader The master copy is responsible for providing read and write data services externally.

2) The Follower replica only synchronizes data with the Leader replica and does not provide any external services.

Can Kafka manually delete messages?

First, Kafka does support manual deletion of messages, though of course it also provides retention policies that automatically delete expired messages.

Kafka stores messages on disk, and as data keeps being written the disk usage grows, so messages need to be cleaned up to control disk space. In Kafka's log structure, each Replica corresponds to a Log, and a Log is divided into multiple log segments, which makes it easier for Kafka to clean up logs.

1) Plain messages: we can delete messages using the kafka-delete-records command or by programmatically calling the Admin.deleteRecords method. Both paths end up calling Admin's deleteRecords, which deletes messages indirectly by raising the partition's log start offset (see the sketch after this list).

2) Messages with a key and cleanup.policy=delete/compact: deletion of keyed messages can rely on the functionality provided by the Log Cleaner component.

Log Retention: Deletes invalid Log segments based on certain Retention policies.

Log Compaction: compacts by message key; when multiple messages share the same key with different values, only the latest version is kept.
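For the manual-deletion path mentioned in 1), here is a minimal sketch using the admin client (the topic, partition, offset, and address are illustrative values):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class DeleteRecordsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        try (AdminClient admin = AdminClient.create(props)) {
            // delete everything before offset 100 in partition 0 of "this-topic"
            Map<TopicPartition, RecordsToDelete> toDelete =
                Map.of(new TopicPartition("this-topic", 0), RecordsToDelete.beforeOffset(100L));
            admin.deleteRecords(toDelete).all().get(); // raises the partition's log start offset
        }
    }
}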

Log deletion

Kafka's LogManager has a dedicated log-cleaning task that periodically detects and deletes log segments that no longer meet the retention criteria. The check period is set with the Broker parameter log.retention.check.interval.ms, whose default value is 300000 (5 minutes).

There are three retention strategies in Kafka:

Time based strategy

The log deletion task periodically checks whether the retention time of the current log file exceeds the specified threshold retentionMs to find deletableSegments, a set of log segment files that can be deleted.

retentionMs is determined by the Broker-side parameters with priority log.retention.ms > log.retention.minutes > log.retention.hours. By default only log.retention.hours is configured, with a value of 168, i.e. messages are retained for 7 days.

Note: expired log segments are not determined simply by the segment file's modification time, but by the largest timestamp (largestTimeStamp) in the segment. Kafka first looks up the timestamp index file corresponding to the segment and reads its last index entry; if that timestamp value is greater than 0, it is used, otherwise lastModifiedTime is used.

Delete steps:

1. Remove the segments to be deleted from the skip list of segments maintained by the Log object, so that no thread can read these segments any more.

2. Append the .deleted suffix to all files of the log segment, including its index files.

3. A delayed task named delete-file then deletes the files with the .deleted suffix. It runs once per minute by default and can be configured via file.delete.delay.ms.

Log size based policy

The log deletion task periodically checks whether the current log size exceeds the specified threshold retentionSize to find deletableSegments, a set of log segment files that can be deleted.

retentionSize is set with the Broker parameter log.retention.bytes; the default value is -1, meaning unlimited.

Note that log.retention.bytes limits the total size of all log segment files in a Log, not the size of a single segment. A single log segment's size is set with log.segment.bytes, which defaults to 1 GB.

Delete steps:

1. Calculate the difference between the total Size of log files and retentionSize, that is, the total Size of logs to be deleted.

2. Then, starting from the first log segment in the log, collect the set of deletable log segments (deletableSegments).

3. Delete the collected segments.

Based on the log start offset

The policy is based on whether the baseOffset of the next log segment is less than or equal to logStartOffset. If yes, you can delete the log segment.

The deletion steps are shown below:

1. First walk through the log segments from the beginning. The start offset of the segment after log segment 1 is 20, which is smaller than logStartOffset, so log segment 1 is added to deletableSegments.

2. The start offset of the segment after log segment 2 is 35, which is also smaller than logStartOffset, so log segment 2 is added to deletableSegments.

3. The start offset of the segment after log segment 3 is 50, which is also smaller than logStartOffset, so log segment 3 is added to deletableSegments.

4. The start offset of the segment after log segment 4 is to the right of logStartOffset, so log segment 4 and everything after it are not added to deletableSegments.

5. Once all deletable segments are collected, they can be deleted.

Log compaction

Log Compaction keeps only the latest message for each key when multiple messages share the same key with different values. If the application only cares about the latest value of a key, you can enable log compaction; Kafka will periodically merge messages with the same key and retain only the latest value.

Log Compaction can be compared to the RDB persistence model in Redis. Imagine a scenario where every state change is sent to Kafka as a message; if Kafka crashes at some point and you want to recover quickly, only the latest state for each key is needed, and a log compaction strategy gives you exactly that.
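As an illustration of enabling compaction, here is a sketch that creates a compacted topic via the admin client (the topic name, partition count, and address are placeholder assumptions):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CompactedTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("latest-state", 3, (short) 3)
                // keep only the newest value per key
                .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}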

How does Kafka read and write data so fast?

Sequential appending

Kafka writes data sequentially, appending to the end of the file. For an ordinary mechanical disk, random writes involve disk seeks and perform extremely poorly, but sequential appends to the end of a file can approach the write performance of memory; the commonly cited benchmark puts an ordinary mechanical disk's sequential I/O at about 53.2M values/s.

Page Cache

To keep disk writes fast, Kafka writes asynchronously into the operating system's Page Cache, also known as the OS cache, i.e. the cache managed by the operating system itself (for index files this is done through mmap memory mapping). When writing a disk file, data is first written directly into the OS cache, and the operating system then decides when to flush that data to disk, which greatly improves write efficiency and performance.

Zero copy technology

To avoid unnecessary copies of data between kernel space and user space, Kafka uses zero-copy technology when reading data: the data in the operating system's OS cache is sent directly to the network card and on to downstream consumers, skipping the two extra copies. This reduces the CPU cost of copying and the number of user/kernel context switches, optimizing data transfer performance. Only the descriptor is copied into the Socket buffer; the data itself is not copied into it.

There are two main areas where zero-copy technology is used in Kafka:

1) Index files based on the mmap mechanism: index files are implemented with MappedByteBuffer, i.e. user space and kernel space share the kernel's data buffer, so data does not need to be copied into user space. Although mmap avoids unnecessary copies, the cost of creating and destroying mappings varies between operating systems and does not always guarantee high performance, so in Kafka only the index files use mmap.

2) Log file transfer based on sendfile: Kafka has a TransportLayer interface whose implementation uses Java FileChannel's transferTo method. Underneath, transferTo uses the sendfile-based zero-copy mechanism, which is currently applied only when the I/O channel is PLAINTEXT.
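For reference, transferTo is the JDK call that exposes the OS sendfile mechanism Kafka relies on; a minimal sketch outside of Kafka itself (the file path and socket address are placeholders):

import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class SendfileSketch {
    public static void main(String[] args) throws Exception {
        try (FileChannel file = FileChannel.open(Paths.get("/tmp/segment.log"), StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9000))) {
            long position = 0;
            long size = file.size();
            // transfer file bytes straight to the socket; the kernel moves the data
            // without bouncing it through user space (zero copy)
            while (position < size) {
                position += file.transferTo(position, size - position, socket);
            }
        }
    }
}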

Sending messages in batches

Kafka does not send messages one by one. Instead, it combines multiple messages into a batch and sends them together; consuming messages works the same way, pulling batches at a time.

Data compression

Kafka uses compression on the producer, broker, and consumer sides. Compression helps improve throughput, reduce latency, and improve disk utilization. Kafka supports several compression algorithms: LZ4, Snappy, GZIP, and ZStandard (zstd); zstd, supported since Kafka 2.1.0, is an open-source algorithm from Facebook that provides an extremely high compression ratio.

In Kafka, compression can occur in two places: the producer side and the Broker side. The Producer side compresses, the Broker side holds, and the Consumer side decompresses. This saves a lot of network and disk overhead.

What are the Kafka consumption models?

Generally, there are two modes of message consumption: push and pull. The consumption model in Kafka is a pull model, which can be implemented in two ways: point-to-point and publish-subscribe.

Point-to-point

Point-to-point: if all consumers belong to the same consumer group, messages are distributed across the consumers in the group and each message is consumed by only one of them.

Publish and subscribe

Publish-subscribe: if the consumers belong to different consumer groups, every message is delivered to each consumer group, so every consumer receives the message.

What is a consumer group and what does it do?

Let me start by looking at what a consumer group is:

A Consumer Group, as its name implies, consists of multiple consumers sharing a common and unique Group ID. Each consumer in the group is responsible for consuming data from different partitions; a partition can only be consumed by one consumer in the group, and consumer groups do not affect each other.

Why did Kafka design a Consumer Group, not just consumers?

We know that Kafka is a high-throughput, low-latency, highly concurrent and highly scalable message queue, so if a Topic holds millions to tens of millions of messages and is consumed by only one Consumer process, you can imagine how slow consumption would be. A mechanism with good scalability is therefore needed to keep consumption up to speed, and this is where the Consumer Group comes in: it is the scalable, fault-tolerant consumer mechanism provided by Kafka.

Kafka Consumer Group features are as follows:

1) Each Consumer Group has one or more consumers.

2) Each Consumer Group has a common and unique Group ID.

3) When a Consumer Group consumes a Topic, each Partition of a Topic can only be assigned to a certain Consumer in the Group. Once consumed by any Consumer, this data can be considered to have been successfully consumed by the current Consumer Group.
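A minimal sketch of a consumer joining a consumer group (the group id, topic, and address are placeholders); running several copies of this program with the same group id makes them split the topic's partitions among themselves:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");             // consumers sharing this id split the partitions
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("this-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}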

What is the function of Offset in Kafka and how to maintain it?

In Kafka, each message under each Topic partition is assigned a unique ID value that identifies its location within the partition. This ID value is called Offset, or Offset, and cannot be changed once the message is written to the log partition.

How Offsets are managed

Older versions of Kafka (prior to 0.9) stored shifts in ZooKeeper, reducing the overhead of storing Broker side state.

Because ZooKeeper is not suited to frequent writes, and a Consumer Group's offset commits are exactly that, they could drag down the ZooKeeper cluster's performance. In newer versions, Kafka therefore stores offsets in an internal topic, which naturally supports high-frequency writes and persistence: __consumer_offsets.

__consumer_offsets: This is used to store the displacement information submitted by Kafka consumers. It is automatically created by Kafka, just like any other Topic. The message format is defined by Kafka itself and cannot be modified.

__consumer_offsets has three message formats:

1) The message used to save Consumer Group information.

2) Tombstone messages (also called delete marks), whose message body is empty. Once all the offset data of a Consumer Group has been deleted, Kafka writes a tombstone message into the corresponding partition of the __consumer_offsets topic, indicating that the group's information should be deleted completely.

3) Used to save the displacement value.

__consumer_offsets message format

1) The message format can be simply understood as a KV pair. Key and Value represent the Key Value and message body, respectively.

2) What does the Key store? Kafka has a large number of consumers, so there must be a field identifying which consumer the offset data belongs to. We know a Consumer Group shares a common, unique Group ID, so storing that is a start; and since consumers commit offsets per partition, the topic and partition the offset belongs to must obviously be stored in the key as well.

3) Summary: The Key of the displacement topic should save three parts of the content: Group ID, topic name and partition number

4) Value can simply be considered as storing offset, but there is also some metadata that Kafka can use to perform other operations, such as deleting expired offset data.

__consumer_offsets message format schematic:

How __consumer_offsets is created

How is __consumer_offsets created? When the first Consumer in a Kafka cluster starts, Kafka automatically creates __consumer_offsets.

It is a normal Topic, so it also has a partition count. Since Kafka creates it automatically, how is that partition count set?

The partition count depends on the Broker parameter offsets.topic.num.partitions, whose default value is 50, so Kafka automatically creates __consumer_offsets with 50 partitions. Since it has partitions, it also has a replication factor, which is controlled by another Broker parameter, offsets.topic.replication.factor, with a default value of 3.

To sum up, __consumer_offsets is created automatically by Kafka with 50 partitions and 3 replicas by default. A consumer group's offsets go to the partition computed as abs(groupId.hashCode()) % numPartitions, which ensures that a Consumer Group's offset information lives on the same Broker node as the Coordinator for that group.
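The partition lookup described above can be reproduced with a couple of lines of Java (assuming the default of 50 partitions and an illustrative group id):

// which __consumer_offsets partition holds the offsets for group "order-service"?
int numPartitions = 50; // default value of offsets.topic.num.partitions
int offsetsPartition = Math.abs("order-service".hashCode()) % numPartitions;
System.out.println(offsetsPartition);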

If this article helps you, please like, share, and forward it. Thank you very much!

I will keep summarizing and producing high-quality articles. Follow me: Howie Chats Technology.

Kafka interview rapid-fire: how far can you go? (Part 1)