This is the 23rd day of my participation in Gwen Challenge

Preface

What gave rise to Spring Cloud Stream? Any discussion of microservices inevitably turns to message middleware such as RabbitMQ, ActiveMQ, RocketMQ, and Kafka. All of these exist to deliver messages, but although the goal is the same, each middleware defines its own message structure. Spring Cloud Stream emerged to shield applications from the differences between the underlying messaging middleware, reduce the cost of switching between them, and unify the messaging programming model.

What is Spring Cloud Stream?

Let’s take a quick look at two projects that Spring Cloud Stream builds on: Spring Messaging and Spring Integration.

Spring Messaging

Spring Messaging is a module in the Spring Framework source code. Its structure is:

In addition to the sub-packages shown in the picture, the root of the Spring Messaging source directory contains a set of base types.

What the base types provide

  • Message: defines a message, which consists of two parts: a header (MessageHeaders) and a payload (the body)
  • MessageHandler: the contract for processing a message
  • MessageChannel: the contract for sending a message
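
To make the three roles above concrete, here is a minimal plain-Java sketch. The types SimpleMessage, SimpleHandler, and InMemoryChannel are hypothetical stand-ins invented for this illustration; they are not Spring's classes and only mimic the shape of the header/payload, handler, and channel concepts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for illustration only; these are NOT Spring's classes.
public class MessagingModelSketch {

    // A message is an immutable pair of headers (metadata) and a payload (body).
    static final class SimpleMessage<T> {
        final Map<String, Object> headers;
        final T payload;

        SimpleMessage(Map<String, Object> headers, T payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    // The processing contract: a handler receives one message at a time.
    interface SimpleHandler {
        void handle(SimpleMessage<?> message);
    }

    // A channel accepts messages from senders and passes them to its subscribers.
    static final class InMemoryChannel {
        private final List<SimpleHandler> subscribers = new ArrayList<>();

        void subscribe(SimpleHandler handler) {
            subscribers.add(handler);
        }

        // Delivers synchronously to every subscriber, then reports success.
        boolean send(SimpleMessage<?> message) {
            for (SimpleHandler handler : subscribers) {
                handler.handle(message);
            }
            return true;
        }
    }

    // Sends one message through a channel and returns what the handler saw.
    public static List<String> demo() {
        List<String> received = new ArrayList<>();
        InMemoryChannel channel = new InMemoryChannel();
        channel.subscribe(m -> received.add(m.headers.get("contentType") + ":" + m.payload));

        Map<String, Object> headers = Collections.singletonMap("contentType", "text/plain");
        channel.send(new SimpleMessage<>(headers, "hello"));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [text/plain:hello]
    }
}
```

The sender only knows the channel and the handler only knows the message, which is the decoupling that the real Message, MessageHandler, and MessageChannel types are designed around.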

Other modules

  • Converter module: converters that support message conversion
  • Core module: provides template methods for messaging operations
  • Handler module: message handling support
  • Simp module: generic support for simple messaging protocols such as STOMP
  • Support module: provides Message implementations, MessageBuilder for creating a Message, MessageHeaderAccessor for accessing message headers, and various MessageChannel implementations and channel interceptor support
  • TCP module: provides the abstraction and implementation for establishing a TCP connection through TcpOperations, processing messages through TcpConnectionHandler, and sending messages through TcpConnection; it also includes support for TCP messaging based on Reactor

Spring Integration

Spring Integration extends the Spring programming model to support Enterprise Integration Patterns and builds on Spring Messaging. It provides:

  • Message routing: MessageRouter
  • Message dispatching: MessageDispatcher
  • Message filtering: Filter
  • Message transformation: Transformer
  • Message aggregation: Aggregator
  • Message splitting: Splitter
  • Implementations of MessageChannel and MessageHandler
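
As a rough illustration of four of the patterns listed above (filter, transformer, splitter, aggregator), the following plain-Java sketch chains them over a list of string payloads. It deliberately avoids Spring Integration's own API; the class and method names are assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of four integration patterns; not Spring Integration's API.
public class IntegrationPatternsSketch {

    // Filter: drop messages that do not satisfy a condition (here: blank payloads).
    static boolean accept(String payload) {
        return !payload.trim().isEmpty();
    }

    // Transformer: convert a payload into another representation.
    static String transform(String payload) {
        return payload.trim().toUpperCase();
    }

    // Splitter: break one message into several smaller ones.
    static List<String> split(String payload) {
        return Arrays.asList(payload.split(","));
    }

    // Aggregator: combine related messages back into a single message.
    static String aggregate(List<String> parts) {
        return String.join("|", parts);
    }

    // Runs each incoming payload through filter -> transformer -> splitter,
    // then aggregates everything that survived into one result.
    public static String process(List<String> incoming) {
        List<String> parts = new ArrayList<>();
        for (String payload : incoming) {
            if (!accept(payload)) {
                continue;
            }
            parts.addAll(split(transform(payload)));
        }
        return aggregate(parts);
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("a,b", "   ", " c "))); // prints A|B|C
    }
}
```

In Spring Integration each of these steps would be a separate endpoint connected by message channels rather than a direct method call, but the data flow is the same.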

Spring Cloud Stream

Building on Spring Integration, Spring Cloud Stream introduces concepts such as Binder, Binding, @EnableBinding, and @StreamListener.

How to use Spring Cloud Stream?

Applications interact with the Binder objects in Spring Cloud Stream through inputs and outputs:

  • Bindings are set up through our configuration
  • The Binder objects of Spring Cloud Stream are responsible for interacting with the messaging middleware
  • Event-driven messaging is achieved by using Spring Integration to connect to the message broker middleware

Effect

The separation between the application and the details of the messaging middleware is achieved by defining the Binder as an intermediate layer
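
The intermediate-layer idea can be sketched in plain Java as follows. SimpleBinder, FakeRabbitBinder, and FakeKafkaBinder are hypothetical types invented for this illustration (they only record what would be sent and do not talk to a broker), and the real Binder interface in Spring Cloud Stream is considerably richer. The point is only that the application method sendOrder stays unchanged when the binder is swapped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of the binder idea; not Spring Cloud Stream's real Binder interface.
public class BinderSketch {

    // The only contract the application sees: obtain an output for a named destination.
    interface SimpleBinder {
        Consumer<String> bindProducer(String destination);
    }

    // Stand-in for a RabbitMQ-backed binder: records what would be published to an exchange.
    static class FakeRabbitBinder implements SimpleBinder {
        final List<String> log = new ArrayList<>();

        @Override
        public Consumer<String> bindProducer(String destination) {
            return payload -> log.add("rabbit exchange=" + destination + " body=" + payload);
        }
    }

    // Stand-in for a Kafka-backed binder: records what would be written to a topic.
    static class FakeKafkaBinder implements SimpleBinder {
        final List<String> log = new ArrayList<>();

        @Override
        public Consumer<String> bindProducer(String destination) {
            return payload -> log.add("kafka topic=" + destination + " value=" + payload);
        }
    }

    // Application code: depends only on SimpleBinder, never on a concrete middleware.
    static void sendOrder(SimpleBinder binder) {
        Consumer<String> output = binder.bindProducer("studyExchange");
        output.accept("order-1");
    }

    // Runs the same application code against both binders and collects the results.
    public static List<String> demo() {
        FakeRabbitBinder rabbit = new FakeRabbitBinder();
        FakeKafkaBinder kafka = new FakeKafkaBinder();
        sendOrder(rabbit);
        sendOrder(kafka);

        List<String> all = new ArrayList<>(rabbit.log);
        all.addAll(kafka.log);
        return all;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Switching middleware in this model means passing a different SimpleBinder; in Spring Cloud Stream the equivalent switch is a change of binder dependency and configuration rather than of application code.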

Spring Cloud Stream

Create the Spring Cloud Stream message sending service module

Pom content


      
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>study</artifactId>
        <groupId>brief.talk.spring.cloud</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!-- Basic Configuration -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>


yml

server:
  port: 8500

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # Configure the RabbitMQ service to bind to
        defaultRabbit: # Name of this binder definition, referenced by the bindings below
          type: rabbit # Message component type
          environment: # Environment configuration for RabbitMQ
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Service integration processing
        output: # Name of the channel
          destination: studyExchange # Name of the exchange to use
          content-type: application/json # Message type; JSON here, use "text/plain" for plain text
          binder: defaultRabbit # The binder this binding uses

eureka:
  client: # Client configuration for registering with Eureka
    service-url:
      defaultZone: http://localhost:8080/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # Heartbeat interval (default: 30 seconds)
    lease-expiration-duration-in-seconds: 5 # Expire the instance if no heartbeat arrives within 5 seconds (default: 90 seconds)
    instance-id: send-8500.com # Instance name shown in the info list
    prefer-ip-address: true # Use the IP address in the access path

Business interface and implementation

public interface MessageProvider {
    String send();
}

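import java.util.UUID;
import javax.annotation.Resource;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

// Assumption: the binding annotation was lost from the original listing; it declares the "output" channel
@EnableBinding(Source.class)
public class MessageProviderImpl implements MessageProvider {
    @Resource
    private MessageChannel output; // Message sending channel

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("Send data serial = " + serial);
        return null;
    }
}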

Control layer implementation

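import javax.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendMessageController
{
    @Resource
    private MessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }
}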

Create the Spring Cloud Stream message receiving service module

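The POM and YML are the same as those of the message sending service module, except that the port is changed to something else, such as 8501. Because the listener below reads from Sink.INPUT, the binding in the YML should be named input rather than output.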

Control layer implementation

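import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController
{
    @Value("${server.port}")
    private String serverPort;

    // Invoked for every message that arrives on the "input" channel
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message)
    {
        System.out.println("Consumer port number is " + serverPort + ", -----> Received message: " + message.getPayload() + "\t port: " + serverPort);
    }
}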

Today’s summary

Today we covered Spring Messaging and Spring Integration, and then Spring Cloud Stream, which builds on both.