
Please take a close look at this diagram, paying particular attention to the marked points of concern; we will come back to this picture more than once.

Background

Can Kafka be used for high-availability business? Officially, yes: the latest version already supports transactional messaging, but we had doubts about its performance, since Kafka's throughput varies greatly depending on the configured ACK level. This test was run to find the scenarios in which Kafka can be used with confidence, and it also explored a number of message-loss scenarios. Unlike most tests, which focus only on the Kafka cluster itself, this test also brings in the business side of message loss: the solution has to be considered end to end for the highest level of high availability to be achieved.

Test objectives

  • High availability of the cluster: the minimum cluster size, configuration, and restrictions required for high availability
  • No message loss: the configuration and conventions required to guarantee that messages are not lost

Test environment

Broker:

Broker JVM parameter configuration: Xms=8G, Xmx=8G

Client:

8 cores
16 GB RAM
CentOS 6.8

Test scenarios

Cluster high-reliability configuration:

zookeeper.connection.timeout.ms=15000
zookeeper.session.timeout.ms=15000
default.replication.factor=3
num.partitions=6
min.insync.replicas=2
unclean.leader.election.enable=false
log.flush.interval.ms=1000

Producer ACK configuration:

acks=all
retries=3
request.timeout.ms=5000

Message size: 1024 bytes
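For reference, a producer wired up with these settings might look like the minimal Java sketch below. The broker addresses, topic name and payload are placeholders for illustration, not values taken from the test environment.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ReliableProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // placeholder broker list -- replace with the real cluster addresses
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("acks", "all");                 // every in-sync replica must acknowledge
        props.put("retries", "3");                // client-level retries, as in the test configuration
        props.put("request.timeout.ms", "5000");  // value used in this test run
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // a ~1 KB payload, matching the message size used in the test
            String payload = new String(new char[1024]).replace('\0', 'x');
            producer.send(new ProducerRecord<>("test003", payload)).get(); // block until acked
        }
    }
}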


Failover test

Test method

Take one node offline and measure the recovery time and the service level during the failure.

Test process

replica.lag.time.max.ms was raised from 10 s to 60 s (to extend the observation window), Broker 0 was then killed, and three partitions were selected to observe the ISR changes. The enqueue success rate suffered during the second and third stages:

  • During the second stage, partitions 96/97/98 could not be written to, and the enqueue success rate dropped to 0%.
  • During the third stage, partition 96 could be written to again, but partitions 97/98 could not, because writes were still waiting for an ack from Broker 0, which had already been killed; the enqueue success rate dropped to 33%.

During the second and third stages, however, there was no throughput at all, because the load-testing tool kept reporting connection failures and stopped writing.

Cause analysis

The partition leader is elected by the Controller, and the ISR list is maintained by the leader. The lease for the former is governed by the Controller, while the lease for the latter is set by the broker configuration replica.lag.time.max.ms. The shorter second stage is therefore determined by the Controller's lease time, and the longer third stage by replica.lag.time.max.ms: when Broker 0 is killed, the former affects the enqueue success rate of the 1/3 of partitions for which Broker 0 was the leader, and the latter affects the 2/3 of partitions for which Broker 0 was a follower.

HA conclusion

During failover, Kafka is unavailable for roughly 10 seconds, a window determined by replica.lag.time.max.ms. The application therefore needs to handle the resulting errors itself and configure a sensible number of retries together with a backoff algorithm (see the sketch below).
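As a rough illustration of that recommendation, the sketch below wraps a synchronous send in a bounded retry loop with exponential backoff. The retry count, the initial backoff and the error handling are illustrative assumptions, not settings prescribed by this test.

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BackoffSender {
    // Retries a send up to maxRetries times, doubling the wait between attempts.
    public static RecordMetadata sendWithBackoff(KafkaProducer<String, String> producer,
                                                 ProducerRecord<String, String> record,
                                                 int maxRetries) throws InterruptedException {
        long backoffMs = 500; // initial backoff, well below the ~10 s failover window
        for (int attempt = 1; ; attempt++) {
            try {
                return producer.send(record).get(); // blocks until the broker acks or the send fails
            } catch (ExecutionException e) {
                if (attempt >= maxRetries) {
                    // give up: the caller should log the record so it can be replayed later
                    throw new RuntimeException("send failed after " + attempt + " attempts", e.getCause());
                }
                Thread.sleep(backoffMs);
                backoffMs *= 2; // back off while the new leader is being elected
            }
        }
    }
}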


Pressure test

Test method

Test script:

./kafka-producer-perf-test.sh --topic test003 --num-records 1000000 --record-size 1024 --throughput -1 --producer.config ../config/producer.properties

Test results

Unlimited throughput

[[email protected] bin]# time ./kafka-producer-perf-test.sh --topic ack001 --num-records 1000000 --record-size 1024 --throughput -1 --producer.config ../config/producer.properties
[2017-09-14 21:26:57,543] WARN Error while fetching metadata with correlation id 1 : {ack001=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
81112 records sent, 16219.2 records/sec (15.84 MB/sec), 1416.2 ms avg latency, 1779.0 max latency.
92070 records sent, 18414.0 records/sec (17.98 MB/sec), 1671.7 ms avg latency, 1821.0 max latency.
91860 records sent, 18368.3 records/sec (17.94 MB/sec), 1670.3 ms avg latency, 1958.0 max latency.
91470 records sent, 18294.0 records/sec (17.87 MB/sec), 1672.3 ms avg latency, 2038.0 max latency.
91050 records sent, 18202.7 records/sec (17.78 MB/sec), 1678.9 ms avg latency, 2158.0 max latency.
92670 records sent, 18534.0 records/sec (18.10 MB/sec), 1657.6 ms avg latency, 2223.0 max latency.
89040 records sent, 17808.0 records/sec (17.39 MB/sec), 1715.0 ms avg latency, 2481.0 max latency.
86370 records sent, 17274.0 records/sec (16.87 MB/sec), 1767.5 ms avg latency, 2704.0 max latency.
91290 records sent, 18254.3 records/sec (17.83 MB/sec), 1670.2 ms avg latency, 2553.0 max latency.
92220 records sent, 18444.0 records/sec (18.01 MB/sec), 1658.1 ms avg latency, 2626.0 max latency.
90240 records sent, 18048.0 records/sec (17.63 MB/sec), 1669.9 ms avg latency, 2733.0 max latency.
1000000 records sent, 17671.591150 records/sec (17.26 MB/sec), 1670.61 ms avg latency, 1764.00 ms max latency, 1544 ms 50th, 2649 ms 95th, 2722 ms 99th, 2753 ms 99.9th.
real 0m57.409s
user 0m14.544s
sys 0m2.072s

Throughput limited to 1W (10,000 records/s)

[[email protected] bin]# time ./kafka-producer-perf-test.sh --topic ack003 --num-records 1000000 --record-size 1024 --throughput 10000 --producer.config ../config/producer.properties
[2017-09-15 10:51:53,184] WARN Error while fetching metadata with correlation id 1 : {ack003=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-09-15 10:51:53,295] WARN Error while fetching metadata with correlation id 4 : {ack003=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
49766 records sent, 9953.2 records/sec (9.72 MB/sec), 34.9 ms avg latency, 358.0 max latency.
50009 records sent, 10001.8 records/sec (9.77 MB/sec), 23.9 ms avg latency, 39.0 max latency.
50060 records sent, 10008.0 records/sec (9.77 MB/sec), 26.9 ms avg latency, 49.0 max latency.
49967 records sent, 9991.4 records/sec (9.76 MB/sec), 23.6 ms avg latency, 38.0 max latency.
50014 records sent, 10000.8 records/sec (9.77 MB/sec), 24.0 ms avg latency, 51.0 max latency.
50049 records sent, 10007.8 records/sec (9.77 MB/sec), 23.5 ms avg latency, 37.0 max latency.
49978 records sent, 9995.6 records/sec (9.76 MB/sec), 23.5 ms avg latency, 44.0 max latency.
49803 records sent, 9958.6 records/sec (9.73 MB/sec), 23.7 ms avg latency, 47.0 max latency.
50229 records sent, 10045.8 records/sec (9.81 MB/sec), 23.6 ms avg latency, 46.0 max latency.
9996.0 records/sec (9.76 MB/sec), 23.5 ms avg latency, 36.0 max latency.
50061 records sent, 10010.2 records/sec (9.78 MB/sec), 23.6 ms avg latency, 36.0 max latency.
49983 records sent, 9996.6 records/sec (9.76 MB/sec), 23.4 ms avg latency, 37.0 max latency.
49978 records sent, 9995.6 records/sec (9.76 MB/sec), 23.9 ms avg latency, 55.0 max latency.
50061 records sent, 10012.2 records/sec (9.78 MB/sec), 24.3 ms avg latency, 55.0 max latency.
49981 records sent, 9996.2 records/sec (9.76 MB/sec), 23.5 ms avg latency, 42.0 max latency.
49979 records sent, 9991.8 records/sec (9.76 MB/sec), 23.8 ms avg latency, 39.0 max latency.
50077 records sent, 10013.4 records/sec (9.78 MB/sec), 23.6 ms avg latency, 41.0 max latency.
49974 records sent, 9994.8 records/sec (9.76 MB/sec), 23.4 ms avg latency, 36.0 max latency.
50067 records sent, 10011.4 records/sec (9.78 MB/sec), 24.8 ms avg latency, 65.0 max latency.
49963 records sent, 9992.6 records/sec (9.76 MB/sec), 23.5 ms avg latency, 54.0 max latency.
1000000 records sent, 9997.300729 records/sec (9.76 MB/sec), 24.24 ms avg latency, 358.00 ms max latency, 23 ms 50th, 28 ms 95th, ... ms 99th, 155 ms 99.9th.
real 1m40.808s
user 0m16.620s
sys 0m1.260s

More results at other throughput caps:
Throughput 5K: 1000000 records sent, 4999.275105 records/sec (4.88 MB/sec), 22.94 ms avg latency, 127.00 ms max latency, ... ms 50th, 27 ms 95th, 31 ms 99th, 41 ms 99.9th.
Throughput 2W: 1000000 records sent, 18990.827430 records/sec (18.55 MB/sec), 954.74 ms avg latency, 2657.00 ms max latency, ... ms 50th, 2492 ms 95th, 3611 ms 99th, 3650 ms 99.9th.
Throughput 3W: 1000000 records sent, 19125.212768 records/sec (18.68 MB/sec), 1227.07 ms avg latency, 3020.00 ms max latency, ... ms 50th, 2815 ms 95th, 2979 ms 99th, 3011 ms 99.9th.

12 partitions with 2.6W throughput

[[email protected] bin]# time ./kafka-producer-perf-test.sh --topic ack001 --num-records 1000000 --record-size 1024 --throughput 26000 --producer.config ../config/producer.properties
129256 records sent, 25840.9 records/sec (25.24 MB/sec), 31.9 ms avg latency, 123.0 max latency.
129794 records sent, 25953.6 records/sec (25.35 MB/sec), 28.6 ms avg latency, 73.0 max latency.
130152 records sent, 26025.2 records/sec (25.42 MB/sec), 28.3 ms avg latency, 64.0 max latency.
130278 records sent, 26045.2 records/sec (25.43 MB/sec), 28.1 ms avg latency, 55.0 max latency.
26010.8 records/sec (25.40 MB/sec), 27.9 ms avg latency, 45.0 max latency.
130080 records sent, 26005.6 records/sec (25.40 MB/sec), 27.7 ms avg latency, 41.0 max latency.
130093 records sent, 26013.4 records/sec (25.40 MB/sec), 74.5 ms avg latency, 343.0 max latency.
1000000 records sent, 25904.051394 records/sec (25.30 MB/sec), 38.33 ms avg latency, 343.00 ms max latency, 28 ms 50th, 122 ms 95th, 242 ms 99th, 321 ms 99.9th.
real 0m39.395s
user 0m12.204s
sys 0m1.616s

CPU and memory usage barely changed; network RX/TX was 170 Mbps / 120 Mbps and disk ioutil about 6%. One million records complete within about two minutes.

Pressure test conclusions

The factors that affect commit efficiency are: number of partitions + timeout + message size + throughput.

  • No throttling: with acks=all and no throughput limit, TPS holds at about 2W, average latency is about 1600 ms, 99.9% of records receive their acknowledgement within roughly two seconds, and the slowest take more than 5 seconds.

  • Timeout: with the timeout set above 5 seconds, every submission is acknowledged successfully; reducing it to about 3 seconds produces a large number of timeouts. The default is 30 seconds. Given the complexity of real network environments, 10 seconds is the recommended setting, and the client must catch and specially handle timeout exceptions.

  • Message size: at 512 bytes the committed TPS reaches 3W/s; at about 2 KB it drops to 9K/s. Message size has a roughly linear effect on TPS.

  • Throughput limiting: the best results come from capping throughput at about 1.3W/s, which reduces contention; average latency falls to 24 ms, the maximum stays just over 300 ms, and the service level is very high.

  • Number of partitions: increasing the number of partitions significantly improves processing capacity, but it also lengthens failure recovery. This test used 6 partitions as the baseline; raising the count to 12 roughly doubled throughput, while further increases brought no obvious improvement.

Final conclusion: assuming a healthy network, with acks=all, a 10-second timeout, three retries and 6 partitions, the cluster can sustain 1.3W message requests per second, with an average write latency below 30 ms and a maximum below 500 ms. To raise TPS, increase the partition count to 12, which allows efficient writes at about 2.6W/s.

Backlog test

In theory, Kafka production and consumption are unaffected by a message backlog; a backlog simply consumes disk space. Here, backlog means the number of messages retained in a topic, regardless of whether they have been consumed.


Conclusion

Kafka is operated against a time-based SLA (service-level guarantee): important messages are retained for three days.
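Assuming the standard retention settings, three-day retention can be expressed as follows; log.retention.hours is the broker-wide default and retention.ms is the per-topic override (72 hours in milliseconds):

log.retention.hours=72
retention.ms=259200000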

Performance

Baseline configuration: 1 KB messages, acks=all (every in-sync replica must acknowledge), and three replicas to keep messages reliable.

  • 3 replicas, 1 partition: 6K-8K
  • 3 replicas, 6 partitions: 1.3W-1.6W
  • 3 replicas, 12 partitions: 2.6W-2.8W

Note: on the producer side, if each record is sent individually and confirmed with future.get(), TPS drops sharply to below 2K. Avoid this pattern unless you truly need per-record confirmation; prefer asynchronous sends with callbacks (see the sketch below). Compared with roughly 1.6W TPS in acks=all mode, submission in ordinary mode can reach 13W (at which point the bottleneck is network and I/O, with the bandwidth saturated). With throughput capped at about 1W and ACK enabled (which matches our business profile very well), Kafka is both efficient and highly available, averaging only 24 ms per write. The producer best practice is a 10-second timeout with three retries. Consumption is also efficient: with 6 partitions and ACK mode, average processing time is about 20 ms, depending mainly on the consumer's own processing capacity.
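To make the difference concrete, the Java sketch below shows both styles; the topic name is a placeholder. Blocking on future.get() for every record serializes each round trip, while the callback style lets the client keep batches in flight.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendStyles {
    // Slow: one blocking round trip per record; throughput collapses to the low thousands.
    static void sendSync(KafkaProducer<String, String> producer, String payload) throws Exception {
        producer.send(new ProducerRecord<>("test003", payload)).get();
    }

    // Fast: hand the record to the client and handle the result in a callback.
    static void sendAsync(KafkaProducer<String, String> producer, String payload) {
        producer.send(new ProducerRecord<>("test003", payload), (metadata, exception) -> {
            if (exception != null) {
                // record the failure somewhere durable so the message can be replayed
                System.err.println("send failed: " + exception);
            }
        });
    }
}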

Kafka message reliability

  • Write to 3 replicas, enable acks=all, and flush to disk every second. A message flows Client -> Leader -> Replicas; the leader waits for acks from all replicas before acknowledging the client, so the whole path is confirmed multiple times, and a failed ack is retried. This mode ensures that data is not lost. To reach this level of reliability, follow the best practices provided by the architecture group (the relevant parameters differ considerably between Kafka versions).
  • Of the three message-delivery semantics, Kafka's synchronous send (as of version 0.10) is at-least-once, so the consumer side must perform idempotent processing (a small sketch follows this list). Duplicates can appear in this scenario: the producer sends a message to the leader, the leader replicates it to all followers and receives their acknowledgements, but crashes before returning the ack to the producer; the producer retries the send, a follower is then promoted to leader, and the message ends up duplicated.
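A minimal sketch of consumer-side idempotency follows, assuming each message carries a unique business ID as its key; the in-memory set stands in for whatever durable de-duplication store the business actually uses, and the broker address and topic are placeholders.

import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class IdempotentConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Set<String> processed = new HashSet<>(); // stand-in for a persistent de-duplication store
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test003"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000); // 0.10-era poll(long)
                for (ConsumerRecord<String, String> record : records) {
                    if (!processed.add(record.key())) {
                        continue; // duplicate delivery under at-least-once: skip the side effect
                    }
                    // apply the business side effect exactly once here
                }
            }
        }
    }
}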

Impact of capacity expansion and failures

  • When a single node goes offline, production and consumption are briefly affected. Recovery time depends on the leader election time and the number of partitions (about 10 seconds of ISR detection time). ACK mode plus retries ensures that no data is lost during the fault. Position 2 in the figure above.
  • Capacity expansion means a node comes online, which is transparent to users; the time until the node is fully available depends on how much lagging data it has to copy (a simple network copy). In practice some message fetches take a little longer, but the effect is small, and no obvious jitter was observed during the pressure test. Consumers should set generous processing timeouts, including for asynchronous processing (see the consumer timeout sketch after this list). Position 3 in the figure above.
  • If two or more nodes go down (for example, a machine-room power failure), the service is unavailable. Recovery requires the nodes to resynchronize, which depends on the total data volume. Disks are fsynced once per second, so in the extreme case (all nodes down at once) up to 1 second of data can be lost.
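For a consumer that processes slowly or asynchronously, the relevant timeouts might be raised roughly along these lines; the values are illustrative, and max.poll.interval.ms only exists in client versions 0.10.1 and later:

session.timeout.ms=30000
max.poll.records=100
max.poll.interval.ms=300000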


When will data be lost

  • In batch mode on the producer, if the process is not shut down gracefully while data is still in the buffer, the buffered data is lost. Position 1 in the figure above.
  • In batch-mode consumption, messages are pulled and handed to a thread pool for asynchronous processing; if the thread pool is not shut down gracefully, those in-flight messages are lost. Position 4 in the figure above. (A shutdown sketch follows this list.)
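Both cases come down to shutting things down in the right order. A sketch of a graceful shutdown is shown below; the shutdown hook, the executor and the 30-second wait are illustrative choices, not part of the original test setup.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;

public class GracefulShutdown {
    public static void register(KafkaProducer<String, String> producer, ExecutorService workers) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                // 1) stop taking new work and let in-flight consumer tasks finish
                workers.shutdown();
                workers.awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            } finally {
                // 2) flush whatever is still in the producer's batch buffer, then close it
                producer.flush();
                producer.close();
            }
        }));
    }
}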

Risks

  • The TPS figures from the pressure test are for reference only; in real operation, network latency, bad disks, and traffic peaks and troughs can all cause jitter. Both producers and consumers must record every processing failure so that data can be replayed in extreme cases.
  • Do not put large, unnecessary payloads in messages; message size has a direct, roughly linear impact on service quality. (Keep messages under 2 KB.)
  • On the consumer side, besides idempotency, incorrect use of asynchronous thread pools (for example, unbounded queues) is a frequent cause of consumer failures; consume with care.
  • If 6 partitions are allocated and there are 7 consumer machines, one of them will sit idle. Take Kafka's partition/consumer limits into account at design time.
  • Batch commit mode is enabled by default on the Kafka producer, which means that if the producer process dies, messages still sitting in the buffer are lost. Make sure the producer is stopped with kill -15 so the service has a chance to flush, and if a message is important, also write it to a local log file. Weigh the pros and cons before relying on this.