Source: Qiu Cheng, segmentfault.com/a/119000001…

Background

Recently, while working on a project, I ran into a requirement for communication between multiple users. It involved WebSocket handshake requests and sharing WebSocket sessions across a cluster.

After several days of research, I worked out several ways to implement a distributed WebSocket cluster, moving from Zuul to Spring Cloud Gateway along the way. I have summarized them in this article in the hope that it helps others and prompts an exchange of ideas in this area.

Here is a description of my scenario:

  • Resources: 4 servers. One server with an SSL domain name, one Redis + MySQL server, and two application servers (the cluster)
  • Restrictions on application publication: the application must be published under an SSL-certified domain name. The server with the SSL-certified domain name therefore acts as the API gateway and handles HTTPS requests and WSS (secure WebSocket) connections. This is commonly known as HTTPS offloading (SSL termination): the user talks to the HTTPS domain name server, but behind it the application servers are reached over HTTP + IP address. As long as the gateway is sufficiently provisioned, it can serve multiple applications
  • Requirements: a user must establish a WSS connection with the server when logging in to the application. Users in different roles can send messages either to individuals or to groups
  • Type of application service in the cluster: each cluster instance serves both stateless HTTP requests and long-lived WS connections

System Architecture Diagram

In my implementation, each application server handles both HTTP and WS requests, although the chat module that serves WS requests could be split out into its own service. From a distributed-systems point of view the two designs are similar, but serving HTTP and WS requests from one application is more convenient to implement. The reasons are explained below.

The technology stack covered in this article

  • Eureka service discovery and registration
  • Redis sharing Session
  • Redis message subscription
  • Spring Boot
  • Zuul gateway
  • Spring Cloud Gateway
  • Spring WebSocket for long-lived connections
  • Ribbon load balancing
  • Netty, a multi-protocol NIO network communication framework
  • Consistent hashing algorithm

If you have read this far, you have already seen the stack listed above. If any of it is unfamiliar, work through an online tutorial first; the rest of this article assumes you know these technologies.

Technical feasibility analysis

Below I describe the characteristics of sessions and, based on them, list several clustering solutions for handling WS requests in a distributed architecture.

WebSocketSession and HttpSession

In Spring's WebSocket integration, each WS connection has a corresponding session: WebSocketSession. After a WS connection is established, we can communicate with the client like this:

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    System.out.println("Message received by the server: " + message);
    // send a message back to the client over the same session
    session.sendMessage(new TextMessage("message"));
}

So here is the problem: a WebSocketSession cannot be serialized to Redis, so in a cluster we cannot cache every WebSocketSession in Redis to share sessions. Each server holds only its own sessions. HttpSession is the opposite: Redis can be used to share HttpSessions, but there is no equivalent scheme for WebSocket sessions, so sharing WebSocket sessions through Redis is not an option.
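To illustrate why a session tied to a live connection resists being stored in a shared cache, here is a minimal sketch in plain Java. ConnectionBoundSession and SerializationCheck are hypothetical names, not Spring classes; the unconnected java.net.Socket merely stands in for the connection-bound resources a real WebSocketSession holds.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.Socket;

// Hypothetical stand-in for a WS session: it owns a connection object.
final class ConnectionBoundSession implements Serializable {
    private static final long serialVersionUID = 1L;
    final String id;
    // Unconnected socket, used only as an example of a non-serializable resource.
    final Socket socket = new Socket();

    ConnectionBoundSession(String id) {
        this.id = id;
    }
}

final class SerializationCheck {
    /** Returns true if the object can be written with Java serialization. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException (an IOException) lands here:
            // a field such as the Socket only has meaning inside this JVM.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize("plain session attribute"));
        System.out.println(canSerialize(new ConnectionBoundSession("s1")));
    }
}
```

Plain attributes such as strings serialize without trouble, which is roughly why HttpSession data can live in Redis, while the object that owns the connection cannot be written out and restored on another machine.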

Some people may wonder: could I cache the key information of a session in Redis, and have the servers in the cluster fetch it from Redis and rebuild the WebSocket session? All I can say is that if anyone manages to do this, please let me know.

Sharing HTTP sessions is a solved problem, and it is as simple as adding dependencies: spring-session-data-redis and spring-boot-starter-redis (named spring-boot-starter-data-redis in newer Spring Boot versions). Demos are easy to find online. WebSocket sessions, however, cannot truly be shared because of the way WebSocket is implemented underneath.

Evolution of solutions

Netty and Spring WebSocket

At first I tried building the WebSocket server with Netty. Netty has no WebSocket session as such; the closest concept is the channel, and each client connection is represented by one channel. A WS request from the front end arrives on the port Netty listens on, completes the WebSocket handshake, and is then processed by a chain of handlers (chain-of-responsibility pattern). As with a WebSocket session, once the connection is established the server holds a channel through which it can communicate with the client.

/** TODO: assign channels to different groups */
private static final ChannelGroup group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    System.out.println("Server received a message from " + ctx.channel().id() + ": " + msg.text());
    // retain() increments the reference count so the frame is not released before the write;
    // the message is then sent to all channels in the group
    group.writeAndFlush(msg.retain());
}

So, should the server use Netty or Spring WebSocket? Below I compare the pros and cons of the two implementations.

Implementing WebSocket with Netty

Anyone who has used Netty knows that its threading model is built on NIO and handles very high concurrency. Before Spring 5, Spring's web stack was built on the Servlet API, traditionally a blocking thread-per-request model; since Spring 5, the reactive stack (WebFlux) runs on Netty by default. A WebSocket server built on Netty alone is certainly fast, but you may run into the following problems:

  1. It is hard to integrate with the rest of the system, and RPC calls cannot take advantage of Feign service calls in Spring Cloud
  2. Business logic may have to be implemented twice
  3. Using Netty may mean reinventing the wheel
  4. Connecting to the service registry is also troublesome
  5. RESTful services and WS services must be implemented separately. Implementing RESTful services on Netty would be painful, and I believe many people are used to one-stop RESTful development with Spring.

Implementing WS services with Spring WebSocket

Spring WebSocket is already well integrated with Spring Boot, so developing WS services on Spring Boot is convenient and simple.

This article does not cover Spring Boot basics; for a hands-on tutorial see: github.com/javastacks/…

Step 1: Add dependencies

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

Step 2: Add a configuration class

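import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket // needed so that the WebSocketConfigurer callbacks are applied
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // serve the handler at the root path and accept connections from any origin
        registry.addHandler(myHandler(), "/")
                .setAllowedOrigins("*");
    }

    @Bean
    public WebSocketHandler myHandler() {
        return new MessageHandler();
    }
}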

Step 3: Implement the message listening class

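import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import com.alibaba.fastjson.JSONObject; // assumed: the parseObject(String, Class) call matches fastjson
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

@Component
@SuppressWarnings("unchecked")
public class MessageHandler extends TextWebSocketHandler {

    // thread-safe list: connections open and close on different threads
    private final List<WebSocketSession> clients = new CopyOnWriteArrayList<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        clients.add(session);
        System.out.println("uri: " + session.getUri());
        System.out.println("establish connection: " + session.getId());
        System.out.println("current sessions: " + clients.size());
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        clients.remove(session);
        System.out.println("disconnect: " + session.getId());
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        String payload = message.getPayload();
        Map<String, String> map = JSONObject.parseObject(payload, HashMap.class);
        System.out.println("received data: " + map);
        // echo the payload to every client connected to this server
        clients.forEach(s -> {
            try {
                System.out.println("send message to: " + s.getId());
                s.sendMessage(new TextMessage("The server returns the received message," + payload));
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}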

This demo shows how convenient it is to implement WS services with Spring WebSocket. To fit in better with the Spring Cloud family, I ended up using Spring WebSocket for the WS services.

So my application service architecture looks like this: one application is responsible for both RESTful and WS services. I did not split out the WS service module, because splitting it would require Feign for service calls. First, I was lazy; second, it would add an extra layer of I/O calls between services.

Transition from Zuul to Spring Cloud Gateway

To implement WebSocket clustering, we had to move from Zuul to Spring Cloud Gateway. Here is why:

Zuul 1.0 does not support WebSocket forwarding. Zuul 2.0 does, and it was open-sourced a few months ago, but 2.0 is not integrated with Spring Boot and is poorly documented. So the transition is necessary, and it is easy to make.

In the gateway, the following settings in the YML file are needed for SSL and for dynamic routing with load balancing. I list them here so that you can avoid the pitfalls.

server:
  port: 443
  ssl:
    enabled: true
    key-store: classpath:xxx.jks
    key-store-password: xxxx
    key-store-type: JKS
    key-alias: alias
spring:
  application:
    name: api-gateway
  cloud:
    gateway:
      httpclient:
        ssl:
          handshake-timeout-millis: 10000
          close-notify-flush-timeout-millis: 3000
          close-notify-read-timeout-millis: 0
          useInsecureTrustManager: true
      discovery:
        locator:
          enabled: true
          lower-case-service-id: true
      routes:
      - id: dc
        uri: lb://dc
        predicates:
        - Path=/dc/**
      - id: wecheck
        uri: lb://wecheck
        predicates:
        - Path=/wecheck/**

To make HTTPS offloading work, we also need to configure a filter; otherwise the gateway fails with the error "not an SSL/TLS record".

import java.net.URI;

import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@Component
public class HttpsToHttpFilter implements GlobalFilter, Ordered {

    private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI originalUri = exchange.getRequest().getURI();
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpRequest.Builder mutate = request.mutate();
        String forwardedUri = request.getURI().toString();
        if (forwardedUri != null && forwardedUri.startsWith("https")) {
            try {
                // rebuild the same URI with the scheme downgraded to http
                URI mutatedUri = new URI("http",
                        originalUri.getUserInfo(),
                        originalUri.getHost(),
                        originalUri.getPort(),
                        originalUri.getPath(),
                        originalUri.getQuery(),
                        originalUri.getFragment());
                mutate.uri(mutatedUri);
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
        ServerHttpRequest build = mutate.build();
        ServerWebExchange webExchange = exchange.mutate().request(build).build();
        return chain.filter(webExchange);
    }

    @Override
    public int getOrder() {
        return HTTPS_TO_HTTP_FILTER_ORDER;
    }
}

This way the gateway can offload HTTPS requests. At this point the basic framework is in place: the gateway can forward both HTTPS and WSS requests. What remains is how sessions held on different servers can reach each other so that users can communicate. I will present the solutions in order of elegance, starting with the least elegant.

Session broadcast

This is the simplest WebSocket cluster communication solution. The scenario is as follows:

Teacher A wants to send a message to all of his students:

  • The teacher's message request is sent to the gateway, with the content {This is teacher A, I want to send message XXX to my students}
  • The gateway receives the message, obtains the IP addresses of all servers in the cluster, and forwards the teacher's request to each of them in turn
  • Each server in the cluster receives the request and checks, based on teacher A's information, whether it holds a local session for any of the students. If it does, it calls sendMessage; if not, it ignores the request

Session broadcast is very simple to implement, but it has a fatal flaw: it wastes computing power. Whenever a server holds no session for any recipient of a message, the lookup loop it ran was wasted work. When concurrency requirements are modest, this scheme is worth considering first because it is so easy to implement.
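The broadcast flow can be sketched in plain Java as follows. This is an in-memory model under stated assumptions, not the author's code: BroadcastNode stands in for one application server, its inbox lists stand in for calls to sendMessage, and SessionBroadcast.broadcast plays the gateway's role. All class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical model of one application server: it only knows its own local sessions.
final class BroadcastNode {
    // user ID -> messages delivered over that user's local session (stands in for sendMessage)
    private final Map<String, List<String>> localSessions = new ConcurrentHashMap<>();

    void connect(String userId) {
        localSessions.put(userId, new CopyOnWriteArrayList<>());
    }

    /** Delivers to the recipients connected to this node and returns how many were reached. */
    int deliver(Set<String> recipients, String message) {
        int delivered = 0;
        for (String userId : recipients) {
            List<String> inbox = localSessions.get(userId);
            if (inbox != null) {   // the recipient is connected to this node
                inbox.add(message);
                delivered++;
            }                      // otherwise this node ignores the recipient
        }
        return delivered;
    }

    List<String> inboxOf(String userId) {
        return localSessions.getOrDefault(userId, Collections.emptyList());
    }
}

final class SessionBroadcast {
    /** The gateway's part: forward the same request to every node in the cluster. */
    static int broadcast(List<BroadcastNode> cluster, Set<String> recipients, String message) {
        int total = 0;
        for (BroadcastNode node : cluster) {
            total += node.deliver(recipients, message);
        }
        return total;
    }
}
```

Every node runs the recipient loop for every message, including nodes that hold none of the recipients, which is exactly the wasted work described above.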

In Spring Cloud, information about each server in the service cluster can be obtained as follows:

@Resource
private EurekaClient eurekaClient;

// inside a method:
Application app = eurekaClient.getApplication("service-name");
// get(0) reads only the first instance; iterate over app.getInstances() to reach every server
InstanceInfo instanceInfo = app.getInstances().get(0);
System.out.println("ip address: " + instanceInfo.getIPAddr());

Each server needs to maintain a mapping table from user IDs to sessions. When a session is established, an entry is added to the table; when the session is disconnected, the entry is removed.
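One possible shape for such a mapping table is sketched below. SessionRegistry is a hypothetical name; the session type is left generic (it would be WebSocketSession in a Spring application) so that the sketch compiles without Spring on the classpath.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical user-ID-to-session table. S stands for the session type
 * (WebSocketSession in a Spring application).
 */
final class SessionRegistry<S> {
    private final Map<String, S> sessionsByUser = new ConcurrentHashMap<>();

    /** Call when a connection is established, e.g. from afterConnectionEstablished. */
    void register(String userId, S session) {
        sessionsByUser.put(userId, session);
    }

    /**
     * Call when a connection closes. The entry is removed only if it still maps
     * to this session, so a late close event cannot evict a newer connection.
     */
    void unregister(String userId, S session) {
        sessionsByUser.remove(userId, session);
    }

    /** Looks up the local session for a user, if this server holds one. */
    Optional<S> find(String userId) {
        return Optional.ofNullable(sessionsByUser.get(userId));
    }

    int size() {
        return sessionsByUser.size();
    }
}
```

A ConcurrentHashMap is used here because connections are opened and closed on different threads; how the user ID is obtained from the handshake is left open.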

Consistent hashing implementation (the key point of this article)

I think this is the most elegant of the solutions. It takes some time to understand, but if you read patiently I believe you will be rewarded. Again, if you are not familiar with consistent hashing, read up on it first; throughout, we assume that the hash ring is searched clockwise.
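For readers who want something concrete, here is a minimal sketch of a hash ring with virtual nodes, searched clockwise. It is an illustration under assumptions, not the author's implementation: the class name ConsistentHashRing, the use of the first 8 bytes of an MD5 digest as the ring position, and the "node#i" naming of virtual nodes are all choices made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

final class ConsistentHashRing {
    // ring position -> real node address; TreeMap keeps positions sorted
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashRing(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    /** Places the node's virtual nodes on the ring (call on an UP event). */
    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.putIfAbsent(hash(node + "#" + i), node);
        }
    }

    /** Removes the node's virtual nodes from the ring (call on a DOWN event). */
    void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(node + "#" + i), node);
        }
    }

    /** Walks clockwise from the key's position to the first node, wrapping at the end. */
    String lookup(String key) {
        if (ring.isEmpty()) {
            return null;
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Ring position: the first 8 bytes of the MD5 digest, read as a long. */
    static long hash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The property that matters for the rest of this section is that adding a node only moves keys onto the new node, and removing it moves them back; keys owned by other nodes stay where they were.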

To apply consistent hashing to our WebSocket cluster, we first need to solve the following new problems:

  • When a cluster node goes DOWN, keys on the hash ring still map to the node that is DOWN.
  • When a cluster node comes UP, some existing keys no longer map to the node that holds their session.
  • The hash ring must be shared for reading and writing.

In a cluster, services going UP and DOWN is a fact of life.

The analysis of a node going DOWN is as follows:

When a server goes DOWN, its WebSocket sessions are closed automatically and the front end is notified. The hash ring, however, still maps keys to that server, which is wrong. When we detect that the server is DOWN, we only need to delete its real node and virtual nodes from the hash ring, so that the gateway never forwards to a server in the DOWN state.

Implementation: listen for the cluster's service DOWN events in the Eureka governance center and update the hash ring promptly.

The analysis of a node coming UP is as follows:

Now suppose a new service, CacheB, comes online in the cluster, and its IP address hashes to a position between key1 and CacheA. From then on, every time a message involves the user behind key1, the request is routed to CacheB. The message obviously cannot be delivered, because CacheB holds no session for key1.

At this point we have two options.

Plan A: simple, but heavy-handed:

After Eureka detects the node UP event, it updates the hash ring from the current cluster information. It then disconnects all sessions and lets the clients reconnect. Each client reconnects to the node assigned by the updated hash ring, which avoids undeliverable messages.

Plan B: complex, but minimal impact:

First consider the case without virtual nodes. Suppose a server CacheB comes online between CacheC and CacheA. All users whose keys fall on the arc from CacheC to CacheB will now be routed to CacheB when looking for a session to deliver messages. In other words, once CacheB is online, it affects message delivery for the users on the arc from CacheC to CacheB. So we only need to disconnect, on CacheA, the sessions of the users on that arc and let those clients reconnect.

Next is the case with virtual nodes; assume the light-colored nodes are virtual nodes. Long brackets indicate that keys in a region map to a particular Cache. First, before node C comes online: all of B's virtual nodes point to the real node B, so the arc counterclockwise of each B node maps to B (because we specified that the ring is searched clockwise).

Then node C comes online, and you can see that some regions are taken over by C.

From this we can see that when a node comes online, its many virtual nodes come online with it. We therefore need to disconnect the sessions for keys in multiple segments (the red parts in the figure above). The algorithm is somewhat involved and can be implemented in different ways; you can try writing your own.
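One simple way to approach Plan B, offered here as a sketch and not as the segment-based algorithm described above, is to skip computing the affected segments altogether. Each server re-checks every user it currently holds against the updated ring and closes the sessions it no longer owns. RebalancePlanner and its parameters are hypothetical; newOwnerOf is assumed to be a lookup on the updated hash ring.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

final class RebalancePlanner {
    /**
     * @param localUserIds users with a live session on this node
     * @param thisNode     this node's address as it appears on the ring
     * @param newOwnerOf   lookup on the UPDATED ring: user ID -> owning node
     * @return users whose sessions this node should close so that they reconnect elsewhere
     */
    static List<String> sessionsToClose(Collection<String> localUserIds,
                                        String thisNode,
                                        Function<String, String> newOwnerOf) {
        List<String> toClose = new ArrayList<>();
        for (String userId : localUserIds) {
            // a session stays only if the updated ring still maps the user to this node
            if (!thisNode.equals(newOwnerOf.apply(userId))) {
                toClose.add(userId);
            }
        }
        return toClose;
    }
}
```

This costs one ring lookup per local session on each membership change, which is more work than computing segments but involves no interval arithmetic and behaves the same with or without virtual nodes.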

Where should the hash ring live?

  • The gateway creates and maintains the hash ring locally. When a WS request comes in, the gateway looks up its local ring, finds the mapped server, and forwards the request. This looks fine but is not advisable. Recall that it is Eureka that detects when a server goes DOWN. After Eureka detects a DOWN event, would it have to notify the gateway over I/O to delete the corresponding node? That is clearly too cumbersome, and pushing Eureka's responsibilities onto the gateway is not recommended.
  • Eureka creates the ring and stores it in Redis for shared reading and writing. This works. When Eureka detects a service DOWN event, it updates the hash ring and pushes it to Redis. To keep response times short, the gateway should not fetch the ring from Redis on every WS request it forwards. The ring changes rarely, so the gateway can simply use Redis's publish/subscribe mechanism to subscribe to ring-change events.
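The gateway side of the second option can be sketched as a local snapshot that is swapped whenever a change event arrives. This is a minimal sketch in plain Java: RingMembershipCache is a hypothetical name, onRingChanged is assumed to be invoked from whatever Redis subscription callback the gateway registers, and the payload format (comma-separated node addresses) is an assumption made for this example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
 * Hypothetical gateway-side cache of ring membership.
 * The payload format is an assumption of this sketch.
 */
final class RingMembershipCache {
    private final AtomicReference<List<String>> nodes =
            new AtomicReference<>(Collections.<String>emptyList());

    /** Called from the subscription callback when a ring-change event is published. */
    void onRingChanged(String payload) {
        List<String> updated = Arrays.stream(payload.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .sorted()
                .collect(Collectors.toList());
        // swap in an immutable snapshot; readers never see a half-updated list
        nodes.set(Collections.unmodifiableList(updated));
    }

    /** Read on every forwarded request: an in-memory read, no Redis round trip. */
    List<String> currentNodes() {
        return nodes.get();
    }
}
```

The gateway would rebuild its local hash ring from currentNodes() after each change; on startup it would still need one read from Redis to obtain the initial membership.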

At this point our Spring WebSocket cluster is almost complete; its core is the consistent hashing algorithm. One last technical hurdle remains: how does the gateway forward a WS request to a specific server in the cluster?

The answer lies in load balancing. Spring Cloud Gateway and Zuul both integrate Ribbon for load balancing by default. All we need to do is override Ribbon's load balancing algorithm to use the user ID that the client sends when it opens the WS connection: hash the user ID, look up the IP on the hash ring, and forward the WS request to that IP. The process is as follows:

Later, when one user wants to talk to another, we only need to hash the recipient's ID and look up the corresponding IP on the hash ring to know which server holds the session that user established with its WS connection.

Ribbon's shortcomings in Spring Cloud Finchley.RELEASE

In practice, I found two shortcomings in Ribbon:

  • After I overrode the load balancing policy by extending AbstractLoadBalancerRule, following methods found online, requests for different applications got mixed up. With two services A and B registered in Eureka, once the policy was overridden, requests for A or B were all mapped to just one of them. Very strange! Perhaps the Spring Cloud Gateway website should provide a demo of a correctly overridden load balancing policy.
  • A consistent hashing algorithm needs a key, such as the user ID: hash the key, search the hash ring, and return the IP. However, Ribbon does not properly support the key parameter of the choose function.

Is there nothing we can do? In fact, there is a workable temporary alternative.

As shown in the figure below, the client first sends an ordinary HTTP request (containing the ID parameter) to the gateway. The gateway hashes the ID, looks up the IP address on the hash ring, and returns it to the client, which then opens the WS connection to that IP address.

Because Ribbon does not handle the key properly, we cannot yet implement consistent hashing inside Ribbon. It can only be achieved indirectly, by having the client make two requests (one HTTP, one WS). Hopefully Ribbon will fix this soon, so that our WebSocket cluster implementation can be a little more elegant.

Afterword

These are the results of my exploration over the past few days. I ran into many problems and solved them one by one, and ended up with two WebSocket cluster solutions: session broadcast and consistent hashing.

Each scheme has its own advantages and disadvantages in different scenarios. This article does not use ActiveMQ, Kafka or other message queues to push messages; I simply wanted to implement long-lived connection communication between multiple users with my own approach, without relying on a message queue. I hope it offers you a different way of thinking.
