Learning is endless, and you are encouraged.

use

  • Obtain the latest data on the server in real time
  • View the progress and execution status of a scheduling task
  • User awareness: After data modification, related users receive information
  • Improve user experience: asynchronous processing of time-consuming services (Excel import and export, complex calculation)

The front-end polling

This method is simple to implement. The front end periodically requests the interface through setInterval to obtain the latest data. This method can be used when the real-time requirement is not high and the update frequency is low. However, when the real-time performance is very high, our requests will be very frequent and the server consumption will be very large. Moreover, the data on the server may not be changed at the time of each request, resulting in many requests are meaningless.

    setInterval(function ({

            // Request an interface operation

            / /...

        },

        3000

    );

Copy the code

webSocket

WebSocket is based on TCP protocol. It is full-duplex communication. The server can send information to the client, and the client can also send instructions to the server.

pom.xml

SpringBoot provides a starter for websockets

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-websocket</artifactId>

        </dependency>

Copy the code

The config class

Inject ServerEndpointExporter, and this bean will automatically register the Websocket endpoint declared with the @ServerEndpoint annotation

@Configuration

public class WebSocketConfig {

    @Bean

    public ServerEndpointExporter serverEndpointExporter(a) {

        return new ServerEndpointExporter();

    }

}

Copy the code

The server class

Create a service class:

  • add@ServerEndpointAnnotation to set the service address of the WebSocket connection point.
  • createAtomicIntegerUsed to record the number of connections
  • createConcurrentHashMapUsed to store connection information
  • @OnOpenThe annotation indicates that the method is called after the connection has been established
  • @OnCloseThe annotation indicates that the method is called after the connection is disconnected
  • @OnErrorThe annotation indicates that the method is being called with a connection exception
  • @OnMessageThe annotation indicates that the method is called after receiving a client message
  • Create a way to push messages
  • Create a method to remove the connection
@ServerEndpoint("/websocket/{userId}")

@Component

public class WebSocketServer {



    private final static Logger logger = LoggerFactory.getLogger(WebSocketServer.class);



    / * *

* Number of current connections

* /


    private static AtomicInteger count = new AtomicInteger(0);



    / * *

* Use the map object to get the corresponding WebSocket according to the userId, or put it in redis

* /


    private static Map<String, WebSocketServer> websocketMap = new ConcurrentHashMap<>();



    / * *

* A connection session with a client through which to send data to the client

* /


    private Session session;



    / * *

* The corresponding user ID

* /


    private String userId = "";



    / * *

* Method successfully called to establish the connection

* /


    @OnOpen

    public void onOpen(Session session, @PathParam("userId") String userId) {

        try {

            this.session = session;

            this.userId = userId;

            websocketMap.put(userId, this);

            / / the number of + 1

            count.getAndIncrement();

            logger.info("Websocket new connection: {}", userId);

        } catch (Exception e) {

            logger.error("I/O exception for new WebSocket connection");

        }

    }



    / * *

* The connection closes the method called

* /


    @OnClose

    public void onClose(a) {

        / / delete

        websocketMap.remove(this.userId);

        / / the number 1

        count.getAndDecrement();

        logger.info("close websocket : {}".this.userId);

    }



    / * *

* Method that is invoked after receiving a client message

     *

     * @paramMessage Indicates the message sent by the client

* /


    @OnMessage

    public void onMessage(String message) {

        logger.info("Message from client {} :{}".this.userId, message);

    }



    @OnError

    public void onError(Throwable error) {

        logger.info("Websocket error, remove current webSocket :{},err:{}".this.userId, error.getMessage());

        websocketMap.remove(this.userId);

        / / the number 1

        count.getAndDecrement();

    }



    / * *

* Send messages (asynchronously)

     *

     * @paramMessage Message subject

* /


    private void sendMessage(String message) {

        this.session.getAsyncRemote().sendText(message);

    }



    / * *

* Sends information to the specified user

     *

     * @paramUserId user id

     * @paramWsInfo information

* /


    public static void sendInfo(String userId, String wsInfo) {

        if (websocketMap.containsKey(userId)) {

            websocketMap.get(userId).sendMessage(wsInfo);

        }

    }



    / * *

* Group messaging

* /


    public static void batchSendInfo(String wsInfo, List<String> ids) {

        ids.forEach(userId -> sendInfo(userId, wsInfo));

    }



    / * *

* Group everyone

* /


    public static void batchSendInfo(String wsInfo) {

        websocketMap.forEach((k, v) -> v.sendMessage(wsInfo));

    }



    / * *

* Get the current connection information

* /


    public static List<String> getIds(a) {

        return new ArrayList<>(websocketMap.keySet());

    }



    / * *

* Gets the current number of connections

* /


    public static int getUserCount(a) {

        return count.intValue();

    }

}

Copy the code

The test interface

@RestController

@RequestMapping("/ws")

public class WebSocketController {



    @GetMapping("/push/{message}")

    public ResponseEntity<String> push(@PathVariable(name = "message") String message) {

        WebSocketServer.batchSendInfo(message);

        return ResponseEntity.ok("WebSocket pushes messages to everyone.");

    }



}

Copy the code

html

Create ws.html under Resources /static and set the WebSocket address to the address configured in the @Serverendpoint annotation in the service class


       

<html lang="en">

<head>

    <meta charset="UTF-8">

    <title>WebSocket</title>

</head>



<body>

<div id="message"></div>

</body>



<script>

    let websocket = null;



    // Simulate a log-in user with a timestamp

    const username = new Date().getTime();

    // alert(username)

    // Check whether the current browser supports WebSocket

    if ('WebSocket' in window) {

        console.log("Browser support Websocket");

        websocket = new WebSocket('ws://localhost:8080/websocket/' + username);

    } else {

        alert('Current browser does not support Websocket');

    }



    // Connection error callback method

    websocket.onerror = function ({

        setMessageInnerHTML("WebSocket connection error");

    };



    // The callback method for successfully establishing the connection

    websocket.onopen = function ({

        setMessageInnerHTML("WebSocket connection successful");

    };



    // The callback method that received the message

    websocket.onmessage = function (event{

        setMessageInnerHTML(event.data);

    };



    // A callback method to close the connection

    websocket.onclose = function ({

        setMessageInnerHTML("WebSocket connection closed");

    };



    // Listen for window closing events, when the window is closed, actively close websocket connection, to prevent the connection is not closed, the server will throw exceptions.

    window.onbeforeunload = function ({

        closeWebSocket();

    };



    // Close the WebSocket connection

    function closeWebSocket({

        websocket.close();

    }



    // Displays the message on the web page

    function setMessageInnerHTML(innerHTML{

        document.getElementById('message').innerHTML += innerHTML + '<br/>';

    }

</script>

</html>

Copy the code

test

Start the project, visit http://localhost:8080/ws.html, open the connection. Call news push interface http://localhost:8080/ws/push/hello, view the web page display information.

SseEmitter

SseEmitter is a technology provided by SpringMVC(4.2+). It is based on the Http protocol. It is lighter than WebSocket, but it can only send information one-way from the server to the client. We don’t need to reference other jars to use in SpringBoot.

Creating a Service Class

  • createAtomicIntegerUsed to record the number of connections
  • createConcurrentHashMapUsed to store connection information
  • Establish a connection: Creates and returns one with a timeoutSseEmitterTo the front. If the timeout is set to 0, it will never expire
  • Sets the connection end callback methodcompletionCallBack
  • Sets the callback method for connection timeouttimeoutCallBack
  • Set the callback method for connection exceptionserrorCallBack
  • Create a way to push messagesSseEmitter.send()
  • Create a method to remove the connection
public class SseEmitterServer {



    private static final Logger logger = LoggerFactory.getLogger(SseEmitterServer.class);



    / * *

* Number of current connections

* /


    private static AtomicInteger count = new AtomicInteger(0);



    / * *

* Use the map object to get the corresponding SseEmitter according to the userId, or put it in redis

* /


    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();



    / * *

* Create a user connection and return SseEmitter

     *

     * @paramUserId user ID

     * @return SseEmitter

* /


    public static SseEmitter connect(String userId) {

        // Set the timeout period. 0 indicates that the timeout period does not expire. For 30 seconds, by default over time will throw an exception: unfinished AsyncRequestTimeoutException

        SseEmitter sseEmitter = new SseEmitter(0L);

        // Register a callback

        sseEmitter.onCompletion(completionCallBack(userId));

        sseEmitter.onError(errorCallBack(userId));

        sseEmitter.onTimeout(timeoutCallBack(userId));

        sseEmitterMap.put(userId, sseEmitter);

        / / the number of + 1

        count.getAndIncrement();

        logger.info("Create a new SSE connection, current user: {}", userId);

        return sseEmitter;

    }



    / * *

* Sends a message to the specified user

* /


    public static void sendMessage(String userId, String message) {

        if (sseEmitterMap.containsKey(userId)) {

            try {

                // sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);

                sseEmitterMap.get(userId).send(message);

            } catch (IOException e) {

                logger.error("User [{}] push exception :{}", userId, e.getMessage());

                removeUser(userId);

            }

        }

    }



    / * *

* Group messaging

* /


    public static void batchSendMessage(String wsInfo, List<String> ids) {

        ids.forEach(userId -> sendMessage(wsInfo, userId));

    }



    / * *

* Group everyone

* /


    public static void batchSendMessage(String wsInfo) {

        sseEmitterMap.forEach((k, v) -> {

            try {

                v.send(wsInfo, MediaType.APPLICATION_JSON);

            } catch (IOException e) {

                logger.error("User [{}] push exception :{}", k, e.getMessage());

                removeUser(k);

            }

        });

    }



    / * *

* Remove user connection

* /


    public static void removeUser(String userId) {

        sseEmitterMap.remove(userId);

        / / the number 1

        count.getAndDecrement();

        logger.info("Remove user: {}", userId);

    }



    / * *

* Get the current connection information

* /


    public static List<String> getIds(a) {

        return new ArrayList<>(sseEmitterMap.keySet());

    }



    / * *

* Gets the current number of connections

* /


    public static int getUserCount(a) {

        return count.intValue();

    }



    private static Runnable completionCallBack(String userId) {

        return() - > {

            logger.info("End connection: {}", userId);

            removeUser(userId);

        };

    }



    private static Runnable timeoutCallBack(String userId) {

        return() - > {

            logger.info("Connection timeout: {}", userId);

            removeUser(userId);

        };

    }



    private static Consumer<Throwable> errorCallBack(String userId) {

        return throwable -> {

            logger.info("Connection exception: {}", userId);

            removeUser(userId);

        };

    }



}

Copy the code

The test interface

@RestController

@RequestMapping("/sse")

public class SseEmitterController {

    / * *

* Used to create connections

* /


    @GetMapping("/connect/{userId}")

    public SseEmitter connect(@PathVariable String userId) {

        return SseEmitterServer.connect(userId);

    }



    @GetMapping("/push/{message}")

    public ResponseEntity<String> push(@PathVariable(name = "message") String message) {

        SseEmitterServer.batchSendMessage(message);

        return ResponseEntity.ok("WebSocket pushes messages to everyone.");

    }



}

Copy the code

html

Create ws.html under Resources /static and set the address of the EventSource to the address where the connection is created


       

<html lang="en">

<head>

    <meta charset="UTF-8">

    <title>SseEmitter</title>

</head>

<body>

<button onclick="closeSse()">Close the connection</button>

<div id="message"></div>

</body>

<script>

    let source = null;



    // Simulate a log-in user with a timestamp

    const userId = new Date().getTime();



    if(!!!!!window.EventSource) {



        // Establish a connection

        source = new EventSource('http://localhost:8080/sse/connect/' + userId);



        / * *

* Once the connection is established, the open event is triggered

Onopen = function (event) {}

* /


        source.addEventListener('open'.function (e{

            setMessageInnerHTML("Establish a connection...");

        }, false);



        / * *

* The client receives data from the server

Onmessage = function (event) {}

* /


        source.addEventListener('message'.function (e{

            setMessageInnerHTML(e.data);

        });





        / * *

* If a communication error occurs (such as a broken connection), an error event is raised

* or:

Onerror = function (event) {}

* /


        source.addEventListener('error'.function (e{

            if (e.readyState === EventSource.CLOSED) {

                setMessageInnerHTML("Connection closed");

            } else {

                console.log(e);

            }

        }, false);



    } else {

        setMessageInnerHTML("Your browser does not support SSE");

    }



    // Listen for window closing events and actively close sse connections. If the server is set to never expire, manually clean server data after the browser is closed

    window.onbeforeunload = function ({

        closeSse();

    };



    // Close Sse connection

    function closeSse({

        source.close();

        const httpRequest = new XMLHttpRequest();

        httpRequest.open('GET'.'http://localhost:8080/sse/close/' + userId, true);

        httpRequest.send();

        console.log("close");

    }



    // Displays the message on the web page

    function setMessageInnerHTML(innerHTML{

        document.getElementById('message').innerHTML += innerHTML + '<br/>';

    }

</script>

</html>

Copy the code

test

Start the project, access web page http://localhost:8080/sse.html connection is established. Send information call interface http://localhost:8080/sse/push/hello, view the web page display information.

Access to the source code

All codes are uploaded to Github for easy access

>>>>>> Message push WebSocket and SseEmitter <<<<<<

Daily for praise

Creation is not easy, if you feel helpful, please support

Please focus on