Whether you use Kafka as a message queue, a message bus, or a data storage platform, you always need a producer that writes to Kafka, a consumer that reads from Kafka, or an application that does both.

For example, in a credit card transaction processing system, a client application (perhaps an online store) sends each transaction to Kafka as soon as a payment is made. Another application checks the transaction against a rules engine and decides whether to approve or reject it. The approve or reject response is written back to Kafka and delivered to the online store that initiated the transaction. A third application reads both the transactions and their approval status from Kafka and saves them to a database, where analysts can later review the decisions and perhaps improve the rules engine.

Developers can use Kafka’s built-in client API to develop Kafka applications.

In this chapter, we will start with the design and components of Kafka producers and learn how to use them. We’ll show you how to create KafkaProducer and ProducerRecord objects, how to send records to Kafka, and how to handle errors returned from Kafka. Then we’ll cover the important configuration options that control producer behavior. Finally, we delve into how to use different partitioning methods and serializers, and how to write your own serializers and partitioners.

In the next chapter, we’ll cover Kafka’s consumer client and how to read messages from Kafka.

Producer Overview

There are many situations in which an application needs to write messages to Kafka: recording user activity (for auditing and analysis), recording metrics, storing log messages, recording information from smart appliances, communicating asynchronously with other applications, buffering data before it is written to a database, and so on.

Multiple usage scenarios imply multiple requirements: does every message matter? Is it possible to lose a small percentage of messages? Is it acceptable to occasionally repeat messages? Are there strict latency and throughput requirements?

In the credit card transaction system mentioned earlier, neither message loss nor message duplication is acceptable, the maximum acceptable latency is 500 ms, and the throughput requirements are high: we want to process up to one million messages per second.

Storing website click information is another usage scenario. Here, a small number of lost or duplicated messages can be tolerated, and the latency can be higher, as long as the user experience is not affected. In other words, as long as the page loads immediately after the user clicks a link, we don’t care if the message takes a few seconds to reach the Kafka server. The throughput required depends on how heavily the site is used.

Different usage scenarios have a direct impact on how the producer API is used and configured.

Although the producer API is simple to use, the process of sending messages is a bit complicated. The following figure shows the main steps for sending a message to Kafka.

We start by creating a ProducerRecord object, which needs to contain the target topic and the content to be sent. We can also specify keys or partitions. When sending the ProducerRecord object, the producer serializes the key and value objects into byte arrays so that they can be transferred over the network.

Next, the data is passed to the partitioner. If a partition was specified in the ProducerRecord object, the partitioner does nothing more and simply returns that partition. If no partition was specified, the partitioner selects one based on the ProducerRecord object’s key. Once the partition is selected, the producer knows which topic and partition the record will go to. The record is then added to a record batch, all of whose messages are sent to the same topic and partition. A separate thread is responsible for sending these record batches to the appropriate broker.

The server returns a response when it receives these messages. If the message is successfully written to Kafka, a RecordMetadata object is returned containing the topic and partition, as well as the offset of the record within the partition. If the write fails, an error is returned. The producer tries to resend the message after receiving an error and returns an error to the caller if it still fails after several attempts.
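The partition-selection step described above can be sketched in a few lines. This is a simplified model, not Kafka’s actual partitioner: the class and method names are hypothetical, the hash is `Arrays.hashCode` rather than the murmur2 hash Kafka’s default partitioner applies to the key bytes, and the random pick for records without a key is an assumption made only to keep the sketch complete.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

class PartitionChoiceSketch {
    // Returns the partition a record would go to in this simplified model.
    static int choosePartition(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null) {
            // A partition set on the record always wins.
            return explicitPartition;
        }
        if (keyBytes == null) {
            // Assumption for this sketch: records without a key go to a random partition.
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // Same key bytes -> same hash -> same partition (stand-in for murmur2).
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "Precision Products".getBytes(StandardCharsets.UTF_8);
        System.out.println("by key:   " + choosePartition(null, key, 6));
        System.out.println("explicit: " + choosePartition(3, key, 6));
    }
}
```

The point to take away is that records with the same key always land in the same partition as long as the number of partitions does not change.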

Create a Kafka producer

To write a message to Kafka, you first create a producer object and set some properties.

The following code snippet shows how to create a new producer, specifying only the mandatory properties and leaving everything else at its default setting.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties kafkaProps = new Properties();
// Brokers used for the initial connection to the cluster
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
// Keys and values are both strings, so both use StringSerializer
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);

Kafka producers have three mandatory properties:

bootstrap.servers

This property specifies the broker’s address list in the form host:port. The list does not need to contain all broker addresses. Producers will look up information about other brokers from a given broker. However, it is recommended to provide information on at least two brokers so that producers can still connect to the cluster if one of them goes down.

key.serializer

The broker expects the keys and values of messages to be byte arrays. The producer interface uses parameterized types, so any Java object can be sent as a key or value. This makes the code quite readable, but the producer needs to know how to convert these objects into byte arrays. key.serializer must be set to the name of a class that implements the org.apache.kafka.common.serialization.Serializer interface; the producer uses this class to serialize key objects into byte arrays. The Kafka client ships with ByteArraySerializer (which does very little), StringSerializer, and IntegerSerializer, so if you only use these common Java types, there is no need to implement your own serializer. Note that key.serializer must be set even if you intend to send only values.

value.serializer

As with key.serializer, the class specified by value.serializer serializes the value. If both keys and values are strings, you can use the same serializer as for key.serializer. If the key is an integer and the value is a string, you need to use different serializers.

There are three main ways to send messages:

1. Fire-and-forget: We send a message to the server and don’t check whether it arrives. Most of the time, messages arrive normally because Kafka is highly available and the producer automatically retries. However, some messages will occasionally be lost with this approach.

2. Synchronous send: We call send(), which returns a Future object, and then call get() on it to wait and see whether the message was sent successfully.

3. Asynchronous send: We call send() and pass a callback function, which is invoked when the broker returns a response.

In the following examples, we will show you how to use the above methods to send messages and how to handle exceptions that may occur.

All the examples in this chapter use a single thread, but a producer can be used by multiple threads to send messages. You can start with a single producer and a single thread. If higher throughput is required, you can add threads that share the same producer. If that is still not enough, you can increase the number of producers.

Send a message to Kafka

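The simplest way to send a message is as follows:

// Topic "CustomerCountry", key "Precision Products", value "France"
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
  // Fire-and-forget: the Future returned by send() is ignored
  producer.send(record);
} catch (Exception e) {
  // Only errors raised before the record is sent end up here
  e.printStackTrace();
}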

The producer’s send() method takes a ProducerRecord object as its argument, so we create one first. The constructor used here takes the name of the target topic and the key and value to send, all of which are strings in this case. The types of the key and value must match the serializers and the type parameters of the producer object.

We use the producer’s send() method to send the ProducerRecord object. As described in the producer overview, messages are first placed in a buffer and then sent to the server by a separate thread. The send() method returns a Future object containing RecordMetadata, but because we ignore the return value, we have no way of knowing whether the message was sent successfully. Use this mode only if you don’t care about the result, for example when logging Twitter messages or less important application logs.

We can ignore errors that occur while the message is being sent or on the server side, but the producer can still throw exceptions before the message is sent. These may be a SerializationException (the message could not be serialized), a BufferExhaustedException or TimeoutException (the buffer is full), or an InterruptException (the sending thread was interrupted).

Synchronous message sending

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    // get() blocks until the broker responds and throws if the send failed
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}

Here, producer.send() returns a Future object, and we call its get() method to wait for the response from Kafka. If the server returns an error, get() throws an exception. If no error occurs, we get a RecordMetadata object that we can use to retrieve the offset of the message. An exception is also thrown if an error occurs before or while the data is sent, for example if the broker returns a non-retriable error or if the retries have been exhausted. In this example we simply print the exception.

How do I handle errors returned from Kafka producers

KafkaProducer can encounter two types of errors. The first type is retriable errors, which can be resolved by resending the message. For example, a connection error can be resolved by re-establishing the connection, and a “no leader” error can be resolved once a new leader is elected for the partition. KafkaProducer can be configured to retry automatically, and if the problem is still not resolved after several retries, the application receives a retriable exception. The second type is errors that cannot be resolved by retrying, such as the “message too large” exception. For these errors, KafkaProducer does not retry and throws the exception immediately.
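The difference between the two error types can be modeled with a short retry loop. This is only an illustration of the behavior described above: the exception classes and the `sendWithRetries` helper are hypothetical stand-ins, and in a real application KafkaProducer performs the retries internally.

```java
class RetrySketch {
    // Hypothetical stand-ins for retriable and non-retriable producer errors.
    static class RetriableError extends RuntimeException {
        RetriableError(String message) { super(message); }
    }
    static class NonRetriableError extends RuntimeException {
        NonRetriableError(String message) { super(message); }
    }

    interface Attempt {
        void run();
    }

    // Runs the attempt, retrying up to `retries` times on retriable errors.
    // Returns the number of attempts it took to succeed.
    static int sendWithRetries(Attempt attempt, int retries) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                attempt.run();
                return attempts;
            } catch (RetriableError e) {
                if (attempts > retries) {
                    throw e; // retries exhausted: surface the retriable error
                }
            }
            // NonRetriableError is not caught, so it propagates immediately.
        }
    }
}
```

With `retries` set to 3, a send that fails twice with a retriable error succeeds on the third attempt, while a non-retriable error reaches the caller after the first attempt.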

Sending messages asynchronously

Assume that a message takes 10ms to travel back and forth between the application and the Kafka cluster. If you wait for a response after each message is sent, sending 100 messages takes 1 second. But if you just send a message and don’t wait for a response, it takes much less time to send 100 messages. Most of the time, we don’t need to wait for a response — although Kafka sends back the destination topic, partition information, and offsets of the message, it’s not required for the sending application. However, when a message fails to be sent, we need to throw an exception, log the error, or write the message to an “error message” file for later analysis.

In order to handle exceptions while sending messages asynchronously, the producer provides callback support. Here is an example of sending a message asynchronously, using a callback.
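The sketch below shows the shape of such a callback without requiring the Kafka client library. The `Metadata` class, the local `Callback` interface, and the `send` method are hypothetical stand-ins; they mirror Kafka’s `Callback.onCompletion(RecordMetadata, Exception)` and `producer.send(record, callback)`, but the size check and the immediate completion are assumptions made so the example runs on its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class AsyncCallbackSketch {
    // Hypothetical stand-in for Kafka's RecordMetadata, trimmed to three fields.
    static class Metadata {
        final String topic;
        final int partition;
        final long offset;
        Metadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Mirrors the shape of Kafka's Callback.onCompletion(RecordMetadata, Exception).
    interface Callback {
        void onCompletion(Metadata metadata, Exception exception);
    }

    // Values whose send failed, kept for later analysis.
    static final List<String> failed = new ArrayList<>();

    // Hypothetical stand-in for producer.send(record, callback). It completes
    // immediately; the real producer invokes the callback later, from its I/O thread.
    static void send(String topic, String value, int maxBytes, Callback callback) {
        if (value.getBytes(StandardCharsets.UTF_8).length > maxBytes) {
            callback.onCompletion(null, new IllegalStateException("message too large"));
        } else {
            callback.onCompletion(new Metadata(topic, 0, 0L), null);
        }
    }

    // Sends without waiting and records the value only if the send failed.
    static void sendAndLog(String topic, String value) {
        send(topic, value, 16, (metadata, exception) -> {
            if (exception != null) {
                failed.add(value);
            }
        });
    }
}
```

With the real client, the lambda would be passed as the second argument to `producer.send()`, and a non-null exception is the signal that the send failed.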

Configuring producers

So far, we have covered only the mandatory configuration parameters for producers: bootstrap.servers and the serializers.

Producers also have a number of configurable parameters, described in the Kafka documentation, and most of them have reasonable defaults, so there is no need to modify them. However, there are a few parameters that have a significant impact on producers in terms of memory usage, performance, and reliability, which we’ll describe next.

1. acks

The acks parameter specifies how many partition replicas must receive the message before the producer considers the write successful.

This parameter has a significant impact on the likelihood of message loss. It has the following options.

• If acks=0, the producer does not wait for any response from the server before considering the message written. That is, if something goes wrong and the server does not receive the message, the producer will not know and the message will be lost. However, because the producer does not wait for a response from the server, it can send messages as fast as the network allows, resulting in high throughput.

• If acks=1, the producer receives a success response from the server as soon as the leader replica of the partition receives the message. If the message cannot be written to the leader (for example, the leader has crashed and a new leader has not yet been elected), the producer receives an error response and resends the message to avoid data loss. However, if a replica that did not receive the message becomes the new leader, the message is still lost. The throughput in this mode depends on whether messages are sent synchronously or asynchronously. If the sending client waits for a response from the server (by calling the get() method of the Future object), latency obviously increases by one network round trip. If the client uses asynchronous callbacks, the latency problem is mitigated, but throughput is still limited by the number of in-flight messages (that is, how many messages the producer can send before receiving a response from the server).

• If acks=all, the producer receives a success response from the server only when all in-sync replicas have received the message. This is the safest mode because it ensures that more than one server has the message, so the message survives even if one of the servers crashes (more details in Chapter 5). However, latency is higher than with acks=1, because we wait for more than one server to receive the message.

2. buffer.memory

This parameter sets the size of the memory buffer the producer uses for messages waiting to be sent to the server. If the application produces messages faster than they can be delivered to the server, the producer runs out of buffer space. At that point, the send() call either blocks or throws an exception, depending on the block.on.buffer.full parameter (replaced in version 0.9.0.0 by max.block.ms, which lets send() block for a limited time before throwing an exception).

3. compression.type

By default, messages are not compressed when sent. This parameter can be set to snappy, gzip, or lz4 and specifies which compression algorithm is applied to messages before they are sent to the broker. The snappy algorithm was invented by Google. It uses relatively little CPU while providing good performance and a decent compression ratio, so it is a good choice if you care about both performance and network bandwidth. The gzip algorithm generally uses more CPU but provides a higher compression ratio, so it is worth using if network bandwidth is limited. Compression reduces network transfer and storage overhead, which are often the bottleneck when sending messages to Kafka.

4. retries

The error the producer receives from the server may be temporary (for example, a partition that currently has no leader). In this case, the value of the retries parameter determines how many times the producer resends the message before it gives up and returns an error. By default, the producer waits 100 ms between retries, but this interval can be changed with the retry.backoff.ms parameter. Before setting the number of retries and the retry interval, it is a good idea to test how long it takes to recover from a crashed broker (for example, how long it takes for all partitions to elect new leaders), so that the total retry time is longer than the time the Kafka cluster needs to recover; otherwise the producer will give up too soon. However, some errors are not temporary and cannot be resolved by retrying (such as the “message too large” error). In general, because the producer retries automatically, there is no need to handle retriable errors in your own code. You only need to handle non-retriable errors and cases where the retries have been exhausted.
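As a rough check of the advice above, the time the producer spends waiting between retries can be compared with the measured recovery time. The helper names below are hypothetical and the 2-second recovery time is an invented figure; the calculation also ignores the time each request itself takes, so it is only a lower bound on the real retry window.

```java
class RetryBudgetSketch {
    // Lower bound on the retry window: the producer waits retryBackoffMs
    // before each of its `retries` retry attempts.
    static long minRetryWindowMs(int retries, long retryBackoffMs) {
        return (long) retries * retryBackoffMs;
    }

    // True if the producer keeps retrying for longer than the cluster needs to recover.
    static boolean coversRecovery(int retries, long retryBackoffMs, long recoveryMs) {
        return minRetryWindowMs(retries, retryBackoffMs) > recoveryMs;
    }

    public static void main(String[] args) {
        long recoveryMs = 2000; // hypothetical measured leader-election time
        System.out.println(coversRecovery(3, 100, recoveryMs));  // 300 ms window
        System.out.println(coversRecovery(30, 100, recoveryMs)); // 3000 ms window
    }
}
```

With 3 retries at 100 ms the window is only 300 ms, so the producer would give up long before a 2-second recovery completes; 30 retries at the same interval would cover it.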

5. batch.size

When multiple messages need to be sent to the same partition, the producer puts them in the same batch. This parameter specifies the amount of memory a batch can use, measured in bytes (not in number of messages). When the batch is full, all messages in it are sent. However, the producer does not always wait for a batch to fill up before sending it. Half-full batches, or even batches containing a single message, may also be sent. So a large batch size does not cause delays; it only uses more memory. Setting it too small, however, adds overhead because the producer has to send messages more frequently.

6. linger.ms

This parameter specifies how long the producer waits for more messages to join a batch before sending it. KafkaProducer sends a batch when it fills up or when the linger.ms limit is reached. By default, the producer sends messages as soon as a sender thread is available, even if the batch contains only one message. Setting linger.ms to a value greater than 0 makes the producer wait a little before sending the batch so that more messages can be added to it. This increases latency, but it also improves throughput (because more messages are sent at once, the overhead per message is smaller).

7. client.id

This parameter can be any string. The server uses it to identify the source of messages, and it appears in logging, metrics, and quotas.

8. max.in.flight.requests.per.connection

This parameter specifies how many messages the producer can send before receiving a response from the server. A higher value uses more memory but also improves throughput. Setting it to 1 ensures that messages are written to the server in the order they were sent, even if retries occur.

9. timeout.ms, request.timeout.ms, and metadata.fetch.timeout.ms

request.timeout.ms specifies how long the producer waits for a response from the server when sending data, and metadata.fetch.timeout.ms specifies how long it waits when requesting metadata (for example, which broker is the leader of the target partition). If the wait times out, the producer either retries or returns an error (by throwing an exception or invoking the callback). timeout.ms specifies how long the broker waits for the in-sync replicas to acknowledge a message, in line with the acks configuration: if no acknowledgement arrives within that time, the broker returns an error.

10. max.block.ms

This parameter specifies how long the producer blocks when send() is called or when partitionsFor() is used to request metadata. These methods block when the producer’s send buffer is full or when metadata is not available. When the blocking time reaches max.block.ms, the producer throws a timeout exception.

11. max.request.size

This parameter controls the size of the requests the producer sends. It caps both the largest single message that can be sent and the total size of all messages in one request. For example, with a value of 1 MB, the largest message that can be sent is 1 MB, or the producer can send a batch of 1,000 messages of 1 KB each in a single request. In addition, the broker has its own limit on the largest message it accepts (message.max.bytes). It is best to make the two settings match so that messages sent by the producer are not rejected by the broker.

12. receive.buffer.bytes and send.buffer.bytes

These two parameters specify the buffer size of the TCP socket for receiving and sending packets, respectively. If they are set to -1, the operating system defaults are used. If the producer or consumer is in a different data center than the broker, these values can be appropriately increased because networks across data centers tend to have higher latency and lower bandwidth.
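The parameters discussed in this section are all set on the same Properties object that is passed to the producer. The sketch below collects them in one place. The property names are the ones described above, but every value is an illustrative assumption rather than a recommendation, and the class name and the client.id value are hypothetical.

```java
import java.util.Properties;

class ProducerTuningSketch {
    static Properties build() {
        Properties props = new Properties();
        // Mandatory properties
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Reliability
        props.put("acks", "all");                 // wait for all in-sync replicas
        props.put("retries", "3");                // resend up to 3 times on transient errors
        props.put("retry.backoff.ms", "100");     // wait between retries
        // Memory, batching, and throughput
        props.put("buffer.memory", "33554432");   // 32 MB send buffer
        props.put("batch.size", "16384");         // bytes per batch, not messages
        props.put("linger.ms", "5");              // wait up to 5 ms for a batch to fill
        props.put("compression.type", "snappy");
        props.put("max.in.flight.requests.per.connection", "5");
        // Limits and timeouts
        props.put("max.request.size", "1048576"); // 1 MB
        props.put("request.timeout.ms", "30000");
        props.put("max.block.ms", "60000");
        // Socket buffers: -1 means use the operating system defaults
        props.put("receive.buffer.bytes", "-1");
        props.put("send.buffer.bytes", "-1");
        // Identification in logs, metrics, and quotas (hypothetical name)
        props.put("client.id", "order-service");
        return props;
    }
}
```

In an application, the result of `build()` would be passed to the KafkaProducer constructor in place of the minimal configuration shown earlier.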

Ordering guarantees

Kafka guarantees that messages within the same partition are ordered. That is, if a producer sends messages in a certain order, the broker writes them to the partition in that order, and consumers read them in the same order. In some cases, order is very important. If retries is set to a nonzero value and max.in.flight.requests.per.connection is set to a number greater than 1, then when the first batch fails to be written and the second batch succeeds, the broker retries the first batch. If that retry succeeds, the order of the two batches is reversed.

In general, if a scenario requires messages to be ordered, it is also critical that they are written successfully, so setting retries to 0 is not recommended. Instead, set max.in.flight.requests.per.connection to 1, so that while the producer is attempting to send the first batch, no other messages are sent to the broker. However, this severely limits the throughput of the producer, so it should only be done when there are strict requirements on message order.
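The reordering scenario can be reproduced with a small simulation. This is a toy model, not broker code: the class name, the batch names, and the rule that the first attempt of "batch-1" always fails are assumptions chosen to match the example in the text.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class OrderingSketch {
    // Returns the order in which two batches end up in the partition log.
    // The first attempt to write "batch-1" fails with a transient error and
    // is retried; every other attempt succeeds.
    static List<String> simulate(int maxInFlight) {
        List<String> log = new ArrayList<>();
        Deque<String> pending = new ArrayDeque<>(List.of("batch-1", "batch-2"));
        boolean batch1FailedOnce = false;
        while (!pending.isEmpty()) {
            // Send up to maxInFlight batches without waiting for responses.
            List<String> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.pollFirst());
            }
            List<String> toRetry = new ArrayList<>();
            for (String batch : inFlight) {
                if (batch.equals("batch-1") && !batch1FailedOnce) {
                    batch1FailedOnce = true;
                    toRetry.add(batch); // transient failure, retry later
                } else {
                    log.add(batch);     // written to the partition
                }
            }
            // Failed batches go back to the front of the queue.
            for (int i = toRetry.size() - 1; i >= 0; i--) {
                pending.addFirst(toRetry.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(simulate(2)); // [batch-2, batch-1]
        System.out.println(simulate(1)); // [batch-1, batch-2]
    }
}
```

With two batches in flight, the second batch is written while the first is still waiting for its retry, so the log order is reversed. With one batch in flight, the retry of the first batch completes before the second batch is sent.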