Why does a Python Kafka producer frequently lose data?

When a producer loses messages

After a producer calls the send method, the message may still fail to reach the broker, for example because of a network problem.

Therefore, we cannot assume that a message has been delivered just because send has been called.

To know whether a message was sent successfully, we have to check the result of the send. Note that a Kafka producer's send method is asynchronous: it returns a future object immediately, before the message has actually been delivered.

Three ways for a Python Kafka producer to send data

Synchronous send

  • Sending takes the longest time
  • The status of every send is known, which prevents silent data loss and gives high reliability

In synchronous mode, messages are sent one at a time, and checking the result returned for each message tells you its exact status. The cost is that this mode blocks: the next message is sent only after the call to get on the future object of the current message has returned.
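The blocking behaviour described above can be sketched as follows. This is a minimal illustration, not the article's own code: send_sync is a hypothetical helper name, and the producer argument is assumed to behave like a kafka-python KafkaProducer, whose send method returns a future with a get(timeout=...) method.

```python
def send_sync(producer, topic, messages, timeout=10):
    """Send messages one at a time, waiting for each result before the next.

    `producer` is assumed to expose send(topic, value) returning a future
    with get(timeout=...), as kafka-python's KafkaProducer does.
    """
    confirmed, failed = [], []
    for message in messages:
        try:
            # get() blocks until the broker responds or the timeout expires
            metadata = producer.send(topic, message).get(timeout=timeout)
        except Exception as exc:  # delivery failed or timed out
            failed.append((message, exc))
        else:
            confirmed.append((message, metadata.partition, metadata.offset))
    return confirmed, failed
```

Returning the failed messages together with their exceptions lets the caller decide whether to resend them or record them, instead of losing them silently.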

Asynchronous send

  • Sending takes the shortest time
  • Data may be lost, so reliability is low

Because the result of each send is never retrieved, this mode has the highest throughput, but message delivery is not guaranteed and data may be lost. (Most examples on the web use this mode.)

Asynchronous sending + callback processing

  • Sending is fast
  • The status of every send is known, which prevents silent data loss and gives high reliability

Specify a callback function when calling the send method. When the server returns a response, the callback is invoked, and this is where failures are handled. Because callbacks run only when responses arrive, the producer should not exit until the callbacks have finished executing.
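A minimal sketch of this pattern is shown here. The helper name send_with_callbacks and the logger name are hypothetical. The producer is assumed to behave like a kafka-python KafkaProducer, whose futures accept add_callback(f, *args) and add_errback(f, *args) and pass the bound arguments first, followed by the record metadata or the exception.

```python
import logging

logger = logging.getLogger("producer_sketch")  # hypothetical logger name


def send_with_callbacks(producer, topic, messages):
    """Send all messages without blocking and collect outcomes via callbacks."""
    delivered, failed = [], []

    def on_success(message, metadata):
        # Called with the bound message first, then the record metadata
        delivered.append((message, metadata.partition, metadata.offset))

    def on_error(message, exc):
        # Called with the bound message first, then the exception
        logger.error("send failed for %r: %s", message, exc)
        failed.append((message, exc))

    for message in messages:
        future = producer.send(topic, message)
        future.add_callback(on_success, message)
        future.add_errback(on_error, message)

    # Wait for buffered messages to be sent so the callbacks have run
    # before the caller exits.
    producer.flush()
    return delivered, failed
```

Binding the message to each callback matters because the error callback otherwise receives only the exception, which is not enough to record which message failed.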

Usage scenarios

Although the three modes differ in speed, faster is not automatically better; the right choice depends on the business scenario.

Scenario 1: If the business requires messages to be delivered in order, use synchronous sending to a single partition, together with the retries parameter and max_in_flight_requests_per_connection=1. With this setting the producer sends only one request at a time and waits for the server's response before sending the next, so a retry cannot reorder messages.

Scenario 2: If the business only cares about throughput, tolerates a small number of failed sends, and does not care about message order, use send-and-forget mode with acks=0. The producer then does not wait for any response from the server and sends messages as fast as the network allows.

Scenario 3: If the business needs to know whether each message was sent successfully but does not care about message order, use asynchronous sending with a callback, set retries=0, and record failed messages in a log file.
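As a rough sketch, the parameters named in the three scenarios can be collected as keyword arguments for the KafkaProducer constructor. The function name producer_settings and the scenario labels are hypothetical, and retries=3 in the first preset is an assumed value, since the scenario only says that retries are used.

```python
def producer_settings(scenario):
    """Return KafkaProducer keyword arguments for one of the three scenarios."""
    presets = {
        # Scenario 1: ordered delivery. retries=3 is an assumed value.
        "ordered": {"retries": 3, "max_in_flight_requests_per_connection": 1},
        # Scenario 2: maximum throughput, no acknowledgement from the server.
        "throughput": {"acks": 0},
        # Scenario 3: no automatic retry; failures are handled in the callback.
        "callback": {"retries": 0},
    }
    # Return a copy so callers can add their own settings safely
    return dict(presets[scenario])
```

With kafka-python, these settings would be passed as KafkaProducer(bootstrap_servers='127.0.0.1:9001', **producer_settings('ordered')). For scenario 1, keeping all messages in one partition additionally requires passing the same key or an explicit partition to send.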

Sample code

# -*- coding:utf-8 -*-
import json

from kafka import KafkaProducer


class KProducer:
    def __init__(self, bootstrap_servers, topic):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            # Serialize each message as JSON
            value_serializer=lambda m: json.dumps(m).encode('ascii'),
        )
        self.topic = topic

    def sync_producer(self, data_li: list):
        """
        Send data synchronously
        :param data_li: data to send
        :return:
        """
        for data in data_li:
            future = self.producer.send(self.topic, data)
            # Block until the server confirms this message
            record_metadata = future.get(timeout=10)
            partition = record_metadata.partition  # partition the message was written to
            offset = record_metadata.offset  # position of the message in that partition
            print('save success, partition: {}, offset: {}'.format(partition, offset))

    def asyn_producer(self, data_li: list):
        """
        Send data asynchronously
        :param data_li: data to send
        :return:
        """
        for data in data_li:
            self.producer.send(self.topic, data)
        self.producer.flush()  # send everything still buffered

    def asyn_producer_callback(self, data_li: list):
        """
        Send data asynchronously and handle the send status
        :param data_li: data to send
        :return:
        """
        for data in data_li:
            self.producer.send(self.topic, data).add_callback(self.send_success).add_errback(self.send_error)
        self.producer.flush()  # send everything still buffered

    def send_success(self, *args, **kwargs):
        """Callback for a successful send"""
        print('save success')
        return

    def send_error(self, *args, **kwargs):
        """Callback for a failed send"""
        print('save error')
        return

    def close_producer(self):
        try:
            self.producer.close()
        except Exception:
            pass


if __name__ == '__main__':
    send_data_li = [{"test": 1}, {"test": 2}]
    kp = KProducer(topic='topic', bootstrap_servers='127.0.0.1:9001,127.0.0.1:9002')

    # Synchronous send
    # kp.sync_producer(send_data_li)

    # Asynchronous send + callback
    kp.asyn_producer_callback(send_data_li)

    kp.close_producer()