Brief introduction: The interactive tutorial that accompanies this article is available in the Alibaba Cloud Zhixing hands-on lab. Log in to start.aliyun.com on a PC to try it directly in the browser.

Author: Luo Ye | Source: Alibaba Cloud Native official account

Articles in the "Playing with RocketMQ in the Spring ecosystem" series:

  • How to Play with RocketMQ in the Spring Ecosystem?
  • The Story of Romigi and Springbot…
  • RocketMQ-Spring is two years old. Why is RocketMQ-Spring the most popular messaging implementation in the Spring ecosystem?
  • RocketMQ message configuration, sending, and consumption using rocketmq-spring-boot-starter
  • Introduction to Spring Cloud Stream System and Principle


Spring Cloud Bus positions itself as the message bus of the Spring Cloud ecosystem, using a message broker to connect all nodes of a distributed system. The official reference documentation for Bus is brief, so brief that it does not contain a single diagram.

Below is the code structure of the latest version of Spring Cloud Bus (the code base is relatively small):

Bus Example Demonstration

Before examining the Bus implementation, let’s look at two simple examples using the Spring Cloud Bus.

1. Adding a configuration item on all nodes

The Bus example is simple because the AutoConfiguration layer of Bus already supplies a default configuration. You only need to add the Spring Cloud Stream and Spring Cloud Bus dependencies for your message middleware. After that, every application that starts will use the same topic to send and receive messages.

The Bus demo is available on GitHub. It simulates starting five nodes. Adding a configuration item on any one instance adds that item on all nodes.

Demo address: https://github.com/fangjian0423/rocketmq-binder-demo/tree/master/rocketmq-bus-demo

Call the Controller endpoint that any node provides for reading configuration (the key is hangzhou):

curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'

Every node returns unknown, because no node has the key hangzhou in its configuration.

Bus provides EnvironmentBusEndpoint, an endpoint that adds or updates configuration through the message broker.

Call this endpoint on any node, /actuator/bus-env?name=hangzhou&value=alibaba, to add the configuration item (for example, using the URL of node1):

curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'

Then call /bus/env on every node again to read the configuration:

$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
unknown
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
unknown
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
unknown
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
unknown
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
unknown
$ curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
alibaba

As shown, every node now has a new configuration item whose key is hangzhou and whose value is alibaba. The item was added through the EnvironmentBusEndpoint that Bus provides.

Here is a diagram drawn by Programmer DD. It shows Spring Cloud Config working with Bus to refresh the configuration on all nodes, and it also describes the previous example (this article's example adds configuration rather than refreshing it, but the flow is the same):

2. Modifying the configuration on some nodes

For example, on node1 specify rocketmq-bus-node2 as the destination to modify the configuration (node2 is configured with spring.cloud.bus.id=rocketmq-bus-node2:10002, which this destination matches):

curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'

Call /bus/env to read the configuration (because the message is sent from node1, Bus also changes the configuration on the sending node, node1):

$ curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'
$ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
alibaba
$ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
xihu
$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
xihu

As you can see, only node1 and node2 have changed their configuration, leaving the remaining three nodes unchanged.
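
Both demos can be reproduced with a small in-memory simulation. The following is only a sketch written for this article, not Bus code: the class PartialUpdateSketch, its Node type, and the simple prefix test used for destinations are hypothetical stand-ins, and the "unknown" default mirrors what the demo Controller returns.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Spring-free simulation of the two bus-env demos.
class PartialUpdateSketch {
    static class Node {
        final String busId;
        final Map<String, String> env = new HashMap<>();

        Node(String busId) {
            this.busId = busId;
        }

        // Mirrors the demo's /bus/env endpoint: a missing key reads as "unknown".
        String get(String key) {
            return env.getOrDefault(key, "unknown");
        }
    }

    // Assumption: a destination is either "**" (everyone) or an application name
    // that must be the first colon-separated segment of the Bus ID.
    static boolean isFor(Node node, String destination) {
        return destination.equals("**") || node.busId.startsWith(destination + ":");
    }

    static void busEnv(List<Node> nodes, Node sender, String destination,
                       String name, String value) {
        sender.env.put(name, value);      // the sender handles its own event locally
        for (Node node : nodes) {         // the broker delivers the message to every node
            if (node != sender && isFor(node, destination)) {
                node.env.put(name, value);
            }
        }
    }

    static List<Node> fiveNodes() {
        List<Node> nodes = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            nodes.add(new Node("rocketmq-bus-node" + i + ":1000" + i));
        }
        return nodes;
    }

    public static void main(String[] args) {
        List<Node> nodes = fiveNodes();
        Node node1 = nodes.get(0);
        busEnv(nodes, node1, "**", "hangzhou", "alibaba");               // demo 1
        busEnv(nodes, node1, "rocketmq-bus-node2", "hangzhou", "xihu"); // demo 2
        for (Node node : nodes) {
            System.out.println(node.busId + " -> " + node.get("hangzhou"));
        }
    }
}
```

After both calls, node1 and node2 hold xihu and the other three nodes still hold alibaba, which is the same pattern as the curl output above.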

Bus Implementation

1. Bus concepts

1) Events

Bus defines a remote event, RemoteApplicationEvent, which extends Spring's ApplicationEvent. It currently has four implementations:

  • EnvironmentChangeRemoteApplicationEvent: remote environment change event. It carries Map<String, String> data and applies it to the Environment in the Spring context. The examples in this article use this event together with EnvironmentBusEndpoint and EnvironmentChangeListener.
  • AckRemoteApplicationEvent: remote acknowledgment event. After Bus successfully receives a remote event, it sends back an AckRemoteApplicationEvent to confirm receipt.
  • RefreshRemoteApplicationEvent: remote configuration refresh event. It dynamically refreshes classes annotated with @RefreshScope and all classes annotated with @ConfigurationProperties.
  • UnknownRemoteApplicationEvent: remote unknown event. If an exception occurs while Bus converts a message body into a remote event, the message is wrapped as this event.

Bus also has one event that is not a RemoteApplicationEvent: SentApplicationEvent, the message-sent event, which works with Trace to record remote message sending.

These events work together with ApplicationListener implementations. For example, EnvironmentChangeRemoteApplicationEvent is paired with EnvironmentChangeListener to add or modify configuration:

public class EnvironmentChangeListener
        implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {

    private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);

    @Autowired
    private EnvironmentManager env;

    @Override
    public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
        Map<String, String> values = event.getValues();
        log.info("Received remote environment change request. Keys/values to update "
                + values);
        for (Map.Entry<String, String> entry : values.entrySet()) {
            env.setProperty(entry.getKey(), entry.getValue());
        }
    }
}

When a node receives an EnvironmentChangeRemoteApplicationEvent sent by another node, it calls EnvironmentManager#setProperty to apply the configuration. For each configuration item, that method publishes an EnvironmentChangeEvent, which ConfigurationPropertiesRebinder listens for and handles by rebinding, completing the add/update.
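
The chain just described (a map of values arrives, each entry is written to the environment, and one change notification per key triggers a rebind) can be sketched in plain Java without Spring. Everything in this block is a hypothetical stand-in: EnvChangeSketch plays the role of EnvironmentManager plus EnvironmentChangeListener, and the registered Consumer plays the role of ConfigurationPropertiesRebinder. It is not the library's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, Spring-free model of "set property, then notify per key".
class EnvChangeSketch {
    private final Map<String, String> environment = new LinkedHashMap<>();
    private final List<Consumer<String>> changeListeners = new ArrayList<>();

    // Registers a callback that receives the key of every changed property.
    void addChangeListener(Consumer<String> listener) {
        changeListeners.add(listener);
    }

    String getProperty(String key) {
        return environment.get(key);
    }

    // Stand-in for EnvironmentManager#setProperty: store, then notify once per key.
    void setProperty(String key, String value) {
        environment.put(key, value);
        for (Consumer<String> listener : changeListeners) {
            listener.accept(key);
        }
    }

    // Stand-in for the listener's onApplicationEvent: apply every entry of the map.
    void onRemoteChange(Map<String, String> values) {
        for (Map.Entry<String, String> entry : values.entrySet()) {
            setProperty(entry.getKey(), entry.getValue());
        }
    }

    public static void main(String[] args) {
        EnvChangeSketch node = new EnvChangeSketch();
        node.addChangeListener(key -> System.out.println("rebind triggered for " + key));
        Map<String, String> change = new LinkedHashMap<>();
        change.put("hangzhou", "alibaba");
        node.onRemoteChange(change);
        System.out.println(node.getProperty("hangzhou"));
    }
}
```

Raising the notification inside setProperty, rather than once per event, matches the article's description that each configuration item produces its own change event.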

2) Actuator Endpoints

Bus exposes two endpoints, EnvironmentBusEndpoint and RefreshBusEndpoint, for adding/modifying configuration and for refreshing configuration globally. Their endpoint IDs, which are also their URL paths, are bus-env and bus-refresh.

3) Configuration

Sending messages through Bus necessarily involves settings such as the topic and group. These are encapsulated in BusProperties under the default configuration prefix spring.cloud.bus, for example:

  • spring.cloud.bus.refresh.enabled turns the global refresh listener on or off.
  • spring.cloud.bus.env.enabled turns the endpoint for adding/modifying configuration on or off.
  • spring.cloud.bus.ack.enabled turns the sending of AckRemoteApplicationEvent on or off.
  • spring.cloud.bus.trace.enabled turns the listener that records message traces on or off.

The default topic for sending messages is springCloudBus, which can be changed through configuration. The group can either use broadcast mode or use a UUID combined with consuming from the latest offset.

Each Bus application has a Bus ID. The official default value is fairly complicated:

${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.application.index:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}}
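
Each nested ${a:b} placeholder reads as "use a if it is set, otherwise b", so the expression is three fallback chains joined by colons. A minimal Java sketch of that evaluation follows. It assumes the properties live in a plain map and that the random value is passed in by the caller; BusIdSketch, firstSet, and defaultBusId are hypothetical names, not Spring classes or methods.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how the default Bus ID expression falls back.
class BusIdSketch {
    // Returns the value of the first key that is present, or the fallback.
    static String firstSet(Map<String, String> props, String fallback, String... keys) {
        for (String key : keys) {
            String value = props.get(key);
            if (value != null) {
                return value;
            }
        }
        return fallback;
    }

    static String defaultBusId(Map<String, String> props, String randomValue) {
        // ${vcap.application.name:${spring.application.name:application}}
        String name = firstSet(props, "application",
                "vcap.application.name", "spring.application.name");
        // ${vcap.application.instance_index:${spring.application.index:
        //     ${local.server.port:${server.port:0}}}}
        String index = firstSet(props, "0",
                "vcap.application.instance_index", "spring.application.index",
                "local.server.port", "server.port");
        // ${vcap.application.instance_id:${random.value}}
        String instance = firstSet(props, randomValue, "vcap.application.instance_id");
        return name + ":" + index + ":" + instance;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("spring.application.name", "rocketmq-bus-node1");
        props.put("server.port", "10001");
        // prints rocketmq-bus-node1:10001:abc123
        System.out.println(defaultBusId(props, "abc123"));
    }
}
```

Because the last segment falls back to a random value, the default ID differs on every start unless vcap.application.instance_id is set.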

It is recommended to configure the Bus ID manually because the destination in the Bus remote event will match against the Bus ID:

spring.cloud.bus.id=${spring.application.name}-${server.port}
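
To make the matching concrete, here is a simplified sketch of how a destination could be compared with a Bus ID. It assumes colon-separated segments, `**` meaning "everything from here on" (the value that appears as destinationService in the ACK log output later in this article), `*` meaning "any single segment", and a destination with fewer segments than the ID acting as a prefix. DestinationMatchSketch is a hypothetical class, and the real matcher in Bus may behave differently in edge cases.

```java
// Hypothetical, simplified destination matcher; not the Bus implementation.
class DestinationMatchSketch {
    // Returns true when the destination pattern selects the given Bus ID.
    static boolean matches(String destination, String busId) {
        if (destination == null || destination.isEmpty() || destination.equals("**")) {
            return true; // broadcast to every node
        }
        String[] pattern = destination.split(":");
        String[] id = busId.split(":");
        for (int i = 0; i < pattern.length; i++) {
            if (pattern[i].equals("**")) {
                return true; // wildcard covers all remaining segments
            }
            if (i >= id.length) {
                return false; // pattern is more specific than the ID
            }
            if (!pattern[i].equals("*") && !pattern[i].equals(id[i])) {
                return false; // literal segment differs
            }
        }
        // Pattern exhausted without a mismatch: treat it as a prefix match.
        return true;
    }

    public static void main(String[] args) {
        System.out.println(matches("rocketmq-bus-node2", "rocketmq-bus-node2:10002"));
        System.out.println(matches("rocketmq-bus-node2", "rocketmq-bus-node1:10001"));
        System.out.println(matches("**", "rocketmq-bus-node5:10005"));
    }
}
```

With a predictable Bus ID, a caller can target a node by application name alone or by name and port together.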

2. Bus internals

The analysis of Bus internals comes down to the following questions:

  • How is a message sent?
  • How is a message received?
  • How is the destination matched?
  • How is the next action triggered after a remote event is received?

The auto-configuration class BusAutoConfiguration is annotated with @EnableBinding(SpringCloudBusClient.class).

The usage of @EnableBinding was covered in the article "Introduction to Spring Cloud Stream System and Principle". Its value here is SpringCloudBusClient.class, so proxy-based input and output DirectChannels are created from SpringCloudBusClient:

public interface SpringCloudBusClient {
    String INPUT = "springCloudBusInput";
    String OUTPUT = "springCloudBusOutput";
    @Output(SpringCloudBusClient.OUTPUT)
    MessageChannel springCloudBusOutput();
    @Input(SpringCloudBusClient.INPUT)
    SubscribableChannel springCloudBusInput();
}

The binding properties of springCloudBusInput and springCloudBusOutput can be changed in the configuration file (for example, to change the topic):

spring.cloud.stream.bindings:
  springCloudBusInput:
    destination: my-bus-topic
  springCloudBusOutput:
    destination: my-bus-topic

Receiving and sending of messages:

// BusAutoConfiguration
@EventListener(classes = RemoteApplicationEvent.class) // 1
public void acceptLocal(RemoteApplicationEvent event) {
    if (this.serviceMatcher.isFromSelf(event)
            && !(event instanceof AckRemoteApplicationEvent)) { // 2
        this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // 3
    }
}

@StreamListener(SpringCloudBusClient.INPUT) // 4
public void acceptRemote(RemoteApplicationEvent event) {
    if (event instanceof AckRemoteApplicationEvent) {
        if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                && this.applicationEventPublisher != null) { // 5
            this.applicationEventPublisher.publishEvent(event);
        }
        // If it's an ACK we are finished processing at this point
        return;
    }
    if (this.serviceMatcher.isForSelf(event)
            && this.applicationEventPublisher != null) { // 6
        if (!this.serviceMatcher.isFromSelf(event)) { // 7
            this.applicationEventPublisher.publishEvent(event);
        }
        if (this.bus.getAck().isEnabled()) { // 8
            AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
                    this.serviceMatcher.getServiceId(),
                    this.bus.getAck().getDestinationService(),
                    event.getDestinationService(), event.getId(), event.getClass());
            this.cloudBusOutboundChannel
                    .send(MessageBuilder.withPayload(ack).build());
            this.applicationEventPublisher.publishEvent(ack);
        }
    }
    if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // 9
        // We are set to register sent events so publish it for local consumption,
        // irrespective of the origin
        this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
                event.getOriginService(), event.getDestinationService(),
                event.getId(), event.getClass()));
    }
}

  1. Use Spring's event listening mechanism to listen for all local RemoteApplicationEvent events (for example, bus-env publishes an EnvironmentChangeRemoteApplicationEvent locally and bus-refresh publishes a RefreshRemoteApplicationEvent locally; both are picked up here).
  2. Check that the local event is not an AckRemoteApplicationEvent (otherwise the application would loop forever, receiving and sending messages) and that the event was sent by the application itself (the sender of the event is this application). If both conditions hold, go to step 3.
  3. Build a Message with the remote event as its payload, then send it to the broker through the MessageChannel that Spring Cloud Stream constructs for the binding named springCloudBusOutput.
  4. The @StreamListener annotation consumes from the MessageChannel that Spring Cloud Stream constructs for the binding named springCloudBusInput. The messages received are remote events.
  5. If the remote event is an AckRemoteApplicationEvent, message tracing is enabled, and the event was not sent by the application itself (the sender is another application), publish the AckRemoteApplicationEvent locally to record that a remote event sent by another application has been acknowledged. Processing of an ACK ends here in every case.
  6. If the remote event is addressed to the application itself (the receiver of the event is this application), perform steps 7 and 8 and then step 9; otherwise go straight to step 9.
  7. If the remote event was not sent by the application itself (the sender is not this application), publish the event locally. An event that the application itself sent was already handled locally by the corresponding listener at the start, so it does not need to be published again.
  8. If the AckRemoteApplicationEvent switch is on, construct an AckRemoteApplicationEvent and send it both remotely and locally. It is published locally because step 5 does not publish the application's own ACK, so this is the application's confirmation to itself. It is sent remotely to tell other applications that this application has received the message.
  9. If message tracing is enabled, construct a SentApplicationEvent and publish it locally.
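
The branching in steps 5 to 9 is easier to check when it is reduced to a pure function that returns the actions a node would take. The sketch below is an illustration only: AcceptRemoteSketch and its action strings are hypothetical, destinations are limited to `**` or an exact node ID, and the null check on applicationEventPublisher is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical decision table for the acceptRemote logic; not the Bus source.
class AcceptRemoteSketch {
    static List<String> acceptRemote(String selfId, String origin, String destination,
                                     boolean isAck, boolean traceEnabled,
                                     boolean ackEnabled) {
        List<String> actions = new ArrayList<>();
        boolean fromSelf = selfId.equals(origin);
        // Assumption: only "**" or an exact ID is supported in this sketch.
        boolean forSelf = destination.equals("**") || destination.equals(selfId);

        if (isAck) {                                  // step 5
            if (traceEnabled && !fromSelf) {
                actions.add("publish-ack-locally");
            }
            return actions;                           // ACKs are never processed further
        }
        if (forSelf) {                                // step 6
            if (!fromSelf) {                          // step 7
                actions.add("publish-event-locally");
            }
            if (ackEnabled) {                         // step 8
                actions.add("send-ack-remotely");
                actions.add("publish-ack-locally");
            }
        }
        if (traceEnabled) {                           // step 9
            actions.add("publish-sent-event-locally");
        }
        return actions;
    }

    public static void main(String[] args) {
        // node2 receives a broadcast from node1, ACK on, trace off
        System.out.println(acceptRemote("node2", "node1", "**", false, false, true));
        // node1 receives its own broadcast back from the broker
        System.out.println(acceptRemote("node1", "node1", "**", false, false, true));
    }
}
```

The second call shows why the sender does not apply its own event twice: the event is for the node but also from the node, so only the ACK actions remain.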

After bus-env is triggered, the EnvironmentChangeListener on every node detects the configuration change, and the console prints the following:

o.s.c.b.event.EnvironmentChangeListener  : Received remote environment change request. Keys/values to update {hangzhou=alibaba}

If a node listens locally for the remote AckRemoteApplicationEvent, it receives the acknowledgments from all nodes. For example, the AckRemoteApplicationEvent events printed on the console of node5 are as follows:

ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670484,"originService":"rocketmq-bus-node5:10005","destinationService":"**","id":"375f0426-c24e-4904-bce1-5e09371fc9bc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670184,"originService":"rocketmq-bus-node1:10001","destinationService":"**","id":"91f06cf1-4bd9-4dd8-9526-9299a35bb7cc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670402,"originService":"rocketmq-bus-node2:10002","destinationService":"**","id":"7df3963c-7c3e-4549-9a22-a23fa90a6b85","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670406,"originService":"rocketmq-bus-node3:10003","destinationService":"**","id":"728b45ee-5e26-46c2-af1a-e8d1571e5d3a","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670427,"originService":"rocketmq-bus-node4:10004","destinationService":"**","id":"1812fd6d-6f98-4e5b-a38a-4b11aee08aeb","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}

So let’s go back to the four questions mentioned at the beginning of this chapter and answer them:

  • How a message is sent: the BusAutoConfiguration#acceptLocal method sends the event through Spring Cloud Stream to the springCloudBus topic.
  • How a message is received: the BusAutoConfiguration#acceptRemote method receives messages from the springCloudBus topic through Spring Cloud Stream.
  • How the destination is matched: the destination is matched in BusAutoConfiguration#acceptRemote, the method that receives remote events.
  • How the next action is triggered after a remote event is received: Bus uses Spring's event mechanism internally. Listeners for specific RemoteApplicationEvent implementations published locally carry out the next action (for example, EnvironmentChangeListener handles EnvironmentChangeRemoteApplicationEvent and RefreshListener handles RefreshRemoteApplicationEvent).

Conclusion

Spring Cloud Bus itself is fairly small, but you need to understand the Spring Cloud Stream system and Spring's own event mechanism beforehand. Only on that basis can you properly understand how Spring Cloud Bus handles local and remote events.

Bus currently has few built-in remote events, and most of them relate to configuration. We can extend RemoteApplicationEvent and use the @RemoteApplicationEventScan annotation to build our own messaging system for microservices.

About the author

Fang Jian (alias: Luo Ye), GitHub ID @fangjian0423, is an open source enthusiast and a senior development engineer at Alibaba, working on the Alibaba Cloud product EDAS. He is one of the leads of the Spring Cloud Alibaba open source project.
