Kafka

Kafka use cases

It can be summed up in a few words: asynchronous processing, system decoupling, peak shaving, speeding things up, and broadcast. More concretely: messaging, site activity tracking, monitoring metrics, log aggregation, stream processing, event collection, commit logs, and so on.

Kafka basic concepts and how it implements high availability: Broker, Topic, Partition, Leader, and Follower

High availability of MQ

Learn the basics of Kafka

Why does Kafka achieve high throughput and fast reads and writes?
Partitions

Multiple partitions of the same topic are typically distributed across different nodes, which amounts to parallel processing.

Sequential write to disk

Sequential reads and writes reduce the seek time of the hard disk heads and incur less rotational latency.

Kafka's official test data (RAID-5, 7200 rpm): sequential I/O 600 MB/s, random I/O 100 KB/s.

Page Cache

When the broker receives data, it writes it to the page cache first; there is no guarantee that the data has been flushed to disk.

Zero copy

Disk space is mapped directly to memory space, so data is no longer copied into a user-space buffer. This removes two system context switches per transfer, roughly doubling performance.

Batch send

Kafka allows messages to be sent in batches. When the producer sends a message, it first buffers it locally and ships the batch to Kafka once one of the following conditions is met:

  1. The number of buffered messages reaches a fixed count
  2. A fixed time interval has elapsed
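The two flush conditions can be sketched as follows. BatchSender and its thresholds are made-up illustrations; in the real Kafka producer they correspond to the batch.size and linger.ms settings.

```python
import time

class BatchSender:
    """Toy sketch of producer-side batching: buffer messages locally and
    flush when either the size threshold or the time threshold is hit."""

    def __init__(self, send_fn, max_batch=3, max_wait_s=0.5):
        self.send_fn = send_fn        # callable that ships a whole batch
        self.max_batch = max_batch    # analogous to batch.size
        self.max_wait_s = max_wait_s  # analogous to linger.ms
        self.buffer = []
        self.first_ts = None

    def send(self, msg):
        if not self.buffer:
            self.first_ts = time.monotonic()
        self.buffer.append(msg)
        # Condition 1: enough messages accumulated.
        # Condition 2: the oldest buffered message has waited long enough.
        if (len(self.buffer) >= self.max_batch
                or time.monotonic() - self.first_ts >= self.max_wait_s):
            self.flush()

    def flush(self):
        if self.buffer:
            self.send_fn(self.buffer)
            self.buffer = []

batches = []
sender = BatchSender(batches.append, max_batch=3)
for i in range(7):
    sender.send(i)
sender.flush()  # drain the tail
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```

Seven individual sends result in only three network trips, which is the whole point of batching.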

Data compression

Kafka also supports compression of message sets. Producer can compress message sets using GZIP or Snappy formats.

The advantage of compression is to reduce the amount of data transferred and reduce the pressure on network transmission.

After the producer compresses, the consumer must decompress. This adds CPU work, but in big-data processing the bottleneck is usually the network rather than the CPU, so the cost is worth it.
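A quick illustration of why batches of similar messages compress well, using Python's standard gzip module as a stand-in for the broker-side codecs:

```python
import gzip
import json

# A batch of similar JSON messages compresses well because of repetition.
batch = [{"user_id": i % 10, "event": "page_view", "page": "/home"}
         for i in range(200)]
raw = json.dumps(batch).encode("utf-8")
compressed = gzip.compress(raw)

# The compressed payload is a small fraction of the original size,
# which is exactly the network traffic Kafka saves.
print(len(raw), len(compressed))
assert len(compressed) < len(raw)
```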

Mechanism of Replicas

Ensuring high availability with multiple partitions per topic

A topic is usually created with multiple partitions. When there are more consumers than partitions, the surplus consumers get no messages, wasting resources; when there are fewer consumers than partitions, one consumer is assigned several partitions.
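Both situations can be shown with a simplified stand-in for the assignment logic (Kafka's real assignors are range, round-robin, and sticky; this round-robin toy only illustrates the surplus and overload cases):

```python
def assign_round_robin(partitions, consumers):
    """Spread partitions over consumers one by one (a simplified stand-in
    for Kafka's real partition assignors)."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 4 consumers, 3 partitions: one consumer ends up idle.
a = assign_round_robin([0, 1, 2], ["c1", "c2", "c3", "c4"])
print(a)  # {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}

# 2 consumers, 3 partitions: one consumer owns two partitions.
b = assign_round_robin([0, 1, 2], ["c1", "c2"])
print(b)  # {'c1': [0, 2], 'c2': [1]}
```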

If no key is specified, Kafka sends messages to the partitions in turn, round-robin style. If a key is specified, the target partition is computed from the key's hash modulo the number of partitions. So when sending with a key, an unreasonable key choice can make the partition distribution uneven, leaving some consumers overloaded while others sit idle.
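A sketch of key-based partitioning. Kafka's default partitioner actually uses murmur2; crc32 here is only a stand-in to show the two properties that matter, determinism and skew:

```python
import zlib
from collections import Counter

def partition_for(key, num_partitions):
    """Pick a partition from the key's hash (illustrative; Kafka's default
    partitioner uses murmur2, not crc32)."""
    if key is None:
        raise ValueError("round-robin is used when no key is given")
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# The same key always lands in the same partition...
assert partition_for("order-42", 6) == partition_for("order-42", 6)

# ...but a skewed key distribution skews the partitions too:
# 600 messages carrying only 3 distinct keys hit at most 3 of 6 partitions.
load = Counter(partition_for(f"user-{i % 3}", 6) for i in range(600))
print(load)
```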

In Kafka, each partition has one partition leader and zero or more partition followers. The leader is responsible for receiving and serving messages; the followers do not handle client requests but constantly pull messages from the leader to stay in sync with it. When a partition's leader dies, one of the followers is elected as the new leader.

Since only the partition leader processes messages, Kafka typically arranges the leaders evenly among brokers to make them more load-balanced.

ISR mechanism

When the leader goes down, Kafka preferentially elects the new leader from the followers in the ISR, since those are synchronized with the leader. A follower is kicked out of the ISR if it stalls (for example, during a long GC pause), goes down, cannot pull messages as fast as the leader receives them, or falls too far behind the leader for any other reason.

The parameter replica.lag.time.max.ms defines the longest a follower may go without fetching data from the leader; beyond that it is judged out of sync and kicked out of the ISR.
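A minimal sketch of that eviction rule, assuming only the last fetch timestamp matters (the real broker tracks more state than this); the 30-second default matches recent Kafka versions:

```python
def still_in_isr(last_fetch_ms, now_ms, replica_lag_time_max_ms=30_000):
    """A follower stays in the ISR only if it fetched from the leader
    within the last replica.lag.time.max.ms milliseconds."""
    return now_ms - last_fetch_ms <= replica_lag_time_max_ms

now = 100_000
assert still_in_isr(last_fetch_ms=80_000, now_ms=now)      # fetched 20s ago: in sync
assert not still_in_isr(last_fetch_ms=60_000, now_ms=now)  # silent for 40s: kicked out
```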

Kafka Consumer Group

The consumer group is also a logical concept; it is how Kafka implements both the unicast and the broadcast message models. Data from the same topic is broadcast to every group, but within a group only one worker gets a given message. In other words, for the same topic, each group receives the same full data set, yet once the data enters a group it is consumed by only one worker. The workers in a group can be threads, processes, or processes scattered across machines. The number of workers should normally not exceed the number of partitions, and it is best to keep an integer-multiple relationship between the two, because Kafka is designed on the assumption that a partition is consumed by only one worker within a group.

Kafka message loss

Why messages get lost

MQ message loss

Kafka message loss scenario

Consumer message loss

Cause: the consumer auto-commits the offset, which makes Kafka believe the message has been consumed, but the consumer crashes before processing it, so the message is lost.

Solution: disable the automatic offset commit and commit the offset manually after processing, so that data is not lost. This leads to repeated consumption instead, so the business logic must handle it idempotently.

Manual synchronous commit: can retry, but throughput is low. Manual asynchronous commit: cannot retry.
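The difference between the two commit modes can be simulated without a broker. This toy consume loop is purely illustrative (with the kafka-python client you would set enable_auto_commit=False and call consumer.commit() after processing):

```python
def consume(messages, crash_at, manual_commit):
    """Simulation: with auto-commit the offset is committed as soon as a
    message is fetched, so a crash loses the unprocessed message; with a
    manual commit after processing, nothing is lost (but the crashed
    message may be redelivered, hence the need for idempotence)."""
    processed, committed = [], 0
    for offset, msg in enumerate(messages):
        if not manual_commit:
            committed = offset + 1       # auto-commit: "you've consumed it"
        if offset == crash_at:
            return processed, committed  # consumer dies mid-processing
        processed.append(msg)
        if manual_commit:
            committed = offset + 1       # commit only after real processing
    return processed, committed

msgs = ["m0", "m1", "m2"]
p, c = consume(msgs, crash_at=1, manual_commit=False)
# Auto-commit: m1 was never processed, yet offset 2 is committed -> m1 lost.
print(p, c)  # ['m0'] 2
p, c = consume(msgs, crash_at=1, manual_commit=True)
# Manual commit: restart resumes at offset 1, so m1 is redelivered.
print(p, c)  # ['m0'] 1
```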

Producer message loss

Use an asynchronous callback in the sending code and keep retrying when a send fails.
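A generic sketch of that retry idea, using a made-up flaky_send function to stand in for the broker call (the real Kafka producer already retries internally when its retries setting is greater than 0):

```python
import time

def send_with_retry(send_once, payload, retries=5, backoff_s=0.01):
    """Retry a fire-and-forget send on transient failure, with
    exponential backoff between attempts."""
    for attempt in range(retries + 1):
        try:
            return send_once(payload)
        except ConnectionError:
            if attempt == retries:
                raise                            # give up, surface the error
            time.sleep(backoff_s * (2 ** attempt))  # exponential backoff

attempts = {"n": 0}
def flaky_send(payload):
    """Made-up stand-in for the broker: fails twice, then succeeds."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("broker unavailable")
    return f"ack:{payload}"

assert send_with_retry(flaky_send, "hello") == "ack:hello"
assert attempts["n"] == 3  # two failures, then success
```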

Kafka cluster message loss

A broker goes down and the partition leader is re-elected while some data on the other followers has not yet been synchronized. A follower is then elected as the new leader, and that unsynchronized data is missing.

In other words, the leader's machine crashed, a follower was switched to leader, and the data turned out to be lost.

Therefore, you must set at least the following four parameters:

  • Topic level: set the replication.factor parameter to a value greater than 1, requiring each partition to have at least 2 replicas.
  • Kafka broker level: set the min.insync.replicas parameter to a value greater than 1, requiring the leader to perceive at least one follower still in contact with it, so that a follower remains after the leader dies.
  • Producer side: set acks=all, requiring every piece of data to be written to all in-sync replicas before the write is considered successful.
  • Producer side: set retries=MAX (a very, very large value, meaning infinite retries), requiring unlimited retries once a write fails; the producer blocks here until the write succeeds.
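As a sketch, the four settings side by side. The keys are the real Kafka option names; the values are examples only, not recommendations for any particular cluster:

```python
# Where each of the four no-data-loss settings lives.
topic_config = {
    "replication.factor": 3,    # topic level: every partition has 3 copies
}
broker_config = {
    "min.insync.replicas": 2,   # leader + at least 1 in-sync follower
}
producer_config = {
    "acks": "all",              # success only after all in-sync replicas have the write
    "retries": 2**31 - 1,       # effectively "retry forever" on failure
}

assert topic_config["replication.factor"] > 1
assert broker_config["min.insync.replicas"] > 1
assert producer_config["acks"] == "all"
```

Note that min.insync.replicas only takes effect together with acks=all: the producer's write fails fast when fewer than that many replicas are in sync.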

Our production environment is configured in such a way that, at least on the Kafka broker side, data will not be lost in the event of a leader switch due to a failure of the leader broker.

Kafka repeated consumption

Kafka repeated consumption scenarios and solutions

Principle:

With enable.auto.commit=true, the next poll happens once the messages pulled by the previous poll have been consumed, and after auto.commit.interval.ms has elapsed, the next call to poll commits the offsets of all the messages consumed so far.

Repeated consumption scenarios:

The consumer process was killed before the offset was committed, so the next poll after restart pulls the same batch of messages again and they are consumed repeatedly.

Solution:

  • Avoid using kill -9; stop the consumer gracefully so the offset can be committed

Consumer processing time exceeds max.poll.interval.ms

The reason:

The max.poll.interval.ms parameter defines the maximum allowed interval between two polls; its default value is 5 minutes. If a consumer exceeds it, the coordinator removes the consumer from the group and starts a new round of rebalance. This can happen when consuming a message is very time-consuming.

Solution:

  • Improve consumption capacity and the processing speed of a single message, for example by handling time-consuming steps asynchronously or with multiple threads.
  • Based on the actual scenario, set max.poll.interval.ms to a larger value to avoid unnecessary rebalances.
  • In addition, max.poll.records can be reduced appropriately; its default value is 500.
  • Introduce a deduplication mechanism: add a unique identifier such as a message ID when producing messages. On the consumer side, save the last 1000 message IDs in Redis or a MySQL table, configure max.poll.records to be less than 1000, check each message against that table before processing it, and make the consuming interface idempotent.
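The last-1000-IDs window can be sketched in memory (the text suggests Redis or MySQL for production; DedupWindow is a made-up name for this demo):

```python
from collections import deque

class DedupWindow:
    """Remember the IDs of the last `capacity` messages and skip
    anything already seen; old IDs are evicted as new ones arrive."""

    def __init__(self, capacity=1000):
        self.order = deque(maxlen=capacity)  # IDs in arrival order
        self.seen = set()                    # fast membership check

    def accept(self, msg_id):
        if msg_id in self.seen:
            return False                      # duplicate: skip processing
        if len(self.order) == self.order.maxlen:
            self.seen.discard(self.order[0])  # evict the oldest ID first
        self.order.append(msg_id)
        self.seen.add(msg_id)
        return True

w = DedupWindow(capacity=1000)
assert w.accept("msg-1") is True
assert w.accept("msg-1") is False  # redelivered after a rebalance: ignored
assert w.accept("msg-2") is True
```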
Handling repeated Kafka messages in the business logic

Save the message's unique identifier in external storage and, each time a message is consumed, check whether it has already been processed.

Why Kafka does not use read/write separation

  • Data consistency issues
  • Latency: Redis is memory-based, while Kafka has to go through the disk, so replication lag would be much larger
Idempotency of consumption


Scenario: the consumer auto-commits the offset, but the machine goes down and may re-consume messages after restarting.

Idea 1: the producer sends each message with a globally unique ID, which the consumer then checks against Redis before processing.

Idea 2: Based on the database unique key
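A minimal sketch of idea 2 using SQLite's PRIMARY KEY constraint to make the "has this message been handled?" check atomic; the table and column names are invented for the demo:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE handled (msg_id TEXT PRIMARY KEY, result TEXT)")

def handle_once(msg_id, payload):
    """Process a message at most once: the unique key turns a duplicate
    delivery into a no-op instead of a second side effect."""
    cur = db.execute(
        "INSERT OR IGNORE INTO handled (msg_id, result) VALUES (?, ?)",
        (msg_id, payload.upper()),
    )
    return cur.rowcount == 1  # 1 = first delivery, 0 = duplicate

assert handle_once("order-42", "paid") is True
assert handle_once("order-42", "paid") is False  # replay is a no-op
```

With MySQL the same idea works via INSERT IGNORE or a caught duplicate-key error.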

Order of message consumption

MQ order

  • One topic, one partition, one consumer with single-threaded consumption inside; single-thread throughput is too low, so this is rarely used.
  • Write N in-memory queues and put all data with the same key into the same queue; then have N threads, each consuming one queue, to guarantee ordering.

Global order:

A Producer, a Consumer, a Partition


Local order:

The same key is distributed to the same partition. Because each partition is consumed by a fixed consumer thread, messages within the same partition are strictly ordered.

You can specify a key; for example, with an order ID as the key, all data for that order is guaranteed to go to the same partition, where it is strictly ordered. On the consumer side, write N in-memory queues, send all data with the same key to the same queue, and have N threads each consume one queue to preserve ordering.
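The N-queue pattern can be sketched like this; dispatch is a made-up helper, and real code would drain each queue in its own worker thread:

```python
import zlib

def dispatch(messages, n_queues):
    """Route every (key, value) pair to one of N in-memory queues by key
    hash, so that all events for a key keep their relative order."""
    queues = [[] for _ in range(n_queues)]
    for key, value in messages:
        queues[zlib.crc32(key.encode("utf-8")) % n_queues].append(value)
    return queues

msgs = [("order-1", "o1-created"), ("order-2", "o2-created"),
        ("order-1", "o1-paid"), ("order-1", "o1-shipped"),
        ("order-2", "o2-paid")]
queues = dispatch(msgs, n_queues=2)

# All order-1 events sit in a single queue, in their original order,
# no matter which queue order-2 hashed to.
q1 = next(q for q in queues if "o1-shipped" in q)
assert [v for v in q1 if v.startswith("o1-")] == \
       ["o1-created", "o1-paid", "o1-shipped"]
```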