Introduction to the

SpringCloud Stream is a highly extensible event-driven microservice component for building connections to shared messaging systems. It provides a flexible programming model for setting up stand-alone production-level Spring applications based on Spring Boot, and using Spring Integration to provide connectivity to message brokers allows us to use it with little concern for the specific message queue implementation. It shields the differences of underlying messaging middleware, reduces switching costs, unifies messaging programming model, and enables developers to pay more attention to their own business.

Architectural model

Maybe we could look at a simpler picture

As you can see, each system only relies on its own Binder to interact with message-oriented middleware or other systems. Stream hides all the details of sending messages and only cares about three core modules

  • Destination BindersThe target binder that tells the Stream which message queue service you want to bind toBinderImplement. For example,RabbitMQorKafkaBinder? This is its core building block, responsible for supporting and providing integration with external systems or external messaging systems that we own
  • Destination BindingsDestination binding, which provides a bridge between message producers and consumers to the Stream. For example,RabbitMQYou need to tell the Stream what the current system is using to send messageschannel -> exchange -> routingKey -> queueWhat are they (of course this is all done in the configuration file)
  • Message: Is the Message we need to send

For any message, just provide the three core modules above and we don’t have to worry about the details of sending.

Until The 3.2.1 release of SpringCloud Stream, it supported almost all of the popular message queuing products on the market. RabbitMQ, Kafka, RocketMQ, AWS SNS/SQS, etc., mainly due to this trend of centralization, different messaging middleware vendors have developed their own binders to provide SpringCloud Stream.

Early experience

Take RabbitMQ as an example to experience Stream message driver development. First we need to introduce dependencies

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Copy the code

Make sure you have a basic understanding of the components in RabbitMQ before specifying them in the configuration file. Otherwise read the RabbitMQ Basics

producers

Configuration file:

Rabbitmq: host: 129.204.178.49 # Your rabbitMQ service port: 5672 username: guest password: guest cloud: Bindings: output channel-demo: # channel Binders: binders: binder type: Rabbit # rabbitMQ Rabbit: Bindings: output-channel Producer: routing-key-expression: ''demoRoutingKey''Copy the code

Declare output channel

Public interface MessageSource {@output ("output-channel-demo") MessageChannel Output(); }Copy the code

Define a channel binding class

/** * This annotation is used to specify one or more interfaces that define @input or @output annotations. * */ @enableBinding (MessageSource. Class) public class MessageSourceHandler {}Copy the code

Next we write an integration test and send the message

@Autowired MessageSource messageSource; / messaging Test * * * / @ Test public void Test () {messageSource. The output (). The send (MessageBuilder. WithPayload (" Test "). The build ()); }Copy the code

Now that the message has been successfully sent, let’s write the consumer

consumers

The configuration file

Binders: Cloud: stream: binders: # Binders: Type: rabbit # rabbitMQ Rabbit: Bindings: input-channel-demo: Bindings: input-channel-demo: consumer: binding-routing-key: 'demoRoutingKey' Bindings: input-channel-demo: SomeGroup # Prevents multiple consumer instances from receiving messages repeatedly, such that a message is sent to only one instance of the same group Host: 129.204.178.49 Port: 5672 username: guest password: guestCopy the code

Declare input channel

Public interface MessageSink {@input ("input-channel-demo") SubscribableChannel Input(); }Copy the code

Declaring a bound class

@enableBinding (messagesink.class) public class MessageSinkHandler {/** * listen for input-channel-demo messages, The @StreamListener annotation supports SPEL expressions, * */ @streamListener ("input-channel-demo") public void consume(String message){ System.out.println(" received message: "+message); }}Copy the code

A complete SpringCloud Stream microservice message-driven demo is completed, the application is launched, and consumers can successfully receive test messages sent by producers. To use SpringCloud Stream well you must understand the content of the configuration file!

GitHub source address SpringCloud-stream introductory case

Sending delayed messages

Sending deferred messages in SpringCloud Stream is very simple. First we need to specify the type of switch to be a deferred switch in the producer/consumer profile

Bindings: input-channel-demo: # Message input channel Consumer: delayed-exchange: true binding-routing-key: 'demoRoutingKey'Copy the code

Same as the producer, omitted here. Then you just need to add a header to the code you send above

/ / set the message for 30 seconds after a messageSource to consumers. The output (). The send (MessageBuilder. WithPayload (" test "). SetHeader (" x - delay ", 30 * 1000).build());Copy the code

If you send delayed messages and raise unknown Exchange type ‘X-delayed -message’, it is because your RabbitMQ service does not have a delay queue plug-in installed. Go to the official website to install it

The business of deferring messages is now in place, and as you can see here it is very easy to integrate messages using SpringCloud Stream, for example Almost all configurations are in RabbitConsumerProperties, RabbitProductProperties, and the common producer and consumer properties are in their parent RabbitCommonProperties. Almost all of RabbitMQ’s features and functions can be done directly in the configuration file. RabbitMQ Consumer Properties has limited capabilities. For details on other advanced features, see RabbitMQ Consumer Properties

But if you think that’s true, you’re wrong, as SpringBoot, which is easy to use and takes 20% of your energy, can take 200% of your energy to play well. SpringCloud Stream actually contains a series of complex technical systems, such as Spring Intergration, Spring Message, Spring AMQP and so on. Its internal principle implementation and component integration are very complicated.

I think one of the reasons SpringCloud Stream hasn’t caught on this long is that there are so many things involved in this technical system, and if something goes wrong in a production environment and you have to read the source code, it’s a lot more technical work than expected.

Spring Message

Spring Message is a sub-module of the Spring Framework that defines a unified programming model for messages, and in fact SpringCloud Stream is based on its implementation.

Spring Message defines the Message programming model in the figure above, proposing the abstraction of Channel Channel and Message Message. All messages are sent by the producer to the Message middleware in Output Channel, and then all consumers get messages from the Input Channel Input. The Message itself consists of two parts, the header and the payload.

The core annotations we covered in our initial experience above are the embodiment of this model

  • @output: represents the Output channel from which the producer sends a message
  • @input: represents the Input channel from which the consumer reads the message
  • @enableBinding: Binds the interface that defines the channel to aBeanSo that we can pass theBeanThe operation channel sends and receives messages.
  • StreamListener: Subscribes to messages in the input channel

SpringCloud Function functional programming

After Release 3.1 of SpringCloud Stream, you will find that several core annotations such as @enableBinding have been deprecated by the official annotation. This is due to the official release of the updated functional programming model SpringCloud Function. Try to push programming to a higher level with this component. This article does not cover this component in detail, but shows you how to combine SpringCloud Function for message sending and consumption in SpringCloud Stream.

The channel naming of messages in conjunction with the SpringCloud Function follows the following convention

  • Input:<functionName> + -in- + <index>
  • Output:<functionName> + -out- + <index>

Index represents the input or output bound index, so for now we just write 0.

Task-based message

Referring to the official document Suppliers (Sources), we start by writing a producer’s method for sending messages.

@bean public Supplier<String> source1() {return () -> "test timing message "; }Copy the code

Then according to the rules of the channel in the application. The configuration in the yml channel called source1 – out – 0, to configure spring. Cloud. The function. The destination = source1, specify the functions of the function name.

Next we write about consumers, and again we need a way to consume.

@bean public Consumer<String> sink1() {return message -> system.out.println (" Received message :" + message); }Copy the code

Then change the channel name in the configuration file to sink1-in-0 according to the channel rules. A simple timed message is sent and received, and the producer sends a message every second to the consumer. It has to be said that the integration of SpringCloud Stream and SpringCloud Function is really…… It’s amazing.

Business-triggered messages

But we are more likely to use business-triggered sending of messages, so SpringCloud Stream provides us with a StreamBridge component. You can use it to send messages by specifying the channel name

@test public void Test () {streambridge. send(" source1-out0 "," Test message "); }Copy the code

Now that we have finished sending the message, the consumer can just use the above consumption function.

conclusion

I have to say that with the integration of SpringCloud Function, the sending and receiving of messages has moved to a new stage, but the configuration specification

+ -in- +

makes me feel a little uncomfortable…… Even now I think the annotations that were deprecated before 3.1 May be more suitable for our development.

conclusion

SpringCloud Stream only supported RabbitMQ and Kafka when it was used in production projects last year, but now almost all popular messaging-oriented middleware has developed binders to accommodate it, demonstrating its dominance.

Although I always advocate the updating and iteration of technology, I would like to sincerely remind you that we can try to introduce and use it in new projects, and we should be cautious in updating technical components in old projects. After all, SpringCloud Stream involves too many and complex technical systems. This article is just the tip of the iceberg for SpringCloud Stream. We don’t have a good handle on it right now, but I still believe it will become the mainstream of messaging middleware docking!

We can see SpringCloud Stream as a set of technologies that try to push message-driven to the next level, but I think that goal is still a bit distant in terms of actual usage at this point……

If this article has helped you, please like it and follow it! Your support is my motivation to continue to create!