This series of blog posts summarizes and shares cases drawn from real business environments and offers practical guidance for applying Kafka in business settings. Stay tuned. Copyright: this series on Kafka business applications belongs to the author (Qin Kaixin).

  • Kafka Business Environment in Practice – Kafka Production Environment Planning
  • Kafka Business Environment in Practice – Kafka Producer and Consumer Throughput Testing
  • Kafka Business Environment in Practice – Kafka Producer Parameter Settings and Tuning Suggestions
  • Kafka Business Environment in Practice – Operations Handbook of Important Kafka Cluster Management Commands
  • Kafka Cluster Broker Parameter Settings and Tuning Guidelines
  • Kafka Business Environment in Practice – Kafka Producer Synchronous and Asynchronous Sending, and Applying Transactions and Idempotence
  • Kafka Polling Mechanism and Analysis of Consumer Group Rebalance Partition Assignment Strategies
  • Kafka Rebalance and Consumer Rebalance
  • Kafka Business Environment in Practice – In-Depth Analysis of Kafka Replicas and the ISR Synchronization Mechanism
  • Kafka Business Environment in Practice – In-Depth Analysis of the Principles of Kafka Exactly-Once Semantics (EOS)
  • Kafka Message Idempotence and Transaction Support Mechanism
  • Kafka Business Environment in Practice – Kafka Cluster Controller Election and Responsibilities: Design and Architecture in Detail
  • Kafka Business Environment in Practice – Smooth Transition of the Kafka Message Format from V1 to V2
  • Kafka Business Environment in Practice – In-Depth Study of Data Consistency Guarantees via the Kafka High Watermark and Leader Epoch
  • In-Depth Study of Kafka Cluster Log File System Design, Retention Mechanism, and Compaction
  • Kafka Business Environment in Practice – In-Depth Analysis of the Kafka Consumer Group State Machine and Coordinator Management Mechanism
  • Kafka Business Environment in Practice – Trade-offs Among Throughput, Durability, Low Latency, and Availability in Kafka Tuning

1 The consumer's single-threaded design

Note that this article analyzes the principles based on the latest version of Kafka. In the new version, each consumer uses a single thread to manage multiple socket connections, that is, it communicates with multiple brokers at the same time and reads messages from them in parallel. This is a notable innovation in the new version, similar to the I/O multiplexing model in Linux (select/poll) or the Java NIO Selector model.
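To make the I/O-multiplexing comparison concrete, here is a minimal sketch, not Kafka's actual networking code, in which one thread uses a `java.nio.channels.Selector` to read from several channels at once. In-process `Pipe`s stand in for broker sockets; the broker names and the `fetchFromAll` helper are hypothetical, and the sketch assumes distinct broker names.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, SelectionKey, Selector}
import java.nio.charset.StandardCharsets
import scala.collection.mutable

object SingleThreadMultiplexSketch {
  /** One thread reads from several "connections". Each Pipe stands in for the socket
    * to one hypothetical broker; the key's attachment is the broker name. */
  def fetchFromAll(brokers: Seq[String]): Map[String, String] = {
    val selector = Selector.open()
    val pipes = brokers.map { broker =>
      val pipe = Pipe.open()
      pipe.source().configureBlocking(false) // required before registering with a Selector
      pipe.source().register(selector, SelectionKey.OP_READ, broker)
      broker -> pipe
    }
    // Simulate every broker sending one response and then closing its end.
    pipes.foreach { case (broker, pipe) =>
      pipe.sink().write(ByteBuffer.wrap(s"records-from-$broker".getBytes(StandardCharsets.UTF_8)))
      pipe.sink().close()
    }

    val partial = mutable.Map.empty[String, StringBuilder]
    val done = mutable.Map.empty[String, String]
    val buf = ByteBuffer.allocate(256)
    // The single event loop: wait until any channel is readable, then service it.
    while (done.size < brokers.size) {
      selector.select()
      val it = selector.selectedKeys().iterator()
      while (it.hasNext) {
        val key = it.next()
        it.remove()
        val broker = key.attachment().asInstanceOf[String]
        val channel = key.channel().asInstanceOf[Pipe.SourceChannel]
        buf.clear()
        val n = channel.read(buf)
        if (n > 0) {
          buf.flip()
          partial.getOrElseUpdate(broker, new StringBuilder)
            .append(StandardCharsets.UTF_8.decode(buf).toString)
        } else if (n < 0) { // end of stream: this "broker" has finished sending
          key.cancel()
          channel.close()
          done(broker) = partial.get(broker).map(_.toString).getOrElse("")
        }
      }
    }
    selector.close()
    done.toMap
  }
}
```

No read blocks on any single connection: `select()` waits until at least one channel is readable, so one slow broker does not stall the others. That is the property a single-threaded consumer relies on.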

2 Why poll() takes a timeout parameter

poll() returns as soon as either of the following conditions is met:

  • Enough usable data has been fetched.
  • The waiting time exceeds the specified timeout.

The timeout makes the consumer's main thread “wake up” periodically to do other work, for example to run scheduled tasks (such as writing logs or writing to a database). Once poll() returns the messages, the consumer runs its business logic on them.
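A minimal sketch of the two return conditions, assuming an in-memory `LinkedBlockingQueue` stands in for records the consumer has already fetched. `pollBatch` is a hypothetical helper, and `maxRecords` loosely plays the role of the `max.poll.records` setting. Like the real consumer, it returns as soon as some data is available (without waiting to fill the batch), or empty-handed once the timeout expires.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

object PollTimeoutSketch {
  /** Hypothetical poll(timeout): wait up to timeoutMs for the first record,
    * then take whatever else is already buffered, up to maxRecords. */
  def pollBatch[A <: AnyRef](buffer: LinkedBlockingQueue[A], maxRecords: Int, timeoutMs: Long): List[A] = {
    // Condition 2: poll(timeout, unit) returns null if the timeout expires first.
    val first = buffer.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (first == null) Nil
    else {
      // Condition 1: data is available, so drain without further waiting.
      val batch = ListBuffer(first)
      var more = true
      while (more && batch.size < maxRecords) {
        val next = buffer.poll() // non-blocking; null when the buffer is empty
        if (next == null) more = false else batch += next
      }
      batch.toList
    }
  }
}
```

A consumer main loop would call such a helper repeatedly and run its periodic tasks between calls; because each call is bounded by the timeout, those tasks still run when no messages arrive.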

3 Offset delivery semantics

  • At least once: messages may be processed more than once (duplicates).
  • At most once: messages may be lost, but are never processed more than once.
  • Exactly once: every message is processed once and only once.
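The difference between the first two semantics comes down to whether the offset is committed before or after the message is processed. The following toy simulation, which is hypothetical and not Kafka client code, replays three messages with a consumer that crashes once between the two steps and then restarts from the last committed offset.

```scala
object DeliverySemanticsSketch {
  /** Returns how many times each record was processed. The consumer crashes once,
    * between its two steps, while handling the record at offset `crashAt`. */
  def run(records: Vector[String], crashAt: Int, commitFirst: Boolean): Map[String, Int] = {
    val counts = scala.collection.mutable.Map.empty[String, Int]
    def process(o: Int): Unit = counts.update(records(o), counts.getOrElse(records(o), 0) + 1)
    var committed = 0 // last committed offset: where a restart resumes
    var position = 0  // current position: next offset to read
    var hasCrashed = false
    while (position < records.length) {
      val o = position
      // Step 1: commit first (at most once) or process first (at least once).
      if (commitFirst) committed = o + 1 else process(o)
      if (!hasCrashed && o == crashAt) {
        hasCrashed = true
        position = committed // restart from the last committed offset
      } else {
        // Step 2: the other half of the pair.
        if (commitFirst) process(o) else committed = o + 1
        position = o + 1
      }
    }
    counts.toMap
  }
}
```

Committing first loses `b`; processing first replays `b`. Exactly once needs more than step ordering, for example idempotent and transactional writes, which the EOS and transaction articles in this series cover.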

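4 Offset roles (HW and LEO will be analyzed in a separate article)

  • Last committed offset: the offset most recently committed by the group for a partition; consumption resumes from here after a restart or rebalance.
  • Current position: the offset of the next message the consumer will fetch.
  • High watermark (HW): consumers can only read messages below this offset, i.e. messages that have been replicated to all in-sync replicas.
  • Log end offset (LEO): the offset at which the next message will be appended to the log.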
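A small sketch of how the four offsets relate for one partition. The `PartitionOffsets` type is hypothetical, and its ordering check assumes the steady state (a consumer that seeks backwards can legitimately have a position below its committed offset).

```scala
object OffsetRolesSketch {
  /** Hypothetical snapshot of one partition as seen by one consumer group. */
  final case class PartitionOffsets(committed: Long, position: Long, highWatermark: Long, logEndOffset: Long) {
    // Steady-state assumption for this sketch: committed <= position <= HW <= LEO.
    require(committed <= position && position <= highWatermark && highWatermark <= logEndOffset,
      s"unexpected offset ordering: $this")

    /** Messages the consumer can still fetch: only offsets below the HW are visible. */
    def fetchable: Long = highWatermark - position
    /** Messages fetched but not yet committed; re-read if the consumer restarts now. */
    def uncommitted: Long = position - committed
    /** Messages on the leader that are not yet replicated to all in-sync replicas. */
    def notYetVisible: Long = logEndOffset - highWatermark
  }
}
```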

5 Offset management

A consumer's offset commits are ultimately sent to the group coordinator. It is worth restating that the group coordinator manages all consumer instances in the group. The coordinator runs on a broker (one broker acts as the coordinator for a given group), but note that in the new version the coordinator is responsible only for group management.

The specific partition assignment strategy used during a rebalance, however, has been handed over to the consumer client. This decouples group management from partition assignment.

Advantages of this decoupling:

  • If partition assignment were done on the broker side, changing the assignment strategy would likely require restarting the entire Kafka cluster.
  • The partition assignment strategy can be customized on the consumer side.

Each offset commit by a consumer appends a message to the corresponding partition of the internal topic __consumer_offsets. If a consumer commits offsets several times for the same group, topic and partition, only the last committed offset matters, which is why __consumer_offsets is a log-compacted topic.
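A minimal sketch of this storage model, assuming a simplified record key of (group, topic, partition): every commit is appended as a new record, and compaction keeps only the latest value per key. The `offsetsPartitionFor` helper shows which __consumer_offsets partition a group's commits land in; the absolute-hash-modulo formula and the default of 50 partitions (`offsets.topic.num.partitions`) reflect our understanding of Kafka and should be treated as an assumption. The leader of that partition is the broker acting as the group's coordinator.

```scala
object OffsetsTopicSketch {
  /** Hypothetical key of an offset-commit record. */
  final case class OffsetKey(group: String, topic: String, partition: Int)

  /** Models log compaction of the append-only commit log: last write per key wins. */
  def compact(log: Vector[(OffsetKey, Long)]): Map[OffsetKey, Long] =
    log.foldLeft(Map.empty[OffsetKey, Long]) { case (acc, (key, offset)) => acc.updated(key, offset) }

  /** Assumed mapping of a group id to a __consumer_offsets partition. */
  def offsetsPartitionFor(groupId: String, numPartitions: Int = 50): Int = {
    val h = groupId.hashCode
    // Guard Int.MinValue, whose absolute value is still negative.
    (if (h == Int.MinValue) 0 else math.abs(h)) % numPartitions
  }
}
```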

6 Rebalance trigger conditions

  • The group's subscription changes, for example with a regular-expression subscription, when a newly created topic matches the pattern.
  • The number of partitions of a subscribed topic changes, for example when partitions are added with the command-line script.
  • Group membership changes: a new member joins the group or an existing member leaves it.
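The three conditions can be expressed as a comparison between two snapshots of a group. `GroupView` and `reasons` are hypothetical names for illustration; a real coordinator tracks this state differently.

```scala
object RebalanceTriggerSketch {
  /** Hypothetical snapshot of a group: members, subscribed topics, partitions per topic. */
  final case class GroupView(members: Set[String], subscribedTopics: Set[String], partitionCounts: Map[String, Int])

  /** Lists which of the three trigger conditions fire between two snapshots. */
  def reasons(before: GroupView, after: GroupView): List[String] = {
    val r = List.newBuilder[String]
    if (before.members != after.members) r += "membership changed"
    if (before.subscribedTopics != after.subscribedTopics) r += "subscription changed"
    // Only compare partition counts for topics subscribed in both snapshots.
    val common = before.subscribedTopics intersect after.subscribedTopics
    if (common.exists(t => before.partitionCounts.get(t) != after.partitionCounts.get(t)))
      r += "partition count changed"
    r.result()
  }
}
```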

7 Rebalance allocation policies

7.1 Range Partition Allocation Policy

For example, suppose a topic has ten partitions (0, 1, 2, ..., 9) and the same group has three consumers whose consumer IDs are A, B and C:

  • Consumer A is assigned partitions [0, 4), namely the first four partitions 0, 1, 2 and 3.
  • Consumer B is assigned partitions 4, 5 and 6.
  • Consumer C is assigned the last three partitions, 7, 8 and 9.

```scala
// Excerpt from the old Scala consumer (kafka.consumer.RangeAssignor);
// PartitionAssignor, AssignmentContext and ConsumerThreadId come from the same package.
import scala.collection.mutable
import kafka.common.TopicAndPartition
import kafka.utils.{Logging, Pool}

class RangeAssignor() extends PartitionAssignor with Logging {

  def assign(ctx: AssignmentContext) = {
    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
    val partitionAssignment =
      new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
    for (topic <- ctx.myTopicThreadIds.keySet) {
      val curConsumers = ctx.consumersForTopic(topic)
      val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

      // Every consumer gets at least this many partitions...
      val nPartsPerConsumer = curPartitions.size / curConsumers.size
      // ...and this many consumers (the first ones) get one extra.
      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

      info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
        " for topic " + topic + " with consumers: " + curConsumers)

      for (consumerThreadId <- curConsumers) {
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
        assert(myConsumerPosition >= 0)
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

        /**
         * Range-partition the sorted partitions to consumers for better locality.
         * The first few consumers pick up an extra partition, if any.
         */
        if (nParts <= 0)
          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
        else {
          for (i <- startPart until startPart + nParts) {
            val partition = curPartitions(i)
            info(consumerThreadId + " attempting to claim partition " + partition)
            // record the partition ownership decision
            val assignmentForConsumer = partitionAssignment.getAndMaybePut(consumerThreadId.consumer)
            assignmentForConsumer += (TopicAndPartition(topic, partition) -> consumerThreadId)
          }
        }
      }
    }
    // ... (rest of assign() omitted in this excerpt)
    partitionAssignment
  }
}
```

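Applied to the example above, the arithmetic works out as follows:

    nPartsPerConsumer       = 10 / 3 = 3
    nConsumersWithExtraPart = 10 % 3 = 1

    A: myConsumerPosition = curConsumers.indexOf(A) = 0
       startPart = 3 * 0 + 0.min(1) = 0
       nParts    = 3 + (if (0 + 1 > 1) 0 else 1) = 3 + 1 = 4   -> partitions 0, 1, 2, 3
    B: myConsumerPosition = 1
       startPart = 3 * 1 + 1.min(1) = 4
       nParts    = 3 + (if (1 + 1 > 1) 0 else 1) = 3           -> partitions 4, 5, 6
    C: myConsumerPosition = 2
       startPart = 3 * 2 + 2.min(1) = 7
       nParts    = 3 + (if (2 + 1 > 1) 0 else 1) = 3           -> partitions 7, 8, 9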
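The same arithmetic can be pulled out into a small standalone function. This is a sketch assuming a single topic and a non-empty list of consumers that is already sorted; `RangeAssignSketch` is a hypothetical name, not part of Kafka.

```scala
object RangeAssignSketch {
  /** Range-assigns the partitions of one topic, using the startPart / nParts
    * arithmetic from the RangeAssignor excerpt. */
  def assign(consumers: Seq[String], partitions: Seq[Int]): Map[String, Seq[Int]] = {
    val perConsumer = partitions.size / consumers.size
    val withExtra = partitions.size % consumers.size
    consumers.zipWithIndex.map { case (consumer, pos) =>
      val start = perConsumer * pos + math.min(pos, withExtra)
      val n = perConsumer + (if (pos + 1 > withExtra) 0 else 1)
      consumer -> partitions.slice(start, start + n) // empty when n == 0
    }.toMap
  }
}
```

Because the range strategy works topic by topic, the first consumers receive the extra partition for every topic, so with many subscribed topics the imbalance accumulates. With more consumers than partitions, the trailing consumers get nothing, which is the case the excerpt's `warn` reports.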

7.2 RoundRobin Partition Allocation Policy

If all consumers in the same consumer group have the same subscriptions, the RoundRobinAssignor policy distributes partitions evenly. For example, suppose the consumer group has two consumers, C0 and C1, both subscribed to topic0 and topic1, and each topic has three partitions. Suppose that after sorting, the order is topic0_0, topic0_1, topic0_2, topic1_0, topic1_1, topic1_2. The final distribution is:

Consumer C0: topic0_0, topic0_2, topic1_1

Consumer C1: topic0_1, topic1_0, topic1_2
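A minimal sketch of the round-robin deal, assuming the sorted partition order above is given as input (the sort step is left out here). `RoundRobinSketch` is a hypothetical name; consumers that receive no partitions are simply absent from the result.

```scala
object RoundRobinSketch {
  /** Deals an already-ordered partition list out to the consumers in turn. */
  def assign(consumers: Seq[String], orderedPartitions: Seq[String]): Map[String, Seq[String]] =
    orderedPartitions.zipWithIndex
      .groupBy { case (_, i) => consumers(i % consumers.size) } // i-th partition -> (i mod n)-th consumer
      .map { case (consumer, ps) => consumer -> ps.sortBy(_._2).map(_._1) }
}
```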

To use the RoundRobin policy, the following two conditions must be met:

  • num.streams (the number of consumer streams, i.e. threads, per consumer) must be the same for all consumers in the consumer group.
  • Every consumer must subscribe to the same topics.

Now suppose num.streams = 2 for each of the two consumers above. The RoundRobin policy works as follows: the partitions of all topics are collected into a list of TopicAndPartition objects, the list is sorted by hashCode, and the partitions are finally assigned to the consumer threads in round-robin order.

```scala
// Excerpt from the old Scala consumer's RoundRobinAssignor: collect and order all partitions
val allTopicPartitions = ctx.partitionsForTopic.flatMap { case (topic, partitions) =>
  info("Consumer %s rebalancing the following partitions for topic %s: %s"
    .format(ctx.consumerId, topic, partitions))
  partitions.map(partition => {
    TopicAndPartition(topic, partition)
  })
}.toSeq.sortWith((topicPartition1, topicPartition2) => {
  /*
   * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
   * up on one consumer (if it has a high enough stream count).
   */
  topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
})
```
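The excerpt's two ingredients, expanding each consumer into num.streams threads and ordering partitions by hashCode, can be combined in a small standalone sketch. The thread-id format (`C0-0`) and the `[topic,partition]` string form are assumptions for illustration. Because `String.hashCode` is specified by Java, every consumer computes the same order independently, which lets each one arrive at the same assignment.

```scala
object RoundRobinThreadsSketch {
  def assign(consumers: Seq[String], numStreams: Int,
             partitionsPerTopic: Map[String, Int]): Map[String, Seq[String]] = {
    // Every consumer contributes numStreams thread ids, e.g. "C0-0", "C0-1".
    val threads = for (c <- consumers.sorted; s <- 0 until numStreams) yield s"$c-$s"
    // Flatten all topics into one list and order it by the hashCode of its string form.
    val ordered = partitionsPerTopic.toSeq
      .flatMap { case (topic, n) => (0 until n).map(p => s"[$topic,$p]") }
      .sortBy(_.hashCode)
    // Deal the ordered partitions out to the threads in turn.
    val dealt = ordered.zipWithIndex.map { case (tp, i) => threads(i % threads.size) -> tp }
    threads.map(t => t -> dealt.filter(_._1 == t).map(_._2)).toMap
  }
}
```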

7.3 StickyAssignor Partition Allocation Policy (Excerpt)

StickyAssignor has two goals:

  • The partition distribution should be as even as possible.
  • The assignment should stay as close as possible to the previous assignment.

When the two goals conflict, the first takes precedence over the second. Given these two goals, StickyAssignor is much more complex to implement than RangeAssignor or RoundRobinAssignor.

Suppose there are three consumers in the consumer group: C0, C1, and C2, each of which subscribes to four topics: T0, T1, T2, and T3, and each topic has two partitions, that is, the entire consumer group subscribes to eight partitions: T0P0, T0P1, T1P0, T1P1, T2P0, T2P1, T3P0, and T3P1. The resulting assignment is:

Consumer C0: T0P0, T1P1, T3P0
Consumer C1: T0P1, T2P0, T3P1
Consumer C2: T1P0, T2P1

Now suppose consumer C1 leaves the consumer group. The group performs a rebalance and the partitions are reassigned. With RoundRobinAssignor, the result is:

Consumer C0: T0P0, T1P0, T2P0, T3P0
Consumer C2: T0P1, T1P1, T2P1, T3P1

RoundRobinAssignor simply redoes the round-robin assignment over consumers C0 and C2. With StickyAssignor, the result is:

Consumer C0: T0P0, T1P1, T3P0, T2P0
Consumer C2: T1P0, T2P1, T0P1, T3P1

The sticky result keeps every partition that C0 and C2 held before, spreads the former “burden” of C1 across the remaining consumers C0 and C2, and leaves C0 and C2 balanced.

When partitions are reassigned, the same partition may move from its previous consumer to a different one, and any work the previous consumer had done halfway may have to be repeated by the new consumer, which wastes system resources. StickyAssignor, as the word “sticky” suggests, makes the assignment sticky: it tries to keep consecutive assignments as similar as possible, which reduces this waste and other anomalies.
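A deliberately simplified sketch of the sticky idea, not Kafka's actual StickyAssignor algorithm: survivors keep their previous partitions, and each orphaned partition goes to the currently least-loaded survivor, with ties going to the earlier consumer. The real assignor also moves partitions between survivors when balance requires it. `StickySketch` is a hypothetical name, and the sketch assumes at least one consumer survives. For the example above, it happens to reproduce the StickyAssignor result.

```scala
object StickySketch {
  /** Keeps survivors' partitions and hands each orphaned partition to the least-loaded survivor. */
  def reassign(previous: Seq[(String, Seq[String])], departed: Set[String]): Seq[(String, Seq[String])] = {
    val survivors = previous.filterNot { case (c, _) => departed(c) }
    val orphans = previous.collect { case (c, ps) if departed(c) => ps }.flatten.sorted
    orphans.foldLeft(survivors) { (acc, p) =>
      // minBy returns the first minimum, so ties go to the earlier consumer.
      val target = acc.indices.minBy(i => acc(i)._2.size)
      acc.updated(target, acc(target)._1 -> (acc(target)._2 :+ p))
    }
  }
}
```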

7 Rebalance generation

Its main purpose is to prevent invalid offset commits. Suppose a former group member is slow to commit its offsets for some reason and has already been removed from the group. After the partitions are reassigned to the new members, a late commit of the old offsets from that member would overwrite the newer progress. With the rebalance generation, the coordinator rejects such requests from an older generation.
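A toy coordinator showing the fencing check. It is loosely modeled on the coordinator rejecting stale commits with an `ILLEGAL_GENERATION` error; the class and method names are hypothetical.

```scala
object GenerationFencingSketch {
  sealed trait CommitResult
  case object Accepted extends CommitResult
  case object IllegalGeneration extends CommitResult

  /** Each completed rebalance bumps the generation; commits from older generations are rejected. */
  final class ToyCoordinator {
    private var generation = 0
    private val offsets = scala.collection.mutable.Map.empty[Int, Long]

    def completeRebalance(): Int = { generation += 1; generation }

    def commit(memberGeneration: Int, partition: Int, offset: Long): CommitResult =
      if (memberGeneration != generation) IllegalGeneration // stale member: fenced off
      else { offsets(partition) = offset; Accepted }

    def committed(partition: Int): Option[Long] = offsets.get(partition)
  }
}
```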

8 Rebalance cleanup

Before each rebalance, the consumer checks whether automatic offset commit is enabled; if it is, the consumer commits the current offsets on the user's behalf. If it is not, the user is responsible for committing, typically in the rebalance listener callback that the consumer invokes before the rebalance.
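A sketch of this pre-rebalance step. In the real client the hook is `ConsumerRebalanceListener.onPartitionsRevoked` and auto-commit is controlled by `enable.auto.commit`; the `ToyConsumer` and `RevocationListener` names below are hypothetical. The sketch invokes the listener in both cases, as the real client does.

```scala
object RebalanceCleanupSketch {
  /** Hypothetical stand-in for the user's rebalance listener. */
  trait RevocationListener {
    def onPartitionsRevoked(positions: Map[Int, Long]): Unit
  }

  final class ToyConsumer(autoCommit: Boolean, listener: RevocationListener) {
    val autoCommits = scala.collection.mutable.ListBuffer.empty[Map[Int, Long]]
    var positions: Map[Int, Long] = Map.empty // partition -> current position

    /** Runs before a rebalance: auto-commit if enabled, then notify the listener. */
    def prepareForRebalance(): Unit = {
      if (autoCommit) autoCommits += positions
      listener.onPartitionsRevoked(positions)
    }
  }
}
```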

9 Summary

Writing this article involved reading a great deal of material and took considerable time. The author has always insisted on original content, while of course also drawing on classic cases.

Qin Kaixin in Shenzhen 2018 1:30