Preface

In my previous work, Spring Cloud was the microservice framework and RocketMQ was the message middleware. Recently, I saw that Alibaba had integrated RocketMQ into Spring Cloud Alibaba. Out of curiosity, I wrote a demo.

Key concepts

  • Spring Cloud Stream: A framework for building message-driven microservice applications. It builds on Spring Boot to create standalone, production-grade Spring applications and uses Spring Integration to connect to message brokers.
  • Binder: The component responsible for integrating with the external messaging system.
  • Binding: The bridge between the external messaging system and the message producers and consumers provided by the application (created by the Destination Binders). Developers only need to use the application's producers or consumers to produce or consume data; the underlying message-oriented middleware is hidden from them.
  • Message: The canonical data structure used by producers and consumers to communicate with Destination Binders (and thus with other applications via external messaging systems).
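To make the relationship between these concepts more concrete, here is a toy in-memory sketch in plain Java. It is not the Spring Cloud Stream API: BinderSketch, ToyMessage, InMemoryBinder, bindProducer, and bindConsumer are hypothetical names chosen for illustration, and the "broker" is just a map. What it tries to show is that application code only touches the bound producer and consumer, while the binder is the only part that knows about the messaging system.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only: every type here is hypothetical, not part of Spring Cloud Stream.
final class BinderSketch {

    /** Message: the canonical structure exchanged through the binder. */
    static final class ToyMessage {
        final String payload;
        final Map<String, String> headers;

        ToyMessage(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Binder: the only component that knows about the (in-memory) messaging system. */
    static final class InMemoryBinder {
        private final Map<String, List<Consumer<ToyMessage>>> topics = new HashMap<>();

        /** Producer binding: a handle that publishes to the given destination. */
        Consumer<ToyMessage> bindProducer(String destination) {
            return message -> topics
                    .getOrDefault(destination, Collections.emptyList())
                    .forEach(listener -> listener.accept(message));
        }

        /** Consumer binding: registers a listener on the given destination. */
        void bindConsumer(String destination, Consumer<ToyMessage> listener) {
            topics.computeIfAbsent(destination, key -> new ArrayList<>()).add(listener);
        }
    }

    /** Application code: uses only the bindings, never the broker itself. */
    static List<String> run() {
        InMemoryBinder binder = new InMemoryBinder();
        List<String> received = new ArrayList<>();

        binder.bindConsumer("test-topic", message -> received.add("input receive: " + message.payload));
        Consumer<ToyMessage> output = binder.bindProducer("test-topic");
        output.accept(new ToyMessage("hello", new HashMap<>()));

        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Swapping the in-memory map for a real broker would only change InMemoryBinder in this sketch, which is the kind of isolation the binder abstraction is meant to provide.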

Quickly start RocketMQ locally

Step 1: Download RocketMQ: www.apache.org/dyn/closer…

Step 2: Extract the archive.

Step 3: Modify three scripts: runbroker.sh, runserver.sh, and tools.sh. In each, change JAVA_HOME to match your own environment, as shown below:

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/path/to/your/jdk
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

Step 4: Run the following commands one by one from the bin directory

./mqnamesrv
./mqbroker -n localhost:9876
./mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic

If no errors are reported, RocketMQ has started successfully and you can move on to development.
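Besides watching for errors in the terminal, one way to double-check the startup is to test whether the name server port accepts TCP connections. The sketch below assumes the name server listens on localhost:9876, as in the commands above; NameServerProbe and isListening are hypothetical names, and a successful connection only shows that something is listening on the port, not that the broker is fully healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of RocketMQ.
final class NameServerProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port
            return false;
        }
    }

    public static void main(String[] args) {
        // 9876 is the name server port used in the commands above
        System.out.println("name server reachable: " + isListening("localhost", 9876, 1000));
    }
}
```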

Developing the demo

Step 1: Add the relevant dependencies to the POM

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>0.2.1.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- To view Endpoint information -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.dropwizard.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>3.2.6</version>
        </dependency>

Step 2: Create a Spring Boot project with the following main class:

@SpringBootApplication
@EnableBinding({ Source.class, Sink.class })
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Step 3: Create the producer

@Service
public class RocketmqProducer {
    public void send(String message) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
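        DefaultMQProducer producer = new DefaultMQProducer("test_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        try {
            // Arguments: topic, tag, message body
            Message msg = new Message("test-topic", "test-tag", message.getBytes());
            producer.send(msg);
        } finally {
            // Release the producer; this demo creates a new one on every call
            producer.shutdown();
        }
    }
}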
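The producer above calls message.getBytes() with no arguments, which uses the platform default charset. If the producer and consumer might run on machines with different defaults, it may be safer to name the charset explicitly. The sketch below assumes UTF-8 on both sides; PayloadEncoding, encode, and decode are hypothetical helper names, not part of RocketMQ.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing an explicit charset for message bodies.
final class PayloadEncoding {

    /** Encodes a message body as UTF-8 bytes, independent of the platform default. */
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a UTF-8 message body back into a String. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("test rocketmq message");
        System.out.println(body.length + " bytes, decoded: " + decode(body));
    }
}
```

In the producer, this would amount to passing encode(message) as the message body instead of message.getBytes().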

Step 4: Create the consumer

@Service
public class ReceiveService {

    /**
     * The default channel name is "input", as defined in the Sink interface.
     * For more than one input, define your own interface modeled on Sink.
     *
     * @param receiveMsg the received message payload
     */
    @StreamListener("input")
    public void receiveInput1(String receiveMsg) {
        System.out.println("input receive: " + receiveMsg);
    }
}

Step 5: Add the configuration file:

server.port=8087
spring.application.name=spring-cloud-alibaba-rocketmq-demo
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
# Define the binding named output
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json
# Define the binding named input
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=test-group
management.endpoint.health.show-details=always

Step 6: Write a controller, start the project, and call the endpoint

@RestController
@RequestMapping(value = "/api/demo/test")
public class TestController {

    @Autowired
    RocketmqProducer rocketmqProducer;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String send() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        rocketmqProducer.send("test rocketmq message");
        return "success";
    }
}

The console will then print: input receive: test rocketmq message

Viewing Endpoint Information

Open this URL in a browser: http://127.0.0.1:8087/actuator/rocketmq-binder
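Both the send endpoint from Step 6 and the actuator endpoint can also be called from code instead of a browser. The following is a minimal sketch using the JDK's built-in HTTP client (Java 11 or later); it assumes the demo is running locally on port 8087 as configured in Step 5. DemoEndpointClient and its get method are hypothetical names; only the two paths come from this article.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical client; only the base URL and the two paths come from the article.
final class DemoEndpointClient {
    static final String BASE_URL = "http://127.0.0.1:8087"; // server.port from Step 5

    /** Builds a GET request for a path under the demo application. */
    static HttpRequest get(String path) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + path))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        String[] paths = { "/api/demo/test/send", "/actuator/rocketmq-binder" };
        for (String path : paths) {
            try {
                HttpResponse<String> response =
                        client.send(get(path), HttpResponse.BodyHandlers.ofString());
                System.out.println(path + " -> " + response.statusCode() + " " + response.body());
            } catch (IOException e) {
                // Expected when the demo application is not running locally
                System.out.println(path + " -> request failed: " + e.getMessage());
            }
        }
    }
}
```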

Conclusion

This article only covers a basic integration of Spring Cloud Stream and RocketMQ. I am still learning both, and can only marvel at how broad and deep Spring Cloud is.

For more information, visit www.zplxjj.com.