This series of posts summarizes and shares examples drawn from real production environments and provides practical guidance on Spark business applications. Stay tuned for the rest of the series. Copyright: this Spark business application series belongs to the author (Qin Kaixin).

  • Kafka business environment in action – Kafka production environment planning
  • Kafka business environment in action – Kafka producer and consumer throughput testing
  • Kafka business environment in action – Kafka Producer parameter settings and tuning suggestions
  • Kafka business environment in action – important Kafka cluster management, operations and maintenance instructions
  • Kafka business environment in action – Kafka cluster Broker parameter settings and tuning guidelines
  • Kafka business environment in action – Kafka Producer synchronous and asynchronous message sending, transactions and idempotence case applications
  • Kafka business environment in action – Kafka Consumer multiple consumption models case applications
  • Kafka business environment in action – Kafka Consumer parameter settings and tuning recommendations
  • Kafka business environment in action – Kafka offset commit mechanism and rebalancing strategy for Consumer Groups
  • Kafka business environment in action – in-depth analysis of the Kafka message poll mechanism
  • Kafka business environment in action – in-depth analysis of Kafka replicas and the ISR synchronization mechanism
  • Kafka business environment in action – in-depth analysis of Kafka exactly-once semantics (EOS)
  • Kafka business environment in action – Kafka message idempotence and transaction support mechanisms
  • Kafka business environment in action – Kafka cluster Controller election and responsibility design architecture in detail
  • Kafka business environment in action – smooth transition of the Kafka cluster message format from V1 to V2
  • Kafka business environment in action – in-depth study of data consistency guarantees via the Kafka cluster high watermark and leader epoch
  • Kafka business environment in action – trade-offs among throughput, persistence, low latency and availability when tuning Kafka

1 Message reception -> Based on Consumer Group

Consumer Groups are primarily used to implement a highly scalable, fault-tolerant consumer mechanism, so message reception is organized around the Consumer Group. Multiple Consumer instances in a group read Kafka messages in parallel, but each partition is consumed by only one Consumer in the group at a time. If a Consumer "dies", the Consumer Group immediately reassigns the failed Consumer's partitions to the other Consumers, so the group keeps working normally.
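To make the group behavior concrete, here is a minimal sketch (assuming a local broker at localhost:9092, a multi-partition topic named foo, and the group.id test used in the examples further below). Starting this program twice with the same group.id makes Kafka split the partitions of foo between the two instances; if one instance is killed, its partitions migrate to the survivor on the next rebalance.

     import java.util.Collections;
     import java.util.Properties;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
     import org.apache.kafka.clients.consumer.KafkaConsumer;

     public class GroupMemberDemo {
         public static void main(String[] args) {
             Properties props = new Properties();
             props.put("bootstrap.servers", "localhost:9092");
             props.put("group.id", "test");   // same group.id => the members split the partitions
             props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                 consumer.subscribe(Collections.singletonList("foo"));
                 while (true) {
                     ConsumerRecords<String, String> records = consumer.poll(1000);
                     // assignment() shows which partitions this instance currently owns
                     System.out.println("assigned partitions: " + consumer.assignment());
                     for (ConsumerRecord<String, String> record : records) {
                         System.out.printf("partition = %d, offset = %d, value = %s%n",
                                 record.partition(), record.offset(), record.value());
                     }
                 }
             }
         }
     }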

2 Offset storage -> Based on Consumer Group

Note that offset storage is also based on the Consumer Group, using a checkpoint-style mechanism that periodically persists the consumed offsets.
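As a small illustration (a sketch only, reusing the consumer and the group.id test from the examples in section 4; it additionally needs org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata), the offset last persisted for a group can be read back through the consumer API:

     // read back the offset that the group "test" last persisted for partition 0 of topic "foo"
     TopicPartition tp = new TopicPartition("foo", 0);
     OffsetAndMetadata committed = consumer.committed(tp);   // null if the group has never committed
     System.out.println("committed offset for " + tp + ": " + committed);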

3 Offset commit -> ZooKeeper discarded

Consumers report their consumption progress to the Kafka cluster on a regular basis; this process is called offset committing. ZooKeeper is no longer used for it, because ZooKeeper is a coordination service component rather than a storage component, and highly concurrent reads and writes would put it under heavy pressure.

  • In the new version, offset commits are kept in an internal Kafka topic (__consumer_offsets).
  • Under the Kafka log directory there are 50 folders (partitions) for this topic, each containing log files and index files. The log entries are key-value pairs whose key is (group.id, topic, partition).
  • Assuming there are many consumers and consumer groups online, hashing group.id modulo 50 spreads the commits across the 50 partitions and distributes the commit load (see the small sketch below).
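Roughly speaking (a simplified sketch; the real implementation lives in Kafka's group metadata manager and also handles corner cases such as Integer.MIN_VALUE), the mapping from a group.id to a partition of __consumer_offsets looks like this:

     String groupId = "test";
     int numOffsetsPartitions = 50;   // default offsets.topic.num.partitions
     // simplified sketch of the mapping: hash the group.id and take it modulo the partition count
     int partition = Math.abs(groupId.hashCode()) % numOffsetsPartitions;
     System.out.println("group '" + groupId + "' commits to __consumer_offsets-" + partition);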

4 Official Case

4.1 Automatic offset commit

     import java.util.Arrays;
     import java.util.Properties;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
     import org.apache.kafka.clients.consumer.KafkaConsumer;

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");        // offsets are committed automatically ...
     props.put("auto.commit.interval.ms", "1000");   // ... once per second, from inside poll()
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }

4.2 Manual offset commit

     import java.util.ArrayList;
     import java.util.Arrays;
     import java.util.List;
     import java.util.Properties;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
     import org.apache.kafka.clients.consumer.KafkaConsumer;

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");        // offsets are only committed by the explicit commitSync() below
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);          // persist the batch first ...
             consumer.commitSync();         // ... then commit the offsets, so nothing is lost on failure
             buffer.clear();
         }
     }
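When each record has to be stored before its offset is recorded, the commit can also be made per partition instead of once per batch. The sketch below follows the pattern in the official KafkaConsumer documentation; the running flag and the process() helper are placeholders for your own shutdown logic and business processing, and it additionally needs java.util.Collections, org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata.

     try {
         while (running) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     process(record);   // placeholder for the business processing of one record
                 }
                 // commit the offset of the next record to read, for this partition only
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
         consumer.close();
     }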

5 Kafka Consumer Settings


  • consumer.poll(1000) — important points (see the small timing sketch after this group)
  • The poll method of the new-style Consumer uses a mechanism similar to select-based I/O, so all related events (rebalance, message fetching, and so on) happen in a single event loop.
  • 1000 is a timeout in milliseconds: consumer.poll(1000) returns a ConsumerRecords<K, V> as soon as enough data has been fetched (as governed by the fetch parameters).
  • If not enough data arrives, the call blocks for up to 1000 ms and then returns with whatever it has, possibly an empty set.
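A small timing sketch (assuming the consumer from section 4 is already subscribed) makes the two cases visible:

     long start = System.currentTimeMillis();
     ConsumerRecords<String, String> records = consumer.poll(1000);
     long elapsed = System.currentTimeMillis() - start;
     // if data was available, elapsed is well under 1000 ms and records is non-empty;
     // otherwise poll() blocks for roughly the full 1000 ms and returns an empty set
     System.out.printf("poll returned %d records after %d ms%n", records.count(), elapsed);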

  • session.timeout.ms <= time for the coordinator to detect a failed consumer
  • The default value is 10s.
  • This parameter bounds how quickly a crashed consumer in the group is detected. If it were set to 10 minutes, the group coordinator might need up to 10 minutes to notice that a consumer has died, which is far too long.

  • max.poll.interval.ms <= maximum time for the processing logic
  • This parameter was added in version 0.10.1.0, so it is not mentioned in many places. It must be set according to the actual business processing time; if a Consumer does not finish processing within this interval, it is kicked out of the Consumer Group.
  • Note: if the business logic takes about 1 minute on average, max.poll.interval.ms should be set slightly above 1 minute, while session.timeout.ms can stay small (for example 10s) so that Consumer crashes are detected quickly (see the sketch below).
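A sketch of the combination recommended above (the numbers are illustrative and assume roughly one minute of processing per batch):

     props.put("session.timeout.ms", "10000");       // crashed consumers are detected within ~10s
     props.put("max.poll.interval.ms", "90000");     // but up to 90s of processing between poll() calls is tolerated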

  • auto.offset.reset
  • This property specifies what the consumer should do when it starts reading a partition for which it has no committed offset, or whose committed offset is invalid (for example, the consumer was down for so long that the record at that offset has already been deleted). The default is latest, meaning it starts reading from the newest records (those produced after the consumer started); the other value is earliest, meaning that when the offset is invalid the consumer reads the partition from the beginning.

  • enable.auto.commit
  • For exactly-once semantics, it is best to commit offsets manually.

  • fetch.max.bytes
  • The maximum amount of data returned by a single fetch request.

  • max.poll.records <= throughput
  • The maximum number of records returned by a single poll() call; it can be increased if the processing logic is lightweight.
  • The records fetched from Kafka in one poll() must be processed before the next poll(), i.e. within max.poll.interval.ms (session.timeout.ms in versions before 0.10.1.0).
  • The default value is 500

  • heartbeat.interval.ms <= how quickly members learn about a rebalance
  • Heartbeats are used for coordination inside the group and their responses must come back promptly, so this interval should not be set too long: once a rebalance starts, the new assignment (or the instruction to rejoin the group) is delivered in the heartbeat responses.

  • connections.max.idle.ms <= idle socket connections
  • Kafka periodically closes idle socket connections; the default is 9 minutes. If you do not care about the resource cost, set this parameter to -1 so that idle connections are never closed.

  • request.timeout.ms
  • This configuration controls the maximum time to wait for the response to a request. If no response arrives within the timeout, the client either resends the request or, once the retries are exhausted, marks it as failed.
  • It must be set larger than session.timeout.ms.

  • fetch.min.bytes
  • The minimum amount of data the server returns to the consumer for a fetch request; if less data is available, the server waits until the minimum is reached. The default value is 1.

  • fetch.max.wait.ms
  • The maximum time the broker waits before answering a fetch request when fetch.min.bytes has not yet been satisfied (example below).
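For example (an illustrative sketch), trading a little latency for larger fetch batches:

     props.put("fetch.min.bytes", "65536");     // wait until at least 64 KB are available ...
     props.put("fetch.max.wait.ms", "500");     // ... but respond after 500 ms at the latest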

  • New in 0.11
  • Delayed rebalance for an empty consumer group, configured mainly in the broker's server.properties file.
  • group.initial.rebalance.delay.ms <= a welcome improvement: it prevents a rebalance from being triggered immediately after the first member joins.
  • For users, the most direct effect of this improvement is the new broker configuration group.initial.rebalance.delay.ms.
  • The default is 3 seconds.
  • When an empty consumer group receives a member's join request, the coordinator delays the rebalance that would otherwise start immediately. In practice, if you expect all the members of your consumer group to join within 10s, you can set this parameter to 10000.

6 Pitfalls encountered in production

org.apache.kafka.clients.consumer.CommitFailedException:
 Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. 
You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. [com.bonc.framework.server.kafka.consumer.ConsumerLoop]
Note that since version 0.10.1.0, session.timeout.ms has been separated from max.poll.interval.ms.
  • In practice, frequent rebalances come with repeated consumption, which is a very serious problem. It is caused by heavy processing logic combined with a too-small max.poll.interval.ms: the poll() loop takes too long and the processing times out. In that case, simply increase max.poll.interval.ms, decrease max.poll.records, and set request.timeout.ms to a value greater than max.poll.interval.ms (see the sketch below).
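A sketch of the tuned settings described above (the concrete numbers are illustrative and must be derived from your real processing time):

     props.put("max.poll.records", "100");           // smaller batches, so each one finishes quickly
     props.put("max.poll.interval.ms", "300000");    // allow up to 5 minutes of processing per batch
     props.put("request.timeout.ms", "310000");      // kept larger than max.poll.interval.ms, as recommended above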

7 Summary

The optimization work will continue; for now the focus is on request.timeout.ms, max.poll.interval.ms, and max.poll.records, in order to avoid consumers being kicked out of the Consumer Group too often because the processing logic takes too long.

Qin Kaixin in Shenzhen