In Teacher Li Yue's recent back-end storage practice class, I saw something interesting: Kafka being used to store click-stream data and reprocess it repeatedly. I had always thought of Kafka as simply a vehicle for message transmission — once a message was consumed, it could not be consumed again. That new knowledge conflicted with my impression, hence this article: how Kafka data can be consumed repeatedly.

Preliminary theoretical understanding

First, I went to the official website to correct my overall understanding of Kafka.

Kafka describes itself there as a distributed streaming platform, not merely a message queue — so my earlier understanding was simply too narrow.

Second, I reviewed how a Kafka consumer actually consumes: the consumer fetches messages from the broker by polling (Kafka consumers pull; the broker does not push). After processing a batch, the consumer commits the offsets, either manually or automatically, and the broker advances the consumer group's stored offset based on that commit.
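As a minimal sketch of that poll-then-commit flow (broker address, group id, and topic name here are placeholders, and this assumes the standard kafka-clients API):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        // disable auto commit so the broker only advances the group offset when we say so
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("click-stream"));
            // pull a batch of records from the broker
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
            // commit only after processing succeeds; until then the stored offset does not move,
            // so a crash before this line means the batch is delivered again
            consumer.commitSync();
        }
    }
}
```

Until `commitSync()` runs, the group's offset stays where it was, which is exactly the mechanism the rest of this article exploits.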

The scheme

Where a Kafka consumer reads from is determined by the offset. So if I can manually set the offset back to an earlier position, I can consume the same messages again — interesting.

How to set the offset manually is the key.

show me the code

The key part of the code is the calls to the offset-setting APIs; nothing else is special.

Note that the code below calls several different offset-setting methods back to back purely for demonstration; in practice, use whichever one you need.

For the final consumption step I simply poll once with the default settings; modify this as needed.

    /**
     * Repeat consumption of Kafka messages by resetting the offset.
     * @param host kafka bootstrap server address
     * @param groupId kafka consumer group id
     * @param autoCommit whether to auto-commit consumed offsets
     * @param topic topic to consume
     * @param consumeTimeOut poll timeout in seconds
     */
    private void testResetOffset(String host, String groupId, Boolean autoCommit, String topic, Long consumeTimeOut){
        // build the properties for a new consumer
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString());
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // subscribe to the given topic
        consumer.subscribe(Collections.singletonList(topic));
        // collect all partitions of the topic
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionInfo partitionInfo : partitionInfos) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        // poll once first so partitions are actually assigned; seek is lazy and
        // throws IllegalStateException if called before assignment
        consumer.poll(Duration.ofSeconds(consumeTimeOut));
        // the next three calls are alternatives, shown together only for demonstration;
        // in practice pick one
        // 1) reset the offset of every partition to the beginning
        consumer.seekToBeginning(topicPartitions);
        // 2) reset a designated partition to a designated offset
        long offset = 20L;
        consumer.seek(topicPartitions.get(0), offset);
        // 3) reset the offset of every partition to the end
        consumer.seekToEnd(topicPartitions);
        // consume messages as usual
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            log.info("consume data: {}", record.value());
        }
    }
The results

(The original post showed a screenshot of the consumed log output here.)

Points to note

While manually setting the offset, I first hit this exception:

java.lang.IllegalStateException: No current assignment for partition test-0

The reason is that setting the offset is a lazy operation. The Javadoc for seekToBeginning says:

Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the first offset in all partitions only when poll(long) or position(TopicPartition) are called. If no partition is provided, seek to the first offset for all of the currently assigned partitions.

Crucially, seeking requires the partitions to already be assigned to the consumer, and subscribe() does not assign partitions until the first poll(). So I call poll() once first, and only then set the offset.
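An alternative sketch that sidesteps the exception entirely: use assign() instead of subscribe(). With manual assignment there is no lazy group rebalance to wait for, so seek() can be called immediately. (Broker address and topic name below are placeholders.)

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignAndSeekSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("click-stream")) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            // assign() hands the consumer its partitions directly -- no group
            // coordination, so seek works right away without a prior poll()
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}
```

The trade-off is that assign() opts out of consumer-group rebalancing, so this suits one-off replay jobs rather than long-running group members.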

This article was first published on my blog; please credit the source when reprinting: cartoonyu.github.io/cartoon-bio…