preface

Starting with this chapter, I will spend a total of 10 chapters reading the Nacos source code, including:

  • Nacos1.4.1 Configuration center
  • sofa-jraft
  • Nacos2.0 configuration center
  • Nacos1.4.1 Registry
  • Nacos2.0 registry
  • conclusion

This chapter analyzes the configuration center client for nacos1.4.1.

  • How to use
  • Nacos configuration Center model (Namespace, Group, dataId)
  • Nacos ConfigService parsing of the client, including configuration query and configuration listening

I. Use cases

public class MyConfigExample {
    public static void main(String[] args) throws NacosException {
        String serverAddr = "localhost";
        String dataId = "cfg0"; // dataId
        String group = "DEFAULT_GROUP"; // group
        Properties properties = new Properties();
        / / nacos - server address
        properties.put("serverAddr", serverAddr);
        // namespace/tenant
        properties.put("namespace"."dca8ec01-bca3-4df9-89ef-8ab299a37f73"); 
        ConfigService configService = NacosFactory.createConfigService(properties);
        // 1. Register listener
        configService.addListener(dataId, group, new AbstractListener() {
            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("Receive config info:"+ configInfo); }});// 2. Query the initial configuration
        String config = configService.getConfig(dataId, group, 3000);
        System.out.println("init config : " + config);
        // 3. Modify configurations
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String next = scanner.next();
            if ("exit".equals(next)) {
                break; } configService.publishConfig(dataId, group, next); }}}Copy the code

Configure the central model

**Namspace (Tenant) : namespace (Tenant), ** The default namespace is public. A namespace can contain more than one Group, and in the Nacos source code some variables are called tenants, which are the same thing as namespaces.

Group: Group. The default Group is DEFAULT_GROUP. A group can contain more than one dataId.

DataId: translates as DataId. In nacos, DataId represents an entire configuration file and is the smallest unit of configuration. Unlike Apollo, the smallest unit of Apollo is a configuration key.

Third, ConfigService

ConfigService is the configuration service interface that Nacos exposes to clients. One Nacos configuration center + one Namespace= one ConfigService instance.

Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
properties.put("namespace"."dca8ec01-bca3-4df9-89ef-8ab299a37f73");
ConfigService configService = NacosFactory.createConfigService(properties);
Copy the code
public class NacosFactory {
    /** * Create config service. */
    public static ConfigService createConfigService(Properties properties) throws NacosException {
        returnConfigFactory.createConfigService(properties); }}public class ConfigFactory {
    /** * Create Config. */
    public static ConfigService createConfigService(Properties properties) throws NacosException { Class<? > driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
      Constructor constructor = driverImplClass.getConstructor(Properties.class);
      ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
      returnvendorImpl; }}Copy the code

Why create the NacosConfigService implementation class through reflection? The main purpose is to separate out the API layer.

The ConfigService provides the following functions:

  • Configure add delete change check
  • Configuring listener Registration
public interface ConfigService {
    // Configure add, delete, change, and check
    String getConfig(String dataId, String group, long timeoutMs) throws NacosException;
    String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener) throws NacosException;
    boolean publishConfig(String dataId, String group, String content) throws NacosException;
    boolean publishConfig(String dataId, String group, String content, String type) throws NacosException;
    boolean removeConfig(String dataId, String group) throws NacosException;
  	// Register listener
    void addListener(String dataId, String group, Listener listener) throws NacosException;
    void removeListener(String dataId, String group, Listener listener);
    // NacosConfigServer is UP/DOWN
    String getServerStatus(a);
    // The resource is closed
    void shutDown(a) throws NacosException;
}
Copy the code

1. Configuration query

The addition, deletion, modification and query of configurations are implemented by NacosConfigService.

The entry method for the Nacos client to get the configuration is NacosConfigService#getConfigInner.

private final ClientWorker worker;
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
    group = null2defaultGroup(group); // group The default value is DEFAULT_GROUP
    ConfigResponse cr = new ConfigResponse();
    cr.setDataId(dataId);
    cr.setTenant(tenant);
    cr.setGroup(group);

    // LEVEL1: Use the local file system failover configuration
    String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
    if(content ! =null) {
        cr.setContent(content);
        content = cr.getContent();
        return content;
    }

    // LEVEL2: Read config-server real-time configuration and save snapshot to local file system
    try {
        String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
        cr.setContent(ct[0]);
        content = cr.getContent();
        return content;
    } catch (NacosException ioe) {
        if (NacosException.NO_RIGHT == ioe.getErrCode()) {
            throw ioe;
        }
        // A non-403 error enters LEVEL3LOGGER.warn(...) ; }// LEVEL3: If non-403forbidden error occurs while reading config-server, use local snapshot
    content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
    cr.setContent(content);
    content = cr.getContent();
    return content;
}
Copy the code

The failover file has the highest priority in Nacos. If the failover file exists, the configuration of the Nacos server will not be used, but the failover file will always be used, even if the configuration of the server is changed. Similar to Apollo, the LOCAL configuration file is only used when -denv =LOCAL. Note that the contents of the failover file of Nacos do not have an update entry, which means that the file can only be modified to take effect in the file system during the long polling process.

The path to the failover file is as follows:

  • Default namespace: /{user.home}/{agentName}_nacos/data/config-data/{group}/{dataId}
  • Specify namespace: /{user.home}/{agentName}_nacos/data/config-data-tenant/{namespace}/{group}/{dataId}
// LocalConfigInfoProcessor
static File getFailoverFile(String serverName, String dataId, String group, String tenant) {
    File tmp = new File(LOCAL_SNAPSHOT_PATH, serverName + "_nacos");
    tmp = new File(tmp, "data");
    if (StringUtils.isBlank(tenant)) {
        tmp = new File(tmp, "config-data");
    } else {
        tmp = new File(tmp, "config-data-tenant");
        tmp = new File(tmp, tenant);
    }
    return new File(new File(tmp, group), dataId);
}
Copy the code

In general, failover file does not exist, then will go ClientWorker. GetServerConfig method. This method queries the latest configuration of the NACOS server on one hand and updates the snapshot file on the other.

// ClientWorker
public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException {
    String[] ct = new String[2];
    if (StringUtils.isBlank(group)) {
        group = Constants.DEFAULT_GROUP;
    }

    HttpRestResult<String> result = null;
    try {
        Map<String, String> params = new HashMap<String, String>(3);
        if (StringUtils.isBlank(tenant)) {
            params.put("dataId", dataId);
            params.put("group", group);
        } else {
            params.put("dataId", dataId);
            params.put("group", group);
            params.put("tenant", tenant);
        }
        Request /v1/cs/configs
        result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
    } catch (Exception ex) {
        throw new NacosException(NacosException.SERVER_ERROR, ex);
    }
    // 2. Process the return result, if 200 and 404, and update the local snapshot file
    switch (result.getCode()) {
        case HttpURLConnection.HTTP_OK:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData());
            ct[0] = result.getData();
            if(result.getHeader().getValue(CONFIG_TYPE) ! =null) {
                ct[1] = result.getHeader().getValue(CONFIG_TYPE);
            } else {
                ct[1] = ConfigType.TEXT.getType();
            }
            return ct;
        case HttpURLConnection.HTTP_NOT_FOUND:
            LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
            return ct;
        / /... Omitting any other state throws a NacosException}}Copy the code

If ClientWorker getServerConfig failure, and a 403 error, will read out the snapshot files.

Path to snapshot file:

  • Default namespace: /{user.home}/{agentName}_nacos/snapshot/{group}/{dataId}
  • Specify namespace: /{user.home}/{agentName}_nacos/snapshot-tenant/{namespace}/{group}/{dataId}
static File getSnapshotFile(String envName, String dataId, String group, String tenant) {
    File tmp = new File(LOCAL_SNAPSHOT_PATH, envName + "_nacos");
    if (StringUtils.isBlank(tenant)) {
        tmp = new File(tmp, "snapshot");
    } else {
        tmp = new File(tmp, "snapshot-tenant");
        tmp = new File(tmp, tenant);
    }
    return new File(new File(tmp, group), dataId);
}
Copy the code

In summary, the ConfigService obtains the configuration without using the memory cache, either by reading the files in the file system or by querying the real-time configuration of nacOS-Server.

2. Configure listening

Core classes

ClientWorker

ClientWorker’s main task is executive polling.

public class ClientWorker implements Closeable {
	  // Tests whether the longPolling task needs to be submitted to the executorService, and if so
    final ScheduledExecutorService executor;
    // The listener callback is normally executed in this thread
    final ScheduledExecutorService executorService;
    // groupKey -> cacheData
    private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<String, CacheData>();
    // Think it's an httpClient
    private final HttpAgent agent;
    // Hook manager
    private final ConfigFilterChainManager configFilterChainManager;
    // nacos server is healthy
    private boolean isHealthServer = true;
    // The default long polling timeout is 30s
    private long timeout;
    // Number of current long polling tasks
    private double currentLongingTaskCount = 0;
    // The long poll is abnormal, and the next long poll is delayed for 2s by default
    private int taskPenaltyTime;
    // Whether to proactively obtain the latest configuration when adding listeners
    private boolean enableRemoteSyncConfig = false;
}
Copy the code
  • GroupKey: The groupKey is used to determine the only configuration file under a ConfigService. GroupKey =dataId+group{+namespace}

  • CacheMap: Stores the mapping between groupKey and CacheData.

  • Agent: common HttpClient.

Two actuators are created when a ClientWorker is constructed.

  • Executor: Checks the current status (cacheMap size and number of long polling tasks that have been submitted) and whether new long polling tasks need to be submitted to the executorService. Number of fixed threads =1.
  • ExecutorService: Responsible for executive polling tasks, number of fixed threads = number of cores.
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;
    // Initialize some parameters, such as timeout
    init(properties);
		// single thread executor
    this.executor = Executors.newScheduledThreadPool(1.new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            returnt; }});// Execute the actuator of LongPollingRunnable, fixed number of threads = number of cores
    this.executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    returnt; }});// Inspect and submit LongPollingRunnable to this. ExecutorService
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run(a) {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); }}},1L.10L, TimeUnit.MILLISECONDS);
}
Copy the code

CacheData

CacheData is an abstraction of a configuration file, one for each groupKey. Listeners are registered with CacheData and are triggered when the configuration of CacheData changes.

// agentName
private final String name;
// dataId
public final String dataId;
// group
public final String group;
// namespace
public final String tenant;
// Register listeners on this configuration
private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
// Md5 is configured
private volatile String md5;
// Whether to use the failover configuration file
private volatile boolean isUseLocalConfig = false;
// Timestamp of the last update of the failover configuration file
private volatile long localConfigLastModified;
/ / configuration
private volatile String content;
// Id of the owning long polling task
private int taskId;
// Whether initialization is in progress
private volatile boolean isInitializing = true;
// The configuration file type is TEXT, JSON, or YAML
private String type;
// Provide hook handling for requests and responses to query configurations
private final ConfigFilterChainManager configFilterChainManager;
Copy the code

When constructed, CacheData loads the configuration of the local file system into Conent and computes its MD5.

public CacheData(ConfigFilterChainManager configFilterChainManager, String name, String dataId, String group, String tenant) {
    if (null == dataId || null == group) {
        throw new IllegalArgumentException("dataId=" + dataId + ", group=" + group);
    }
    this.name = name;
    this.configFilterChainManager = configFilterChainManager;
    this.dataId = dataId;
    this.group = group;
    this.tenant = tenant;
    listeners = new CopyOnWriteArrayList<ManagerListenerWrap>();
    this.isInitializing = true;
    // The configuration is loaded from the local file system. Failover > snapshot
    this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);
    this.md5 = getMd5String(content);
}
Copy the code

Register to monitor

An example of registering a listener is as follows.

configService.addListener(dataId, group, new AbstractListener() {
    @Override
    public void receiveConfigInfo(String configInfo) {
        System.out.println("Receive config info:"+ configInfo); }});Copy the code

NacosConfigService delegates the listener to the ClientWorker to register with CacheData.

// ClientWorker
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
        throws NacosException {
    group = null2defaultGroup(group);
    String tenant = agent.getTenant();
    / / get CacheData
    CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
    // Register a listener for CacheData
    for(Listener listener : listeners) { cache.addListener(listener); }}Copy the code

First, if the CacheData for the current groupKey does not exist, it will be created. Otherwise, the corresponding CacheData is returned.

public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
    String key = GroupKey.getKeyTenant(dataId, group, tenant);
    // 1 If it already exists in the cache, return it directly
    CacheData cacheData = cacheMap.get(key);
    if(cacheData ! =null) {
        return cacheData;
    }
    // 2 Create CacheData, which is set to the initial configuration using the local configuration file
    cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
    // 3 Multi-threaded operation cacheMap verifies again that cacheData has been cached
    CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);
    // 4 cacheData is returned if the current thread successfully sets key-cacheData
    if (lastCacheData == null) {
        if (enableRemoteSyncConfig) { // Whether to allow real-time configuration synchronization when adding listeners. Default: false
            String[] ct = getServerConfig(dataId, group, tenant, 3000L);
            cacheData.setContent(ct[0]);
        }
        // Calculate the id of the owning long polling task
        int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();
        cacheData.setTaskId(taskId);
        lastCacheData = cacheData;
    }
  	CacheData is being initialized so that the next sub-long poll returns results immediately
    lastCacheData.setInitializing(true);
    // 5 Otherwise the cacheData returned is the old cacheData
    return lastCacheData;
}
Copy the code

Ensure that after CacheData is created, register a Listener to CacheData.

// CacheData
// Register a listener on the tenant-group-datAID configuration
private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
public void addListener(Listener listener) {
    ManagerListenerWrap wrap =
            (listener instanceof AbstractConfigChangeListener) ? new ManagerListenerWrap(listener, md5, content)
                    : new ManagerListenerWrap(listener, md5);

    if (listeners.addIfAbsent(wrap)) {
        LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group, listeners.size()); }}Copy the code

Long polling

Start time of long polling

When the ClientWorker is constructed, a scheduled task is started and the checkConfigInfo method is executed every 10ms.

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
     // ...
    // Inspect and submit LongPollingRunnable to this. ExecutorService
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run(a) {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); }}},1L.10L, TimeUnit.MILLISECONDS);
}
Copy the code

CheckConfigInfo Checks the current number of CacheData and whether to enable a long polling task. If the current number of long polling tasks is < math. ceil(cacheMap size / 3000), a new long polling task is started. In the case of few configuration files, at most one long polling task.

// ClientWorker
public void checkConfigInfo(a) {
    / / cacheMap size
    int listenerSize = cacheMap.size();
    // cacheMap size / 3000 rounded up
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    // Count longingTaskCount greater than the actual number of long polling tasks
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // Start a new long polling task
            executorService.execute(newLongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; }}Copy the code

Therefore, CacheData is created after a listener is registered. When a checkConfigInfo scheduled task detects that a new long polling task needs to be enabled, the checkConfigInfo scheduled task triggers the submission of a long polling task.

Long poll overall process

The main process of LongPollingRunnable long polling task is as follows.

class LongPollingRunnable implements Runnable {
    private final int taskId;
    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }
    @Override
    public void run(a) {
        // The CacheData collection for which the current long polling task is responsible
        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        // The CacheData being initialized is the CacheData that was just built, and the internal content is still the Snapshot version
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // 1. Process the failover configuration file
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        // To check whether failover is required for cacheData, set isUseLocalConfigInfo
                        // Update the configuration in memory if necessary
                        checkLocalConfig(cacheData);
                        // The failover configuration detects whether the content has changed and notifies the listener if it has
                        if(cacheData.isUseLocalConfigInfo()) { cacheData.checkListenerMd5(); }}catch (Exception e) {
                        LOGGER.error("get local config info error", e); }}}// 2. For all non-failover configurations, the CEO polls and returns the changed groupKey
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);

            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    // 3. Query the real-time configuration and save the snapshot
                    String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                    // 4. Update memory configurations
                    CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(ct[0]);
                    if (null! = ct[1]) {
                        cache.setType(ct[1]); }}catch(NacosException ioe) { LOGGER.error(message, ioe); }}// 5. For non-failover configurations, the listener is triggered
            for (CacheData cacheData : cacheDatas) {
                // Remove the failover file
                if(! cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {// Check whether THE MD5 is changed and notify the listener if the change occurs
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();
            // 6-1. Submit the long polling task again
            executorService.execute(this);
        } catch (Throwable e) {
            // 6-2. If the long poll execution is abnormal, delay the execution of the next long poll for 2s
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); }}}Copy the code

The long polling task has several steps:

  • Handling failover configuration: Judge whether the current CacheData use failover configuration (ClientWorker checkLocalConfig), if you use a failover configuration, then check the local configuration file content is changed, Change is triggered the listener (CacheData checkListenerMd5). This step has nothing to do with long polling.
  • For all the failover configuration, executive polling, return change of groupKey (ClientWorker. CheckUpdateDataIds).
  • According to return groupKey, real-time query server configuration and save the snapshot (ClientWorker. GetServerConfig)
  • Updated the memory CacheData configuration content.
  • Check whether the configuration is changed, notify the listener (CacheData. CheckListenerMd5).
  • If the long poll is successfully executed, submit the long poll task immediately and execute the next long poll. An exception occurred, delaying the submission of the long poll task for 2s.

When is failover local configuration used

The long polling task not only sends a request to the server to obtain the groupKey whose configuration is changed, but also performs a failover to monitor the local configuration.

ClientWorker. CheckLocalConfig judge whether the current CacheData need to use failover local configuration, the configuration will not obtain, from the service side can only manual update in the file system.

private void checkLocalConfig(CacheData cacheData) {
    final String dataId = cacheData.dataId;
    final String group = cacheData.group;
    final String tenant = cacheData.tenant;
    File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);

    // If isUseLocalConfigInfo=false and the failover configuration file exists, use the failover configuration file and update the configuration in the memory
    if(! cacheData.isUseLocalConfigInfo() && path.exists()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        return;
    }

    // If isUseLocalConfigInfo=true and the failover configuration file does not exist, the failover configuration file is not used
    if(cacheData.isUseLocalConfigInfo() && ! path.exists()) { cacheData.setUseLocalConfigInfo(false);
        return;
    }

    // If isUseLocalConfigInfo=true and the failover configuration file exists, use the failover configuration file and update the configuration in memory
    if(cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() ! = path .lastModified()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); }}Copy the code

If the failover configuration file exists in the specified path of the file system, the system preferentially uses the failover configuration file. When the failover configuration file is deleted, the system switches to the server configuration. Also, if the failover configuration file is used, the configuration in CacheData is updated here.

Sends a long poll request to the Server

For all the failover configuration, through ClientWorker checkUpdateDataIds launched a long polling requests.

All non-failover configurations are counted and service request packets are combined:

  • CacheData: dataId Group MD5 namespace
  • None CacheData: dataId Group MD5

In addition, CacheData that is being initialized is filtered out; that is, CacheData has just been built and its internal content is still the local snapshot version, and this part of the configuration will be treated specially.

List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
    StringBuilder sb = new StringBuilder();
    // Collate all non-failover cacheData into "dataId group MD5 "or "dataId Group MD5 namespace"
    for (CacheData cacheData : cacheDatas) {
        if(! cacheData.isUseLocalConfigInfo()) { sb.append(cacheData.dataId).append(WORD_SEPARATOR); sb.append(cacheData.group).append(WORD_SEPARATOR);if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            // Place the CacheData first listened on in inInitializingCacheList
            if(cacheData.isInitializing()) { inInitializingCacheList .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant)); }}}booleanisInitializingCacheList = ! inInitializingCacheList.isEmpty();// Actually initiate the request
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
Copy the code

The checkUpdateConfigStr method is responsible for calling the server **/v1/cs/configs/listener** long polling interface and parsing the message back. Focus on a few points:

  • The request parameter listening-configs is the service message concatenated above
  • The default Long polling Timeout is 30s, which is stored in the request header long-pulling -Timeout
  • If the Long poll contains the configuration item listening for the first time, set long-pulling – timeout-no-hangup =true in the request header to make the server immediately return the poll result
  • The server /v1/cs/ Configs/Listener interface handles long polling requests
  • ParseUpdateDataIdResponse method parses the server returns a message
// ClientWorker
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
    Map<String, String> params = new HashMap<String, String>(2);
    // Spliced service packet key = listening-configs
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    // Long polling timeout is 30s
    headers.put("Long-Pulling-Timeout"."" + timeout);
    // Tell the server that this long poll contains the configuration item that was listened on for the first time, do not hold the request and return immediately
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup"."true");
    }
    // If there is no need to listen
    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }

    try {
        // readTimeout = 45s
        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
        / / request/v1 / cs/configs/listener
        HttpRestResult<String> result = agent
                .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                        readTimeoutMs);

        if (result.ok()) {
            setHealthServer(true);
            // Parse the returned message
            return parseUpdateDataIdResponse(result.getData());
        } else {
            setHealthServer(false); }}catch (Exception e) {
        setHealthServer(false);
        throw e;
    }
    return Collections.emptyList();
}
Copy the code

ParseUpdateDataIdResponse parsing the server returns a message, each message represents a groupKey configuration changes.

private List<String> parseUpdateDataIdResponse(String response) {
    if (StringUtils.isBlank(response)) {
        return Collections.emptyList();
    }
    response = URLDecoder.decode(response, "UTF-8");
    List<String> updateList = new LinkedList<String>();
    // Split by row
    for (String dataIdAndGroup : response.split(LINE_SEPARATOR)) {
        if(! StringUtils.isBlank(dataIdAndGroup)) {// Each line is separated by space and spliced into dataId+ Group +namespace or dataId+ Group
            String[] keyArr = dataIdAndGroup.split(WORD_SEPARATOR);
            String dataId = keyArr[0];
            String group = keyArr[1];
            if (keyArr.length == 2) {
                updateList.add(GroupKey.getKey(dataId, group));
            } else if (keyArr.length == 3) {
                String tenant = keyArr[2];
                updateList.add(GroupKey.getKeyTenant(dataId, group, tenant));
            } else{ LOGGER.error(); }}}return updateList;
}
Copy the code

Validates MD5 changes and triggers listeners

After receiving the changed configuration item returned by the server, the client obtains the corresponding configuration through the **/v1/cs/configs interface and saves the configuration to the local file system as snapshot**. This has been seen in the configuration query section. Both call the ClientWorker#getServerConfig method. Finally, the configuration is updated to the Content field of CacheData.

After completion of the above steps to deal with, through CacheData. CheckListenerMd5 check whether the configuration change, and trigger a listener.

// CacheData.java
// Register listeners on this CacheData configuration
private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
// Md5 is configured
private volatile String md5;
void checkListenerMd5(a) {
    for (ManagerListenerWrap wrap : listeners) {
        // Compare md5 in CacheData to the last MD5 in Listener
        if(! md5.equals(wrap.lastCallMd5)) {// If not, the listener is triggeredsafeNotifyListener(dataId, group, content, type, md5, wrap); }}}Copy the code

The safeNotifyListener method is the main logic for notifying listeners. If the Listener has configured its own Executor, the Listener will execute the listening logic in its own thread service. By default, the long polling thread is used to execute the listening logic.

// CacheData.java
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
        final String md5, final ManagerListenerWrap listenerWrap) {
    final Listener listener = listenerWrap.listener;

    Runnable job = new Runnable() {
        @Override
        public void run(a) {
            try {
              	If AbstractSharedListener is used, place dataId and Group in its member variables
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                }

                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                // The hook for the user is ignored
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                // Trigger the receiveConfigInfo method of the listener
                listener.receiveConfigInfo(contentTmp);

                / / if it is AbstractConfigChangeListener instance, triggering receiveConfigChange method
                if (listener instanceof AbstractConfigChangeListener) {
                    Map data = ConfigChangeHandler.getInstance()
                            .parseChangeData(listenerWrap.lastContent, content, type);
                    ConfigChangeEvent event = new ConfigChangeEvent(data);
                    ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                    listenerWrap.lastContent = content;
                }
                // Update the last MD5 value of the listener
                listenerWrap.lastCallMd5 = md5;
            } catch (NacosException ex) {
                LOGGER.error();
            } catch (Throwable t) {
                LOGGER.error();
            } finally{ Thread.currentThread().setContextClassLoader(myClassLoader); }}};try {
        // If the listener is configured with executor, use the configured executor to perform the above tasks
        if (null! = listener.getExecutor()) { listener.getExecutor().execute(job); }else {
            // Otherwise execute directly, i.e. execute in a long polling threadjob.run(); }}catch(Throwable t) { LOGGER.error(); }}Copy the code

Different Listener methods are triggered depending on the type of Listener. If the Listener is a normal Listener, the receiveConfigInfo method is triggered and a String is returned, which is the changed value.

public interface Listener {
    Executor getExecutor(a);
    void receiveConfigInfo(final String configInfo);
}
Copy the code

If it is AbstractConfigChangeListener listener, trigger receiveConfigChange method, get a ConfigChangeEvent.

public abstract class AbstractConfigChangeListener extends AbstractListener {
    public abstract void receiveConfigChange(final ConfigChangeEvent event);
    ReceiveConfigInfo is an empty implementation
    @Override
    public void receiveConfigInfo(final String configInfo) {}}Copy the code

But AbstractConfigChangeListener listening is a prerequisite for, must be the yaml configuration file format or properties format, otherwise it will not trigger the Listener logic! See the parseChangeData method of ConfigChangeHandler, which returns an empty map if no parser is found.

public Map parseChangeData(String oldContent, String newContent, String type) throws IOException {
    for (ConfigChangeParser changeParser : this.parserList) {
        // Determine if the configuration file type can be resolved. Currently, only properties and YAMl are supported
        if (changeParser.isResponsibleFor(type)) {
            returnchangeParser.doParse(oldContent, newContent, type); }}return Collections.emptyMap();
}
Copy the code

The ConfigChangeEvent constructed in the safeNotifyListener part of the logic will not contain any data.

if (listener instanceof AbstractConfigChangeListener) {
    Map data = ConfigChangeHandler.getInstance()
            .parseChangeData(listenerWrap.lastContent, content, type);
    // If the map is empty, the data in the event constructed here is also empty, and the listener is not aware of the configuration change
    ConfigChangeEvent event = new ConfigChangeEvent(data);
    ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
    listenerWrap.lastContent = content;
}
Copy the code
public class ConfigChangeEvent {
    private final Map<String, ConfigChangeItem> data;
    public ConfigChangeEvent(Map<String, ConfigChangeItem> data) {
        this.data = data;
    }
    public ConfigChangeItem getChangeItem(String key) {
        return data.get(key);
    }
    public Collection<ConfigChangeItem> getChangeItems(a) {
        returndata.values(); }}Copy the code

conclusion

  • How to use it: You can use ConfigService to add, delete, modify, query, and listen configurations.
  • Nacos configuration model
    • Namspace (Tenant) : specifies the namespace (Tenant). The default namespace is public. A namespace can contain more than one Group, and in the Nacos source code some variables are called tenants, which are the same thing as namespaces.
    • Group: Group. The default Group is DEFAULT_GROUP. A group can contain more than one dataId.
    • DataId: translates as DataId. In nacos, DataId represents an entire configuration file and is the smallest unit of configuration.
  • One NacOS-server and one namespace correspond to one ConfigService.
  • Nacos client queries the configuration
    • Local configuration is preferred for failover
    • Next, call the server to query /v1/cs/configs
    • The snapshot configuration will be updated each time the server /v1/cs/configs is successfully called to obtain the configuration
  • The Nacos client listens to the configuration and processes it through LongPollingRunnable. For non-failover configuration items, the following figure shows the processing process.