


Hi, I’m Shi Zhenzhen ~~~~

In today's article I'd like to share a doubt that has puzzled me for several days while reading the Kafka source code. I'll lay it out for everyone to think about and discuss, and you're welcome to weigh in on whether it really is a bug!

This “bug” occurs during partition replica assignment. To help you understand it, I will first walk through Kafka's partition replica assignment rules in detail.

There are three places where Kafka needs to compute a partition replica assignment:

  1. Topic creation
  2. Partition expansion
  3. Partition replica reassignment

Replica assignment rules

Principles of replica assignment:

  1. Distribute replicas evenly across all brokers.
  2. The replicas of a partition should be assigned to different brokers.
  3. If all brokers have rack information, the replicas of a partition should be assigned to different racks.

For simplicity, we will not analyze the rack-aware case here.

No matter which of the three scenarios it is, the same assignment method is ultimately called; to analyze the assignment I added some logs to it (a sketch of the logic follows the list below). From this assignment method we can see that several variables affect the final assignment:

  1. The order of the Broker List
  2. The randomly chosen starting index into the Broker List, startIndex
  3. The offset between the first replica and the second replica, nextReplicaShift
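
For reference, here is a condensed sketch of that assignment logic, paraphrased from the rack-unaware path in AdminUtils (the structure follows the Kafka source, but treat the signature and names as my approximation rather than the verbatim code):

```scala
import scala.collection.mutable
import scala.util.Random

// Condensed sketch of AdminUtils' rack-unaware replica assignment (approximation, not verbatim).
object ReplicaAssignmentSketch {
  private val rand = new Random()

  def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                         replicationFactor: Int,
                                         brokerList: Seq[Int],
                                         fixedStartIndex: Int = -1,
                                         startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
    val ret = mutable.Map[Int, Seq[Int]]()
    val brokerArray = brokerList.toArray
    // startIndex: where the first replica of the first computed partition lands in brokerArray
    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    var currentPartitionId = math.max(0, startPartitionId)
    // nextReplicaShift: the gap between the first and second replica of a partition
    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
    for (_ <- 0 until nPartitions) {
      if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
        nextReplicaShift += 1                       // bump the shift after every full round over the brokers
      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
      val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
      ret.put(currentPartitionId, replicaBuffer.toList)
      currentPartitionId += 1
    }
    ret.toMap
  }

  // Places the (j + 2)-th replica: the effective shift is 1 + (nextReplicaShift + j) % (nBrokers - 1)
  def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }
}
```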

Let's analyze the overall assignment rules through the topic-creation scenario.

Partition assignment when creating a Topic

Do you know what Kafka does when creating a Topic? (With video)

Let's first look at single-replica assignment.

Start 5 brokers and create a Topic with 10 partitions and 1 replica.

Single-replica assignment

The distribution can be represented in the following figure

The initial random startIndex is 2, which in the list {0, 1, 4, 2, 3} corresponds to BrokerId = 4; the first replica (the Leader) of the first partition is assigned there, and subsequent partitions are assigned by traversing the BrokerList in order, so the Leader replica of each partition is spread evenly across the brokers. Because there is only a single replica, the nextReplicaShift parameter plays no role here.
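
If you want to play with it, the single-replica case above can be reproduced with the sketch from earlier (the broker order and startIndex = 2 are taken from this example; in a real cluster startIndex is random):

```scala
// Reproduce the single-replica example: 5 brokers in the (unsorted) order {0, 1, 4, 2, 3},
// 10 partitions, 1 replica, startIndex fixed to 2 so the output matches the figure.
// (Run in a Scala REPL/worksheet with ReplicaAssignmentSketch from the sketch above in scope.)
import ReplicaAssignmentSketch.assignReplicasToBrokersRackUnaware

val singleReplica = assignReplicasToBrokersRackUnaware(
  nPartitions       = 10,
  replicationFactor = 1,
  brokerList        = Seq(0, 1, 4, 2, 3),
  fixedStartIndex   = 2,    // index 2 in the list -> broker 4 gets the first Leader
  startPartitionId  = 0)

singleReplica.toSeq.sortBy(_._1).foreach { case (p, ar) => println(s"P-$p -> ${ar.mkString(",")}") }
// P-0 -> 4, P-1 -> 2, P-2 -> 3, P-3 -> 0, P-4 -> 1, P-5 -> 4, ... round-robin over the list,
// so each broker ends up with exactly two Leaders.
```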

Multi-replica assignment

Start 5 brokers and create a Topic with 10 partitions and 3 replicas (same setup as above, only with the replica count changed to 3; the new Topic is Test_Topic).

    Initial random startIndex: 2  currentPartitionId: 0  initial random nextReplicaShift: 4  brokerArray: ArrayBuffer(0, 1, 4, 2, 3)
    (p-0, ArrayBuffer(4, 2, 3))
    (p-1, ArrayBuffer(2, 3, 0))
    (p-2, ArrayBuffer(3, 0, 1))
    (p-3, ArrayBuffer(0, 1, 4))
    (p-4, ArrayBuffer(1, 4, 2))
    nextReplicaShift: 5
    (p-5, ArrayBuffer(4, 3, 0))
    (p-6, ArrayBuffer(2, 0, 1))
    (p-7, ArrayBuffer(3, 1, 4))
    (p-8, ArrayBuffer(0, 4, 2))
    (p-9, ArrayBuffer(1, 2, 3))

The resulting assignment will eventually be written to zk; these are the AR values, and the first replica in each list is the Leader.

  1. Broker List = {0, 1, 4, 2, 3}
  2. startIndex = 2
  3. nextReplicaShift = 4 (equivalent to nextReplicaShift = 0 here, because the effective shift is nextReplicaShift % (BrokerSize - 1))

The parameters mean basically the same as in the single-replica case. nextReplicaShift = 4 means the initial gap between the first and second replica is 4; since there are five brokers in total, the final effect is the same as a gap of 0, as you can see below. Now that we understand what this gap means, let's look at the overall layout of the assignment.
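
To make the “gap” concrete, here is the arithmetic for partition 0 of the log above, done with the replicaIndex helper from the sketch (startIndex = 2 and nextReplicaShift = 4 are the values from the log):

```scala
import ReplicaAssignmentSketch.replicaIndex

val brokers = Array(0, 1, 4, 2, 3)
val firstReplicaIndex = (0 + 2) % brokers.length                             // partition 0, startIndex 2 -> index 2 -> broker 4
val second = brokers(replicaIndex(firstReplicaIndex, 4, 0, brokers.length))  // shift = 1 + (4 + 0) % 4 = 1 -> broker 2
val third  = brokers(replicaIndex(firstReplicaIndex, 4, 1, brokers.length))  // shift = 1 + (4 + 1) % 4 = 2 -> broker 3
println(Seq(brokers(firstReplicaIndex), second, third).mkString(","))        // 4,2,3 -- matches (p-0, ArrayBuffer(4, 2, 3))
// The effective shift is (nextReplicaShift + j) % (brokers.length - 1), so with 5 brokers
// a nextReplicaShift of 4 behaves exactly like a nextReplicaShift of 0.
```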

From this we can see:

A random startIndex keeps the Leaders from piling up as much as possible. If the assignment always started from index 0, the first partition of every newly created Topic would land on the same broker; for Topics with only a few partitions, all the partitions would pile up on the front brokers and the later brokers would get nothing.

nextReplicaShift: makes the replica distribution of a single Topic as scattered (hashed) as possible.

Assignment during partition expansion

Partition expansion also calls the method above, and the assignment rules are the same; only the inputs differ a little.

Where it differs, I'll post the key Scala code.

In the end it also calls the AdminUtils.assignReplicasToBrokers method, but the input parameters are a little different (a sketch putting them together follows the list):

  1. Broker List: allBrokers; here allBrokers is the list of broker nodes fetched from zk, e.g. {0, 1, 2, 3, 4, 5}

  2. startIndex: here it is not a random value; it is obtained from existingAssignmentPartition0.head, i.e. the index in the BrokerList of the first replica of partition 0 of the current Topic.

  3. nextReplicaShift: here it takes the same value as startIndex; if the startIndex input parameter is specified, nextReplicaShift is simply set to it, as the code in the figure below shows.

  4. startPartitionId: here it is the number of partitions that already exist; when we created the topic this value was 0.
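
Putting these four inputs together, the expansion path looks roughly like this. Note this is my paraphrase of the addPartitions logic with illustrative values (partition 0's AR of (4, 2, 3) is borrowed from the Test_Topic log above), not the verbatim Kafka code:

```scala
// Expansion of a 10-partition, 3-replica topic to 13 partitions.
val allBrokers                   = Seq(0, 1, 2, 3, 4)        // broker list read from zk -- sorted this time
val existingAssignment           = Map(0 -> Seq(4, 2, 3))    // AR read from zk; only partition 0 shown
val existingAssignmentPartition0 = existingAssignment(0)
val existingPartitionCount       = 10

val newPartitions = ReplicaAssignmentSketch.assignReplicasToBrokersRackUnaware(
  nPartitions       = 13 - existingPartitionCount,           // only the 3 newly added partitions
  replicationFactor = existingAssignmentPartition0.size,
  brokerList        = allBrokers,
  fixedStartIndex   = allBrokers.indexOf(existingAssignmentPartition0.head), // sets startIndex (and nextReplicaShift)
  startPartitionId  = existingPartitionCount)                // new partition ids start at 10
```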

Now create t2 (10 partitions, 3 replicas) and then expand it to 13 partitions, as follows.

This is the situation after the partition expansion. Because the existing 10 partitions happened to make exactly two full rounds of the round-robin over the 5 brokers, you may not see the problem here, so let's look at another case.

Create a new Topic t5 with 3 partitions and 1 replica, as shown below. Then expand the number of partitions to 5; the new partitions are assigned as shown below. Distribution chart:

As you can see, after the expansion the distribution is no longer even: some brokers end up holding more partitions than others.

Why is this happening?

Let’s first analyze what the person who wrote this code is trying to do.

The left side of the figure shows the final assignment after expansion, and the right side shows how it is calculated during the expansion. From the analysis above we can conclude:

  1. Partition expansion does not change the existing assignment; it only determines the assignment of the newly added partitions.
  2. startIndex is the index, in the sorted BrokerList, of the first replica of the first partition; the assignment then proceeds by the usual rule, with startPartitionId truncating the existing partitions so that only the newly expanded part is computed.

Judging from this code, isn't the intent to continue the previous assignment? Yet it sorts the Broker List. The nextReplicaShift variable does not affect the Leader balance of the partitions; it is only there to scatter the follower replicas as much as possible.

As long as the third variable, the order of the Broker List, stays the same as at creation time (excluding manual modifications), the expansion will simply continue the original assignment and the distribution will stay even. So is the Broker List actually the same?

Answer: Different!!!!!

At creation time the list was the unsorted {0, 1, 4, 2, 3}; at expansion time it is the sorted {0, 1, 2, 3, 4}.
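
A small hypothetical worked example (the concrete numbers here are mine, not the figures from the post) shows how this mismatch skews the distribution:

```scala
import ReplicaAssignmentSketch.assignReplicasToBrokersRackUnaware

val creationOrder = Seq(0, 1, 4, 2, 3)   // unsorted order seen at topic creation
val sortedOrder   = Seq(0, 1, 2, 3, 4)   // sorted order used at partition expansion

// Create a t5-like topic: 3 partitions, 1 replica; suppose the random startIndex happened to be 0.
val created = assignReplicasToBrokersRackUnaware(3, 1, creationOrder, fixedStartIndex = 0, startPartitionId = 0)
// => P0 -> 0, P1 -> 1, P2 -> 4

// Expand to 5 partitions: startIndex = sortedOrder.indexOf(P0's first replica) = 0, startPartitionId = 3.
val added = assignReplicasToBrokersRackUnaware(2, 1, sortedOrder, fixedStartIndex = 0, startPartitionId = 3)
// => P3 -> 3, P4 -> 4

// Combined Leaders sit on brokers 0, 1, 4, 3, 4: broker 4 holds two partitions, broker 2 holds none.
// Had the expansion reused the creation order {0, 1, 4, 2, 3}, we would get P3 -> 2 and P4 -> 3: perfectly even.
```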

Why is that? Why is that? Why is that?

Either sort it in both places, or keep using the same list as last time, right?

At this point we can say for sure that partition expansion may cause partition imbalance. The effect is small and you and I may never notice it, but if partitions are expanded in batches across the whole cluster, won't the scope of the problem grow with it?

At this point we still can't be sure it is a bug; there is just an element of doubt.

However, if the broker list had also been sorted when the Topic was created, this partition unevenness would not occur.

Let's move on to how partition replicas are reassigned.

Partition replica reassignment

For a source-code analysis of partition replica reassignment, see: 30,000 words to teach you to thoroughly understand the principle of data migration (with supporting teaching video)

I won't go through it all again here; I'll just give the result.

Let’s reassign topic = t5 to see what kafka recommends.

BrokerList = {0, 1, 2, 3, 4}; no matter how many times you run the generate step, the BrokerList it uses is {0, 1, 2, 3, 4}; of course, startIndex and nextReplicaShift are still both random.
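
In other words, what the generate step effectively does can be sketched like this (my paraphrase: the whole topic is recomputed from scratch over the sorted broker list, with startIndex and nextReplicaShift left random):

```scala
// Proposal generated for t5 (5 partitions, 1 replica) during reassignment.
val proposal = ReplicaAssignmentSketch.assignReplicasToBrokersRackUnaware(
  nPartitions       = 5,
  replicationFactor = 1,
  brokerList        = Seq(0, 1, 2, 3, 4))  // sorted; fixedStartIndex left at -1, so startIndex/nextReplicaShift are random
// Whatever the random startIndex is, a full round-robin over a 5-broker list spreads the
// 5 Leaders evenly -- which is why the result after reassignment is balanced again.
```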

At least after the reassignment, the partitions are balanced.

And looking at the source code, the broker list is deliberately sorted here.

So why is it only when creating a Topic that the list is not sorted?

Well, let me think about it; maybe it is intentional, with some other considerations in mind. So where does the Broker List used when creating a Topic come from?

Here is another figure.

As the last figure shows, the Broker List used when creating the Topic comes from the broker nodes the Controller reads from zk during initialization:

  1. The broker ids are sorted first!!
  2. Then, using each BrokerID, the specific information for each Broker is fetched from ZK.
  3. The result is then converted with toMap and stored in a Map, which is why the order ends up not being preserved (see the minimal illustration after this list).
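
Here is a minimal illustration of that last step (my own sketch, not the Controller code itself):

```scala
// The ids are sorted first, but dumping them into a plain Map throws that ordering away.
val sortedBrokerIds = Seq(0, 1, 2, 3, 4)
val liveBrokers     = sortedBrokerIds.map(id => id -> s"broker-$id").toMap // immutable Map: no ordering guarantee
println(liveBrokers.keys.mkString(","))
// Not guaranteed to print in sorted order -- in the environment observed in this post, the broker
// list that eventually reached the assignment method came out as 0, 1, 4, 2, 3.
```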

Doesn't the sorting here feel a bit like taking your pants off to fart 💨, completely superfluous?

Conclusion

So is it a bug? I think so, for several reasons:

  1. As things stand, partition expansion may cause an uneven partition distribution.
  2. “Topic creation” does not sort the broker list, while “partition expansion” and “reassignment” both do.
  3. Topic creation does not sort, yet the calculation logic of partition expansion assumes the original assignment was made over a sorted list.
  4. If the list had been sorted at creation time, the unevenness caused by “partition expansion” would not occur.
  5. When the Topic is created, the broker list is sorted first but then dumped into a Map; if the order was never going to be kept, why sort in the first place? The sorting there is completely unnecessary.

The above is my analysis process and my point of view. My level is limited, so feel free to put forward different views; let's discuss in the comments!