This is day 38 of my participation in the First Challenge 2022. For details, see: First Challenge 2022.

Kafka installation tutorial: juejin.cn/post/700462…

Springboot integration Kafka: juejin.cn/post/700543…

Kafka column: juejin.cn/column/6996…

This article project code gitee address: gitee.com/wei_rong_xi…

In the previous article, we used the JDK's built-in queue to increase the service's throughput. But a JDK queue lives in memory: when the request volume is very large, a huge number of requests are cached in the heap, which puts heavy pressure on memory and makes this approach a poor fit for high-concurrency business scenarios. In e-commerce scenarios in particular, the peak clipping and decoupling provided by a message queue can raise the system's throughput while keeping it stable. So next we continue improving the system by introducing Kafka to further improve stability.
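The memory limitation described above is easy to see with a bounded JDK queue: once its fixed capacity is reached, `offer` rejects new requests outright. A minimal sketch (the class name and capacity are mine, chosen for illustration):

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BoundedQueueDemo {
    public static void main(String[] args) {
        // A bounded in-memory queue: capacity is fixed at creation time.
        ArrayBlockingQueue<String> orders = new ArrayBlockingQueue<>(2);

        // The first two requests are buffered in heap memory.
        System.out.println(orders.offer("order-1")); // true
        System.out.println(orders.offer("order-2")); // true

        // Once the queue is full, further requests are rejected immediately.
        // A broker like Kafka would instead persist them to disk and let
        // consumers drain them at their own pace.
        System.out.println(orders.offer("order-3")); // false
    }
}
```

This is exactly the trade-off that motivates moving the buffer out of the JVM heap and into a broker.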

For details on kafka installation, introduction, and integration, please refer to the links at the beginning of this article.

1. Introducing Kafka

Introducing dependencies:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.10.RELEASE</version>
</dependency>

Add configuration:

spring:
  kafka:
    bootstrap-servers: 172.163.29.:9092
    producer:
      # Number of times a message is resent after an error occurs.
      retries: 0
      # When several messages need to be sent to the same partition, the producer puts them
      # in the same batch. This parameter specifies the amount of memory a batch can use, in bytes.
      # batch-size: 16384
      # Size of the producer's memory buffer, in bytes.
      # buffer-memory: 33554432
      # acks=0: the producer does not wait for any response from the server before considering the write successful.
      # acks=1: as soon as the partition leader receives the message, the producer gets a success response.
      # acks=all: the producer only gets a success response once all in-sync replicas have received the message.
      # acks: 1
      # Key serializer.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer.
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      # In Spring Boot 2.x this value is of type Duration and must use a recognized format, such as 1S, 1M, 2H, 5D.
      # auto-commit-interval: 1S
      # What to do when the consumer reads a partition with no committed offset, or the offset is invalid:
      # latest (default): start reading from the newest records (those produced after the consumer started).
      # earliest: start reading the partition's records from the beginning.
      # auto-offset-reset: earliest
      # Defaults to true. To avoid duplicated or lost data, set it to false and commit offsets manually.
      # enable-auto-commit: false
      # Key deserializer.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer.
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 150
    listener:
      # Number of threads running in the listener container.
      # concurrency: 5
      # The listener is responsible for acking; every ack commits immediately.
      # ack-mode: manual_immediate
      missing-topics-fatal: false
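The commented-out `batch-size` setting caps how many bytes of messages the producer accumulates per partition before sending them as one batch. The mechanism can be sketched in plain Java (the `BatchBuffer` class and its methods are mine, a simplified model, not part of the Kafka client):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BatchBuffer {
    private final int maxBytes;
    private final List<String> current = new ArrayList<>();
    private int currentBytes = 0;
    private int flushCount = 0;

    public BatchBuffer(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Adds a message; flushes the open batch first if adding it would exceed maxBytes. */
    public void add(String message) {
        int size = message.getBytes(StandardCharsets.UTF_8).length;
        if (currentBytes + size > maxBytes && !current.isEmpty()) {
            flush();
        }
        current.add(message);
        currentBytes += size;
    }

    /** Sends the open batch; in the real producer this is one network request. */
    public void flush() {
        if (!current.isEmpty()) {
            flushCount++;
            current.clear();
            currentBytes = 0;
        }
    }

    public int getFlushCount() {
        return flushCount;
    }
}
```

A larger `batch-size` means fewer, bigger network requests at the cost of more producer memory, which is exactly what `buffer-memory` bounds overall.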

2. Test service transformation

2.1 Message producer

First we provide a simple producer utility class exposing a single method that sends a message, taking the topic and the message body.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Kafka producer.
 *
 * @author weirx
 * @date 2021/02/03 14:22
 */
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a Kafka message.
     *
     * @param topic   the target topic
     * @param message the message body
     */
    public void send(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

2.2 Order interface transformation

KafkaProducer is introduced so that, when placing an order, the HTTP request is replaced with a Kafka push. The pseudo-code is as follows:

@Autowired
private KafkaProducer kafkaProducer;


kafkaProducer.send("rob-necessities-order",JSONObject.toJSONString(map));
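The snippet above serializes a map with fastjson's `JSONObject.toJSONString` before pushing it. To keep things dependency-free, here is a hand-rolled equivalent for a flat map of strings (the `OrderPayload` class and `toJson` helper are mine, for illustration only; the real code uses fastjson):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class OrderPayload {
    /** Serializes a flat String map to a JSON object, mimicking JSONObject.toJSONString for this simple case. */
    public static String toJson(Map<String, String> map) {
        return map.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":\"" + e.getValue() + "\"")
                .collect(Collectors.joining(",", "{", "}"));
    }

    public static void main(String[] args) {
        Map<String, String> order = new LinkedHashMap<>();
        order.put("id", "1001");
        order.put("userId", "42");
        // This string is the kind of payload the producer pushes to the "rob-necessities-order" topic.
        System.out.println(toJson(order));
    }
}
```

Because the consumer deserializes the value back into an `OrderDTO`, the producer and consumer only need to agree on this JSON shape, not on any Java types.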

2.3 Payment interface transformation

In the previous article, the payment interface was invoked as a callback from the order interface. That is a fully synchronous call, prone to problems such as blocking, request failures, and timeouts.

So here we also change it to asynchronous consumption through Kafka. The consumer utility class is as follows:

@Slf4j
@Component
public class KafkaConsumer {

    @Autowired
    private TradingServiceImpl tradingService;

    @KafkaListener(topics = {"rob-necessities-trading"})
    public void consumer(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            String orderId = message.toString();
            log.info("Payment start time ***********************: {}, order id: {}", LocalDateTime.now(), orderId);
            tradingService.pay(Long.valueOf(orderId));
            log.info("Payment completion time /////////////////////////: {}, order id: {}", LocalDateTime.now(), orderId);
        }
    }
}

3. Order service transformation

3.1 Payment callback transformation

Now we need to provide a Kafka message producer in the order service to send messages to the test service:

@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a Kafka message.
     *
     * @param topic   the target topic
     * @param message the message body
     */
    public void send(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

3.2 Order interface transformation

The order interface no longer waits for an HTTP call; instead it listens on Kafka. The consumer is as follows:

@Slf4j
@Component
public class KafkaConsumer {

    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = {"rob-necessities-order"})
    public void consumer(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record = " + record);
            log.info("------------------ message = " + message);
            JSONObject jsonObject = JSONObject.parseObject(message.toString());
            OrderDTO orderDTO = jsonObject.toJavaObject(OrderDTO.class);
            orderService.saveOrder(orderDTO);
        }
    }
}

Since Kafka now buffers the concurrent traffic for us, we no longer need the queue we added ourselves, so the modified order interface looks like this:

@Autowired
private KafkaProducer kafkaProducer;

@Override
public Result saveOrder(OrderDTO orderDTO) {

    // Place an order
    Result result = this.saveOrderImpl(orderDTO);
    String orderId = JSONObject.parseObject(JSONObject.toJSONString(result.getData())).getString("id");
    kafkaProducer.send("rob-necessities-trading", orderId);
    return Result.success("Order successful");
}
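One benefit of routing the payment trigger through the producer is that the order path becomes easy to test without a broker. A hypothetical sketch (the `MessageProducer` interface, `CapturingProducer` fake, and simplified `saveOrder` are mine; the article's real code injects the Spring `KafkaProducer` bean instead):

```java
import java.util.ArrayList;
import java.util.List;

public class OrderFlowSketch {
    /** Minimal producer abstraction mirroring KafkaProducer.send(topic, message). */
    interface MessageProducer {
        void send(String topic, String message);
    }

    /** In-memory fake capturing sent messages, so tests need no Kafka cluster. */
    static class CapturingProducer implements MessageProducer {
        final List<String> sent = new ArrayList<>();

        public void send(String topic, String message) {
            sent.add(topic + ":" + message);
        }
    }

    /** The order path: persist the order, hand the id to the broker, return at once. */
    static String saveOrder(MessageProducer producer, String orderId) {
        // ... persist the order (saveOrderImpl in the article) ...
        producer.send("rob-necessities-trading", orderId);
        return "Order successful";
    }
}
```

The caller gets its response immediately, while payment happens asynchronously whenever the trading consumer picks the id up from the topic.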

As shown above, the concrete order business logic has not changed.

4. Testing

Kafka runs as a single node on another server, because I ran out of memory locally.

The total completion time is about 23 seconds, slightly longer than before, but the overall throughput is in a different order of magnitude from the earlier version.

In addition, the payment interface's callback could also be converted to Kafka in the same way, but this article leaves it unchanged.
