Overview

The previous article, "Spring Boot series 21: Spring Websocket implementation, Websocket cluster scheme discussion", introduced three schemes for a WebSocket cluster in detail and concluded that the third scheme is the best. This article implements the third scheme.

The third option is shown below

The following modifications are made on the basis of scheme 1. The flow of the new architecture is as follows:

  1. Service A adds a WS module that stores the user's connection information (mainly the WebSocket sessionId) in Redis when the WebSocket connects
  2. Message producers send their messages to the exchange. These services do not push to service A/B directly
  3. A new dispatch module is added. It receives the pushed message, reads the user's WebSocket sessionId from Redis, calculates the user's routing key according to the rules above, and then sends the message to the queue the user has subscribed to
  4. The front end receives the message
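
The flow above can be sketched in plain Java with in-memory stand-ins. This is only an illustration under assumptions: the two maps stand in for Redis and for the RabbitMQ exchange, the class and method names (ClusterFlowSketch, onConnect, dispatch, received) are hypothetical and not part of the project, and the session ids are made up. The routing key format (destination + "-user" + sessionId) is the one used by TestMQCtl later in this article.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory model of the flow; not the project's real classes.
class ClusterFlowSketch {
    // Stand-in for Redis: user name -> WebSocket sessionId
    private final Map<String, String> sessions = new ConcurrentHashMap<>();
    // Stand-in for the exchange: routing key -> messages delivered to the bound queue
    private final Map<String, List<String>> queues = new ConcurrentHashMap<>();

    // Step 1: a service instance records the session when a WebSocket connects,
    // and the subscription of that session is bound under its own routing key.
    void onConnect(String user, String sessionId, String destination) {
        sessions.put(user, sessionId);
        queues.putIfAbsent(routingKey(destination, sessionId), new CopyOnWriteArrayList<>());
    }

    // Steps 2 and 3: dispatch looks up the sessionId, computes the routing key,
    // and publishes. Returns false when the user is not connected anywhere.
    boolean dispatch(String user, String destination, String message) {
        String sessionId = sessions.get(user);
        if (sessionId == null) {
            return false;
        }
        List<String> queue = queues.get(routingKey(destination, sessionId));
        if (queue == null) {
            return false;
        }
        queue.add(message);
        return true;
    }

    // Step 4: what the front end attached to this session would receive.
    List<String> received(String destination, String sessionId) {
        return queues.getOrDefault(routingKey(destination, sessionId), Collections.emptyList());
    }

    static String routingKey(String destination, String sessionId) {
        return destination + "-user" + sessionId;
    }

    public static void main(String[] args) {
        ClusterFlowSketch flow = new ClusterFlowSketch();
        flow.onConnect("xiaoming", "s-a1", "demo");   // connected through service A
        flow.onConnect("xiaoming2", "s-b1", "demo");  // connected through service B
        flow.dispatch("xiaoming", "demo", "xiaoming-receive");
        flow.dispatch("xiaoming2", "demo", "xiaoming2-receive");
        System.out.println(flow.received("demo", "s-a1"));
        System.out.println(flow.received("demo", "s-b1"));
    }
}
```

Because the producer only needs the user name, it does not matter which service instance holds the connection: the sessionId stored at connect time is enough to address the right queue.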

Detailed implementation of the code

The code in this article builds on the code from "Spring Boot series 20: Spring Websocket implementation, sending messages to specified users".

Add the Redis and RabbitMQ dependencies to pom.xml

<!-- A WebSocket cluster requires RabbitMQ and Redis support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

RabbitMQ and Redis configuration

application-wscluster.properties

# The WebSocket cluster needs RabbitMQ to be configured
spring.rabbitmq.host: 192.168.21.3
spring.rabbitmq.virtual-host: /icc-local
spring.rabbitmq.username: icc-dev
spring.rabbitmq.password: icc-dev

# Redis configuration
spring.redis.database=0
spring.redis.host=192.168.21.4
# spring.redis.password=
spring.redis.port=7001
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1

IRedisSessionService and implementation

The IRedisSessionService interface defines the add, delete, and query operations. Its implementation class stores the mapping between user name and WebSocket sessionId in Redis.

public interface IRedisSessionService {
    void add(String name, String wsSessionId);
    boolean del(String name);
    String get(String name);
}

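SimulationRedisSessionServiceImpl stores the mapping between user name and WebSocket sessionId in Redis and provides the add, delete, and query operations.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.BoundValueOperations;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class SimulationRedisSessionServiceImpl implements IRedisSessionService {

    @Autowired
    private RedisTemplate<String, String> template;

    // key = login user name, value = WebSocket sessionId
    private ConcurrentHashMap<String,String> redisHashMap = new ConcurrentHashMap<>(32);

    /**
     * Store the user's sessionId in the cache; the entry expires after 24 hours
     * @param name
     * @param wsSessionId
     */
    @Override
    public void add(String name, String wsSessionId){
        BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
        boundValueOperations.set(wsSessionId, 24 * 3600, TimeUnit.SECONDS);
    }

    /**
     * Delete user information from the cache
     * @param name
     * @return true if an entry was removed
     */
    @Override
    public boolean del(String name){
        // The return type is boolean to match the interface declaration
        return template.execute(new RedisCallback<Boolean>() {

            @Override
            public Boolean doInRedis(RedisConnection connection)
                    throws DataAccessException {
                byte[] rawKey = template.getStringSerializer().serialize(name);
                return connection.del(rawKey) > 0;
            }
        }, true);
    }

    /**
     * Get the user's sessionId
     * @param name
     * @return the sessionId, or null if the user has no entry
     */
    @Override
    public String get(String name){
        BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
        return boundValueOperations.get();
    }
}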
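
For local experiments without a Redis server, the same contract can be met by an in-memory map. The sketch below is a stand-in under stated assumptions, not part of the project: InMemorySessionService is a hypothetical name, the interface is repeated only so that the snippet compiles on its own, and unlike the Redis version there is no 24-hour expiry and nothing is shared between service instances, so it is not suitable for an actual cluster.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Repeated from above so that the snippet compiles on its own.
interface IRedisSessionService {
    void add(String name, String wsSessionId);
    boolean del(String name);
    String get(String name);
}

// Hypothetical in-memory stand-in; only valid inside a single JVM.
class InMemorySessionService implements IRedisSessionService {
    // key = login user name, value = WebSocket sessionId
    private final Map<String, String> sessions = new ConcurrentHashMap<>();

    @Override
    public void add(String name, String wsSessionId) {
        // A later connection of the same user overwrites the earlier sessionId
        sessions.put(name, wsSessionId);
    }

    @Override
    public boolean del(String name) {
        // true only if an entry actually existed
        return sessions.remove(name) != null;
    }

    @Override
    public String get(String name) {
        // null when the user has no entry, as with the Redis version
        return sessions.get(name);
    }

    public static void main(String[] args) {
        IRedisSessionService service = new InMemorySessionService();
        service.add("xiaoming", "made-up-session-id");
        System.out.println(service.get("xiaoming"));
        System.out.println(service.del("xiaoming"));
        System.out.println(service.get("xiaoming"));
    }
}
```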

AuthWebSocketHandlerDecoratorFactory

This factory decorates the WebSocketHandler with a WebSocketHandlerDecorator. When a connection is established, it saves the WebSocket sessionId in the cache, using the account name as the key. When the connection is closed, it removes the user's sessionId from the cache. This WebSocket sessionId is used to build the routing key for messages.

import java.security.Principal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory;

@Component
public class AuthWebSocketHandlerDecoratorFactory implements WebSocketHandlerDecoratorFactory {
    private static final Logger log = LoggerFactory.getLogger(AuthWebSocketHandlerDecoratorFactory.class);

    @Autowired
    private IRedisSessionService redisSessionService;

    @Override
    public WebSocketHandler decorate(WebSocketHandler handler) {
        return new WebSocketHandlerDecorator(handler) {
            @Override
            public void afterConnectionEstablished(final WebSocketSession session) throws Exception {
                // After the client connects to the server, record which user went online
                Principal principal = session.getPrincipal();
                if (principal != null) {
                    String username = principal.getName();
                    log.info("websocket online: " + username + " session " + session.getId());
                    redisSessionService.add(username, session.getId());
                }
                super.afterConnectionEstablished(session);
            }

            @Override
            public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
                // After the client disconnects from the server, record which user went offline
                Principal principal = session.getPrincipal();
                if (principal != null) {
                    String username = principal.getName();
                    log.info("websocket offline: " + username);
                    redisSessionService.del(username);
                }
                super.afterConnectionClosed(session, closeStatus);
            }
        };
    }
}

WebSocketRabbitMQMessageBrokerConfigurer

Building on the configuration from "Spring Boot series 20: Spring Websocket implementation, sending messages to specified users", the following is added: myWebSocketHandlerDecoratorFactory is registered with the WebSocket transport.

@Configuration
// This annotation enables message transport over the STOMP protocol backed by a message broker.
// With it, @Controller classes can use @MessageMapping
@EnableWebSocketMessageBroker
public class WebSocketRabbitMQMessageBrokerConfigurer extends AbstractWebSocketMessageBrokerConfigurer {

    @Autowired
    private MyPrincipalHandshakeHandler myDefaultHandshakeHandler;
    @Autowired
    private AuthHandshakeInterceptor sessionAuthHandshakeInterceptor;

    @Autowired
    private AuthWebSocketHandlerDecoratorFactory myWebSocketHandlerDecoratorFactory;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        ...
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        ...
    }

    /**
     * The new configuration needed for the Spring WebSocket cluster
     * @param registration
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.addDecoratorFactory(myWebSocketHandlerDecoratorFactory);
        super.configureWebSocketTransport(registration);
    }
}

TestMQCtl:

This class is modified from the one in the previous article, "Spring Boot series 20: Spring Websocket implementation, sending messages to specified users".

  • The sendMq2User() method fetches the user's WebSocket sessionId from Redis by account name and builds the routing key in the form [web subscription queue name + '-user' + WebSocket sessionId]. It then sends the message through the AmqpTemplate instance to the amq.topic exchange with that routing key. The other methods of the class are not listed here
@Controller
@RequestMapping(value = "/ws")
public class TestMQCtl {
    private static final Logger log = LoggerFactory.getLogger(TestMQCtl.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Autowired
    private IRedisSessionService redisSessionService;

    /**
     * Send a message to the specified user
     * @param msg
     * @param name
     * @return
     */
    @RequestMapping(value = "send2user")
    @ResponseBody
    public int sendMq2User(String msg, String name){
        // Look up the user's WebSocket sessionId by user name
        String wsSessionId = redisSessionService.get(name);
        RequestMessage demoMQ = new RequestMessage();
        demoMQ.setName(msg);

        // Build the routing key as: WebSocket subscribed destination + "-user" + WebSocket sessionId
        String routingKey = getTopicRoutingKey("demo", wsSessionId);
        // Send the message to the amq.topic exchange with the routing key
        log.info("Sending message [{}] to user [{}], sessionId=[{}], routing key [{}]", msg, name, wsSessionId, routingKey);
        amqpTemplate.convertAndSend("amq.topic", routingKey, JSON.toJSONString(demoMQ));
        return 0;
    }

    /**
     * Build the routing key for the topic
     *
     * @param actualDestination
     * @param sessionId
     * @return
     */
    private String getTopicRoutingKey(String actualDestination, String sessionId){
        return actualDestination + "-user" + sessionId;
    }

    ...
}
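
One case the method above does not cover is a user with no entry in Redis. The lookup then returns null, and Java string concatenation turns that into the key "demo-usernull", which most likely matches no binding, so the message would be dropped without any error. A minimal sketch of a guard, assuming that skipping the send is acceptable for offline users; RoutingKeySketch and routingKeyFor are hypothetical names, and "abc123" is a made-up sessionId:

```java
import java.util.Optional;

// Hypothetical helper that mirrors getTopicRoutingKey but refuses to
// build a key when there is no session to address.
class RoutingKeySketch {
    static Optional<String> routingKeyFor(String actualDestination, String wsSessionId) {
        if (wsSessionId == null || wsSessionId.isEmpty()) {
            // The user is not connected to any service instance
            return Optional.empty();
        }
        return Optional.of(actualDestination + "-user" + wsSessionId);
    }

    public static void main(String[] args) {
        // Connected user: the key has the form destination + "-user" + sessionId
        System.out.println(routingKeyFor("demo", "abc123").orElse("(offline, nothing sent)"));
        // Offline user: no key is produced, so the caller can skip the send
        System.out.println(routingKeyFor("demo", null).orElse("(offline, nothing sent)"));
    }
}
```

In sendMq2User() the caller would then publish only when a key is present, and could log or report the offline case instead.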

Test

Start two services on different ports. The startup class is WebSocketClusterApplication.

  • Start service A with the parameters "--spring.profiles.active=wscluster --server.port=8081"
  • Start service B with the parameters "--spring.profiles.active=wscluster --server.port=8082"

Simulated account logins: xiaoming logs in to service A and xiaoming2 logs in to service B. To log in to service A and connect the WebSocket, open http://127.0.0.1:8081/ws/login, log in as xiaoming, and submit.

Open another browser to log in to service B and connect the WebSocket: open http://127.0.0.1:8082/ws/login, log in as xiaoming2, submit, and finally connect the WebSocket.

On service A, open the send page http://127.0.0.1:8081/ws/send and send the following messages:

  1. Send the message xiaoming-receive to account xiaoming. This message should be received only by the WebSocket connected to service A
  2. Send the message xiaoming2-receive to account xiaoming2. This message should be received only by the WebSocket connected to service B
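
The same two sends could also be scripted against the send2user endpoint shown in TestMQCtl instead of using the page. This is a sketch under assumptions: Java 11 or later, the endpoint accepting msg and name as query parameters on a GET (the mapping in TestMQCtl does not restrict the HTTP method), and no login being required for that URL, which may not hold in the real project. Send2UserClient and sendUri are hypothetical names.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

// Hypothetical test client; not part of the project.
class Send2UserClient {
    // Builds the /ws/send2user URI with URL-encoded msg and name parameters.
    static URI sendUri(String baseUrl, String msg, String name) {
        String query = "msg=" + URLEncoder.encode(msg, StandardCharsets.UTF_8)
                + "&name=" + URLEncoder.encode(name, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/ws/send2user?" + query);
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(2))
                .build();
        // {message, account} pairs from the two test steps
        String[][] sends = {
                {"xiaoming-receive", "xiaoming"},
                {"xiaoming2-receive", "xiaoming2"}
        };
        for (String[] send : sends) {
            HttpRequest request = HttpRequest
                    .newBuilder(sendUri("http://127.0.0.1:8081", send[0], send[1]))
                    .GET()
                    .build();
            try {
                HttpResponse<String> response =
                        client.send(request, HttpResponse.BodyHandlers.ofString());
                System.out.println(send[1] + " -> HTTP " + response.statusCode());
            } catch (IOException e) {
                // Reported instead of thrown so the second send is still attempted
                System.out.println(send[1] + " -> service not reachable: " + e);
            }
        }
    }
}
```

Changing the base URL to http://127.0.0.1:8082 sends through service B instead.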

At this point, two pages receive a message:

xiaoming receives only xiaoming-receive, and xiaoming2 receives only xiaoming2-receive.

On service B, open the send page http://127.0.0.1:8082/ws/send and send the same messages as from http://127.0.0.1:8081/ws/send. The result is the same.

Conclusion: whether the user is connected to service A or service B, the code above can send messages to the specified user, so we have implemented a WebSocket cluster.

Code

Please use tag V0.24 whenever possible rather than master. The master branch changes all the time, so there is no guarantee that the code in this article will always match the code on GitHub.