Message-oriented middleware is essential in today's increasingly complex applications, and different businesses choose different products, such as RabbitMQ or Kafka. Each has its own logical structure, so switching from RabbitMQ to Kafka can be a painful migration. Spring Cloud Stream addresses this by hiding the differences between messaging middleware behind a common programming model.

1. Introduction

Spring Cloud Stream uses inputs, outputs, and binders to let applications exchange messages. An output is the sending side and an input is the receiving side; each is a channel that a binder connects to the underlying middleware. Developers do not need to care which messaging middleware is in use; they only implement the corresponding interfaces.
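The relationship between inputs, outputs, and binders can be sketched in plain Java. This is only an illustration of the idea, not Spring Cloud Stream code: SketchBinder, InMemoryBinder, and BinderSketch are hypothetical names, and the in-memory map stands in for RabbitMQ or Kafka.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a binder: it connects named destinations to
// whatever transport sits underneath. Not a Spring Cloud Stream type.
interface SketchBinder {
    void publish(String destination, String payload);
    void subscribe(String destination, Consumer<String> handler);
}

// An in-memory "middleware" used here in place of RabbitMQ or Kafka.
class InMemoryBinder implements SketchBinder {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    @Override
    public void publish(String destination, String payload) {
        // Deliver to every handler subscribed to this destination.
        for (Consumer<String> handler : subscribers.getOrDefault(destination, List.of())) {
            handler.accept(payload);
        }
    }

    @Override
    public void subscribe(String destination, Consumer<String> handler) {
        subscribers.computeIfAbsent(destination, key -> new ArrayList<>()).add(handler);
    }
}

// Application code: it knows only the binder interface and a destination name.
class BinderSketch {
    static List<String> roundTrip(SketchBinder binder, String message) {
        List<String> received = new ArrayList<>();
        binder.subscribe("stream-test", received::add); // input side
        binder.publish("stream-test", message);         // output side
        return received;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new InMemoryBinder(), "hello"));
    }
}
```

Because roundTrip depends only on the SketchBinder interface, a different implementation could be passed in without touching the application code, which is the kind of decoupling the binder abstraction aims for.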

2. Install RabbitMQ

Installation is covered in another article by the author: "RabbitMQ and Kafka: Win10 installation tutorial" (www.jianshu.com/p/63d32ab93…).

After RabbitMQ has started, open http://localhost:15672/#/ in a browser. If the RabbitMQ management page is displayed, RabbitMQ is installed and running.

3. Set up stream-producer-8700

Create a new sub-project, springcloud-stream-producer-8700 (stream-producer-8700 for short).

3.1 Add dependencies to pom.xml


      
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloudtest</artifactId>
        <groupId>com.elio.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>springcloud-stream-producer-8700</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>

    <!-- Hot deployment plugin -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <fork>true</fork>
                    <addResources>true</addResources>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

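3.2 Add the application.yml configuration file

spring.cloud.stream.bindings declares the channels to bind. Here myOutput is a custom output channel name, and destination is the name it is bound to on the middleware (stream-test; with the RabbitMQ binder this is an exchange).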

server:
  port: 8700 # port

spring:
  application:
    name: springcloud-stream-producer

  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

  cloud:
    stream:
      bindings:
        myOutput:
          destination: stream-test

eureka:
  instance:
    instance-id: ${spring.application.name}:${server.port}
  client:
    fetch-registry: true
    register-with-eureka: true
    service-url:
      defaultZone: http://localhost:8300/eureka/,http://localhost:8301/eureka/


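The nested YAML above corresponds to the flattened property key spring.cloud.stream.bindings.myOutput.destination. The sketch below shows that lookup in plain Java, including the documented fallback that a binding without an explicit destination uses its own name. BindingLookupSketch and its methods are hypothetical helpers for illustration, not part of Spring, and otherOutput is an invented binding name.

```java
import java.util.Properties;

class BindingLookupSketch {
    // Resolves the destination for a binding, falling back to the binding
    // name itself when no destination property is present.
    static String resolveDestination(Properties props, String bindingName) {
        String key = "spring.cloud.stream.bindings." + bindingName + ".destination";
        return props.getProperty(key, bindingName);
    }

    // The same setting as the YAML in this section, in flattened form.
    static Properties sampleProperties() {
        Properties props = new Properties();
        props.setProperty("spring.cloud.stream.bindings.myOutput.destination", "stream-test");
        return props;
    }

    public static void main(String[] args) {
        Properties props = sampleProperties();
        System.out.println(resolveDestination(props, "myOutput"));    // configured destination
        System.out.println(resolveDestination(props, "otherOutput")); // falls back to the name
    }
}
```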

3.3 Add the StreamProducer8700 main startup class

The main startup class needs no stream-specific annotations.

package com.elio.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDiscoveryClient // register with Eureka
public class StreamProducer8700 {

    public static void main(String[] args) {
        SpringApplication.run(StreamProducer8700.class, args);
    }
}

3.4 Add the custom MySource interface

Spring Cloud Stream ships a standard Source interface, but you can define your own, and production code usually does.

package com.elio.springcloud.service;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

@Component
public interface MySource {

    @Output("myOutput")
    MessageChannel myOutput();
}


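MySource is only an interface, so something has to supply an implementation at runtime. The sketch below illustrates the general idea with a JDK dynamic proxy that returns one channel object per annotated method. It is a simplified illustration, not Spring's actual mechanism: SketchOutput, SketchChannel, SketchSource, and BindingProxySketch are all hypothetical names defined here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical local annotation, standing in for Spring Cloud Stream's @Output.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchOutput {
    String value();
}

// Hypothetical minimal channel: it just records what was sent.
class SketchChannel {
    final String name;
    final List<String> sent = new ArrayList<>();

    SketchChannel(String name) {
        this.name = name;
    }

    boolean send(String payload) {
        return sent.add(payload);
    }
}

// A custom source interface in the same shape as MySource.
interface SketchSource {
    @SketchOutput("myOutput")
    SketchChannel myOutput();
}

class BindingProxySketch {
    // Creates an implementation of the interface at runtime: each annotated
    // method returns the channel registered under the annotation's value.
    static <T> T bind(Class<T> type) {
        Map<String, SketchChannel> channels = new HashMap<>();
        Object proxy = Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (instance, method, args) -> {
                    SketchOutput output = method.getAnnotation(SketchOutput.class);
                    if (output == null) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    // Reuse the same channel on every call for a given name.
                    return channels.computeIfAbsent(output.value(), SketchChannel::new);
                });
        return type.cast(proxy);
    }

    public static void main(String[] args) {
        SketchSource source = bind(SketchSource.class);
        source.myOutput().send("hello");
        System.out.println(source.myOutput().name + " " + source.myOutput().sent);
    }
}
```

The caller writes source.myOutput().send(...) exactly as it would against MySource, without ever seeing a concrete class.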

3.5 Add SendService

The controller calls this service class to send the message.

package com.elio.springcloud.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;

// Binds the channels declared in MySource to the configured middleware.
@EnableBinding({MySource.class})
public class SendService {

    @Autowired
    private MySource mysource;

    // Wraps the text in a Message and sends it through the myOutput channel.
    public void sendMsg(String msg) {
        mysource.myOutput().send(MessageBuilder.withPayload(msg).build());
    }
}

3.6 Add the StreamProducerController class

package com.elio.springcloud.controller;

import com.elio.springcloud.service.SendService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class StreamProducerController {

    @Autowired
    private SendService sendService;

    // GET /send/{msg} passes the path variable on to SendService.
    @GetMapping(value = "/send/{msg}")
    public void send(@PathVariable("msg") String msg) {
        sendService.sendMsg(msg);
    }
}

4. Set up stream-consumer-8800

Create a new sub-project, springcloud-stream-consumer-8800 (stream-consumer-8800 for short), as the message receiver.

4.1 Add dependencies to pom.xml


      
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloudtest</artifactId>
        <groupId>com.elio.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>springcloud-stream-consumer-8800</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>

    <!-- Hot deployment plugin -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <fork>true</fork>
                    <addResources>true</addResources>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

4.2 Add the application.yml configuration file

Here myInput is a custom input channel name. Its destination is stream-test, the same destination the producer's myOutput is bound to.

server:
  port: 8800 # port

spring:
  application:
    name: springcloud-stream-consumer

  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

  cloud:
    stream:
      bindings:
        myInput:
          destination: stream-test

eureka:
  instance:
    instance-id: ${spring.application.name}:${server.port}
  client:
    fetch-registry: true
    register-with-eureka: true
    service-url:
      defaultZone: http://localhost:8300/eureka/,http://localhost:8301/eureka/



4.3 Add the StreamConsumer8800 main startup class

package com.elio.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDiscoveryClient // register with Eureka
public class StreamConsumer8800 {

    public static void main(String[] args) {
        SpringApplication.run(StreamConsumer8800.class, args);
    }
}

4.4 Add the MySink interface

package com.elio.springcloud.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {

    @Input("myInput")
    SubscribableChannel myInput();
}


4.5 Add the ReceiveService class

package com.elio.springcloud.message;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(MySink.class)
public class ReceiveService {

    // Invoked for every message that arrives on the myInput channel.
    @StreamListener("myInput")
    public void receive(Object payload) {
        System.out.println(payload);
    }
}

5. Test

Start eureka-server-8300, eureka-server-8301, stream-producer-8700, and stream-consumer-8800 in that order. Once they are up, call the send endpoint of stream-producer-8700:

http://localhost:8700/send/hello%20world%20stream

Then check the stream-consumer-8800 console: the consumer has received the message.
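The same request can be issued from code instead of a browser. The sketch below, which assumes Java 11 or later, builds the URL with the message percent-encoded as a path segment and sends a GET with the JDK HTTP client. SendRequestSketch and its methods are hypothetical names, and the send method only works while stream-producer-8700 is running locally.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

class SendRequestSketch {
    // Encodes the message as one path segment. URLEncoder targets form
    // encoding and emits '+' for spaces, so '+' is rewritten to "%20".
    static URI sendUri(String message) {
        String segment = URLEncoder.encode(message, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create("http://localhost:8700/send/" + segment);
    }

    // Issues the GET request and returns the HTTP status code.
    // Requires stream-producer-8700 to be running.
    static int send(String message) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(sendUri(message)).GET().build();
        HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendUri("hello world stream"));
        System.out.println(send("hello world stream"));
    }
}
```

For the message "hello world stream", sendUri produces the same URL as the one shown above.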

6. Summary

In this article, we implemented a simple message producer and consumer with Spring Cloud Stream. Apart from the connection settings, we never worked with RabbitMQ directly; we worked only with inputs, outputs, and the binder. This hides the details of the underlying messaging middleware and decouples the application from it.