1. How the Producer Sends Messages

Let’s first introduce the process by which a Kafka producer sends a message:

  • The producer wraps the message in a ProducerRecord object. A ProducerRecord contains the target topic and the content to send, and may optionally specify a key and a partition. Before sending the ProducerRecord, the producer serializes the key and value objects into byte arrays so that they can be transmitted over the network.
  • Next, the data is passed to the partitioner. If a partition was already specified in the ProducerRecord, the partitioner does nothing; if not, it selects a partition based on the record's key. The record is then added to a record batch, and all messages in that batch are sent to the same topic and partition. A separate thread is responsible for sending these batches to the appropriate brokers.
  • The server returns a response when it receives the messages. If they were successfully written to Kafka, it returns a RecordMetadata object containing the topic and partition information as well as the offset of the record within the partition. If the write fails, it returns an error. On receiving an error, the producer retries the send; if the message still fails after the configured number of retries, the producer throws an exception and stops retrying.

2. Creating a Producer

2.1 Project Dependencies

This project is built with Maven. To use the Kafka producer API, add the kafka-clients dependency as follows:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>

2.2 Creating a Producer

When creating a Kafka producer, the following three attributes must be specified:

  • bootstrap.servers: specifies a list of broker addresses. The list does not need to contain every broker in the cluster; the producer will discover the others from the brokers given. It is nevertheless recommended to list at least two brokers for fault tolerance;
  • key.serializer: specifies the serializer for message keys;
  • value.serializer: specifies the serializer for message values.

The example code for creating a producer is as follows:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {

    public static void main(String[] args) {

        String topicName = "Hello-Kafka";

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop001:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /* Create the producer */
        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i,
                                                                         "world" + i);
            /* Send a message */
            producer.send(record);
        }
        /* Close the producer */
        producer.close();
    }
}

All of the sample code for this article can be downloaded from GitHub: kafka-basis

2.3 Testing

1. Start Kafka

Kafka depends on ZooKeeper, which must be started first. You can use the ZooKeeper instance bundled with Kafka, or a separately installed one:

#Start a separately installed ZooKeeper
bin/zkServer.sh start

#Or start the ZooKeeper bundled with Kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Then start a single-node Kafka instance for testing:

bin/kafka-server-start.sh config/server.properties

2. Create a topic

#Create a topic for testing
bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                    --replication-factor 1 --partitions 1 \
                    --topic Hello-Kafka

#View all topics
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3. Start the consumer

Start a console consumer to observe writes with the following command:

bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning

4. Run the project

At this point, run the project. You should see output like the following in the consumer console. Note that kafka-console-consumer prints only each message's value, not its key:
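world0
world1
world2
world3
world4
world5
world6
world7
world8
world9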

2.4 Possible Problems

One possible problem is that the producer program appears to hang after it starts. This usually happens when Kafka was started with the default configuration; you need to modify the listeners setting in the server.properties file:

#hadoop001 is the host name of the machine running Kafka. Change it to your own host name or IP address
listeners=PLAINTEXT://hadoop001:9092

3. Sending Messages

The example program above simply calls the send method and does nothing with the result, so we have no way of knowing whether the message was delivered. If you need to know the outcome of a send, you can send synchronously or asynchronously.

3.1 Synchronous Sending

To send synchronously, call the get() method on the result of send. The send method returns a Future<RecordMetadata> object; the RecordMetadata contains the topic, partition, and offset of the message that was sent. The rewritten code looks like this:

for (int i = 0; i < 10; i++) {
    try {
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
        /* Send messages synchronously */
        RecordMetadata metadata = producer.send(record).get();
        System.out.printf("topic=%s, partition=%d, offset=%s \n",
                metadata.topic(), metadata.partition(), metadata.offset());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

The output is as follows. The offsets depend on how many messages have been sent before; all records are assigned to partition 0 because the Hello-Kafka topic was created with --partitions 1, i.e. it has only one partition.

topic=Hello-Kafka, partition=0, offset=40 
topic=Hello-Kafka, partition=0, offset=41 
topic=Hello-Kafka, partition=0, offset=42 
topic=Hello-Kafka, partition=0, offset=43 
topic=Hello-Kafka, partition=0, offset=44 
topic=Hello-Kafka, partition=0, offset=45 
topic=Hello-Kafka, partition=0, offset=46 
topic=Hello-Kafka, partition=0, offset=47 
topic=Hello-Kafka, partition=0, offset=48 
topic=Hello-Kafka, partition=0, offset=49 

3.2 Asynchronous Sending

In general we care less about messages that were sent successfully than about those that failed, so Kafka also provides asynchronous sending with a callback function. The code is as follows:

for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
    /* Send messages asynchronously and listen for callbacks */
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.out.println("Doing exception handling");
            } else {
                System.out.printf("topic=%s, partition=%d, offset=%s \n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        }
    });
}

4. Custom Partitioners

Kafka has a default partitioning mechanism:

  • If the key is null, the round-robin algorithm is used to distribute messages evenly across all partitions;
  • If the key is not null, Kafka hashes the key with its built-in hash algorithm and uses the result to map the message to a specific partition; see the sketch after this list.
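For non-null keys, the default behavior can be approximated by the following sketch. This is a simplified illustration (the class name is ours), not Kafka's actual DefaultPartitioner, which additionally handles null keys and unavailable partitions:

import org.apache.kafka.common.utils.Utils;

public class DefaultPartitioningSketch {

    /* Simplified sketch: Kafka hashes the serialized key with murmur2
     * and maps the result onto the topic's partition count. */
    public static int partitionForKey(byte[] keyBytes, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}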

In some cases you may have your own partitioning requirements, which can be implemented with a custom partitioner. Here is an example:

4.1 Implementing a Custom Partitioner

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * Custom partitioner
 */
public class CustomPartitioner implements Partitioner {

    private int passLine;

    @Override
    public void configure(Map<String, ?> configs) {
        /* Get the passing score from the producer configuration */
        passLine = (Integer) configs.get("pass.line");
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value,
                         byte[] valueBytes, Cluster cluster) {
        /* The key is a score: records at or above the passing line go to partition 1, the rest to partition 0 */
        return (Integer) key >= passLine ? 1 : 0;
    }

    @Override
    public void close() {
        System.out.println("Partitioner closed");
    }
}

You need to specify the partitioner when creating the producer, along with any configuration parameters the partitioner requires:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerWithPartitioner {

    public static void main(String[] args) {

        String topicName = "Kafka-Partitioner-Test";

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop001:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        /* Pass in the custom partitioner */
        props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner");
        /* Pass in the parameter the partitioner needs */
        props.put("pass.line", 6);

        Producer<Integer, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i <= 10; i++) {
            String score = "score:" + i;
            ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score);
            /* Send the message asynchronously */
            producer.send(record, (metadata, exception) ->
                    System.out.printf("%s, partition=%d, \n", score, metadata.partition()));
        }
        producer.close();
    }
}

4.2 Testing

You need to create a topic that has at least two partitions:

bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                    --replication-factor 1 --partitions 2 \
                    --topic Kafka-Partitioner-Test

Run the project. As the output below shows, scores greater than or equal to 6 were assigned to partition 1, while scores below 6 went to partition 0.

score:6, partition=1,
score:7, partition=1,
score:8, partition=1,
score:9, partition=1,
score:10, partition=1,
score:0, partition=0,
score:1, partition=0,
score:2, partition=0,
score:3, partition=0,
score:4, partition=0,
score:5, partition=0,
Partitioner closed

5. Other Producer Properties

So far, creating a producer has only required the broker addresses and the key and value serializers. In fact, Kafka's producer has many more configurable properties, for example:

1. acks

The acks parameter specifies how many partition replicas must receive the message before the producer considers the write to be successful:

  • acks=0: the producer considers the message sent successfully as soon as it is written out, and does not wait for any response from the server;
  • acks=1: the producer receives a success response from the server as soon as the partition leader receives the message;
  • acks=all: the producer receives a success response from the server only after all in-sync replicas have received the message.

2. buffer.memory

Sets the size of the producer memory buffer.

3. compression.type

By default, messages are sent uncompressed. To enable compression, set this parameter; the available values are snappy, gzip, and lz4.

4. retries

The number of times the producer resends a message after an error occurs. Once this value is reached, the producer gives up retrying and returns an error.

5. batch.size

When there are multiple messages that need to be sent to the same partition, the producer puts them in the same batch. This parameter specifies the amount of memory, in bytes, that can be used by a batch.

6. linger.ms

This parameter specifies how long the producer waits for more messages to be added to the batch before sending it.

7. client.id

The client ID that the server uses to identify the source of the message.

8. max.in.flight.requests.per.connection

Specifies how many requests the producer can send before receiving a response from the server. Higher values consume more memory but improve throughput. Setting it to 1 guarantees that messages are written to the server in the order they were sent, even when retries occur.

9. timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms

  • timeout.ms specifies how long the broker waits for the in-sync replicas to acknowledge a message;
  • request.timeout.ms specifies how long the producer waits for a response from the server when sending data;
  • metadata.fetch.timeout.ms specifies how long the producer waits for a response from the server when fetching metadata, such as which broker is the leader of a partition.

10. max.block.ms

Specifies how long the producer blocks when send() is called or when metadata is fetched via the partitionsFor() method. These methods block when the producer's send buffer is full or when metadata is unavailable. Once the blocking time reaches max.block.ms, the producer throws a timeout exception.

11. max.request.size

This parameter controls the size of requests sent by the producer. It bounds both the largest single message that can be sent and the total size of all messages in a single request. For example, with a value of 1000 KB, the largest single message you can send is 1000 KB, or the producer can send a single request containing a batch of 1,000 messages of 1 KB each.

12. receive.buffer.bytes & send.buffer.bytes

These two parameters specify the sizes of the TCP socket buffers used for receiving and sending data, respectively. A value of -1 means the operating system default is used.
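To tie these together, the following sketch shows a producer configured with several of the properties above. The class name and values are purely illustrative, not recommendations:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class TunedProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop001:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        /* Wait for all in-sync replicas to acknowledge each write */
        props.put("acks", "all");
        /* Retry transient failures up to three times */
        props.put("retries", 3);
        /* Collect up to 16 KB per batch, waiting at most 5 ms for a batch to fill */
        props.put("batch.size", 16384);
        props.put("linger.ms", 5);
        /* Compress batches with gzip */
        props.put("compression.type", "gzip");
        /* Identify this client to the server */
        props.put("client.id", "demo-producer");
        /* Preserve ordering even across retries */
        props.put("max.in.flight.requests.per.connection", 1);

        Producer<String, String> producer = new KafkaProducer<>(props);
        /* ... send records as in the earlier examples ... */
        producer.close();
    }
}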

More articles in the Big Data series can be found in the GitHub open-source project: Getting Started with Big Data