SPARK-27281 / KAFKA-7703: reproducing Kafka 2.2.0's multithreaded offset error and its official fix (KafkaConsumer poll analysis, part 2: getting partition offsets)

Preface

A previous article on the pitfall where Spark Streaming reading from Kafka threw an OffsetOutOfRangeException left one mystery unsolved.

The Spark log showed the following error:

java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative

A possible cause of this problem was found in Kafka issue KAFKA-7703. This post documents the reproduction and analysis of that issue.

Reproduction

First, we need to build Kafka 2.2.0 from source, which I won't expand on here; there are plenty of guides online. We open the Kafka 2.2.0 source in IDEA and add logging and thread-sleep code in a few key places. For details, see the source code.

Start ZooKeeper locally and run our test method, repo:

@Test
def repo(): Unit = {
  val producer = createProducer()
  for (_ <- 0 until 10) {
    val record =
      new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes)
    producer.send(record)
  }
  for (_ <- 0 until 10) {
    val record =
      new ProducerRecord(tp2.topic, tp2.partition, null, "key".getBytes, "value".getBytes)
    producer.send(record)
  }
  producer.flush()
  producer.close()
  // The offset of each partition should now be 10
  val consumer = createConsumer()
  consumer.subscribe(List(tp.topic()).asJava)
  consumer.poll(0)
  consumer.seekToEnd(List(tp, tp2).asJava)
  val offset1 = consumer.position(tp)
  val offset2 = consumer.position(tp2)
  println(tp + ":" + offset1)
  println(tp2 + ":" + offset2)
  assert(offset1 == 10)
  // This would fail because the "earliest" reset response triggered by `poll(0)` set it to 0.
  assert(offset2 == 10)
  consumer.close()
}

After seekToEnd(), position() should return 10 for both partitions. Instead, tp2 (topic-1) returns 0:

ListOffsetResult: {topic-0=offset: 0}
ListOffsetResult: {topic-1=offset: 0}
start to reset
ListOffsetResult: {topic-0=offset: 10}
start sleep 5000 ms
ListOffsetResult: {topic-1=offset: 10}
topic-0: 10
topic-1: 0

Problem analysis

Adding log output

The log above is a bit sparse, so we add more output at key places in the source code to make the flow easier to observe. Fetcher.java:

private void resetOffsetIfNeeded(TopicPartition partition, Long requestedResetTimestamp, OffsetData offsetData) {
    // we might lose the assignment while fetching the offset, or the user might seek to a different offset,
    // so verify it is still assigned and still in need of the requested reset
    if (!subscriptions.isAssigned(partition)) {
        log.debug("Skipping reset of partition {} since it is no longer assigned", partition);
    } else if (!subscriptions.isOffsetResetNeeded(partition)) {
        log.debug("Skipping reset of partition {} since reset is no longer needed", partition);
    } else if (!requestedResetTimestamp.equals(offsetResetStrategyTimestamp(partition))) {
        log.debug("Skipping reset of partition {} since an alternative reset has been requested", partition);
    } else {
        System.out.println("Moving " + partition + " to: " + offsetData);
        if (partition.partition() == 1) {
            try {
                System.out.println("resetOffsetIfNeeded ===> " + Thread.currentThread().getName());
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (offsetData.offset > 0) {
                // Make sure `position` returns before we change the offset to 10.
                //System.out.println("Make sure `position` returns before we change the offset to 10.");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        log.info("Resetting offset for partition {} to offset {}.", partition, offsetData.offset);
        System.out.println("Resetting offset for " + partition + " to: " + offsetData);
        offsetData.leaderEpoch.ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(partition, epoch));
        subscriptions.seek(partition, offsetData.offset);
        System.out.println("Moved " + partition + " to: " + offsetData);
        System.out.println("after seek:: subscriptions.isOffsetResetNeeded(" + partition + ") is " + subscriptions.isOffsetResetNeeded(partition));
        System.out.println("after seek ===> " + Thread.currentThread().getName());
    }
}

private void resetOffsetsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
    System.out.println("resetOffsetsAsync===>" + Thread.currentThread().getName());
    System.out.println("resetOffsetsAsync: " + partitionResetTimestamps);
    // Add the topics to the metadata to do a single metadata fetch.
    for (TopicPartition tp : partitionResetTimestamps.keySet())
        metadata.add(tp.topic());

    Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> timestampsToSearchByNode =
            groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
    for (Map.Entry<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>> entry : timestampsToSearchByNode.entrySet()) {
        Node node = entry.getKey();
        final Map<TopicPartition, ListOffsetRequest.PartitionData> resetTimestamps = entry.getValue();
        subscriptions.setResetPending(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs);

        RequestFuture<ListOffsetResult> future = sendListOffsetRequest(node, resetTimestamps, false);
        future.addListener(new RequestFutureListener<ListOffsetResult>() {
            @Override
            public void onSuccess(ListOffsetResult result) {
                System.out.println("ListOffsetResult: " + result.fetchedOffsets);
                System.out.println("onSuccess===>" + Thread.currentThread().getName());
                if (!result.partitionsToRetry.isEmpty()) {
                    subscriptions.resetFailed(result.partitionsToRetry, time.milliseconds() + retryBackoffMs);
                    metadata.requestUpdate();
                }
                for (Map.Entry<TopicPartition, OffsetData> fetchedOffset : result.fetchedOffsets.entrySet()) {
                    TopicPartition partition = fetchedOffset.getKey();
                    OffsetData offsetData = fetchedOffset.getValue();
                    ListOffsetRequest.PartitionData requestedReset = resetTimestamps.get(partition);
                    System.out.println("before resetOffsetIfNeeded:: subscriptions.isOffsetResetNeeded(" + partition + ") is " + subscriptions.isOffsetResetNeeded(partition));
                    resetOffsetIfNeeded(partition, requestedReset.timestamp, offsetData);
                }
            }
        });
    }
}

KafkaConsumer.java

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        ...
        if (includeMetadataInTimeout) {
            System.out.println("poll1");
            if (!updateAssignmentMetadataIfNeeded(timer)) {
                return ConsumerRecords.empty();
            }
        } else {
            System.out.println("poll2");
            while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                log.warn("Still waiting for metadata");
            }
        }
        ...
}

@Override
public void seekToEnd(Collection<TopicPartition> partitions) {

    try {
        Thread.sleep(2000);
        System.out.println("start to reset");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    if (partitions == null)
        throw new IllegalArgumentException("Partitions collection cannot be null");

    acquireAndEnsureOpen();
    try {
        Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
        for (TopicPartition tp : parts) {
            log.debug("Seeking to end of partition {}", tp);
            System.out.println("Seeking to end of partition " + tp);
            subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);
            System.out.println("after seekToEnd:: subscriptions.isOffsetResetNeeded(" + tp + ") is " + subscriptions.isOffsetResetNeeded(tp));
        }
    } finally {
        release();
    }
}

@Override
public long position(TopicPartition partition, final Duration timeout) {
    acquireAndEnsureOpen();
    try {
        if (!this.subscriptions.isAssigned(partition))
            throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");

        System.out.println("before position sleep, and partition.partition() is " + partition.partition());
        if (partition.partition() == 1) {
            // Wait to make sure the background offset request for earliest has finished and `offset` is not null.
            try {
                System.out.println("start sleep 5000 ms");
                System.out.println("position===>" + Thread.currentThread().getName());
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("after position sleep");

        Timer timer = time.timer(timeout);
        do {
            Long offset = this.subscriptions.position(partition);
            System.out.println("position2====>" + Thread.currentThread().getName());
            System.out.println("return offset :" + offset + ", partition :" + partition);
            if (offset != null)
                return offset;
            System.out.println("start position updateFetchPositions of partition:" + partition);
            updateFetchPositions(timer);
            client.poll(timer);
        } while (timer.notExpired());

        throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
                "for partition " + partition + " could be determined");
    } finally {
        release();
    }
}

After adding the log output, we execute the repo test method again:

poll2
start poll updateFetchPositions
resetOffsetsAsync===>main
resetOffsetsAsync: {topic-0=-2, topic-1=-2}
ListOffsetResult: {topic-0=offset: 0}
onSuccess===>kafka-coordinator-heartbeat-thread | my-test
before resetOffsetIfNeeded:: subscriptions.isOffsetResetNeeded(topic-0) is true
Moving topic-0 to: offset: 0
Resetting offset for topic-0 to: offset: 0
Moved topic-0 to: offset: 0
after seek:: subscriptions.isOffsetResetNeeded(topic-0) is false
after seek ===> kafka-coordinator-heartbeat-thread | my-test
KafkaApis===>data-plane-kafka-request-handler-3 returning for topic-1: 0
ListOffsetResult: {topic-1=offset: 0}
onSuccess===>kafka-coordinator-heartbeat-thread | my-test
before resetOffsetIfNeeded:: subscriptions.isOffsetResetNeeded(topic-1) is true
Moving topic-1 to: offset: 0
resetOffsetIfNeeded ===> kafka-coordinator-heartbeat-thread | my-test
start to reset
Seeking to end of partition topic-0
after seekToEnd:: subscriptions.isOffsetResetNeeded(topic-0) is true
Seeking to end of partition topic-1
after seekToEnd:: subscriptions.isOffsetResetNeeded(topic-1) is true
before position sleep, and partition.partition() is 0
after position sleep
position2====>main
return offset :null, partition :topic-0
start position updateFetchPositions of partition:topic-0
resetOffsetsAsync===>main
resetOffsetsAsync: {topic-0=-1, topic-1=-1}
position2====>main
return offset :null, partition :topic-0
start position updateFetchPositions of partition:topic-0
position2====>main
return offset :null, partition :topic-0
start position updateFetchPositions of partition:topic-0
ListOffsetResult: {topic-0=offset: 10}
onSuccess===>main
before resetOffsetIfNeeded:: subscriptions.isOffsetResetNeeded(topic-0) is true
Moving topic-0 to: offset: 10
Resetting offset for topic-0 to: offset: 10
Moved topic-0 to: offset: 10
after seek:: subscriptions.isOffsetResetNeeded(topic-0) is false
after seek ===> main
position2====>main
return offset :10, partition :topic-0
before position sleep, and partition.partition() is 1
start sleep 5000 ms
position===>main
KafkaApis===>data-plane-kafka-request-handler-2 returning for topic-1: 10
Resetting offset for topic-1 to: offset: 0
Moved topic-1 to: offset: 0
after seek:: subscriptions.isOffsetResetNeeded(topic-1) is false
after seek ===> kafka-coordinator-heartbeat-thread | my-test
ListOffsetResult: {topic-1=offset: 10}
onSuccess===>kafka-coordinator-heartbeat-thread | my-test
before resetOffsetIfNeeded:: subscriptions.isOffsetResetNeeded(topic-1) is false
after position sleep
position2====>main
return offset :0, partition :topic-1
topic-0: 10
topic-1: 0

Sorting out the execution flow

Before analyzing the log, we need to sort out the call paths of the poll(), seekToEnd(), and position() methods invoked by the test. seekToEnd() is simple: it just calls subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST) to set the reset strategy to LATEST. The call chains of position() and poll() are shown below; note that both converge on the same downstream path.

KafkaConsumer.position() ---->
updateFetchPositions() ---->
Fetcher.resetOffsetsIfNeeded() ---->
resetOffsetsAsync() ---->
future.onSuccess() ---->
Fetcher.resetOffsetIfNeeded(partition, timestamp, offsetData)

-----

KafkaConsumer.poll() ---->
KafkaConsumer.updateAssignmentMetadataIfNeeded() ---->
updateFetchPositions() ---->
Fetcher.resetOffsetsIfNeeded() ---->
resetOffsetsAsync() ---->
future.onSuccess() ---->
Fetcher.resetOffsetIfNeeded(partition, timestamp, offsetData)
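For reference, both chains converge on KafkaConsumer.updateFetchPositions(). This is roughly what it does (a condensed sketch of the 2.2.0 logic, paraphrased rather than quoted verbatim; error handling and caching are omitted):

// Condensed sketch of KafkaConsumer.updateFetchPositions() in 2.2.0 (paraphrased).
private boolean updateFetchPositions(final Timer timer) {
    if (subscriptions.hasAllFetchPositions())
        return true;                              // every partition already has a valid position

    // Partitions with no position and no pending reset fall back to committed offsets...
    if (!coordinator.refreshCommittedOffsetsIfNeeded(timer))
        return false;

    // ...and anything still without a position gets a reset per auto.offset.reset.
    subscriptions.resetMissingPositions();

    // Fire async ListOffset requests for every partition whose resetStrategy is set
    // (e.g. LATEST after seekToEnd()); the results come back in future.onSuccess().
    fetcher.resetOffsetsIfNeeded();
    return true;
}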

Fetcher#resetOffsetIfNeeded() ultimately calls subscriptions.seek(partition, offsetData.offset); we added the 'Moved' log right after that call. As the final log shows, only three 'Moved' entries were printed:

Moved topic-0 to: offset: 0
Moved topic-0 to: offset: 10
Moved topic-1 to: offset: 0

The expected 'Moved topic-1 to: offset: 10' entry never printed, so that seek() was never executed. Was the onSuccess() callback never invoked, or did one of the guard checks inside Fetcher.resetOffsetIfNeeded(partition, timestamp, offsetData) fail? The 'ListOffsetResult' log appears four times, so onSuccess() was indeed invoked four times; it must be that resetOffsetIfNeeded decided no reset was needed and skipped the seek.

Debugging

So we run the repo test method in debug mode. We find that for the onSuccess() callback that should have moved topic-1 to offset 10, the following check in Fetcher.resetOffsetIfNeeded(partition, timestamp, offsetData) took the "skip" branch, so seek() was never executed:

} else if (!subscriptions.isOffsetResetNeeded(partition)) {
    log.debug("Skipping reset of partition {} since reset is no longer needed", partition);
}

Note that IDEA's default breakpoint suspend policy is All. If you want each thread to trigger breakpoints independently, change the suspend policy to Thread.

Following subscriptions.isOffsetResetNeeded(partition) down the call chain, we arrive at:

private boolean awaitingReset() {
    return resetStrategy != null;
}
private OffsetResetStrategy resetStrategy;

public enum OffsetResetStrategy {
    LATEST, EARLIEST, NONE
}

resetStrategy is of type OffsetResetStrategy, an enum with only three values. The field is set back to null when seek() executes, and set to LATEST or EARLIEST when an offset reset is requested, which is exactly what our seekToEnd() call did.
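These transitions live in SubscriptionState's per-partition state. Here is a condensed sketch (paraphrased from the 2.2.0 source, with most fields and checks omitted) highlighting them:

// Paraphrased from SubscriptionState.TopicPartitionState (Kafka 2.2.0); most
// fields are omitted to highlight the resetStrategy transitions.
private static class TopicPartitionState {
    private Long position;                     // null until a valid fetch position is known
    private OffsetResetStrategy resetStrategy; // non-null while a reset is pending

    private void reset(OffsetResetStrategy strategy) {
        this.resetStrategy = strategy;         // seekToEnd() -> requestOffsetReset() lands here with LATEST
        this.position = null;
    }

    private void seek(long offset) {
        this.position = offset;
        this.resetStrategy = null;             // ANY completed seek clears the pending reset
    }

    private boolean awaitingReset() {
        return resetStrategy != null;          // what isOffsetResetNeeded() ultimately returns
    }
}

Note that nothing here is synchronized: whichever thread completes a seek() last wins, and it silently cancels any reset requested in the meantime.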

So we add logs before and after the key calls to print whether this check passes. The following entries stand out:

Resetting offset for topic-1 to: offset: 0
Moved topic-1 to: offset: 0
after seek:: subscriptions.isOffsetResetNeeded(topic-1) is false
ListOffsetResult: {topic-1=offset: 10}
before resetOffsetIfNeeded:: subscriptions.isOffsetResetNeeded(topic-1) is false

After the seek() that moved topic-1 to offset 0 completed, subscriptions.isOffsetResetNeeded(topic-1) became false. When the next callback, the one that should move topic-1 to offset 10, runs, its pre-seek check sees isOffsetResetNeeded(topic-1) is false, so 'Moved topic-1 to: offset: 10' is never executed.

The question at this point might be, why?

The root cause is that Kafka executes seek() and requestOffsetReset() without any locking around the resetStrategy field, so multiple threads can concurrently modify the resetStrategy of the same partition. Let's include the thread name in the log output and run the repo test method again to verify. Here are the key log lines:

resetOffsetsAsync: {topic-0=-2, topic-1=-2}
onSuccess===>kafka-coordinator-heartbeat-thread | my-test
before resetOffsetIfNeeded:: subscriptions.isOffsetResetNeeded(topic-1) is true
Moving topic-1 to: offset: 0
resetOffsetIfNeeded ===> kafka-coordinator-heartbeat-thread | my-test
start to reset
Seeking to end of partition topic-0
after seekToEnd:: subscriptions.isOffsetResetNeeded(topic-0) is true
Seeking to end of partition topic-1
after seekToEnd:: subscriptions.isOffsetResetNeeded(topic-1) is true
position2====>main
resetOffsetsAsync===>main
resetOffsetsAsync: {topic-0=-1, topic-1=-1}
...
position2====>main
before position sleep, and partition.partition() is 1
start sleep 5000 ms
position===>main
Resetting offset for topic-1 to: offset: 0
Moved topic-1 to: offset: 0
after seek:: subscriptions.isOffsetResetNeeded(topic-1) is false
after seek ===> kafka-coordinator-heartbeat-thread | my-test
ListOffsetResult: {topic-1=offset: 10}
onSuccess===>kafka-coordinator-heartbeat-thread | my-test
before resetOffsetIfNeeded:: subscriptions.isOffsetResetNeeded(topic-1) is false
after position sleep
position2====>main

From these logs we can see the following:

  1. The first reset, "topic-1 to offset 0", runs on the kafka-coordinator-heartbeat-thread | my-test thread, which sleeps 5000 ms just before executing its seek().
  2. Meanwhile the main thread starts seekToEnd(), which sleeps 2000 ms and then sets the resetStrategy property to LATEST. It then calls position() and sleeps another 5000 ms.
  3. The kafka-coordinator-heartbeat-thread | my-test thread wakes up before main, continues the "topic-1 to offset 0" seek(), sets resetStrategy back to null, and finishes.
  4. When the onSuccess() callback for "topic-1 to offset 10" runs, it finds resetStrategy is null; null means no reset is pending, so the subsequent seek() is skipped and the offset is never set to 10. The main thread then wakes up and position() returns 0.
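To see the race in isolation, here is a minimal, self-contained toy program (my own illustration; no Kafka classes involved and all names, such as ResetRaceDemo, are made up) that reproduces the same unsynchronized check-then-act pattern with the same sleep choreography:

/**
 * Toy illustration of the KAFKA-7703 race. A late "earliest" response completes
 * on the heartbeat thread AFTER seekToEnd() has requested LATEST; its seek()
 * clears resetStrategy, so the later "latest" response (offset 10) fails the
 * check-then-act guard and never seeks.
 */
public class ResetRaceDemo {
    // Shared, unsynchronized state, mimicking SubscriptionState for one partition.
    static volatile String resetStrategy = "EARLIEST"; // pending reset from poll(0)
    static volatile long offset = -1;

    // Mimics SubscriptionState.seek(): set the position, clear any pending reset.
    static void seek(long target) {
        offset = target;
        resetStrategy = null;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread heartbeat = new Thread(() -> {
            sleep(200);   // the stale "earliest" response arrives late...
            seek(0);      // ...and its seek clears the LATEST reset requested below
        }, "heartbeat");

        Thread caller = new Thread(() -> {
            resetStrategy = "LATEST"; // seekToEnd(): request a reset to the log end
            sleep(400);               // position() blocks while requests are in flight
            // The "latest" response (offset 10) is processed, but the guard fails:
            if (resetStrategy != null) {
                seek(10);
            } // else: "Skipping reset ... since reset is no longer needed"
            System.out.println("final offset = " + offset); // prints 0, not 10
        }, "caller");

        heartbeat.start();
        caller.start();
        heartbeat.join();
        caller.join();
    }

    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Running it prints "final offset = 0": the stale seek(0) cancels the pending LATEST reset, exactly as in the consumer.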

How to fix it

For the official fix, see KAFKA-7703: the patch (merged for Kafka 2.3.0) makes the asynchronous callback verify that the response still matches the reset strategy currently requested for the partition, rather than only the timestamp, so a stale "earliest" response can no longer complete a later "latest" reset.

Even simpler: upgrade the kafka-clients dependency to 2.3.0.
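If upgrading is not immediately possible, one application-side mitigation (my own sketch, untested against the actual race; not an official recommendation) is to sidestep the asynchronous reset machinery entirely: fetch the end offsets with a blocking call and seek to them explicitly, since seek() with a concrete offset does not go through resetStrategy at all:

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public final class SeekToEndWorkaround {
    private SeekToEndWorkaround() {}

    // Rough equivalent of seekToEnd() + position() that avoids the racy async path:
    // endOffsets() is a blocking metadata call, and seek() with an explicit offset
    // sets the position directly instead of requesting a LATEST reset.
    public static <K, V> void seekToEndSynchronously(Consumer<K, V> consumer,
                                                     Collection<TopicPartition> partitions) {
        Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
        for (Map.Entry<TopicPartition, Long> e : end.entrySet()) {
            consumer.seek(e.getKey(), e.getValue());
        }
    }
}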