Why use a messaging system?

1. Decoupling
2. Asynchronous processing

For example, consider a flash sale (seckill) on an e-commerce platform.

The general process is divided into:

  1. Risk control
  2. Inventory lock
  3. Generate orders
  4. SMS notification
  5. Update the data

The messaging system splits the flash-sale flow: the steps that do not need to be handled urgently are moved behind the queue and processed later.

The process is changed to:

  1. Risk control
  2. Inventory lock
  3. Messaging system
  4. Generate orders
  5. SMS notification
  6. Update the data

3. Flow control

3.1 When the gateway receives a request, it puts the request into the message queue.

3.2 The back-end service takes the request from the message queue, completes the rest of the flash-sale process, and then returns the result to the user.

  • Advantage: the flow is controlled
  • Disadvantage: the process becomes slower

Kafka core concepts

  • Producer: a producer writes data into the Kafka cluster
  • Consumer: a consumer fetches data from Kafka and processes it

Consumers pull data from Kafka themselves.

  • Topic: a logical category of messages
  • Partition: a topic is divided into partitions

By default, a topic has one partition. You can configure multiple partitions, which are stored on different server nodes.

This solves the problem of how to store massive amounts of data.

For example, if there is 2 TB of data and each server can hold 1 TB, a topic can be divided into multiple partitions stored on multiple servers.

Kafka’s cluster architecture

In a Kafka cluster, each Kafka server is a broker. A topic is a logical concept, and a partition is represented as a directory on disk.

Consumer group: when consuming data, you must specify a group ID.

If program A and program B specify the same group ID, both programs belong to the same consumer group.

Special cases:

  • For example, if program A is consuming a partition of a topic called topicA, program B cannot consume that same partition (program A and program B belong to the same consumer group).
  • For another example, once program A has consumed the data in topicA, it cannot consume that data again under the same group ID, but after a new group ID is specified, the data can be consumed again.

Different consumer groups do not affect each other. The group ID has to be chosen by the user, while the consumer name is generated automatically (and is unique).

Controller: the master node among the Kafka brokers. It does its work with the help of ZooKeeper.

Kafka's sequential disk writes ensure write performance

Writing data in Kafka:

Sequential write: data is always appended when it is written to disk. There are no random writes.

Rule of thumb:

If a server has enough disks and the disks spin fast enough, the speed of sequential (append-only) writes to disk approaches the speed of writing to memory.

The producer sends the message to the Kafka service, which first writes it to the OS cache (memory); the message is then flushed to disk sequentially.

5. Kafka's zero-copy mechanism ensures high-performance reads

How a consumer reads data without zero copy:

  1. The consumer sends a request to the Kafka service
  2. The Kafka service looks for the data in the OS cache
  3. If the data is not cached, it is read from disk into the OS cache
  4. The OS cache copies the data into the Kafka application
  5. Kafka copies the data into the socket buffer
  6. The socket buffer sends the data to the consumer through the network adapter (NIC)

Kafka uses the Linux sendfile mechanism (zero copy):

  1. The consumer sends a request to the Kafka service
  2. The Kafka service looks for the data in the OS cache
  3. If the data is not cached, it is read from disk into the OS cache
  4. The OS cache sends the data directly to the NIC
  5. The NIC transmits the data to the consumer
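The zero-copy path above can be sketched in plain Java with `FileChannel.transferTo`, the JDK call that can be backed by sendfile on Linux when the target is a socket. This is only an illustration, not Kafka's actual code: the class name, the temp file, and the in-memory target channel are assumptions made so that the example runs without a broker (with an in-memory target the JDK falls back to an ordinary copy).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopySketch {
    // Streams a whole file to the target channel without first reading it
    // into an application-level buffer.
    static long send(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = in.size();
            long sent = 0;
            while (sent < size) {
                // transferTo may move fewer bytes than requested, so loop.
                sent += in.transferTo(sent, size - sent, target);
            }
            return sent;
        }
    }

    // Hypothetical helper: writes data to a temp file standing in for a
    // .log segment, then sends it to an in-memory channel.
    static byte[] roundTrip(byte[] data) {
        try {
            Path tmp = Files.createTempFile("segment", ".log");
            try {
                Files.write(tmp, data);
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                send(tmp, Channels.newChannel(out));
                return out.toByteArray();
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(new String(roundTrip("hello kafka".getBytes())));
    }
}
```

In a real server the target would be a `SocketChannel`, which is where the kernel can skip the copies through user space.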

6. Kafka stores logs in segments

A topic in Kafka is usually partitioned. For example, create topic_A and specify that the topic has three partitions.

Three directories are then created, one on each of three servers.

Server 1 (Kafka1):

  • Create directory topic_A-0:
  • The directory contains the data files. Kafka's data are messages, and they are stored in log files
  • Files ending in .log are the log files. In Kafka, data files are called log files.

By default, a partition has n log files (segmented storage), and a log file is 1 GB by default.

Server 2 (Kafka2):

  • Create directory topic_A-1:

Server 3 (Kafka3):

  • Create directory topic_A-2:

7. Kafka locates data with binary search

Every message in Kafka has its own offset, and it is stored on the physical disk at a position.

Position: the physical position (on the disk)

This means that a message has two locations:

  • Offset: the relative offset (relative position)
  • Position: the physical position on the disk

Sparse index:

  • Kafka uses a sparse index. Every time Kafka writes 4 KB of log data (.log), it writes one entry to the index file.

Binary search is used to look up the index.
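A minimal sketch of that lookup, assuming the index is held as two parallel arrays (indexed offsets and their byte positions). The class, method names, and sample numbers are hypothetical; the idea is to binary-search for the last index entry at or below the target offset and start scanning the .log file from its position.

```java
public class SparseIndexSketch {
    // Returns the slot of the last entry whose offset is <= target,
    // or -1 if the target precedes every indexed offset.
    static int floorEntry(long[] indexedOffsets, long target) {
        int lo = 0, hi = indexedOffsets.length - 1, found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (indexedOffsets[mid] <= target) {
                found = mid;      // candidate; look for a later one
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return found;
    }

    // Physical position from which to start scanning the .log file.
    static long startPosition(long[] indexedOffsets, long[] positions, long target) {
        int slot = floorEntry(indexedOffsets, target);
        return slot < 0 ? 0L : positions[slot];
    }

    public static void main(String[] args) {
        long[] offsets = {0, 35, 71, 110};          // hypothetical indexed offsets
        long[] positions = {0, 4096, 8210, 12400};  // hypothetical byte positions
        System.out.println(startPosition(offsets, positions, 80)); // prints 8210
    }
}
```

Because the index is sparse, the entry found is usually not the message itself; the reader scans forward from the returned position until it reaches the wanted offset.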

8. High-concurrency network design (first understand NIO)

The network layer is one of the best-designed parts of Kafka, and it is the reason Kafka achieves high concurrency and high performance.

To tune Kafka, you need to understand how Kafka works, especially its network design.

Reactor network design mode 1:

Reactor network design mode 2:

Reactor network design mode 3:

Design of Ultra-high Concurrency Networks in Kafka:

9. Kafka's replicas ensure high availability

Partitions are replicated in Kafka. Note: there was no replica mechanism before 0.8. When you create a topic, you can specify both the number of partitions and the number of replicas. Replicas have roles:

Leader partition:

  • Data is written to and read from the leader partition.
  • The leader maintains an in-sync replica (ISR) list, and replicas are removed from the ISR list according to certain rules.

The producer sends a message, which is first written to the leader partition.

After that, the message is also written to the other replicas in the ISR list, and it is not committed until those writes have completed.

Follower partition: synchronizes data from the leader partition.

Good architecture thinking – summary

Kafka: high concurrency, high availability, and high performance

  • High availability: multiple-replica mechanism
  • High concurrency: three-tier network architecture: multiple selectors -> multiple threads -> queues (NIO)
  • High performance:

Write data:

  1. Write data to the OS cache first
  2. Write to disk sequentially, which is fast

Read data:

  1. Use the sparse index to quickly locate the data to consume

  2. Zero-copy mechanism

    • Reduces data copying
    • Reduces context switches between the application and the operating system

11. Building a Kafka production environment

11.1 Requirement Scenario Analysis

An e-commerce platform sends one billion requests to the Kafka cluster every day. Applying the 80/20 rule gives an estimate that is usually accurate enough.

One billion requests arrive over 24 hours. Normally there is little traffic between midnight and 8:00 a.m., so 80% of the requests (800 million) are handled in the other 16 hours. Applying the rule again, 80% of those 800 million requests are handled in 16 * 0.2 ≈ 3 hours.

That is roughly 600 million requests in three hours. From this we can estimate the QPS at peak time:

600 million / 3 hours ≈ 55,000 requests per second, so peak QPS ≈ 55,000

1 billion requests * 50 KB ≈ 46 TB of data stored every day

We usually keep two replicas: 46 TB * 2 = 92 TB. Kafka retains data for a limited period; here we keep the last 3 days.

92 TB * 3 days = 276 TB

The 50 KB figure does not mean that a single message is 50 KB; it is the size of a request in which many log records are merged. A single message is usually somewhere between a few bytes and a few hundred bytes.
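The arithmetic above can be reproduced with a short calculation. This is a sketch with hypothetical class and method names; it uses the unrounded figures (640 million requests in 3.2 hours, binary terabytes), so the retained volume comes out slightly above the 276 TB obtained in the text by rounding the daily volume down to 46 TB first.

```java
public class CapacityEstimate {
    // Peak QPS, applying the 80/20 rule twice as in the text.
    static long peakQps(long requestsPerDay) {
        double busyHoursRequests = requestsPerDay * 0.8;  // 80% fall into the 16 busy hours
        double peakRequests = busyHoursRequests * 0.8;    // 80% of those fall into the peak window
        double peakSeconds = 16 * 0.2 * 3600;             // 20% of 16 hours = 3.2 hours
        return Math.round(peakRequests / peakSeconds);
    }

    // Daily volume in TB (binary units: 1 TB = 1024^3 KB).
    static double dailyTb(long requestsPerDay, int kbPerRequest) {
        return requestsPerDay * (double) kbPerRequest / (1024.0 * 1024.0 * 1024.0);
    }

    // Volume held on disk for the given replica count and retention period.
    static double retainedTb(double dailyTb, int replicas, int retentionDays) {
        return dailyTb * replicas * retentionDays;
    }

    public static void main(String[] args) {
        long requests = 1_000_000_000L;
        double daily = dailyTb(requests, 50);
        System.out.println("peak QPS ~ " + peakQps(requests));
        System.out.printf("daily ~ %.1f TB, retained ~ %.1f TB%n",
                daily, retainedTb(daily, 2, 3));
    }
}
```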

11.2 Evaluating the Number of Dedicated Servers

1) First decide whether to use virtual machines or physical machines

For clusters such as Kafka, MySQL, and Hadoop, we use physical machines in production.

2) At peak, the cluster has to handle about 55,000 requests per second in total. One or two physical machines could in fact handle that, but machines are normally sized for four times the peak.

Four times the peak is about 200,000 QPS, which gives a comfortably safe cluster. That takes about five physical machines, each handling about 40,000 requests per second.

Scenario summary:

  • Handling 1 billion requests a day, with a peak of 55,000 QPS and 276 TB of data, takes 5 physical machines.

11.3 Disk Selection

It takes 5 physical machines to handle 1 billion requests, 55,000 QPS in peak period and 276T data.

1) SSDs or regular mechanical hard drives?

  • SSD: good performance, but expensive
  • SAS disk: lower performance, but relatively cheap

The good performance of SSDs refers mainly to random reads and writes, which suits clusters such as MySQL.

For sequential writes, however, their performance is close to that of SAS disks.

Kafka writes sequentially, so regular mechanical hard drives are enough.

2) Assess how many disks each server needs

Five servers hold 276 TB in total, which is about 60 TB per server. The server configuration in our company uses 11 hard disks of 7 TB each: 11 * 7 TB = 77 TB

77 TB * 5 servers = 385 TB

Scenario summary:

  • Handling 1 billion requests takes 5 physical machines, each with 11 SAS disks * 7 TB

11.4 Memory Evaluation

Handling 1 billion requests takes 5 physical machines, each with 11 SAS disks * 7 TB.

Kafka reads and writes data through the OS cache. In other words, if the OS cache were infinitely large, all of Kafka would effectively run in memory, and performance would be very good. But memory is finite.

  • Give as much memory as possible to the OS cache
  • The core Kafka code is written in Scala and the client code in Java. Both run on the JVM, so some memory has to go to the JVM.

Kafka is not designed to keep many data structures in the JVM, so the JVM does not need much memory. As a rule of thumb, 10 GB is enough.

By contrast, a NameNode keeps its metadata (tens of gigabytes) in the JVM, so its JVM must be given a lot of memory, for example 100 GB.

Suppose this project has 10 business requirements and 100 topics in total. 100 topics * 5 partitions * 2 replicas = 1000 partitions

A partition is a directory on a physical machine that contains multiple .log files.

  • .log files store the data. By default, a .log file is 1 GB in size.

Keeping the latest .log file of all 1000 partitions in memory would give the best performance: 1000 * 1 GB = 1000 GB of memory.

In practice it is enough to keep the most recent 25% of the current .log file in memory: 250 MB * 1000 = 0.25 GB * 1000 = 250 GB of memory.

  • 250 GB / 5 servers = 50 GB of memory per server
  • 50 GB + 10 GB (JVM) = 60 GB of memory per server
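The memory estimate can be written as one small function, which makes it easy to redo with other inputs (for example 10 partitions per topic). The class and parameter names are hypothetical; the numbers in `main` are the ones used in the text.

```java
public class MemoryEstimate {
    // Memory needed per server: the hot part of every partition's current
    // segment, spread over the servers, plus the JVM heap.
    static double perServerGb(int topics, int partitionsPerTopic, int replicas,
                              double segmentGb, double hotFraction,
                              int servers, double jvmGb) {
        int partitions = topics * partitionsPerTopic * replicas;   // 100 * 5 * 2 = 1000
        double hotDataGb = partitions * segmentGb * hotFraction;   // 1000 * 1 GB * 0.25 = 250 GB
        return hotDataGb / servers + jvmGb;                        // 250 / 5 + 10 = 60 GB
    }

    public static void main(String[] args) {
        System.out.println(perServerGb(100, 5, 2, 1.0, 0.25, 5, 10.0)); // prints 60.0
    }
}
```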

With 64 GB of memory per server, 4 GB remain. The operating system needs memory too, but the Kafka JVM does not really need the full 10 GB, so 64 GB is estimated to be enough. Of course, servers with 128 GB of memory would be best.

The estimate above used 5 partitions per topic, but a topic with a large amount of data may have 10 partitions.

Conclusion:

  • Handling 1 billion requests takes 5 physical machines, each with 11 SAS disks * 7 TB and 64 GB of memory (128 GB is better)

11.5 CPU Pressure Assessment

Assess how many CPU cores each server needs (resources are limited)

We estimate the number of CPU cores from the number of threads running in the service. Threads rely on the CPU to run; if there are many threads but few CPU cores, the machine is overloaded and performance is poor.

Estimate how many threads a Kafka server has after it starts:

  • Acceptor thread: 1
  • Processor threads: 3 by default, usually 6 to 9
  • Request-handling threads: 8 by default, up to 32
  • Periodic cleanup threads, fetch threads, a thread that periodically checks the ISR list, and so on

So once a Kafka service is started, it probably runs over a hundred threads.

  • CPU cores = 4: a few dozen threads are enough to saturate the CPU.
  • CPU cores = 8: should easily support a few dozen threads.

With more than 100 threads, or something like 200 threads, eight CPU cores are not enough.

So here is our advice:

  • CPU cores = 16. If possible, 32 CPU cores would be great.

Conclusion:

  • Each server in a Kafka cluster should have at least 16 CPU cores; 32 CPU cores would be better.
  • 2 CPUs * 8 cores = 16 CPU cores
  • 4 CPUs * 8 cores = 32 CPU cores

Conclusion:

  • Handling 1 billion requests takes 5 physical machines, each with 11 SAS disks * 7 TB, 64 GB of memory (128 GB is better), and 16 CPU cores (32 is better).

11.6 Network Requirement Assessment

What kind of network card do we need?

The usual choices are gigabit network cards (1 Gbit/s) and 10-gigabit network cards (10 Gbit/s).

At peak there are 55,000 requests per second, which is 55,000 / 5 = 11,000, or roughly 10,000 requests per server. With 50 KB per request, 10,000 * 50 KB ≈ 488 MB, so each server receives about 488 MB of data per second. The data is also replicated, and synchronization between replicas goes over the network as well: 488 MB/s * 2 = 976 MB/s
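The same estimate as a small calculation, with an extra conversion to Gbit/s so that the result can be compared with the nominal speed of a network card. The class and method names are hypothetical, and the conversion (1 MB/s taken as 8 Mbit/s, 1000 Mbit per Gbit) is deliberately rough.

```java
public class NetworkEstimate {
    // Inbound traffic per server in MB/s.
    static double inboundMbPerSec(long requestsPerSec, int kbPerRequest) {
        return requestsPerSec * kbPerRequest / 1024.0;
    }

    // Replica synchronization also travels over the network.
    static double withReplication(double inboundMbPerSec, int replicas) {
        return inboundMbPerSec * replicas;
    }

    // Rough conversion from MB/s to Gbit/s.
    static double toGbitPerSec(double mbPerSec) {
        return mbPerSec * 8 / 1000.0;
    }

    public static void main(String[] args) {
        double inbound = inboundMbPerSec(10_000, 50);   // about 488 MB/s
        double total = withReplication(inbound, 2);     // about 976 MB/s
        System.out.printf("%.0f MB/s in, %.0f MB/s total, ~%.1f Gbit/s%n",
                inbound, total, toGbitPerSec(total));
    }
}
```

Under these assumptions the total is several gigabits per second, which is beyond a gigabit card and within the range of a 10-gigabit card.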

Note:

  • In many companies a request is nowhere near 50 KB. Ours is this large because the producer side wraps and merges many records into one request.
  • A network card cannot be driven at its full nominal bandwidth; on a gigabit card roughly 700 Mbit/s is usable in practice. Since 976 MB/s amounts to several gigabits per second, we go with 10-gigabit network cards.
  • With 10-gigabit cards, this load is easy to handle.

11.7 Cluster Planning

  • The request quantity
  • Plan the number of dedicated servers
  • Analyze the number of disks and choose which disks to use
  • memory
  • cpu core
  • The network card

The point is that whenever a new requirement comes up in your company and you have to size resources and servers, you can follow this line of reasoning.

Message size: 50 KB in this example; elsewhere it may be 1 KB, 500 bytes, or 1 MB.

IP addresses and host names

  • 192.168.0.100 hadoop1
  • 192.168.0.101 hadoop2
  • 192.168.0.102 hadoop3

Host planning: the Kafka cluster uses a master-slave architecture:

  • Controller -> Manage metadata for the entire cluster through the ZK cluster.

Zookeeper cluster

  • hadoop1
  • hadoop2
  • hadoop3

Kafka cluster

  • In theory, Kafka and ZooKeeper should not be installed on the same servers.
  • But we have a limited number of servers here, so our Kafka cluster is also installed on hadoop1, hadoop2, and hadoop3.

11.8 Setting up the ZooKeeper Cluster

11.9 Description of Core Parameters

11.10 Cluster Stress Test

12. Kafka operation and maintenance

12.1 Common O&M Tools

KafkaManager – Page management tool

12.2 Common O&M Commands

Scenario 1: A topic holds too much data, so increase its number of partitions

When the topic is first created, the data volume is small and only a few partitions are assigned.

kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic test6

kafka-topics.sh --alter --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --partitions 3 --topic test6

The broker id:

  • hadoop1:0
  • hadoop2:1
  • hadoop3:2

Suppose a partition has three replicas, partition0:

a,b,c

  • a: leader partition
  • b, c: follower partitions
ISR:{a,b,c}

If a follower partition does not pull data from the leader partition for more than 10 seconds, it is removed from the ISR list.

Scenario 2: Increase the replication factor of a core topic

Core business data may need a higher replication factor.

Create test.json with vim and save the following JSON in it:

{"version":1,"partitions":[{"topic":"test6","partition":0,"replicas":[0,1,2]},{"topic":"test6","partition":1,"replicas":[0,1,2]},{"topic":"test6","partition":2,"replicas":[0,1,2]}]}

Apply the JSON file above:

kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --reassignment-json-file test.json --execute

Scenario 3: Manually migrate a topic with unbalanced load

vi topics-to-move.json

{"topics":[{"topic":"test01"},{"topic":"test02"}],"version":1}
// List all of your topics here

kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
// List all of your brokers here, including the new ones, so that all partitions are spread evenly across all brokers

A migration plan is generated, which can be saved to a file: expand-cluster-reassignment.json

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

Data migration must be performed at night, during off-peak hours, because it moves data between machines and consumes a lot of bandwidth.

  • generate: produces a migration plan from the given list of topics and brokers. It does not actually migrate any messages; it only computes the plan for the execute command to use.
  • execute: migrates according to the given migration plan.
  • verify: checks whether the migration has completed.

Scenario 4: One broker has too many leader partitions

Normally, leader partitions are balanced across the servers. An unbalanced distribution looks like this:

  • hadoop1 4
  • hadoop2 1
  • hadoop3 1

Partitions are allocated automatically and adjusted dynamically later. Kafka itself distributes leader partitions evenly across the machines, which keeps the read and write throughput of each machine uniform.

The exception is when some brokers go down: the leader partitions then become concentrated on a few remaining brokers, which receive a high volume of read and write requests. When the failed brokers restart, their partitions come back as follower partitions and receive few read and write requests.

This leaves the cluster load unbalanced. The parameter auto.leader.rebalance.enable, which defaults to true, makes Kafka check every 300 seconds (leader.imbalance.check.interval.seconds) whether the leaders are balanced.

If the share of unbalanced leaders on a broker exceeds 10% (leader.imbalance.per.broker.percentage), a leader election is triggered for that broker.

Configuration parameters:

  • auto.leader.rebalance.enable: the default is true
  • leader.imbalance.per.broker.percentage: the ratio of unbalanced leaders allowed per broker. If a broker exceeds this value, the controller triggers a leader rebalance. The value is a percentage; the default is 10%
  • leader.imbalance.check.interval.seconds: the default value is 300 seconds

13. Kafka producer

13.1 How Producers Send Messages

13.2 Principles of Sending Messages by Producers: Basic Example

13.3 How Can I Improve Throughput

Parameter 1: buffer.memory:

Sets the size of the buffer for messages waiting to be sent. The default value is 33554432, which is 32 MB.

Parameter 2: compression.type:

The default value is none. lz4 can be used to compress data, and its efficiency is good.

Parameter 3: batch.size:

  • Sets the batch size. If the batch is too small, network requests become frequent and throughput drops.
  • If the batch is too large, a message waits a long time before it is sent, and the memory buffer comes under pressure. The default value is 16384 (16 KB), that is, a batch is sent once it holds 16 KB. In production this value is generally increased to improve throughput, at the cost of some delay. It is usually chosen according to the size of a message.
  • If there are few messages, a batch may take a long time to fill. linger.ms defaults to 0, which means a message is sent immediately, and that is not ideal. It is usually set to something like 100 milliseconds so that messages accumulate into a batch; if the batch fills to 16 KB within the 100 milliseconds, it is sent right away.
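The four parameters above could be collected as follows. This is a minimal sketch using only `java.util.Properties`, so it runs without the Kafka client on the classpath; the keys are the standard producer configuration names, while the class name and the raised batch.size value (32 KB) are assumptions chosen for illustration.

```java
import java.util.Properties;

public class ProducerThroughputConfig {
    static Properties build() {
        Properties props = new Properties();
        props.put("buffer.memory", "33554432");   // 32 MB send buffer (the default)
        props.put("compression.type", "lz4");     // compress batches with lz4
        props.put("batch.size", "32768");         // assumed value, doubled from the 16 KB default
        props.put("linger.ms", "100");            // wait up to 100 ms for a batch to fill
        return props;
    }

    public static void main(String[] args) {
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

In a real application these properties, together with the bootstrap servers and serializers, would be passed to the producer's constructor.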

13.4 How Can I Handle Exceptions

1. LeaderNotAvailableException:

If a machine goes down, the leader replica becomes unavailable and writes fail until one of the follower replicas has been switched to leader. In this case the data can simply be sent again. Restarting a Kafka broker process during normal operation also causes a leader switch, so writes fail with LeaderNotAvailableException.

2. NotControllerException:

If the broker hosting the controller dies, requests fail until a new controller has been elected.

3. NetworkException: network timeout

  • If the retries parameter is configured, the producer retries automatically
  • If the send still fails after several retries, the exception is handed to us, and we process the message separately through a backup path: messages that could not be sent are written to Redis or to the file system, or even discarded.

13.5 Retry Mechanism

Retries cause some problems:

Duplicate messages

Retries are needed for problems such as leader switches, so retries is set, but retrying can send a message twice. With network jitter, for example, the producer believes a send failed and retries, although the first attempt actually succeeded.

Messages out of order

Retries can also put messages out of order, because later messages may already have been sent. Setting max.in.flight.requests.per.connection to 1 ensures that the producer has only one request in flight at a time.

The default interval between retries is 100 milliseconds and is set with retry.backoff.ms. In practice, about 95% of exceptions are resolved by retrying.

13.6 Detailed ACK Parameters

On the producer side

request.required.acks = 0
  • The message counts as sent as soon as the request has been sent; the producer does not care whether it was written successfully.
  • Performance is very good. If you are analyzing logs and can tolerate losing some data, this setting works well.
request.required.acks = 1
  • The message counts as sent once it has been written to the leader partition.
  • Data can still be lost with this setting.
request.required.acks = -1
  • The message counts as written only after all replicas in the ISR list have written it.
  • Example ISR: 1 leader partition and 1 follower partition

On the Kafka server side:

min.insync.replicas: 1

The default value is 1. A leader partition maintains an ISR list, and this value sets the minimum number of replicas that must be in that list. If the ISR has fewer replicas than this, writing data to the partition fails with an error.

Design a scheme without losing data:

  • Partition replicas >= 2
  • acks = -1
  • min.insync.replicas >= 2
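The three conditions can be checked mechanically, and the producer side of the scheme expressed as properties. This is a sketch with hypothetical class and method names and an assumed retry count; it uses `acks`, the key name of the newer Java producer, where `"-1"` and `"all"` are equivalent. The replica count and min.insync.replicas are topic or broker settings, so they appear only as arguments to the check.

```java
import java.util.Properties;

public class DurableProducerConfig {
    // Producer-side settings for the no-data-loss scheme.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("acks", "-1");                                   // wait for all ISR replicas
        props.put("retries", "3");                                 // assumed retry count
        props.put("retry.backoff.ms", "100");                      // interval between retries
        props.put("max.in.flight.requests.per.connection", "1");   // keep order across retries
        return props;
    }

    // True when the three conditions listed above all hold.
    static boolean meetsNoLossRule(int replicationFactor, String acks, int minInsyncReplicas) {
        boolean waitsForIsr = acks.equals("-1") || acks.equals("all");
        return replicationFactor >= 2 && waitsForIsr && minInsyncReplicas >= 2;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        System.out.println(meetsNoLossRule(3, props.getProperty("acks"), 2)); // prints true
    }
}
```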

Even then, a send can still fail with an exception, and that exception must be handled.

13.7 Customizing partitions

Partitioning:

  • No key set

Messages are sent to the partitions in rotation.

  • Key set

Kafka's built-in partitioner computes a hash value from the key and maps that hash value to a particular partition.

If two keys are the same, their hash values are the same, so messages with the same key are always sent to the same partition.
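The key-to-partition mapping can be sketched as "hash the key bytes, then take the result modulo the number of partitions". The class name is hypothetical, and `Arrays.hashCode` is only a stand-in for the hash function used by Kafka's own partitioner, so the partition numbers it produces will differ from Kafka's.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyPartitionSketch {
    // Maps a key to a partition in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8)); // stand-in hash
        return (hash & 0x7fffffff) % numPartitions;                       // clear the sign bit first
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition.
        System.out.println(partitionFor("zhangsan", 3) == partitionFor("zhangsan", 3)); // prints true
    }
}
```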

However, in some special cases we need a custom partitioner:

import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class HotDataPartitioner implements Partitioner {
    private Random random;

    @Override
    public void configure(Map<String, ?> configs) {
        random = new Random();
    }

    @Override
    public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String key = (String) keyObj;
        List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
        // Number of partitions, e.g. 3 for partitions 0, 1, 2
        int partitionCount = partitionInfoList.size();
        // The last partition is reserved for hot data
        int hotDataPartition = partitionCount - 1;
        // Other keys are spread randomly over the remaining partitions
        // (this needs at least two partitions)
        return !key.contains("hot_data") ? random.nextInt(partitionCount - 1) : hotDataPartition;
    }

    @Override
    public void close() {
    }
}

How to use:

Register the class in the producer configuration: props.put("partitioner.class", "com.zhss.HotDataPartitioner");

13.8 Comprehensive case presentation

Demand analysis:

E-commerce background: a second-hand e-commerce platform

In the [Happy Send] project, users earn [stars] after buying something and can exchange stars for items. One dollar spent earns one star.

The order system (the message producer) sends messages (pay order, cancel order) to Kafka. The member system consumes the data from Kafka, looks up the amount the user spent, and updates the user's number of stars.

Analysis:

When sending a message, you can specify a key or leave it out.

1) If no key is specified

  • zhangsan -> place order -> 100 -> +100
  • zhangsan -> cancel order -> -100 -> -100
  • When the member system consumes the data, it may consume the cancel-order message first.

2) If a key is specified: key -> hash -> partition number -> send to the corresponding partition.

  • Messages with the same key are sent to the same partition (and stay in order)

This project needs to specify a key, and it uses the user ID as the key.

14. Kafka consumers

14.1 Concept of consumer Groups

Consumers with the same group.id belong to the same consumer group.

1) Each consumer must belong to a consumer group (group.id). A topic partition is assigned to only one consumer within a consumer group, and a consumer may also end up with no partition at all.

2) To achieve a broadcast effect, simply consume with different group IDs.

topicA:

  • partition0, partition1

groupA:

  • consumer1: consumes partition0
  • consumer2: consumes partition1
  • consumer3: cannot consume any data

groupB:

  • consumer3: consumes partition0 and partition1

3) If a consumer in a consumer group goes down, the partitions assigned to it are automatically handed over to the other consumers; when it restarts, some partitions are handed back to it.

14.2 Basic Case Demonstration

14.3 Offset Management

Each consumer keeps in memory the consumption offset for every partition of every topic it reads, and commits these offsets periodically. Old versions wrote them to ZooKeeper, but sending highly concurrent requests to ZooKeeper is poor architecture: ZooKeeper coordinates distributed systems and stores lightweight metadata; it is not meant to serve as a data store under highly concurrent reads and writes.

Newer versions commit offsets to an internal Kafka topic, __consumer_offsets. The key of a commit is group.id + topic + partition number, and the value is the current offset. Kafka periodically compacts this topic, so that only the latest record is kept for each group.id + topic + partition number.

__consumer_offsets may receive highly concurrent requests, so it has 50 partitions by default. In a large cluster of, say, 50 machines, the leader partitions can be spread over all 50 machines to share the pressure of offset commits.

  • Consumer -> reads data from the broker
  • Messages -> stored on disk -> offsets in ascending order
  • Where to start consuming? -> the offset
  • The consumer keeps track of the offset

14.4 Offset Monitoring Tool Description

A web-based management tool (Kafka Manager):

  • Modify the bin/kafka-run-class.sh script and add JMX_PORT=9988 as its first line
  • Restart the Kafka process

Another piece of software mainly monitors consumer offsets:

java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
  com.quantifind.kafka.offsetapp.OffsetGetterWeb \
  --offsetStorage kafka \
  --zk hadoop1:2181 \
  --port 9004 \
  --refresh 15.seconds \
  --retain 2.days

When you write a program that consumes data from Kafka (a consumer that processes data with business code), how do you determine whether it really keeps up with the data in real time?

Monitor the lag, that is, the number of messages that have been produced but not yet consumed, and set a threshold (for example, send an alarm when the lag reaches 200,000).
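The lag check reduces to one subtraction per partition: the log end offset minus the committed offset. The sketch below uses hypothetical class and method names and the 200,000 threshold from the text; in practice the two offsets would come from a monitoring tool or the Kafka admin API.

```java
public class LagCheck {
    // Messages produced but not yet consumed for one partition.
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    // True when the lag has reached the alarm threshold.
    static boolean shouldAlarm(long lag, long threshold) {
        return lag >= threshold;
    }

    public static void main(String[] args) {
        long current = lag(1_250_000L, 1_000_000L);          // hypothetical offsets
        System.out.println(current);                         // prints 250000
        System.out.println(shouldAlarm(current, 200_000L));  // prints true
    }
}
```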

14.5 Abnormal perception of consumption

heartbeat.interval.ms:

  • The interval between consumer heartbeats. A consumer must keep sending heartbeats to the coordinator so that the coordinator knows whether the consumer has failed.
  • If a consumer has failed, the coordinator uses the heartbeat responses to tell the other consumers to rebalance.

session.timeout.ms:

  • How long Kafka can go without hearing from a consumer before it considers the consumer failed. The default is 10 seconds.

max.poll.interval.ms:

  • If the time between two poll operations is longer than this, the consumer is considered too slow and is kicked out of the consumer group, and its partitions are assigned to other consumers. It is generally set according to how long the business processing takes.

14.6 Core Parameter Description

fetch.max.bytes:

The maximum number of bytes fetched in one request. Setting it to a larger value is recommended; the default is 1 MB. We have seen the question of the maximum message size in several places before:

  1. Producer: the maximum size of a message it sends -> 10 MB
  2. Broker: stores the data; the maximum size of a message it accepts -> 10 MB
  3. Consumer:

max.poll.records:

The maximum number of messages returned by one poll. The default is 500.

connections.max.idle.ms:

If the socket connection between the consumer and the broker is idle for longer than this, the connection is closed automatically and has to be re-established the next time it is needed. The recommendation is to set this to -1 so that idle connections are not closed.

enable.auto.commit:

Enables automatic offset commits.

auto.commit.interval.ms:

How often the offset is committed. The default value is 5000 milliseconds.

auto.offset.reset:

  • earliest: if a partition has a committed offset, consumption starts from that offset; if there is no committed offset, consumption starts from the beginning
  • latest: if a partition has a committed offset, consumption starts from that offset; if there is no committed offset, only newly produced data in the partition is consumed
  • none: if every partition has a committed offset, consumption starts from those offsets; if even one partition has no committed offset, an exception is thrown
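The three auto.offset.reset policies can be summarized as a single decision function for one partition. This is a sketch of the rule as described above, not the client's actual code; the class, method, and parameter names are hypothetical, and a missing committed offset is represented by `null`.

```java
public class OffsetResetSketch {
    // Decides where consumption starts for one partition.
    static long startOffset(Long committed, String policy, long earliest, long latest) {
        if (committed != null) {
            return committed;                 // a committed offset always wins
        }
        switch (policy) {
            case "earliest":
                return earliest;              // start from the beginning of the partition
            case "latest":
                return latest;                // only consume newly produced data
            case "none":
                throw new IllegalStateException("no committed offset for this partition");
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startOffset(42L, "latest", 0L, 100L));    // prints 42
        System.out.println(startOffset(null, "earliest", 0L, 100L)); // prints 0
        System.out.println(startOffset(null, "latest", 0L, 100L));   // prints 100
    }
}
```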

14.7 Comprehensive case demonstration

Example case: on the second-hand e-commerce platform (Happy Send), stars are credited to users according to the amount they spend.

  • Order system (producer) -> sends messages to the Kafka cluster.
  • Member system (consumer) -> consumes messages from the Kafka cluster and processes them.

14.8 Working Principles of group Coordinator

Question: How do consumers make the rebalance? – Based on coordinator

What is the coordinator

Each consumer group selects a broker as its coordinator. This broker monitors the heartbeat of all consumers in the consumer group and makes a rebalance

How do I select a coordinator

First, hash the groupId to get a number, then take that number modulo the number of partitions of __consumer_offsets. The partition count defaults to 50 and can be set through offsets.topic.num.partitions. Once the partition is found, the broker on which that partition resides is the coordinator.

For example: groupId "myconsumer_group" -> hash value (a number) -> mod 50 -> 8. Whichever broker hosts partition 8 of the __consumer_offsets topic is the coordinator, and that partition is where all consumers in the group commit their offsets.
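The hash-then-modulo step can be sketched as follows. The sketch assumes the hash is Java's `String.hashCode()` made non-negative before the modulo. The helper names are invented for illustration, and the emulation only covers characters in the Basic Multilingual Plane.

```python
def java_string_hash(s):
    """Emulate Java's String.hashCode() as a signed 32-bit integer.

    Assumes every character fits in one UTF-16 code unit.
    """
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF   # wrap like 32-bit int arithmetic
    return h - 0x100000000 if h >= 0x80000000 else h


def coordinator_partition(group_id, num_partitions=50):
    """Map a group id to a partition of __consumer_offsets (illustrative)."""
    h = java_string_hash(group_id)
    # Integer.MIN_VALUE has no positive counterpart, so map it to 0.
    positive = 0 if h == -2**31 else abs(h)
    return positive % num_partitions
```

The broker hosting the returned partition would then act as the coordinator for that group. With the default of 50 partitions the result is always in the range 0 to 49.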

The process

  • Each consumer sends a JoinGroup request to the coordinator.
  • The coordinator then selects one consumer in the consumer group as the leader.
  • The coordinator sends the consumer group information to the leader.
  • The leader is then responsible for drawing up the consumption plan (which consumer reads which partitions).
  • The leader sends the plan to the coordinator in a SyncGroup request.
  • The coordinator then sends the consumption plan to every consumer, and each consumer starts consuming from its assigned partitions.

Each consumer opens a socket connection to the leader broker of its assigned partitions and consumes messages.

14.9 Rebalance strategy

A consumer group relies on its coordinator to carry out a rebalance

There are three strategies for rebalance: range, round-robin, and sticky

For example, a topic we consume has 12 partitions:

p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11

Let’s say we have three consumers in our consumer group

The range strategy

  • The range strategy assigns partitions by contiguous ranges of partition numbers
  • p0~3: consumer1
  • p4~7: consumer2
  • p8~11: consumer3
  • This is the default strategy

Round-robin strategy

  • Partitions are handed out to the consumers in turn (polling allocation)
  • consumer1: 0, 3, 6, 9
  • consumer2: 1, 4, 7, 10
  • consumer3: 2, 5, 8, 11

But both of the previous strategies have a problem. If one consumer dies, the 12 partitions are redistributed over the 2 remaining consumers, so each consumer consumes six partitions.

Let's say consumer1 dies under the range strategy: p0-5 are assigned to consumer2, and p6-11 to consumer3. In this way, the p6 and p7 partitions that were originally on consumer2 are moved to consumer3, even though consumer2 is still alive.
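The range and round-robin assignments above can be reproduced with a few lines of code. This is a simplified sketch for a single topic. The function names are illustrative and are not the names of Kafka's assignor classes.

```python
def range_assign(partitions, consumers):
    """Give each consumer one contiguous block of partitions."""
    per, extra = divmod(len(partitions), len(consumers))
    result, start = {}, 0
    for i, consumer in enumerate(consumers):
        size = per + (1 if i < extra else 0)  # first consumers absorb any remainder
        result[consumer] = partitions[start:start + size]
        start += size
    return result


def round_robin_assign(partitions, consumers):
    """Deal partitions out one at a time, cycling through the consumers."""
    result = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(partitions):
        result[consumers[i % len(consumers)]].append(partition)
    return result


partitions = list(range(12))
three = ["consumer1", "consumer2", "consumer3"]
before = range_assign(partitions, three)
after = range_assign(partitions, ["consumer2", "consumer3"])  # consumer1 died
```

Comparing `before` and `after` shows the churn: consumer2 goes from p4-7 to p0-5, so it loses p6 and p7 to consumer3 although it never failed.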

Sticky strategy

During a rebalance, the sticky strategy tries to leave every partition with the consumer that already owns it. Only partitions whose owner has gone away are redistributed, as evenly as possible, so the existing assignment changes as little as possible.

  • consumer1: 0-3
  • consumer2: 4-7
  • consumer3: 8-11

Let's say consumer3 dies

  • consumer1: 0-3, +8, 9
  • consumer2: 4-7, +10, 11
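A minimal sketch of the sticky idea follows. It is far simpler than a real sticky assignor: it assumes the surviving consumers hold equally sized assignments, and it simply splits the orphaned partitions into contiguous chunks. The function name is made up for illustration.

```python
def sticky_reassign(current, alive):
    """Keep survivors' partitions; redistribute only the orphaned ones.

    current: dict mapping consumer name -> list of partitions before the rebalance.
    alive:   list of consumers that are still in the group.
    Assumes the survivors start out with equally sized assignments.
    """
    result = {c: list(current.get(c, [])) for c in alive}
    orphaned = sorted(
        p for c, owned in current.items() if c not in alive for p in owned
    )
    per, extra = divmod(len(orphaned), len(alive))
    start = 0
    for i, consumer in enumerate(alive):
        size = per + (1 if i < extra else 0)
        result[consumer].extend(orphaned[start:start + size])
        start += size
    return result


current = {
    "consumer1": [0, 1, 2, 3],
    "consumer2": [4, 5, 6, 7],
    "consumer3": [8, 9, 10, 11],
}
after = sticky_reassign(current, ["consumer1", "consumer2"])  # consumer3 died
```

Here `after` keeps p0-3 on consumer1 and p4-7 on consumer2, and only p8-11 change owner.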

15. Broker management

15.1 LEO and HW meanings

  • The core principle of Kafka
  • How to evaluate a cluster resource
  • Set up a Kafka cluster, which introduced some simple operation and maintenance tasks
  • Producer (use, core parameters)
  • Consumer (principles, usage, core metrics)
  • Some of the principles within the broker, core concepts: LEO, HW

LEO is related to the offset.

LEO:

In Kafka, both the leader partition and the follower partitions are called replicas.

Every time a replica writes a message, it updates its LEO, which is the log end offset. The LEO is the latest offset + 1, that is, the offset the next message will receive.

HW: High watermark

An important function of the LEO is to update the HW. Once the followers' LEOs have caught up with the leader's, the HW can be advanced. The HW is the smallest LEO among the in-sync replicas.

Data before the HW is visible to consumers, and those messages are in the committed state. Messages at or after the HW cannot be consumed yet.
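The relationship between LEO and HW can be shown with a small calculation. This is a sketch under the assumption that the HW is the minimum LEO across the in-sync replicas. The replica names and function names are invented for the example.

```python
def log_end_offset(last_offset):
    """LEO is the latest written offset + 1 (0 for an empty log)."""
    return 0 if last_offset is None else last_offset + 1


def high_watermark(isr_leos):
    """HW is the smallest LEO among the in-sync replicas."""
    return min(isr_leos.values())


def visible_offsets(hw, log_start=0):
    """Offsets a consumer may read: everything strictly before the HW."""
    return range(log_start, hw)


# The leader has written offsets 0..9; the followers lag slightly behind.
leos = {
    "leader": log_end_offset(9),
    "follower1": log_end_offset(7),
    "follower2": log_end_offset(8),
}
hw = high_watermark(leos)
```

In this example the leader's LEO is 10 but the HW is 8, so consumers can read offsets 0 to 7 only. Offsets 8 and 9 become visible once follower1 has fetched them.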

15.2 LEO update

15.3 HW update

15.4 How the controller manages the cluster

1: Competing to become the controller

  • /controller/id

2: Directories that the controller listens to:

  • /brokers/ids/: used to sense brokers coming up and going down
  • /brokers/topics/: used to sense topic creation. When we created a topic with the command-line tool, we passed the ZK address as a parameter.
  • /admin/reassign_partitions: partition reassignment

15.5 Delayed Tasks

Delay scheduling mechanism in Kafka (extended knowledge)

Let's first take a look at where Kafka needs delayed tasks.

The first type of delayed tasks:

For example, if the producer's acks is set to -1, the response cannot be returned to the producer until both the leader and the followers have finished writing.

There is a timeout period, which defaults to 30 seconds (request.timeout.ms).

So after the data has been written to the leader's disk, a delayed task with an expiry time of 30 seconds must be created and placed in the DelayedOperationPurgatory (the delay manager).

If all followers have copied the data to their local disks before the 30 seconds are up, the task is woken up automatically and the response is returned to the client. Otherwise the task expires after at most 30 seconds, the broker stops waiting, and a timeout exception is returned directly.

The second type of delayed tasks:

When a follower attempts to fetch messages from the leader and there is nothing new to fetch, a delayed fetch task is created.

When the delay is over (say, 100 ms), the leader returns an empty response to the follower, and the follower then sends another fetch request. However, if the leader writes a message during the delay (within the 100 ms), the task is woken up automatically and the fetch is carried out.

Kafka therefore has a huge number of delayed tasks to schedule.

15.6 Time wheel mechanism

1. Why design a time wheel?

Kafka has many delayed tasks internally. They are not implemented with the JDK Timer, whose insert and delete operations have a time complexity of O(log n). Instead, Kafka uses a time wheel it wrote itself, with which inserting and deleting delayed tasks takes O(1).

2. What is the time wheel?

The time wheel is essentially an array.

  • tickMs: the time wheel interval, 1 ms

  • wheelSize: the time wheel size, 20

  • interval: tickMs * wheelSize, the total time span of one time wheel, 20 ms

  • currentTime: a pointer to the current time.

    • A: Because the time wheel is an array, we use an index to reach the data in it, so the time complexity is O(1).
    • B: The tasks at each position in the array are stored in a doubly linked list. Inserting tasks into and deleting tasks from a doubly linked list is also O(1).

3. Multilevel time wheel
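The array-plus-list structure can be sketched as a single-level wheel. This is an illustration, not Kafka's implementation: the class and method names are made up, and each slot uses a Python dict as a stand-in for the doubly linked list, since a dict also gives O(1) insert and delete.

```python
class TimeWheel:
    """Single-level time wheel: an array of slots indexed by expiry time."""

    def __init__(self, tick_ms=1, wheel_size=20, start_ms=0):
        self.tick_ms = tick_ms
        self.wheel_size = wheel_size
        self.interval = tick_ms * wheel_size            # total span of the wheel
        self.current_time = start_ms - (start_ms % tick_ms)
        self.buckets = [dict() for _ in range(wheel_size)]

    def _slot(self, time_ms):
        return (time_ms // self.tick_ms) % self.wheel_size

    def add(self, task_id, expire_ms):
        """O(1) insert. Returns False if the task is already due or beyond the span."""
        if expire_ms < self.current_time + self.tick_ms:
            return False                                # already expired
        if expire_ms >= self.current_time + self.interval:
            return False                                # belongs in a higher-level wheel
        self.buckets[self._slot(expire_ms)][task_id] = expire_ms
        return True

    def remove(self, task_id, expire_ms):
        """O(1) delete. Returns True if the task was present."""
        return self.buckets[self._slot(expire_ms)].pop(task_id, None) is not None

    def advance(self, now_ms):
        """Move the pointer forward tick by tick and collect the tasks that are due."""
        due = []
        while self.current_time + self.tick_ms <= now_ms:
            self.current_time += self.tick_ms
            bucket = self.buckets[self._slot(self.current_time)]
            due.extend(bucket)
            bucket.clear()
        return due
```

With the default 1 ms tick and 20 slots, a task expiring at 5 ms lands in slot 5 and is returned by `advance(5)`. A task expiring at 25 ms is rejected because it lies outside the 20 ms span.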

For example, insert a task that will run 110 milliseconds later.

  • tickMs: the time wheel interval, 20 ms

  • wheelSize: the time wheel size, 20

  • interval: tickMs * wheelSize, the total time span of one time wheel, 400 ms

  • currentTime: a pointer to the current time.

    • Layer 1 time wheel: 1 ms * 20
    • Layer 2 time wheel: 20 ms * 20
    • Layer 3 time wheel: 400 ms * 20
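Which layer a task lands in follows from the spans above: each layer's tick equals the whole span of the layer below it. The helper below is a sketch with an invented name. It assumes the current time is 0, so the delay equals the expiry time.

```python
def wheel_level(delay_ms, tick_ms=1, wheel_size=20):
    """Return (level, tick, span) of the first wheel whose span covers delay_ms.

    Level 1 is the finest wheel. Assumes the current time is 0.
    """
    level, tick = 1, tick_ms
    while delay_ms >= tick * wheel_size:
        tick *= wheel_size      # the next layer's tick is this layer's full span
        level += 1
    return level, tick, tick * wheel_size


level, tick, span = wheel_level(110)
slot = 110 // tick              # index of the slot inside that layer
```

For the 110 ms task, layer 1 (span 20 ms) is too small, so the task goes into layer 2 (tick 20 ms, span 400 ms), in slot 5. As time advances and fewer than 20 ms remain, the task would be moved down into layer 1.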