1. Kafka

1.1 Kafka Setup

For example, you can deploy Kafka in a Docker environment, or take the traditional route and install Kafka directly on Windows or Linux. In any case, the official website is the most reliable reference.

1.2 Configuration Parameters

There are many parameters you can tune in Kafka, but most of them can be left at their default values.

Broker-side Configuration

  • broker.id

Each Kafka broker has a unique identifier, set by the broker.id parameter. It defaults to 0 but can be any integer value.

  • port

Kafka's listening port, which defaults to 9092. If you use a port below 1024, you need to start Kafka with root privileges.

  • zookeeper.connect

The Zookeeper connection address. For example, localhost:2181 indicates a Zookeeper instance running on local port 2181. zookeeper.connect can also take multiple values, for example zk1:2181,zk2:2181,zk3:2181. The parameter is a comma-separated list of hostname:port/path entries, where:

  • hostname: the domain name or IP address of the Zookeeper server.
  • port: the Zookeeper connection port.
  • path: Kafka's root directory (chroot) in Zookeeper. This part is optional.

If you have two Kafka clusters, say kafka1 and kafka2, the zookeeper.connect parameter for the two clusters can be specified as zk1:2181,zk2:2181,zk3:2181/kafka1 and zk1:2181,zk2:2181,zk3:2181/kafka2 respectively.

  • log.dirs

Kafka persists messages to disk under the paths specified by this parameter, a comma-separated list of local filesystem paths. log.dirs has no default value, so you must specify it manually. If more than one path is specified, Kafka balances partition logs across the paths on a least-used basis, always keeping all log segments of the same partition under the same path.

  • num.recovery.threads.per.data.dir

Kafka uses a pool of threads to handle partition logs in each data directory. This thread pool is used:

  • On a normal startup, to open each partition's log segments;
  • When recovering from a crash, to check and restore each partition's log segments;
  • On shutdown, to cleanly close log segments.

By default, only one thread is used per log directory. Since these threads are only used at server startup and shutdown, it is perfectly reasonable to configure a large number of threads to work in parallel. Note, however, that the configured number applies to each log directory specified by log.dirs. That is, if num.recovery.threads.per.data.dir is set to 8 and log.dirs specifies three paths, there will be 24 threads in total.

  • auto.create.topics.enable

This parameter defaults to true, meaning the broker automatically creates a topic in the following scenarios (a sample broker configuration follows the list):

  • When a producer starts writing messages to the topic;
  • When a consumer starts reading messages from the topic;
  • When any client requests metadata for the topic.
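
As an illustration, the broker-side parameters above might be combined in server.properties as follows. This is a sketch: the host names, paths, and values are placeholders, not recommendations.

# Unique identifier for this broker
broker.id=0
# Listening port (ports below 1024 require root)
port=9092
# Zookeeper ensemble with an optional chroot path for this cluster
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka1
# Comma-separated log directories; no default, must be set explicitly
log.dirs=/data/kafka-logs-1,/data/kafka-logs-2
# 8 threads per directory x 2 directories = 16 recovery threads
num.recovery.threads.per.data.dir=8
# Disable automatic topic creation for explicit control
auto.create.topics.enable=false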

Topic Default Configuration

Kafka provides a number of default configuration parameters for newly created topics. Here's a look at these defaults; a consolidated example follows the list.

  • num.partitions

Specifies the number of partitions a newly created topic gets; the default is 1. If automatic topic creation is enabled (as it is by default), auto-created topics get the number of partitions specified by this parameter. Note, however, that the number of partitions of a topic can only be increased, never decreased.

  • default.replication.factor

Indicates the number of replicas Kafka keeps for each message. If one replica fails, another can continue to serve. The default value is 1.

  • log.retention.hours

Kafka usually decides how long data is retained based on time, which this parameter configures. The default value is 168 hours, that is, one week. There are two related parameters, log.retention.minutes and log.retention.ms. All three serve the same purpose, determining how long it takes for messages to be deleted; log.retention.ms is recommended, and when more than one is set, the smallest unit takes precedence.

  • log.retention.bytes

Another way to expire messages is by size. The threshold, specified with the log.retention.bytes parameter, applies to each partition. That is, if you have a topic with eight partitions and log.retention.bytes is set to 1GB, the topic can hold at most 8GB of data.

  • log.segment.bytes

When a message is written, it is appended to the current log segment file. If the segment reaches the size specified by log.segment.bytes (default: 1GB), it is closed and a new segment is opened for appending. Once closed, a log segment waits out the retention period and is then deleted.

  • log.segment.ms

The log.segment.ms parameter specifies how long before a log segment is closed; it works alongside log.segment.bytes. A log segment is closed when either the size limit or the time limit is reached, whichever condition is satisfied first.

When using the log.segment.ms parameter, be aware of one scenario: if there are many partitions whose log segments never reach the configured size, those segments will all be closed at the same time once log.segment.ms elapses, which may hurt disk performance.

  • message.max.bytes

The broker uses the message.max.bytes parameter to limit the size of a single message. The default value is 1,000,000 bytes, i.e. about 1MB. Note that this limits the size of the message the producer sends to the broker after compression: a message that exceeds the threshold uncompressed can still be sent successfully if its compressed size is below the threshold.

This value has a significant impact on performance: the larger it is, the more time the threads handling network connections and requests spend processing each request. In addition, Kafka has a consumer-side parameter, fetch.message.max.bytes, that limits the size of the messages a consumer can fetch.
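
Here is an illustrative server.properties excerpt pulling these topic defaults together; the values are examples for discussion, not recommendations.

# Defaults applied to newly created topics
num.partitions=3
default.replication.factor=2
# Time-based retention; log.retention.ms takes precedence if set
log.retention.ms=604800000
# Size-based retention, applied per partition
log.retention.bytes=1073741824
# Close a segment at 1GB or after one day, whichever comes first
log.segment.bytes=1073741824
log.segment.ms=86400000
# Maximum (compressed) size of a single message, in bytes
message.max.bytes=1000000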

JVM Parameter Configuration

Currently, Java 8 and later are the most widely used versions. For the G1 garbage collector, you generally only need to adjust these two parameters:

  • MaxGCPauseMillis: specifies the target maximum pause time for garbage collection, 200ms by default. G1 does its best to keep pauses below this threshold, but will pause longer if necessary.
  • InitiatingHeapOccupancyPercent: specifies the heap usage percentage at which G1 triggers a concurrent garbage collection cycle, 45% by default. This percentage covers the whole heap, both the young and old generations.

Kafka itself is very memory-efficient, so both parameters can be set lower. On a machine with 64GB of RAM and a 5GB heap for Kafka, we can set MaxGCPauseMillis to 20ms and InitiatingHeapOccupancyPercent to 35, as sketched below.
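
A minimal sketch of how these flags could be applied, assuming the stock startup script, which reads the KAFKA_HEAP_OPTS and KAFKA_JVM_PERFORMANCE_OPTS environment variables; the 5GB heap matches the sizing example above.

export KAFKA_HEAP_OPTS="-Xms5g -Xmx5g"
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
bin/kafka-server-start.sh config/server.properties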

2. Kafka producers

2.1 Creating a Kafka producer

To write messages to Kafka, you first need to create a producer object and set some properties. There are three basic properties:

  • bootstrap.servers

This property specifies the broker address list in the form host:port. The list does not need to contain every broker; the producer looks up the remaining brokers from the ones given. However, it is recommended to specify at least two brokers, so that the producer can still connect if one of them goes down.

  • key.serializer

The property value is a class name. This property specifies the class used to serialize message keys. The Kafka broker only accepts byte arrays, but the producer's send interface accepts arbitrary Java objects, so those objects must first be serialized into byte arrays. The class specified by key.serializer needs to implement the org.apache.kafka.common.serialization.Serializer interface, whose job is to turn an object into bytes. Classes implementing the Serializer interface include ByteArraySerializer, StringSerializer, and IntegerSerializer. One thing to note: key.serializer must always be set, even if you intend to send only values.

  • value.serializer

As with key.serializer, the class specified by value.serializer serializes the value.

The following is an example:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);

2.2 Sending messages to Kafka

After the producer is created, we can send the message. There are three ways to send messages in Kafka:

  • Fire and forget

    In the simplest send mode, we just call the send interface to deliver a message to the Kafka server, without caring whether it was written successfully. Because Kafka is highly available, messages are written in most cases, but they can be lost in exceptional circumstances.

    ProducerRecord<String, String> record = new ProducerRecord<>("UserInfo", "Name", "Li Si");
    try {
      producer.send(record);
    } catch (Exception e) {
      e.printStackTrace();
    }

    The logic is simple:

    • Create a ProducerRecord and specify the topic and the key/value of the message. The ProducerRecord constructor called here is

      public ProducerRecord(String topic, K key, V value) {}
    • The send() method is used to send the message. It returns a Future<RecordMetadata> object, but since we don't check the Future, we don't know whether the send succeeded.

    • Although we ignore errors that may occur when sending to the broker, we may still encounter exceptions in the send() call itself, such as serialization errors or send buffer overflow, so we wrap it in a try..catch block.

  • Synchronous Send

    Calling the send() method returns a Future object whose get() method we can use to determine whether the message was sent successfully.

    ProducerRecord<String, String> record = new ProducerRecord<>("UserInfo", "Name", "Li Si");
    try {
        producer.send(record).get();
    } catch (Exception e) {
        e.printStackTrace();
    }

    If the message is sent successfully, a RecordMetadata object is returned; otherwise an exception is thrown. Exceptions fall into two categories. First, unrecoverable exceptions: for example, if a message is too large, the producer throws immediately. Second, recoverable exceptions, such as connection errors: the producer retries these automatically, and throws only if the retries are exhausted without success.

  • Asynchronous send

    Provide a callback when calling send(); it is invoked once the broker's response is received.

    class ProducerCallBack implements Callback {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                exception.printStackTrace();
            }
        }
    }

    ProducerRecord<String, String> record = new ProducerRecord<>("UserInfo", "Name", "Li Si");
    try {
        producer.send(record, new ProducerCallBack());
    } catch (Exception e) {
        e.printStackTrace();
    }

    As shown above, you need to define a class that implements the org.apache.kafka.clients.producer.Callback interface, which has a single method, onCompletion. If Kafka returns an error, the exception argument of onCompletion is non-null, and we can handle it however we like.

2.3 Partitioning

In the examples above, we specify the topic and content of a message when creating it. The message key is optional and defaults to null when not specified.

Message keys serve two important purposes: 1) they provide additional information describing the message; 2) they determine which partition the message is written to. All messages with the same key are assigned to the same partition.

If the key is null, Kafka uses the default round-robin strategy to balance messages evenly across partitions; otherwise, the key is hashed and the message is assigned to a partition based on the result. Note that the mapping from keys to partitions uses the total number of partitions: if a partition is unavailable and a message happens to map to it, the write fails. Moreover, if partitions are added later, the mapping between keys and partitions changes.
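
As a sketch of the keyed case, the mapping amounts to hashing the serialized key and taking the result modulo the partition count. The helper below is illustrative rather than Kafka's exact implementation, though Utils.murmur2 is the hash used by its default partitioner.

import org.apache.kafka.common.utils.Utils;

// Map a non-null key to a partition: hash the serialized key,
// force the result positive, and take it modulo the partition count.
static int partitionForKey(byte[] keyBytes, int numPartitions) {
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}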

Kafka also supports custom partitioning strategies. Configure the partitioner.class parameter on the producer side with a class that implements the following interface:

public interface Partitioner extends Configurable, Closeable {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    public void close();

    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}
  • partition(): this method takes several arguments: topic is the target topic; key is the message key; keyBytes is the serialized key as a byte array; value is the message value; valueBytes is the serialized value; cluster is the metadata of the current cluster. Kafka hands you all this information so you can decide which partition the message should be sent to.
  • close(): inherited from the Closeable interface; called when the partitioner is closed.
  • onNewBatch(): notifies the partitioner that a new batch is about to be created.

The following is an example that puts messages whose key is "Name" on a dedicated partition:

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class NamePartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if ((keyBytes == null) || (!(key instanceof String))) {
            throw new InvalidRecordException("We expect all messages to have customer name as key");
        }
        // Messages keyed "Name" always go to the last partition
        if (((String) key).equals("Name")) {
            return numPartitions - 1;
        }
        // Other keys are hashed across the remaining partitions
        return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
    }

    @Override
    public void close() {}
}
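
To put the custom partitioner to work, point the producer's partitioner.class property at it. A sketch, assuming NamePartitioner is on the producer's classpath:

Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Fully qualified name of the custom partitioner; adjust to your package
kafkaProps.put("partitioner.class", NamePartitioner.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);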

2.4 Configuration of producers

Similarly, a good producer is inseparable from configuration suited to its circumstances. The following introduces some of the most influential parameters; a consolidated example appears at the end of this section.

acks

acks controls how many replicas must acknowledge the write before the producer considers a message sent successfully. This parameter has a significant impact on the possibility of message loss. It takes three values:

  • acks=0: the producer does not wait for any acknowledgement from the server and considers the send successful as soon as the message leaves. This gives the highest throughput, but is also the most likely to lose messages.
  • acks=1: the producer considers the send successful once the partition leader has written the message and returned success, without waiting for confirmation from all followers. This avoids message loss to some extent, but if the leader crashes before the message has been replicated to another replica, the message is still lost.
  • acks=all: the producer waits for all in-sync replicas to write the message successfully. This is the safest option, ensuring the record is not lost as long as at least one in-sync replica remains alive; it is equivalent to setting acks=-1. It also has the highest latency.

buffer.memory

This parameter sets the size of the memory buffer for messages awaiting send. If records are produced faster than they can be sent to the server, the producer blocks for up to max.block.ms and then throws an exception.

compression.type

Messages are not compressed by default. This parameter can be set to none, gzip, snappy, lz4, or zstd. Compression saves network bandwidth and Kafka storage costs.

retries

When a producer receives a recoverable exception while sending, it retries; this parameter specifies how many times. By default the producer waits 100ms between retries; that interval can be changed with retry.backoff.ms. It is recommended that the total retry time be longer than the time the cluster needs to re-elect a leader, so the producer does not give up retrying too early.

batch.size

When multiple messages are sent to a partition, the producer sends them in batches. This parameter specifies the maximum size (in bytes) of a batch message. When batch messages reach this size, producers send them to the broker together.

linger.ms

This parameter specifies how long the producer waits before sending a batch. When it is set, the producer sends the batch once the wait elapses, even if the configured batch size has not been reached. By default, the sending thread sends messages as soon as it is idle, even if a batch holds only one message. Setting this parameter makes the sending thread wait a while, which increases throughput by sending messages in batches, but also increases latency.

client.id

This parameter can be any string that the broker uses to identify which client the message is coming from.

max.in.flight.requests.per.connection

This parameter specifies how many requests the producer may send to the broker while still waiting for responses. A higher value can improve throughput but increases memory consumption; setting it too high can actually reduce throughput, because batching becomes less efficient. Setting it to 1 guarantees that messages reach the broker in the order send() was called, even when failures trigger retries.

timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms

These parameters control how long the producer waits for the broker to respond. request.timeout.ms specifies how long to wait for a response when sending data, and metadata.fetch.timeout.ms specifies how long to wait for a response when fetching metadata (such as a partition's leader). timeout.ms specifies how long the broker waits for acknowledgements from the other replicas (consistent with the acks setting) before returning a result; if the time elapses without those acknowledgements, the write is reported as failed.

max.block.ms

This parameter specifies how long the application blocks when calling send() or metadata-fetching methods such as partitionsFor(); once the time is exceeded, a timeout exception is thrown.

max.request.size

This parameter limits the size of the request a producer sends, which depends on both message size and message count. If we specify a maximum request size of 1MB, then the largest single message is 1MB, or a batch may contain up to 1000 messages of 1KB each. In addition, the broker has its own message.max.bytes parameter controlling the size of the requests it accepts. In practice, it is recommended to keep these values consistent, so producers do not send more data than the broker allows.

receive.buffer.bytes, send.buffer.bytes

Kafka is implemented on top of TCP. These two parameters specify the sizes of the TCP socket buffers used for receiving and sending packets, respectively. If they are set to -1, the operating system defaults are used.
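
To close the section, here is a sketch that pulls several of these parameters together for a producer biased toward durability over latency; the values are illustrative, not recommendations.

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Wait for all in-sync replicas before declaring success
props.put("acks", "all");
// Retry recoverable errors, waiting 500ms between attempts
props.put("retries", 5);
props.put("retry.backoff.ms", 500);
// Compress batches to save bandwidth and storage
props.put("compression.type", "snappy");
// Send a batch at 64KB or after 10ms, whichever comes first
props.put("batch.size", 65536);
props.put("linger.ms", 10);
// Keep one request in flight so ordering survives retries
props.put("max.in.flight.requests.per.connection", 1);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);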