Today we'll look at how Soul uses HTTP long polling to synchronize data to the gateway.

1. Start soul-admin

  1. Configure the application.yml configuration file to disable WebSocket and ZooKeeper and enable HTTP.

    soul:
      database:
        dialect: mysql
        init_script: "META-INF/schema.sql"
        init_enable: true
      sync:
        websocket:
          enabled: false
    #    zookeeper:
    #      enabled: true
    #      url: localhost:2181
    #      sessionTimeout: 5000
    #      connectionTimeout: 2000
        http:
          enabled: true
  2. Start SoulAdminBootstrap.

2. Start soul-bootstrap

  1. Configure application-local.yml to disable Dubbo, WebSocket, and ZooKeeper and to enable HTTP.
soul :
    file:
      enabled: true
    corss:
      enabled: true
#    dubbo :
#      parameter: multi
    sync:
#        websocket :
#             urls: ws://localhost:9095/websocket

#        zookeeper:
#             url: localhost:2181
#             sessionTimeout: 5000
#             connectionTimeout: 2000
        http:
             url : http://localhost:9095
  2. Start SoulBootstrapApplication.


Source code analysis:

1. Analyze the HTTP source code of soul-admin

  1. As in the previous two articles, we start from soul-admin's application.yml, which here enables HTTP long polling for data synchronization. A full-text search for soul.sync.http leads to the DataSyncConfiguration class, which loads the HTTP settings from application.yml and invokes the HttpLongPollingDataChangedListener constructor. Because the listener extends AbstractDataChangedListener, AbstractDataChangedListener's afterPropertiesSet method runs as well. It queries AppAuth, Plugin, Rule, Selector, and MetaData from the DB and loads them into the local cache (AbstractDataChangedListener's CACHE). Once that initialization is complete, it calls HttpLongPollingDataChangedListener's afterInitialize method, which starts a scheduled task. The task runs once every 5 minutes and again queries AppAuth, Plugin, Rule, Selector, and MetaData from the DB and stores them in the local cache.

     After that, DataChangedEventDispatcher's afterPropertiesSet method also runs. It collects the DataChangedListener beans so that data change events can be forwarded to them.

@Configuration
public class DataSyncConfiguration {

    /**
     * http long polling.
     */
    @Configuration
    @ConditionalOnProperty(name = "soul.sync.http.enabled", havingValue = "true")
    @EnableConfigurationProperties(HttpSyncProperties.class)
    static class HttpLongPollingListener {

        @Bean
        @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
        public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
            return new HttpLongPollingDataChangedListener(httpSyncProperties);
        }
    }
    // ...
}

@Slf4j
@SuppressWarnings("all")
public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {

    private static final String X_REAL_IP = "X-Real-IP";

    private static final String X_FORWARDED_FOR = "X-Forwarded-For";

    private static final String X_FORWARDED_FOR_SPLIT_SYMBOL = ",";

    private static final ReentrantLock LOCK = new ReentrantLock();

    /**
     * Blocked client.
     */
    private final BlockingQueue<LongPollingClient> clients;

    private final ScheduledExecutorService scheduler;

    private final HttpSyncProperties httpSyncProperties;

    /**
     * Instantiates a new Http long polling data changed listener.
     * @param httpSyncProperties the HttpSyncProperties
     */
    public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
        this.clients = new ArrayBlockingQueue<>(1024);
        this.scheduler = new ScheduledThreadPoolExecutor(1, SoulThreadFactory.create("long-polling", true));
        this.httpSyncProperties = httpSyncProperties;
    }

    /**
     * Runs once the AbstractDataChangedListener initialization has completed.
     */
    @Override
    protected void afterInitialize() {
        long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
        // Periodically check the data for changes and update the cache.
        scheduler.scheduleWithFixedDelay(() -> {
            log.info("http sync strategy refresh config start.");
            try {
                this.refreshLocalCache();
                log.info("http sync strategy refresh config success.");
            } catch (Exception e) {
                log.error("http sync strategy refresh config error!", e);
            }
        }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
        log.info("http sync strategy refresh interval: {}ms", syncInterval);
    }

    /**
     * Queries AppAuth, Plugin, Rule, Selector, and MetaData from the DB
     * and stores them in the local cache (AbstractDataChangedListener's CACHE).
     */
    private void refreshLocalCache() {
        this.updateAppAuthCache();
        this.updatePluginCache();
        this.updateRuleCache();
        this.updateSelectorCache();
        this.updateMetaDataCache();
    }
    // ...
}

@Slf4j
@SuppressWarnings("all")
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
    // ...

    @Override
    public final void afterPropertiesSet() {
        updateAppAuthCache();
        updatePluginCache();
        updateRuleCache();
        updateSelectorCache();
        updateMetaDataCache();
        afterInitialize();
    }
    // ...
}

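The load-then-schedule order described above (fill the cache once, then refresh it on a timer) can be sketched with plain JDK classes. This is only an illustration under assumptions: `RefreshingCacheSketch`, its `loader` supplier, and the thread name are hypothetical stand-ins, not Soul classes, and the supplier takes the place of the DB queries.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for a listener that loads a cache once and then refreshes it on a timer.
class RefreshingCacheSketch {

    private final Map<String, String> cache = new ConcurrentHashMap<>();

    // Assumption: stands in for the DB queries done by the update*Cache methods.
    private final Supplier<Map<String, String>> loader;

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "cache-refresh");
                t.setDaemon(true);
                return t;
            });

    RefreshingCacheSketch(final Supplier<Map<String, String>> loader) {
        this.loader = loader;
    }

    // Same order as afterPropertiesSet followed by afterInitialize: load first, then schedule.
    void init(final long intervalMillis) {
        refresh();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                refresh();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all later runs, so it is caught here.
                System.err.println("refresh failed: " + e.getMessage());
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void refresh() {
        cache.putAll(loader.get());
    }

    String get(final String group) {
        return cache.get(group);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Calling `init` with an interval of 5 minutes in milliseconds would give the same rhythm as the scheduled task described above. The try/catch inside the task matters because `scheduleWithFixedDelay` stops rescheduling a task that throws.
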
protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
    String json = GsonUtils.getInstance().toJson(data);
    ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
    ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
    log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}
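updateCache stores each group's JSON together with an MD5 fingerprint and a timestamp, which is what later lets the two sides compare state cheaply. A minimal sketch of that idea, assuming a hex-encoded MD5 (the article does not show how Md5Utils encodes its result) and using the hypothetical names `Md5CacheSketch` and `Entry`:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a per-group cache keyed by an MD5 fingerprint.
class Md5CacheSketch {

    // Hypothetical counterpart of ConfigDataCache: group, payload, fingerprint, timestamp.
    static final class Entry {
        final String group;
        final String json;
        final String md5;
        final long lastModifyTime;

        Entry(final String group, final String json, final String md5, final long lastModifyTime) {
            this.group = group;
            this.json = json;
            this.md5 = md5;
            this.lastModifyTime = lastModifyTime;
        }
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();

    // Assumption: lower-case hex encoding of the MD5 digest of the UTF-8 bytes.
    static String md5Hex(final String text) {
        try {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            byte[] hash = digest.digest(text.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, hash));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Stores the new entry and reports whether the fingerprint differs from the previous one.
    boolean update(final String group, final String json) {
        Entry newVal = new Entry(group, json, md5Hex(json), System.currentTimeMillis());
        Entry oldVal = cache.put(group, newVal);
        return oldVal == null || !oldVal.md5.equals(newVal.md5);
    }

    String md5Of(final String group) {
        Entry entry = cache.get(group);
        return entry == null ? null : entry.md5;
    }
}
```

Comparing fingerprints instead of full JSON payloads keeps the change check small, at the cost of recomputing the hash on every refresh.
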
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private ApplicationContext applicationContext;

    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }
}
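The dispatcher above fans each event out to every registered listener and picks the callback from the event's group. The same pattern, reduced to plain Java, might look like the following sketch. `DispatcherSketch`, its `Group` enum, and the `Listener` interface are hypothetical simplifications (three groups, string payloads) and not the Soul types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified version of the listener fan-out pattern.
class DispatcherSketch {

    enum Group { PLUGIN, RULE, SELECTOR }

    // Default no-op callbacks let a listener override only what it cares about.
    interface Listener {
        default void onPluginChanged(List<String> data) { }

        default void onRuleChanged(List<String> data) { }

        default void onSelectorChanged(List<String> data) { }
    }

    private final List<Listener> listeners;

    DispatcherSketch(final List<Listener> listeners) {
        // Defensive copy, then read-only: the listener set is fixed after start-up.
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listeners));
    }

    // Routes one change event to the matching callback on every listener.
    void dispatch(final Group group, final List<String> data) {
        for (Listener listener : listeners) {
            switch (group) {
                case PLUGIN:
                    listener.onPluginChanged(data);
                    break;
                case RULE:
                    listener.onRuleChanged(data);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(data);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + group);
            }
        }
    }
}
```

Because every listener receives every event, adding another synchronization strategy only means registering one more listener; the dispatcher itself does not change.
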
  2. Start soul-admin in debug mode. On startup, the DataSyncConfiguration class loads the HTTP settings from application.yml and invokes the HttpLongPollingDataChangedListener constructor. AbstractDataChangedListener's afterPropertiesSet method then runs, and once the initial load is complete it calls afterInitialize. Finally, DataChangedEventDispatcher's afterPropertiesSet method runs and collects the listeners that will receive data change events.

2. Analyze the HTTP source code of soul-bootstrap

  1. Starting from soul-bootstrap's application-local.yml, a full-text search for soul.sync.http.url leads to HttpSyncDataConfiguration. HttpSyncDataConfiguration invokes the constructor of HttpSyncDataService (a SyncDataService implementation), which fetches the APP_AUTH, PLUGIN, RULE, SELECTOR, and META_DATA data from soul-admin and refreshes the soul-bootstrap cache.
@Slf4j
public class HttpSyncDataService implements SyncDataService, AutoCloseable {

    private static final AtomicBoolean RUNNING = new AtomicBoolean(false);

    private static final Gson GSON = new Gson();

    /**
     * default: 10s.
     */
    private Duration connectionTimeout = Duration.ofSeconds(10);

    /**
     * only use for http long polling.
     */
    private RestTemplate httpClient;

    private ExecutorService executor;

    private HttpConfig httpConfig;

    private List<String> serverList;

    private DataRefreshFactory factory;

    public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber,
                               final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
        this.httpConfig = httpConfig;
        this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
        this.httpClient = createRestTemplate();
        this.start();
    }

    private RestTemplate createRestTemplate() {
        OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
        factory.setConnectTimeout((int) this.connectionTimeout.toMillis());
        factory.setReadTimeout((int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT);
        return new RestTemplate(factory);
    }

    /**
     * Does the start-up work only on the call that flips RUNNING from false to true.
     */
    private void start() {
        // It could be initialized multiple times, so you need to control that.
        if (RUNNING.compareAndSet(false, true)) {
            // fetch all group configs.
            fetchGroupConfig(ConfigGroupEnum.values());
            int threadSize = serverList.size();
            this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(), SoulThreadFactory.create("http-long-polling", true));
            // start long polling, each server creates a thread to listen for changes.
            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
        } else {
            log.info("soul http long polling was started, executor=[{}]", executor);
        }
    }

    class HttpLongPollingTask implements Runnable {

        private String server;

        private final int retryTimes = 3;

        HttpLongPollingTask(final String server) {
            this.server = server;
        }

        @Override
        public void run() {
            // While RUNNING is true, keep long polling soul-admin, retrying up to 3 times on failure,
            // so that changes to APP_AUTH, PLUGIN, RULE, SELECTOR, and META_DATA made in soul-admin
            // are detected and saved to the soul-bootstrap local cache.
            // RUNNING becomes false only when soul-bootstrap is shut down.
            while (RUNNING.get()) {
                for (int time = 1; time <= retryTimes; time++) {
                    try {
                        doLongPolling(server);
                    } catch (Exception e) {
                        // print warning log.
                        if (time < retryTimes) {
                            log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
                                    time, retryTimes - time, e.getMessage());
                            ThreadUtils.sleep(TimeUnit.SECONDS, 5);
                            continue;
                        }
                        // print error, then suspended for a while.
                        log.error("Long polling failed, try again after 5 minutes!", e);
                        ThreadUtils.sleep(TimeUnit.MINUTES, 5);
                    }
                }
            }
            log.warn("Stop http long polling.");
        }
    }

    private void doLongPolling(final String server) {
        MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
            ConfigData<?> cacheConfig = factory.cacheConfigData(group);
            String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
            params.put(group.name(), Lists.newArrayList(value));
        }
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
        HttpEntity httpEntity = new HttpEntity(params, headers);
        String listenerUrl = server + "/configs/listener";
        log.debug("request listener configs: [{}]", listenerUrl);
        JsonArray groupJson = null;
        try {
            String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
            log.debug("listener result: [{}]", json);
            groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
        } catch (RestClientException e) {
            String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
            throw new SoulException(message, e);
        }
        if (groupJson != null) {
            // fetch group configuration async.
            ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
            if (ArrayUtils.isNotEmpty(changedGroups)) {
                log.info("Group config changed: {}", Arrays.toString(changedGroups));
                this.doFetchGroupConfig(server, changedGroups);
            }
        }
    }
    // ...
}
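doLongPolling sends one "md5,lastModifyTime" value per group and gets back the list of groups that changed. The soul-admin side of that exchange is not shown in this article, so the comparison below is an assumption about how such a check could work, not the actual handler. `ChangedGroupSketch` and both of its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the fingerprint comparison behind a long-polling request.
class ChangedGroupSketch {

    // Encodes the client's state the same way doLongPolling does: "md5,lastModifyTime".
    static String encode(final String md5, final long lastModifyTime) {
        return String.join(",", md5, String.valueOf(lastModifyTime));
    }

    // Assumption: the server reports a group when its current MD5 differs from the client's.
    static List<String> changedGroups(final Map<String, String> clientParams,
                                      final Map<String, String> serverMd5ByGroup) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> param : clientParams.entrySet()) {
            // The MD5 is the part before the first comma.
            String clientMd5 = param.getValue().split(",", 2)[0];
            String serverMd5 = serverMd5ByGroup.get(param.getKey());
            if (serverMd5 != null && !serverMd5.equals(clientMd5)) {
                changed.add(param.getKey());
            }
        }
        return changed;
    }
}
```

With long polling, a server would typically hold the request open while this list is empty and answer as soon as a group changes or a timeout expires; the sketch covers only the comparison step.
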
  2. Start soul-bootstrap and verify the behavior by debugging.
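When stepping through HttpLongPollingTask.run, the pause after each failed attempt is easy to lose track of. The schedule in that method (5 seconds after each of the first two failures, 5 minutes after the third) can be written as a small pure function. `RetryDelaySketch` and `delayAfterFailure` are hypothetical names used only for this illustration.

```java
import java.time.Duration;

// Hypothetical helper that restates the pause schedule used by HttpLongPollingTask.run.
class RetryDelaySketch {

    // attempt is 1-based, matching the "time" loop variable in run().
    static Duration delayAfterFailure(final int attempt, final int retryTimes) {
        if (attempt < 1 || attempt > retryTimes) {
            throw new IllegalArgumentException("attempt must be between 1 and " + retryTimes);
        }
        // Short pause while attempts remain, long pause once they are used up.
        return attempt < retryTimes ? Duration.ofSeconds(5) : Duration.ofMinutes(5);
    }
}
```

With retryTimes set to 3, as in the source, a server that stays unreachable costs roughly 5 minutes and 10 seconds per round before the outer loop starts again.
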

Conclusion

Now that we know how HTTP long polling is used, we made several local changes to Plugin, SelectorList, and RulesList and watched soul-bootstrap and soul-admin; the behavior matches what we saw above. soul-admin initializes the caches for AppAuth, Plugin, Rule, Selector, and MetaData, starts a scheduled task that refreshes those caches every 5 minutes, and listens for changes to the same five kinds of data. soul-bootstrap loads the HTTP configuration and sets RUNNING to true on first start. While RUNNING is true, an infinite loop keeps long polling soul-admin so that soul-bootstrap's AppAuth, Plugin, Rule, Selector, and MetaData caches stay in sync. RUNNING is set to false when the project shuts down, which ends the loop.