Preface

Kafka is currently the most widely used message queue in the big data field, and its internal implementation and design are worth studying in depth. Working with Kafka starts with producer development, followed by consumer development. The following is an analysis of the producer.

Producer High-Level Design

Brief send flow chart

The process is as follows: <1> KafkaProducer first serializes the data to be sent using a serializer. <2> KafkaProducer then uses a partitioner to determine which partition of the topic the data should be sent to. Kafka provides a default partitioner: if a key is specified, the destination partition is chosen based on the hash of the key; if no key is specified, partitions are chosen in round-robin fashion, which balances the message load across partitions. Once the partition is determined, the leader node of that partition (the node that handles reads and writes for the partition) is identified, the message is buffered in a buffer pool, and messages are sent in batches once they reach a certain count or size.
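As a rough sketch of these steps with the kafka-python client used later in this article (broker addresses, topic name, and the batching values are placeholders, not values from the original setup):

import json
from kafka import KafkaProducer

# <1> serializers turn the key and value into bytes before sending;
# batch_size / linger_ms control when a buffered batch is flushed to the broker
producer = KafkaProducer(
    bootstrap_servers=['host1:9092', 'host2:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    batch_size=16384,
    linger_ms=50,
)

# <2> the default partitioner hashes the key to pick a partition; the record is
# buffered and sent to that partition's leader as part of a batch
producer.send('topic_1', key='user-1', value={'event': 'click'})
producer.flush()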

Send messages synchronously or asynchronously

<1> Synchronous sending. Advantages: it ensures that each message is written to the broker correctly, which suits scenarios that need the send result immediately; it also guarantees correctness if the producer fails or goes down, so message loss is avoided. Disadvantages: low performance and throughput, because each message must be sent to the broker and acknowledged before the next one, with no buffering or batching. <2> Asynchronous sending. Advantages: messages are buffered in a buffer pool and sent in batches, which greatly reduces the number of round trips to the broker; performance and throughput are high, and the send result can still be obtained through a callback. Disadvantages: if the producer fails or is restarted, buffered messages may be lost, so it is not suitable for scenarios with strict accuracy requirements.
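A minimal sketch of both styles with kafka-python (broker address and topic name are placeholders): the synchronous case blocks on the future returned by send(), the asynchronous case registers callbacks.

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['host1:9092'])

# synchronous: block until the broker acknowledges (or the timeout expires)
try:
    metadata = producer.send('topic_1', b'sync message').get(timeout=10)
    print(metadata.topic, metadata.partition, metadata.offset)
except KafkaError as e:
    print('send failed:', e)

# asynchronous: return immediately and handle the result in callbacks
def on_success(metadata):
    print('delivered to', metadata.topic, metadata.partition, metadata.offset)

def on_error(exc):
    print('delivery failed:', exc)

producer.send('topic_1', b'async message').add_callback(on_success).add_errback(on_error)
producer.flush()  # make sure buffered messages are sent before exiting

Note that calling .get() on every send effectively serializes the producer and forfeits batching, which is exactly the throughput trade-off described above.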

Using the producer is very simple

1. Initialize the producer object and set the KafkaProducer() parameters: producer = KafkaProducer(bootstrap_servers=['host1:9092', 'host2:port'])
2. Create the message to send: msg = xxxxx
3. Send the message: producer.send('topic_1', msg)
4. Close the producer: producer.close()

How are messages partitioned

A producer specifies a topic when sending a message to a Kafka message queue, so how is the partition chosen? <1> Message partitioning policy: the partition value is calculated in three cases: 1) If a partition is specified, the specified value is used directly as the partition value; however, the client needs to consider data balance when specifying partitions itself. 2) If no partition is specified but a key is provided, the hash of the key is taken modulo the number of partitions of the topic to obtain the partition value. 3) If neither a partition nor a key is given, a random integer is generated on the first call (and incremented on each subsequent call), and this value modulo the number of available partitions of the topic gives the partition value; this is the round-robin algorithm. <2> Custom partitioning policy: a custom partition type can be implemented through org.apache.kafka.clients.producer.Partitioner and configured via the partitioner.class configuration parameter when constructing the KafkaProducer; a sketch with the Python client is shown below.
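For the Python client used in the examples below, the three cases and a custom partitioner can be sketched roughly like this; it is assumed here that kafka-python's KafkaProducer accepts a partitioner callable in its constructor, and the broker address, topic name, and partitioning rule are illustrative only:

from kafka import KafkaProducer

def my_partitioner(key_bytes, all_partitions, available_partitions):
    # hypothetical rule: keyless messages go to the first partition,
    # keyed messages are hashed onto one of the partitions
    if key_bytes is None:
        return all_partitions[0]
    return all_partitions[hash(key_bytes) % len(all_partitions)]

producer = KafkaProducer(bootstrap_servers=['host1:9092'],
                         partitioner=my_partitioner)

# case 1): an explicit partition is used directly and skips the partitioner
producer.send('topic_1', value=b'fixed partition', partition=1)
# cases 2) and 3): no partition given, so the (custom) partitioner decides from the key
producer.send('topic_1', key=b'user-1', value=b'hashed by key')
producer.send('topic_1', value=b'no key and no partition')
producer.flush()

In the Java client, the same effect is achieved by implementing the Partitioner interface and setting partitioner.class, as described above.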

Single-threaded code example

from kafka import KafkaProducer

# Instantiate a producer object using the KafkaProducer class
producer = KafkaProducer(bootstrap_servers=['host1:9092', 'host2:port'])
# bootstrap_servers can list multiple brokers, e.g.
# bootstrap_servers = ['0.0.0.1:9092', '0.0.0.2:9092', '0.0.0.3:9092']

for i in range(3):
    msg = "msg%d" % i
    producer.send('topic_1', msg.encode('utf-8'))  # write the message to the topic; the topic here is topic_1
producer.close()

Multithreaded code example

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import random
import time
import threading
from kafka import KafkaProducer
import logging

log = logging.getLogger(__name__)

# callback function (success)
def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

# callback function (failed)
def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception

def send_message(topic, device_id, interval_time):
    producer = KafkaProducer(bootstrap_servers=['cdh01:9092', 'cdh02:9092', 'cdh03:9092'])
    while True:
        startTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

        message = "|".join((device_id, startTime, str(interval_time)))

        producer.send(topic,message.encode("utf-8")).add_callback(on_send_success).add_errback(on_send_error)
        # how long to wait before producing the next message
        time.sleep(interval_time)

interval_times = [60, 300, 600, 1200, 1800, 3600]

# start 50 threads
for i in range(0, 50):
    deviceId = "%06d" % random.randint(0, 999999)

    interval_time = interval_times[random.randint(0, len(interval_times) - 1)]

    recv_thread = threading.Thread(target=send_message, args=("my-topic", deviceId, interval_time))
    
    # make the child thread exit when the main thread exits
    recv_thread.daemon = True
    recv_thread.start()

while True:
    # keep the main thread running
    time.sleep(5000)   