
Related: Consumer Groups in Kafka | The Offsets Topic: How do consumer groups in Kafka store their consumer offsets?

As mentioned earlier, Kafka consumers can commit offsets manually, and they can commit an offset other than their current position, which lets them skip or re-consume messages. This is possible mainly because Kafka is a log-based messaging system rather than a traditional queue.

In short, the offset is controlled by the consumer. Note that the consumer controls only the offset, not the messages themselves, which are always read-only to the consumer.
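The difference from a queue can be illustrated with a small in-memory model. This is only a sketch, not Kafka code: `ReadOnlyLog`, `LogReader`, and `LogVersusQueueDemo` are hypothetical names invented for illustration. It shows that the reader owns nothing but a position, and that moving the position back makes the same immutable message readable again.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical in-memory stand-in for one partition's log; not a Kafka class. */
final class ReadOnlyLog {
    private final List<String> records;

    ReadOnlyLog(List<String> records) {
        // Keep an unmodifiable copy: readers can never alter the messages.
        this.records = Collections.unmodifiableList(new ArrayList<>(records));
    }

    String read(long offset) {
        return records.get((int) offset);
    }

    long endOffset() {
        return records.size();
    }
}

/** Hypothetical reader whose only state is its position (offset) in the log. */
final class LogReader {
    private final ReadOnlyLog log;
    private long position;

    LogReader(ReadOnlyLog log) {
        this.log = log;
    }

    /** Moves the position anywhere in the log, forward or backward. */
    void seek(long offset) {
        position = offset;
    }

    /** Returns the next message and advances, or null at the end of the log. */
    String next() {
        return position < log.endOffset() ? log.read(position++) : null;
    }
}

final class LogVersusQueueDemo {
    public static void main(String[] args) {
        LogReader reader = new LogReader(new ReadOnlyLog(List.of("m0", "m1", "m2")));
        reader.next();                     // reads m0
        reader.next();                     // reads m1
        reader.seek(0);                    // rewind: nothing was removed by reading
        System.out.println(reader.next()); // prints m0 again
    }
}
```

In a queue, reading a message would normally remove it; here reading never changes the log, so skipping and replaying are just changes to the reader's position.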

Kafka offers consumers several strategies for resetting offsets, roughly divided into position-based and time-based.

Reset by position

There are several strategies for resetting offsets by position:

Earliest

Resets the offset to the current earliest offset.

Note that the earliest offset is not necessarily 0, because Kafka deletes older log data over time. Use this strategy if you want to re-consume all messages that still exist in a topic.

Latest

Resets the offset to the current latest offset.

Use this strategy if you want to skip all historical messages and start consuming from the most recent position.

Current

Resets the offset to the most recently committed offset. There are few usage scenarios for this strategy.

Specified-Offset

Resets the offset to a specified offset.

Sometimes a consumer pulls a message it cannot process: the message is malformed, an error is raised during consumption, or some business rule prevents it from being handled. In that case, you can use this strategy to skip the message and continue with the ones after it.

Shift-By-N

Resets the offset to a position relative to the current offset (current offset + N).

Specified-Offset sets the target offset directly, while Shift-By-N sets it relative to the current offset. For example, N = 5 skips ahead 5 messages; N can also be negative, in which case the offset moves backward.

Specified-Offset and Shift-By-N can be understood as absolute and relative positions.
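The position-based strategies above can be sketched as a single function that maps a strategy to a target offset. This is an illustrative sketch, not Kafka code: the `PositionReset` class, its `Strategy` enum, and the `target` method are hypothetical names. Clamping the result into the range between the earliest and latest offsets is an assumption about how an out-of-range request should be handled, not something stated in this article.

```java
final class PositionReset {
    /** The five position-based strategies named in this article. */
    enum Strategy { EARLIEST, LATEST, CURRENT, SPECIFIED_OFFSET, SHIFT_BY_N }

    /**
     * Computes the target offset for one partition.
     *
     * @param earliest  earliest offset still present in the log
     * @param latest    latest offset of the log
     * @param committed the group's most recently committed offset
     * @param value     absolute offset for SPECIFIED_OFFSET, N for SHIFT_BY_N, ignored otherwise
     */
    static long target(Strategy strategy, long earliest, long latest, long committed, long value) {
        long raw;
        switch (strategy) {
            case EARLIEST:
                raw = earliest;
                break;
            case LATEST:
                raw = latest;
                break;
            case CURRENT:
                raw = committed;
                break;
            case SPECIFIED_OFFSET:
                raw = value;             // absolute position
                break;
            case SHIFT_BY_N:
                raw = committed + value; // relative position; value may be negative
                break;
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
        // Assumption: keep the result inside the range that still exists in the log.
        return Math.max(earliest, Math.min(latest, raw));
    }

    public static void main(String[] args) {
        // Log holds offsets 100..500 and the group last committed 300.
        System.out.println(target(Strategy.SHIFT_BY_N, 100, 500, 300, -5)); // prints 295
    }
}
```

Treating Shift-By-N as "committed offset plus N" makes the absolute versus relative distinction explicit: only the last two cases read `value`, and only Shift-By-N depends on where the group currently is.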

Reset according to time

There are two strategies for resetting offsets by time:

DateTime

Resets the offset to the first offset at or after the specified time.

Duration

Resets the offset to the first offset at or after a point in time that lies a given duration before the current time.

DateTime and Duration can also be understood as absolute and relative times.
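Both time-based strategies reduce to the same two steps: work out a target timestamp, then find the first record at or after it. The sketch below shows this under stated assumptions. `TimeReset` and its methods are hypothetical names, the record timestamps are passed in as a plain array rather than read from Kafka, and returning -1 when no record qualifies is a choice made for this example only.

```java
import java.time.Duration;
import java.time.Instant;

final class TimeReset {
    /** DateTime strategy: the target is an absolute instant. */
    static long dateTimeTarget(Instant when) {
        return when.toEpochMilli();
    }

    /** Duration strategy: the target is the current time minus the given duration. */
    static long durationTarget(Instant now, Duration ago) {
        return now.minus(ago).toEpochMilli();
    }

    /**
     * Returns the offset of the first record whose timestamp is at or after
     * targetMillis, or -1 if no such record exists.
     *
     * @param timestamps  record timestamps in log order (epoch milliseconds)
     * @param firstOffset offset of the record at index 0
     */
    static long firstOffsetAtOrAfter(long[] timestamps, long firstOffset, long targetMillis) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= targetMillis) {
                return firstOffset + i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 2_000L, 3_000L}; // records at offsets 40, 41, 42
        System.out.println(firstOffsetAtOrAfter(timestamps, 40, 1_500L)); // prints 41
    }
}
```

The only difference between the two strategies is how the target timestamp is produced; the lookup that follows is identical.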

How to operate

With these strategies in mind, here is how to apply them.

For example, to reset a consumer group's offsets to the current earliest position, use the following command:

bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --to-earliest --execute

The Latest and Current strategies are used the same way, with --to-latest and --to-current.

For the Specified-Offset and Shift-By-N strategies, the value must be given in the command as --to-offset <offset> and --shift-by <N>, respectively.

For a DateTime reset, you need to provide a specific time:

bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --to-datetime 2021-11-25T20:00:00.000 --execute

For the Duration strategy, you need to provide a duration in ISO-8601 format. It starts with the letter P, followed by components marked D, H, M, and S for days, hours, minutes, and seconds respectively, with the letter T separating the day component from the time components.

bin/kafka-consumer-groups.sh --bootstrap-server <host>:<port> --group <group_id> --reset-offsets --all-topics --by-duration PT0H30M0S --execute

PT0H30M0S here stands for 30 minutes.
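A quick way to check a duration string before passing it to the command is to parse it with `java.time.Duration`, which reads this ISO-8601 form. The sketch below assumes the command-line tool accepts the same syntax as `Duration.parse`; the `DurationFormatCheck` class and its `minutes` helper are hypothetical names used only here.

```java
import java.time.Duration;

final class DurationFormatCheck {
    /** Parses an ISO-8601 duration such as PT0H30M0S and returns its length in whole minutes. */
    static long minutes(String iso8601) {
        return Duration.parse(iso8601).toMinutes();
    }

    public static void main(String[] args) {
        System.out.println(minutes("PT0H30M0S")); // prints 30
        System.out.println(minutes("PT30M"));     // prints 30: zero-valued parts can be omitted
        System.out.println(minutes("P1DT2H"));    // prints 1560: one day plus two hours
    }
}
```

A malformed string makes `Duration.parse` throw `DateTimeParseException`, so this check catches format mistakes before any offsets are touched.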

After any of the preceding commands is executed, the new offset information is displayed.

Kafka also provides a consumer API for resetting offsets from within a consumer application. Here are the relevant methods of the Java API:

void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);