Background

To support its front-end clients, the company developed an active push service (a "service robot"): messages generated on the back end are pushed by the server to the clients (H5, iOS, Android). It supports per-user switch settings, so users can freely choose which types of message to receive, and it also lets users ask questions. This post documents the general approach to deployment and implementation.

I would also like to thank my team lead for his help.

Deployment

Nginx configuration

  • To keep long-lived connections alive, set the proxy HTTP version to 1.1.
  • Set the Upgrade and Connection headers on the proxied request.

The complete configuration is as follows:

location / {
    proxy_pass http://nodes;

    # enable WebSockets
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}

Socket configuration

Socket configuration class

public class WebSocketConfig {

    private Logger log = LoggerFactory.getLogger(WebSocketConfig.class);

    @Value("${wss.server.host}")
    private String host;

    @Value("${wss.server.port}")
    private Integer port;

    @Value("${redis.passwd}")
    private String redisPasswd;

    @Value("${redis.address}")
    private String redisAddress;

    @Bean
    public PubSubStore pubSubStore() {
        return socketIOServer().getConfiguration().getStoreFactory().pubSubStore();
    }

    @Bean
    public SocketIOServer socketIOServer() {

        Config redissonConfig = new Config();
        // Newer Redisson versions require the redis:// prefix on the address
        // NOTE: the database index was omitted in the original; 0 (the Redis default) is assumed here
        redissonConfig.useSingleServer().setPassword("xxx").setAddress("redis://xxx:xx").setDatabase(0);

        RedissonClient redisson = Redisson.create(redissonConfig);
        RedissonStoreFactory redisStoreFactory = new RedissonStoreFactory(redisson);


        Configuration config = new Configuration();
        config.setHostname(host);
        config.setPort(port);
        config.setHttpCompression(false);
        config.setWebsocketCompression(false);

        config.setStoreFactory(redisStoreFactory);

        // To allow cross-origin requests, set the origin to null rather than "*"
        config.setOrigin(null);
        // Timeout (ms) for upgrading the HTTP handshake to WebSocket; default 10000
        config.setUpgradeTimeout(10000);
        // Ping interval (ms), default 25000: how often the client sends a heartbeat to the server
        config.setPingInterval(25000);
        // Ping timeout (ms), default 60000: if no heartbeat arrives within this interval, a timeout event is fired
        config.setPingTimeout(60000);

        // All methods of ExceptionListener must be overridden
        config.setExceptionListener(new ExceptionListener() {
            @Override
            public void onConnectException(Exception e, SocketIOClient client) {
                ResponseMessage error = ResponseMessage.error(-1, "Abnormal connection!");
                client.sendEvent("exception", JSON.toJSON(new Response<String>(error, "Abnormal connection!")));
            }

            @Override
            public void onDisconnectException(Exception e, SocketIOClient client) {
                ResponseMessage error = ResponseMessage.error(-1, "Disconnect exception!");
                client.sendEvent("exception", JSON.toJSON(new Response<String>(error, "Disconnect exception!")));
            }

            @Override
            public void onEventException(Exception e, List<Object> data, SocketIOClient client) {
                ResponseMessage error = ResponseMessage.error(-1, "Server exception!");
                client.sendEvent("exception", JSON.toJSON(new Response<String>(error, "Server exception!")));
            }

            @Override
            public void onPingException(Exception e, SocketIOClient client) {
                ResponseMessage error = ResponseMessage.error(-1, "PING timeout exception!");
                client.sendEvent("exception", JSON.toJSON(new Response<String>(error, "PING timeout exception!")));
            }

            @Override
            public boolean exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
                return false;
            }
        });

        // The authorization listener works like a filter; no checks are performed here
        config.setAuthorizationListener(data -> {
            // The following code can be used to read the user's credentials from the handshake URL:
            // String appId = data.getSingleUrlParam("appId");
            // String source = data.getSingleUrlParam("source");
            // log.info("token {}, client {}", appId, source);
            return true;
        });

        return new SocketIOServer(config);
    }

    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
}

Socket startup class

@Log4j2
@Component
@Order(value=1)
public class ServerRunner implements CommandLineRunner {

    private final SocketIOServer server;


    @Autowired
    public ServerRunner(SocketIOServer server) {
        this.server = server;
    }

    @Override
    public void run(String... args) throws Exception {
        server.start();
        log.info("Socket.IO started successfully!");
    }
}

Final architecture

Implementation process

The push service acts as a Kafka consumer: the data producer pushes processed data to Kafka, and the consumer listens for messages and broadcasts them to the clients. Before pushing, the service queries each user's personalized settings in the database and pushes only the message types that the client has chosen to receive.
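The filtering step can be illustrated with a minimal sketch. It is not the service's actual code: the class name PreferenceFilter, the method recipients, and the in-memory map that stands in for the settings table in the database are all hypothetical, chosen only to show the idea of pushing a message to the users who opted in to its type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PreferenceFilter {

    /**
     * Returns the ids of the connected users who have opted in to the given message type.
     * Users with no stored settings receive nothing.
     * (Hypothetical helper; the map stands in for a database lookup.)
     */
    public static List<String> recipients(String messageType,
                                          List<String> connectedUsers,
                                          Map<String, Set<String>> acceptedTypesByUser) {
        List<String> result = new ArrayList<>();
        for (String userId : connectedUsers) {
            Set<String> accepted = acceptedTypesByUser.getOrDefault(userId, Set.of());
            if (accepted.contains(messageType)) {
                result.add(userId);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Illustrative settings: alice accepts two types, bob one, carol has none stored
        Map<String, Set<String>> settings = Map.of(
                "alice", Set.of("price", "news"),
                "bob", Set.of("news"));
        List<String> connected = List.of("alice", "bob", "carol");

        System.out.println(recipients("price", connected, settings)); // prints [alice]
        System.out.println(recipients("news", connected, settings));  // prints [alice, bob]
    }
}
```

In the real service the lookup would presumably be a database query, possibly cached, and the result would decide which SocketIOClient instances receive the event.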

Because the push service is deployed on multiple nodes that all belong to the same Kafka consumer group, each node consumes only part of the message stream. Redis's publish/subscribe mechanism is used to solve this: after a node consumes a message, it publishes the message to a topic; every node subscribes to that topic and forwards the message to the clients connected to it. Each node is therefore both a publisher and a subscriber.
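The fan-out can be modeled in a few lines. The sketch below is an in-memory simulation only: the Topic class is a stand-in for a Redis channel, and the names Node, onKafkaRecord, and delivered are hypothetical. It shows that although each Kafka record reaches a single node, every node ends up delivering every message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FanOutSketch {

    /** In-memory stand-in for a Redis channel: every subscriber sees every published message. */
    static class Topic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> listener) {
            subscribers.add(listener);
        }

        void publish(String message) {
            subscribers.forEach(s -> s.accept(message));
        }
    }

    /** One push-service node: publishes what it consumes and delivers whatever arrives on the topic. */
    static class Node {
        final List<String> delivered = new ArrayList<>();
        private final Topic topic;

        Node(Topic topic) {
            this.topic = topic;
            // Subscriber role: deliver to locally connected clients (here, just record the message)
            topic.subscribe(delivered::add);
        }

        /** Called for each record the consumer group assigns to this node. */
        void onKafkaRecord(String message) {
            // Publisher role: share the record with all nodes, including this one
            topic.publish(message);
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        Node a = new Node(topic);
        Node b = new Node(topic);

        a.onKafkaRecord("m1"); // the consumer group hands m1 to node a only
        b.onKafkaRecord("m2"); // and m2 to node b only

        System.out.println(a.delivered); // prints [m1, m2]
        System.out.println(b.delivered); // prints [m1, m2]
    }
}
```

Note that the publishing node receives its own message through the subscription, so it does not need a separate local delivery path.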

From data generation to consumption

Use Redisson’s Topic for distributed publish/subscribe

To make Redis publish/subscribe easier to use, Redisson wraps it as a Topic and provides code-level publish and subscribe operations. Among multiple JVM processes connected to the same Redis (single instance or cluster), a message published to a topic in one JVM process is received promptly by every other JVM process that has subscribed to that topic.

Once netty-socketio is integrated with Redisson, it also uses this publish/subscribe mechanism internally.

Publishing messages

public void sendMessageToAllClient(String eventType, String message, String desc) {
    Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();
    for (final SocketIOClient client : clients) {
        // Do something
    }

    Packet packet = new Packet(PacketType.MESSAGE);
    packet.setData(new BroadcastMessage(message, eventType, desc));
    publishMessage(packet);
}

private void publishMessage(Packet packet) {
    DispatchMessage dispatchMessage = new DispatchMessage("", packet, "");
    pubSubStore.publish(PubSubType.DISPATCH, dispatchMessage);
    BroadcastMessage broadcastMessage = dispatchMessage.getPacket().getData();

}

Message subscription

@PostConstruct
public void init() {
    pubSubStore.subscribe(PubSubType.DISPATCH, dispatchMessage -> {
        BroadcastMessage messageData = dispatchMessage.getPacket().getData();

        Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();

        for (final SocketIOClient client : clients) {
            // Do something
        }
    }, DispatchMessage.class);
}