demand

The Kafka service is available. Kafka service data (GPS) is deployed to a local disk (stored as a text file). Now we want to implement a real-time vehicle map based on Echarts.

Analysis of the

  1. Front-end real-time display: Websocket technology is used to achieve server-side data push to the front-end display
  2. Data is obtained through Java’s Kafka client and pushed to the front end through WebSock.

websocket

Introduction to the

Websocket is a protocol that HTML5 started to provide for full duplex communication over a unit TCP connection. In the WebSocket API, the browser and the server only need to shake hands once, and then the browser and the server form a fast channel. Data can be transmitted between the two.

The development of

  • The server side
  package com.ykkj.weiyi.socket;
  import org.springframework.stereotype.Component;
  import org.springframework.web.socket.server.standard.SpringConfigurator;
  import javax.websocket.*;
  import javax.websocket.server.ServerEndpoint;
  import java.io.IOException;
  import java.util.concurrent.CopyOnWriteArraySet;

  / * * *@ServerEndpointAn annotation is a class-level annotation that defines the current class as a WebSocket server. * The value of the annotation will be used to listen for the user's connection to the terminal access URL, which the client can use to connect to the WebSocket server */
  @ServerEndpoint(value = "/websocket")
  public class CommodityServer {
      // Static variable, used to record the current number of online connections. It should be designed to be thread-safe.
      private static int onlineCount = 0;

      // A thread-safe Set for a concurrent package, used to hold each client's corresponding MyWebSocket object. To enable the server to communicate with a single client, a Map can be used to store the Key as the user identity
      public static CopyOnWriteArraySet<CommodityServer> webSocketSet = new CopyOnWriteArraySet<CommodityServer>();

      // A connection session with a client through which to send data to the client
      private Session session;

      /** * The connection was successfully established to call the method **@paramSession Optional parameter. Session Is a connection session with a client through which data is sent to the client */
      @OnOpen
      public void onOpen(Session session) {
          this.session = session;
          webSocketSet.add(this);     // add to set
          addOnlineCount();           // The number of lines increases by 1
          System.out.println("New connection added! The number of current online users is" + getOnlineCount());
      }

      /** * the connection closes the called method */
      @OnClose
      public void onClose(a) {
          webSocketSet.remove(this);  // Delete from set
          subOnlineCount();           // The number of lines is reduced by 1
          System.out.println("There's a connection down! The number of current online users is" + getOnlineCount());
      }

      /** * The method called after receiving the client message **@paramMessage Indicates the message sent by the client@paramSession Optional parameter */
      @OnMessage
      public void onMessage(String message, Session session) {
          System.out.println("Message from client :" + message);
          // Group message
          for (CommodityServer item : webSocketSet) {
              try {
                  item.sendMessage(message);
              } catch (IOException e) {
                  e.printStackTrace();
                  continue; }}}/** ** is called when an error occurs@param session
       * @param error
       */
      @OnError
      public void onError(Session session, Throwable error) {
          System.out.println("Error occurred");
          error.printStackTrace();
      }

      /** * this method is different from the above methods. There are no annotations. You add methods as needed. * *@param message
       * @throws IOException
       */
      public void sendMessage(String message) throws IOException {
          this.session.getBasicRemote().sendText(message);
          //this.session.getAsyncRemote().sendText(message);
      }

      public static synchronized int getOnlineCount(a) {
          return onlineCount;
      }

      public static synchronized void addOnlineCount(a) {
          CommodityServer.onlineCount++;
      }

      public static synchronized void subOnlineCount(a) { CommodityServer.onlineCount--; }}Copy the code

  • The front end

    
            
    <html>
    <head>
        <title>Tomcat implementation of Java backend WebSocket</title>
    </head>
    <body>
    Welcome<br/><input id="text" type="text"/>
    <button onclick="send()">Send a message</button>
    <hr/>
    <button onclick="closeWebSocket()">Close the WebSocket connection</button>
    <hr/>
    <div id="message"></div>
    </body>
    
    <script type="text/javascript">
        var websocket = null;
        // Check whether the current browser supports WebSocket
        if ('WebSocket' in window) {
            websocket = new WebSocket("ws://localhost:8081/onepic/websocket");
        }
        else {
            alert('Current browser Not support websocket')}// Connection error callback method
        websocket.onerror = function () {
            setMessageInnerHTML("WebSocket connection error");
        };
    
        // The callback method for successfully establishing the connection
        websocket.onopen = function () {
            setMessageInnerHTML("WebSocket connection successful");
        }
    
        // The callback method that received the message
        websocket.onmessage = function (event) {
            setMessageInnerHTML(event.data);
        }
    
        // A callback method to close the connection
        websocket.onclose = function () {
            setMessageInnerHTML("WebSocket connection closed");
        }
    
        // Listen for window closing events, when the window is closed, actively close websocket connection, to prevent the connection is not closed, the server will throw exceptions.
        window.onbeforeunload = function () {
            closeWebSocket();
        }
    
        // Displays the message on the web page
        function setMessageInnerHTML(innerHTML) {
            document.getElementById('message').innerHTML += innerHTML + '<br/>';
        }
    
        // Close the WebSocket connection
        function closeWebSocket() {
            websocket.close();
        }
    
        // Send a message
        function send() {
            var message = document.getElementById('text').value;
            websocket.send(message);
        }
    </script>
    </html>
    Copy the code

test

Pay attention to the point

  1. The webSocketSet is set to a global static variable that provides calls to other classes

    public static CopyOnWriteArraySet<CommodityServer> webSocketSet = new CopyOnWriteArraySet<CommodityServer>();
    Copy the code
  2. The server side is implemented with annotations @serverendpoint @onOpen @onclose @onMessage @onError

  3. Tomcat7.0.47 and later support websocket1.0

  4. Add JAR support to POM

    <dependency>
        <groupId>javax</groupId>
        <artifactId>javaee-api</artifactId>
        <version>7.0</version>
        <scope>provided</scope>
    </dependency>
    Copy the code

Kafka

Introduction to the

Kafka is a distributed, partitioned, and replicable messaging system.

The development of

Please refer to my notes on Kafka Environment Setup (Windows)

  • kafka client for java
package com.ykkj.weiyi.socket;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import static com.ykkj.weiyi.socket.CommodityServer.webSocketSet;

public class ConsumerKafka extends Thread {
    private KafkaConsumer<String, String> consumer;
    private String topic = "test.topic";
    public ConsumerKafka(a) {}@Override
    public void run(a) {
        // Load the kafka consumer parameter
        Properties props = new Properties();
        props.put("bootstrap.servers"."localhost:9092");
        props.put("group.id"."ytna");
        props.put("enable.auto.commit"."true");
        props.put("auto.commit.interval.ms"."1000");
        props.put("session.timeout.ms"."15000");
        props.put("key.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
        // Create a consumer object
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(this.topic));
        // An endless loop of kafka consumption
        while (true) {
            try {
                // Consume data and set the timeout
                ConsumerRecords<String, String> records = consumer.poll(100);
                //Consumer message
                for (ConsumerRecord<String, String> record : records) {
                    //Send message to every client
                    for(CommodityServer webSocket : webSocketSet) { webSocket.sendMessage(record.value()); }}}catch (IOException e) {
                System.out.println(e.getMessage());
                continue; }}}public void close(a) {
        try {
            consumer.close();
        } catch(Exception e) { System.out.println(e.getMessage()); }}// For testing purposes, if tomcat is used to start the thread, use another method to start it
    public static void main(String[] args) {
        ConsumerKafka consumerKafka = newConsumerKafka(); consumerKafka.start(); }}Copy the code

Note the topic and bootstrap.Servers configuration

  • Call the class
  package com.ykkj.weiyi.socket;

  public class RunThread {
      public RunThread(a) {
          ConsumerKafka kafka = newConsumerKafka(); kafka.start(); }}Copy the code
  • The web.xml configuration
 <listener>
	<listener-class>com.ykkj.weiyi.socket.RunThread</listener-class>
 </listener>
Copy the code

test

Pay attention to the point

  • ConsumerKafka needs to configure listening in web.xml, otherwise it cannot fetch the webSocketSet variable in the ConsumerKafka class
  • Reference the webSocketSet variable methodimport static com.ykkj.weiyi.socket.CommodityServer.webSocketSet;
  • Note the topic and bootstrap.Servers configuration

conclusion

This is just a technical validation, not a real implementation. In a real scenario, kafka would need to clean the data that does not comply with the current specifications, and then assemble the data in the format required for the front-end presentation.

Refer to online materials

https://blog.csdn.net/lw_ghy/article/details/73252904

https://blog.csdn.net/liu857279611/article/details/70157012