In the previous chapter we looked at retries, timeouts, delays, and dead-letter queues for Spring Boot RabbitMQ messages. That code referenced a number of RabbitMQ-specific APIs, such as RabbitTemplate.convertAndSend() and @RabbitListener(queues = "xxx"). The code is simple and everything looks reasonable, yet there is a nagging sense that the middleware has invaded the business code.

Businesses depend on MQ more and more, and demand more of it: ordered consumption, transactional messages, message replay (backtracking consumption), and higher performance. These trends push us to look for better MQ solutions.

If switching from RabbitMQ to RocketMQ were put on the agenda, it would be a huge undertaking. Many services contain RabbitMQ-specific code, and replacing it amounts to touching every one of those services, which carries significant operational risk and requires substantial development and testing resources.

Going back to when we first adopted RabbitMQ: could we have hidden the vendor-specific code as much as possible, leaving room for future upgrades and replacements?

This is where Spring Cloud Stream, a subproject of Spring Cloud, comes in. It is a framework for building message-driven microservices that provides a standard model for publishing and consuming messages, so that messaging middleware from different vendors can be integrated in the same way. Official binders are available for Kafka and RabbitMQ, and Alibaba provides a RocketMQ binder.

Introduction to Spring Cloud Stream

A Spring Cloud Stream application talks to third-party middleware rather than directly to other applications. Communication between applications goes through input channels and output channels, which Spring Cloud Stream injects. Binders implement the connection between those channels and the external brokers.
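
To make the role of the binder concrete, here is a minimal plain-Java sketch of the idea. It is a simplified model, not Spring Cloud Stream's actual API: the names Binder, InMemoryBinder, and BinderSketch are hypothetical and exist only for this illustration. The point is that the business code depends on an abstraction, and the piece that talks to the broker can be swapped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class BinderSketch {

    // Hypothetical abstraction: business code depends only on this interface,
    // never on a vendor client such as RabbitTemplate.
    interface Binder {
        void bindConsumer(String destination, Consumer<String> handler); // input side
        void send(String destination, String payload);                   // output side
    }

    // Stand-in for a vendor binder. A real binder would talk to RabbitMQ,
    // RocketMQ, or Kafka; this one just keeps handlers in memory.
    static class InMemoryBinder implements Binder {
        private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

        @Override
        public void bindConsumer(String destination, Consumer<String> handler) {
            handlers.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
        }

        @Override
        public void send(String destination, String payload) {
            List<Consumer<String>> bound = handlers.get(destination);
            if (bound == null) {
                return; // nobody is listening on this destination
            }
            for (Consumer<String> handler : bound) {
                handler.accept(payload);
            }
        }
    }

    // Business code: written once against Binder, unaware of what is behind it.
    static List<String> run(Binder binder) {
        List<String> received = new ArrayList<>();
        binder.bindConsumer("login-user", received::add);
        binder.send("login-user", "Hello World...");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(new InMemoryBinder()));
    }
}
```

Swapping the middleware in this model means passing a different Binder implementation to run(); the method itself does not change.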

RabbitMQ integration

1. Add the dependency

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. Set the message input and output channels

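import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

// Output channel: used by the application to send messages
public interface Source {
    String OUTPUT = "myOutput";

    @Output(OUTPUT)
    MessageChannel message();
}

// Input channel: used by the application to receive messages
public interface Sink {
    String INPUT = "myInput";

    @Input(INPUT)
    SubscribableChannel sub1();
}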


The output channel sends messages, and the input channel receives them.

myOutput and myInput are the channel names; their behavior is configured later in the configuration file. Remember to define the bindings of the two channels separately; otherwise, hard-to-diagnose errors may occur.

3. Configure message features

spring:
  cloud:
    stream:
      bindings:
        myOutput:
          destination: login-user
        myInput: # channel name; matches the channel the listener in the code subscribes to
          destination: login-user # exchange
          group: logined-member   # consumer group

      rabbit:
        bindings:
          myOutput:
            producer:
              routing-key-expression: headers.routingKey   # routing key used when sending
              delayed-exchange: true    # enable delay queue

          myInput:
            consumer:
              binding-routing-key: login.user.succeed   # routing key the consumer listens on
              delayed-exchange: true    # enable delay queue
              auto-bind-dlq: true   # bind a dead-letter queue
              republish-to-dlq: true  # republish to the dead-letter queue with error information


1) destination: the topic name of the message

In RabbitMQ it is used as the exchange name and becomes part of the queue name.

2) group: the consumer group

  • When no consumer group is defined and multiple instances are started, every instance consumes each message.

  • After a consumer group is defined, multiple instances share one queue and split the load. The queue name is composed as destination.group.

The remaining RabbitMQ-specific properties are:

  • binding-routing-key: the routing key expression the consumer listens on
  • delayed-exchange: enables the delay queue
  • auto-bind-dlq: enables the dead-letter queue
  • republish-to-dlq: republishes dead-lettered messages with the error information attached
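
The consumer group behavior described above can be sketched in plain Java. This is a toy model rather than the binder's implementation: the class ConsumerGroupSketch, its methods, the round-robin delivery, and the ".anonymous.N" queue name for ungrouped instances are all assumptions made for illustration. Only the destination.group queue name comes from the text.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConsumerGroupSketch {
    private final String destination;
    // queue name -> inboxes of the instances consuming from that queue
    private final Map<String, List<List<String>>> queues = new LinkedHashMap<>();
    // queue name -> number of messages delivered so far (drives round-robin)
    private final Map<String, Integer> deliveries = new LinkedHashMap<>();
    private int anonymousCount = 0;

    ConsumerGroupSketch(String destination) {
        this.destination = destination;
    }

    // Queue name for a grouped consumer: destination.group
    static String queueName(String destination, String group) {
        return destination + "." + group;
    }

    // Subscribes one application instance; group == null means "no consumer group".
    // Returns the instance's inbox so the caller can inspect what it received.
    List<String> subscribe(String group) {
        // Assumption for illustration: an ungrouped instance gets its own private queue.
        String queue = (group == null)
                ? destination + ".anonymous." + (anonymousCount++)
                : queueName(destination, group);
        List<String> inbox = new ArrayList<>();
        queues.computeIfAbsent(queue, q -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    // The exchange copies the message to every queue; each queue hands it
    // to exactly one of its instances, in round-robin order.
    void publish(String payload) {
        for (Map.Entry<String, List<List<String>>> queue : queues.entrySet()) {
            List<List<String>> instances = queue.getValue();
            int delivered = deliveries.merge(queue.getKey(), 1, Integer::sum) - 1;
            instances.get(delivered % instances.size()).add(payload);
        }
    }

    public static void main(String[] args) {
        ConsumerGroupSketch exchange = new ConsumerGroupSketch("login-user");
        List<String> a = exchange.subscribe("logined-member"); // same group as b
        List<String> b = exchange.subscribe("logined-member");
        List<String> c = exchange.subscribe(null);             // no group
        exchange.publish("m1");
        exchange.publish("m2");
        System.out.println("a=" + a + " b=" + b + " c=" + c);
    }
}
```

In this model the two grouped instances each receive one of the two messages, while the ungrouped instance receives both.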

4. Send and receive messages

Send a message

@Autowired
private Source source;

@GetMapping("/")
public void sendSucceed() {
    source.message().send(MessageBuilder.withPayload("Hello World...")
            .setHeader("routingKey", "login.user.succeed") // read by routing-key-expression
            .setHeader("version", "1.0")                   // checked by the listener condition
            .setHeader("x-delay", 5000)                    // delay in milliseconds
            .build());
}

Here you can set different headers on a message to trigger different features. Each MQ supports different features, so choose the headers according to the middleware in use.
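
As an illustration of how the routingKey header interacts with the consumer's binding-routing-key, here is a small plain-Java sketch. It assumes the exchange is a topic exchange, where "*" in a binding key matches exactly one word and "#" matches zero or more words. The class RoutingSketch and its methods are hypothetical helpers, not part of any library; in a real deployment the broker does this matching.

```java
import java.util.HashMap;
import java.util.Map;

public class RoutingSketch {

    // Mirrors routing-key-expression: headers.routingKey
    static String routingKey(Map<String, Object> headers) {
        return String.valueOf(headers.get("routingKey"));
    }

    // Topic-style match of a binding key against a routing key (assumed semantics).
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length; // both must be exhausted together
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words
            for (int next = j; next <= key.length; next++) {
                if (matches(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false; // pattern still expects a word, key has none left
        }
        boolean wordMatches = pattern[i].equals("*") || pattern[i].equals(key[j]);
        return wordMatches && matches(pattern, i + 1, key, j + 1);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("routingKey", "login.user.succeed");
        headers.put("x-delay", 5000);

        String key = routingKey(headers);
        System.out.println(matches("login.user.succeed", key)); // exact binding from the config
        System.out.println(matches("login.user.failed", key));  // a different binding
        System.out.println(matches("login.#", key));            // wildcard binding
    }
}
```

With the configuration shown earlier, only messages whose routingKey header resolves to login.user.succeed would reach the myInput queue.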

Receive a message

@StreamListener(value = Sink.INPUT, condition = "headers['version']=='1.0'")
public void receiveSucceed_v1(@Payload String message) {
    String msg = "StreamReceiver v1: " + message;
    log.error(msg);
}
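
The condition attribute selects which listener handles a message based on its headers. The following plain-Java sketch models that dispatch with predicates instead of expression strings. It is only an approximation of the idea: ConditionalDispatchSketch is a hypothetical class, and the second listener name receiveSucceed_v2 is invented here to show a non-matching condition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class ConditionalDispatchSketch {
    // listener name -> condition evaluated against the message headers
    private final Map<String, Predicate<Map<String, Object>>> listeners = new LinkedHashMap<>();

    void register(String name, Predicate<Map<String, Object>> condition) {
        listeners.put(name, condition);
    }

    // Returns the names of the listeners whose condition accepts the headers.
    List<String> dispatch(Map<String, Object> headers) {
        List<String> invoked = new ArrayList<>();
        for (Map.Entry<String, Predicate<Map<String, Object>>> listener : listeners.entrySet()) {
            if (listener.getValue().test(headers)) {
                invoked.add(listener.getKey());
            }
        }
        return invoked;
    }

    // Two listeners on the same channel, distinguished only by the version header.
    static ConditionalDispatchSketch example() {
        ConditionalDispatchSketch dispatcher = new ConditionalDispatchSketch();
        dispatcher.register("receiveSucceed_v1", h -> "1.0".equals(h.get("version")));
        dispatcher.register("receiveSucceed_v2", h -> "2.0".equals(h.get("version"))); // hypothetical
        return dispatcher;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Collections.<String, Object>singletonMap("version", "1.0");
        System.out.println(example().dispatch(headers));
    }
}
```

A message whose version header matches no condition is handled by no listener in this model, so it is worth deciding up front what should happen to such messages.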

5. Bind the message channel

@EnableBinding(value = {Source.class, Sink.class})
@SpringBootApplication
public class RabbitApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }
}

With these five steps you can send and receive messages. Notice that the code is abstract and contains nothing RabbitMQ-specific; only the imported dependency and the feature configuration differ per middleware.

RocketMQ integration

Starting from the RabbitMQ code above, you can switch to RocketMQ (apart from a few vendor-specific features) just by changing the dependency and the feature configuration.

1. Add the dependency

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

2. Configure message features

spring:
  cloud:
    stream:
      bindings:
        myOutput:
          destination: login-user
          content-type: application/json

        myInput: # channel name; matches the channel the listener in the code subscribes to
          destination: login-user # topic
          group: logined-member   # consumer group; instances in the same group share the load

      rocketmq:
        binder:
          name-server: 127.0.0.1:9876


Kafka integration

1. Add the dependency

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2. Configure message features

spring:
  cloud:
    stream:
      bindings:
        myOutput:
          destination: login-user
          content-type: application/json

        myInput: # channel name; matches the channel the listener in the code subscribes to
          destination: login-user # topic
          group: logined-member   # consumer group; instances in the same group share the load

      kafka:
        binder:
          brokers: localhost:9092         # Kafka broker address
          auto-create-topics: true


Summary

As the three simple examples above show, Spring Cloud Stream abstracts message publishing and consumption so that one set of code supports multiple messaging middleware. It also makes it easy to mix several middleware products in one application, which greatly expands what you can do with them.

I recommend using Spring Cloud Stream for message publishing and consumption because it decouples your code from the middleware, unless you need a vendor-specific feature that it cannot provide.

Source code

Some code in this article is omitted for length, so the logic shown is incomplete. If you are interested, fork the source at gitee.com/hypier/barr…

Follow my official account

Barry's fantasy world