This article explores how to publish and subscribe to messages in a Spring Cloud application using the RocketMQ Binder.

Introduction

RocketMQ is an open source distributed messaging system that provides low-latency, highly reliable message publishing and subscription services, built on highly available distributed clustering technology. It is used for asynchronous communication and decoupling, and in fields including enterprise solutions, financial payment, telecommunications, e-commerce, express logistics, advertising and marketing, social networking, instant messaging, mobile applications, mobile games, video, the Internet of Things, and the Internet of Vehicles.

RocketMQ is distributed messaging middleware that Alibaba open-sourced in 2012. It was later donated to the Apache Software Foundation and became an Apache Top-Level Project on September 25, 2017. Having repeatedly withstood the load of Alibaba's Double 11 "super project" with stable, excellent performance, this Chinese-developed middleware has been adopted by more and more enterprises in China in recent years for its high performance, low latency, and high reliability.

RocketMQ characteristics

  • It is queue-model message middleware that is high-performance, highly reliable, real-time, and distributed
  • Producers, consumers, and queues can all be distributed
  • The Producer sends messages in turn (round-robin) to a set of queues, collectively called a Topic. With broadcast consumption, each Consumer instance consumes all the queues of the Topic. With cluster consumption, multiple Consumer instances share the Topic's queues evenly among themselves
  • Can guarantee strict message ordering
  • Supports both pull and push message modes
  • Efficient horizontal scaling of subscribers
  • Real-time message subscription mechanism
  • Can accumulate hundreds of millions of messages
  • Supports a variety of messaging protocols, such as JMS and OpenMessaging
  • Few dependencies
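
The difference between broadcast and cluster consumption can be sketched with a small toy model. This is only an illustration under simple assumptions (queues numbered from 0, contiguous near-equal slices); the class and method names are hypothetical, and it is not RocketMQ's own allocation code.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the two consumption modes; NOT RocketMQ's implementation.
class QueueAllocationSketch {

    // Cluster consumption: split queue ids 0..queueCount-1 into contiguous,
    // near-equal slices and return the slice owned by one consumer instance.
    static List<Integer> clusterShare(int queueCount, int consumerCount, int consumerIndex) {
        if (queueCount < 0 || consumerCount <= 0
                || consumerIndex < 0 || consumerIndex >= consumerCount) {
            throw new IllegalArgumentException("invalid queue count, consumer count, or index");
        }
        int base = queueCount / consumerCount;
        int extra = queueCount % consumerCount; // the first `extra` consumers get one more queue
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        int size = base + (consumerIndex < extra ? 1 : 0);
        List<Integer> share = new ArrayList<>();
        for (int q = start; q < start + size; q++) {
            share.add(q);
        }
        return share;
    }

    // Broadcast consumption: every consumer instance reads every queue.
    static List<Integer> broadcastShare(int queueCount) {
        List<Integer> all = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            all.add(q);
        }
        return all;
    }

    public static void main(String[] args) {
        int queues = 8;
        int consumers = 3;
        for (int i = 0; i < consumers; i++) {
            System.out.println("cluster consumer " + i + " -> " + clusterShare(queues, consumers, i));
        }
        System.out.println("any broadcast consumer -> " + broadcastShare(queues));
    }
}
```

With 8 queues and 3 consumers, the cluster slices are [0, 1, 2], [3, 4, 5], and [6, 7], while each broadcast consumer sees all eight queues.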

Spring Cloud Stream

Spring Cloud Stream is a framework for building message-driven microservices.

Spring Cloud Stream provides a unified abstraction for configuring messaging middleware, including publish/subscribe semantics, consumer groups, and stateful partitioning.

The core components of Spring Cloud Stream are Binders, Bindings, and Messages. An application interacts with a Binder through inputs and outputs; our configuration defines the bindings, and the Binder in turn connects to the middleware. Message is the unified data format for data exchange.

  • Binding: includes Input Binding and Output Binding.

A Binding is the bridge between the message-oriented middleware and the producers and consumers in the application. Developers only work with the application's producers or consumers to produce or consume data, and are shielded from the underlying message-oriented middleware.

  • Binder: the component that integrates with external message-oriented middleware to create Bindings. Each message-oriented middleware has its own Binder implementation.

For example, Kafka's implementation is KafkaMessageChannelBinder, RabbitMQ's is RabbitMessageChannelBinder, and RocketMQ's is RocketMQMessageChannelBinder.

  • Message: comes from Spring Messaging, a module of the Spring Framework that unifies the programming model for messaging.

For example, a message in this model consists of a message body (Payload) and message headers (Header).
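
How the three concepts relate can be illustrated with a small in-memory toy. It is a sketch only: ToyMessage and ToyBinder are hypothetical stand-ins invented for this illustration, not Spring Cloud Stream types, and the real framework does far more (serialization, groups, retries).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy illustration of Message, Binding, and Binder; NOT the Spring Cloud Stream API.
class BindingSketch {

    // A message is a payload plus a map of headers (hypothetical stand-in type).
    static final class ToyMessage {
        final Object payload;
        final Map<String, Object> headers;

        ToyMessage(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Stand-in for a middleware-specific binder: it maps logical channel names
    // to destinations (topics) and delivers messages in memory.
    static final class ToyBinder {
        private final Map<String, String> channelToTopic = new HashMap<>();
        private final Map<String, List<Consumer<ToyMessage>>> topicHandlers = new HashMap<>();

        // Output binding: the application writes to `channel`, the binder routes to `topic`.
        void bindOutput(String channel, String topic) {
            channelToTopic.put(channel, topic);
        }

        // Input binding: `handler` is invoked for every message that arrives on `topic`.
        void bindInput(String topic, Consumer<ToyMessage> handler) {
            topicHandlers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
        }

        // The application only names the channel; it never touches the topic directly.
        void send(String channel, ToyMessage message) {
            String topic = channelToTopic.get(channel);
            if (topic == null) {
                throw new IllegalStateException("channel not bound: " + channel);
            }
            for (Consumer<ToyMessage> handler : topicHandlers.getOrDefault(topic, List.of())) {
                handler.accept(message);
            }
        }
    }

    public static void main(String[] args) {
        ToyBinder binder = new ToyBinder();
        binder.bindOutput("output1", "test-topic1");
        binder.bindInput("test-topic1",
                m -> System.out.println("received payload=" + m.payload + " headers=" + m.headers));
        binder.send("output1", new ToyMessage("hello", Map.of("contentType", "text/plain")));
    }
}
```

The point of the design is that swapping the middleware means swapping the binder, while the code that sends to "output1" stays the same.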

Spring Cloud Stream official website

Deploying RocketMQ on Windows

Download

The latest version at the time of writing is 4.6.0.

Download it and unpack it into the D:\RocketMQ directory. Prefer a path that contains no spaces and is not nested too deeply; otherwise the service may fail to run.

Start the NameServer service

Configure the system environment variable before startup; otherwise, the following error is reported:

Please set the ROCKETMQ_HOME variable in your environment!

System environment variable name: ROCKETMQ_HOME

Set the variable's value to the directory you unzipped into; for example, my value is D:\RocketMQ.

Open a Windows command prompt, go to the D:\RocketMQ\bin directory, and run

start mqnamesrv.cmd

Once the NameServer has started successfully, keep its window open while it is in use.
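
As a quick sanity check before moving on, the two preconditions above (the ROCKETMQ_HOME variable and a listening NameServer) can be probed from Java. This is a minimal sketch: the class name is hypothetical, port 9876 is the address used in the Broker command of this article, and a successful TCP connection only shows that something is listening on that port, not that it is a healthy NameServer.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for checking the local setup; not part of RocketMQ.
class NameServerCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String home = System.getenv("ROCKETMQ_HOME");
        if (home == null || home.isEmpty()) {
            System.out.println("ROCKETMQ_HOME is not set");
        } else {
            System.out.println("ROCKETMQ_HOME = " + home);
        }
        // 9876 is the NameServer port used in this article.
        System.out.println("Something listening on localhost:9876: "
                + isReachable("localhost", 9876, 1000));
    }
}
```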

Start the Broker service

Go to the bin directory and enter

start mqbroker.cmd -n localhost:9876

The IP and port above are the address and port of the RocketMQ NameServer started in the previous step.

When you run the preceding command, an error saying that the main class could not be found or loaded may occur.

If this happens, open runbroker.cmd in the bin directory and change %CLASSPATH% to "%CLASSPATH%" (that is, wrap it in double quotes).

Save the file and run the preceding command again. If "boot success" is displayed, the Broker has started successfully.

Example

This example implements publishing and subscribing for three types of messages.

Create the RocketMQ message producer

Create the ali-rocketmq-producer project with port 28081

  • Add dependencies to pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud-alibaba</artifactId>
        <groupId>com.easy</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>ali-rocketmq-producer</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <!-- RocketMQ dependency -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

        <!-- Web dependency -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

  • Configure the Output bindings and use the @EnableBinding annotation to make them take effect

application.yml configuration

server:
  port: 28081

spring:
  application:
    name: ali-rocketmq-producer
  cloud:
    stream:
      rocketmq:
        binder:
          # The NameServer address is missing from the source text; the
          # NameServer in the setup above listens on localhost:9876.
          name-server:
      bindings:
        output1: {destination: test-topic1, content-type: application/json}
        output2: {destination: test-topic2, content-type: application/json}

management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

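ArProduceApplication.java

package com.easy.arProduce;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

// @EnableBinding registers the output channels declared in MySource.
@SpringBootApplication
@EnableBinding({MySource.class})
public class ArProduceApplication {

    public static void main(String[] args) {
        SpringApplication.run(ArProduceApplication.class, args);
    }
}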

  • Message Producer service

MySource.java

package com.easy.arProduce;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MySource {

    @Output("output1")
    MessageChannel output1();

    @Output("output2")
    MessageChannel output2();
}

SenderService.java

package com.easy.arProduce;

import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

@Service
public class SenderService {

    @Autowired
    private MySource source;

    /**
     * Send a plain string message through output1.
     *
     * @param msg the message body
     */
    public void send(String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        source.output1().send(message);
    }

    /**
     * Send a string message with a RocketMQ tag through output1.
     *
     * @param msg the message body
     * @param tag the tag attached to the message
     */
    public void sendWithTags(String msg, String tag) {
        Message<String> message = MessageBuilder.withPayload(msg)
                .setHeader(RocketMQHeaders.TAGS, tag)
                .build();
        source.output1().send(message);
    }

    /**
     * Send an object as a tagged JSON message through output2.
     *
     * @param msg the object to send
     * @param tag the tag attached to the message
     */
    public <T> void sendObject(T msg, String tag) {
        Message<T> message = MessageBuilder.withPayload(msg)
                .setHeader(RocketMQHeaders.TAGS, tag)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build();
        source.output2().send(message);
    }
}

Write the TestController.java controller to make testing easier

package com.easy.arProduce;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "test")
public class TestController {

    @Autowired
    SenderService senderService;

    // Sends an untagged string to test-topic1.
    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String send(String msg) {
        senderService.send(msg);
        return "String message sent successfully!";
    }

    // Sends a string tagged "tagStr" to test-topic1.
    @RequestMapping(value = "/sendWithTags", method = RequestMethod.GET)
    public String sendWithTags(String msg) {
        senderService.sendWithTags(msg, "tagStr");
        return "Message with tag string sent successfully!";
    }

    // Sends a Foo object tagged "tagObj" to test-topic2.
    @RequestMapping(value = "/sendObject", method = RequestMethod.GET)
    public String sendObject(int index) {
        senderService.sendObject(new Foo(index, "foo"), "tagObj");
        return "Object message sent successfully!";
    }
}

Create the RocketMQ message consumer

Create the ali-rocketmq-consumer project with port 28082

  • Add dependencies to pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud-alibaba</artifactId>
        <groupId>com.easy</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>ali-rocketmq-consumer</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

  • Configure the Input bindings and use the @EnableBinding annotation to make them take effect

application.yml configuration

server:
  port: 28082

spring:
  application:
    name: ali-rocketmq-consumer
  cloud:
    stream:
      rocketmq:
        binder:
          # The NameServer address is missing from the source text; the
          # NameServer in the setup above listens on localhost:9876.
          name-server:
        # RocketMQ-specific settings for each input binding
        bindings:
          input1: {consumer.orderly: true}
          # The tag filter lines are garbled in the source text. The values
          # below are inferred from the tags the producer sends (tagStr, tagObj).
          input2: {consumer.tags: tagStr}
          input3: {consumer.tags: tagObj}
      bindings:
        input1: {destination: test-topic1, content-type: text/plain, group: test-group1, consumer.maxAttempts: 1}
        input2: {destination: test-topic1, content-type: application/plain, group: test-group2, consumer.maxAttempts: 1}
        input3: {destination: test-topic2, content-type: application/plain, group: test-group3, consumer.maxAttempts: 1}

management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

ArConsumerApplication.java

package com.easy.arConsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

// @EnableBinding registers the input channels declared in MySource.
@SpringBootApplication
@EnableBinding({MySource.class})
public class ArConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ArConsumerApplication.class, args);
    }
}

  • Message consumer Services

MySource.java

package com.easy.arConsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySource {
    @Input("input1")
    SubscribableChannel input1();

    @Input("input2")
    SubscribableChannel input2();

    @Input("input3")
    SubscribableChannel input3();
}

ReceiveService.java

package com.easy.arConsumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ReceiveService {

    // Receives every message on test-topic1 (no tag filter).
    @StreamListener("input1")
    public void receiveInput1(String receiveMsg) {
        log.info("input1 received a message: " + receiveMsg);
    }

    // Receives messages on test-topic1 that pass the input2 tag filter.
    @StreamListener("input2")
    public void receiveInput2(String receiveMsg) {
        log.info("input2 received a message: " + receiveMsg);
    }

    // Receives object messages on test-topic2, converted to Foo.
    @StreamListener("input3")
    public void receiveInput3(@Payload Foo foo) {
        log.info("input3 received a message: " + foo);
    }
}

Using the example

Projects in the example

In this example, we created two projects:

  • ali-rocketmq-producer: the RocketMQ message producer; service name ali-rocketmq-producer, port 28081
  • ali-rocketmq-consumer: the RocketMQ message consumer; service name ali-rocketmq-consumer, port 28082

Running the example tests

Start the ali-rocketmq-producer and ali-rocketmq-consumer services first.
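
The three test endpoints defined in TestController can be called from a browser or, as sketched here, from a small Java client. This is only a convenience sketch: the class name is hypothetical, it assumes Java 11 or later for java.net.http, and it assumes the producer is running on localhost:28081 as configured above. If the producer is not running, the client fails with a connection error.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical smoke-test client for the producer's test endpoints.
class ProducerSmokeTest {

    static final String BASE = "http://localhost:28081/test";

    // Builds the endpoint URI with a single URL-encoded query parameter.
    static URI endpoint(String path, String param, String value) {
        String encoded = URLEncoder.encode(value, StandardCharsets.UTF_8);
        return URI.create(BASE + path + "?" + param + "=" + encoded);
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        URI[] calls = {
            endpoint("/send", "msg", "yuntian"),
            endpoint("/sendWithTags", "msg", "tagyuntian"),
            endpoint("/sendObject", "index", "1")
        };
        for (URI uri : calls) {
            HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(uri + " -> " + response.statusCode() + " " + response.body());
        }
    }
}
```

After each call, check the consumer's console for the corresponding log line.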

  • Visit the message producer's address: http://localhost:28081/test/send?msg=yuntian

Check the consumer's console, which outputs

2019-12-04 15:37:47.859  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input1 received a message: yuntian
2019-12-04 15:37:47.859  INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDA70E0014 cost: 1 ms

The string message was successfully consumed by input1.

  • Visit the message producer's address: http://localhost:28081/test/sendWithTags?msg=tagyuntian

Check the consumer's console, which outputs

2019-12-04 15:38:09.586  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input2 received a message: tagyuntian
2019-12-04 15:38:09.592  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input1 received a message: tagyuntian
2019-12-04 15:38:09.592  INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDFCD30015 cost: 6 ms

The tagyuntian string is successfully consumed by both input2 and input1. input1 subscribes to test-topic1 without a tag filter, so it receives every message on that topic, tagged or not. input2 subscribes to the same topic in a different consumer group and receives the message because its tag filter matches the message's tag.

  • Visit the message producer's address: http://localhost:28081/test/sendObject?index=1

Check the consumer's console, which outputs

2019-12-04 15:41:15.285  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input3 received a message: Foo{id=1, bar='foo'}

input3 successfully received the message tagged tagObj, but input1 and input2 output nothing. This is because sendObject publishes the message through test-topic2, which input1 and input2 do not subscribe to.
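
The three results above follow from two rules: a binding only sees messages on its own topic, and a binding with a tag filter only sees messages carrying a matching tag. The toy model below replays the three test calls under those rules. It is a sketch with hypothetical names, not RocketMQ code, and it assumes that input2 filters on tagStr and input3 on tagObj, which is consistent with the console output shown above. Because each binding is in its own consumer group, every matching binding gets its own copy of the message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of topic plus tag routing for the three inputs; NOT RocketMQ code.
class TagRoutingSketch {

    // One input binding: the topic it subscribes to and an optional tag filter
    // (null means "no filter, accept every message on the topic").
    static final class Subscription {
        final String topic;
        final String tag;

        Subscription(String topic, String tag) {
            this.topic = topic;
            this.tag = tag;
        }

        boolean accepts(String messageTopic, String messageTag) {
            if (!topic.equals(messageTopic)) {
                return false;
            }
            return tag == null || tag.equals(messageTag);
        }
    }

    // The bindings of this article; the tag filters are assumptions (see lead-in).
    static Map<String, Subscription> exampleBindings() {
        Map<String, Subscription> bindings = new LinkedHashMap<>();
        bindings.put("input1", new Subscription("test-topic1", null));
        bindings.put("input2", new Subscription("test-topic1", "tagStr"));
        bindings.put("input3", new Subscription("test-topic2", "tagObj"));
        return bindings;
    }

    // Names of the bindings that would receive a message with this topic and tag.
    static List<String> receivers(String topic, String tag) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Subscription> entry : exampleBindings().entrySet()) {
            if (entry.getValue().accepts(topic, tag)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("/send         -> " + receivers("test-topic1", null));     // [input1]
        System.out.println("/sendWithTags -> " + receivers("test-topic1", "tagStr")); // [input1, input2]
        System.out.println("/sendObject   -> " + receivers("test-topic2", "tagObj")); // [input3]
    }
}
```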

References

  • Spring Cloud Alibaba example source code
  • Original article
  • RocketMQ project

Spring Boot and Cloud learning projects