3 Producing and Consuming Messages with Kafka

This tutorial uses Windows. If you are on Linux, replace the command prefix: bin\windows\kafka-topics.bat on Windows becomes bin/kafka-topics.sh on Linux.

3.1 Creating a Topic

Kafka uses producers to produce messages; its core server component is the broker; consumers consume messages. A topic receives messages, and each topic is divided into partitions with replicas.

Create a topic named test with 1 partition and a replication factor of 1:

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1

Use the following command to view the topic description:

bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test

The result shows 1 partition, replication factor 1, and the name test:

Topic: test     TopicId: hkPExRf8T72y2FFNEOiFnQ PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
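
To list all existing topics, the same script can be used with --list (a small addition for convenience; it assumes the same ZooKeeper address):

bin\windows\kafka-topics.bat --list --zookeeper localhost:2181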

3.2 Producing Messages

Create a producer to send a message to topic test

bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test 

Type the following message and press Enter to send it:

welcome to my site that is zszxz.com

3.3 Consuming Messages

Create a consumer to read messages from topic test:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

The consumer prints the message sent earlier:

welcome to my site that is zszxz.com

4 Producer

4.1 Producer Workflow

Kafka’s producer is responsible for writing data to Kafka. Each producer instance works independently; there is no relationship between producer instances. When a producer sends a message to a topic, if the message has a key, Kafka computes the hash of the key and uses it to choose the partition, spreading messages across partitions to improve throughput. If the message has no key, Kafka assigns it to partitions in round-robin fashion. Once the partition is determined, the producer looks up the leader of that partition; only the leader responds to client requests, while the follower replicas stay in sync with the leader.
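
The routing rule is visible in how a record is constructed. A minimal sketch, assuming a String-keyed producer as in the example below (the key and values are placeholders):

// A record with a key: the partition is chosen by hashing the key,
// so all records with the same key land in the same partition.
ProducerRecord<String, String> keyed =
        new ProducerRecord<>("test", "user-42", "payload for user 42");
// A record without a key: partitions are assigned round-robin.
ProducerRecord<String, String> unkeyed =
        new ProducerRecord<>("test", "payload without a key");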

4.2 Producer Send Example

Introduce the client dependency. Producer and consumer are both clients in Kafka, so both use the same client dependency.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

Example of sending a message:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerTest {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Specify the server address: bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // Specify the client ID
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test-producer");
        // Key serializer (key.serializer), set to String
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // Value serializer (value.serializer), set to String
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // Create a producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Message
        String msg = "welcome to zszxz.com";
        // Topic
        String topic = "test";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
        producer.send(record);
        producer.close();
    }
}
  1. First build a java.util.Properties object; the attributes bootstrap.servers, key.serializer, and value.serializer must all be specified;
  2. Next, construct a KafkaProducer instance;
  3. Then build a ProducerRecord message object to be sent to a partition;
  4. Finally, call KafkaProducer’s send method to send the ProducerRecord, and close the KafkaProducer.
  • bootstrap.servers specifies the host:port pairs Kafka clients use to connect to the brokers; multiple pairs can be given, such as domain1:port1,domain2:port2;
  • key.serializer specifies how message keys are serialized into byte arrays before being sent to the broker; StringSerializer.class.getName() resolves to org.apache.kafka.common.serialization.StringSerializer;
  • value.serializer likewise serializes the message value before it is sent to the broker;

You can also specify key and value serializers when creating KafkaProducer.

 public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(Utils.propsToMap(properties), keySerializer, valueSerializer);
    }
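
With that constructor, the serializer properties can be omitted from the configuration; a minimal sketch, assuming the properties object from the earlier example:

// Serializers passed directly instead of via key.serializer/value.serializer
KafkaProducer<String, String> producer =
        new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());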

Asynchronous send

KafkaProducer’s send method is asynchronous: it returns immediately with a Future object that can later be used to retrieve the result of sending the message, and an overload also accepts a Callback.

public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, (Callback)null);
    }

Synchronous send

Synchronous sending uses the future.get() method to wait indefinitely for the result; if the message fails to send, the exception can be caught.

 producer.send(record).get();

Catching send exceptions:

try {
    producer.send(record).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

A callback can also handle exceptions and implement a retry mechanism:

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            System.out.println("Message sent successfully");
        } else {
            // Retry or other processing
        }
    }
}).get();
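
For a fully asynchronous send, drop the trailing get() so nothing blocks and the callback alone handles the outcome; a minimal sketch (since Callback has a single method, a lambda works):

// send() returns immediately; the callback runs when the broker
// acknowledges the write, or when the send fails.
producer.send(record, (recordMetadata, e) -> {
    if (e == null) {
        System.out.println("Sent to partition " + recordMetadata.partition()
                + " at offset " + recordMetadata.offset());
    } else {
        // Retry or other processing
        e.printStackTrace();
    }
});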

4.3 Main Producer Parameters

The bootstrap.servers, key.serializer, and value.serializer parameters must be specified.

  • The acks parameter controls the durability of Kafka messages: a message is considered committed (not lost) once it has been acknowledged according to this setting. acks has three values: 0 means the producer does not wait for any acknowledgement from the broker; 1 means the leader acknowledges after writing the message to its local log; -1 (all) means both the leader and its followers must acknowledge. Of the three, 0 gives the highest throughput and the weakest durability, 1 is a middle ground, and all gives the lowest throughput and the strongest durability.

Usually acks is set to 1:

// Set the acks acknowledgement level
properties.put(ProducerConfig.ACKS_CONFIG,"1");
  • buffer.memory: the size of the buffer used to collect messages before sending. The default is 32 MB (33554432 bytes).
// Set the buffer size
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
  • compression.type: compression codec for messages; the default is none. Compression trades CPU for network throughput, so it is often worth enabling when sending messages. Kafka supports several codecs, commonly GZIP, Snappy, and LZ4; LZ4 generally performs best with Kafka.
// Set compression
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
  • retries: the number of times a failed send is retried automatically. A callback can implement its own retry, but letting Kafka retry is usually simpler. The documented default was 0, but the default shown on my console is retries = 2147483647; to be safe, 3~5 retries is usually a good setting.
// Number of retries
properties.put(ProducerConfig.RETRIES_CONFIG,"3");
  • batch.size: the batch size. The producer does not send each message to the topic individually; it accumulates messages into batches per partition. The default is 16384 bytes (16 KB); tune this to a suitable value for the actual production workload.
// Set the batch size
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
  • linger.ms: how long the producer waits before sending a batch. The default is 0, meaning a batch may be sent before it is full, which favors latency at the cost of throughput.
// Set the linger delay
properties.put(ProducerConfig.LINGER_MS_CONFIG,"200");
  • max.request.size: the maximum size of a request the producer will send. The default is 1048576 bytes (1 MB), which can be too small for complex business payloads.
// Set the maximum request size
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,"1048576");
  • request.timeout.ms: how long the producer waits for the broker to respond after sending a request. The default is 30000, or 30 seconds. If the brokers are heavily loaded, adjust it to a suitable larger value.
// Set the request timeout
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,"50000");

4.4 Customizing Partitions

By default, Kafka chooses the partition for a message by hashing its key; if no key is set, messages are distributed to partitions by round-robin so they are spread evenly. To customize the partitioning strategy, implement the Partitioner interface.

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * @author lsc
 * <p> </p>
 */
public class ZszxzPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> map) {}
}

Main parameters

  • topic: the topic name
  • key: the message key
  • keyBytes: the serialized bytes of the key, or null;
  • value: the message value
  • valueBytes: the serialized bytes of the value, or null;
  • cluster: cluster metadata

Custom partition example

import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

/**
 * @author lsc
 * <p> </p>
 */
public class ZszxzPartitioner implements Partitioner {

    private Random random;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Get the available partitions for the topic
        List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
        // Number of partitions
        int size = partitionInfos.size();
        int partitionNum = 0;
        if (key == null) {
            // Key not set: pick a random partition
            partitionNum = random.nextInt(size);
        } else {
            // Compute the partition from the key's hash value
            partitionNum = Math.abs(key.hashCode()) % size;
        }
        System.out.println("Partition:" + partitionNum);
        return partitionNum;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> map) {
        random = new Random();
    }
}

Configure the partitioner property when creating the producer:

// Configure the custom partitioner
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.zszxz.kafka.partition.ZszxzPartitioner");

4.5 Kafka serialization

Kafka provides serializers (and matching deserializers) for different data types:

  • byte[]: org.apache.kafka.common.serialization.ByteArraySerializer
  • ByteBuffer: org.apache.kafka.common.serialization.ByteBufferSerializer
  • Integer: org.apache.kafka.common.serialization.IntegerSerializer
  • Short: org.apache.kafka.common.serialization.ShortSerializer
  • Long: org.apache.kafka.common.serialization.LongSerializer
  • Double: org.apache.kafka.common.serialization.DoubleSerializer
  • String: org.apache.kafka.common.serialization.StringSerializer

The earlier example used StringSerializer.class.getName() as shorthand; the equivalent is passing the fully qualified class name as a string:

// Key serializer configured as String
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class.getName());
// Value serializer configured as String
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class.getName());
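
For example, to send numeric keys, pair the LongSerializer from the list above with a matching record type; a minimal sketch, assuming the producer configuration from section 4.2 otherwise:

// Keys are serialized as longs, values as strings
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.LongSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<Long, String> producer = new KafkaProducer<>(properties);
producer.send(new ProducerRecord<>("test", 42L, "value keyed by a long"));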

4.6 Multithreading

KafkaProducer is thread safe, so one instance can be shared across threads, for example as a member variable; each thread builds its own ProducerRecord objects for the messages it sends.

Sample code is as follows

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * @author lsc
 * <p> </p>
 */
public class ProducerThread extends Thread {

    private final KafkaProducer<String, String> producer;
    private final String topic;

    public ProducerThread(String topic) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        producer = new KafkaProducer<String, String>(properties);
        this.topic = topic;
    }

    @Override
    public void run() {
        int num = 0;
        while (num < 30) {
            String msg = "hello this message from producer:" + num;
            try {
                producer.send(new ProducerRecord<String, String>(topic, msg)).get();
                TimeUnit.SECONDS.sleep(2);
                num++;
                System.out.println(num);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new ProducerThread("test").start();
    }
}

5 Consumer

5.1 Consumers

Consumers fetch messages from topics, but Kafka’s consumer has some additional features: consumers are organized into consumer groups, and each group may contain multiple consumers.

Thus, two models are derived:

The queuing model

The publish-subscribe model

Two clients can form a group; each group has a group.id as its unique identifier.

When a consumer restarts after an outage, it resumes from where it left off; this position is tracked with an offset. The consumer periodically commits its offsets to Kafka. Before Kafka 0.9, consumers stored offsets in ZooKeeper by default; starting with version 0.9, offsets are stored in a built-in Kafka topic named __consumer_offsets by default.

5.2 Example of consumer receiving messages

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * @author lsc
 * <p> </p>
 */
public class ConsumerTest {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Set the server address
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // Set the group ID
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
        // Commit offsets automatically
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Automatic commit interval
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Key deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // Value deserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // For a new group id, start consuming from the earliest offset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Consumer instance
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Topic
        String topic = "test";
        // Subscribe
        kafkaConsumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(30));
                records.forEach(record -> {
                    System.out.println("key:" + record.key() + " value:" + record.value() + " offset:" + record.offset());
                });
            }
        } finally {
            kafkaConsumer.close();
        }
    }
}

bootstrap.servers, value.deserializer, key.deserializer, and group.id must be specified. bootstrap.servers can list multiple addresses: ip1:port1,ip2:port2. group.id is the ID of the consumer group, usually named after the service. value.deserializer and key.deserializer deserialize the value and key of messages sent by the producer, respectively.

  • First, create the Properties object and assemble the parameters;
  • Second, create the KafkaConsumer instance and subscribe to the topic test;
  • Finally, poll ConsumerRecords from the kafkaConsumer and iterate over the data.

The KafkaConsumer object constructor is shown below

public KafkaConsumer(Properties properties) {
        this((Properties)properties, (Deserializer)null, (Deserializer)null);
    }

Deserializers for the key and value can also be passed directly:

public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
    }
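
With that constructor, the deserializer properties can be omitted from the configuration; a minimal sketch, assuming the properties object from section 5.2:

// Deserializers passed directly instead of via key.deserializer/value.deserializer
KafkaConsumer<String, String> consumer =
        new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer());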

The configuration does not have to be a Properties object; a Map works as well:

public KafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
    }
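
A minimal sketch of the Map-based configuration (the values mirror the Properties used earlier):

Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
KafkaConsumer<String, String> consumer =
        new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer());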

kafkaConsumer.subscribe(Arrays.asList(topic)); subscribes to a topic. You can also subscribe to multiple topics:

kafkaConsumer.subscribe(Arrays.asList("topic1","topic2"));

kafkaConsumer.poll(Duration.ofSeconds(1)); fetches messages from the subscribed topics. The argument is a timeout: if no data is available, the call blocks for up to 1 second waiting for data.

5.3 Main Parameters of Consumer

  • session.timeout.ms: how quickly a consumer crash is detected; if a consumer goes down, Kafka notices within this window. The default is 10000, or 10 seconds;
  • max.poll.interval.ms: the maximum time allowed for the consumer's processing logic between polls. The default is 300000;
  • auto.offset.reset: where to start reading when no committed offset exists. It has three values: earliest (consume from the earliest offset), latest (consume from the latest offset), and none (raise an exception when a new consumer group has no prior offset);
  • enable.auto.commit: commit offsets automatically; manual commits are also possible with consumer.commitSync(), as sketched after this list;
  • fetch.max.bytes: the maximum number of bytes fetched per request. The default is 52428800.
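
A minimal sketch of manual offset commits, assuming the consumer setup from section 5.2 with auto-commit turned off:

// Disable automatic offset commits
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("test"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    records.forEach(record -> System.out.println(record.value()));
    // Commit the offsets of the records just processed;
    // blocks until the broker acknowledges the commit
    consumer.commitSync();
}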

5.4 Kafka deserialization

  • org.apache.kafka.common.serialization.ByteArrayDeserializer
  • org.apache.kafka.common.serialization.ByteBufferDeserializer
  • org.apache.kafka.common.serialization.IntegerDeserializer
  • org.apache.kafka.common.serialization.ShortDeserializer
  • org.apache.kafka.common.serialization.LongDeserializer
  • org.apache.kafka.common.serialization.DoubleDeserializer
  • org.apache.kafka.common.serialization.StringDeserializer

As a reminder, the examples obtain the fully qualified class name string via the class reference:

// Key deserializer
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());
// Value deserializer
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());

5.5 Rebalance

Rebalance is a protocol for consumer groups: it specifies how the partitions of subscribed topics are distributed among the consumers in a group. Kafka has three assignment strategies: Range, RoundRobin, and Sticky; a configuration sketch follows the list below.

  • The Range strategy sorts the partitions of each topic, divides them into contiguous ranges, and assigns the ranges to consumers in turn.
  • The RoundRobin strategy lays out the partitions of all subscribed topics in order and assigns them to consumers one by one by polling.
  • The Sticky strategy, introduced in version 0.11, has two main goals: distribute partitions as evenly as possible, so the partition counts assigned to consumers differ by at most one; and keep assignments as close as possible to the previous assignment.
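
The strategy is selected with the consumer's partition.assignment.strategy property; a minimal sketch that swaps in the round-robin assignor (the default is the range assignor):

// Use the round-robin assignor instead of the default range assignor
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");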

5.6 Consumer Multithreading example

Simply extend the Thread class:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * @author lsc
 * <p> </p>
 */
public class ConsumerThread extends Thread {

    private final KafkaConsumer<String, String> consumer;

    public ConsumerThread(String topic) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
        // Commit offsets automatically
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Automatic commit interval
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Session timeout
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // Key deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // For a new group id, start consuming from the earliest offset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(record -> {
                System.out.println("key:" + record.key() + " value:" + record.value() + " offset:" + record.offset());
            });
        }
    }

    public static void main(String[] args) {
        new ConsumerThread("test").start();
    }
}

Kafka tutorial

  • Kafka introduction
  • Installing Kafka on Linux and Windows
  • Producing and consuming messages with Kafka
  • More chapters to come; continuously updated

The Knowledge Seeker site appreciates your support.