Integrating RocketMQ with SpringBoot


Code address: gitcode.net/java_wxid/s…

1. Quick start

In this section we’ll look at how SpringBoot can quickly integrate RocketMQ.

When using SpringBoot’s starter for RocketMQ, pay special attention to the version. The rocketmq-spring-boot-starter dependency is published under org.apache.rocketmq rather than as part of SpringBoot itself, and it is still iterating rapidly; the gap between versions is large enough that even the underlying classes keep changing. For example, code written against rocketmq-spring-boot-starter:2.0.4 will not work unchanged after upgrading to rocketmq-spring-boot-starter:2.1.1.

We create a Maven project that introduces key dependencies:

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>


rocketmq-spring-boot-starter 2.1.1 pulls in SpringBoot 2.0.5.RELEASE by default. The exclusions above, together with the explicit spring-boot-starter-web 2.1.6.RELEASE dependency, upgrade the SpringBoot packages.

Then we’ll quickly create a simple Demo using SpringBoot

Startup class:

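    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class RocketMQScApplication {

        public static void main(String[] args) {
            SpringApplication.run(RocketMQScApplication.class, args);
        }
    }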

Configuration file application.properties

    # NameServer address
    rocketmq.name-server=139.224.233.121:9876
    # Default message producer group
    rocketmq.producer.group=springBootGroup

Message producer

    package com.roy.rocket.basic;

    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;

    /**
     * @author: zhiwei Liao
     * @date: Created in 2022/01/09
     * @description:
     **/
    @Component
    public class SpringProducer {

        @Resource
        private RocketMQTemplate rocketMQTemplate;

        // Example of sending a normal message
        public void sendMessage(String topic, String msg) {
            this.rocketMQTemplate.convertAndSend(topic, msg);
        }

        // Example of sending transaction messages
        public void sendMessageInTransaction(String topic, String msg) throws InterruptedException {
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                Message<String> message = MessageBuilder.withPayload(msg).build();
                // The tag is appended to the topic as "Topic:Tag"
                String destination = topic + ":" + tags[i % tags.length];
                // The destination is also passed as the arg for the local transaction listener
                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            }
        }
    }

Message consumer

    package com.roy.rocket.basic;

    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;

    /**
     * @author: zhiwei Liao
     * @date: Created in 2022/01/09
     * @description:
     **/
    @Component
    @RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
    public class SpringConsumer implements RocketMQListener<String> {

        @Override
        public void onMessage(String message) {
            System.out.println("Received message : " + message);
        }
    }

With SpringBoot integrated with RocketMQ, the heart of the consumer side is the @RocketMQMessageListener annotation, which exposes all of the core consumer features. Message filtering is configured through the selectorType and selectorExpression attributes; ordered versus concurrent consumption is configured through the consumeMode attribute; and whether the consumer group runs in clustering or broadcasting mode is configured through the messageModel attribute.
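To make the tag-based filtering a little more concrete, here is a minimal sketch in plain Java of how a tag selector expression such as "TagA || TagB" is usually understood: "*" accepts every tag, otherwise the expression is a "||"-separated list of accepted tags. The class and method names (TagExpressionSketch, matches) are made up for illustration and are not part of rocketmq-spring; the actual filtering is performed by RocketMQ itself.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Illustrative only: mimics the semantics of a tag selector expression. Not a library class. */
final class TagExpressionSketch {

    /** Returns true if a message carrying {@code tag} would pass {@code expression}. */
    static boolean matches(String expression, String tag) {
        // A missing expression or "*" subscribes to every tag.
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true;
        }
        // Otherwise the expression is a "||"-separated list of accepted tags.
        Set<String> accepted = Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return tag != null && accepted.contains(tag);
    }

    public static void main(String[] args) {
        System.out.println(matches("TagA || TagB", "TagA"));
        System.out.println(matches("TagA || TagB", "TagC"));
        System.out.println(matches("*", "TagC"));
    }
}
```

In the listener itself, an expression like this would go into the selectorExpression attribute of @RocketMQMessageListener.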

Then for transaction messages, we need to configure a transaction message listener:

    package com.roy.rocket.config;

    import org.apache.commons.lang3.StringUtils;
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.apache.rocketmq.spring.support.RocketMQUtil;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.converter.StringMessageConverter;

    import java.util.concurrent.ConcurrentHashMap;

    /**
     * @author: zhiwei Liao
     * @date: Created in 2022/01/09
     * @description:
     **/
    @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
    public class MyTransactionImpl implements RocketMQLocalTransactionListener {

        private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();

        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            Object id = msg.getHeaders().get("id");
            String destination = arg.toString();
            localTrans.put(id, destination);
            // Convert to the native RocketMQ message in order to read the tag
            org.apache.rocketmq.common.message.Message message =
                    RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "UTF-8", destination, msg);
            String tags = message.getTags();
            if (StringUtils.contains(tags, "TagA")) {
                return RocketMQLocalTransactionState.COMMIT;
            } else if (StringUtils.contains(tags, "TagB")) {
                return RocketMQLocalTransactionState.ROLLBACK;
            } else {
                return RocketMQLocalTransactionState.UNKNOWN;
            }
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // SpringBoot's Message object has no transactionId property, unlike the native API.
            // String destination = localTrans.get(msg.getTransactionId());
            return RocketMQLocalTransactionState.COMMIT;
        }
    }

After starting the application, you can call the http://localhost:8080/MQTest/sendMessage?message=123 endpoint to send a simple message. The message is then consumed by SpringConsumer.


Or visit http://localhost:8080/MQTest/sendTransactionMessage?message=123 to send a transaction message.

As you can see here, the Message object that SpringBoot wraps around a transaction message does not expose the transactionId, which is a critical attribute for transaction control.
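One possible way to work around the missing transactionId is to track the local transaction state under a key the application controls, for example a business id the producer puts into a message header. The following is only a rough sketch in plain Java: LocalTransactionLogSketch, State and businessKey are hypothetical names, with State standing in for RocketMQLocalTransactionState. A real application would more likely persist this state in a database, because an in-memory map is lost on restart.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: remembers the outcome of local transactions by a business key. */
final class LocalTransactionLogSketch {

    /** Hypothetical stand-in for RocketMQLocalTransactionState. */
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, State> states = new ConcurrentHashMap<>();

    /** Called from the "execute local transaction" step once the outcome is known. */
    void record(String businessKey, State state) {
        states.put(businessKey, state);
    }

    /** Called from the "check local transaction" step; unknown keys stay UNKNOWN. */
    State check(String businessKey) {
        if (businessKey == null) {
            return State.UNKNOWN; // ConcurrentHashMap does not accept null keys
        }
        return states.getOrDefault(businessKey, State.UNKNOWN);
    }

    public static void main(String[] args) {
        LocalTransactionLogSketch log = new LocalTransactionLogSketch();
        log.record("order-1", State.COMMIT);
        System.out.println(log.check("order-1"));
        System.out.println(log.check("order-2"));
    }
}
```

With something like this in place, the check callback could look up the key from the message headers instead of always returning COMMIT as the demo listener does.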

2. Summary

Once SpringBoot introduces the org.apache.rocketmq:rocketmq-spring-boot-starter dependency, you can interact with RocketMQ through the built-in RocketMQTemplate. All related properties use the rocketmq prefix (for example rocketmq.name-server). For the full list of configuration options, see the org.apache.rocketmq.spring.autoconfigure.RocketMQProperties class.

The Message object in the SpringBoot dependency (org.springframework.messaging.Message) and the Message object in rocketmq-client (org.apache.rocketmq.common.message.Message) are two different classes, which can be very confusing in practice. For example, the tag attribute of Message in rocketmq-client does not exist on Message in the SpringBoot dependency. The tag is instead moved into the send destination, together with the topic, and is specified as Topic:Tag.
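The Topic:Tag convention can be sketched in a few lines of plain Java. This is just an illustration of the string format used in the producer example above (topic + ":" + tag); DestinationSketch, parse and format are hypothetical names and not the starter's own implementation.

```java
/** Illustrative only: builds and splits destinations that follow the "Topic:Tag" convention. */
final class DestinationSketch {
    final String topic;
    final String tags; // empty string when the destination carries no tag

    private DestinationSketch(String topic, String tags) {
        this.topic = topic;
        this.tags = tags;
    }

    /** Splits on the first ':' only, so everything after it is treated as the tag part. */
    static DestinationSketch parse(String destination) {
        String[] parts = destination.split(":", 2);
        String tags = parts.length > 1 ? parts[1] : "";
        return new DestinationSketch(parts[0], tags);
    }

    /** Builds a destination string; a null or empty tag yields the bare topic. */
    static String format(String topic, String tag) {
        return (tag == null || tag.isEmpty()) ? topic : topic + ":" + tag;
    }

    public static void main(String[] args) {
        DestinationSketch d = parse(format("TestTopic", "TagA"));
        System.out.println(d.topic + " / " + d.tags); // prints "TestTopic / TagA"
    }
}
```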

Finally, pay attention to the version. rocketmq-spring-boot-starter releases tend to lag slightly behind RocketMQ releases, and mismatched versions can cause a lot of strange problems.

Apache maintains an official rocketmq-spring example at github.com/apache/rock… You can refer to that sample code when the version is updated in the future.

I also provide a sample project for SpringBoot integration with RocketMQ at gitcode.net/java_wxid/s…

Integrating RocketMQ with SpringCloudStream

Code address: gitcode.net/java_wxid/s…

SpringCloudStream is a unified message-driven framework provided by the Spring community that aims to connect all MQ messaging middleware products with a unified programming model. Let’s look at how SpringCloudStream integrates with RocketMQ.

1. Quick start

Create a Maven project and introduce dependencies:

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2.2.3.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.3.RELEASE</version>
        </dependency>
    </dependencies>

Application startup class:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.cloud.stream.messaging.Source;

    /**
     * @author: zhiwei Liao
     * @date: Created in 2022/01/09
     * @description:
     **/
    @EnableBinding({Source.class, Sink.class})
    @SpringBootApplication
    public class ScRocketMQApplication {

        public static void main(String[] args) {
            SpringApplication.run(ScRocketMQApplication.class, args);
        }
    }

Note the @EnableBinding({Source.class, Sink.class}) annotation, which is how SpringCloudStream brings in the binding configuration.

Then add the configuration file application.properties:

    # General SpringCloudStream configuration starts with spring.cloud.stream
    spring.cloud.stream.bindings.input.destination=TestTopic
    spring.cloud.stream.bindings.input.group=scGroup
    spring.cloud.stream.bindings.output.destination=TestTopic
    # RocketMQ-specific configuration starts with spring.cloud.stream.rocketmq; in cluster mode, separate addresses with ";"
    spring.cloud.stream.rocketmq.binder.name-server=139.224.233.121:9876

In SpringCloudStream, a binding corresponds to a message channel. The input binding configured here is defined in Sink.class and corresponds to a message consumer; the output binding is defined in Source.class and corresponds to a message producer.

You can then add message consumers:

    package com.roy.scrocket.basic;

    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.stereotype.Component;

    /**
     * @author: zhiwei Liao
     * @date: Created in 2022/01/09
     * @description:
     **/
    @Component
    public class ScConsumer {

        // Consumes messages arriving on the "input" binding
        @StreamListener(Sink.INPUT)
        public void onMessage(String message) {
            System.out.println("received message:" + message + " from binding:" + Sink.INPUT);
        }
    }

Message producer:

    package com.roy.scrocket.basic;

    import org.apache.rocketmq.common.message.MessageConst;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;
    import java.util.HashMap;
    import java.util.Map;

    /**
     * @author: zhiwei Liao
     * @date: Created in 2020/10/22
     * @description:
     **/
    @Component
    public class ScProducer {

        @Resource
        private Source source;

        public void sendMessage(String msg) {
            // The RocketMQ tag is passed as a message header
            Map<String, Object> headers = new HashMap<>();
            headers.put(MessageConst.PROPERTY_TAGS, "testTag");
            MessageHeaders messageHeaders = new MessageHeaders(headers);
            Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
            // Send through the "output" binding
            this.source.output().send(message);
        }
    }

Finally, add a Controller class for testing:

    package com.roy.scrocket.controller;

    import com.roy.scrocket.basic.ScProducer;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    import javax.annotation.Resource;

    /**
     * @author: zhiwei Liao
     * @date: Created in 2020/10/27
     * @description:
     **/
    @RestController
    @RequestMapping("/MQTest")
    public class MQTestController {

        @Resource
        private ScProducer producer;

        @RequestMapping("/sendMessage")
        public String sendMessage(String message) {
            producer.sendMessage(message);
            return "Message sent";
        }
    }

Start the application and visit http://localhost:8080/MQTest/sendMessage?message=123 to send a message to TestTopic in RocketMQ; it is then consumed by ScConsumer.

2. Summary

  • About SpringCloudStream. This is a nearly universal programming framework for messaging middleware. When switching from RocketMQ to Kafka, for example, the business code barely needs to change beyond the POM dependencies and the configuration file. However, each MQ product has its own business model, and these models can differ widely, so pay attention to how the business model maps across when using SpringCloudStream. In practice, also pay close attention to the product-specific configuration properties of each MQ. For RocketMQ these properties start with spring.cloud.stream.rocketmq, and only through them can you use RocketMQ-specific features such as delayed messages, ordered messages, and transaction messages.
  • SpringCloudStream is a unified framework provided by the Spring community, but officially it only packages the specific dependencies for Kafka, Kafka Streams, and RabbitMQ. The RocketMQ dependency is maintained by the vendor, namely Alibaba, and the gap in maintenance effort is considerable. On the one hand, the versioning issues highlighted earlier for SpringBoot are magnified with SpringCloudStream: the latest spring-cloud-starter-stream-rocketmq, 2.2.3.RELEASE, still bundles rocketmq-client 4.4.0, which is well behind the 4.7.1 client used above. On the other hand, the lack of documentation is a particular problem: the RocketMQ-specific configuration for SpringCloudStream is almost undocumented.
  • In summary, SpringCloudStream is currently not a very good integration option for RocketMQ. The integration is not as mature as those for Kafka and RabbitMQ, so use it with caution.