Moment For Technology

Kafka series 3.1: basic use of the producer client

Posted on Oct. 13, 2023, 2:58 p.m. by 司懿
Category: Back-end Tag: kafka

Required parameters

  • bootstrap.servers

This parameter is the address of the broker. You do not need to list every broker, because the client retrieves information about the other brokers from the one it connects to. It is still worth listing several addresses, so that the client can connect even if one broker is down.

  • key.serializer

How the message key is serialized.

  • value.serializer

How the message content is serialized.

Sample code

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
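As a rough sketch of the same three settings with more than one broker address: the keys below are the literal strings behind the ProducerConfig constants, and the host names broker1 and broker2 are hypothetical placeholders, not values from this article.

```java
import java.util.Properties;

class ProducerPropsSketch {
    // Builds the three required settings using their literal key names.
    static Properties build(String bootstrapServers) {
        Properties p = new Properties();
        // Comma-separated host:port list; one reachable broker is enough to bootstrap.
        p.put("bootstrap.servers", bootstrapServers);
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    public static void main(String[] args) {
        // broker1 and broker2 are assumed host names for illustration only.
        Properties p = build("broker1:9092,broker2:9092");
        System.out.println(p.getProperty("bootstrap.servers"));
    }
}
```

The resulting Properties object can be passed to the producer constructor in the same way as the one above.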

Producer interceptor

An interceptor processes the message before it is sent, ahead of both the serializer and the partitioner.

To write a custom interceptor, implement the org.apache.kafka.clients.producer.ProducerInterceptor interface.

The interface defines the following methods:

  • ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)

Called before the message is sent, so the message can be inspected or modified.

  • void onAcknowledgement(RecordMetadata metadata, Exception exception)

Called when the message has been acknowledged or when sending it fails, before the user's Callback runs.

  • void close()

Called when the producer is shut down.
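The three callbacks can be pictured with a small stand-alone sketch. It deliberately does not use the Kafka classes: CountingInterceptorSketch and its String-based signatures are hypothetical stand-ins, chosen so the example runs without the Kafka client on the classpath. The one detail taken from Kafka is that a null exception in onAcknowledgement means the send succeeded.

```java
class CountingInterceptorSketch {
    private int succeeded = 0;
    private int failed = 0;

    // Mirrors onSend: runs before serialization and may return a modified message.
    String onSend(String value) {
        return "prefix-" + value;
    }

    // Mirrors onAcknowledgement: a null exception means the send was acknowledged.
    void onAcknowledgement(Exception exception) {
        if (exception == null) {
            succeeded++;
        } else {
            failed++;
        }
    }

    // Mirrors close: the point at which the collected counts would be reported.
    void close() {
        System.out.println(summary());
    }

    String summary() {
        return "succeeded=" + succeeded + ", failed=" + failed;
    }

    public static void main(String[] args) {
        CountingInterceptorSketch interceptor = new CountingInterceptorSketch();
        System.out.println(interceptor.onSend("hello"));
        interceptor.onAcknowledgement(null);
        interceptor.onAcknowledgement(new RuntimeException("send failed"));
        interceptor.close();
    }
}
```

A real interceptor would implement ProducerInterceptor and receive ProducerRecord and RecordMetadata objects instead of plain strings.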

Kafka allows you to configure a chain of interceptors, with multiple interceptor class names separated by commas.

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TestProducerInterceptor.class.getName() + "," + TestProducerInterceptor2.class.getName());
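The interceptors in a chain run in the order they are listed, and each one receives the message returned by the one before it. A minimal sketch of that ordering, using plain functions as hypothetical stand-ins for each interceptor's onSend rather than the Kafka classes:

```java
import java.util.List;
import java.util.Locale;
import java.util.function.UnaryOperator;

class InterceptorChainSketch {
    // Applies each step in list order, feeding every result into the next step.
    static String applyChain(List<UnaryOperator<String>> chain, String value) {
        String current = value;
        for (UnaryOperator<String> step : chain) {
            current = step.apply(current);
        }
        return current;
    }

    static String demo() {
        List<UnaryOperator<String>> chain = List.of(
                v -> "prefix-" + v,               // stand-in for the first interceptor
                v -> v.toUpperCase(Locale.ROOT)   // stand-in for the second interceptor
        );
        return applyChain(chain, "hello");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints PREFIX-HELLO
    }
}
```

Swapping the two entries would give a different result, which is why the order of the class names in the configuration matters.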


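Serializer

Serialization happens before partitioning. To write a custom serializer, implement the org.apache.kafka.common.serialization.Serializer interface; StringSerializer is one implementation of it.

The interface defines the following methods, shown here as StringSerializer implements them: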

  • void configure(Map<String, ?> configs, boolean isKey)

In the StringSerializer implementation, this is used to set the character encoding.

  • byte[] serialize(String topic, String data)

Defines how the data is converted to bytes.

  • void close()

Called when the producer is closed.
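To make the three methods concrete, here is a stand-alone sketch of what a string serializer does. StringSerializerSketch is a hypothetical class that does not implement the Kafka interface, so that it runs without the Kafka client; the encoding keys it reads (key.serializer.encoding, value.serializer.encoding, serializer.encoding) are the ones StringSerializer looks up, with UTF-8 as the default.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

class StringSerializerSketch {
    private Charset encoding = StandardCharsets.UTF_8;

    // Picks the encoding from the key- or value-specific setting, then the shared one.
    void configure(Map<String, ?> configs, boolean isKey) {
        String name = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object value = configs.get(name);
        if (value == null) {
            value = configs.get("serializer.encoding");
        }
        if (value instanceof String) {
            encoding = Charset.forName((String) value);
        }
    }

    // Converts the string to bytes; a null message stays null.
    byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(encoding);
    }

    // Nothing to release in this sketch.
    void close() {
    }

    public static void main(String[] args) {
        StringSerializerSketch serializer = new StringSerializerSketch();
        System.out.println(serializer.serialize("demo-topic", "hi").length);
        serializer.close();
    }
}
```

A serializer for another type follows the same shape, with only the body of serialize changing.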

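Partitioner

To customize partitioning, implement the org.apache.kafka.clients.producer.Partitioner interface.

With the default partitioner, Kafka hashes the key (MurmurHash2) so that messages with the same key are sent to the same partition. If no key is specified, the message is not tied to any partition and the default partitioner spreads such messages across partitions (round-robin in older clients, sticky per batch since Kafka 2.4).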
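The key-to-partition mapping can be sketched as "hash the serialized key, drop the sign bit, take the remainder". The sketch below assumes that shape and, to stay short, swaps in Arrays.hashCode for MurmurHash2, so the partition numbers it produces will not match Kafka's; KeyPartitionSketch and the sample key are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitionSketch {
    // Maps a serialized key to a partition index in [0, numPartitions).
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        int hash = Arrays.hashCode(keyBytes);        // stand-in hash, NOT MurmurHash2
        return (hash & 0x7fffffff) % numPartitions;  // clear the sign bit, then take the remainder
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        // Equal key bytes always land on the same partition.
        System.out.println(partitionFor(key, 6) == partitionFor(key.clone(), 6)); // prints true
    }
}
```

Because the result depends on the partition count, adding partitions to a topic changes where existing keys land.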

The interface defines the following methods:

  • int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)

Decides which partition the message is sent to. See DefaultPartitioner for a reference implementation.

  • void close()

Called when the producer is closed.

Similarities to and differences from RocketMQ

  1. As in Kafka, RocketMQ allows a producer to send a message to a specified 'partition'.
  2. RocketMQ has no concept of a serializer. Message content is serialized by RocketMQ itself.
  3. In my own usage so far, RocketMQ does not offer anything like the interceptor concept.
  4. RocketMQ provides hooks to process a message before and after it is sent.

For example:

DefaultMQProducer producer = new DefaultMQProducer("default");

producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
    @Override
    public String hookName() {
        return null;
    }

    // Runs before the message is sent
    @Override
    public void sendMessageBefore(SendMessageContext context) {}

    // Runs after the message is sent
    @Override
    public void sendMessageAfter(SendMessageContext context) {}
});