WebSocket has been widely used in modern browsers. In some business scenarios, it is required to be able to push messages from the server to the client. We used DWR in the days before Websockets, and DWR was a great solution back then. But with the rise of WebSocket, we prefer to use standard implementations to solve problems,

First of all, this article is not about WebSocket configuration, mainly about the choice of solutions in the cluster mode of microservices architecture.

Everyone is familiar with microservice architecture. In microservice architecture, services are distributed and each service exists in the form of cluster to ensure the availability of business. In cluster mode, data consistency, such as cache and session, is required to ensure that each node in the cluster accesses the same result.

Microservice cluster caching is usually solved by distributed cache Redis, and session consistency is usually solved by Redis. However, stateless Http, or session-free, is more popular nowadays. The most common solution is OAuth.

WebSocket is different, it is to establish a long connection with the server. Under the cluster mode, it is obviously impossible to establish a connection between the front end and every node in the service cluster. A feasible idea is to realize WebSocket session sharing through Redis, just like solving the HTTP session sharing. However, the number of WebSocket sessions is much larger than the number of HTTP sessions (because every page opened will establish a Websocket connection), so with the growth of users, the amount of shared data is too large, which is easy to cause bottlenecks.

Another idea is that webSocket will always be connected to a node in the cluster, so as long as you find the node where the connection is, you can push messages to the server, so the problem to solve is how to find a webSocket connection to the node. To find out which node the connection is on, we need a unique identifier to find the connection. However, in a STOMp-based publish-subscribe model, a push of a message may be for several connections and may be distributed to each node in the cluster, making it costly to find the connection. For every Websocket message, we push it on every node in the cluster, subscribe to the connection of the message, no matter there is one or ten thousand, we will eventually receive the message. Based on this idea, we made some technical selection:

  • RabbitMQ

  • Spring Cloud Stream

RabbitMQ is an advanced message queue that can broadcast messages (kafka can do the same, but only one is here), and Spring Cloud Stream is a framework for building highly scalable event-driven microservices. And it can integrate with RabbitMQ, Kafka and many other messaging services. Using stream, switching from RabbitMQ to Kafka is just a matter of configuration change. Next, focus on the use method:

Introduction of depend on

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
Copy the code

Configuration of Binder

Binder, an important concept in Streams, is the message-oriented middleware used to configure the publishing and subscribing events for streams. Let’s look at the configuration:

spring:
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                username: username
                password: password
                virtual-host: /
Copy the code

Type specifies the type of the message-oriented middleware. Environment specifies the configuration of the message-oriented middleware. The configuration structure here is exactly the same as that of the configuration items in the spring. rabbitMQ namespace. If environment is not configured, binder will use the spring. rabbitMQ namespace. For example, the rabbitMQ configuration in your project looks like this:

spring:
  rabbitmq:
    host: localhost
    username: username
    password: password
    virtual-host: /
Copy the code

The environment configuration of the binder on the door can be completely removed.

Binding of message flows to binder

As the name of the Spring Cloud Stream implies, you need to use streams, so you need to declare two event streams in the configuration, one input stream and one output stream:

spring:
  cloud:
    stream:
      bindings:
        websocketMessageIn:
          destination: websocketMessage
          binder: defaultRabbit
        websocketMessageOut:
          destination: websocketMessage
          binder: defaultRabbit
Copy the code

As you can see here, event flows reference binders to indicate that the two flows use RabbitMQ middleware. (As you can see here, it is perfectly acceptable to use both Rabbit and Kafka as messaging-middleware for event flows in one project.)

WebsocketMessageIn, websocketMessageOut is the name of the event stream. Destination specifies that the destination of the two event streams is the same. This determines that writes and reads point to the same place (not necessarily to the same message queue).

Event flow declaration

Event flows are defined using interfaces:

/** * WebSocket message event stream interface * Created by Ng on 18-11-8. **@authorWaley *@since1.4.3 * /
interface WebSocketMessageStream {
  companion object {
    const val INPUT: String = "webSocketMessageIn"
    const val OUTPUT: String = "webSocketMessageOut"
  }

  /** * enter */
  @Input(INPUT)
  fun input(a): SubscribableChannel

  /** * output */
  @Output(OUTPUT)
  fun output(a): MessageChannel
}
Copy the code

Declare the event stream interface, which defines two constants corresponding to the two stream names in the configuration. Call the input() method to get the input stream, and call the output() method to get the output stream.

The implementation of this interface is done by Spring Cloud Stream and does not need to be implemented by itself.

Using event streams

Declare a bean:

@ Component @ EnableBinding (WebSocketMessageStream: : class) class WebSocketMessageService {...Copy the code

The @enableBinding annotation here specifies the event flow interface class, and only when this annotation is added (to be recognized by Spring, either to the entry class or to the @Configuration annotation class) will the interface be implemented and added to the Spring container (which can be injected).

The WebSocketMessageService content is as follows:

  @Autowired
  private lateinit var stream: WebSocketMessageStream
  @Autowired
  private lateinit var template: SimpMessagingTemplate

  @StreamListener(WebSocketMessageStream.INPUT)
  fun messageReceived(message: WebSocketMessage) {
    template.convertAndSend(message.destination, message.body)
  }

  fun send(destination: String, body: Any) {
    stream.output().send(
        MutableMessage(WebSocketMessage(destination, body))
    )
  }
Copy the code

Receives the message

The @StreamListener annotation specifies the stream of events to listen on, the parameters received by the method are the message content of the event (using Jackson deserialization), and the messageReceived method sends the received message directly to the front end using webSocket

Send a message

Sending is also very simple, sending a message directly to the input stream. The send method above sends a message that would have been sent to the WebSocket using SimpMessagingTemplate to the Spring Cloud Stream event stream. After doing this, all operations in your project that need to push webSocket messages to the front end should call the SEND method.

At this point, you may be confused and have some questions. Why should each microservice node receive event messages? Or how the configuration controls event messages received by a single node versus multiple nodes. Take your time, guys. I’m going to introduce you to rabbit lore:

First look at the Rabbit message queue:

As you can see from the figure, there are multiple queues starting with webSocketMessage. This is when each microservice node creates a message queue.

Both the Exchange name here and the message queue name prefix above are webSocketMessage, which is specified by destination in the binding configuration above, consistent with the destination name

When an application writes an event to the input stream, destination is used as the key (that is, webSocketMessage) to write the message to an exchange named webSocketMessage. Since exchange is bound to message queues with a webSocketMessage prefix and routing key #, Exchange will route messages to each message queue starting with webSocketMessage. If you do not understand, please refer to the information), so that each microservice can receive the same message.

This configuration can push messages to each microservice node. What if a message needs to be received by only one node? It’s simple, a configuration item can do it:

spring:
  cloud:
    stream:
      bindings:
        websocketMessageIn:
          group: test
          destination: websocketMessage
          binder: defaultRabbit
Copy the code

Rabbitmq will create a message queue called webSocketmessage. test. (The message queue is deleted automatically when the microservice is disconnected. The message queue is persistent, that is, it will not be deleted even if all the microservice nodes are disconnected), all the microservice nodes listen to this queue, and when there is a message in the queue, it will only be consumed by one node.

There’s more to Spring Cloud Stream than that, but it’s enough to do what I need to do. Please refer to the official Spring Cloud Stream documentation for the rest of the configuration:

Cloud. Spring. IO/spring – clou…