1. Introduction

Developing a system usually involves many kinds of configuration. System configuration rarely changes once it is set, whereas service (business) configuration has to change frequently as the service changes. If frequently changed settings are written into a configuration file, two problems arise. First, service requirements cannot be responded to in time. Second, the system loses flexibility, because every configuration change requires the application to be republished.

Given these pain points, a quick and easy way to modify configuration is needed: Apollo.

We also have some questions about Apollo:

  • How does the application get configuration information?
  • How does the application update the configuration information in a timely manner?

To answer these questions, we start with the client side.

2. The client

2.1 Actively pulling the configuration

An application client can proactively pull configuration information.

2.1.1 Creating a remote configuration repository based on namespaces

public RemoteConfigRepository(String namespace) {
    gson = new Gson();
    // 1. Actively synchronize the configuration (performed when the client application is started)
    this.trySync();
    // 2. Pull the configuration periodically (once every 5 minutes)
    this.schedulePeriodicRefresh();
    // 3. Start long polling to ask the server whether a newer configuration exists. If so, the client pulls it immediately; otherwise it waits for a while and asks again
    this.scheduleLongPollingRefresh();
}
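The first two steps of the constructor (one pull at startup, then a pull at a fixed interval) can be illustrated with a minimal sketch built only on the JDK. `PeriodicRefreshSketch` and its `Supplier` argument are hypothetical stand-ins for `RemoteConfigRepository` and the HTTP call to the server; this is not Apollo's actual implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical stand-in for RemoteConfigRepository; not Apollo's actual class.
final class PeriodicRefreshSketch implements AutoCloseable {
    private final AtomicReference<String> cache = new AtomicReference<>();
    private final Supplier<String> remote; // stands in for the HTTP call to the Config Service
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "config-refresh");
            t.setDaemon(true); // do not keep the JVM alive just for refreshing
            return t;
        });

    PeriodicRefreshSketch(Supplier<String> remote, long interval, TimeUnit unit) {
        this.remote = remote;
        sync(); // 1. pull once, synchronously, at startup
        scheduler.scheduleAtFixedRate(this::sync, interval, interval, unit); // 2. then pull periodically
    }

    synchronized void sync() {
        try {
            cache.set(remote.get());
        } catch (RuntimeException ex) {
            // A failed pull keeps the previously cached value.
        }
    }

    String get() {
        return cache.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

With an interval of 5 minutes this mirrors the schedule described in the comments above; the long-polling step is a separate mechanism and is covered in section 2.2.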

2.1.2 Querying the Config Service address

private String assembleMetaServiceUrl() {
    // 1. Obtain the Meta Server address (specified in the configuration file)
    String domainName = m_configUtil.getMetaServerDomainName();
    // 2. Unique id of the application
    String appId = m_configUtil.getAppId();
    // 3. Local IP
    String localIp = m_configUtil.getLocalIp();

    Map<String, String> queryParams = Maps.newHashMap();
    queryParams.put("appId", queryParamEscaper.escape(appId));
    if (!Strings.isNullOrEmpty(localIp)) {
        queryParams.put("ip", queryParamEscaper.escape(localIp));
    }
    // 4. Build the URL used to query the Config Service list
    return domainName + "/services/config?" + MAP_JOINER.join(queryParams);
}
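The URL assembly above can be reproduced with the JDK alone. The sketch below is an illustration rather than Apollo's code: `MetaServiceUrlSketch` is a hypothetical name, `URLEncoder` stands in for the Guava escaper, and `Collectors.joining` stands in for `MAP_JOINER`. It assumes Java 10 or later for `URLEncoder.encode(String, Charset)`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that mirrors the shape of assembleMetaServiceUrl().
class MetaServiceUrlSketch {
    static String assemble(String metaServer, String appId, String localIp) {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("appId", appId);
        if (localIp != null && !localIp.isEmpty()) {
            params.put("ip", localIp); // the ip parameter is optional
        }
        String query = params.entrySet().stream()
            .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
            .collect(Collectors.joining("&"));
        return metaServer + "/services/config?" + query;
    }
}
```

For example, `assemble("http://localhost:8080", "boot-example-apollo", "10.100.40.103")` yields `http://localhost:8080/services/config?appId=boot-example-apollo&ip=10.100.40.103`.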

Because the Meta Server and the Config Service are deployed together, the client can use the Meta Server address plus the /services/config path to look up the Config Service instances registered in Eureka. The Meta Server is a wrapper around Eureka.

HttpResponse<List<ServiceDTO>> response = m_httpUtil.doGet(request, m_responseType);

After obtaining the Config Service list, the client picks an instance at random and pulls the configuration from it.

private ApolloConfig loadApolloConfig() {
    // 1. Obtain the Config Service list
    List<ServiceDTO> configServices = getConfigServices();
    String url = null;
    retryLoopLabel:
    for (int i = 0; i < maxRetries; i++) {
        List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
        // 2. Shuffle the service configuration list
        Collections.shuffle(randomConfigServices);
        // Access the server that notified the client first
        if (m_longPollServiceDto.get() != null) {
            randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
        }
        // 3. Iterate through the Config Service list
        for (ServiceDTO configService : randomConfigServices) {
            // 4. Build the pull URL: configs/appId/cluster/namespace
            url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
                                         dataCenter, m_remoteMessages.get(), m_configCache.get());

            HttpRequest request = new HttpRequest(url);
            try {
                // 5. Pull the configuration
                HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);
                // 6. The client configuration matches the server configuration, so the server returns 304
                if (response.getStatusCode() == 304) {
                    logger.debug("Config server responds with 304 HTTP status code.");
                    return m_configCache.get();
                }
                // 7. Use the configuration returned by the server
                ApolloConfig result = response.getBody();
                return result;
            } catch (ApolloConfigStatusCodeException ex) {
                // exception handling omitted
            }
        }

    }
}
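The selection logic in loadApolloConfig() (shuffle the candidates, try the server that sent the long-poll notification first, fall through to the next server on failure) can be sketched without any Apollo classes. Everything below is hypothetical: servers are plain strings and `fetch` stands in for the HTTP GET. Unlike the excerpt, this sketch removes the preferred server from the shuffled list before putting it at the front, so it is not tried twice in one round.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical failover helper; servers are identified by plain strings here.
class FailoverSketch {
    static <T> T loadFromAny(List<String> servers, String preferred, int maxRetries,
                             Function<String, T> fetch) {
        RuntimeException last = null;
        for (int i = 0; i < maxRetries; i++) {
            List<String> candidates = new ArrayList<>(servers);
            Collections.shuffle(candidates); // spread load across instances
            if (preferred != null) {
                candidates.remove(preferred);
                candidates.add(0, preferred); // the notifying server goes first
                preferred = null;             // only for the first round
            }
            for (String server : candidates) {
                try {
                    return fetch.apply(server); // first success wins
                } catch (RuntimeException ex) {
                    last = ex; // remember the failure and try the next server
                }
            }
        }
        throw new IllegalStateException("all config servers failed", last);
    }
}
```

Trying the notifying server first is a sensible default because that instance is already known to have seen the new release.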

2.1.3 Updating the Configuration

@Override
protected synchronized void sync() {
    try {
        ApolloConfig previous = m_configCache.get();
        ApolloConfig current = loadApolloConfig();

        // 1. The server returned a newer configuration (on a 304 the cached object itself is returned)
        if (previous != current) {
            // 2. Update the client-side cache
            m_configCache.set(current);
            // 3. Notify listeners of the change; this logic is covered later
            this.fireRepositoryChange(m_namespace, this.getConfig());
        }
    } catch (Throwable ex) {
        // exception handling omitted
    }
}
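The check in sync() compares object references, not contents: it relies on the 304 branch of loadApolloConfig() returning the cached object itself. A minimal sketch of that pattern follows, with a plain `Map` standing in for `ApolloConfig` and a hypothetical listener list standing in for `fireRepositoryChange`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical cache holder; a Map stands in for ApolloConfig.
class CacheSyncSketch {
    private final AtomicReference<Map<String, String>> cache = new AtomicReference<>();
    private final List<Consumer<Map<String, String>>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<Map<String, String>> listener) {
        listeners.add(listener);
    }

    // Returns true when the cache was replaced and listeners were notified.
    synchronized boolean sync(Map<String, String> loaded) {
        Map<String, String> previous = cache.get();
        if (previous == loaded) {
            return false; // same object: the loader handed back the cached config
        }
        cache.set(loaded);
        for (Consumer<Map<String, String>> listener : listeners) {
            listener.accept(loaded);
        }
        return true;
    }
}
```

One consequence of the reference comparison is that a freshly loaded object with identical contents still counts as a change in this sketch.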

2.1.4 Summary

The client pulls the configuration as follows:

  • Query the Config Service address list.
  • Using a Config Service address from that list, call configs/appId/cluster/namespace to pull the configuration.
  • If the server returns a newer configuration, the client overwrites its old configuration.

2.2 Long polling

As shown above, the client pulls the configuration once at startup and then once every 5 minutes. Relying on active pulling alone would delay configuration updates by up to 5 minutes. The server therefore also needs to notify the client when the configuration changes, so that the client can pull the update promptly.

Long polling is started by this.scheduleLongPollingRefresh() when the RemoteConfigRepository is created (section 2.1.1):

private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
    final Random random = new Random();
    ServiceDTO lastServiceDto = null;
    while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
        // 1. Avoid frequent requests
        if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
            // wait at most 5 seconds
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
            }
        }
        String url = null;
        try {
            if (lastServiceDto == null) {
                // 2. Obtain the Config Service list
                List<ServiceDTO> configServices = getConfigServices();
                lastServiceDto = configServices.get(random.nextInt(configServices.size()));
            }
			
            // 3. Request, for example, http://10.100.40.103:8080/notifications/v2?cluster=default&appId=boot-example-apollo&ip=10.100.40.103&notifications=%5B%7B%22namespaceName%22%3A%22application%22%2C%22notificationId%22%3A-1%7D%5D
            url =
                assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
                                           m_notifications);

            HttpRequest request = new HttpRequest(url);
            // 4. Set the long polling request timeout period to 90s
            request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
            // 5. Initiate a long polling request
            // 5.1 If the client configuration is not the latest, the server responds immediately
            // 5.2 If the client configuration is the latest, the server does not respond immediately; the request is suspended until it times out or until the server finds a newer configuration and tells the client
            final HttpResponse<List<ApolloConfigNotification>> response =
                m_httpUtil.doGet(request, m_responseType);

            logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
            // 6. The latest configuration is available
            if (response.getStatusCode() == 200 && response.getBody() != null) {
                updateNotifications(response.getBody());
                updateRemoteNotifications(response.getBody());
                // 6.1 The client automatically pulls the latest configuration from the server
                notify(lastServiceDto, response.getBody());
            }

            // 7. The request timed out without a newer configuration (304); randomly decide whether to pick another Config Service for the next request
            if (response.getStatusCode() == 304 && random.nextBoolean()) {
                lastServiceDto = null;
            }
        } catch (Throwable ex) {
            // exception handling omitted
        }
    }
}
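Two details of the loop above are easy to miss: how the `notifications` query parameter is formed, and what the client does with a 200 versus a 304. The sketch below illustrates both under stated assumptions. `LongPollClientSketch`, `Server`, and `Response` are hypothetical names, the JSON is built by hand instead of with Gson, and incrementing a counter stands in for the real configuration pull. It assumes Java 10 or later.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical client-side view of one long-polling round trip.
class LongPollClientSketch {
    static final class Response {
        final int status;          // 200 = newer release exists, 304 = nothing changed
        final long notificationId; // id of the newest release known to the server
        Response(int status, long notificationId) {
            this.status = status;
            this.notificationId = notificationId;
        }
    }

    interface Server {
        Response poll(long clientNotificationId);
    }

    private long notificationId = -1; // -1 means nothing has been seen yet
    private int pulls = 0;

    // URL-encodes the JSON array that is sent as the notifications parameter.
    static String notificationsParam(String namespace, long notificationId) {
        String json = "[{\"namespaceName\":\"" + namespace + "\",\"notificationId\":"
            + notificationId + "}]";
        return URLEncoder.encode(json, StandardCharsets.UTF_8);
    }

    void pollOnce(Server server) {
        Response response = server.poll(notificationId);
        if (response.status == 200) {
            notificationId = response.notificationId; // remember what we were told
            pulls++; // stands in for pulling the configuration (section 2.1)
        }
        // On 304 nothing changes and the caller simply polls again.
    }

    long notificationId() { return notificationId; }
    int pulls() { return pulls; }
}
```

Sending the last seen notification id with every request is what lets the server decide between answering at once and holding the request.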

2.2.1 Summary

  • The client initiates a long poll to the server.
  • The client either waits for the request to time out (and then issues the next request) or is told by the server that a newer configuration exists, in which case it actively pulls the configuration as described in section 2.1.

2.3 Summary

The client obtains configuration information in two ways: active pull and long polling. Long polling lets the server tell the client that a newer configuration exists; the client then pulls it immediately, which keeps configuration changes timely.

3. The server side

3.1 Handling Long Polling

public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification() {
    List<ApolloConfigNotification> notifications = null;

    try {
        notifications =
            gson.fromJson(notificationsAsString, notificationsTypeReference);
    } catch (Throwable ex) {

    }

    Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);
    DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());
    Set<String> namespaces = Sets.newHashSetWithExpectedSize(filteredNotifications.size());
    Map<String, Long> clientSideNotifications = Maps.newHashMapWithExpectedSize(filteredNotifications.size());


    // 1. Each key(AppID + Cluster + namespace) corresponds to a deferredResultWrapper
    for (String key : watchedKeys) {
        this.deferredResults.put(key, deferredResultWrapper);
    }

    // 2. Check whether a newer release record exists
    List<ReleaseMessage> latestReleaseMessages =
        releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);

    List<ApolloConfigNotification> newNotifications =
        getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,
                                     latestReleaseMessages);
    // 3. If a newer release record exists, respond to the client immediately
    if (!CollectionUtils.isEmpty(newNotifications)) {
        deferredResultWrapper.setResult(newNotifications);
    }
    // 4. Otherwise the client request stays suspended
    return deferredResultWrapper.getResult();
}
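The hold-or-answer behaviour of pollNotification() can be sketched with `CompletableFuture` standing in for Spring's `DeferredResult`. This is an assumption-laden illustration, not the controller's real code: `LongPollServerSketch`, the `NOT_MODIFIED` sentinel, and the `publish` method are hypothetical, and `completeOnTimeout` requires Java 9 or later. As in the excerpt, the request is registered before the latest release is checked, so a release published in between is not missed.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Hypothetical server-side holder of suspended long-polling requests.
class LongPollServerSketch {
    static final long NOT_MODIFIED = -1L; // sentinel standing in for an HTTP 304

    private final Map<String, List<CompletableFuture<Long>>> pending = new ConcurrentHashMap<>();
    private final Map<String, Long> latestReleaseId = new ConcurrentHashMap<>();

    CompletableFuture<Long> poll(String watchedKey, long clientNotificationId, long timeoutMillis) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        // 1. Register the request under its watched key first.
        List<CompletableFuture<Long>> waiters =
            pending.computeIfAbsent(watchedKey, k -> new CopyOnWriteArrayList<>());
        waiters.add(result);
        result.whenComplete((value, error) -> waiters.remove(result)); // clean up when done
        // 2. Answer at once if a newer release already exists.
        Long latest = latestReleaseId.get(watchedKey);
        if (latest != null && latest > clientNotificationId) {
            result.complete(latest);
        }
        // 3. Otherwise the request stays suspended until publish() or the timeout.
        return result.completeOnTimeout(NOT_MODIFIED, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Called when a new release is found; wakes every request waiting on the key.
    void publish(String watchedKey, long releaseId) {
        latestReleaseId.put(watchedKey, releaseId);
        List<CompletableFuture<Long>> waiters = pending.get(watchedKey);
        if (waiters != null) {
            for (CompletableFuture<Long> waiter : waiters) {
                waiter.complete(releaseId);
            }
        }
    }
}
```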

3.2 Scan for latest records and respond to pending requests

ReleaseMessageScanner implements the InitializingBean interface, so the periodic scan is scheduled in afterPropertiesSet():

@Override
public void afterPropertiesSet() throws Exception {
    databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
    maxIdScanned = loadLargestMessageId();
    // 1. Scan message records every 1s
    executorService.scheduleWithFixedDelay(() -> {
        scanMessages();
    }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);

}

Each scan reads the release messages whose id is greater than maxIdScanned, in batches of 500:

private boolean scanAndSendMessages() {
    //current batch is 500
    List<ReleaseMessage> releaseMessages =
        releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
    if (CollectionUtils.isEmpty(releaseMessages)) {
        return false;
    }
    // There is a new message recorded, responding to the client
    fireMessageScanned(releaseMessages);
    int messageScanned = releaseMessages.size();
    maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
    return messageScanned == 500;
}

The message listener then handles each scanned message:

@Override
public void handleMessage(ReleaseMessage message, String channel) {

    String content = message.getMessage();
    Tracer.logEvent("Apollo.LongPoll.Messages", content);

    // 1. Retrieve the namespace from boot-example-apollo+default+application
    String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
    
    // 2. Fetch DeferredResultWrapper from map
    List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));

    ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
    configNotification.addMessage(content, message.getId());
    
    for (DeferredResultWrapper result : results) {
        // 3. Respond to the previously suspended client request, so that the client actively pulls the configuration upon receiving the response
        result.setResult(configNotification);
    }
    logger.debug("Notification completed");
}
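The scanning step above is an incremental read: remember the largest id seen so far, fetch the next batch above it, and keep going while a batch comes back full. A minimal in-memory sketch follows. `ReleaseMessageScanSketch` is hypothetical, a sorted list of ids stands in for the release message table, and the batch size is a constructor argument so the behaviour is easy to observe (the excerpt uses 500).

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical in-memory version of the incremental scan.
class ReleaseMessageScanSketch {
    private final List<Long> table; // message ids in ascending order
    private final int batchSize;
    private long maxIdScanned;

    ReleaseMessageScanSketch(List<Long> table, int batchSize, long maxIdScanned) {
        this.table = table;
        this.batchSize = batchSize;
        this.maxIdScanned = maxIdScanned;
    }

    // Returns true when the batch was full, meaning more rows may be waiting.
    boolean scanAndSend(Consumer<List<Long>> listener) {
        List<Long> batch = table.stream()
            .filter(id -> id > maxIdScanned)
            .limit(batchSize)
            .collect(Collectors.toList());
        if (batch.isEmpty()) {
            return false;
        }
        listener.accept(batch); // hand the new messages to the listener
        maxIdScanned = batch.get(batch.size() - 1);
        return batch.size() == batchSize;
    }

    // Drains everything that is currently available.
    void scan(Consumer<List<Long>> listener) {
        boolean hasMore = true;
        while (hasMore) {
            hasMore = scanAndSend(listener);
        }
    }

    long maxIdScanned() {
        return maxIdScanned;
    }
}
```

Tracking only the largest id keeps each scan cheap, because rows that were already delivered are never read again.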

3.3 Summary

The server either responds to the client's long-polling request immediately, if a newer release already exists, or suspends the request and responds once a later scan finds a new release message. In both cases the client then actively pulls the configuration.

4. Updating the @Value fields

When the client pulls a changed configuration, listener events are triggered.

protected void fireConfigChange(final ConfigChangeEvent changeEvent) {
    for (final ConfigChangeListener listener : m_listeners) {
        m_executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // 1. Trigger the listener onChange() method
                    listener.onChange(changeEvent);
                } catch (Throwable ex) {
                    // exception handling omitted
                }
            }
        });
    }
}
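The dispatch pattern above runs each listener as its own task, so a slow or failing listener does not block the others. A minimal sketch with JDK classes follows. `ChangeDispatchSketch` is a hypothetical name, a `Set<String>` of changed keys stands in for `ConfigChangeEvent`, and returning a `CompletableFuture` is an addition made here so callers can wait for completion; the original method returns nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical dispatcher; a Set of changed keys stands in for ConfigChangeEvent.
class ChangeDispatchSketch {
    private final List<Consumer<Set<String>>> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "config-change");
        t.setDaemon(true);
        return t;
    });

    void addListener(Consumer<Set<String>> listener) {
        listeners.add(listener);
    }

    // Submits one task per listener; the returned future completes when all have run.
    CompletableFuture<Void> fire(Set<String> changedKeys) {
        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (Consumer<Set<String>> listener : listeners) {
            tasks.add(CompletableFuture.runAsync(() -> {
                try {
                    listener.accept(changedKeys);
                } catch (RuntimeException ex) {
                    // One failing listener must not affect the others.
                    System.err.println("listener failed: " + ex);
                }
            }, executor));
        }
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
    }
}
```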

4.1 Executing listener onChange()

@Override
public void onChange(ConfigChangeEvent changeEvent) {
    // 1. Obtain the changed key
    Set<String> keys = changeEvent.changedKeys();
    for (String key : keys) {
        // 2. Look up the SpringValue instances for the key. See SpringValueProcessor, which maps each configuration key to a SpringValue (holding the configuration key, its default value, the bean that owns the field, the bean name, and the field)
        Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);
        if (targetValues == null || targetValues.isEmpty()) {
            continue;
        }

        // 3. Update the field value
        for (SpringValue val : targetValues) {
            updateSpringValue(val);
        }
    }
}
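The idea behind onChange() is a registry from configuration key to the bean fields that were injected from it, plus a reflective write when the key changes. The sketch below shows that idea with plain reflection. It is not Apollo's SpringValue implementation: `ValueRefreshSketch`, `Target`, and `DemoBean` are hypothetical, only String fields are handled, and no type conversion or placeholder resolution is attempted.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical registry from configuration key to the fields bound to it.
class ValueRefreshSketch {
    private static final class Target {
        final Object bean;
        final Field field;
        Target(Object bean, Field field) {
            this.bean = bean;
            this.field = field;
        }
    }

    private final Map<String, List<Target>> registry = new HashMap<>();

    // Records that the named field of the bean is bound to the configuration key.
    void register(String key, Object bean, String fieldName) {
        try {
            Field field = bean.getClass().getDeclaredField(fieldName);
            field.setAccessible(true); // allow writing to private fields
            registry.computeIfAbsent(key, k -> new ArrayList<>()).add(new Target(bean, field));
        } catch (NoSuchFieldException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    // Writes each changed value into every field registered for its key.
    void onChange(Map<String, String> changed) {
        for (Map.Entry<String, String> entry : changed.entrySet()) {
            List<Target> targets = registry.get(entry.getKey());
            if (targets == null) {
                continue; // no field depends on this key
            }
            for (Target target : targets) {
                try {
                    target.field.set(target.bean, entry.getValue());
                } catch (IllegalAccessException ex) {
                    throw new IllegalStateException(ex);
                }
            }
        }
    }
}

// Hypothetical bean whose field plays the role of an @Value-injected field.
class DemoBean {
    private String timeout = "100";

    String timeout() {
        return timeout;
    }
}
```

After `register("timeout", bean, "timeout")`, calling `onChange` with a map containing `timeout=200` updates the bean in place, so code that reads the field sees the new value without a restart.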