Background knowledge

1. Kafka provides the Producer class as its Java producer API. It supports two sending modes, sync and async. The default is sync, which means a call to the producer does not return until the message has actually been sent to the queue.
(1) In the Java Producer API that Kafka provides, the underlying layer only maintains the connections between a topic and each broker; it is not a connection pool in the traditional sense. When using sync mode, implement your own connection pool holding several Producer objects to maximize write throughput.
(2) Use sync mode when data is written infrequently or when the result of each write is needed; in that situation async mode would only add latency while messages wait in the queue.
(3) When data is written at high frequency, use async mode to write in batches for maximum throughput. Async differs from sync in that a ProducerSendThread object is created when the (Scala) Producer is initialized. A send call then does not invoke eventHandler.handle directly; instead, it puts the message into a queue whose length is set by queue.buffering.max.messages (default 10000). When either of the following conditions is met, ProducerSendThread calls eventHandler.handle to send the queued messages as one batch:
① the time since the last send exceeds the value of queue.buffering.max.ms (default 5000 ms);
② the number of messages in the queue reaches the value of batch.num.messages (default 200).
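A minimal sketch of the async flush rule in plain Java follows. It is a simulation, not Kafka's ProducerSendThread: the class AsyncBatchSketch and its methods are hypothetical, and the constructor arguments only stand in for queue.buffering.max.messages, queue.buffering.max.ms and batch.num.messages, using the defaults quoted above. The current time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the old async producer's queue-and-batch behavior.
// Not Kafka's implementation; it only mirrors the flush conditions described above.
class AsyncBatchSketch {
    private final int maxQueued;   // stands in for queue.buffering.max.messages
    private final long maxWaitMs;  // stands in for queue.buffering.max.ms
    private final int batchSize;   // stands in for batch.num.messages
    private final ArrayDeque<String> queue = new ArrayDeque<>();
    private long lastFlushMs;

    AsyncBatchSketch(int maxQueued, long maxWaitMs, int batchSize, long startMs) {
        this.maxQueued = maxQueued;
        this.maxWaitMs = maxWaitMs;
        this.batchSize = batchSize;
        this.lastFlushMs = startMs;
    }

    // send() only enqueues; it returns false when the bounded queue is full.
    boolean send(String message) {
        if (queue.size() >= maxQueued) {
            return false;
        }
        queue.addLast(message);
        return true;
    }

    // Either condition triggers a flush: enough time has passed, or enough messages are queued.
    boolean shouldFlush(long nowMs) {
        return !queue.isEmpty()
                && (nowMs - lastFlushMs >= maxWaitMs || queue.size() >= batchSize);
    }

    // Called periodically by the background sender; returns the batch that would be handed to the event handler.
    List<String> poll(long nowMs) {
        List<String> batch = new ArrayList<>();
        if (shouldFlush(nowMs)) {
            while (!queue.isEmpty() && batch.size() < batchSize) {
                batch.add(queue.pollFirst());
            }
            lastFlushMs = nowMs;
        }
        return batch;
    }

    public static void main(String[] args) {
        AsyncBatchSketch sender = new AsyncBatchSketch(10000, 5000, 200, 0);
        for (int i = 0; i < 250; i++) {
            sender.send("msg-" + i);
        }
        System.out.println(sender.poll(100).size());  // 200: batch size reached
        System.out.println(sender.poll(200).size());  // 0: only 50 queued, 100 ms since last flush
        System.out.println(sender.poll(5100).size()); // 50: 5000 ms since last flush
    }
}
```

With 250 queued messages, the first poll sends a full batch of 200 because the size condition is met; the remaining 50 wait until 5000 ms have passed since the last flush.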

2. The Kafka consumer offers two APIs: the high-level consumer API and the simple API, SimpleConsumer.
(1) The high-level consumer API consists of Consumer and ConsumerConnector. Consumer here is a static factory class used to create a ConsumerConnector:

class Consumer {
    public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config);
}


Messages are actually consumed through the ConsumerConnector, which provides the following methods:

Create message streams for the topics in topicCountMap, decoding with the specified Decoders:
public <K, V> Map<String, List<KafkaStream<K, V>>> createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

Create message streams for the topics in topicCountMap, using the default Decoder:
public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);

Get message streams for the topics matching a filter, decoding with the specified Decoders:
public <K, V> List<KafkaStream<K, V>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

Get message streams for the topics matching a filter, using the default Decoder:
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);

Commit the offsets of the topics this consumer is connected to:
public void commitOffsets();

Shut down the consumer:
public void shutdown();

Of the methods above, the most commonly used are public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter) and public void commitOffsets().
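To make the topicCountMap parameter concrete: each entry maps a topic name to the number of streams requested for it, and the returned map holds that many streams per topic, typically with one consumer thread per stream. The sketch below models only that shape; StreamLayoutSketch is hypothetical, and its string placeholders stand in for KafkaStream objects. It does not contact Kafka.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the result shape of createMessageStreams(topicCountMap).
// Strings stand in for KafkaStream objects; nothing here talks to Kafka.
class StreamLayoutSketch {

    static Map<String, List<String>> createMessageStreams(Map<String, Integer> topicCountMap) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : topicCountMap.entrySet()) {
            List<String> streams = new ArrayList<>();
            // One placeholder per requested stream for this topic.
            for (int i = 0; i < entry.getValue(); i++) {
                streams.add(entry.getKey() + "-stream-" + i);
            }
            result.put(entry.getKey(), streams);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> topicCountMap = new LinkedHashMap<>();
        topicCountMap.put("dblab01", 2); // request two streams for dblab01
        Map<String, List<String>> streams = createMessageStreams(topicCountMap);
        // Typical pattern: hand each stream to its own consumer thread.
        for (String stream : streams.get("dblab01")) {
            System.out.println("start a consumer thread for " + stream);
        }
    }
}
```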

(2) The consumer's simple API: SimpleConsumer

Fetch messages in batches:
public FetchResponse fetch(kafka.javaapi.FetchRequest request);

Get topic metadata:
public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);

Get the offsets currently available:
public kafka.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);

Close the connection:
public void close();

For most applications the high-level API is sufficient. When finer control is needed, use the simple API; for example, use SimpleConsumer when a consumer restarts and wants to start from the latest offset.
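The restart case can be sketched as follows, assuming the convention of the old offset request API, where the special times -1 and -2 ask for the latest and earliest offsets of a partition. OffsetLookupSketch is a hypothetical in-memory model of one partition, not SimpleConsumer itself; "latest" here means the log-end offset, the offset the next written message will receive.

```java
// Hypothetical in-memory model of one partition's offset range, used to show which
// offset a restarted consumer would resume from. It does not contact a broker.
class OffsetLookupSketch {
    // Special "times" used by the old offset request API.
    static final long LATEST_TIME = -1L;
    static final long EARLIEST_TIME = -2L;

    private final long logStartOffset; // oldest offset still retained on the broker
    private final long logEndOffset;   // offset the next written message will receive

    OffsetLookupSketch(long logStartOffset, long logEndOffset) {
        if (logStartOffset < 0 || logEndOffset < logStartOffset) {
            throw new IllegalArgumentException("invalid offset range");
        }
        this.logStartOffset = logStartOffset;
        this.logEndOffset = logEndOffset;
    }

    long offsetFor(long time) {
        if (time == LATEST_TIME) {
            return logEndOffset;
        }
        if (time == EARLIEST_TIME) {
            return logStartOffset;
        }
        // Lookups by real timestamps are outside the scope of this sketch.
        throw new IllegalArgumentException("only -1 (latest) and -2 (earliest) are modeled");
    }

    // A restarted consumer that only wants new messages starts at the latest offset;
    // one that wants to reprocess everything still retained starts at the earliest.
    long resumeOffset(boolean onlyNewMessages) {
        return offsetFor(onlyNewMessages ? LATEST_TIME : EARLIEST_TIME);
    }

    public static void main(String[] args) {
        // Example: a partition holding offsets 0..99.
        OffsetLookupSketch partition = new OffsetLookupSketch(0, 100);
        System.out.println(partition.resumeOffset(true));  // 100
        System.out.println(partition.resumeOffset(false)); // 0
    }
}
```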

System environment

Linux Ubuntu 20.04




IntelliJ IDEA 2021.1 (Ultimate Edition)

In this experiment, a simple program built on Kafka's Java API simulates a Kafka producer and consumer. The producer generates content in a loop and sends it to Kafka, and the consumer reads the content from Kafka and prints it to the console.
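The data flow of the experiment can be sketched without a broker. In this hypothetical stand-in, a LinkedBlockingQueue plays the role of the single-partition topic dblab01: one thread produces the strings "0" to "99" in a loop, and another reads them and prints them. The END marker is an assumption of the sketch; a real Kafka consumer receives no end-of-stream message and simply keeps polling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for the experiment's data flow. The BlockingQueue plays the
// role of a single-partition topic; no Kafka is involved.
class LocalPipelineSketch {
    static final String END = "__end__"; // hypothetical end-of-stream marker

    static List<String> run(int count) throws InterruptedException {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();

        // Producer: generate content in a loop and "send" it.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                topic.add(Integer.toString(i));
            }
            topic.add(END);
        });

        // Consumer: read messages in order and print them to the console.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String msg = topic.take();
                    if (msg.equals(END)) {
                        break;
                    }
                    System.out.println(msg);
                    received.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // join() makes the consumer's writes to 'received' visible here
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("received " + run(100).size() + " messages");
    }
}
```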

Task steps

1. Open IDEA, create a new Java project, and add the configuration files from Hadoop to the resources directory.

2. Add the Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>

3. Start ZooKeeper: switch to /apps/zookeeper/bin and run the ZooKeeper startup script.

cd /apps/zookeeper/bin  
./ start  

Check the health of ZooKeeper.

./ status  

4. Switch to /apps/kafka and start Kafka’s server.

cd /apps/kafka  
bin/ config/ &  

5. In another window, go to /apps/kafka and create a Kafka topic named dblab01.

cd /apps/kafka  
bin/ \  
--create \  
--zookeeper localhost:2181 \  
--replication-factor 1 \  
--topic dblab01 \  
--partitions 1  

List the topics to check that dblab01 was created:

cd /apps/kafka
bin/  --list  --zookeeper  localhost:2181  

6. Create a Kafka producer to generate data. Under the package, create a class named MyProducer.

// Add the package declaration for the package you created MyProducer in.
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class MyProducer {
    public void produce() {
        Properties props = new Properties();
        // Set the address of the Kafka cluster
        props.put("bootstrap.servers", "localhost:9092");
        // Wait for all in-sync replicas to acknowledge each record
        props.put("acks", "all");
        // Do not retry failed sends
        props.put("retries", 0);
        // Upper bound, in bytes, of a per-partition batch
        props.put("batch.size", 16384);
        props.put("", 1);
        // Total memory, in bytes, for buffering records not yet sent
        props.put("buffer.memory", 33554432);
        // Serializers for keys and values
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // Send 100 records to topic dblab01; key and value are both the loop counter
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("dblab01", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }

    public static void main(String[] args) {
        new MyProducer().produce();
    }
}

Producer-side code: the produce method first creates a Properties instance and sets the producer's parameters, then creates a KafkaProducer instance with props as its argument. A loop then builds a ProducerRecord for each message, passing the topic name dblab01, a key and a value (both are the loop counter as a string), and hands each record to producer.send(). Finally, producer.close() waits for previously sent records to complete and releases the producer's resources. The main method simply calls produce() on a new MyProducer to send the messages.
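The sync/async distinction from the background section also applies to this newer client: KafkaProducer.send returns a Future<RecordMetadata>, so calling get() on it makes a send synchronous, while passing a callback keeps it asynchronous. The sketch below models only that pattern in plain Java; SendModeSketch is hypothetical, and a plain offset counter stands in for RecordMetadata.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical stand-in for a producer: each send is completed on a background
// thread and yields the offset assigned to the message. It imitates the
// Future-based send pattern only; it is not KafkaProducer.
class SendModeSketch implements AutoCloseable {
    private final ExecutorService io = Executors.newSingleThreadExecutor();
    private final List<String> log = new ArrayList<>(); // only touched on the io thread

    private CompletableFuture<Long> submit(String value) {
        return CompletableFuture.supplyAsync(() -> {
            log.add(value);
            return (long) (log.size() - 1); // offset of the appended message
        }, io);
    }

    // Sync style: block until the send completes, like producer.send(record).get().
    long sendSync(String value) throws InterruptedException, ExecutionException {
        return submit(value).get();
    }

    // Async style: return at once and run a callback when the send completes,
    // like producer.send(record, callback).
    CompletableFuture<Void> sendAsync(String value, Consumer<Long> onComplete) {
        return submit(value).thenAccept(onComplete);
    }

    @Override
    public void close() {
        io.shutdown();
    }

    public static void main(String[] args) throws Exception {
        try (SendModeSketch producer = new SendModeSketch()) {
            long offset = producer.sendSync("0");
            System.out.println("sync send acknowledged at offset " + offset);

            List<Long> acked = Collections.synchronizedList(new ArrayList<>());
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (int i = 1; i < 100; i++) {
                pending.add(producer.sendAsync(Integer.toString(i), acked::add));
            }
            // Wait for all callbacks before closing, similar in spirit to producer.flush().
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
            System.out.println("async sends acknowledged: " + acked.size());
        }
    }
}
```

The sync call pays one full round trip per message, while the async calls only enqueue work and learn the result through the callback, which is why batching favors the async style.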