Introduction to the

A high-performance cross-language distributed message queue developed by Scala with single-machine throughput up to 10W and message latency at ms level. Kafka is a fully distributed system. Brokers, producers, and consumers all automatically support distribution and rely on Zookeeper for distributed coordination. Kafka supports multiple reads on a write, messages can be consumed by multiple clients, and messages can be duplicated without being lost.

Kafka core composition

  1. Producer publishes messages to the broker in push mode. Each message is appended to a partition, which is a sequential write disk

  2. Topic is essentially a directory consisting of Partition Logs

    • Partition reason: Easy to scale in the cluster; Improve the concurrency. The read/write unit is partition
    • Partition rule: If the partition is specified, use it directly. If a key is specified but no partition is specified, the partition is selected using the hash value of the key. If neither partition exists, use polling to select a partition
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic)
            
            if (availabelPartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                returnUtils.toPositive(nextValue) % numPartitions; }}else {
            returnUtils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }}Copy the code
  3. The same partition may have multiple replication (server. The default properties. The replication. Factor = N). In the absence of Replication, data on all partitions on the broker cannot be consumed if the broker fails, and producer cannot write data to that partition. After replication is introduced, there may be multiple Replicas on the same partition. In this case, a leader needs to be selected. The producer, consumer, and leader interact. The other replicas act as followers and copy data from the leader.

  4. Writing process

  • Zookeeper’s “/brokers/… The /state” node finds the leader of the partition
  • Producer sends messages to the leader
  • The leader writes the message to the local log
  • The followers write the leaderpull message to the local log and send an ACK to the leader
  • After receiving all replication acks from the ISR, the leader adds HW (high watermark, and finally submits Offset) and sends ACKS to the producer
// Customize the partition producer
import java.util.Map
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster

public class CustomPartitioner implements Partitioner {
    @Override
    public void configure(Map
       
         configs)
       ,> {}@Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }
    
    @Override
    public void close(a) {}}/ / use
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        
        props.put("boostrap.servers"."hadoop102:9092");
        props.put("acks"."all");
        props.put("retries".0);
        props.put("batch.size".16384);
        // Increase server request latency
        props.put("linger.ms".1);
        props.put("buffer.memory".33554432);
        
        props.put("key.serializer"."org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer"."org.apache.kafka.common.serialization.StringSerializer");
        
        // Custom partition
        props.put("partitioner.class"."com.arkmu.kafka.CustomPartitioner");
        
        KafkaProducer<String, String> producer = new KafkaProducerM<>(props);
        producer.send(new ProducerRecord<String, String>("first"."1"."arkmu")); producer.close(); }}Copy the code

The interceptor

The Producer interceptor, introduced in Kafka0.10, provides customized control for the clients section.

// Time interceptor
import java.util.Map
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimerInterceptor implements ProducerInterceptor<String.String> {
    @Override
    public void configure(Map
       
         configs)
       ,> {}@Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value().toStrng());
    }
    
    @Override
    public void onAcknowledgement(RecordMetadata, Exception exception) {}@Override
    public void close(a) {}}/ / use. props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.arkmu.kafka.interceptor.TimeInterceptor"); .Copy the code

Kafka partition number

  1. Create a Topic with only one partition
  2. Test the throughput of Producer (Tp) and Consumer (Tc) of the Topic
  3. Assuming the total target throughput is Tt, the number of partitions =Tt/min(Tp, Tc)

The number of partitions is generally set to 3-10

ISR copy synchronization queue

In-sync Replicas (ISR) : specifies the replica synchronization queue. An ISR contains the Leader and Follower. If the Leader process fails, a server in the ISR queue is selected as the new Leader. Messages and replica.lag.time.max.ms determine whether a server can be added to the ISR replica queue. Replica.lag.max. message was removed in version 0.10 to prevent services from frequently entering the ISR queue.

When any dimension exceeds the threshold, the followers are removed from the ISR and stored in the Outof-Sync Replicas (OSR). New followers are also stored in the OSR.

Partition allocation policy

There are two default partition allocation policies in Kafka: Range and RoundRobin.

  • The Range:

    1. Sort the partitions within the same Topic by ordinal order and the consumers by alphabetical order.
    2. Divide the number of Partitions by the total number of consumer threads to determine the number of Partitions consumed by each consumer thread
  • RoundRobin

    1. All subject partitions form the TopicAndPartition list
    2. Sort the TopicAndPartition list by hashCode
    3. Sent to each consumer thread in a polling fashion

Data loss

  1. Ack = 0, Offset increases when the message is sent
  2. Ack = 1, the Leader adds Offset after receiving the Ack from the Leader Replica on a message
  3. Ack = -1 The Leader adds Offset after receiving all Ack packets sent by the Replica for a message

Message backlog

  1. If Kafka is short of spending power, increase the number of Topic partitions and increase the number of consumers in the consumer group
  2. If the downstream data processing is not timely, improve the pull quantity of each batch

Parameter optimization

  1. Broker Parameter Configuration (server.properties)
// Log retention policy configuration# keep three days, can be shorter (log. The cleaner. Delete. Retention. Ms)Log. Retention. Hours = 72 / / up related configuration. The default replication. The factor: 1 copy of the default 1 / / network delay up. Socket. A timeout. Ms: 30000If the network between clusters is unstable, increase this parameter
replica.lag.time.max.ms = 600000    # If the network is not good, or the kafka cluster is under a lot of pressure, the replica will be lost, and then the replica will be duplicated frequently, which will cause a lot of pressure on the cluster
Copy the code
  1. Producer.properties
compression.type:none   It is recommended to configure a suitable compression algorithm to greatly reduce the network stress and the storage stress of the Broker
Copy the code
  1. Kafka memory adjustment (kafka-server-start.sh)
The default memory is 1 GB, and the production environment should not exceed 6 GB
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
Copy the code

Efficient data reading

  1. Distributed cluster, at the same time using partition technology, high concurrency
  2. Sequential write to a disk: Kafka’s Producer produces data and writes it to a log file. The write process is sequential write to the end of the file. Data from the official website shows that sequential disk writes can be up to 600M/s, while random writes are only 100K/s
  3. Zero copy technique

Size of a single log

The maximum size of a kafka message body is 1 MB by default. However, messages larger than 1 MB often occur in application scenarios. You need to set this value in server.properties

replica.fetch.max.bytes: 1048576    # Maximum number of bytes that broker can copy messages. The default is 1M
replica.max.bytes: 1000012  # kafka maximum size limit for receiving a single message. Default is 1M
Copy the code

Clearing expired data

log.cleanup.policy=delete   # Delete policy
log.cleanup.policy=compact  # Compression strategy
Copy the code

Consume data over time

Map<TopicPartition, offsetAndTimestamp> startOffsetMap = KafkaUtil.fetchOffsetsWithTimestamp(topic, sTime, kafkaProp);
Copy the code

Kafka command line operation

  • bin/kafka-server-start.sh config/server.properties
  • bin/kafka-server.stop.sh stop
  • bin/kafka-topics.sh –zookeeper hadoop102:2181 –list
  • bin/kafka-topics.sh –zookeeper hadoop102:2181 –create –replication-factor 3 –partitions 1 –topic first
  • bin/kafka-topics.sh –zookeeper hadoop102:2181 –delete — topic first
  • bin/kafka-console-producer.sh –broker-list hadoop102:9092 –topic first
  • bin/kafka-console-consumer.sh –zookeeper hadoop102:2181 –from-beginning –topic first
  • bin/kafka-topics.sh –zookeeper hadoop102:2181 –describe –topic first

Flume and Kafka integration

a1.sources = r1;
a1.sinks = k1;
a1.channels = c1;

a1.sources.r1.type = exec; a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log a1.sources.r1.shell = /bin/bash -c a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = hadoop100:9092,hadoop101:9092,hadoop102:9092  a1.sinks.k1.kafka.topic = first a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.source.r1.channels = c1 a1.source.k1.channels = c1Copy the code