What is Kafka?

Kafka is an open-source streaming platform developed by the Apache Software Foundation and written in Scala and Java. It is a high-throughput, distributed publish-subscribe messaging system that can handle all the activity-stream data of users on a website. This activity data (web browsing, searches, and other user actions) is a key ingredient in many social features of the modern web. Because of the throughput required, such data is usually handled through logging and log aggregation. For systems like Hadoop that process log data and offline analysis but also face real-time processing requirements, Kafka is a feasible solution. Kafka's goal is to unify online and offline message processing through Hadoop's parallel loading mechanism, and to provide real-time messaging through its cluster.

Application scenarios

  • Messaging system: Kafka and traditional messaging systems (also known as message-oriented middleware) both provide system decoupling, redundant storage, traffic peak shaving, buffering, asynchronous communication, extensibility, and recoverability. In addition, Kafka guarantees message ordering and offers the ability to re-consume (backtrack) messages, which is difficult to achieve in most messaging systems.
  • Storage system: Kafka persists messages to disk, which reduces the risk of data loss compared with memory-based systems. Thanks to message persistence and the multi-replica mechanism, Kafka can serve as a long-term data storage system: set the topic's data retention policy to "forever" or enable log compaction for the topic.
  • Streaming platform: Kafka not only provides a reliable data source for every popular stream-processing framework, but also ships with a complete streaming library (Kafka Streams) for operations such as windowing, joins, transformations, and aggregations; see the sketch after this list.
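
As a taste of that streaming library, here is a minimal word-count sketch using Kafka Streams. This is only a sketch: it assumes a kafka-streams dependency on the classpath, and the topic names input-topic and output-topic are hypothetical placeholders.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.128:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");
        // Transform + aggregate: split each line into words and count occurrences
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
              .groupBy((key, word) -> word)
              .count()
              .toStream()
              .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        new KafkaStreams(builder.build(), props).start();
    }
}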

Install Kafka

1. Make sure you have a virtual machine, and that ZooKeeper, which Kafka uses as its registry, is installed on it. If it is not installed, refer to a ZooKeeper installation guide first.
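
If you are not sure whether ZooKeeper is running, a quick check (assuming a standard ZooKeeper distribution, run from its installation directory) is:

bin/zkServer.sh status

It should report a mode of standalone, leader, or follower; alternatively, jps should list a QuorumPeerMain process.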

2. Download Kafka

The Kafka installation in this article is based on Linux; Windows users can consult other resources and follow along on their own.

Official website download address: http://kafka.apache.org/downl…

3. Unzip

When the download is complete, you will have a tar archive; extract it with the following command.

tar -zxvf kafka_2.12-2.1.0.tgz

4. After extraction, enter the extracted Kafka directory and edit the config/server.properties file with an editor:

vim config/server.properties

5. Change the listener IP to your own virtual machine's IP address. The port defaults to 9092.

6. Change the ZooKeeper registry address to your own ZooKeeper's IP. Mine is a ZooKeeper cluster; if yours is a single machine, write one IP and port.
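
For reference, the two entries from steps 5 and 6 might look like this in config/server.properties (the addresses below reuse this article's example IPs; substitute your own):

listeners=PLAINTEXT://192.168.126.128:9092
zookeeper.connect=192.168.126.128:2181,192.168.126.128:2182,192.168.126.128:2183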

7. After making sure that ZooKeeper is started, start Kafka by executing this command in the Kafka directory.

bin/kafka-server-start.sh config/server.properties &

The first part of the command is the Kafka startup script; it is followed by the configuration file we just modified. The trailing & keeps the process running in the background.



If the startup log prints without errors, Kafka has started successfully; you can also verify the process with the jps command.

8. Create a topic. Execute the following command in the Kafka directory:

./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

The command starts with the topic-creation script, followed by the ZooKeeper address, then the replication factor, partition count, and topic name.

If the message Created topic "test" appears after the command executes, the creation succeeded.

9. After creation, execute the following command in the Kafka directory to list all topics.

./bin/kafka-topics.sh --list --zookeeper localhost:2181



If the topic name you just created appears in the output, it has been created successfully.

10. View the topic details

./bin/kafka-topics.sh --describe --zookeeper 192.168.126.128:2181,192.168.126.128:2182,192.168.126.128:2183 --topic test

Here I am using a ZooKeeper cluster; for a single machine, one IP and port is enough.

11. Create a producer

./bin/kafka-console-producer.sh --broker-list 192.168.126.128:9092 --topic test

If a prompt appears and you can keep entering input after pressing Enter, the producer has been created; next, create a consumer to consume the messages.

12. Create a consumer

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --consumer-property group.id=group_test

13. Test the producer and consumer

Open two terminal sessions to the same virtual machine: one for the producer and one for the consumer.

Start the producer first, then the consumer.

If the message typed into the producer terminal shows up in the consumer terminal, the consumer has successfully consumed the producer's message.



If something goes wrong in the test, first make sure:

  • Has ZooKeeper started successfully?
  • Does Kafka's configuration file specify the correct ZooKeeper address?
  • Was the configuration file specified when starting Kafka?
  • Are the directories and IP addresses in the commands correct?

Spring Boot integration with Kafka

1. Create a Spring Boot project

2. POM file

<!-- Kafka dependency -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- FastJSON, Alibaba's JSON utility -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.72</version>
</dependency>
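
Note that spring-kafka needs no explicit <version> here, assuming the project inherits from the Spring Boot parent POM, which manages that version.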

3. YML configuration file

spring:
  kafka:
    bootstrap-servers: 192.168.126.128:9092
    producer:
      # Number of times to retry sending a message after an error
      retries: 0
      # When several messages are sent to the same partition, the producer puts them
      # in the same batch; this sets the memory, in bytes, a batch may use
      batch-size: 16384
      # Size of the producer's memory buffer
      buffer-memory: 33554432
      # Key serialization
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serialization
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0: the producer does not wait for any response from the server before considering a write successful
      # acks=1: the producer gets a success response as soon as the cluster leader receives the message
      # acks=all: the producer gets a success response only after all replicas receive the message
      acks: 1
    consumer:
      # Duration values use formats like 1S, 1M, 2H, 5D
      auto-commit-interval: 1S
      # What to do when the consumer reads a partition with no offset, or the offset is invalid:
      # latest (default): read from the most recent records (those produced after the consumer started)
      # earliest: read the partition from the beginning
      auto-offset-reset: earliest
      # To avoid duplicated and lost data, set this to false and commit offsets manually
      enable-auto-commit: false
      # Key deserialization
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserialization
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # Number of threads running in a listener container
      concurrency: 5
      # The listener commits the ack immediately after each call
      ack-mode: MANUAL_IMMEDIATE
      missing-topics-fatal: false

bootstrap-servers specifies the address of your Kafka server.

4. Create a producer

import javax.annotation.Resource;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
@Slf4j
public class KafkaProducer {

    public static final String TOPIC = "yzr";

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(Object obj) {
        String jsonString = JSONObject.toJSONString(obj);
        log.info("Preparing to send message: {}", jsonString);
        // Send the message asynchronously
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                // Handle a failed send
                log.error("{} - producer failed to send message: {}", TOPIC, throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                // Handle a successful send
                log.info("{} - producer sent message successfully: {}", TOPIC, result.toString());
            }
        });
    }
}

5. Create a consumer

import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "yzr", groupId = "A")
    public void topicTest(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic_test consumed: topic: {}, message: {}", record.topic(), msg);
            // Commit the offset manually (required by ack-mode: MANUAL_IMMEDIATE)
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = "yzr", groupId = "B")
    public void topicTest2(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic_test2 consumed: topic: {}, message: {}", record.topic(), msg);
            ack.acknowledge();
        }
    }
}

6. Create the controller layer and send a message.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    private final KafkaProducer kafkaProducer;

    @Autowired
    public KafkaController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    /**
     * Send a message.
     *
     * @param message the message to send
     */
    @GetMapping("/kafka/{message}")
    public void sendMessage(@PathVariable("message") String message) {
        // Send the message through the producer
        kafkaProducer.send(message);
    }
}
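
Assuming the application runs on Spring Boot's default port 8080 (an assumption; adjust to your own configuration), you can trigger a send from the command line:

curl http://localhost:8080/kafka/hello

Both listeners (groups A and B) should then log the consumed message.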

There is no end to learning.