Introduction

Soul uses soul-admin to synchronize user data and the data of connected services to the local cache of the gateway service. There are currently four data synchronization methods. This week we will study these four methods, and on the last day we will explore how high availability of the gateway service is implemented.

  • websocket
  • zookeeper
  • HTTP long polling
  • nacos

Today we get a first taste of WebSocket data synchronization. (The original post shows a diagram from Soul's official website at this point.)

What is WebSocket?

Excerpt from Liao Xuefeng’s official website

WebSocket is a persistent-connection technology built on top of HTTP. The traditional HTTP protocol follows a request-response model: if the browser does not send a request, the server cannot actively push data to it. If you need to push data to the browser periodically (stock quotes, for example) or at irregular intervals (online chat, for example), plain HTTP leaves you with timed polling from the browser's JavaScript, which is inefficient and not real-time.

Because HTTP itself runs over a TCP connection, WebSocket is implemented as a simple upgrade of the HTTP protocol. After establishing a TCP connection, the browser sends a request with several headers:

GET /chat HTTP/1.1 
Host: www.example.com
Upgrade: websocket
Connection: Upgrade

This request tells the server that the client wants to upgrade the connection to a persistent WebSocket connection. If the server accepts, it returns a successful upgrade response:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade

Once the client receives the successful response, the WebSocket handshake is complete. From then on, the TCP connection behind the WebSocket is kept open instead of being closed by the server. The server can push messages to the browser at any time, and the browser can push messages to the server at any time. Messages pushed by either side can be text messages or binary messages. Generally speaking, most applications push JSON-based text messages.
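To make the handshake concrete, here is a minimal sketch of how a client might recognize the successful upgrade response shown above. UpgradeResponseCheck and isWebSocketUpgrade are hypothetical names made up for this illustration, and the check is deliberately simplified: a real handshake carries additional headers (such as Sec-WebSocket-Key) that the excerpt leaves out.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper, for illustration only: inspects a raw HTTP response. */
final class UpgradeResponseCheck {

    /** Returns true for a "101" response that carries "Upgrade: websocket". */
    static boolean isWebSocketUpgrade(final String rawResponse) {
        String[] lines = rawResponse.split("\r?\n");
        // The status line looks like: HTTP/1.1 101 Switching Protocols
        String[] status = lines[0].trim().split(" ", 3);
        if (status.length < 2 || !"101".equals(status[1])) {
            return false;
        }
        // Header names are case-insensitive, so normalize them to lower case.
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 1; i < lines.length; i++) {
            int colon = lines[i].indexOf(':');
            if (colon > 0) {
                String name = lines[i].substring(0, colon).trim().toLowerCase(Locale.ROOT);
                headers.put(name, lines[i].substring(colon + 1).trim());
            }
        }
        String connection = headers.getOrDefault("connection", "");
        return "websocket".equalsIgnoreCase(headers.get("upgrade"))
                && connection.toLowerCase(Locale.ROOT).contains("upgrade");
    }
}
```

In practice a WebSocket library performs this check for you; the sketch only shows what "a successful upgrade" means on the wire.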

WebSocket data synchronization

Today we will not demonstrate the feature in action. Instead, we will read the code to see how WebSocket data synchronization is implemented.

In WebSocket communication, soul-admin is the server and the gateway is the client. As the code shows, soul-admin uses Spring's event-driven model: a change is first published as an event, and the listeners for that event then call WebSocket to communicate.

1. Start from the server-side event listener and trace downward

We enter at the middle layer, the event listener class DataChangedEventDispatcher, and first trace downward from there.

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private ApplicationContext applicationContext;

    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }
}

You can see that the data to be synchronized, carried by the events, falls into five groups (the names are self-explanatory):

  • APP_AUTH: authentication and permission data
  • PLUGIN: plugins
  • SELECTOR: selectors
  • RULE: routing rules
  • META_DATA: metadata
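The dispatch pattern above can be sketched without Spring. This is a simplified illustration, not Soul's code: DispatchSketch, Group, Event, Listener and RecordingListener are hypothetical stand-ins, only three of the five groups are modeled, and the payload is reduced to a list of strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Spring-free sketch of the dispatcher; every name here is a hypothetical stand-in. */
final class DispatchSketch {

    enum Group { PLUGIN, SELECTOR, RULE }

    /** Stand-in for DataChangedListener, reduced to three callbacks. */
    interface Listener {
        void onPluginChanged(List<String> data);
        void onSelectorChanged(List<String> data);
        void onRuleChanged(List<String> data);
    }

    /** Stand-in for DataChangedEvent: a group key plus the changed data. */
    static final class Event {
        final Group group;
        final List<String> data;

        Event(final Group group, final List<String> data) {
            this.group = group;
            this.data = data;
        }
    }

    /** Listener that records each callback, standing in for one sync method. */
    static final class RecordingListener implements Listener {
        final String name;
        final List<String> log = new ArrayList<>();

        RecordingListener(final String name) {
            this.name = name;
        }

        @Override
        public void onPluginChanged(final List<String> data) {
            log.add(name + " PLUGIN " + data);
        }

        @Override
        public void onSelectorChanged(final List<String> data) {
            log.add(name + " SELECTOR " + data);
        }

        @Override
        public void onRuleChanged(final List<String> data) {
            log.add(name + " RULE " + data);
        }
    }

    private final List<Listener> listeners = new ArrayList<>();

    void register(final Listener listener) {
        listeners.add(listener);
    }

    /** Every listener sees every event; the group key selects the callback. */
    void dispatch(final Event event) {
        for (Listener listener : listeners) {
            switch (event.group) {
                case PLUGIN:
                    listener.onPluginChanged(event.data);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(event.data);
                    break;
                case RULE:
                    listener.onRuleChanged(event.data);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.group);
            }
        }
    }
}
```

The point of the design is that the publisher does not need to know which synchronization method is active: whichever listeners are registered receive the same event.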

Setting the business logic aside and focusing on communication, we can see that the DataChangedListener implementations do the actual work. Among the implementations of this interface is a class called WebsocketDataChangedListener:

public class WebsocketDataChangedListener implements DataChangedListener {

    @Override
    public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
        WebsocketData<PluginData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }

    @Override
    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
        WebsocketData<SelectorData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }

    @Override
    public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {
        WebsocketData<RuleData> configData =
                new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
    }

    @Override
    public void onAppAuthChanged(final List<AppAuthData> appAuthDataList, final DataEventTypeEnum eventType) {
        WebsocketData<AppAuthData> configData =
                new WebsocketData<>(ConfigGroupEnum.APP_AUTH.name(), eventType.name(), appAuthDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
    }

    @Override
    public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) {
        WebsocketData<MetaData> configData =
                new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
    }
}

You can see that every callback ends up in WebsocketCollector#send:

    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isNotBlank(message)) {
            if (DataEventTypeEnum.MYSELF == type) {
                try {
                    Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);
                    if (session != null) {
                        session.getBasicRemote().sendText(message);
                    }
                } catch (IOException e) {
                    log.error("websocket send result is exception: ", e);
                }
                return;
            }
            for (Session session : SESSION_SET) {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    log.error("websocket send result is exception: ", e);
                }
            }
        }
    }

Although I have not written Java WebSocket code before, it is not hard to guess what happens: after the client and server establish a connection, the server keeps the sessions in a set to track the connected clients, and sends data through each session.
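That guess can be sketched as follows. This is an assumption about the parts of WebsocketCollector not shown here, not a copy of it: SessionRegistrySketch and its Session interface are hypothetical stand-ins so that the example runs without a WebSocket container.

```java
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/** Sketch of a server-side session registry; all names are hypothetical. */
final class SessionRegistrySketch {

    /** Stand-in for a WebSocket session that can receive text. */
    interface Session {
        void sendText(String message) throws IOException;
    }

    // Thread-safe set: connections open and close while broadcasts are running.
    private final Set<Session> sessions = new CopyOnWriteArraySet<>();

    /** Remember a client when its connection opens. */
    void onOpen(final Session session) {
        sessions.add(session);
    }

    /** Forget a client when its connection closes. */
    void onClose(final Session session) {
        sessions.remove(session);
    }

    /** Sends the message to every session; returns how many sends succeeded. */
    int broadcast(final String message) {
        if (message == null || message.trim().isEmpty()) {
            return 0;
        }
        int delivered = 0;
        for (Session session : sessions) {
            try {
                session.sendText(message);
                delivered++;
            } catch (IOException e) {
                // One broken connection must not stop delivery to the others.
                System.err.println("websocket send failed: " + e.getMessage());
            }
        }
        return delivered;
    }
}
```

Catching the exception inside the loop mirrors the send method above: a failure on one gateway's connection is logged and the remaining gateways still receive the update.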

2. Start from the server-side event publisher and trace upward

Events are published through the Spring framework class ApplicationEventPublisher, so let's look at who calls it.

To view the call hierarchy of a method, press Ctrl + Option + H on a Mac; Windows should have a similar shortcut.

You can see that there are a great many callers. So who actually publishes the event? Since the purpose is to update data, we can guess at two cases in which data needs to be updated:

  • The soul-admin management console needs to notify the gateway service of data updates. Clicking through the call hierarchy shows that most callers are soul-admin Controller methods, that is, the management console's API endpoints.
  • WebSocket also lets the client initiate a request to the server. In this case the gateway asks soul-admin for updated data. When is that request sent? (Spoiler: when the client first establishes the connection.)

The event-publishing method for the second case is SyncDataServiceImpl#syncAll, which, as the name implies, performs a full data update:

    @Override
    public boolean syncAll(final DataEventTypeEnum type) {
        appAuthService.syncData();
        List<PluginData> pluginDataList = pluginService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
        List<SelectorData> selectorDataList = selectorService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
        List<RuleData> ruleDataList = ruleService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
        metaDataService.syncData();
        return true;
    }

Its caller is WebsocketCollector#onMessage, the server-side WebSocket endpoint method that handles messages sent by the client:

    @OnMessage
    public void onMessage(final String message, final Session session) {
        if (message.equals(DataEventTypeEnum.MYSELF.name())) {
            try {
                ThreadLocalUtil.put(SESSION_KEY, session);
                SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
            } finally {
                ThreadLocalUtil.clear();
            }
        }
    }

As you can see, the MYSELF parameter means the gateway is requesting the data for itself. The server stores that gateway's session in a ThreadLocal, and the send method finally delivers the data only to the client that made the request.
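The ThreadLocal hand-off is easier to follow in isolation. The sketch below is a simplified illustration with hypothetical names (ReplyRoutingSketch, its Session class, and a boolean flag in place of DataEventTypeEnum.MYSELF). It works only because the reply is produced on the same thread that stored the session, which is Spring's default behavior for application events.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of routing a reply to the requesting session; names are hypothetical. */
final class ReplyRoutingSketch {

    /** Stand-in for a WebSocket session that records what it receives. */
    static final class Session {
        final String id;
        final List<String> received = new ArrayList<>();

        Session(final String id) {
            this.id = id;
        }

        void sendText(final String message) {
            received.add(message);
        }
    }

    // Holds the requesting session while one request is handled on this thread.
    private static final ThreadLocal<Session> CURRENT = new ThreadLocal<>();

    private final List<Session> all = new ArrayList<>();

    void register(final Session session) {
        all.add(session);
    }

    /** Mirrors onMessage: remember the requester, run the sync, always clear. */
    void onSyncRequest(final Session requester) {
        CURRENT.set(requester);
        try {
            send("full data", true);
        } finally {
            // Clearing in finally keeps a pooled thread from leaking the session.
            CURRENT.remove();
        }
    }

    /** Mirrors send: reply only to the requester, or broadcast to everyone. */
    void send(final String message, final boolean replyToRequester) {
        if (replyToRequester) {
            Session session = CURRENT.get();
            if (session != null) {
                session.sendText(message);
            }
            return;
        }
        for (Session session : all) {
            session.sendText(message);
        }
    }
}
```

With two registered sessions, calling onSyncRequest for the first one delivers "full data" to it alone, while send("update", false) reaches both.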

3. Look at WebSocket communication from the client

The Soul gateway is built on Spring, and many of its custom modules are pulled in as Spring Boot starters. The dependency chain here is soul-bootstrap -> soul-spring-boot-starter-sync-data-websocket -> soul-sync-data-websocket. The WebSocket library the client depends on is:

  <dependency>
      <groupId>org.java-websocket</groupId>
      <artifactId>Java-WebSocket</artifactId>
      <version>${java-websocket.version}</version>
  </dependency>

The gateway service's WebSocket client is started by the WebsocketSyncDataService class. Here is its constructor:

    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
        executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
        for (String url : urls) {
            try {
                clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                log.error("websocket url({}) is error", url, e);
            }
        }
        try {
            for (WebSocketClient client : clients) {
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                if (success) {
                    log.info("websocket connection is successful.....");
                } else {
                    log.error("websocket connection is error.....");
                }
                executor.scheduleAtFixedRate(() -> {
                    try {
                        if (client.isClosed()) {
                            boolean reconnectSuccess = client.reconnectBlocking();
                            if (reconnectSuccess) {
                                log.info("websocket reconnect is successful.....");
                            } else {
                                log.error("websocket reconnection is error.....");
                            }
                        }
                    } catch (InterruptedException e) {
                        log.error("websocket connect is error :{}", e.getMessage());
                    }
                }, 10, 30, TimeUnit.SECONDS);
            }
            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80))); */
        } catch (InterruptedException e) {
            log.info("websocket connection... exception....", e);
        }
    }

As you can see, the constructor establishes the connections and then schedules a task that checks each connection's health and reconnects it if it has dropped. This is how the WebSocket client keeps its connection to the server alive.
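The health-check loop can be reduced to a small sketch. ReconnectSketch, Connection, checkOnce and watch are hypothetical names for this illustration; Connection stands in for the WebSocket client so the example needs no network.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch of a periodic reconnect check; all names are hypothetical. */
final class ReconnectSketch {

    /** Stand-in for the WebSocket client. */
    interface Connection {
        boolean isClosed();

        boolean reconnect() throws InterruptedException;
    }

    /** One health check; returns true only if a reconnect ran and succeeded. */
    static boolean checkOnce(final Connection connection) {
        try {
            if (connection.isClosed()) {
                boolean success = connection.reconnect();
                System.out.println(success ? "reconnect succeeded" : "reconnect failed");
                return success;
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the scheduler thread can shut down.
            Thread.currentThread().interrupt();
        }
        return false;
    }

    /** Runs checkOnce on a daemon thread at a fixed rate. */
    static ScheduledExecutorService watch(final Connection connection,
                                          final long initialDelay,
                                          final long period,
                                          final TimeUnit unit) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "websocket-connect");
            thread.setDaemon(true);
            return thread;
        });
        executor.scheduleAtFixedRate(() -> checkOnce(connection), initialDelay, period, unit);
        return executor;
    }
}
```

Calling ReconnectSketch.watch(connection, 10, 30, TimeUnit.SECONDS) uses the same timing as the constructor above: the first check runs after 10 seconds and then repeats every 30 seconds.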

Each connection is created by instantiating the SoulWebsocketClient class, so let's take a look at its code:

@Slf4j
public final class SoulWebsocketClient extends WebSocketClient {
    
    private volatile boolean alreadySync = Boolean.FALSE;
    
    private final WebsocketDataHandler websocketDataHandler;
    
    /**
     * Instantiates a new Soul websocket client.
     *
     * @param serverUri             the server uri
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers   the meta data subscribers
     * @param authDataSubscribers   the auth data subscribers
     */
    public SoulWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
                               final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        super(serverUri);
        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
    }
    
    @Override
    public void onOpen(final ServerHandshake serverHandshake) {
        if (!alreadySync) {
            send(DataEventTypeEnum.MYSELF.name());
            alreadySync = true;
        }
    }
    
    @Override
    public void onMessage(final String result) {
        handleResult(result);
    }
    
    @Override
    public void onClose(final int i, final String s, final boolean b) {
        this.close();
    }
    
    @Override
    public void onError(final Exception e) {
        this.close();
    }
    
    @SuppressWarnings("ALL")
    private void handleResult(final String result) {
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        String eventType = websocketData.getEventType();
        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        websocketDataHandler.executor(groupEnum, json, eventType);
    }
}

You can see that when the connection is established, SoulWebsocketClient#onOpen is called and executes send(DataEventTypeEnum.MYSELF.name()), with the same MYSELF parameter we saw on the server. This matches the WebSocket server code: after the client establishes the connection for the first time, it sends a request to the server, the server pushes the full data, and the client receives it and stores it in the local JVM.
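As a rough sketch of the client side of this exchange, the example below models the alreadySync guard and a local cache. FirstSyncSketch and its fields are hypothetical, messages are assumed to be already parsed into a group type and a list of entries, and the cache simply replaces a group's entries regardless of event type, which is simpler than what the real handlers do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of the client's first-connect sync; all names are hypothetical. */
final class FirstSyncSketch {

    private volatile boolean alreadySync = false;

    /** Messages this client has sent to the server (stand-in for a socket). */
    final List<String> sent = new ArrayList<>();

    /** Local in-JVM cache keyed by group type, for example "PLUGIN". */
    final Map<String, List<String>> cache = new ConcurrentHashMap<>();

    /** Called each time the connection opens, including after a reconnect. */
    void onOpen() {
        if (!alreadySync) {
            // Ask the server for the full data set only on the first connect.
            sent.add("MYSELF");
            alreadySync = true;
        }
    }

    /** Called for each pushed message; replaces the cached group entries. */
    void onMessage(final String groupType, final List<String> data) {
        cache.put(groupType, new ArrayList<>(data));
    }
}
```

Because of the guard, a reconnect does not trigger a second full sync; after the first one the client relies on the changes the server pushes.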