This series of blog posts summarizes and shares examples drawn from real business environments, and provides practical guidance on Spark business applications. Stay tuned for the series. Copyright: this set of Spark business application posts belongs to the author (Qin Kaixin).

  • Qin Kaixin technology community – complete catalogue of the big data business practice series
  • Kafka business environment in action – Kafka production environment planning
  • Kafka business environment in action – Kafka producer and consumer throughput test
  • Kafka business environment in action – Kafka producer parameter settings and tuning suggestions
  • Kafka business environment in action – Kafka cluster broker parameter settings and tuning guidelines

1 Distributed streaming platform

Apache Kafka® is a distributed streaming platform. What exactly does that mean? A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant, durable way.
  • Process streams of records as they occur.

Kafka is generally used for two broad classes of applications:

  • Building real-time streaming data pipelines that reliably move data between systems or applications.
  • Building real-time streaming applications that transform or react to the streams of data.

To understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up. First a few concepts:

  • Kafka is run as a cluster on one or more servers that can span multiple datacenters.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.

2 Kafka as a Storage System

Any message queue that allows publishing messages decoupled from consuming them is effectively acting as a storage system for the in-flight messages. What is different about Kafka is that it is a very good storage system.

  • Data written to Kafka is written to disk and replicated for fault tolerance. Kafka allows producers to wait on acknowledgement, so that a write isn't considered complete until it is fully replicated and guaranteed to persist even if the server written to fails.
  • The disk structures Kafka uses scale well: Kafka will perform the same whether you have 50 KB or 50 TB of persistent data on the server.
  • As a result of taking storage seriously and allowing clients to control their read position, you can think of Kafka as a kind of special-purpose distributed filesystem dedicated to high-performance, low-latency commit log storage, replication, and propagation.
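The "clients control their read position" point is what makes Kafka a storage system rather than a transient queue. A toy Python model (the `CommitLog` class and its method names are illustrative, not a Kafka API) shows the idea: appends return offsets, and reads never consume anything.

```python
class CommitLog:
    """Toy model of Kafka-as-storage: an append-only log in which
    each consumer owns and controls its own read position (offset)."""

    def __init__(self):
        self._records = []

    def append(self, record) -> int:
        self._records.append(record)
        return len(self._records) - 1          # offset of the new record

    def read(self, offset: int, max_records: int = 10):
        # Reading never mutates the log; any offset can be re-read.
        return self._records[offset:offset + max_records]

log = CommitLog()
for msg in ["a", "b", "c"]:
    log.append(msg)
print(log.read(1))   # → ['b', 'c']
```

Because reads are position-based and non-destructive, many independent consumers can replay the same data, which is exactly the commit-log behaviour described above.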

3 The secret of Kafka's high throughput

  • A user program that sends file content over the network works in user space, while files and network sockets are hardware resources managed in kernel space. Therefore, within the operating system, the traditional process is: read the data from disk into a kernel buffer, copy it into the user buffer, copy it back into the kernel's socket buffer, and finally hand it to the network card.

Since Linux kernel 2.2, a system call mechanism called "zero-copy" (sendfile) has been available. It skips the copy through the user buffer and creates a direct mapping between disk space and memory, so data is no longer copied into the user buffer.
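Kafka gets this behaviour through Java NIO's `FileChannel.transferTo`, which delegates to `sendfile` on Linux. A minimal Python sketch of the same system call (Linux-specific; the temporary file paths are just for the demo):

```python
import os
import tempfile

# Prepare a source file standing in for a log segment.
src = tempfile.NamedTemporaryFile(delete=False)
data = b"kafka log segment bytes " * 100
src.write(data)
src.flush()

dst_path = src.name + ".out"
with open(src.name, "rb") as fin, open(dst_path, "wb") as fout:
    # os.sendfile copies kernel-to-kernel: the bytes never pass
    # through a user-space buffer. (On Linux the destination may be
    # a regular file; with Kafka it is a network socket.)
    n = os.sendfile(fout.fileno(), fin.fileno(), 0, len(data))

print(n)   # number of bytes transferred without a user-space copy
```

The saving is the two eliminated copies (kernel → user buffer → kernel) plus the associated context switches, which matters greatly when a broker streams gigabytes to consumers.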

  • Kafka's queue topic is divided into multiple partitions, and each partition is divided into multiple segments. A queue's messages are therefore stored in N segment files, so each file operation works on a small file, which increases parallel processing capability.

  • Kafka allows messages to be sent in batches. Messages are first cached in memory and then sent in batches in a single request. For example, you can specify that cached messages are sent once they reach a certain count (say, 100 messages) or after a fixed amount of time (say, every 5 seconds). This policy greatly reduces the number of I/O operations on the server.
  • Kafka also supports compression of message sets. The producer can compress message sets using the GZIP, Snappy, or LZ4 formats. Compression reduces the amount of data to be transmitted and eases the pressure on the network.
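The last two points can be sketched together: a toy producer-side buffer that flushes when it hits a message count or an age limit, then gzip-compresses each batch before "sending" it. The names here (`BatchBuffer`, `max_messages`, `max_age_secs`) are illustrative stand-ins for the real producer settings (`batch.size`, `linger.ms`, `compression.type`), not Kafka APIs.

```python
import gzip
import json
import time

class BatchBuffer:
    """Toy sketch of producer-side batching plus compression."""

    def __init__(self, max_messages=100, max_age_secs=5.0):
        self.max_messages = max_messages
        self.max_age_secs = max_age_secs
        self.pending = []
        self.started = None
        self.sent = []                       # each entry = one network request

    def append(self, msg: dict):
        if not self.pending:
            self.started = time.monotonic()
        self.pending.append(msg)
        # Flush on either trigger: count reached, or batch too old.
        if (len(self.pending) >= self.max_messages
                or time.monotonic() - self.started >= self.max_age_secs):
            self.flush()

    def flush(self):
        if self.pending:
            raw = b"\n".join(json.dumps(m).encode() for m in self.pending)
            self.sent.append(gzip.compress(raw))   # one compressed request
            self.pending = []

buf = BatchBuffer()
for i in range(250):
    buf.append({"user_id": i % 10, "event": "click"})
buf.flush()                                  # drain the remainder

print(len(buf.sent))   # → 3 requests instead of 250
```

Repetitive, small JSON events compress very well, so each request carries far fewer bytes than the raw batch; that is the double win of batching plus compression described above.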

4 Kafka cluster broker global parameter settings


  • broker.id

A unique integer identifying each broker; it must not conflict with other brokers. Starting from 0 is recommended.


  • log.dirs <= increases throughput

Ensure that the directories sit on disks with ample space. Multiple directories are separated by commas, for example /xin/kafka1,/xin/kafka2. The advantage is that Kafka tries to spread partition data evenly across the directories; if they are mounted on separate disks, multiple disk heads write simultaneously, which strongly increases throughput.


  • zookeeper.connect

This parameter has no default value and must be configured. It accepts a comma-separated list, such as zk1:2181,zk2:2181,zk3:2181/kafka. Note the /kafka at the end: it is the ZooKeeper chroot and is optional. If not specified, the ZooKeeper root path is used by default.


  • listeners

Supported protocols include PLAINTEXT, SSL, SASL_SSL, and so on. The format is [protocol]://[host name]:[port],[[protocol]://[host name]:[port]]. This parameter defines the ports the broker opens for clients to connect to. Recommended configurations:

PLAINTEXT://hostname:port (security authentication disabled)
SSL://hostname:port (security authentication enabled)

  • unclean.leader.election.enable <= data integrity assurance

This addresses the case where the ISR set is empty and the leader then crashes: how should a new leader be chosen? As of Kafka 1.0.0, this parameter defaults to false, meaning replicas outside the ISR set are not allowed to be elected leader. Faced with the choice between high availability and data integrity, Kafka officially chose the latter.


  • delete.topic.enable

Whether topic deletion is allowed. Given the ACL permission mechanism added in 0.9.0.0, accidental deletion is basically not a concern.


  • log.retention.{hours|minutes|ms} <= time dimension

The default retention is 7 days. If more than one is set, ms takes precedence over minutes, and minutes over hours. How the retention time is judged:

New versions: judged by the timestamp carried in the message. Old versions: judged by the log file's latest modification time.


  • log.retention.bytes <= space dimension

Kafka periodically deletes log segments once the log size exceeds this value. The default is -1, which means Kafka never deletes logs based on size.


  • min.insync.replicas <= used together with acks=-1

The persistence level: the minimum number of in-sync replicas required for a write. It is only meaningful when acks=all (or -1). min.insync.replicas specifies the minimum number of replicas that must acknowledge a write request; if this cannot be met, the producer throws a NotEnoughReplicas or NotEnoughReplicasAfterAppend exception. This parameter provides stronger message persistence.

Examples are as follows:

5 brokers, acks=-1, min.insync.replicas=3

This means at least three replicas must be in sync before the broker accepts a write; otherwise an exception is thrown. If three brokers break down, writes fail even though the remaining two brokers are fully synchronized and acks=-1 is satisfied.
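The example above boils down to one comparison. A tiny sketch (the helper name `write_succeeds` is illustrative, not a Kafka API):

```python
def write_succeeds(in_sync_replicas: int, min_insync_replicas: int = 3) -> bool:
    """With acks=-1, a write is accepted only while the number of
    in-sync replicas is at least min.insync.replicas; otherwise the
    producer gets NotEnoughReplicas."""
    return in_sync_replicas >= min_insync_replicas

# 5 brokers, min.insync.replicas=3:
print(write_succeeds(5))   # True  -- all replicas in sync
print(write_succeeds(3))   # True  -- exactly at the threshold
print(write_succeeds(2))   # False -- three brokers down, writes rejected
```

Note the trade-off: raising min.insync.replicas strengthens durability but shrinks the number of broker failures the cluster can tolerate while still accepting writes.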


  • num.network.threads <= number of request-forwarding threads

The default is 3. These threads forward requests between clients and the broker; note that they only forward. In a production environment, monitor the NetworkProcessorAvgIdlePercent JMX metric: if it drops below 0.3, increasing this value is recommended.


  • num.io.threads <= actual processing threads

The default is 8, meaning that the broker has eight threads polling for incoming network requests and processing them in real time.


  • message.max.bytes

The maximum message size the broker can receive. The default is only about 977 KB, so production environments should raise this value.
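Pulling the parameters above together, a sketch of a server.properties fragment (all values and paths are illustrative only, to be tuned for your own cluster and hosts):

```properties
# server.properties -- illustrative values only
broker.id=0
log.dirs=/xin/kafka1,/xin/kafka2
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka
listeners=PLAINTEXT://broker1:9092
unclean.leader.election.enable=false
delete.topic.enable=true
log.retention.hours=168          # time dimension: 7 days
log.retention.bytes=-1           # space dimension: never delete by size
min.insync.replicas=2            # pair with producer acks=-1
num.network.threads=3
num.io.threads=8
message.max.bytes=10485760       # raised from the ~977 KB default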

5 Topic-level Kafka broker parameters

  • delete.topic.enable
  • message.max.bytes
  • log.retention.bytes

6 OS parameter settings

  • OS page cache flush time <= improves throughput

The default is 5 seconds, which is a rather short interval. Increasing this value appropriately can greatly improve the performance of physical writes; LinkedIn set it to 2 minutes to improve throughput.
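On Linux, the flush interval in question is the kernel's page-cache writeback, controlled via sysctl rather than any Kafka parameter. A sketch of what a "2 minutes" setting might look like (the exact knobs and values are assumptions, verify against your kernel's documentation before applying):

```properties
# /etc/sysctl.conf -- values in centiseconds; illustrative only
vm.dirty_writeback_centisecs=500      # how often the flusher thread wakes (default ~5 s)
vm.dirty_expire_centisecs=12000       # age at which dirty pages must be written (~2 min)
```

The trade-off: a longer interval lets the kernel coalesce more writes into large sequential I/O, at the cost of more unflushed data at risk if the machine loses power; Kafka relies on replication rather than fsync for durability, which is why this trade is acceptable.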

  • File System Selection

In the official tests, the write latency of the XFS file system was about 160 ms, versus about 250 ms for Ext4. The XFS file system is recommended for production environments.

  • OS file descriptor restrictions

The maximum number of open file descriptors in an OS is limited. For example, suppose a Kafka cluster has 3 replicas and 50 partitions, each partition holds 10 GB of data, and log segments within a partition are 1 GB each; a broker then needs to maintain about 1500 file descriptors. So set the limit as needed:

ulimit -n 100000
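One way to arrive at the ~1500 figure in the example above (a worst-case back-of-the-envelope count of .log segment files only, ignoring index files and socket descriptors):

```python
# Rough file-descriptor estimate from the example in the text.
partitions = 50
replication_factor = 3
partition_size_gb = 10
segment_size_gb = 1

segments_per_partition = partition_size_gb // segment_size_gb   # 10 segments
partition_replicas = partitions * replication_factor            # 150 replicas
open_files = partition_replicas * segments_per_partition        # ~1500 log files

print(open_files)   # → 1500
```

Since each open segment also has index files and each client connection consumes a socket descriptor, the real number is higher, hence the generous ulimit of 100000.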
  • OS buffer Settings (not sure)

Conclusion

This post has explained the broker parameters in detail. If you have any questions, feel free to leave a message.

Qin Kaixin in Shenzhen 2018