Why

In the previous discussion of RocketMQ installation, we forgot to mention some important issues, namely:

  • Why use message queues?
  • What are message queues for?
  • What are the benefits of using message queues?

Before getting to the why, let's look at the what: the message model. We use RocketMQ's message model as the reference:

RocketMQ consists of Producers, Brokers, and Consumers. A Producer produces messages, a Consumer consumes messages, and a Broker stores messages. In an actual deployment, each Broker corresponds to one server. Each Broker can store messages from multiple Topics, and the messages of one Topic can be sharded across different Brokers. A Message Queue stores the physical addresses of messages, and the message addresses of each Topic are spread over multiple Message Queues. A ConsumerGroup consists of multiple Consumer instances.
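
To make the Topic, Message Queue, and ConsumerGroup relationship more concrete, here is a rough sketch. It assumes a Topic with a fixed number of queues, a producer that picks queues round-robin, and a consumer group whose members each own a share of the queues. The class and method names are hypothetical, and the modulo split is chosen only for illustration; it is not RocketMQ's actual allocation strategy.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; none of these names are RocketMQ API.
class MessageModelSketch {

    // Producer side: choose the queue for the n-th message round-robin.
    static int selectQueue(int messageIndex, int queueCount) {
        return messageIndex % queueCount;
    }

    // Consumer-group side: return the queue ids (0..queueCount-1) owned by
    // the consumer at position consumerIndex, using a simple modulo split.
    static List<Integer> assignedQueues(int consumerIndex, int consumerCount, int queueCount) {
        List<Integer> owned = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            if (q % consumerCount == consumerIndex) {
                owned.add(q);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // A topic with 4 queues shared by a group of 2 consumers.
        System.out.println("message 5 goes to queue " + selectQueue(5, 4));
        System.out.println("consumer 0 owns " + assignedQueues(0, 2, 4));
        System.out.println("consumer 1 owns " + assignedQueues(1, 2, 4));
    }
}
```

The point of the sketch is that a Topic is a logical name, while the queues are the units that get distributed across Brokers and divided among the consumers of a group.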

As can be seen, the main roles are Producer, Broker, and Consumer. With such a model, we can generally achieve the following:

  • Asynchronous processing
  • Decoupling
  • Peak shaving

Asynchronous processing

Asynchronous execution generally lets the caller respond faster than synchronous execution, because the caller does not wait for the downstream work to finish. The main thread executes its own logic and then sends a message to the message queue. Another thread subscribes to the message and consumes it asynchronously.
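
The flow described above can be sketched as follows. This is a minimal sketch that assumes an in-memory `BlockingQueue` stands in for the message queue; `AsyncSketch` and `run` are hypothetical names, and nothing here is RocketMQ API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for a message queue (assumption, not RocketMQ).
class AsyncSketch {

    // Sends every message from the calling thread, consumes them on a second
    // thread, and returns what the consumer handled.
    static List<String> run(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String msg = queue.take();      // blocks until a message arrives
                    handled.add("handled:" + msg);  // the asynchronous work
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit the loop when interrupted
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        // The "main thread": enqueue and move on without waiting for the consumer.
        for (String msg : messages) {
            queue.offer(msg);
        }

        // Only for the demo: wait until the consumer has caught up, then stop it.
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        consumer.interrupt();
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("order-1", "order-2")));
    }
}
```

In a real system the sender would not wait on the latch at all; it is there only so the demo can show what the consumer processed.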

Decoupling

Asynchronous processing and decoupling are closely related. If you change a piece of business logic from synchronous to asynchronous, you have in effect decoupled it from the calling flow. Whether the parties are multiple microservices or multiple threads, the result is decoupling. A common scenario that is both decoupled and asynchronous is awarding points after an e-commerce user places an order: once the order service finishes its main business logic, it sends an "add points" message to the message queue; the downstream marketing service subscribes to and consumes that message, and the points logic runs asynchronously.

Of course, after decoupling, whether the call relationship between systems is synchronous or asynchronous at the level of the overall business is determined entirely by the business itself.
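
The order-and-points scenario can be sketched roughly like this. All class names (`InMemoryTopic`, `OrderService`, `PointsService`) are hypothetical, the fixed 10 points per order is an invented value, and the in-memory topic delivers synchronously for simplicity, whereas a real broker would store the message and deliver it asynchronously.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory topic that stands in for the broker.
class InMemoryTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

// Downstream marketing service: knows only the topic, not the order service.
class PointsService {
    private final Map<String, Integer> points = new HashMap<>();

    PointsService(InMemoryTopic topic) {
        // Each "order placed" message carries a user id and earns 10 points.
        topic.subscribe(userId -> points.merge(userId, 10, Integer::sum));
    }

    int pointsOf(String userId) {
        return points.getOrDefault(userId, 0);
    }
}

// Upstream order service: finishes its own logic, then publishes a message.
class OrderService {
    private final InMemoryTopic topic;

    OrderService(InMemoryTopic topic) {
        this.topic = topic;
    }

    void placeOrder(String userId) {
        // ... main order logic would run here ...
        topic.publish(userId);
    }
}

class DecouplingSketch {
    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        PointsService pointsService = new PointsService(topic);
        OrderService orderService = new OrderService(topic);

        orderService.placeOrder("alice");
        orderService.placeOrder("alice");
        System.out.println(pointsService.pointsOf("alice")); // prints 20
    }
}
```

Neither service references the other; replacing or adding a subscriber does not require touching `OrderService`.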

Peak shaving

In plain terms, peak shaving means processing messages at the pace the system can handle. Without a message queue, the system must process every incoming request and return a response, so its throughput is bounded by the processing capacity of a single machine or cluster. That capacity has a limit. It can be extended, but only at a coarse granularity: scale out by adding machines, or scale up by improving the performance of a single machine. If only one type of request is the bottleneck and everything else is fine, scaling at that granularity is not a good fit.

With message queues, we can scale more flexibly: requests are decoupled and abstracted into finer-grained messages, which are sent to the message queue for downstream services to consume at their own pace. There is also the issue of "flow control": just as one controls the flow of water, we control the flow of messages according to the processing capacity of the upstream and downstream systems and of the message queue itself. Take RocketMQ for example:

  • Producer flow control, when the broker's processing capacity reaches a bottleneck
  • Consumer flow control, when consumption capacity reaches a bottleneck

The so-called peak shaving and valley filling is about balance: the balance between the message flow and the processing capacity of the system and the message queue. Hold the flow down at the peak and fill it in at the trough; that is all.
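
The balance between a burst of messages and a fixed processing capacity can be illustrated with a small tick-based simulation. This is a sketch under stated assumptions: the queue has a fixed capacity, the consumer handles a fixed number of messages per tick, and messages that arrive while the queue is full are rejected as a crude form of producer flow control. `PeakShavingSketch` and `simulate` are hypothetical names; RocketMQ's real flow control is more involved.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

class PeakShavingSketch {

    // arrivals[i] messages arrive at tick i; the consumer handles at most
    // ratePerTick messages per tick; the queue holds at most capacity messages.
    // Returns {processed, rejected, maxBacklog}.
    static int[] simulate(int[] arrivals, int ratePerTick, int capacity) {
        if (ratePerTick <= 0) {
            throw new IllegalArgumentException("ratePerTick must be positive");
        }
        Queue<Integer> queue = new ArrayDeque<>();
        int processed = 0;
        int rejected = 0;
        int maxBacklog = 0;

        for (int tick = 0; tick < arrivals.length || !queue.isEmpty(); tick++) {
            int incoming = tick < arrivals.length ? arrivals[tick] : 0;
            for (int i = 0; i < incoming; i++) {
                if (queue.size() < capacity) {
                    queue.add(tick);   // buffered: the peak is absorbed
                } else {
                    rejected++;        // queue full: producer is throttled
                }
            }
            maxBacklog = Math.max(maxBacklog, queue.size());

            // The consumer drains at its own pace, filling the trough.
            for (int i = 0; i < ratePerTick && !queue.isEmpty(); i++) {
                queue.poll();
                processed++;
            }
        }
        return new int[] {processed, rejected, maxBacklog};
    }

    public static void main(String[] args) {
        // A burst of 10 messages, then quiet; consumer rate 3 per tick, capacity 8.
        System.out.println(Arrays.toString(simulate(new int[] {10, 0, 0, 0}, 3, 8)));
    }
}
```

With these inputs, 8 messages are buffered and processed over the following ticks, and 2 are rejected because the queue is full when they arrive.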

Spring Cloud Stream

Having covered the why, let's get back to the topic of this article. Since this is about integrating with Spring Cloud, let's talk about Spring Cloud first.

The Spring Cloud ecosystem itself includes a message-driven microservices framework called Spring Cloud Stream.

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.

Spring Cloud Stream provides a unified abstraction over message-oriented middleware configuration and introduces unified concepts such as publish-subscribe, consumer groups, and partitions. This greatly reduces the complexity of using MQ for application developers and lets them focus on the core business.

What problems does Spring Cloud Stream solve?

  • Using message middleware transparently

Because Stream further encapsulates the message-oriented middleware, application code does not need to be aware of which middleware is in use.

  • High decoupling of middleware and services

Spring Cloud Stream isolates middleware settings in configuration. Switching middleware during development (for example, from RabbitMQ to Kafka) typically requires only adjusting the configuration and the binder dependency, not the business code. This keeps microservice development highly decoupled and lets services focus on their own business processes.

Application model

Spring Cloud Stream consists of a middleware-neutral core. It injects input and output channels through which the application communicates with the outside world, and the channels are connected to external brokers through middleware-specific Binder implementations.

Binder implementations are listed below:

  • RabbitMQ
  • Apache Kafka
  • Amazon Kinesis
  • Google PubSub (partner maintained)
  • Solace PubSub+ (partner maintained)
  • Azure Event Hubs (partner maintained)
  • Apache RocketMQ (partner maintained)

Another concept is Binding. A Binding is the bridge between the message-oriented middleware and the producers and consumers provided by the application, so developers produce or consume data using only the application's producers and consumers and are shielded from the underlying messaging middleware. Bindings come in two kinds: Input Binding and Output Binding.

Note that input and output are used from the perspective of producers and consumers, not from the perspective of brokers. If you produce messages, output should be used, and if you consume messages, input should be used.
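
The idea behind Binder and Binding can be illustrated with a deliberately simplified model. `SimpleBinder`, `InMemoryBinder`, and `BindingSketch` are hypothetical names and do not match Spring Cloud Stream's real interfaces; the sketch only shows that application code depends on an abstract contract while a middleware-specific implementation is plugged in behind it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified binder contract (not Spring Cloud Stream's API).
interface SimpleBinder {
    // Output binding: returns a channel the application sends messages to.
    Consumer<String> bindOutput(String destination);

    // Input binding: delivers messages from the destination to the handler.
    void bindInput(String destination, Consumer<String> handler);
}

// One possible "middleware": an in-memory map of destination to handlers.
class InMemoryBinder implements SimpleBinder {
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

    @Override
    public Consumer<String> bindOutput(String destination) {
        return message -> {
            List<Consumer<String>> targets = handlers.get(destination);
            if (targets != null) {
                targets.forEach(handler -> handler.accept(message));
            }
        };
    }

    @Override
    public void bindInput(String destination, Consumer<String> handler) {
        handlers.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
    }
}

class BindingSketch {
    // Application code: it sees only SimpleBinder, never the middleware.
    static List<String> roundTrip(SimpleBinder binder, String payload) {
        List<String> received = new ArrayList<>();
        binder.bindInput("test-topic", received::add);             // consume: input
        Consumer<String> output = binder.bindOutput("test-topic"); // produce: output
        output.accept(payload);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new InMemoryBinder(), "Hello"));
    }
}
```

Swapping `InMemoryBinder` for another implementation of the same contract would leave `roundTrip` unchanged, which is the property the binder abstraction is meant to provide.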

Integrating RocketMQ

Dependencies

This example uses Spring Cloud, Spring Boot, and Spring Cloud Alibaba together. The versions are as follows:

  <spring.boot.version>2.3.2.RELEASE</spring.boot.version>
  <spring.cloud.version>Hoxton.SR9</spring.cloud.version>
  <spring.cloud.alibaba.version>2.2.6.RELEASE</spring.cloud.alibaba.version>

Dependency packages

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Configuration

spring:
  mvc:
    throw-exception-if-no-handler-found: true # throw an exception when no handler matches, so 404s can be handled
  resources:
    add-mappings: false # disable the default static resource mappings
  application:
    name: mq-example
  cloud:
    stream:
      bindings:
        # define a binding whose name is input
        input:
          content-type: application/json
          destination: test-topic
          group: test-group
        # define a binding whose name is output
        output:
          content-type: application/json
          destination: test-topic
      rocketmq:
        binder:
          # configure the RocketMQ NameServer address
          name-server: 127.0.0.1:9876


Production & Consumption

Producer controller

    // `output` is assumed to be the MessageChannel of the "output" binding,
    // injected into this controller (its declaration is not shown here).
    @GetMapping("/produce")
    public void produceMsg() {
        Map<String, Object> headers = new HashMap<>();
        // Attach a RocketMQ tag to the message
        headers.put(MessageConst.PROPERTY_TAGS, "test");
        Message<String> message = MessageBuilder.createMessage("Hello RocketMQ!", new MessageHeaders(headers));
        output.send(message);
        System.out.println("Message sent: " + message);
    }

Consumer subscription

@Service
public class ReceiveService {

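    /**
     * Subscribe to messages from the "input" binding.
     * Assumes the application enables the bindings, for example with
     * @EnableBinding({Source.class, Sink.class}).
     *
     * @param receiveMsg the received message payload
     */
    @StreamListener("input")
    public void receiveInput1(String receiveMsg) {
        System.out.println("Input receive: " + receiveMsg);
    }
}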

Test

Start RocketMQ (NameServer and Broker), then start the service, call the controller endpoint to send a message, and check the content of the received message.

You can also view the receiving status of messages using the dashboard.
