Required parameters

  • bootstrap.servers

This parameter lists the broker addresses used for the initial connection. You do not need to list every broker, because Kafka retrieves information about the other brokers from the one it connects to. Listing several addresses is still worthwhile, so that the client can connect even if one of the brokers is down.

  • key.serializer

The serializer used for the message key

  • value.serializer

The serializer used for the message content (the value)

Sample code

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
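
Because listing several brokers is what protects the client when a single broker is down, a configuration with more than one address might look like the sketch below. It uses the plain string keys that the ProducerConfig constants stand for, so it compiles without the Kafka client on the classpath. The class name and the three host names are hypothetical placeholders.

```java
import java.util.Properties;

final class ProducerPropsSketch {
    // Builds producer properties with several bootstrap brokers.
    // The broker addresses passed in are hypothetical placeholders.
    static Properties build(String... brokers) {
        Properties props = new Properties();
        // Comma-separated list; one reachable entry is enough to bootstrap.
        props.put("bootstrap.servers", String.join(",", brokers));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("broker1:9092", "broker2:9092", "broker3:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The resulting Properties object can be passed to a KafkaProducer constructor in the same way as the sample above.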

Producer interceptor

An interceptor processes the message before it is sent, ahead of both the serializer and the partitioner.

Implement the org.apache.kafka.clients.producer.ProducerInterceptor interface to create a custom interceptor.

The methods defined by the interface:

  • ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)

Processes the message before it is sent

  • void onAcknowledgement(RecordMetadata metadata, Exception exception)

Called when the message has been acknowledged or when sending it fails, generally just before the user's callback runs

  • void close()

Called when the producer is closed

Kafka allows you to configure a chain of interceptors, with multiple interceptors separated by commas.

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TestProducerInterceptor.class.getName() + "," + TestProducerInterceptor2.class.getName());
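
To make the ordering of such a chain concrete, here is a small plain-Java model rather than Kafka code. Kafka calls the interceptors in the order they are listed and hands the record returned by one onSend to the next. The sketch imitates that with simple string functions. The class name and the two stand-in interceptors are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

final class InterceptorChainSketch {
    // Applies each stand-in interceptor in list order; the output of one
    // becomes the input of the next, the way onSend results are chained.
    static String applyAll(List<UnaryOperator<String>> interceptors, String value) {
        String current = value;
        for (UnaryOperator<String> interceptor : interceptors) {
            current = interceptor.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        UnaryOperator<String> first = v -> v + "-a";  // plays the role of the first interceptor
        UnaryOperator<String> second = v -> v + "-b"; // plays the role of the second interceptor
        System.out.println(applyAll(Arrays.asList(first, second), "hello"));
    }
}
```

Swapping the two entries changes the result, which is why the order of the class names in the configuration value matters.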

Serialization

Serialization occurs before partitioning.

Implement the org.apache.kafka.common.serialization.Serializer interface to create a custom serializer.

The methods defined by the interface:

  • void configure(Map<String, ?> configs, boolean isKey)

Configures the serializer. The StringSerializer implementation uses it to set the character encoding

  • byte[] serialize(String topic, T data)

Defines how the data is serialized (for StringSerializer, T is String)

  • void close()

Called when the producer is closed
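
The three methods can be sketched roughly as follows. This is a stand-in that mirrors their shape for a String payload and does not implement the Kafka interface, so that it compiles without the Kafka client library. The class name is hypothetical, and the encoding keys are the ones StringSerializer is understood to read.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Stand-in with the same three methods as a String serializer; it does not
// implement the Kafka interface, so it compiles without kafka-clients.
final class EncodingSerializerSketch {
    private Charset charset = StandardCharsets.UTF_8;

    // Picks the encoding from the configs, falling back to UTF-8.
    public void configure(Map<String, ?> configs, boolean isKey) {
        String specific = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object value = configs.get(specific);
        if (value == null) {
            value = configs.get("serializer.encoding");
        }
        if (value instanceof String) {
            charset = Charset.forName((String) value);
        }
    }

    // Turns the string into bytes; a null payload stays null.
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(charset);
    }

    // Nothing to release in this sketch.
    public void close() {
    }

    public static void main(String[] args) {
        EncodingSerializerSketch serializer = new EncodingSerializerSketch();
        serializer.configure(Collections.singletonMap("value.serializer.encoding", "UTF-16BE"), false);
        System.out.println(serializer.serialize("demo-topic", "hi").length);
    }
}
```

To plug a class like this into a real producer, it would additionally need to implement the Kafka serializer interface and be named in key.serializer or value.serializer.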

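Partitioner

Implement the org.apache.kafka.clients.producer.Partitioner interface to create a custom partitioner.

By default, Kafka hashes the serialized key (MurmurHash2), so messages with the same key are sent to the same partition. If no key is specified, the partitioner chooses the partition itself (round-robin in older clients, a sticky per-batch partition since Kafka 2.4), so you cannot predict which partition a message will land in.

The methods defined by the interface: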

  • int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)

Defines which partition the message is sent to. See DefaultPartitioner for a reference implementation

  • void close()

Called when the producer is closed
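
The key-hashing rule can be illustrated with a standalone sketch: hash the key bytes, clear the sign bit, and take the remainder by the partition count. The hash below follows the MurmurHash2 layout with a seed modeled on Kafka's Utils.murmur2. Treat it as an illustration under that assumption rather than a drop-in copy of the library code. The class name and the sample key are hypothetical.

```java
import java.nio.charset.StandardCharsets;

final class KeyPartitionSketch {
    // MurmurHash2 (32-bit) over the key bytes, written out for illustration.
    static int murmur2(byte[] data) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int length = data.length;
        int h = 0x9747b28c ^ length; // seed XOR length
        int blocks = length / 4;
        for (int i = 0; i < blocks; i++) {
            int i4 = i * 4;
            // Assemble four bytes into a little-endian int.
            int k = (data[i4] & 0xff)
                    | ((data[i4 + 1] & 0xff) << 8)
                    | ((data[i4 + 2] & 0xff) << 16)
                    | ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        int tail = length & ~3; // index of the first leftover byte
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16; // falls through
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;  // falls through
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
                break;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit so the modulo result is a valid partition index.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8); // hypothetical key
        System.out.println(partitionFor(key, 6));
    }
}
```

Because the result depends only on the key bytes and the partition count, the same key keeps mapping to the same partition until the number of partitions changes.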

Similarities and differences with RocketMQ

  1. Like Kafka, RocketMQ allows a producer to send a message to a specified ‘partition’ (a message queue in RocketMQ's terms).
  2. RocketMQ has no concept of a serializer. The message body is passed as a byte[], and RocketMQ handles the encoding of the message itself.
  3. As far as I can tell from my own usage so far, RocketMQ offers nothing similar to the interceptor concept.
  4. RocketMQ provides hooks to process the message before and after it is sent.

For example:

DefaultMQProducer producer = new DefaultMQProducer("default");

producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
    @Override
    public String hookName() {
        return null;
    }

    @Override
    public void sendMessageBefore(SendMessageContext context) {
        // Runs before the message is sent
    }

    @Override
    public void sendMessageAfter(SendMessageContext context) {
        // Runs after the message is sent
    }
});