This article describes how to deploy a Kafka cluster using KRaft and how to implement reactive Kafka interaction in Spring.

KRaft

We know that Kafka uses ZooKeeper to store metadata such as brokers and consumer groups, and relies on ZooKeeper for operations such as controller election among brokers. While ZooKeeper simplifies Kafka's own job, it also makes deploying and operating Kafka more complex.

Kafka 2.8.0 began removing ZooKeeper and replacing it with an internal quorum controller, officially called the "Kafka Raft Metadata mode" (KRaft). From this point on, users can deploy Kafka clusters without ZooKeeper, making Kafka simpler and more lightweight. With KRaft mode, users only need to focus on maintaining the Kafka cluster itself.

Note: Because this feature involves major changes, the KRaft mode available in Kafka 2.8 is an early-access version and is not recommended for production use. A production-ready version of KRaft is expected in a later Kafka release.

Here is how to deploy a Kafka cluster using KRaft. Three Kafka nodes are deployed on three machines, using Kafka version 2.8.0.

1. Generate a ClusterID and the configuration files. (1) Generate a ClusterID with kafka-storage.sh:

$ ./bin/kafka-storage.sh random-uuid
dPqzXBF9R62RFACGSg5c-Q

(2) Use the ClusterID to generate the configuration files:

$ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
Formatting /tmp/kraft-combined-logs

Note: You only need to generate one ClusterID and use it to generate the configuration files on all machines; that is, all nodes in the cluster must use the same ClusterID.

2. Modify the configuration files. The configuration file generated by the script only applies to a single Kafka node. If you are deploying a Kafka cluster, you need to make some changes to it.

(1) Modify config/kraft/server.properties (this configuration is used later when starting Kafka):

process.roles=broker,controller
node.id=1
listeners=PLAINTEXT://172.17.0.2:9092,CONTROLLER://172.17.0.2:9093
advertised.listeners=PLAINTEXT://172.17.0.2:9092
controller.quorum.voters=1@172.17.0.2:9093,2@172.17.0.3:9093,3@172.17.0.4:9093

process.roles specifies the node's role and can take the following values:

  • broker: the node acts only as a broker
  • controller: the node acts as a controller node in the Raft quorum
  • broker,controller: the node performs both roles

Different nodes in a cluster need different node.id values. controller.quorum.voters must list all controller nodes in the cluster, in the format

<node.id>@<IP>:<port>

(2) We also need to keep the node.id in /tmp/kraft-combined-logs/meta.properties consistent with the node.id in server.properties:

node.id=1

3. Start the Kafka node using the kafka-server-start.sh script:

$ ./bin/kafka-server-start.sh ./config/kraft/server.properties

Let's test the Kafka cluster. 1. Create a topic

$ ./bin/kafka-topics.sh --create --partitions 3 --replication-factor 3 --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1

2. Produce messages

$ ./bin/kafka-console-producer.sh --broker-list 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1

3. Consume messages

$ ./bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1 --from-beginning

These commands are used in the same way as in earlier versions of Kafka.

KRaft's functionality is not yet complete, but this shows a simple deployment example. Kafka documentation: https://github.com/apache/kaf…

Spring can use two frameworks, Spring-Kafka and Spring-Cloud-Stream, to implement reactive interaction with Kafka. Let's take a look at each of them in action.

Spring-Kafka

1. Add the Spring-Kafka dependency

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.5.8.RELEASE</version>
    </dependency>

2. Prepare the configuration file as follows

spring.kafka.producer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=warehouse-consumers
spring.kafka.consumer.properties.spring.json.trusted.packages=*

This is the producer and consumer configuration; it is very simple.

3. Send messages. In Spring-Kafka, ReactiveKafkaProducerTemplate can be used to send messages. First, we need to create a ReactiveKafkaProducerTemplate instance. (Currently Spring Boot automatically creates a KafkaTemplate instance, but does not create a ReactiveKafkaProducerTemplate instance.)

@Configuration
public class KafkaConfig {
    @Autowired
    private KafkaProperties properties;

    @Bean
    public ReactiveKafkaProducerTemplate reactiveKafkaProducerTemplate() {
        SenderOptions options = SenderOptions.create(properties.getProducer().buildProperties());
        ReactiveKafkaProducerTemplate template = new ReactiveKafkaProducerTemplate(options);
        return template;
    }
}

The KafkaProperties instance is automatically created by SpringBoot and reads the corresponding configuration from the above configuration file.
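The Warehouse payload sent and consumed in the examples below is not shown in the article. A minimal sketch of what it might look like, assuming only the id, name and label fields implied by the getters and setters used later:

    // Minimal sketch of the Warehouse payload; fields are assumed from the
    // getId()/setName()/setLabel() calls used in this article, the real class may differ.
    public class Warehouse {
        private Long id;
        private String name;
        private String label;

        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public String getLabel() { return label; }
        public void setLabel(String label) { this.label = label; }

        @Override
        public String toString() {
            return "Warehouse{id=" + id + ", name=" + name + ", label=" + label + "}";
        }
    }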

Next, you can use the ReactiveKafkaProducerTemplate to send messages:

@Autowired
private ReactiveKafkaProducerTemplate template;

public static final String WAREHOUSE_TOPIC = "warehouse";

public Mono<Boolean> add(Warehouse warehouse) {
    Mono<SenderResult<Void>> resultMono = template.send(WAREHOUSE_TOPIC, warehouse.getId(), warehouse);
    return resultMono.flatMap(rs -> {
        if (rs.exception() != null) {
            logger.error("send kafka error", rs.exception());
            return Mono.just(false);
        }
        return Mono.just(true);
    });
}

The ReactiveKafkaProducerTemplate#send method returns a Mono (a core Reactor type in Spring) carrying a SenderResult. The RecordMetadata and exception inside the SenderResult hold the record's metadata (offset, timestamp, etc.) and the exception of the send operation, respectively.
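For illustration, a minimal sketch of reading that metadata from the send result; the addWithMetadata method name and the offset/timestamp logging are assumptions added here, not part of the original example:

    // Sketch: inspect the SenderResult returned by ReactiveKafkaProducerTemplate#send.
    public Mono<Boolean> addWithMetadata(Warehouse warehouse) {
        return template.send(WAREHOUSE_TOPIC, warehouse.getId(), warehouse)
                .map(rs -> {
                    if (rs.exception() != null) {
                        logger.error("send kafka error", rs.exception());
                        return false;
                    }
                    // RecordMetadata carries the partition, offset and timestamp of the stored record
                    logger.info("sent to partition {} offset {} at {}",
                            rs.recordMetadata().partition(),
                            rs.recordMetadata().offset(),
                            rs.recordMetadata().timestamp());
                    return true;
                });
    }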

4. Consume messages. Spring-Kafka uses ReactiveKafkaConsumerTemplate to consume messages.

@Service
public class WarehouseConsumer {
    @Autowired
    private KafkaProperties properties;

    @PostConstruct
    public void consumer() {
        ReceiverOptions<Long, Warehouse> options = ReceiverOptions.create(properties.getConsumer().buildProperties());
        options = options.subscription(Collections.singleton(WarehouseService.WAREHOUSE_TOPIC));
        new ReactiveKafkaConsumerTemplate(options)
                .receiveAutoAck()
                .subscribe(record -> {
                    logger.info("Warehouse Record:" + record);
                });
    }
}

This is different from the earlier message listeners implemented with the @KafkaListener annotation, but it is also very simple and consists of two steps: (1) use ReceiverOptions#subscription to associate the ReceiverOptions with a Kafka topic; (2) create a ReactiveKafkaConsumerTemplate and register the consumption callback with subscribe. Tip: the receiveAutoAck method automatically commits the consumer group offsets.
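If automatic offset commits are not desired, a rough alternative is to use receive() and acknowledge each record after processing. This is a sketch based on the Reactor Kafka receiver API, not something shown in the original article; it reuses the options variable from the example above:

    // Sketch: manual acknowledgement instead of receiveAutoAck().
    // Each ReceiverRecord exposes a ReceiverOffset that can be acknowledged after processing.
    new ReactiveKafkaConsumerTemplate<Long, Warehouse>(options)
            .receive()
            .subscribe(record -> {
                logger.info("Warehouse Record:" + record.value());
                record.receiverOffset().acknowledge();
            });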

Spring-Cloud-Stream

Spring-cloud-stream is a framework provided by Spring for building message-driven microservices. It provides a flexible, unified programming model for different messaging middleware products, which can mask the differences between the underlying message components. It currently supports RabbitMQ, Kafka, RocketMQ and other message components.

Here is a brief example of implementing reactive Kafka interaction with Spring-Cloud-Stream, without going deeply into the use of Spring-Cloud-Stream itself.

1. Add the spring-cloud-starter-stream-kafka dependency

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

2. Add configuration

spring.cloud.stream.kafka.binder.brokers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
# message format
spring.cloud.stream.bindings.warehouse2-out-0.contentType=application/json
# message destination, can be understood as a Kafka topic
spring.cloud.stream.bindings.warehouse2-out-0.destination=warehouse2
# message format
spring.cloud.stream.bindings.warehouse3-in-0.contentType=application/json
# message destination, can be understood as a Kafka topic
spring.cloud.stream.bindings.warehouse3-in-0.destination=warehouse2
# consumer group, can be understood as a Kafka consumer group
spring.cloud.stream.bindings.warehouse3-in-0.group=warehouse2-consumers
# names of the methods to bind
spring.cloud.function.definition=warehouse2;warehouse3

Since version 3.1 of Spring-Cloud-Stream, the StreamApi annotations such as @EnableBinding and @Output have been marked as deprecated in favor of a more concise functional programming model. From this version on, the user no longer needs these annotations and only has to specify the methods to be bound in the configuration file. Spring-Cloud-Stream binds those methods to the underlying message component; the user can call them directly to send messages, or Spring-Cloud-Stream calls them to consume messages when they arrive.


The binding names in the configuration above follow this convention:

  • output (produce message): <functionName>-out-<index>
  • input (consume message): <functionName>-in-<index>

For a typical function with a single input and output, the index is always 0, so it only matters for functions with multiple input and output parameters. Spring-Cloud-Stream supports functions with multiple inputs (function arguments) / outputs (function return values).
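As a rough illustration of how the index comes into play, a function with two inputs could be declared roughly as follows. This is a sketch; the method name merge and its bindings merge-in-0, merge-in-1 and merge-out-0 are hypothetical, not part of the article's example:

    // Sketch: a function with two inputs and one output.
    // Its bindings would be named merge-in-0, merge-in-1 and merge-out-0.
    @Bean
    public Function<Tuple2<Flux<Warehouse>, Flux<String>>, Flux<String>> merge() {
        return tuple -> Flux.merge(
                tuple.getT1().map(w -> "warehouse:" + w.getId()),
                tuple.getT2());
    }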

The spring.cloud.function.definition configuration specifies the names of the methods that need to be bound. If this configuration is omitted, Spring-Cloud-Stream automatically tries to bind methods whose return type is Supplier/Function/Consumer, but using this configuration avoids binding confusion.

3. Send messages. The user can write a method whose return type is Supplier to send messages periodically:

@PollableBean
public Supplier<Flux<Warehouse>> warehouse2() {
    Warehouse warehouse = new Warehouse();
    warehouse.setId(333L);
    warehouse.setName("The best warehouse in the world");
    warehouse.setLabel("Level 1");
    logger.info("Supplier Add : {}", warehouse);
    return () -> Flux.just(warehouse);
}

Once the method is defined, Spring-Cloud-Stream calls it once per second, generating a Warehouse instance and sending it to Kafka. (The method name warehouse2 has already been configured in spring.cloud.function.definition.)

If the application does not need to send messages periodically but is instead triggered by a business scenario, such as a REST interface, the StreamBridge interface can be used:

    @Autowired
    private StreamBridge streamBridge;

    public boolean add2(Warehouse warehouse) {
        return streamBridge.send("warehouse2-out-0", warehouse);
    }
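For illustration only, a hypothetical WebFlux endpoint that triggers the send above; the controller class, request mapping and the WarehouseService holding add2 are assumptions, not part of the original code:

    // Hypothetical REST endpoint that triggers StreamBridge#send; names are illustrative.
    @RestController
    @RequestMapping("/warehouse")
    public class WarehouseController {
        @Autowired
        private WarehouseService warehouseService;

        @PostMapping
        public Mono<Boolean> add(@RequestBody Warehouse warehouse) {
            // wrap the synchronous StreamBridge call in a Mono for the reactive endpoint
            return Mono.fromCallable(() -> warehouseService.add2(warehouse));
        }
    }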

I have not yet found a way for StreamBridge to implement reactive interaction.

4. Consume messages. To consume messages, the application simply defines a method whose return type is Function or Consumer, as follows:

    @Bean
    public Function<Flux<Warehouse>, Mono<Void>> warehouse3() {
        Logger logger = LoggerFactory.getLogger("WarehouseFunction");
        return flux -> flux.doOnNext(data -> {
            logger.info("Warehouse Data: {}", data);
        }).then();
    }

Note: The method names, the binding names <functionName>-out-<index> / <functionName>-in-<index>, and the spring.cloud.function.definition configuration must be kept consistent to avoid errors.

Spring-Cloud-Stream documentation: https://docs.spring.io/spring…

The complete code: https://gitee.com/binecy/bin-…

If you found this article helpful, you are welcome to follow my WeChat official account; the series of articles will continue to be updated. Your attention is what keeps me going!