This is the second day of my participation in Gwen Challenge

Kafka is a high-performance cross-language distributed message queue written in Scala, with single-machine throughput up to 10W and message latency at ms level. Kafka is a fully distributed system. Brokers, producers, and consumers all automatically support distribution and rely on ZooKeeper for distributed coordination.

Kafka supports multiple reads per write, messages can be consumed by multiple clients, and messages can be repeated, but not lost.

1. Kafka core concepts

The Kafka message queue consists of three roles. On the left is Producer of the message. In the middle is a Kafka cluster, Kafka cluster consists of multiple Kafka servers, each server is called a Broker (Kafka supports horizontal scaling, the general number of brokers, the higher the throughput of the cluster), that is, the message Broker; On the right is the Consumer of the message. Kafka uses Zookeeper to manage cluster configuration, elect the leader, and rebalance when the Consumer Group changes.

Producer uses push mode to publish messages to the broker, and Consumer uses pull mode to subscribe to and consume messages from the broker.

1.2 the Topic

Each message published to the Kafka cluster has a category called Topic. Physically we can think of a Topic as a Queue. In practice, different business data can be set to different topics. A Topic can have multiple consumers, and when a producer sends a message on a Topic, all consumers who subscribe to that Topic can receive the message.

1.3 Partition

To improve parallelism, Kafka maintains multiple partitions for each Topic, and each Partition can be viewed as an append type of log. Messages in each partition are guaranteed to have unique and ordered ids, and new messages are constantly appended to the end. Partition When data is stored, data is partitioned by size to ensure that data is always written to smaller files, improving performance and facilitating management.

In the middle of the figure, partitions are distributed across multiple brokers. The green modules in the figure indicate that Topic1 is divided into three partitions. Each Partition is replicated across multiple brokers, as shown in the red module, to ensure disaster recovery if the primary Partition fails. Each Broker can hold multiple partitions of multiple topics.

Kafka only ensures that messages are ordered within a partition, not between different partitions of a Topic. To ensure high processing efficiency, all message reads and writes are performed on the primary Partition, while other replica partitions only copy data from the primary Partition. Kafka maintains a set of synchronized replicas called ISR (In-Sync Replica) on ZooKeeper for each Topic. If a primary partition becomes unavailable, Kafka selects a copy from the ISR collection as the new primary partition.

1.4 Consumer Group,

A Consumer consumes messages according to a Group. Kafka uses a Group Coordinator to manage which Partition a Consumer consumes. By default, the Consumer supports Range and polling.

Each message in a Topic can be consumed by multiple Consumer groups, such as GroupA and GroupB in the figure above. However, each Partition can only be consumed by one Consumer in a Group. Therefore, if broadcasting is required, each Consumer should have a separate Group. To implement unicast, all consumers need to be in the same Group. Consumer groups also allow consumers to Group freely without having to send messages multiple times to different topics.

2. Kafka advanced use

2.1 How to ensure consumption order

Again, this Topic is divided into four partitions (P1 to P4 in green). The upper producer selects one Partition to write according to the rule, and the default rule is polling strategy. The manufacturer can also specify the Partition or key to select the Partition based on the Hash value.

If a topic has multiple partitions, the order in which the data is consumed cannot be guaranteed. If the consumption sequence of messages needs to be strictly guaranteed and consumption performance is not required, the number of partitions can be set to 1 and all messages can be written into the partition by Producer for consumption. Conversely, you can specify the same routing key (order number) for the same type of business (for example, all operations on an order), and send messages to the same partition to ensure consumption order.

2.2 How to Improve Throughput

Producer can improve throughput by setting three parameters:

  • buffer.memory: sets the buffer for sending messages. The default value is 33554432, or 32MB
  • compression.typeThe default value is None, which does not compress. However, lZ4 compression can also be used, which is fairly efficient. After compression, the amount of data can be reduced and the throughput can be improved, but the CPU cost on the producer side will be increased.
  • batch.size: batch message size. The default value is 16384, or 16KB. If the batch is too small, frequent network requests and throughput decrease. If the batch is too large, it will cause a message to wait too long to be sent, and it will strain the memory buffer, and too much data will be cached in memory.

In a real production environment, the batch value can be increased to improve throughput, but if a batch is set too large, there will be delays. It is generally set based on the size of a message. If we have less information. The default value is 0, meaning that the message must be sent immediately, but this is not correct, generally set a 100 milliseconds or so, in this way, the message is sent out after the batch, if 100 milliseconds, the batch is full of 16KB, It will be sent out naturally.

2.3 How do I Prevent Message Loss

2.3.1 broker Settings
  • replication.factor>=3: a copy of the message partition, ensuring that the Broker is highly available.
  • min.insync.replicas>1: the message is committed only if it is written to more than one copy
  • unclean.leader.election.enable=false: A broker with too much missing data cannot become the leader

Replication. The factor should be > min. Insync. Replicas, ensure the availability, if equal, copy any hang up and then to the partition complains when inserting data.

2.3.2 Producer set

When sending messages to the broker, Producer can set retries to avoid network jitter. In addition, Producer can set request.required. Acks to ensure the reliability of sending messages.

  • 0: As long as the request has been sent, it does not care whether the write succeeded or not. Performance is good. If you are analyzing some logs and can withstand data loss, performance is good with this parameter.
  • 1: Sends a message to the leader Partition. The write is successful only when the write succeeds. However, there is the possibility of losing data in this way.
  • - 1: The message is written successfully only after all copies in the ISR list are written. This can be done through the kafka server
2.3.3 consumer Settings

After the message is written to the Broker, the Consumer submits the offset mark consumption progress after consuming. However, if we choose automatic submission, if it is batch consumption, there may be a thread processing failure, but the offset is still submitted, resulting in message loss. You can disable automatic submission by enable.auto.mit =false, and manually submit offset after the message is repeated. However, the consumer will hang before the message is submitted, resulting in repeated consumption. It will be mentioned later.

2.4 How to ensure that messages are not consumed repeatedly

Kafka provides offset to implement Exactly once (every message must be transmitted once and only once) very directly.

  • The data structure in each consumer memory stores the consumption offsets for each partition of each topic, and offsets are periodically submitted. Kafka-0.10.1. X was previously written to ZK, but such a high concurrency request for ZK is not reasonable architecture design. Zk is designed to coordinate distributed systems. Lightweight metadata store, cannot be responsible for high concurrent reads and writes, as data store.

  • Versions after Kafka-0.10.1. X commit offset to kafka internal topic: The key is group. Id +topic+ partition number, and the value is the current offset value. Every once in a while, Kafka compacts this topic. That is, each group. Id +topic+ partition number keeps the latest data.

If we choose auto-commit, there could be a problem of double-consuming kafka without committing offsets due to the consumer exiting the message before auto-commit offsets. We can circumvent this by setting a smaller value to auto_commit_interval_ms, but not completely. Due to the inflexibility and imprecision of Kafka’s automatic submission of offsets (only at a specified frequency), the offset can be more flexibly and accurately controlled by manual submission of offsets. At the same time, messages can be combined with idempotent processing in the business code to ensure that messages are not processed repeatedly and messages are not lost.

2.5 How to improve consumption power

  • Increase the number of partitions for topic to match the number of consumers
  • Batch pull message, multithreading consumption. throughmax.poll.records(number of strips pulled each time) andmax.poll.interval.ms(Maximum time interval of each poll), so that each consumer can pull multiple data for multithreading consumption.

The actual code can be read in this article.