Recently, the Spark Streaming job running in our project suddenly stopped with an error and, for unrelated reasons, failed to restart. It was only discovered a day later and almost caused a disaster.

The mystery

The Spark log shows the following error:

 java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative

The error means that Kafka's consumer offset is larger than the latest produced offset, so when Spark computes how many records to consume in a batch (numRecords = untilOffset - fromOffset) the result is negative. Most of the information I found attributes this to deleting a topic and then recreating one with the same name, but we had not performed any such operation on Kafka. Unfortunately the problem cannot be reproduced at the moment and the root cause has not been found. Let me know in the comments if any of you experts have ideas on where to look.
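To make the failing check concrete, here is a minimal, self-contained illustration of that offset arithmetic. This is not Spark's actual code; the class, method and variable names (fromOffset, untilOffset) and the numbers are assumptions for illustration only:

public class NumRecordsCheck {
    // Illustration only: the per-batch record count is derived from the offset range,
    // and a negative value triggers exactly the "requirement failed" error shown above.
    static long numRecords(long fromOffset, long untilOffset) {
        long numRecords = untilOffset - fromOffset;
        if (numRecords < 0) {
            throw new IllegalArgumentException("requirement failed: numRecords must not be negative");
        }
        return numRecords;
    }

    public static void main(String[] args) {
        // Hypothetical case: the committed consumer offset (4500) is ahead of the
        // latest produced offset (4406), so the computed batch size is negative.
        System.out.println(numRecords(4500, 4406));
    }
}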

Getting down to business

A new error

The cause of the crash could not be determined for the time being, but the program had to be brought back up first, or the customer would come knocking. However, after the SparkStreaming program was restarted, it soon stopped with an error again, and this time it was a different one:

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {kafkaTest3-0=4406}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
    ...

Tracing the source code

The offset is out of range. Tracing through the source code to find where this offset comes from, we eventually arrive at the Fetcher#prepareFetchRequests() method, where the offset is read from the subscriptions variable:

long position = this.subscriptions.position(partition);

The data in the subscriptions variable is set in the ConsumerCoordinator#refreshCommittedOffsetsIfNeeded() method via this.subscriptions.seek(tp, offset):

    public boolean refreshCommittedOffsetsIfNeeded(final long timeoutMs) {
        final Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();
        final Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(missingFetchPositions, timeoutMs);
        if (offsets == null) return false;

        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            final TopicPartition tp = entry.getKey();
            final long offset = entry.getValue().offset();
            log.debug("Setting offset for partition {} to the committed offset {}", tp, offset);
            this.subscriptions.seek(tp, offset);
        }
        return true;
    }

Following into the fetchCommittedOffsets(missingFetchPositions, timeoutMs) method:

// Fetch the committed offsets for the given partitions
final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
if (future.succeeded()) {
    return future.value();
}

So the consumer offset for the current partition is ultimately fetched from Kafka itself, i.e. the offset last committed by the consumer group.

Find out why

The cause of the error is now obvious: Kafka retains messages for only one day, and it took more than a day before the SparkStreaming job was brought back up, so the committed offset is no longer within the range of offsets that Kafka currently holds.
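You can confirm this outside of Spark by comparing the group's committed offset with the partition's currently available offset range using a plain KafkaConsumer. The following is only a diagnostic sketch (the class name is arbitrary); the broker address, group and topic are the ones used elsewhere in this post and would need to match your environment:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetRangeDiagnostic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.19.128:9092");
        props.put("group.id", "spark-kafka");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("kafkaTest3", 0);
            OffsetAndMetadata committed = consumer.committed(tp);
            long earliest = consumer.beginningOffsets(Collections.singleton(tp)).get(tp);
            long latest = consumer.endOffsets(Collections.singleton(tp)).get(tp);
            System.out.printf("committed=%s, valid range=[%d, %d]%n",
                    committed == null ? "none" : committed.offset(), earliest, latest);
            // If the committed offset is smaller than 'earliest', the data it points to
            // has already been removed by retention -- exactly the situation described above.
        }
    }
}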

The solution

Now that we know the cause, we need to modify the consumer offset stored in Kafka.

Common Kafka commands

There are two ways:

  1. Reset the consumer offset directly to the earliest offset currently available in the partition.

  2. With the first method there is a very small chance that, by the time SparkStreaming has restarted with the new offset, that data has expired again. In that case you can inspect the Kafka data log on the broker and take the starting offset of the next log segment (index file), or simply repeat the first method.

// View the consumption status of a consumer group
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group spark-kafka

// Reset the consumer offset of a topic under a group to the earliest offset currently available
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --group spark-kafka --topic kafkaTest3 --execute --reset-offsets --to-earliest

// Reset the consumer offset of a topic under a group to an arbitrary offset
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --group spark-kafka --topic kafkaTest3 --execute --reset-offsets --to-offset 1500

// Query the current maximum message offset of each partition under a topic (i.e. the position of the latest message in each partition)
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.19.128:9092 --topic kafkaTest3 --time -1

// Query the current minimum message offset of each partition. Subtracting this command's result from the previous command's result gives the total number of messages currently stored for the topic in the cluster.
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.19.128:9092 --topic kafkaTest3 --time -2
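For completeness, the first method can also be applied programmatically instead of via the CLI. This is only a minimal sketch, assuming the Spark job is stopped and that jumping back to the earliest available offset is acceptable; the broker, group and topic names are the ones from this post, and the class name is arbitrary:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ResetGroupOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.19.128:9092");
        props.put("group.id", "spark-kafka");   // the consumer group used by the Spark job
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("kafkaTest3", 0);
            consumer.assign(Collections.singleton(tp));
            long earliest = consumer.beginningOffsets(Collections.singleton(tp)).get(tp);
            // Commit the earliest still-available offset for the group,
            // so the restarted job starts from data that actually exists.
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(earliest)));
        }
    }
}

After committing, the restarted SparkStreaming job should pick up from the newly committed offset, assuming it reads its starting offsets from the consumer group rather than from an old checkpoint.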