This technical column is a summary and distillation of the author's (Qin Kaixin) day-to-day work: cases extracted from real business environments, summarized and shared together with tuning suggestions for business applications and capacity planning for cluster environments. Please keep following this blog. Looking forward to joining the most combative team of the IoT era. QQ email: [email protected]; feel free to get in touch for academic exchange.

Overview

  • Performance
    • Throughput: how many bytes (or messages) a broker or client application can process per second.
    • Latency: usually the time from when the producer sends a message until it is persisted on the broker.
  • Availability: the probability, or fraction of time, that a system or component is up and running. The industry generally quantifies availability in "nines". For example, four nines (99.99%) per year allows about 53 minutes of downtime (365 × 24 × 60 × 0.01% ≈ 53 minutes).
  • Persistence: committed messages must be persisted to the physical log of the underlying file system on the broker side and never lost.
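The downtime arithmetic for "four nines" can be checked directly; a quick sketch:

```shell
# "Four nines" = 99.99% availability; downtime budget per year in minutes
awk 'BEGIN { printf "%.0f minutes/year\n", 365 * 24 * 60 * (1 - 0.9999) }'
# prints: 53 minutes/year
```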

1 Kafka infrastructure optimization

  • Disk capacity: the first consideration is the total disk capacity required for the messages that must be retained and the disk space each broker can provide. If a Kafka cluster needs to hold 10 TB of data and a single broker can store 2 TB, the minimum cluster size is 5 brokers. In addition, if replication is enabled, the required storage space at least doubles (depending on the replication factor). This means such a Kafka cluster needs at least 10 brokers.
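The sizing rule above is simple enough to script; a sketch using the figures from this bullet (10 TB of data, 2 TB per broker, and an assumed replication factor of 2):

```shell
# Minimum broker count = ceil(total data * replication factor / capacity per broker)
total_tb=10
per_broker_tb=2
replication_factor=2
echo $(( (total_tb * replication_factor + per_broker_tb - 1) / per_broker_tb ))
# prints: 10
```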

  • When a file is accessed, created, or modified, the file system records timestamps such as creation time (ctime), last modification time (mtime), and last access time (atime). By default, every read triggers an atime update, which turns reads into extra disk writes, and atime is completely useless to Kafka, so it is worth disabling:

      mount -o noatime
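To make the option survive reboots it can also go into /etc/fstab; the device, mount point, and file system below are assumptions for illustration:

```
# illustrative /etc/fstab entry: add noatime to the mount options
/dev/sdb1  /data/kafka  ext4  defaults,noatime  0  2
```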
  • The vast majority of software running on Linux is built and tested on ext4, which makes ext4 more broadly compatible than other file systems.

  • XFS, a high-performance 64-bit journaling file system, offers high performance and high scalability, and is especially suitable for production servers handling large files (30+ GB). XFS is a good choice for many storage applications.

  • Computer memory is divided into physical memory and virtual memory. Physical memory is real RAM; virtual memory uses disk in place of RAM, loading pages into and evicting them from physical memory via the swap mechanism, and the disk space used for this is the swap area. When writing a file, Linux first writes the data into unused memory, the page cache; on reads, Linux searches the page cache first and only goes to the hard disk on a miss. When physical memory usage reaches a certain ratio, Linux starts swapping, i.e. using disk as virtual memory. Run cat /proc/sys/vm/swappiness to view the swap parameter, which controls how aggressively the kernel swaps: 0 means use physical memory as much as possible, 100 means use the swap area as much as possible, and the default is 60, under which swapping can begin when physical memory usage reaches about 40%, and frequent swapping degrades system performance. It is therefore generally advised to set vm.swappiness to 1. I finally set it to 10, because our machines have relatively little memory, only 40 GB, and setting it too low might interfere with the use of virtual memory.

    # Temporary change:
    sudo sysctl vm.swappiness=N
    # Permanent change (add to /etc/sysctl.conf):
    vm.swappiness=N

2 Kafka JVM Settings

  • PermGen space: the Permanent Generation, a memory area that stores class and metadata information. Why does it overflow? Classes are loaded into PermGen when the JVM loads them. It is separate from the heap, where instances are stored, so if your application loads a great many classes, you are likely to hit a PermGen space error.

  • The G1 algorithm divides the heap into regions but is still a generational collector. Some of these regions make up the young generation, whose garbage collection still copies live objects into survivor regions or the old generation while pausing all application threads. The old generation is likewise divided into regions, and the G1 collector cleans it up by copying objects from one region to another. This means that during normal operation G1 compacts the heap (at least part of it), so there is no CMS-style memory fragmentation problem.

  • G1 also has a special region type called the Humongous region. If an object occupies more than 50% of a region's capacity, the G1 collector considers it a humongous object. Such giant objects would by default be allocated directly in the old generation, but a short-lived giant object there would hurt the garbage collector. To solve this, G1 stores giant objects in dedicated Humongous regions; if one H region cannot fit the object, G1 looks for contiguous H regions to hold it. Sometimes a Full GC has to be started just to find enough contiguous H regions.

  • G1 uses the Region concept to divide memory into equally sized partitions, and reclaims memory region by region, copying surviving objects into another free region. Because it always operates on units of equal size, G1 is naturally a compacting scheme (local compaction);

  • G1 is also a generational collector, but the memory is no longer physically divided into young and old generations, nor does it need a completely separate survivor (to-space) area reserved for copying. G1's generations are only logical, and each region may switch back and forth between generations as G1 runs;

  • G1 collections are all stop-the-world, but the boundary between young- and old-generation collection is blurred by a mixed collection mode: each collection can either collect only young-generation regions (a young collection) or collect some old-generation regions along with the young generation (a mixed collection). Even when heap memory is large, this limits the collection scope and keeps pauses short.

  • The size of a region in heap memory can be specified with the -XX:G1HeapRegionSize parameter and may be 1M, 2M, 4M, 8M, 16M, or 32M. If G1HeapRegionSize is left at its default, the actual region size is computed during heap initialization: by default the heap is divided into roughly 2048 regions, and a reasonable size is derived from that.
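A quick sketch of that default rule, using the 6 GB heap from the KAFKA_HEAP_OPTS example at the end of this section (the JVM then rounds the result to a power of two between 1M and 32M):

```shell
heap_mb=6144                  # -Xms6g / -Xmx6g
echo $(( heap_mb / 2048 ))    # target of ~2048 regions
# prints: 3  (MB; the JVM rounds this to a power-of-two region size)
```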

  • JDK 8 introduced the Metaspace: the biggest difference from the permanent generation is that the Metaspace does not live inside the JVM heap but uses native memory. By default its size is therefore limited only by available native memory, but it can be bounded with the following parameter: -XX:MetaspaceSize sets the initial size; when usage reaches this value, a garbage collection that unloads classes is triggered, and the GC then adjusts the threshold: if a lot of space is freed, the value is lowered appropriately; if very little is freed, it is raised, up to -XX:MaxMetaspaceSize.

  • -XX:MinMetaspaceFreeRatio: the minimum percentage of Metaspace capacity that must be free after GC; raising it reduces garbage collections caused by allocating space. -XX:MaxMetaspaceFreeRatio: the maximum percentage of Metaspace capacity allowed to be free after GC; lowering it reduces collections caused by excessive free space.

  • -XX:MaxGCPauseMillis=n: sets the maximum GC pause time (target). This is a soft goal; the JVM makes its best effort to achieve it.

  • -XX:InitiatingHeapOccupancyPercent=n: the heap occupancy percentage at which a GC cycle starts. G1 uses this value to decide whether to trigger a GC cycle based on occupancy of the whole heap, not just one generation. 0 means GC runs constantly; the default is 45 (i.e. when the heap is 45% full).

  • -XX:G1NewSizePercent: the minimum size of the young generation; the default is 5%.

  • -XX:G1MaxNewSizePercent: the maximum size of the young generation; the default is 60%.

  • MetaspaceSize: the initial Metaspace size, i.e. the threshold at which Metaspace expansion first triggers a Full GC.

    export JAVA_HOME=/usr/java/jdk1.8.0_51
    export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50"

3 File tuning

  • If a "Too many open files" error occurs, you need to raise the maximum file descriptor limit on the machine where the broker runs. The number of file handles can be estimated as follows:

    maximum number of partitions on the broker * (data per partition / log segment size + 3)
    where 3 accounts for the index files. Suppose there are 20 partitions, the total data across
    these partitions is 100 GB, and each log segment is 1 GB; then the file descriptor limit on
    the machine should be at least 20 * (100/1 + 3) = 2060. In practice, set this parameter
    comfortably large, e.g. 100000.
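The formula is easy to sanity-check in a shell, using the same figures as the example above:

```shell
partitions=20
total_data_gb=100
segment_gb=1
index_files=3
echo $(( partitions * (total_data_gb / segment_gb + index_files) ))
# prints: 2060
```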
  • If a serious "java.lang.OutOfMemoryError: Map failed" error occurs, the main cause is that creating a large number of topics has exhausted the operating system's memory-map quota. You can raise the vm.max_map_count parameter as follows:

    /sbin/sysctl -w vm.max_map_count=N   # the default value of this parameter is 65535
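Kafka memory-maps the index files of every log segment, so a rough budget for vm.max_map_count scales with partitions × segments; the figures below are assumptions for illustration:

```shell
partitions=2000
segments_per_partition=30
maps_per_segment=2    # offset index + time index, both memory-mapped
echo $(( partitions * segments_per_partition * maps_per_segment ))
# prints: 120000  -> set vm.max_map_count comfortably above this
```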

4 Throughput

Broker side:

  • Increase num.replica.fetchers appropriately, but do not exceed the number of CPU cores. This parameter controls the maximum number of threads a broker uses to fetch messages from leader replicas for its follower replicas. The default is 1, meaning a follower replica uses only one thread to pull the latest messages from the leader in real time. For producers configured with acks=all, the main latency is often the follower-leader synchronization, so raising this value shortens synchronization time and indirectly improves producer-side TPS.
  • Tune GC to avoid frequent Full GCs. Older versions relied on Zookeeper heartbeats to decide whether a consumer was alive; if a GC pause lasted too long, the Zookeeper session expired and Kafka immediately rebalanced the consumer group. Newer releases have dropped this dependency on Zookeeper.
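As a sketch, the fetcher knob above is a one-line override in the broker's server.properties (4 below is an assumed value for a machine with at least 4 cores, not a recommendation):

```properties
# keep the value <= CPU core count
num.replica.fetchers=4
```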

Producer side:

  • Increase batch.size appropriately, e.g. to 100 KB-512 KB.
  • Have the producer send messages in batches by setting linger.ms to, say, 10-100 ms. A larger batch size packs more messages into the same request, reducing the total number of requests sent to the broker; this lowers the producer's load and also reduces the broker's CPU overhead for request processing. A larger linger.ms makes the producer wait longer before sending, so more messages accumulate to fill each batch, improving overall TPS, though latency certainly increases.
  • Set a compression codec: compression.type=lz4. The currently supported codecs are gzip, snappy, and lz4.
  • Set acks=0 or 1.
  • Set retries=0.
  • Increase buffer.memory if multiple threads share one producer or there are many partitions, since each partition occupies a batch.size worth of buffer.
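The bullets above, collected as a producer.properties sketch (the concrete values are illustrative picks from the ranges suggested, not benchmarked recommendations):

```properties
# 256 KB batch, within the 100-512 KB range suggested above
batch.size=262144
# wait up to 50 ms to fill batches (10-100 ms window)
linger.ms=50
compression.type=lz4
acks=1
retries=0
# 64 MB buffer; raise it when many partitions share one producer
buffer.memory=67108864
```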

Consumer side:

  • Use multiple consumer instances, sharing the same group.id, to consume data from multiple partitions.
  • Increase fetch.min.bytes, e.g. to 10000. It represents the minimum amount of data the broker of each leader replica returns per fetch and indirectly affects TPS: raising it lets Kafka fill more data into each fetch response.
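The same two settings as a consumer.properties sketch (the group name is a placeholder):

```properties
# all instances in the group share this id
group.id=my-consumer-group
# ask each broker to return at least 10 KB per fetch
fetch.min.bytes=10000
```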

5 A paradox (the more partitions, the higher the TPS?)

  • The basic unit of parallelism in Kafka is the partition. The producer is designed to send messages to multiple partitions simultaneously, so that messages are written across multiple brokers and consumed by multiple consumers. In general, more partitions means higher TPS. However:
  • The more partitions there are, the more buffers are used, since buffers are allocated per partition, so both the server and the clients consume more memory.
  • Each partition has its own directory in the underlying file system, storing log segment files in addition to the three index files, which occupies a large number of file handles.
  • Each partition has several replicas stored on different brokers. When a broker fails, the Controller must process the leader-change requests on a single thread, and the pressure on it is enormous.

6 Summary

num.replica.fetchers is a new lever well worth trying, acks is the key knob, and the rest follow the usual rules. A good article does not come easily: respect the original, no reposting please, and thank you!


Qin Kaixin, in Shenzhen, 2018-12-03 23:55