Introduction to Message Push

Real-time message push delivers relevant information to users as soon as it is available, which speeds up their work and makes the application feel more interactive. Typical scenarios include:

  • News
  • Friend updates
  • Reminders
  • Submission success notifications

A project can integrate message push by using an existing third-party messaging platform (see the previous article, "Eureka! This is the message push"), or it can build push directly on a lower-level protocol. The following is a simple self-built example that uses the netty-socketio framework to implement message push natively.

netty-socketio

netty-socketio is an open-source Java implementation of the Socket.IO server. It is built on the Netty framework and can be used to push messages to clients. Server-side push usually involves WebSocket, a protocol introduced alongside HTML5. Mainstream browsers support it, but incompatibilities can still occur. Socket.IO wraps WebSocket, AJAX long polling, and other transports behind a single communication interface, so you do not have to handle compatibility yourself: the underlying layer automatically selects the best available transport.

GitHub address: github.com/mrniko/nett…

Netty-socketio connection flowchart:

Server side

The following is a simple example of integrating netty-socketio with Spring Boot:

Add the Maven dependency
<dependency>
     <groupId>com.corundumstudio.socketio</groupId>
     <artifactId>netty-socketio</artifactId>
     <version>1.7.18</version>
</dependency>
Push code
  • User connection authentication
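// The article does not show its imports; fastjson and Apache Commons Lang are assumed here.
import com.alibaba.fastjson.JSON;
import com.corundumstudio.socketio.AuthorizationListener;
import com.corundumstudio.socketio.HandshakeData;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

// AuthenticationService is the application's own service and is not shown in this article.
@Slf4j
@Component
public class NettySocketIOAuthorizationListener implements AuthorizationListener {
    @Autowired
    private AuthenticationService authenticationService;

    @Override
    public boolean isAuthorized(HandshakeData handshakeData) {
        log.info("SocketIO authentication started, parameters: {}", JSON.toJSONString(handshakeData));
        log.info("SocketIO address: {}", handshakeData.getUrl());

        // Read user_id from the HTTP header first, then fall back to the URL query parameter
        String userId = StringUtils.defaultString(
                handshakeData.getHttpHeaders().get("user_id"),
                handshakeData.getSingleUrlParam("user_id"));
        boolean authResult = true;

        try {
            String userName = authenticationService.authenticate(userId);
            if (StringUtils.isEmpty(userName)) {
                log.info("SocketIO authentication failed: userId[{}] was not authenticated", userId);
                authResult = false;
            }
        } catch (Exception e) {
            log.error("SocketIO authentication failed: userId[{}] authentication failed, {}", userId, e.getMessage());
            authResult = false;
        }

        log.info("SocketIO authentication finished");
        return authResult;
    }
}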
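The listener depends on AuthenticationService, which the article does not show. For local testing, a stand-in along the following lines could be used. This is only a sketch under assumptions: the class name InMemoryAuthenticationService and the register method are hypothetical, and the only behavior taken from the listener is that authenticate returns a user name, with an empty result meaning "not authenticated".

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for the article's AuthenticationService. */
final class InMemoryAuthenticationService {
    // userId -> userName for every user that is allowed to connect
    private final Map<String, String> userNamesById = new ConcurrentHashMap<>();

    void register(String userId, String userName) {
        userNamesById.put(userId, userName);
    }

    /** Returns the user name for a known userId, or an empty string when the user is unknown. */
    String authenticate(String userId) {
        if (userId == null) {
            return ""; // ConcurrentHashMap rejects null keys, so handle it explicitly
        }
        return userNamesById.getOrDefault(userId, "");
    }

    public static void main(String[] args) {
        InMemoryAuthenticationService service = new InMemoryAuthenticationService();
        service.register("112344094", "demo-user");
        System.out.println(service.authenticate("112344094").isEmpty()); // prints false
        System.out.println(service.authenticate("unknown").isEmpty());   // prints true
    }
}
```

In a Spring Boot project the real service would be a bean that the listener can autowire, and it would normally check a token or session rather than a bare user id.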
  • Connection handler

// The article does not show its imports; fastjson and Apache Commons Lang are assumed here.
import com.alibaba.fastjson.JSONObject;
import com.corundumstudio.socketio.HandshakeData;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIONamespace;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DisconnectListener;
import com.corundumstudio.socketio.listener.PingListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/** Connection handler */
@Slf4j
@Component
public class ConnectHandler {
    private final SocketIONamespace socketIONamespace;

    /**
     * Automatically injects the SocketIOServer.
     * @param socketIOServer the server instance
     */
    @Autowired
    public ConnectHandler(SocketIOServer socketIOServer) {
        // Namespace names normally start with a slash, matching the client URL path "/test".
        // addNamespace is used so the namespace exists even if it was not registered earlier.
        this.socketIONamespace = socketIOServer.addNamespace("/test");
        this.socketIONamespace.addConnectListener(onConnect());
        this.socketIONamespace.addDisconnectListener(onDisconnect());
        socketIOServer.addPingListener(onPing());
    }

    private PingListener onPing() {
        return client -> {
            log.info("Heartbeat received.");
        };
    }

    /**
     * Establish a connection for the given userId.
     * connectBean, ssaxConnectService, openId, and the response model classes belong to the
     * author's business code, which the article omits; the application must supply them.
     */
    private void userIdConnect(SocketIOClient socketIOClient, String sessionId, String userId, String roomId) {
        // userId is already a parameter, so it is not read from connectBean again
        String userName = connectBean.getUserName();
        ConnectContentResponseModel connectContentResponseModel = new ConnectContentResponseModel(userName, connectBean.getImageUrl());
        try {
            ssaxConnectService.connect(socketIOClient, connectBean);
            ConnectResponseModel connectResponseModel = new ConnectResponseModel(200, "Connection established successfully.", connectContentResponseModel);
            if (StringUtils.isNotEmpty(roomId)) {
                socketIOClient.joinRoom(roomId);
                connectResponseModel.setRoomId(roomId);
            }
            log.info("Sending connection response to client [{}] user [{}]: {}", sessionId, userId, JSONObject.toJSONString(connectResponseModel));
            socketIOClient.sendEvent(ConnectResponseModel.EVENT, connectResponseModel);
            log.info("Open_id [{}] client [{}] user [{}] connected successfully: username [{}]", openId, sessionId, userId, userName);
        } catch (Exception e) {
            ConnectResponseModel connectResponseModel = new ConnectResponseModel(401, "Failed to establish a connection", connectContentResponseModel);
            if (StringUtils.isNotEmpty(roomId)) {
                connectResponseModel.setRoomId(roomId);
            }
            log.info("Sending connection response to client [{}] user [{}]: {}", sessionId, userId, JSONObject.toJSONString(connectResponseModel));
            socketIOClient.sendEvent(ConnectResponseModel.EVENT, connectResponseModel);
            log.error("Open_id [{}] client [{}] user [{}] connection failed: {}", openId, sessionId, userId, e.getMessage());
        }
        log.info("SocketIO connection handling finished.");
    }

    /** Connect listener: reads the handshake parameters and establishes the connection */
    private ConnectListener onConnect() {
        return client -> {
            log.info("SocketIO connection begins.");
            HandshakeData handshakeData = client.getHandshakeData();
            String sessionId = client.getSessionId().toString();
            log.info("Client [{}] connecting to [{}], parameters: {}", sessionId, handshakeData.getUrl(), JSONObject.toJSONString(handshakeData));
            // Get authentication parameters (HandshakeDataUtil is the author's helper, not shown)
            String userId = HandshakeDataUtil.getParam(handshakeData, "token_id");
            String roomId = HandshakeDataUtil.getParam(handshakeData, "room_id");
            userIdConnect(client, sessionId, userId, roomId);
        };
    }

    /** Disconnect listener: records a disconnection log */
    private DisconnectListener onDisconnect() {
        return client -> {
            log.info("SocketIO disconnection begins.");
            HandshakeData handshakeData = client.getHandshakeData();
            String sessionId = client.getSessionId().toString();
            log.info("Client [{}] disconnecting from [{}], parameters: {}", sessionId, handshakeData.getUrl(), JSONObject.toJSONString(handshakeData));
            client.disconnect();
            log.info("SocketIO disconnection finished.");
        };
    }
}
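The connect listener calls HandshakeDataUtil.getParam, which the article does not show. The following is a minimal sketch of what such a helper might do, assuming it mirrors the lookup order of the authorization listener: HTTP header first, URL query parameter second. To stay runnable with the JDK alone it takes a plain header map and a raw URL string instead of netty-socketio's HandshakeData; the class name HandshakeParams is hypothetical.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical stand-in for the article's HandshakeDataUtil (not part of netty-socketio). */
final class HandshakeParams {
    private HandshakeParams() {}

    /**
     * Returns the header value if it is present and non-empty, otherwise the first
     * matching URL query parameter, otherwise null.
     */
    static String getParam(Map<String, String> headers, String url, String name) {
        String fromHeader = headers.get(name);
        if (fromHeader != null && !fromHeader.isEmpty()) {
            return fromHeader;
        }
        int queryStart = url.indexOf('?');
        if (queryStart < 0) {
            return null; // no query string at all
        }
        for (String pair : url.substring(queryStart + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip fragments without a key
            }
            String key = URLDecoder.decode(pair.substring(0, eq), StandardCharsets.UTF_8);
            if (key.equals(name)) {
                return URLDecoder.decode(pair.substring(eq + 1), StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String url = "/test?token_id=112344094&room_id=test";
        System.out.println(getParam(Map.of(), url, "room_id"));                           // prints test
        System.out.println(getParam(Map.of("token_id", "from-header"), url, "token_id")); // prints from-header
    }
}
```

Returning null for a missing parameter lets the caller decide whether to reject the connection or continue without a room.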
  • Push message method
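// socketIOServer and appId are fields of the enclosing class, which the article omits.
public void onMessage(MessageModel messageModel) {
    Collection<SocketIOClient> socketIOClientCollection =
            socketIOServer.getNamespace("/" + appId).getRoomOperations("test").getClients();

    for (SocketIOClient socketIOClient : socketIOClientCollection) {
        // AckCallback needs the expected acknowledgement type; Object.class accepts any payload.
        // Use the application's own ack type instead to read fields such as a status.
        socketIOClient.sendEvent("event_push", new AckCallback<Object>(Object.class) {
            @Override
            public void onSuccess(Object result) {
                log.info("SocketIOClient onSuccess result: {}", result);
            }
        }, messageModel);
    }
}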
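Conceptually, the push method looks up the clients that have joined a room and sends the event to each of them. The following is a minimal in-memory sketch of that bookkeeping using only the JDK. RoomRegistry and its methods are hypothetical and are not netty-socketio classes; they only illustrate join, leave, and per-room fan-out.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Hypothetical room bookkeeping; netty-socketio maintains this internally. */
final class RoomRegistry {
    // room name -> session ids of the clients currently in that room
    private final Map<String, Set<String>> rooms = new ConcurrentHashMap<>();

    void join(String room, String sessionId) {
        rooms.computeIfAbsent(room, r -> ConcurrentHashMap.newKeySet()).add(sessionId);
    }

    void leave(String room, String sessionId) {
        Set<String> members = rooms.get(room);
        if (members != null) {
            members.remove(sessionId);
        }
    }

    Set<String> members(String room) {
        return Collections.unmodifiableSet(rooms.getOrDefault(room, Collections.emptySet()));
    }

    /** Calls sender once per room member and returns how many clients were addressed. */
    int broadcast(String room, String message, BiConsumer<String, String> sender) {
        int sent = 0;
        for (String sessionId : members(room)) {
            sender.accept(sessionId, message);
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        RoomRegistry registry = new RoomRegistry();
        registry.join("test", "session-a");
        registry.join("test", "session-b");
        registry.join("other", "session-c");
        int sent = registry.broadcast("test", "new test wwww",
                (sessionId, msg) -> System.out.println(sessionId + " <- " + msg));
        System.out.println("sent to " + sent + " clients"); // prints "sent to 2 clients"
    }
}
```

Clients in other rooms are never touched, which is why the server can push to one room without filtering every connected client.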
  • Push message entity
// JSONObject is assumed to be fastjson's; the article does not show its imports.
import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class MessageModel implements Serializable {
    private static final long serialVersionUID = -1L;

    // Push id
    private String pushId;

    // Message id
    private String msgId;

    // Payload protocol
    private JSONObject payloadProtocol;

    // Payload
    private JSONObject payload;

    // Message title
    private String title;

    // Message content
    private String content;

    // Picture
    private String pic;

    // Resource locator
    private String uri;

    // Push type
    private String type;

    // Options
    private JSONObject options;
}

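The type field of the entity lets a receiver treat different kinds of messages differently. The following is a small sketch of routing by that field, under assumptions: PushTypeDispatcher is a hypothetical class, the handlers receive only the message content, and reading the type value "bar" as a notification bar is a guess rather than something the article states.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical router keyed on the message's type field. */
final class PushTypeDispatcher {
    private final Map<String, Consumer<String>> handlers = new ConcurrentHashMap<>();
    private final Consumer<String> fallback;

    PushTypeDispatcher(Consumer<String> fallback) {
        this.fallback = fallback;
    }

    void register(String type, Consumer<String> handler) {
        handlers.put(type, handler);
    }

    /** Passes content to the handler for type; returns false when the fallback handled it. */
    boolean dispatch(String type, String content) {
        Consumer<String> handler = (type == null) ? null : handlers.get(type);
        if (handler == null) {
            fallback.accept(content);
            return false;
        }
        handler.accept(content);
        return true;
    }

    public static void main(String[] args) {
        PushTypeDispatcher dispatcher =
                new PushTypeDispatcher(c -> System.out.println("unhandled: " + c));
        dispatcher.register("bar", c -> System.out.println("notification bar: " + c));
        dispatcher.dispatch("bar", "new test wwww"); // prints "notification bar: new test wwww"
        dispatcher.dispatch("popup", "hello");       // prints "unhandled: hello"
    }
}
```

An explicit fallback keeps unknown types visible instead of dropping them silently, which helps when the server starts sending a type the client does not know yet.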

Simulated client

  • A real client can be a web, Android, or iOS client. After receiving a Socket.IO message, the client typically displays it in a pop-up window.
  • Client test code
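// Uses the socket.io-client Java library; the article does not show its imports.
import io.socket.client.IO;
import io.socket.client.Socket;
import java.io.IOException;
import java.net.URISyntaxException;
import org.json.JSONException;
import org.json.JSONObject;

public class ClientTest {
    public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException {
        String url = "http://localhost:9092/test?user_id=112344094&room_id=test";
        // options was not defined in the original snippet; default options are assumed
        IO.Options options = new IO.Options();
        final Socket socket = IO.socket(url, options);

        // Network events
        socket.on(Socket.EVENT_DISCONNECT, arg -> System.out.println("disconnect"));
        socket.on(Socket.EVENT_ERROR, arg -> System.out.println("error"));
        socket.on(Socket.EVENT_CONNECT, arg -> System.out.println("connected"));

        // Socket.IO connection succeeded
        socket.on("event_connect", eventArgs -> {
            System.out.println("Server: Connection success event:" + eventArgs[0]); // an org.json.JSONObject
            try {
                JSONObject roomObject = new JSONObject();
                roomObject.put("roomId", "test");
                roomObject.put("eventType", "1");
                socket.emit("event_room", roomObject);
            } catch (JSONException e) {
                System.out.println("Failed to build the room request: " + e.getMessage());
            }
        });

        socket.on("event_push", eventArgs -> {
            System.out.println("Server: received event_push:" + eventArgs[0]);
        });

        // Open the connection; without this call no event is ever received
        socket.connect();
    }
}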
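The connection URL carries the user and the room as query parameters, and a stray space or an unescaped character in it is easy to miss. The following is a small sketch of building the URL from a parameter map with JDK classes only. ConnectUrl and build are hypothetical helper names; the host, port, and namespace are the ones used in the test client above.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that appends URL-encoded query parameters to a base URL. */
final class ConnectUrl {
    private ConnectUrl() {}

    static String build(String base, Map<String, String> params) {
        if (params.isEmpty()) {
            return base;
        }
        // Prefix is "<base>?", entries are joined with "&"
        StringJoiner query = new StringJoiner("&", base + "?", "");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            query.add(URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8)
                    + "=" + URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8));
        }
        return query.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
        params.put("user_id", "112344094");
        params.put("room_id", "test");
        System.out.println(build("http://localhost:9092/test", params));
        // prints http://localhost:9092/test?user_id=112344094&room_id=test
    }
}
```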

Testing sending and receiving

  • Start the server first
  • Start the client and connect to the server
  • The server sends the message
  • The client receives the message and prints it

The client prints the following:

Server: Connection success event: event_connect
Server: received event_push: {"pushId":"61707444881e","payloadProtocol":{"type":5},"payload":{},"options":{},"msgId":"dd26fa76-14f4-4ccd-202110201643","title":"new test wwww","type":"bar","uri":"","content":"new test wwww"}

Conclusion

This article used Spring Boot and netty-socketio to implement a small message push feature. The example registers its listeners through the native netty-socketio API; for convenience, annotations can be used instead. In the test client, receiving the pushed message indicates success. The example contains no business logic, and a real push system has more to consider, such as handling long-lived connections, distinguishing message types, and retrying failed pushes.
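As one illustration of the retry concern, here is a minimal sketch of retrying a failed push with exponential backoff. It uses only the JDK. PushRetry, sendWithRetry, and the attempt and delay values are hypothetical, and a production system would more likely schedule retries asynchronously than sleep on the calling thread.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical retry helper; the push itself is passed in as a BooleanSupplier. */
final class PushRetry {
    private PushRetry() {}

    /**
     * Runs the push until it reports success or maxAttempts is reached.
     * Returns the number of attempts used, or -1 if every attempt failed
     * or the thread was interrupted while waiting.
     */
    static int sendWithRetry(BooleanSupplier push, int maxAttempts, long initialDelayMillis) {
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            boolean delivered;
            try {
                delivered = push.getAsBoolean();
            } catch (RuntimeException e) {
                delivered = false; // treat an exception as a failed attempt
            }
            if (delivered) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return -1;
                }
                delay *= 2; // exponential backoff: double the wait after each failure
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated push that fails twice and then succeeds
        int attempts = sendWithRetry(() -> ++calls[0] >= 3, 5, 10);
        System.out.println("delivered after " + attempts + " attempts");
        // prints "delivered after 3 attempts"
    }
}
```

With netty-socketio, the success signal could come from the acknowledgement callback of sendEvent, so that a push counts as delivered only once the client has confirmed it.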

Reference

  • socket.io/docs/v4/roo…