Nacos Client Config Source Code Analysis

Overview

Recently I have been integrating with the ACM service developed by our company. ACM has no plans to provide a complete Java SDK in the short term, so I need to study how Nacos implements its client in order to develop a supporting SDK. This analysis is based on Nacos 1.4.2, because our service also exposes an HTTP interface for the long-polling mechanism, which is what the 1.4.x client uses. I will cover nacos-spring, nacos-spring-boot and nacos-spring-cloud in the future.

nacos-client depends on the nacos-api and nacos-common modules. The nacos-api module mainly defines the functional interfaces and extension interfaces provided by Nacos. The nacos-common module mainly defines the common utility classes used across the Nacos project. The nacos-client module implements configuration management and service registration.

In Nacos, ConfigService defines the interface for configuration management (create, read, update and delete) and for configuration listening, and NacosConfigService provides the concrete implementation. NacosConfigService relies on two classes for its configuration-management logic:

  • ServerHttpAgent: Responsible for remote communication with Nacos Server, using HTTP requests to query and publish configurations

  • ClientWorker: manages the configuration items registered by the application and the updates to their data. Each configuration item corresponds to a CacheData, which stores the item's data and handles event notification for it

NacosConfigService

NacosConfigService is the concrete implementation of ConfigService. It relies mainly on ClientWorker and ServerHttpAgent to provide configuration query, publishing, deletion and listening for applications. Most of its methods simply delegate to these two classes, and each class is examined in the following sections. The focus here is on the special logic NacosConfigService uses when getting a configuration. The configuration is stored on the remote server, so the Nacos client provides failover and snapshot mechanisms when getting it, to tolerate server or network failures.

First, if a configuration file exists in a specific directory of the application environment (default: {user.home}/nacos/config/serverName_nacos/data/config-data), that file is loaded and returned directly. Second, if there is no local failover configuration, the client queries the remote configuration from the server with an HTTP request. If the remote configuration cannot be obtained (for example, the request fails or times out), the client falls back to the locally saved snapshot, if any. The snapshot is stored in a specific directory similar to that of the failover configuration. The one exception is a permission error (NacosException.NO_RIGHT), which is rethrown instead of falling back to the snapshot.
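The lookup order above can be summarized in a small, self-contained sketch. It is not the Nacos code. The three sources are passed in as hypothetical suppliers, and ServerException stands in for NacosException. The value NO_RIGHT = 403 is my assumption about NacosException.NO_RIGHT.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical sketch of the failover -> server -> snapshot lookup order; not the Nacos API. */
class ConfigLookupSketch {

    /** Stand-in for NacosException, carrying an error code. */
    static class ServerException extends RuntimeException {
        final int code;

        ServerException(int code, String message) {
            super(message);
            this.code = code;
        }
    }

    /** Assumed to mirror NacosException.NO_RIGHT; verify against the real constant. */
    static final int NO_RIGHT = 403;

    static String getConfig(Supplier<Optional<String>> failover,
                            Supplier<String> server,
                            Supplier<String> snapshot) {
        // 1. A local failover file always wins
        Optional<String> local = failover.get();
        if (local.isPresent()) {
            return local.get();
        }
        try {
            // 2. Ask the server
            return server.get();
        } catch (ServerException e) {
            // Permission errors are surfaced instead of being masked by the snapshot
            if (e.code == NO_RIGHT) {
                throw e;
            }
        }
        // 3. Fall back to the last snapshot (null if none was ever saved)
        return snapshot.get();
    }
}
```

Each tier is consulted only if the previous one produced nothing. An SDK can therefore plug in its own file readers and HTTP client without changing the order.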

In addition, once a configuration has been obtained, ConfigFilterChainManager filters it. ConfigFilterChainManager maintains a list of IConfigFilter instances loaded automatically through ServiceLoader, so users can implement IConfigFilter to post-process configurations. A typical example is decryption. Every branch of getConfigInner loads the EncryptedDataKey into the ConfigResponse, but nacos-client does not currently ship an IConfigFilter that actually performs decryption.
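The next sketch shows the idea of an ordered filter chain. Filter, Response and ToyDecryptFilter are hypothetical names, not the real IConfigFilter API. The "decryption" only strips an ENC(...) wrapper as a placeholder for real cryptography.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical ordered filter chain in the spirit of ConfigFilterChainManager; not the Nacos API. */
class FilterChainSketch {

    /** Mutable stand-in for ConfigResponse. */
    static class Response {
        String content;
        final String encryptedDataKey;

        Response(String content, String encryptedDataKey) {
            this.content = content;
            this.encryptedDataKey = encryptedDataKey;
        }
    }

    interface Filter {
        int order();                   // lower values run first
        void apply(Response response);
    }

    /** Placeholder "decryption": strips an ENC(...) wrapper when a data key is present. Not real crypto. */
    static class ToyDecryptFilter implements Filter {
        public int order() {
            return 0;
        }

        public void apply(Response r) {
            if (r.encryptedDataKey != null && r.content != null
                    && r.content.startsWith("ENC(") && r.content.endsWith(")")) {
                r.content = r.content.substring(4, r.content.length() - 1);
            }
        }
    }

    private final List<Filter> filters = new ArrayList<>();

    void register(Filter filter) {
        filters.add(filter);
        filters.sort(Comparator.comparingInt(Filter::order)); // keep the chain sorted by order
    }

    void doFilter(Response response) {
        for (Filter f : filters) {
            f.apply(response);
        }
    }
}
```

Because the chain is kept sorted by order, a decryption filter registered late still runs before filters that expect plain text.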

private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
    group = blank2defaultGroup(group);
    ParamUtils.checkKeyParam(dataId, group);
    ConfigResponse cr = new ConfigResponse();
    
    cr.setDataId(dataId);
    cr.setTenant(tenant);
    cr.setGroup(group);
    
    // Local configuration is preferred
    String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
    if (content != null) {
        LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
                dataId, group, tenant, ContentUtils.truncateContent(content));
        cr.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cr.setEncryptedDataKey(encryptedDataKey);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }
    
    try {
        ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs);
        cr.setContent(response.getContent());
        cr.setEncryptedDataKey(response.getEncryptedDataKey());
        
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        
        return content;
    } catch (NacosException ioe) {
        if (NacosException.NO_RIGHT == ioe.getErrCode()) {
            throw ioe;
        }
        LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
                agent.getName(), dataId, group, tenant, ioe.toString());
    }
    
    LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
            dataId, group, tenant, ContentUtils.truncateContent(content));
    content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
    cr.setContent(content);
    String encryptedDataKey = LocalEncryptedDataKeyProcessor
            .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
    cr.setEncryptedDataKey(encryptedDataKey);
    configFilterChainManager.doFilter(null, cr);
    content = cr.getContent();
    return content;
}

All failover- and snapshot-related functions are provided by LocalConfigInfoProcessor. Failover files must be created and saved by the developer, and LocalConfigInfoProcessor only reads them. Snapshots, by contrast, are saved, read and cleaned up by LocalConfigInfoProcessor itself. A snapshot is a copy of the configuration most recently obtained from the remote server. It is refreshed every time the remote configuration is fetched, that is, whenever ClientWorker's getServerConfig is called. When the request succeeds, the snapshot is overwritten with the new content. When the server reports that the configuration does not exist, the snapshot is cleared by saving null.
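A snapshot store along these lines can be sketched with java.nio.file. The class name and the directory layout (root/group/dataId, tenant omitted) are assumptions for illustration, not LocalConfigInfoProcessor's actual layout. The point is the contract described above: save overwrites, saving null clears, and read returns null when nothing was saved.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical file-based snapshot store; the directory layout is an assumption. */
class SnapshotStoreSketch {
    private final Path root;

    SnapshotStoreSketch(Path root) {
        this.root = root;
    }

    /** Assumed layout: root/group/dataId (tenant omitted for brevity). */
    Path fileFor(String dataId, String group) {
        return root.resolve(group).resolve(dataId);
    }

    /** Overwrites the snapshot; saving null deletes it, like the 404 branch of getServerConfig. */
    void save(String dataId, String group, String content) {
        Path file = fileFor(dataId, group);
        try {
            if (content == null) {
                Files.deleteIfExists(file);
            } else {
                Files.createDirectories(file.getParent());
                Files.write(file, content.getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the saved snapshot, or null when none exists. */
    String read(String dataId, String group) {
        Path file = fileFor(dataId, group);
        try {
            return Files.exists(file) ? new String(Files.readAllBytes(file), StandardCharsets.UTF_8) : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```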

public ConfigResponse getServerConfig(String dataId, String group, String tenant, long readTimeout)
        throws NacosException {
    HttpRestResult<String> result = null;
    try {
        // ...
        result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
    } catch (Exception ex) {
        // .....
    }
    
    switch (result.getCode()) {
        case HttpURLConnection.HTTP_OK:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData());
            configResponse.setContent(result.getData());
            String configType;
            if (result.getHeader().getValue(CONFIG_TYPE) != null) {
                configType = result.getHeader().getValue(CONFIG_TYPE);
            } else {
                configType = ConfigType.TEXT.getType();
            }
            configResponse.setConfigType(configType);
            String encryptedDataKey = result.getHeader().getValue(ENCRYPTED_DATA_KEY);
            LocalEncryptedDataKeyProcessor
                    .saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, encryptedDataKey);
            configResponse.setEncryptedDataKey(encryptedDataKey);
            return configResponse;
        case HttpURLConnection.HTTP_NOT_FOUND:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
            LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, null);
            return configResponse;
        // ...
    }
}

ClientWorker

Besides fetching configuration from the remote server, as described in the previous section, ClientWorker's most important job is to provide configuration listening for the Nacos client. ClientWorker has three key member variables:

  • executor: periodically checks the number of LongPolling tasks

  • executorService: actually runs the LongPolling tasks

  • cacheMap: stores the configuration items

First, let's look at how ClientWorker adds configuration items. The main logic is in addCacheDataIfAbsent. In Nacos, namespace/group/dataId uniquely identifies a configuration resource. On the client side, the namespace is called tenant.

The function first builds a CacheData from this identity. If cacheMap does not already contain the item, a configuration switch (enableRemoteSyncConfig) decides whether to fetch the latest configuration from the server synchronously. The CacheData is then assigned a taskId based on the current number of configuration items.

nacos-client does not create one LongPolling task per configuration item. Instead, items are divided into batches, and each LongPolling task checks the items in one batch. A parameter can be set so that each task handles exactly one item. Note that this batching is unrelated to the Nacos group concept.

Finally, the CacheData is marked as initializing. This means it still has to be handled as a newly added item in a later polling round.

public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
    String key = GroupKey.getKeyTenant(dataId, group, tenant);
    CacheData cacheData = cacheMap.get(key);
    if (cacheData != null) {
        return cacheData;
    }
    
    cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
    CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);
    if (lastCacheData == null) {
        if (enableRemoteSyncConfig) {
            ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
            cacheData.setContent(response.getContent());
        }
        int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();
        cacheData.setTaskId(taskId);
        lastCacheData = cacheData;
    }
    
    lastCacheData.setInitializing(true);
    //....
    
    return lastCacheData;
}
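The taskId arithmetic from addCacheDataIfAbsent can be replayed in isolation. In this sketch, perTaskSize stands in for ParamUtil.getPerTaskConfigSize(). The batch method is a hypothetical helper that groups keys the way each LongPollingRunnable filters cacheMap by taskId.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical replay of the taskId assignment in addCacheDataIfAbsent. */
class TaskBatchingSketch {

    /** Same arithmetic as the source: map size after insertion divided by the batch size. */
    static int taskIdFor(int cacheSizeAfterInsert, int perTaskSize) {
        return cacheSizeAfterInsert / perTaskSize;
    }

    /** Groups keys (in insertion order) by the taskId each would receive. */
    static Map<Integer, List<String>> batch(List<String> keysInInsertionOrder, int perTaskSize) {
        Map<Integer, List<String>> batches = new TreeMap<>();
        for (int i = 0; i < keysInInsertionOrder.size(); i++) {
            int taskId = taskIdFor(i + 1, perTaskSize); // cacheMap.size() after adding the (i+1)-th key
            batches.computeIfAbsent(taskId, k -> new ArrayList<>()).add(keysInInsertionOrder.get(i));
        }
        return batches;
    }
}
```

With a batch size of 2, five items land in tasks 0, 1, 1, 2 and 2. Because cacheMap.size() is read after the item has been inserted, the first batch is one item short of the others.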

Next, look at checkConfigInfo, which executor runs periodically to check the number of LongPolling tasks. As described earlier, each CacheData is assigned a taskId when it is added to cacheMap, and a LongPolling task uses this taskId to identify the CacheData it is responsible for. A new LongPollingRunnable is created as the number of configuration items grows. Idle tasks, however, are not stopped when the number shrinks.

public void checkConfigInfo() {
    int listenerSize = cacheMap.size();
    // Count the number of tasks that should currently exist
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        // Generate new tasks
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}
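The grow-only policy of checkConfigInfo can be shown without any threads. In this hypothetical sketch, recording a task id in the started list stands in for executorService.execute(new LongPollingRunnable(i)).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of checkConfigInfo's grow-only task policy. */
class TaskScalerSketch {
    private final double perTaskSize;      // stand-in for ParamUtil.getPerTaskConfigSize()
    private int currentTaskCount = 0;
    final List<Integer> started = new ArrayList<>();

    TaskScalerSketch(double perTaskSize) {
        this.perTaskSize = perTaskSize;
    }

    /** Called periodically with the current number of cached items. */
    void check(int itemCount) {
        int needed = (int) Math.ceil(itemCount / perTaskSize);
        if (needed > currentTaskCount) {
            for (int i = currentTaskCount; i < needed; i++) {
                started.add(i); // the real code submits a new LongPollingRunnable(i) here
            }
            currentTaskCount = needed;
        }
        // There is no branch for needed < currentTaskCount: surplus tasks keep running
    }

    int currentTaskCount() {
        return currentTaskCount;
    }
}
```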

Finally, let's look at LongPollingRunnable, the task that executorService actually runs. As mentioned earlier, each LongPollingRunnable is responsible only for the CacheData that carry its own taskId.

@Override
public void run() {
    
    List<CacheData> cacheDatas = new ArrayList<CacheData>();
    List<String> inInitializingCacheList = new ArrayList<String>();
    try {
        // check failover config
        for (CacheData cacheData : cacheMap.values()) {
            if (cacheData.getTaskId() == taskId) {
                cacheDatas.add(cacheData);
                try {
                    checkLocalConfig(cacheData);
                    if (cacheData.isUseLocalConfigInfo()) {
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception e) {
                    LOGGER.error("get local config info error", e);
                }
            }
        }
        
        // check server config
        List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        if (!CollectionUtils.isEmpty(changedGroupKeys)) {
            LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
        }
        
        for (String groupKey : changedGroupKeys) {
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                cache.setContent(response.getContent());
                cache.setEncryptedDataKey(response.getEncryptedDataKey());
                if (null != response.getConfigType()) {
                    cache.setType(response.getConfigType());
                }
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                        agent.getName(), dataId, group, tenant, cache.getMd5(),
                        ContentUtils.truncateContent(response.getContent()), response.getConfigType());
            } catch (NacosException ioe) {
                String message = String
                        .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                agent.getName(), dataId, group, tenant);
                LOGGER.error(message, ioe);
            }
        }
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isInitializing() || inInitializingCacheList
                    .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                cacheData.checkListenerMd5();
                cacheData.setInitializing(false);
            }
        }
        inInitializingCacheList.clear();
        
        executorService.execute(this);
        
    } catch (Throwable e) {
        
        // If the long-polling task fails, penalize it by delaying its next execution
        LOGGER.error("longPolling error : ", e);
        executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}
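The control flow at the end of run() is a general pattern for self-rescheduling tasks: resubmit on success, delay by a penalty on failure. A minimal sketch with a ScheduledExecutorService follows. Here pollOnce is a hypothetical stand-in for one round of local check, server check and notification. The latch exists only so the demo can stop; the real task loops until the client shuts down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of LongPollingRunnable's resubmit-or-penalize control flow. */
class ResubmittingTaskSketch implements Runnable {
    private final ScheduledExecutorService executor;
    private final long penaltyMillis;
    private final Runnable pollOnce;      // one round: local check, server check, notify
    private final CountDownLatch rounds;  // demo only: stop after N successful rounds
    final AtomicInteger failures = new AtomicInteger();

    ResubmittingTaskSketch(ScheduledExecutorService executor, long penaltyMillis,
                           Runnable pollOnce, CountDownLatch rounds) {
        this.executor = executor;
        this.penaltyMillis = penaltyMillis;
        this.pollOnce = pollOnce;
        this.rounds = rounds;
    }

    @Override
    public void run() {
        try {
            pollOnce.run();
            rounds.countDown();
            if (rounds.getCount() > 0) {
                executor.execute(this); // success: start the next round right away
            }
        } catch (RuntimeException e) {
            failures.incrementAndGet();
            // failure: back off, like schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS)
            executor.schedule(this, penaltyMillis, TimeUnit.MILLISECONDS);
        }
    }
}
```

Resubmitting immediately does not spin in the real client. Each round normally blocks on the long-poll request until a change arrives or the long-poll timeout expires, so only failures pay the extra penalty delay.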

For each of its CacheData, the task first calls checkLocalConfig to check for a failover file. Three cases are handled:

  • If the CacheData is not using local configuration but a failover file exists, the item is marked as using local configuration and the local data is loaded
  • If no failover file exists but the CacheData was previously using local configuration, the flag is cleared so that the CacheData takes part in later remote checks
  • If the CacheData is already using local configuration and the failover file has been modified since it was loaded, the local file is reloaded
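The three cases can be written as a small state update. This is a sketch, not checkLocalConfig itself. FailoverFile and Item are hypothetical types, and using the file's last-modified time as the version is my assumption about how a change is detected.

```java
/** Hypothetical sketch of the three cases handled by checkLocalConfig. */
class LocalConfigCheckSketch {

    /** Abstraction over the failover file so the logic can be exercised without disk I/O. */
    interface FailoverFile {
        boolean exists();
        long lastModified();
        String read();
    }

    /** Minimal stand-in for the CacheData fields involved. */
    static class Item {
        boolean useLocalConfig;
        long localVersion;   // assumed: last-modified time of the failover file when it was loaded
        String content;
    }

    static void check(Item item, FailoverFile file) {
        boolean exists = file.exists();
        if (!item.useLocalConfig && exists) {
            // Case 1: a failover file appeared, so switch to it and load its content
            item.useLocalConfig = true;
            item.localVersion = file.lastModified();
            item.content = file.read();
        } else if (item.useLocalConfig && !exists) {
            // Case 2: the failover file is gone, so the item rejoins remote checks
            item.useLocalConfig = false;
        } else if (item.useLocalConfig && file.lastModified() != item.localVersion) {
            // Case 3: the failover file was edited, so reload it
            item.localVersion = file.lastModified();
            item.content = file.read();
        }
    }
}
```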

After the local check, checkListenerMd5 is called on every CacheData that uses local configuration. If the MD5 has changed, the listeners held by that CacheData are notified.
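MD5-based change detection per listener can be sketched as follows. Md5ListenerSketch and its members are hypothetical names, loosely modelled on CacheData and on the lastCallMd5 field that appears in safeNotifyListener. Each listener remembers the MD5 it last saw and is called only when the current MD5 differs.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical per-listener MD5 tracking, loosely modelled on CacheData.checkListenerMd5. */
class Md5ListenerSketch {

    static String md5Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }

    /** Pairs a callback with the MD5 of the content it was last notified with. */
    static class ListenerWrap {
        final Consumer<String> callback;
        String lastCallMd5; // null until the first notification

        ListenerWrap(Consumer<String> callback) {
            this.callback = callback;
        }
    }

    private final List<ListenerWrap> listeners = new ArrayList<>();
    private String content = "";
    private String md5 = md5Hex("");

    void addListener(Consumer<String> callback) {
        listeners.add(new ListenerWrap(callback));
    }

    void setContent(String newContent) {
        content = newContent;
        md5 = md5Hex(newContent);
    }

    /** Notifies only the listeners whose last-seen MD5 differs from the current one. */
    void checkListenerMd5() {
        for (ListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                wrap.callback.accept(content);
                wrap.lastCallMd5 = md5;
            }
        }
    }
}
```

Setting identical content twice does not trigger a second notification, because the MD5 is unchanged.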

Next, checkUpdateDataIds asks the remote server which CacheData have configuration updates. On the server side, this request is served by the listener interface of ConfigController, which provides the LongPolling capability. Two details matter here. First, CacheData that use local configuration are not checked against the server. Second, if the items being checked include a CacheData that is still initializing, the request asks the server to return immediately instead of holding the connection.

Map<String, String> params = new HashMap<String, String>(2);
// Configuration items to be checked
params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
Map<String, String> headers = new HashMap<String, String>(2);
// Long poll timeout
headers.put("Long-Pulling-Timeout", "" + timeout);

// Tell the server not to hold (hang up) the request if newly added, initializing CacheData are included
if (isInitializingCacheList) {
    headers.put("Long-Pulling-Timeout-No-Hangup", "true");
}
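For an SDK that talks to the HTTP interface directly, the request above also needs a probe string listing the items to check. The sketch below builds it, together with the headers. Several details are my assumptions about Nacos 1.x and should be verified against the source before relying on them:

  • the separators (char) 2 and (char) 1, assumed to match Constants.WORD_SEPARATOR and Constants.LINE_SEPARATOR
  • the field order dataId, group, md5, tenant
  • the parameter name Listening-Configs, assumed to be the value of Constants.PROBE_MODIFY_REQUEST

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical builder for the long-poll listener request; separators and names are assumptions. */
class ListenerProbeSketch {
    static final char WORD_SEPARATOR = (char) 2; // assumed Constants.WORD_SEPARATOR
    static final char LINE_SEPARATOR = (char) 1; // assumed Constants.LINE_SEPARATOR

    /** Minimal view of a CacheData: only the fields the probe needs. */
    static class Item {
        final String dataId, group, tenant, md5;
        final boolean useLocalConfig, initializing;

        Item(String dataId, String group, String tenant, String md5, boolean useLocalConfig, boolean initializing) {
            this.dataId = dataId;
            this.group = group;
            this.tenant = tenant;
            this.md5 = md5;
            this.useLocalConfig = useLocalConfig;
            this.initializing = initializing;
        }
    }

    /** One line per remotely checked item: dataId, group, md5 and, if present, tenant. */
    static String buildProbe(List<Item> items) {
        StringBuilder sb = new StringBuilder();
        for (Item it : items) {
            if (it.useLocalConfig) {
                continue; // items served from a failover file are not checked remotely
            }
            sb.append(it.dataId).append(WORD_SEPARATOR).append(it.group).append(WORD_SEPARATOR).append(it.md5);
            if (it.tenant != null && !it.tenant.isEmpty()) {
                sb.append(WORD_SEPARATOR).append(it.tenant);
            }
            sb.append(LINE_SEPARATOR);
        }
        return sb.toString();
    }

    /** The no-hangup header is added when any remotely checked item is still initializing. */
    static Map<String, String> headers(List<Item> items, long timeoutMs) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Long-Pulling-Timeout", String.valueOf(timeoutMs));
        for (Item it : items) {
            if (!it.useLocalConfig && it.initializing) {
                headers.put("Long-Pulling-Timeout-No-Hangup", "true");
                break;
            }
        }
        return headers;
    }

    /** Form-encoded request body. */
    static String formBody(String probe) {
        try {
            return "Listening-Configs=" + URLEncoder.encode(probe, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }
}
```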

Finally, for each changed key returned by checkUpdateDataIds, the task fetches the configuration from the remote server and updates the corresponding CacheData. The listeners of a CacheData are then notified if its data has changed or if the CacheData was newly added (still initializing). After that, the task resubmits itself to executorService for the next round.

Notifying a listener involves more than invoking its callback. First, a listener can supply its own executor (getExecutor()). The callback then runs on that executor, so a slow listener does not block the LongPolling task thread. Second, while the callback runs, the context class loader of the executing thread is set to the class loader of the listener's class. This prevents SPI interfaces invoked inside the callback from failing or picking up the wrong implementation.

private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
        final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
    final Listener listener = listenerWrap.listener;
    
    Runnable job = new Runnable() {
        @Override
        public void run() {
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
                // Set the thread classloader as the classloader of the specific WebApp before executing the callback, so that the spi interface invocation in the callback method will not be abnormal or misused (this problem can only occur in multi-application deployment).
                Thread.currentThread().setContextClassLoader(appClassLoader);
                
                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                cr.setEncryptedDataKey(encryptedDataKey);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                listener.receiveConfigInfo(contentTmp);
                
                // compare lastContent and content
                if (listener instanceof AbstractConfigChangeListener) {
                    // Resolve configuration changes
                    Map data = ConfigChangeHandler.getInstance()
                            .parseChangeData(listenerWrap.lastContent, content, type);
                    ConfigChangeEvent event = new ConfigChangeEvent(data);
                    ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                    listenerWrap.lastContent = content;
                }
                
                listenerWrap.lastCallMd5 = md5;
                LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                        listener);
            } catch (NacosException ex) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                        name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                        group, md5, listener, t.getCause());
            } finally {
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        }
    };
    
    final long startNotify = System.currentTimeMillis();
    try {
        if (null != listener.getExecutor()) {
            listener.getExecutor().execute(job);
        } else {
            job.run();
        }
    } catch (Throwable t) {
        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                group, md5, listener, t.getCause());
    }
    final long finishNotify = System.currentTimeMillis();
    LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
            name, (finishNotify - startNotify), dataId, group, md5, listener);
}
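Both safeguards, the optional executor and the context class loader switch, fit in a short sketch. Listener here is a hypothetical interface, not the Nacos Listener interface, and errors are printed instead of logged.

```java
import java.util.concurrent.Executor;

/** Hypothetical sketch of the two safeguards in safeNotifyListener; not the Nacos API. */
class SafeNotifySketch {

    interface Listener {
        Executor getExecutor();                  // null: run on the calling (polling) thread
        void receiveConfigInfo(String content);
    }

    static void safeNotify(final Listener listener, final String content) {
        Runnable job = () -> {
            Thread thread = Thread.currentThread();
            ClassLoader previous = thread.getContextClassLoader();
            try {
                // Let SPI lookups inside the callback resolve against the listener's own class loader
                thread.setContextClassLoader(listener.getClass().getClassLoader());
                listener.receiveConfigInfo(content);
            } catch (RuntimeException e) {
                // A failing listener must not break the caller; the real code logs here
                System.err.println("notify failed: " + e);
            } finally {
                thread.setContextClassLoader(previous); // always restore the original loader
            }
        };
        Executor executor = listener.getExecutor();
        if (executor != null) {
            executor.execute(job); // a slow listener does not block the polling thread
        } else {
            job.run();
        }
    }
}
```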

ServerHttpAgent

ServerHttpAgent handles HTTP communication with the remote server. On top of plain HTTP requests, it adds features such as dynamic server address management. Taking a GET request as an example, it works in the following steps:

  • Add the accessToken and namespace (tenant) identifiers to the request parameters. The accessToken is maintained and periodically refreshed by SecurityProxy

  • Get the current server address from ServerListManager. ServerListManager is initialized from configuration: it can use a fixed list of addresses, or fetch the server list from a remote address server

  • Send the HTTP request through NacosRestTemplate

  • If the request succeeds, record the address that worked as the current server. If it fails, move on to the next server in the list. When the list is exhausted, decrement the remaining retry count and refresh the current address. Retrying continues until the read timeout elapses or the retries run out

NacosRestTemplate is similar to the RestTemplate commonly used in Spring projects, so I won't analyze it in depth.

In addition, Nacos adds client-side rate limiting, mainly based on Guava's RateLimiter, to prevent the client from calling the remote server too frequently. The limiter is installed as a request interceptor on the NacosRestTemplate when ConfigHttpClientManager is initialized.
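Guava's RateLimiter is an external dependency, so this self-contained sketch uses a small token bucket instead. It implements the same idea with a different algorithm than Guava's. The clock is injected so the behaviour can be tested deterministically. An interceptor would call tryAcquire() before each request and reject the request when it returns false.

```java
import java.util.function.LongSupplier;

/** Hypothetical token bucket standing in for Guava's RateLimiter (same idea, different implementation). */
class TokenBucketSketch {
    private final double permitsPerSecond;
    private final double capacity;
    private final LongSupplier nanoClock;
    private double tokens;
    private long lastRefillNanos;

    TokenBucketSketch(double permitsPerSecond, double capacity, LongSupplier nanoClock) {
        this.permitsPerSecond = permitsPerSecond;
        this.capacity = capacity;
        this.nanoClock = nanoClock;
        this.tokens = capacity;                 // start full, allowing an initial burst
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    /** Non-blocking: returns false when the caller should be throttled. */
    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        // Refill in proportion to elapsed time, capped at the bucket capacity
        tokens = Math.min(capacity, tokens + (now - lastRefillNanos) / 1e9 * permitsPerSecond);
        lastRefillNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

In production, the clock would simply be System::nanoTime.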

public HttpRestResult<String> httpGet(String path, Map<String, String> headers, Map<String, String> paramValues,
        String encode, long readTimeoutMs) throws Exception {
    final long endTime = System.currentTimeMillis() + readTimeoutMs;
    injectSecurityInfo(paramValues);
    String currentServerAddr = serverListMgr.getCurrentServerAddr();
    int maxRetry = this.maxRetry;
    HttpClientConfig httpConfig = HttpClientConfig.builder()
            .setReadTimeOutMillis(Long.valueOf(readTimeoutMs).intValue())
            .setConTimeOutMillis(ConfigHttpClientManager.getInstance().getConnectTimeoutOrDefault(100)).build();
    do {
        try {
            Header newHeaders = getSpasHeaders(paramValues, encode);
            if (headers != null) {
                newHeaders.addAll(headers);
            }
            Query query = Query.newInstance().initParams(paramValues);
            HttpRestResult<String> result = NACOS_RESTTEMPLATE
                    .get(getUrl(currentServerAddr, path), httpConfig, newHeaders, query, String.class);
            if (isFail(result)) {
                LOGGER.error("[NACOS ConnectException] currentServerAddr: {}, httpCode: {}",
                        serverListMgr.getCurrentServerAddr(), result.getCode());
            } else {
                // Update the currently available server addr
                serverListMgr.updateCurrentServerAddr(currentServerAddr);
                return result;
            }
        } catch (ConnectException connectException) {
            LOGGER.error("[NACOS ConnectException httpGet] currentServerAddr:{}, err : {}",
                    serverListMgr.getCurrentServerAddr(), connectException.getMessage());
        } catch (SocketTimeoutException socketTimeoutException) {
            LOGGER.error("[NACOS SocketTimeoutException httpGet] currentServerAddr:{}, err :{}",
                    serverListMgr.getCurrentServerAddr(), socketTimeoutException.getMessage());
        } catch (Exception ex) {
            LOGGER.error("[NACOS Exception httpGet] currentServerAddr: " + serverListMgr.getCurrentServerAddr(),
                    ex);
            throw ex;
        }
        
        if (serverListMgr.getIterator().hasNext()) {
            currentServerAddr = serverListMgr.getIterator().next();
        } else {
            maxRetry--;
            if (maxRetry < 0) {
                throw new ConnectException(
                        "[NACOS HTTP-GET] The maximum number of tolerable server reconnection errors has been reached");
            }
            serverListMgr.refreshCurrentServerAddr();
        }
    } while (System.currentTimeMillis() <= endTime);
    
    LOGGER.error("no available server");
    throw new ConnectException("no available server");
}
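The retry loop in httpGet can be reduced to a short sketch. Transport is a hypothetical interface, and the policy is simplified. Servers are tried round-robin, starting from the last one that worked, and maxRetry is decremented each time the whole list has been tried once. The real code uses ServerListManager's iterator and refreshCurrentServerAddr() instead.

```java
import java.util.List;

/** Hypothetical, simplified sketch of httpGet's server failover loop. */
class ServerFailoverSketch {

    interface Transport {
        /** Returns the response body, or throws RuntimeException on connection failure or bad status. */
        String get(String serverAddr, String path);
    }

    private final List<String> servers;
    private int current = 0; // index of the address that last answered successfully

    ServerFailoverSketch(List<String> servers) {
        this.servers = servers;
    }

    String get(Transport transport, String path, int maxRetry, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int idx = current;
        int tried = 0;
        do {
            try {
                String body = transport.get(servers.get(idx), path);
                current = idx; // remember the server that worked
                return body;
            } catch (RuntimeException e) {
                // fall through and try the next server
            }
            idx = (idx + 1) % servers.size();
            tried++;
            if (tried % servers.size() == 0 && --maxRetry < 0) { // a full pass over the list failed
                throw new IllegalStateException("maximum number of retries reached");
            }
        } while (System.currentTimeMillis() <= deadline);
        throw new IllegalStateException("no available server");
    }
}
```

Remembering the last working server means that a healthy cluster costs one request per call. Failed servers are only revisited after the others have been tried.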