This is the seventh day of my participation in the August Text Challenge. For details, see: August Text Challenge

Spring Cloud Stream is a framework for building highly scalable, event-driven microservices that connect to common messaging systems. It encapsulates a unified framework for accessing messaging middleware, just as Spring Data encapsulates access to all kinds of data sources. At the time of writing, Spring Cloud Stream has binders for RabbitMQ, Kafka, Kafka Streams, and Amazon Kinesis, with Google PubSub, Solace PubSub+, and Azure Event Hubs under partner maintenance. What is all this stuff? Well, I at least know RabbitMQ and Kafka. A framework at such a high level inevitably omits details, and it certainly will not replace each product's own API. Still, this kind of encapsulation does make it easy to get started, and the idea of a unified abstraction is the most valuable thing to learn here. Let's use RabbitMQ to play with this framework. The sample code is uploaded to the gitee.com/tearwind repository,…

What is event-driven?

The essence of event-driven design is the observer pattern: changes in data are broadcast as events, without the publisher paying attention to each interested party. The JDK has Observer and Observable to encapsulate the observer pattern, and Spring has an event mechanism to make building event-driven systems easier. Take an online mall: someone buys something, and what looks simple keeps a lot of services busy behind the scenes. Record the order, notify inventory, initiate payment, send an SMS, send an email… a whole pile of things, each of which has to be kicked off one by one, while you worry about any step in the process going wrong. How do you get out of this mess? Use event-driven design: broadcast the order event throughout the mall and let each related service subscribe to it, instead of notifying them one by one. The event-driven pattern adds a lot of extensibility to the system. If one day you need to add a service that recommends products to buyers, you don't need to change the existing ordering code at all; just add a service that listens for order events.
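To make the idea concrete, here is a minimal plain-Java sketch of the observer pattern behind event-driven design. The OrderEventBus class and the listener wording are illustrative inventions for this post, not JDK or Spring APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal observer-pattern sketch: an "order placed" event is broadcast to
// whichever services registered interest, instead of being called one by one.
class OrderEventBus {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    void subscribe(Consumer<String> listener) {
        listeners.add(listener);
    }

    void publish(String orderId) {
        // Broadcast: the publisher knows nothing about who is listening.
        for (Consumer<String> l : listeners) {
            l.accept(orderId);
        }
    }
}

public class EventDrivenDemo {
    public static void main(String[] args) {
        OrderEventBus bus = new OrderEventBus();
        List<String> log = new ArrayList<>();
        bus.subscribe(orderId -> log.add("inventory notified for " + orderId));
        bus.subscribe(orderId -> log.add("sms sent for " + orderId));
        // Adding a recommendation service later needs no change to publish():
        bus.subscribe(orderId -> log.add("recommendation generated for " + orderId));
        bus.publish("order-42");
        System.out.println(log);
    }
}
```

Note how adding the third subscriber required no change to the publishing side — exactly the extensibility described above.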

Basic concepts of Spring Cloud Stream

Spring Cloud Stream builds its unified abstraction over third-party messaging middleware on three basic concepts: 1. Destination Binders: components responsible for integrating with the external messaging systems. 2. Destination Bindings: bridges, created by binders, that connect the external messaging system with message producers and message consumers. 3. Messages: the simple data structure that producers and consumers use to communicate.

Simple use of Spring Cloud Stream

Spring Cloud Stream is a very simple framework that provides fast access to messaging middleware with very little code. Take RabbitMQ as an example. First, create a Maven project and introduce the dependencies:

        <dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

Besides spring-cloud-starter-stream-rabbit there is also a package called spring-cloud-stream-binder-rabbit, which seems to make no difference — in fact their GitHub codebases live in the same place. The spring-boot-starter-web dependency is only there to set up a test environment. Then, define the message receiver:

@Component
@EnableBinding(Sink.class)
public class StreamReceiver {

    @EventListener
    @StreamListener(Sink.INPUT)
    public void process(Object message) {
        System.out.println("received message : " + message);
    }
}

Then, define an HTTP interface to encapsulate a message sender

@RestController
@EnableBinding(Source.class)
public class SendMessageController {

    @Autowired
    private Source source;

    @GetMapping("/send")
    public Object send(String message) {
        MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
        source.output().send(messageBuilder.build());
        return "message sended : " + message;
    }
}

Next, add the SpringBoot configuration file application.properties

spring.cloud.stream.bindings.output.destination=streamExchange

spring.cloud.stream.bindings.input.destination=streamExchange
spring.cloud.stream.bindings.input.group=stream
spring.cloud.stream.bindings.input.content-type=text/plain

Then, you can configure the startup class to start the application and access the RabbitMQ service on your host.

@SpringBootApplication
public class RabbitApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }
}

Start the service and access http://localhost:8080/send?message=123 — you will see in the console that the background service received the message.

What does Spring Cloud Stream do?

Sending and receiving messages through RabbitMQ was accomplished in a few lines of code. So what exactly did Spring Cloud Stream do in this example? Let's sort it out. Spring Boot's autoconfiguration ships a set of default RabbitMQ server connection settings — check out the RabbitProperties class. Of course, you can override these in application.properties to point to your actual RabbitMQ address.

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtualHost=/

Looking at the RabbitMQ management page, you can see that the program declared a topic exchange named streamExchange and bound it to a queue named streamExchange.stream, with binding routing key #. (I won't go into the RabbitMQ details here — see my earlier blog posts on RabbitMQ.) The sender publishes the message to streamExchange, RabbitMQ forwards it to the streamExchange.stream queue, and the receiver consumes the message from that queue. This whole process is what Spring Cloud Stream did for us behind the scenes. When interacting with Kafka, Spring Cloud Stream does something very similar under the same code. It shields developers from the details of interacting with the messaging middleware — you are barely even aware the middleware exists, and can concentrate on the business logic of message handling. In fact, to switch the example above to Kafka you don't need to touch the code at all: just swap the spring-cloud-starter-stream-rabbit dependency for the Kafka one and change the configuration to point to the Kafka service address. Of course, such simple usage is certainly not enough. The Spring Cloud Stream package hides some of RabbitMQ's finer points, and exposes them again when you need to use RabbitMQ in more depth. Not convinced? Let's dig a little deeper.
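As a sketch of the Kafka variant just described (assuming a local broker; the broker property comes from spring-cloud-stream-binder-kafka, and streamTopic is an illustrative destination name):

```properties
# Same bindings as before -- the Java code does not change
spring.cloud.stream.bindings.output.destination=streamTopic
spring.cloud.stream.bindings.input.destination=streamTopic
spring.cloud.stream.bindings.input.group=stream
# Point the Kafka binder at the broker
spring.cloud.stream.kafka.binder.brokers=localhost:9092
```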

Dive into Spring Cloud Stream

For configuration information, see github.com/spring-clou…

1. Binder configuration

Spring Cloud Stream uses a Binder to define an external messaging server. For RabbitMQ, the binder essentially wraps the connection to a RabbitMQ server. By default, the RabbitMQ binder uses Spring Boot's ConnectionFactory, so it supports all the RabbitMQ configuration options provided by the spring-boot-starter-amqp component. These configurations start with spring.rabbitmq in application.properties. — The Gitee examples also cover spring-boot-starter-amqp; refer to them if needed. If you want to configure several different binders to access multiple external messaging servers at the same time (for example both Kafka and RabbitMQ), use the format spring.cloud.stream.binders.&lt;binderName&gt;.environment.&lt;property&gt;=&lt;value&gt;. In addition, when multiple binders are configured, the spring.cloud.stream.default-binder property specifies the default binder.

spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.host=localhost
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.password=guest

A configuration like this defines a binder named testbinder. As for the spring.cloud.stream.rabbit.binder.* settings documented on the official site, I tried them, but I don't know whether my format was wrong or my version too old — Eclipse's STS plug-in does not recognize these properties. For details, see the Spring Cloud Stream RabbitMQ binder on GitHub: github.com/spring-clou…
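For the multi-binder case mentioned above, a sketch with two binders plus a default (the binder names testbinder and kafkabinder are illustrative):

```properties
# RabbitMQ binder
spring.cloud.stream.binders.testbinder.type=rabbit
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.host=localhost
# Kafka binder
spring.cloud.stream.binders.kafkabinder.type=kafka
spring.cloud.stream.binders.kafkabinder.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092
# Fallback when a binding does not name a binder explicitly
spring.cloud.stream.default-binder=testbinder
# Pin one binding to a specific binder
spring.cloud.stream.bindings.output.binder=kafkabinder
```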

2. Binding configuration

A Binding is the bridge through which messages are actually exchanged. In RabbitMQ, a binding that receives messages usually corresponds to a queue (and, on the sending side, to an exchange). In Spring Cloud Stream, a binding is attached to a binder, and the message receiver consumes messages through the binding. We inject a binding interface object into the container with the @EnableBinding annotation. In that interface, the @Input annotation marks a binding for receiving messages, and the @Output annotation marks a binding for sending them. Spring Cloud Stream provides three such interfaces by default — Source, Sink, and Processor — all of which are very simple. Of course, you can also define your own binding interfaces. For example, Source is defined like this:

public interface Source {
	String OUTPUT = "output";
	@Output(Source.OUTPUT)
	MessageChannel output();

}

The "output" specified in the @Output annotation is the name of the declared binding; it maps to a destination in RabbitMQ (an exchange on the sending side) that Spring Cloud Stream declares for sending messages by default. Once declared with @EnableBinding, this binding needs no further configuration — inject it with @Autowired and use it directly to send messages. Without any configuration at all, the application will declare a default exchange and queue in RabbitMQ on startup, but the default queue name is long and many details are not ideal. To customize it, you of course need to configure some binding properties in the configuration file. Binding configuration in Spring Cloud Stream uses the format spring.cloud.stream.bindings.&lt;bindingName&gt;.&lt;property&gt;=&lt;value&gt;. For example:

spring.cloud.stream.bindings.output.destination=streamExchange
spring.cloud.stream.bindings.output.group=myoutput

Note that on the sender side the destination property corresponds to an exchange in RabbitMQ, and the group to a queue. With this declaration, Spring Cloud Stream declares a streamExchange exchange (topic type) in RabbitMQ and a queue named streamExchange.myoutput, bound to streamExchange with routing key #. You can see this on the RabbitMQ management page. Messages sent through this interface go to streamExchange, where RabbitMQ forwards them to the queue bound to it. For more properties, see GitHub. None of the property examples above seem directly copy-pastable from the docs, but they do exist — for example, spring.cloud.stream.bindings.output.content-type is commonly used to specify the format of outgoing messages. When there are several binders, the property spring.cloud.stream.bindings.&lt;bindingName&gt;.binder manually specifies the binder the current binding attaches to.
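A hedged sketch of the two per-binding properties just mentioned (testbinder is an illustrative binder name):

```properties
# Format of outgoing messages on the output binding
spring.cloud.stream.bindings.output.content-type=text/plain
# With multiple binders, pin this binding to one of them
spring.cloud.stream.bindings.output.binder=testbinder
```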

3. Configure consumer groups

The grouped consumption strategy of Spring Cloud Stream

Spring Cloud Stream's consumer grouping is meant to unify everything under a Kafka-style group mechanism. Recall Kafka's grouping policy: different groups each consume a full copy of the messages, while within one group each message is consumed by only one consumer. Strictly speaking, this grouping strategy does not exist in RabbitMQ — RabbitMQ implements different consumption policies through different exchange types, which is quite different from Kafka's approach. Still, Kafka's grouping strategy clearly echoes through Spring Cloud Stream's RabbitMQ implementation: when multiple consumer instances consume the same binding, Spring Cloud Stream ports this grouping strategy over to RabbitMQ. That is, across different groups the same message is consumed by each group, while within one group only one consumer receives a given message. Spring Cloud Stream also lets you customize this strategy. Grouped (partitioned) consumption in Spring Cloud Stream is configured as follows.

# Enable partitioning on the sender side
spring.cloud.stream.bindings.output.producer.partitioned=true
# Number of partitioned nodes participating in consumption
spring.cloud.stream.bindings.output.producer.partition-count=2
# Only the consumer with partition ID 1 receives the messages
spring.cloud.stream.bindings.output.producer.partition-key-expression=1

On the message consumer side, you can also specify the partition ID of the current consumer:

# Enable partitioning on the consumer side
spring.cloud.stream.bindings.input.consumer.partitioned=true
# Number of consumer instances participating in consumption
spring.cloud.stream.bindings.input.consumer.instance-count=2
# Partition ID of this consumer instance
spring.cloud.stream.bindings.input.consumer.instance-index=1

With this grouping policy enabled, only messages routed to partition 1 are consumed by the current consumer instance; the rest are not delivered to it. Note this does not mean those messages are never consumed — just not by this instance. This is a consumption pattern RabbitMQ does not have natively; Spring Cloud Stream implements partitioned consumption in its own way. If you trace the RabbitMQ side, you will find that after partitioning is enabled on the consumer side, Spring Cloud Stream creates a separate queue for each active partition. In the example above, a queue named streamExchange.stream-1 is created (streamExchange.stream-0 would only be created if a consumer with partition ID 0 existed), and the sender publishes to streamExchange.stream-1 rather than streamExchange.stream. That is how partitioned consumption is achieved. However, after using it, I found the partition consumption strategy for RabbitMQ does not feel very strict: when the configured partition counts and partition IDs do not match up, there is little validation and few logs — messages simply are not received. In addition, the @StreamListener annotation has a condition attribute that dispatches messages based on a SpEL expression, accepting only messages that satisfy the condition.
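The partition-to-queue mapping described above can be sketched in plain Java. This is a simplification under stated assumptions — the real framework evaluates partition-key-expression as SpEL and has its own selection strategy; PartitionDemo and its methods are illustrative, not framework classes:

```java
// Sketch of how a partitioned producer picks the target queue, simplified
// from the behavior observed above (not Spring Cloud Stream's actual code).
public class PartitionDemo {

    // Partition = hash of the evaluated partition key, modulo partition-count.
    static int selectPartition(Object partitionKey, int partitionCount) {
        return Math.abs(partitionKey.hashCode()) % partitionCount;
    }

    // For RabbitMQ, each partition gets its own queue:
    // "<destination>.<group>-<partition>"
    static String targetQueue(String destination, String group, int partition) {
        return destination + "." + group + "-" + partition;
    }

    public static void main(String[] args) {
        // partition-key-expression=1, partition-count=2
        // -> every message lands in partition 1
        int p = selectPartition(1, 2);
        System.out.println(targetQueue("streamExchange", "stream", p));
        // prints streamExchange.stream-1
    }
}
```

This matches the queue name streamExchange.stream-1 seen on the management page in the example above.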

Use the native message forwarding mechanism

Of course, Spring Cloud Stream can also delegate message routing to the external messaging middleware itself. If you want to use RabbitMQ's fanout, work-queue, and other modes, you can create your own exchange and queue in RabbitMQ and then declare them in Spring Cloud Stream:

# Bind the exchange
spring.cloud.stream.bindings.<bindingName>.destination=fanoutExchange
# Bind the queue
spring.cloud.stream.bindings.<bindingName>.group=myQueue
# Do not automatically create the queue
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.bindQueue=false
# Do not automatically declare the exchange (auto-declared exchanges are topic type)
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.declareExchange=false
# Use only the group name as the queue name (without the destination prefix)
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.queueNameGroupOnly=true
# routingKey of the binding
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.bindingRoutingKey=myRoutingKey
# Exchange type of the binding
spring.cloud.stream.rabbit.bindings.<bindingName>.consumer.exchangeType=<type>
# routingKey used by the producer
spring.cloud.stream.rabbit.bindings.<bindingName>.producer.routingKeyExpression='myRoutingKey'

4. Service consumer message forwarding

A service consumer can also forward a received message: in Spring Cloud Stream, add the @SendTo annotation to the receiver's handler method, and the method's return value is forwarded to another specified binding.

5. Message format

In messaging middleware, the payloads delivered are typically just strings, but Spring Cloud Stream also supports richer message types — for example, passing objects between sender and receiver as JSON strings. In my opinion, though, this is not especially useful.

spring.cloud.stream.bindings.input.content-type=text/plain

In application.properties, you can specify the message type with content-type. The default is text/plain; if you need to pass an object, change it to application/json. Under the hood this just registers a message converter that transforms the message when it is received.

6. Rabbit customization

In addition to the general configuration, Spring Cloud Stream supports middleware-specific configuration. For RabbitMQ these properties start with spring.cloud.stream.rabbit, and binder, producer, and consumer can each be configured separately. For example, the dead-letter queues and lazy queues described below are unique to RabbitMQ. For details, refer to the official GitHub repository.

6.1 Dead letter exchanges

RabbitMQ has a special class of exchanges and queues that handle dead letters. A message in a queue becomes a dead letter when one of the following occurs: 1. the message is rejected (basic.reject/basic.nack) with requeue disabled — controlled by spring.rabbitmq.listener.default-requeue-rejected=false (the default is true, meaning a message that fails processing returns to the queue); 2. the message's TTL expires; 3. the queue has reached its maximum length. This mechanism of RabbitMQ is very useful for delayed queues or for resending messages. RabbitMQ's dead-letter mechanism works by declaring a dead-letter exchange, say dlExchange, on a normal queue dlQueue; the dead-letter exchange then routes and distributes messages to queues just like a normal exchange. This is done by setting a few arguments on the queue that designate the dead-letter exchange. These arguments can be specified in a Spring Boot bean declaration, in the Spring Cloud Stream configuration, or even when creating the queue in the RabbitMQ management UI. The Spring Cloud Stream configuration is shown below; the other ways are straightforward, so I won't go into them.

x-dead-letter-exchange: mirror.dlExchange                        # dead-letter exchange
x-dead-letter-routing-key: mirror.messageExchange1.messageQueue1 # routing key towards the dead-letter exchange
x-message-ttl: 3000                                              # message expiration time
durable: true                                                    # persistence -- this is required

These arguments can be seen on the queue in the RabbitMQ management page.

For example, the following sets of configurations enable messages sent from the Output binding to be consumed by the Input binding after 3 seconds.

spring.cloud.stream.bindings.input.destination=DlqExchange
spring.cloud.stream.bindings.input.group=dlQueue

spring.cloud.stream.bindings.output.destination=messageExchange1
spring.cloud.stream.bindings.output.producer.required-groups=messageQueue1
spring.cloud.stream.rabbit.bindings.output.producer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.output.producer.ttl=3000
spring.cloud.stream.rabbit.bindings.output.producer.deadLetterExchange=DlqExchange
spring.cloud.stream.rabbit.bindings.output.producer.deadLetterQueueName=DlqExchange.dlQueue

6.2 Lazy queues

RabbitMQ introduced the concept of lazy queues in version 3.6.0. A lazy queue stores messages on disk as much as possible and loads them into memory only when consumers consume them; an important design goal is to support longer queues, that is, more message storage. Lazy queues are necessary when consumers cannot consume messages for long periods for various reasons — consumers offline, crashed, down for maintenance, and so on. Concretely, the queue is declared with an "x-queue-mode" argument whose value can be "default" or "lazy" (the default is "default"); the argument is added to the queue in the same way as above.
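In Spring Cloud Stream's RabbitMQ binder this can also be done declaratively — a sketch assuming the binder's consumer-side lazy property (verify it against your binder version):

```properties
# Declare the consumer queue with x-queue-mode=lazy
spring.cloud.stream.rabbit.bindings.input.consumer.lazy=true
```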

A small detail discovered along the way:

Regarding the Spring Cloud Stream version used in the examples: initially it was 1.2.1.RELEASE, the version from Spring Cloud Dalston.SR3; later I manually specified the newer 2.2.0.RELEASE. In the new version, the consumer defined in StreamReceiver can consume not only RabbitMQ messages but also many system events, such as AsyncConsumerStartedEvent, ApplicationReadyEvent (a Spring Boot startup event), ServletRequestHandledEvent (a request-handled event), and so on. This suggests Spring Cloud Stream is gradually becoming a messaging layer at a higher level than the external middleware itself.

Conclusion

Spring Cloud Stream gives us a unified and simple way to access messaging middleware and quickly build event-driven microservices. However, the various messaging products actually behave very differently, and a unified wrapper has a strong flavor of "a melon forced off the vine is never sweet". That it works at all is thanks to the masters of the Spring community — in ordinary hands a project like this would probably have withered. Take RabbitMQ: when I used a lower Spring Cloud version, the spring.cloud.stream.rabbit.* properties were not supported, which at first glance looked like an attempt to force every consumption pattern into one unified model; later versions do support the spring.cloud.stream.rabbit.* properties, allowing RabbitMQ-specific configuration and handling. My feeling is that if your use of messaging middleware is not demanding, Spring Cloud Stream lets you stand up RabbitMQ quickly. But once you go deeper, it feels like much more work than developing against the native API or the spring-boot-starter-amqp package.