I. Main points of this article

In previous articles in this series, we integrated Spring Boot with MyBatis + HikariCP + Elasticsearch + Redis, and built a Kafka environment with Docker. This article introduces how to integrate Spring Boot with Kafka and use MQ to smooth traffic peaks ("peak shaving and valley filling") and decouple modules. The complete catalog of articles in the series:

  • @KafkaListener testing

  • Decoupling with messages

  • Message partitioning and ordering

  • Spring Boot + Kafka integration

  • Spring Boot + MyBatis + HikariCP + Elasticsearch + Redis + Kafka

II. Development environment

  • JDK 1.8
  • Maven 3.6.2
  • Spring Boot 2.4.3
  • ZooKeeper 3.4.13
  • JUnit 5
  • Kafka 2.12-2.3.0 (Scala 2.12, Kafka 2.3.0)
  • IDEA 2020

Modify pom.xml to add the spring-kafka dependency:

        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
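The version number can be omitted here because it is managed by the Spring Boot dependency BOM (spring-boot-dependencies).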


III. Modify the configuration file

Modify the application-dev.properties file; remember to make the same changes to the corresponding configuration files when releasing to the test or production environments.

#################### KAFKA ####################
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=kafka-single-demo-group
# earliest: if a committed offset exists for the partition, consume from that offset; otherwise, consume from the beginning
# latest: if a committed offset exists for the partition, consume from that offset; otherwise, consume only newly produced records
# none: if every subscribed partition has a committed offset, consume from those offsets; throw an exception if any partition has no committed offset
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.buffer-memory=524288
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.template.default-topic=kafka-single-demo-topic


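For reference, the properties above are what Spring Boot's auto-configuration feeds into spring-kafka's consumer and producer factories. Below is a minimal programmatic sketch of the consumer side; it is purely illustrative (the class name KafkaConfig is not from the original project) and is not needed when the properties file is used.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConfig {

    /** Illustrative consumer factory mirroring the properties file above. */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-single-demo-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}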

IV. Write the code

1. Add Const.java.

public class Const {

    /** Redis key prefix. */
    public static final String MMC_MEMBER_KEY_PREFIX = "mmc:member";

    /** Kafka topic. */
    public static final String KAFKA_SINGLE_DEMO_TOPIC = "kafka_single_demo_topic";
}


2. Write KafkaSender.java, used to send Kafka messages.

import javax.annotation.Resource;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class KafkaSender {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Send a message to Kafka. */
    public void sendMessage(String topic, String message) {
        log.info("sendMessage to kafka, topic=[{}], message=[{}]", topic, message);
        kafkaTemplate.send(topic, message);
    }

    /** Send a message to Kafka with a partition key. */
    public void sendMessage(String topic, String partionKey, String message) {
        log.info("sendMessage to kafka, topic=[{}], partionKey=[{}], message=[{}]", topic, partionKey, message);
        kafkaTemplate.send(topic, partionKey, message);
    }
}

3. Write KafkaReceiver.java to receive Kafka messages.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class KafkaReceiver {

    @KafkaListener(id = "kafka-single-demo", topics = Const.KAFKA_SINGLE_DEMO_TOPIC)
    public void receiveMessage(ConsumerRecord<String, String> record) {

        if (null == record || StringUtils.isEmpty(record.value())) {
            log.warn("KafkaReceiver record is null or record.value is empty.");
            return;
        }

        String reqJson = record.value();
        log.info("KafkaReceiver {}", reqJson);
    }
}

4. Modify MemberService.java to send a change message to downstream systems every time a member record is modified.

@Slf4j
@Service
public class MemberService {

    @Resource
    private TblMemberInfoMapper tblMemberInfoMapper;

    @Resource
    private ElasticSearchConfig elasticSearchConfig;

    @Resource
    private RestHighLevelClient restHighLevelClient;

    @Resource(name = "esObjectMapper")
    private ObjectMapper objectMapper;

    @Resource
    private KafkaSender kafkaSender;

    // Evict the cache entry when the data is updated
    @CacheEvict(key = "#member.uid", cacheNames = {Const.MMC_MEMBER_KEY_PREFIX})
    public TblMemberInfo save(TblMemberInfo member) throws JsonProcessingException {

        tblMemberInfoMapper.upsert(member);

        // Send a message to downstream systems for decoupling.
        // Use the member uid as the partition key to preserve per-member ordering.
        kafkaSender.sendMessage(Const.KAFKA_SINGLE_DEMO_TOPIC,
                member.getUid() + "", objectMapper.writeValueAsString(member));

        return member;
    }
}
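Because Kafka assigns all records with the same key to the same partition, using the member uid as the key guarantees that updates to one member are consumed in order, while updates to different members can still be processed in parallel. To exercise the flow end to end, here is a minimal controller sketch (the class name, mapping paths, and request shape are illustrative, not from the original project):

import javax.annotation.Resource;

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.core.JsonProcessingException;

@RestController
@RequestMapping("/member")
public class MemberController {

    @Resource
    private MemberService memberService;

    /** Save a member; the service evicts the cache and publishes the Kafka message. */
    @PostMapping("/save")
    public TblMemberInfo save(@RequestBody TblMemberInfo member) throws JsonProcessingException {
        return memberService.save(member);
    }
}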

VI. Run it

If you have Kafka and ZooKeeper installed, you can easily verify that the integration works with a simple test.

import java.io.IOException;
import javax.annotation.Resource;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest
class KafkaSenderTest {

    @Resource
    private KafkaSender kafkaSender;

    @Test
    void sendMessage() throws IOException {

        String json = "hello";

        for (int i = 0; i < 10; i++) {
            kafkaSender.sendMessage(Const.KAFKA_SINGLE_DEMO_TOPIC, json);
        }

        // Block the JVM so the asynchronous sends and the listener can finish
        System.in.read();
    }
}

One problem with testing Kafka-related functionality this way is that it requires an external kafka-server to be running. If the kafka-server is not on the same network as your development machine, or a network isolation policy is in place, you have to start ZooKeeper and kafka-server locally. spring-kafka-test lets you test Kafka-related features, such as @KafkaListener, without starting an external kafka-server.
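A minimal sketch of such an embedded-broker test, assuming the spring-kafka-test dependency has been added with test scope (the test class name is illustrative):

import javax.annotation.Resource;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.ActiveProfiles;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@ActiveProfiles("dev")
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = Const.KAFKA_SINGLE_DEMO_TOPIC,
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class KafkaSenderEmbeddedTest {

    @Resource
    private KafkaSender kafkaSender;

    @Test
    void sendMessage() throws InterruptedException {
        kafkaSender.sendMessage(Const.KAFKA_SINGLE_DEMO_TOPIC, "hello");
        // Give the @KafkaListener a moment to consume from the embedded broker
        Thread.sleep(3000);
    }
}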

How do you write a batch consumer? See the follow-up articles in this series: SpringBoot batch fetch Kafka messages (1) and SpringBoot batch fetch Kafka messages (2).

VII. Summary

This article is just a brief introduction to integrating Kafka with Spring Boot. For more details, please check out the other articles in this series, such as SpringBoot integration with Dubbo.

Feel free to contact me to exchange ideas and learn together!