0. Foreword

This article focuses on installing and using Kafka. It does not cover what Kafka is or what it is used for in detail; for that background, this article is a good starting point: blog.csdn.net/weixin_4223…

1. Download Kafka

  • Kafka’s official website: kafka.apache.org/
  • Download address: archive.apache.org/dist/kafka/…

Downloads from the official archive can be slow, so using a mirror is recommended: www.apache.org/dyn/closer….

Mirror download link: mirrors.tuna.tsinghua.edu.cn/apache/kafk…

2. Install Kafka on Linux

2.1 Transferring Kafka to a cloud server/VM using Xftp

Upload the Kafka package to a directory on the server. This article uses /usr/local/src/software.

2.2 Using Xshell to Connect to the Cloud Server or VM

Go to the folder where the Kafka package is stored and extract it: tar -zxvf kafka_2.13-2.5.1.tgz

Then cd into the extracted folder. It contains:

  • The bin directory stores the executable scripts, such as the start and stop scripts
  • The config directory stores the configuration files
  • The libs directory holds the dependency packages

Go to the bin directory to view:

2.3 Modifying the Configuration Files in the config Directory

zookeeper.properties

  • After the modification is complete, type :wq to save and exit.

server.properties

Next, modify server.properties, which is also in the config directory. Open it with vim server.properties.

In vim, search for log.dirs (type /log.dirs) to jump quickly to the line to be modified:

Change it to the following (the data path is the same one copied earlier):

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/src/software/kafka_2.13-2.5.1/data/kafka-logs

Type :wq to save and exit.
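Because server.properties is a plain Java properties file, the edited value can be double-checked programmatically. The following is only a minimal sketch using the Java standard library: the class name LogDirsCheck, the method readLogDirs, and the default path config/server.properties (relative to the Kafka installation directory) are illustrative assumptions, not part of Kafka.

```java
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class LogDirsCheck {

    /** Parses properties text and returns the comma-separated log.dirs entries, trimmed. */
    public static List<String> readLogDirs(Reader reader) throws IOException {
        Properties props = new Properties();
        props.load(reader);
        List<String> dirs = new ArrayList<>();
        // log.dirs may hold several directories separated by commas
        String value = props.getProperty("log.dirs", "");
        for (String dir : value.split(",")) {
            if (!dir.trim().isEmpty()) {
                dirs.add(dir.trim());
            }
        }
        return dirs;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default: run from the Kafka installation directory
        String path = args.length > 0 ? args[0] : "config/server.properties";
        try (Reader reader = new FileReader(path)) {
            System.out.println("log.dirs = " + readLogDirs(reader));
        }
    }
}
```

If the printed list is empty or contains stray spaces inside a path, the line in server.properties is worth a second look before starting the broker.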

3. Start Kafka

3.1 Starting ZooKeeper and Then Kafka

Since Kafka relies on ZooKeeper, start ZooKeeper first!

  • Go to the bin directory and run ./zookeeper-server-start.sh ../config/zookeeper.properties, which starts the ZooKeeper service with the specified configuration file. To run it in the background, append & to the command.
  • After the ZooKeeper service has started, open a new terminal window and run ./kafka-server-start.sh ../config/server.properties to start the Kafka service. To run it in the background, append & here as well.
  • The Kafka and ZooKeeper startup log files then appear in the newly created data directory.

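Note: If you are using a cloud server, open the required ports in its security group in advance.

On my cloud server the open ports are 2189 and 9092. (In Kafka's shipped configuration files, the ZooKeeper client port defaults to 2181 and the Kafka broker port to 9092.)

If you are using a virtual machine, remember to allow these ports through the firewall.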
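Before starting a client, it can help to confirm that the ports are actually reachable from the machine that will run it. The following is a minimal sketch using only the Java standard library. The class name PortCheck, the method isOpen, and the 2-second timeout are illustrative assumptions; the ports 2181 and 9092 are the defaults in Kafka's shipped configuration, so substitute the ones you actually opened.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable: treat as not reachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder host: pass the server's address as the first argument
        String host = args.length > 0 ? args[0] : "localhost";
        // Assumed default ports; change them if your setup differs
        for (int port : new int[] {2181, 9092}) {
            System.out.println(host + ":" + port + " reachable: " + isOpen(host, port, 2000));
        }
    }
}
```

A result of false for a port that the service is listening on usually points to the security group or firewall rather than to Kafka itself.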

3.2 Stopping ZooKeeper and Kafka

  • In the bin directory, stop ZooKeeper: ./zookeeper-server-stop.sh
  • Stop Kafka: ./kafka-server-stop.sh

4. Using Kafka

Here we use Kafka's topic-based message subscription feature as an example and run the related scripts.

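4.1 Creating a Topic

Go to the bin directory and run the following command to create a topic named test (XXX.XXX.XXX.XXX stands for your server's IP address):
./kafka-topics.sh --create --bootstrap-server XXX.XXX.XXX.XXX:9092 --replication-factor 1 --partitions 1 --topic test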

4.2 Viewing the List of All Created Topics

Run the following command to list all topics:
./kafka-topics.sh --list --bootstrap-server XXX.XXX.XXX.XXX:9092

4.3 Producer sends messages to a topic in Kafka

Run the following command to start a console producer for the test topic:
./kafka-console-producer.sh --broker-list XXX.XXX.XXX.XXX:9092 --topic test

4.4 Consumer receives messages from a topic in Kafka

Run the following command to start a console consumer that reads the test topic from the beginning:
./kafka-console-consumer.sh --bootstrap-server 8.131.66.136:9092 --topic test --from-beginning

When the producer sends another message, for example haha, the consumer reads it as well.

Reference article: blog.csdn.net/qq_29308413…

If startup fails with the error "The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.", see: www.cnblogs.com/qi-yuan-008…

5. Integrating Kafka with Spring Boot

Create a new Spring Boot project:

5.1 Importing Dependencies

Kafka dependencies are introduced in pom.xml:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.7.RELEASE</version>
</dependency>

5.2 Configuring Kafka

# Spring-related configuration
spring:
  # Kafka configuration
  kafka:
    # Server address and Kafka port
    bootstrap-servers: 8.131.66.136:9092
    # Consumer configuration
    consumer:
      # ID of the consumer group; it can be found in consumer.properties in Kafka's config directory
      group-id: test-consumer-group
      # Whether to automatically commit the consumer's offset
      enable-auto-commit: true
      # Auto-commit interval: 3 s (value in milliseconds)
      auto-commit-interval: 3000

5.3 Testing

Producer

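import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/** The producer. */
@Component
class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate; // inject KafkaTemplate

    /**
     * Sends a message.
     * @param topic   message topic
     * @param content message content
     */
    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }
}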

Consumer

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/** The consumer. */
@Component
class KafkaConsumer {

    /**
     * Subscribes to the test topic through the {@code @KafkaListener} annotation.
     * @param record the received message, wrapped in a ConsumerRecord object
     */
    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }
}

Test

import org.junit.jupiter.api.Test; // JUnit 5, assumed here because the test uses @SpringBootTest without a runner
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer; // inject the producer

    @Test
    public void testKafka() {
        // The producer sends the messages
        kafkaProducer.sendMessage("test", "Hello, Hello!");
        kafkaProducer.sendMessage("test", "Are you there?");

        // Block the thread for a moment so the consumer has time to consume the messages
        try {
            Thread.sleep(1000 * 3); // 3 s
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Test results:
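As a side note on the test above: a fixed Thread.sleep either wastes time or turns out too short on a slow connection. One common alternative is to wait on a java.util.concurrent.CountDownLatch that the consumer counts down once per message. The following is a minimal sketch using only the Java standard library, with a plain thread standing in for the @KafkaListener method. The class LatchWaitSketch and its methods are hypothetical names chosen for illustration; they are not part of Spring or Kafka.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {

    private final CountDownLatch latch;
    private final List<String> received = new CopyOnWriteArrayList<>();

    public LatchWaitSketch(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    /** Stand-in for the body of a listener method: record the value, then count down. */
    public void onMessage(String value) {
        received.add(value);
        latch.countDown();
    }

    /** Blocks until all expected messages have arrived or the timeout has elapsed. */
    public boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    public List<String> received() {
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        LatchWaitSketch sketch = new LatchWaitSketch(2);
        // A plain thread simulates the listener thread delivering two messages
        Thread listener = new Thread(() -> {
            sketch.onMessage("Hello, Hello!");
            sketch.onMessage("Are you there?");
        });
        listener.start();
        boolean done = sketch.awaitAll(3, TimeUnit.SECONDS);
        System.out.println("all received: " + done + ", messages: " + sketch.received());
    }
}
```

In a real test, the listener bean would call the equivalent of onMessage, and the test would assert on the return value of awaitAll instead of sleeping. The wait then ends as soon as the last message arrives, and a missing message fails the test instead of passing silently.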

"UnknownHostException" reported when connecting to a remote Kafka

This problem can be solved by referring to this article: blog.csdn.net/chang_harry…