This is the 14th day of my participation in the August More Text Challenge. For details, see: August More Text Challenge

Recommending an excellent Kafka management platform: Kafka's soulmate, Didi's open-source Logi-KafkaManager, a one-stop Kafka monitoring and control platform


Technical communication

If you want to join the Didi LogI open-source user group, add my personal WeChat: jjdlmn_ (note: "group"). The group mainly discusses Kafka, ES, Agent, Logi-Kafka-Manager, and related technologies; there are dedicated people in each area to answer your questions, and every question gets a response.



didi/Logi-KafkaManager: a one-stop Apache Kafka cluster metrics monitoring and operations management platform

The previous article briefly introduced how to access a cluster and how to apply for a Topic and a quota. At that point we had not yet seen the real strengths of Logi-KafkaManager, or how it manages many Kafka clusters.

Operational controls

The O&M control menu is mainly for operations personnel to manage all clusters.

Access to the cluster

Kafka's soulmate Logi-KafkaManager: cluster access and related concepts

Physical Cluster List

All physical clusters are listed. Click on a physical cluster to see details.

If no information is shown, check whether JMX is enabled correctly; see: JMX connection failures

An overview of the cluster


Real-time traffic

Indicators show

Because I had already produced and consumed messages, the earlier data would interfere with this test. Restarting the Broker resets the JMX counters to 0; to also clear the historical data, delete all rows from the `_metrics` tables in the database. Then run the following code to verify whether the real-time traffic metrics are accurate. The code sends 60 messages over 60 seconds, each with a 1 KB payload; but because `linger.ms=60000` the producer waits 60 seconds and sends everything in a single request. The expected real-time metrics can therefore be predicted in advance:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    @Test
    void contextLoads() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxxxxxx");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384000);
        props.put("linger.ms", 60000);
        props.put("buffer.memory", 335544320);
        props.put("client.id", "appId_000001_cn.Test2");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 60; i++) {
            // Make each message larger (1 KB payload)
            byte[] log = new byte[1024];
            String slog = new String(log);
            producer.send(new ProducerRecord<String, String>("Test2", 0, Integer.toString(i), slog));
        }
        try {
            // Wait longer than linger.ms so the batch is actually sent
            Thread.sleep(62000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producer.close();
    }

MessagesIn: messages sent to Kafka per second ≈ 60/60 = 1 msg/s; BytesIn: bytes sent to Kafka per second ≈ 60 × 1024 / 60 = 1024 B/s; TotalProduceRequests: requests sent per second ≈ 1/60 ≈ 0.017, because only one request is actually sent in the 60 s window.
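As a sanity check, the expected values can be worked out directly from the test parameters above (60 messages with 1 KB payloads over a 60 s window, batched into one request by `linger.ms=60000`); a minimal sketch of that arithmetic:

```java
public class ExpectedMetrics {
    // Parameters from the producer test above
    static final int MESSAGES = 60;         // messages sent
    static final int MESSAGE_SIZE_B = 1024; // payload bytes per message
    static final int WINDOW_SEC = 60;       // observation window
    static final int REQUESTS = 1;          // linger.ms=60000 batches everything into one request

    static double messagesInPerSec()      { return (double) MESSAGES / WINDOW_SEC; }
    static double bytesInPerSec()         { return (double) MESSAGES * MESSAGE_SIZE_B / WINDOW_SEC; }
    static double produceRequestsPerSec() { return (double) REQUESTS / WINDOW_SEC; }

    public static void main(String[] args) {
        System.out.printf("MessagesIn ~ %.3f msg/s%n", messagesInPerSec());
        System.out.printf("BytesIn ~ %.0f B/s%n", bytesInPerSec());
        System.out.printf("TotalProduceRequests ~ %.3f req/s%n", produceRequestsPerSec());
    }
}
```

The payload-only figure slightly undercounts BytesIn, since each record also carries a key and protocol overhead; the platform's numbers should land close to, but a little above, these values.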

Run the code and look at the result: it basically matches our expectations, so the real-time traffic data is accurate.

In addition to the metrics above, also watch the following abnormal indicators, which are all 0 under normal circumstances; if any is non-zero there may be a problem, and the operations team should check the exception logs:

- BytesRejected (B/s): bytes rejected per second
- FailedFetchRequests: failed fetch requests per second
- FailedProduceRequests: failed produce requests per second

The ratio MessagesIn / TotalProduceRequests (messages per request) is also worth a look; if it equals 1, requests are not being batched and each request carries a single message.

Historical traffic

Indicators show

Historical data is stored in the _metrics table;

The Broker information

The top left is a dashboard of peak utilization across all Brokers. This graph gives you a quick overview of every Broker's peak usage; how the peak utilization is calculated requires reading the source code.

The replica state diagram can be read as: in the ISR means synchronized; not in the ISR means unsynchronized.

Now we shut down Broker 1 to simulate an abnormal situation such as a Broker going down. The view changes as shown: Broker 1 is in the unused state, and replicas on the two Brokers 0 and 2 are in the out-of-sync state.

Replica status: if the number of invalid replica partitions is greater than 0, the replica status shows as unsynchronized. The invalid replica partition count (UnderReplicatedPartitions) is obtained over JMX from the MBean `kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions`. If its value is greater than 0, the problem may be with a single Broker, or it may extend to the whole cluster; consult other metrics together with it to locate the cause.
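The MBean named above can be queried with any JMX client; below is a minimal sketch using the JDK's `javax.management` API. The service URL, host, and port are placeholders (use your broker's JMX_PORT), and the live-connection part is left commented since it needs a running broker:

```java
import javax.management.ObjectName;

public class UnderReplicatedCheck {
    // MBean exposed by every broker for the invalid (under-replicated) partition count
    static final String MBEAN = "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions";

    public static void main(String[] args) throws Exception {
        ObjectName name = new ObjectName(MBEAN);
        System.out.println(name.getDomain());            // prints: kafka.server
        System.out.println(name.getKeyProperty("name")); // prints: UnderReplicatedPartitions

        // Against a live broker with JMX enabled, the value would be read like this:
        // JMXServiceURL url = new JMXServiceURL(
        //     "service:jmx:rmi:///jndi/rmi://<broker-host>:<jmx-port>/jmxrmi");
        // try (JMXConnector c = JMXConnectorFactory.connect(url)) {
        //     Object v = c.getMBeanServerConnection().getAttribute(name, "Value");
        //     // v > 0 => some partitions are under-replicated
        // }
    }
}
```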

Note: this value is also greater than 0 while the Kafka cluster is performing a partition reassignment (kafka-reassign-partitions.sh).

For more on an abnormal invalid replica count, see: Diagnosis and Warning of Invalid Replicas

Once we understand what the replica states mean, the figure above makes sense: Brokers 0 and 2 show as unsynchronized because they hold replicas of partitions whose replica lists also include the downed Broker, so their invalid replica partition count is greater than 0.

Delete: a Broker can only be deleted when it is offline. Delete it when it has been permanently removed from the cluster; if it rejoins the cluster later, it will be added back. If the Broker is merely down temporarily, do not delete it.


Leader Rebalance

To understand what this function does, we first need to understand the concept of Leader balancing.

Leader balancing mechanism (auto.leader.rebalance.enable=true)

When a broker stops or crashes, leadership of all partitions on that broker transfers to other replicas. This means that by default, after the broker restarts, all of its partitions serve only as followers and are not used for client reads and writes.

To avoid this imbalance, Kafka has the concept of preferred replicas. If the replica list for a partition is 1, 5, 9, node 1 is preferred as leader over replicas 5 and 9, because it appears earliest in the list. You can make the Kafka cluster attempt to restore leadership to recovered replicas by running the following command. This is the Leader balancing mechanism, which prevents unbalanced load and wasted resources:
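The "preferred replica" rule is simple: it is the first broker in a partition's assigned replica list. A minimal sketch of the rule described above:

```java
import java.util.List;

public class PreferredReplica {
    // The preferred leader of a partition is the first entry of its assigned replica list
    static int preferredLeader(List<Integer> replicas) {
        return replicas.get(0);
    }

    public static void main(String[] args) {
        // Replica list 1, 5, 9 from the example above: node 1 is the preferred leader
        System.out.println(preferredLeader(List.of(1, 5, 9))); // prints: 1
    }
}
```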

    # Kafka version <= 2.4
    > bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

    # newer Kafka versions
    > bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port

Kafka balance leader

In conf/server.properties, automatic balancing is enabled (by default) with auto.leader.rebalance.enable=true. Two related settings control it: leader.imbalance.check.interval.seconds is the interval between rebalance checks, default 300 seconds; leader.imbalance.per.broker.percentage is the per-broker imbalance ratio above which a leader re-election is executed, default 10%. The ratio is calculated as: broker imbalance rate = number of leaders that are not on the preferred replica / total number of partitions. For example, if a topic has three partitions [0,1,2] with three replicas each and, as normal, each of [0,1,2] holds a leader on its preferred replica, the rate is 0/3 = 0%.

The conditions above are ANDed: rebalancing is triggered only when both are satisfied.

Tuning suggestion: consider setting this parameter to false in production, since leader re-election is costly, may affect performance, and may cause client blocking; or use a much longer interval, such as once a day.

If automatic balancing is off, or the interval is long enough that the problems above occur, Logi-KafkaManager provides a manual rebalance operation.

For example: after a Broker goes down, its leaders move elsewhere; after it restarts, use Leader Rebalance to manually trigger the rebalance.

For example, 🌰


  1. First turn off automatic Broker balancing: auto.leader.rebalance.enable = false; then restart the Brokers one by one
  2. Look at the Leader distribution of a Topic across the Brokers

Let’s look at TOPIC TEST3 here;

Broker-0

Broker-1

Broker-2

The Leader distribution at startup completion is as follows;

| Broker | Leader of partitions |
| --- | --- |
| 0 | |
| 1 | |
| 2 | 0,1,2,3 |

Because Broker-2 was the first one I created, the leaders of all partitions are concentrated on that one machine, and none of the Brokers started later is assigned any Leader. This situation is obviously unreasonable, so we need to rebalance.

  1. Manually execute the rebalance: select the Broker from the drop-down. Selecting a Broker means all Topics on that Broker will be rebalanced

Let’s look at the Leader situation after rebalancing

| Broker | Leader of partitions |
| --- | --- |
| 0 | 2,3 |
| 1 | 0 |
| 2 | 1 |

As you can see, Broker-0 has now been assigned two leaders; leadership has automatically returned to the preferred assignment.

Note that Leader Rebalance rebalances all Topics on the selected Broker, not a single Topic. If all three of your Brokers carry the same Topics, it doesn't matter which one you choose.


The Broker details

The basic information

Displays the basic information and real-time traffic of the current Broker. Note that the traffic shown here is for this Broker only; the cluster overview shows the total traffic of the entire physical cluster.

Monitoring information

Multiple indicators are displayed according to the timeline, and indicators are also indicators of the currently selected Broker.

Topic Information (TODO)

Show what topics are currently under the Broker. For more details TODO….

Disk information (TODO)

Displays some disk information for the current Broker. However, this feature only takes effect when connected to Didi's Kafka-Gateway component, which is currently an enterprise service with no open-source plan. For more information, see TODO….

Partition information

Displays the partition information of the current Broker, lists the Leader and replicas of all topics of the current Broker, and lists the unsynchronized replicas.

The Leader Rebalance module above explains some of this information.

For example, if Broker-0 is down, you can see which Topics have replicas on Broker-0, and it is clearly shown which replicas are out of sync. A Topic like TEST2 below has no replica on Broker-0, so its state is synchronized.

The Topic analysis

The section on the left shows basic information about the Topics on the current Broker; note that it shows the last minute of data and compares all Topics.

Let’s simulate sending messages in batches, sending 10,000 messages to TEST2 and TEST3 topics

bin/kafka-producer-perf-test.sh --topic TEST3 --num-records 10000 --record-size 100 --throughput 100  --producer-props bootstrap.servers=xxx:9092,xxx:9092,xxx:9092

bin/kafka-producer-perf-test.sh --topic TEST2 --num-records 10000 --record-size 100 --throughput 100  --producer-props bootstrap.servers=xxx:9092,xxx:9092,xxx:9092

Look at the data on display: it shows the Topic activity on the current Broker over the last minute, so you can see which Topics are active. The percentages in the figure appear to be calculated incorrectly; I have filed a BUG for it.

Consumer information

Displays information about all consumer groups under the current Broker. The Location column indicates that the data is read from the Broker (older versions stored it in ZK). Note that this may be empty right after startup; it is populated about a minute later when the Consumer task runs.

Region information

The Region list

All regions in the current physical cluster are displayed.

Let's focus on the parameter Estimated capacity: many people are confused about this value and where it comes from. Looking at the source code, it is the maximum traffic the current Region is estimated to bear; for example, the figure above says it supports at most 360 MB/s. But this value is only a very rough estimate that operations staff are expected to configure; if it is not set, the default is 120 MB/s per Broker.

Operations staff should have a figure in mind for the peak traffic each Broker can handle. Once configured, you can see at a glance whether the Region can withstand its peak traffic.

Actual traffic: Calculate the actual peak traffic from historical data;

Estimated traffic: actual traffic + the estimated traffic of newly applied Topics. To explain: a newly applied Topic has no traffic yet, but a buffer must be reserved for it, which is why applying for a Topic requires filling in an estimated peak traffic. However, in the current code, estimated traffic = actual traffic; this is still to be optimized.

So how do you modify the peak traffic that the Broker can handle?

Click O&M Control > Platform Management > Platform Configuration and fill in the following information

Set the key to REGION_CAPACITY_CONFIG; the configuration value is a JSON string (an array):


[{
	"clusterId": 4,                // ID of the physical cluster
	"duration": 10,                // default 10
	"latestTimeUnitMs": 604800000, // window of historical data used in the calculation; default 7 days, i.e. 7*24*60*60*1000
	"maxCapacityUnitB": 125829120  // estimated capacity per Broker in bytes; default 120*1024*1024 = 120 MB
}, {
	"clusterId": 5,
	"duration": 10,
	"latestTimeUnitMs": 604800000,
	"maxCapacityUnitB": 125829120
}]

PS: each configuration above applies to all Brokers in that physical cluster. With maxCapacityUnitB=125829120 (120 MB) for clusterId=4, every Broker in every Region under that physical cluster gets an estimated capacity of 120 MB.

The calculation above is done every 12 hours;
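Under these defaults, the "estimated capacity" shown for a Region is simply the per-Broker figure times the number of Brokers. A sketch of that arithmetic, assuming a 3-Broker Region (which reproduces the 360 MB/s figure shown earlier):

```java
public class RegionCapacity {
    // maxCapacityUnitB default: 120 * 1024 * 1024 = 125829120 bytes (120 MB) per Broker
    static final long DEFAULT_MAX_CAPACITY_UNIT_B = 120L * 1024 * 1024;

    // Region estimated capacity = Broker count * per-Broker capacity
    static long estimatedCapacityBytes(int brokerCount, long perBrokerBytes) {
        return brokerCount * perBrokerBytes;
    }

    public static void main(String[] args) {
        long cap = estimatedCapacityBytes(3, DEFAULT_MAX_CAPACITY_UNIT_B);
        System.out.println(cap / (1024 * 1024) + " MB/s"); // prints: 360 MB/s
    }
}
```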

The community should later optimize this area, for instance by calculating the estimated capacity automatically; configuring it under platform settings is not very convenient.

Logical cluster Information

Logical cluster list

Displays information about all logical clusters in the current physical cluster. For creating a logical cluster, see: [KafkaManager (2)] Cluster access and related concepts

The Controller information

Shows the Controller change history and lets you set candidate Controllers.

The Controller is a core component of Apache Kafka; its main role is to manage and coordinate the entire Kafka cluster with the help of Apache ZooKeeper. Any Broker in the cluster can act as the Controller, but at runtime only one Broker holds the role and performs its management and coordination duties.

For more details see What is Kafka’s Controller Broker

If candidate Controllers are set, the Controller will be elected only from the selected Brokers. This can be used when you know which Brokers are relatively idle and want them to take on the Controller responsibilities.


Current limiting information

This shows all topics in the current physical cluster that are being restricted at this moment.

In our last article, we discussed Kafka's traffic limiting (quota) mechanism in detail.

To check whether the current Topic is being throttled, and for how Kafka quotas work, see: Kafka quota management (rate limiting) mechanism

So where does the throttling information here come from, and when does it appear?

Let's create a scenario in which the Producer gets throttled.

1. Set a traffic limiting configuration

    # Add a traffic limiting configuration
    sh bin/kafka-configs.sh --bootstrap-server broker1:9092 --alter --add-config 'producer_byte_rate=100,consumer_byte_rate=100' --entity-type clients --entity-name appId_000001_cn.Test2

This adds, via broker1:9092, a throttling configuration for the client with client.id appId_000001_cn.Test2: the producer rate is limited to 100 B/s and the consumer group rate to 100 B/s.

We can also go to ZK to see if the configuration is successful

2. Production messages

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    @Test
    void contextLoads() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 110);
        props.put("linger.ms", 0);
        props.put("client.id", "appId_000001_cn.Test2");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 1000; i++) {
            // Make each message larger (100 B payload)
            byte[] log = new byte[100];
            String slog = new String(log);
            producer.send(new ProducerRecord<String, String>("Test2", 0, Integer.toString(i), slog));
            System.out.println("i=" + i);
        }
        try {
            Thread.sleep(62000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producer.close();
    }

The code above sends 100 B messages one after another, immediately (linger.ms=0). Since we set the quota to 100 B/s, it is certain to be throttled.
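The throttling is easy to see from the numbers: the test pushes roughly 1000 × 100 B = 100 KB against a 100 B/s quota, so the broker must stretch delivery out over a very long window. A back-of-the-envelope sketch (payload bytes only; real requests also carry record overhead, so the true figure is a bit larger):

```java
public class QuotaEstimate {
    // Minimum seconds the broker needs to drain totalBytes at quotaBytesPerSec
    // without ever exceeding the quota
    static double minSecondsToDrain(long totalBytes, long quotaBytesPerSec) {
        return (double) totalBytes / quotaBytesPerSec;
    }

    public static void main(String[] args) {
        long totalBytes = 1000L * 100; // 1000 messages x 100 B payloads from the test above
        long quota = 100;              // producer_byte_rate=100
        System.out.println(minSecondsToDrain(totalBytes, quota) + " s"); // prints: 1000.0 s
    }
}
```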

Note: the client.id is set to appId_000001_cn.Test2, the same name we used in the throttling configuration above.

Let’s see what happens after we execute the code;

The currently throttled Topic is shown, and its messages are indeed being throttled.

PS: one thing to note here is that this page shows throttling information per Topic, yet we know native Kafka does not currently support quotas at the Topic granularity. Of course, it is easy to approximate a Topic-level limit yourself: give each Topic its own client.id, in a form like appId_000001_cn.Test2, and configure a quota per client.id. But doing this yourself is quite troublesome and not recommended; native Kafka simply does not provide Topic-granularity quotas.

With only the open-source version, this feature is not really usable (doing it yourself is the main trouble).

But Didi's kafka-gateway does support it. kafka-gateway is a commercial Didi service and is not currently open source; it makes many enhancements to native Kafka. If you want to use kafka-gateway, contact Didi officially.

Column list

Kafka's soulmate Logi-KafkaManager: cluster access and related concepts

Logi-KafkaManager, the soulmate of Kafka

Logi-kafkamanger is a soulmate of Kafka

O&m of Logi-Kafkamanger (4) — Cluster O&M (Task Migration and Cluster Online Upgrade)







Didi Open source Logi-Kafkamanager one-stop Kafka monitoring and control platform

Welcome to add my personal WeChat; I will pull you into the technical exchange group, where dedicated people answer technical questions (please note: technology)