Kafka producer consumer, interceptor, divider, serializer and deserializer

Go directly to the code POM file

<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> The < version > 1.2.62 < / version > < / dependency > < / dependencies >Copy the code

Materialized data for sending messages

public class User { private Integer userId; private String username; public User() { } public User(Integer userId, String username) { this.userId = userId; this.username = username; } public Integer getUserId() { return userId; } public void setUserId(Integer userId) { this.userId = userId; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } @Override public String toString() { return "User{" + "userId=" + userId + ", username='" + username + '\'' + '}'; }}Copy the code

The producer, which contains some configurations for custom serializers and partitioners, as well as producer interceptors, uses the asynchronous send pattern

public class MyProducer { public static void main(String[] args) { Map<String, Object> configs = new HashMap<>(); Configs. The put (ProducerConfig BOOTSTRAP_SERVERS_CONFIG, "115.159.150.169:9092"); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // Set the custom serializer configs.put(producerconfig.value_serializer_class_config, userserializer_class_class); configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, Kafkapartitioner.class); configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInteceptor.class.getName()); KafkaProducer<String, User> producer = new KafkaProducer<String, User>(configs); for (int i=1; i<5; i++){ User user = new User(); user.setUserId(400+i); User. SetUsername (" zhao four "+ I); ProducerRecord<String, User> record = new ProducerRecord<String, User>( "tp_user_03", // topic user.getUsername(), // key user // value ); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception ! = null) {system.out.println (" message sending exception "); } else {// system.out.println (" topic: "+ metadata.topic() + "\t" // +" partition: "+ \t" // +" producer offset: " + metadata.offset()); System.out.println(" partition: "+ metadata.partition()); }}}); } // Close the producer producer.close(); }}Copy the code

The deserialization and the configuration of the consumer side are also added to the consumer

public class UserConsumer { public static void main(String[] args) { Map<String, Object> configs = new HashMap<>(); Configs. The put (ConsumerConfig BOOTSTRAP_SERVERS_CONFIG, "115.159.150.169:9092"); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // Set the custom deserializer configs.put(consumerConfig.value_deserializer_class_config, userDeserializer_class_class); configs.put(ConsumerConfig.GROUP_ID_CONFIG, "user_consumer"); configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_id"); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, UserIdConsumerInteceptor.class.getName() +","+ UsernameConsumerInteceptor.class.getName()); KafkaConsumer<String, User> consumer = new KafkaConsumer<String, User>(configs); Subscribe (collections.singleton ("tp_user_03")); final ConsumerRecords<String, User> records = consumer.poll(Long.MAX_VALUE); records.forEach(new Consumer<ConsumerRecord<String, User>>() { @Override public void accept(ConsumerRecord<String, User> record) { System.out.println(record.headers()); System.out.println(record.value()); }}); Consumer.close (); }}Copy the code

Next comes the configuration of serialization and deserialization

public class UserSerializer implements Serializer<User> { @Override public void configure(Map<String, ? > configs, Boolean isKey) {// do nothing // receive configuration parameters for the serializer, } @override public byte[] serialize(String topic, User data) { System.out.println("username "+data.getUsername()); //data.setUserId(401); Data.setusername (data.getUsername()+" learning kafka"); return JSON.toJSONBytes(data); // try { // if (data == null) { // return null; // } else { // final Integer userId = data.getUserId(); // final String username = data.getUsername(); // // if (userId ! = null) { // if (username ! = null) { // final byte[] bytes = username.getBytes("UTF-8"); // int length = bytes.length; // // The first 4 bytes are used to store the value of userId // // the second 4 bytes are used to store the length of the array of username bytes int value // // the third length, // ByteBuffer = ByteBuffer. Allocate (4 + 4 + length); // // set userId // buffer.putint (userId); // // Set the length of the username byte array // buffer.putint (length); // // Set username byte array // buffer.put(bytes); // // returns the value of the user object as a byte array // return buffer.array(); Catch (Exception e) {// throw new SerializationException(" serialization failed "); // } // return null; } @override public void close() {// do nothing // It needs to be idempotent, that is, called multiple times, and the effect is the same. }}Copy the code

As you can see, in fact, during the serialization process, as long as we convert the data to bytes as required, we can also modify or process the data during the deserialization process

public class UserDeserializer implements Deserializer<User> { @Override public void configure(Map<String, ? > configs, boolean isKey) { } @Override public User deserialize(String topic, byte[] data) { // ByteBuffer buffer = ByteBuffer.allocate(data.length); // // buffer.put(data); // buffer.flip(); // // final int userId = buffer.getInt(); // final int usernameLength = buffer.getInt(); // // String username = new String(data, 8, usernameLength); // // return new User(userId, username); System.out.println(" Perform deserialization "); return JSON.parseObject(data,User.class); } @Override public void close() { } }Copy the code

Similarly, in the deserialization process, a peer deserialization process is required for the producer-side interceptor. Although I set the reset partition in the code, this setting is not implemented, but it can be determined that the interceptor is performed before the partition

public class ProducerInteceptor implements ProducerInterceptor<String, User> { @Override public ProducerRecord<String, User> onSend(ProducerRecord<String, User> record) {system.out.println (" interceptor 1"); Integer partition = record.partition(); System.out.println(" the partition before interceptor is "+partition); String topic = record.topic(); long timestamp = record.timestamp(); String key = record.key(); User user = record.value(); Headers headers = record.headers(); headers.add("patition inteceptor","partiton".getBytes()); Partition =2; ProducerRecord<String,User> producerRecord = new ProducerRecord<>(topic,partition,timestamp,key,user,headers); return producerRecord; } // Override public void onAcknowledgement(RecordMetadata metadata, Exception Exception) {system.out.println (" callback success interceptor "); } @Override public void close() { } @Override public void configure(Map<String, ? > configs) { } }Copy the code

Consumer-side interceptors

Public class UserIdConsumerInteceptor implements ConsumerInterceptor<String, User> { @Override public ConsumerRecords<String, User> onConsume(ConsumerRecords<String, User> records) { records.forEach(record->{ record.value().setUserId(record.value().getUserId()+10); }); return records; } @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {system.out.println ("userId consumer interceptor submits offset "); } @Override public void close() { } @Override public void configure(Map<String, ? > configs) {}} / / second interceptor public class UsernameConsumerInteceptor implements ConsumerInterceptor < String, User> { @Override public ConsumerRecords<String, User> onConsume(ConsumerRecords<String, User> records) { records.forEach(record->{ record.value().setUsername(record.value().getUsername()+record.key()); }); return records; Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { System.out.println(offsets.entrySet()); System.out.println(" Commit offset "); } @Override public void close() { } @Override public void configure(Map<String, ? > configs) { } }Copy the code

Partition, when we implement the data ID as the partition element

public class Kafkapartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { User user = JSON.parseObject(valueBytes, User.class); System.out.println(user.getUserId()); int partition = user.getUserId()%3; System.out.println(" partition key"+user.getUsername()+" partition "+partition); return partition; } @Override public void close() { } @Override public void configure(Map<String, ? > configs) { } }Copy the code

The result of the producer end of the final implementation can be shown that the execution order is. Interceptor, serializer, divider

Consumer side as a result, you can see the deserializer first execution, execution after consumers interceptors, consumers interceptor is, in fact, after the callback accept method, also need to be aware of a parameter is consumer interceptors are need to be configured interceptor and producer is a collection of the full path string interceptors, and configuration in front of the first execution