Preface

This chapter examines the naming (registry) client of Nacos 1.4.1.

  • Service registration: how the client registers a service, and how the server knows that the client is still alive.
  • Service deregistration: the reverse of service registration.
  • Service discovery: whether a service query reads the registry cached in local memory or the live registry on the server; if it reads the local cache, how that cache is kept correct; and, after subscribing to a service, how the client learns that the service's instance list has changed.

I. Use cases

The official example is used directly here. As a registry, Nacos has to meet three requirements on the client side:

  • Service registration
  • Service deregistration
  • Service discovery

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.NamingEvent;

public class NamingExample {

    public static void main(String[] args) throws NacosException, IOException {
        // Create the naming service
        Properties properties = new Properties();
        properties.setProperty("serverAddr", "localhost");
        NamingService naming = NamingFactory.createNamingService(properties);
        // Service registration
        naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
        naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
        // Service discovery: get the instance list by service name
        System.out.println(naming.getAllInstances("nacos.test.3"));
        // Service deregistration
        naming.deregisterInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
        // Service discovery: get the instance list by service name
        System.out.println(naming.getAllInstances("nacos.test.3"));
        // Service discovery: listen for service changes
        final Executor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("test-thread");
                        return thread;
                    }
                });
        naming.subscribe("nacos.test.3", new AbstractEventListener() {
            @Override
            public Executor getExecutor() {
                return executor;
            }

            @Override
            public void onEvent(Event event) {
                System.out.println(((NamingEvent) event).getServiceName());
                System.out.println(((NamingEvent) event).getInstances());
            }
        });
    }
}

II. The model

Namespace (tenant): the default namespace is public. A namespace can contain multiple groups.

Group: a grouping of services. The default group is DEFAULT_GROUP.

Service: an application service.

Cluster: a cluster within a service. The default cluster is DEFAULT.

Instance: a single service instance.

On the client, the relationship between a Service, its Clusters, and its Instances is represented by ServiceInfo. A ServiceInfo is uniquely identified by group + service + clusters.

public class ServiceInfo {
    // Service name
    private String name;
    // Group name
    private String groupName;
    // Clusters, comma separated
    private String clusters;
    // Instances
    private List<Instance> hosts = new ArrayList<Instance>();
}

An Instance is uniquely identified by group + cluster + service + IP + port.

public class Instance {
    // Unique identifier = group + cluster + service + IP address + port
    private String instanceId;
    private String ip;
    private int port;
    private double weight = 1.0D;
    private boolean healthy = true;
    private boolean enabled = true;
    // Whether the instance is ephemeral (temporary)
    private boolean ephemeral = true;
    // Owning cluster
    private String clusterName;
    // Service name = groupName + @@ + serviceName
    private String serviceName;
    // Metadata
    private Map<String, String> metadata = new HashMap<String, String>();
}

III. NacosNamingService

NacosNamingService is the implementation class of NamingService and provides all the functions the naming service requires.

Its member variables are listed below.

public class NacosNamingService implements NamingService {
    // Namespace (tenant); defaults to public
    private String namespace;
    // Address server (similar to Apollo's meta server) that provides the Nacos cluster addresses; if set, serverList is ignored
    private String endpoint;
    // Nacos cluster address list, comma separated, passed in by the user
    private String serverList;
    // Local cache path
    private String cacheDir;
    // Log file name
    private String logName;
    // Service subscription, local registry cache, and service queries
    private HostReactor hostReactor;
    // Heartbeat maintenance
    private BeatReactor beatReactor;
    // Naming service proxy (sends requests to the Nacos server)
    private NamingProxy serverProxy;
}

1. Service registration

The service registration logic is in the registerInstance method of NacosNamingService.

// NacosNamingService
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    // Inside Nacos, the service name is groupName + @@ + serviceName
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 1. If it is a temporary instance, start the heartbeat task
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // 2. POST /nacos/v1/ns/instance
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

Note the NamingUtils.getGroupedName method. From the group name groupName and the application's serviceName it builds the service name that Nacos actually stores in the registry: groupName + @@ + serviceName.

public static String getGroupedName(final String serviceName, final String groupName) {
    if (StringUtils.isBlank(serviceName)) {
        throw new IllegalArgumentException("Param 'serviceName' is illegal, serviceName is blank");
    }
    final String resultGroupedName = groupName + Constants.SERVICE_INFO_SPLITER + serviceName;
    return resultGroupedName.intern();
}
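
The naming rule can be tried out with a small self-contained sketch. GroupedNameSketch is a hypothetical stand-in, not a Nacos class; the @@ separator comes from the text above, and the two split helpers are an assumption about how the parts could be recovered from a grouped name.

```java
/** Hypothetical stand-in for the grouped-name rule; not a Nacos class. */
final class GroupedNameSketch {

    // Separator between group and service, as described in the text.
    static final String SPLITTER = "@@";

    /** Builds groupName + "@@" + serviceName, rejecting a blank service name. */
    static String groupedName(String serviceName, String groupName) {
        if (serviceName == null || serviceName.trim().isEmpty()) {
            throw new IllegalArgumentException("serviceName is blank");
        }
        return groupName + SPLITTER + serviceName;
    }

    /** Recovers the group part; assumes the input contains the separator. */
    static String groupOf(String groupedName) {
        return groupedName.substring(0, groupedName.indexOf(SPLITTER));
    }

    /** Recovers the plain service part; assumes the input contains the separator. */
    static String serviceOf(String groupedName) {
        return groupedName.substring(groupedName.indexOf(SPLITTER) + SPLITTER.length());
    }

    public static void main(String[] args) {
        String grouped = groupedName("nacos.test.3", "DEFAULT_GROUP");
        System.out.println(grouped);            // DEFAULT_GROUP@@nacos.test.3
        System.out.println(groupOf(grouped));   // DEFAULT_GROUP
        System.out.println(serviceOf(grouped)); // nacos.test.3
    }
}
```

Because the group is folded into the name, two services with the same serviceName in different groups end up under different registry keys.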

Service registration in Nacos follows one of two branches:

  • Ephemeral (temporary) instance registration. This is the default, instance.ephemeral=true. The client keeps its entry in the server registry alive by actively sending heartbeats. If the server receives no heartbeat for 15 seconds, it marks the instance unhealthy; if it receives none for 30 seconds, it removes the instance from the registry. For this branch the server side uses the Distro protocol (AP); see Distro in the next chapter.

  • Persistent instance registration, instance.ephemeral=false. The server actively health-checks the client, by default with a TCP probe. The instance is never removed, only marked unhealthy. For this branch the server side uses the Raft protocol (CP).

Only ephemeral instance registration, the default, is considered here. It has two steps: start the heartbeat task, then send the registration request to the Nacos server.
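
The two timeouts for ephemeral instances can be modelled with a short sketch. BeatTimeoutSketch and its State enum are hypothetical and are not server code; the 15 s and 30 s thresholds come from the text above, while treating the boundaries as inclusive is an assumption.

```java
/** Hypothetical model of the server-side timeouts described above; not Nacos code. */
final class BeatTimeoutSketch {

    enum State { HEALTHY, UNHEALTHY, REMOVED }

    // Thresholds taken from the text: 15 s to unhealthy, 30 s to removal.
    static final long UNHEALTHY_AFTER_MS = 15_000L;
    static final long REMOVE_AFTER_MS = 30_000L;

    /** Classifies an ephemeral instance by the time elapsed since its last heartbeat. */
    static State classify(long lastBeatMillis, long nowMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence >= REMOVE_AFTER_MS) {
            return State.REMOVED;   // assumed inclusive boundary
        }
        if (silence >= UNHEALTHY_AFTER_MS) {
            return State.UNHEALTHY; // assumed inclusive boundary
        }
        return State.HEALTHY;
    }
}
```

With the default 5 s heartbeat interval, an instance has to miss roughly three beats in a row before it is marked unhealthy under this model.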

The heartbeat task

The client has to start a heartbeat task, BeatTask, which is defined in BeatReactor.

Look at BeatInfo first. Most of its member variables are copied from Instance.

// BeatReactor
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
    BeatInfo beatInfo = new BeatInfo();
    beatInfo.setServiceName(groupedServiceName);
    beatInfo.setIp(instance.getIp());
    beatInfo.setPort(instance.getPort());
    beatInfo.setCluster(instance.getClusterName());
    beatInfo.setWeight(instance.getWeight());
    beatInfo.setMetadata(instance.getMetadata());
    beatInfo.setScheduled(false);
    beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
    return beatInfo;
}

BeatReactor's addBeatInfo method wraps the BeatInfo in a BeatTask and submits it to the thread pool as a delayed task.

// BeatReactor
// Thread pool that runs the client heartbeat tasks
private final ScheduledExecutorService executorService;
// BeatInfo unique key -> BeatInfo
public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    // 1. Heartbeat key
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    // Stop any existing heartbeat task for the same key
    BeatInfo existBeat = null;
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    // 2. Submit the delayed task
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
}

Note the buildKey method, which builds the unique key of each BeatInfo: groupName + @@ + serviceName + # + IP + # + port.

// BeatReactor
public String buildKey(String serviceName, String ip, int port) {
    return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER + ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port;
}
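
The effect of keying heartbeats this way can be sketched without any Nacos classes. BeatRegistrySketch and its Beat type are hypothetical; the # separator comes from the text above. The sketch shows that registering the same service, IP, and port twice leaves one live entry and marks the earlier one as stopped.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of keying heartbeats by service, IP and port; not Nacos code. */
final class BeatRegistrySketch {

    /** Minimal stand-in for BeatInfo: only the stopped flag matters here. */
    static final class Beat {
        volatile boolean stopped;
    }

    private final Map<String, Beat> beats = new ConcurrentHashMap<>();

    /** Key layout from the text: groupedServiceName + "#" + ip + "#" + port. */
    static String buildKey(String groupedServiceName, String ip, int port) {
        return groupedServiceName + "#" + ip + "#" + port;
    }

    /** Registers a beat; a previous beat under the same key is marked stopped. */
    Beat add(String groupedServiceName, String ip, int port) {
        Beat fresh = new Beat();
        Beat previous = beats.put(buildKey(groupedServiceName, ip, port), fresh);
        if (previous != null) {
            previous.stopped = true;
        }
        return fresh;
    }
}
```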

The BeatTask heartbeat task runs as follows.

// BeatReactor
// Naming service proxy
private final NamingProxy serverProxy;
// Light beat: when true, the heartbeat request omits the full BeatInfo body
private boolean lightBeatEnabled = false;
// BeatReactor.BeatTask
class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        // 0. Do nothing if the heartbeat has been marked as stopped
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            // Send a heartbeat request to the server
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            // 1. The server can determine the heartbeat interval of the client
            long interval = result.get("clientBeatInterval").asLong();
            if (interval > 0) {
                nextTime = interval;
            }
            // 2. The server can decide whether the client sends the full BeatInfo
            boolean lightBeatEnabled = false;
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
            // 3. If the current Instance is not found on the server, try to register it
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    serverProxy.registerService(beatInfo.getServiceName(),
                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                    JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

        }
        // 4. Submit the next heartbeat task
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}

A few points deserve attention.

Heartbeat interval

BeatInfo.period determines the client's heartbeat interval. It defaults to 5 s and can be overridden through the Instance metadata, under the key preserved.heart.beat.interval.

// Instance
private Map<String, String> metadata = new HashMap<String, String>();

public long getInstanceHeartBeatInterval() {
    return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL, TimeUnit.SECONDS.toMillis(5));
}

private long getMetaDataByKeyWithDefault(final String key, final long defaultValue) {
    if (getMetadata() == null || getMetadata().isEmpty()) {
        return defaultValue;
    }
    final String value = getMetadata().get(key);
    if (!StringUtils.isEmpty(value) && value.matches(NUMBER_PATTERN)) {
        return Long.parseLong(value);
    }
    return defaultValue;
}

In addition, the server can override the client's heartbeat interval through the clientBeatInterval field of its heartbeat response.

// BeatReactor.BeatTask
// 1. The server can determine the heartbeat interval of the client
long interval = result.get("clientBeatInterval").asLong();
if (interval > 0) {
  nextTime = interval;
}
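
Putting the two sources of the interval together, the delay before the next heartbeat could be resolved as in the sketch below. BeatIntervalSketch is hypothetical; the 5 s default and the rule that a positive server value wins come from the code above, while the digits-only pattern is an assumption standing in for NUMBER_PATTERN.

```java
import java.util.Map;

/** Hypothetical sketch of how the heartbeat delay is resolved; not Nacos code. */
final class BeatIntervalSketch {

    static final String INTERVAL_KEY = "preserved.heart.beat.interval";
    static final long DEFAULT_INTERVAL_MS = 5_000L;

    /** Client-side period: a numeric metadata value if present, else 5 s. */
    static long periodFromMetadata(Map<String, String> metadata) {
        if (metadata == null) {
            return DEFAULT_INTERVAL_MS;
        }
        String value = metadata.get(INTERVAL_KEY);
        // Assumed pattern: digits only.
        if (value != null && value.matches("\\d+")) {
            return Long.parseLong(value);
        }
        return DEFAULT_INTERVAL_MS;
    }

    /** Delay before the next beat: a positive server-supplied interval wins. */
    static long nextDelay(long clientPeriodMs, long serverIntervalMs) {
        return serverIntervalMs > 0 ? serverIntervalMs : clientPeriodMs;
    }
}
```

The metadata value only governs the first beat in this model; once a response carries a positive clientBeatInterval, the server's value is used for the next delay.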

Heartbeat request

NamingProxy is the naming service proxy; it encapsulates the methods that send requests to the Nacos server. The heartbeat request simply tells the Nacos server which service the beat belongs to and which IP:port its instance has. The request is PUT /nacos/v1/ns/instance/beat.

// NamingProxy
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

Note that the lightBeatEnabled parameter controls whether the full BeatInfo is sent in the request body: the body carries it only when lightBeatEnabled is false. The flag is false by default, so the client initially sends the full BeatInfo, and the server can switch the flag through its heartbeat response.

// BeatReactor.BeatTask
// 2. The server can decide whether the client sends the full BeatInfo
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
  lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}

Handling a heartbeat when the server cannot find the client instance

If the server cannot find the client's instance in its registry when it receives a heartbeat, it responds with status code RESOURCE_NOT_FOUND (20404). The client handles this code specially and tries to send a registration request to the server.

// BeatReactor.BeatTask
// 3. If the current Instance is not found on the server, try to register it
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
    Instance instance = new Instance();
    instance.setPort(beatInfo.getPort());
    instance.setIp(beatInfo.getIp());
    instance.setWeight(beatInfo.getWeight());
    instance.setMetadata(beatInfo.getMetadata());
    instance.setClusterName(beatInfo.getCluster());
    instance.setServiceName(beatInfo.getServiceName());
    instance.setInstanceId(instance.getInstanceId());
    instance.setEphemeral(true);
    try {
        serverProxy.registerService(beatInfo.getServiceName(),
                NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
    } catch (Exception ignore) {
    }
}
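
The self-healing behaviour can be simulated in memory. ReRegisterSketch and FakeServer are hypothetical; the 20404 code comes from the text above, and the value used for OK is an assumed placeholder.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory simulation of beat-then-re-register; not Nacos code. */
final class ReRegisterSketch {

    static final int OK = 10200;                 // assumed placeholder value
    static final int RESOURCE_NOT_FOUND = 20404; // from the text

    /** Stand-in for the server registry, keyed by "ip:port". */
    static final class FakeServer {
        final Set<String> instances = ConcurrentHashMap.newKeySet();

        int beat(String key) {
            return instances.contains(key) ? OK : RESOURCE_NOT_FOUND;
        }

        void register(String key) {
            instances.add(key);
        }
    }

    /** Sends one beat; re-registers if the server no longer knows the instance. */
    static int beatOnce(FakeServer server, String key) {
        int code = server.beat(key);
        if (code == RESOURCE_NOT_FOUND) {
            server.register(key);
        }
        return code;
    }
}
```

In this model, an instance that the server dropped (for example after a long pause) is back in the registry one heartbeat later, without the application calling registerInstance again.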

The registration request

To send the registration request to the Nacos server, NacosNamingService calls the registerService method of NamingProxy directly.

The serviceName passed in is the Nacos service name, groupName + @@ + serviceName.

// NamingProxy
private final String namespaceId;
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
    // /nacos/v1/ns/instance
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

}

The request is POST /nacos/v1/ns/instance, with its parameters taken from the Instance.

2. Service deregistration

The deregisterInstance method of NacosNamingService deregisters the service. It is the reverse of service registration: it cancels the heartbeat task and then asks the server to deregister the instance.

// NacosNamingService
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // 1. Cancel the heartbeat task
    if (instance.isEphemeral()) {
        beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(),
                instance.getPort());
    }
    // 2. Ask the server to deregister the instance
    serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
}

BeatReactor removes the BeatInfo and sets its stopped flag to true, which stops the BeatTask already submitted for it.

// BeatReactor
// BeatInfo unique identifier - BeatInfo
// Unique identifier of BeatInfo =groupName + @@ + serviceName + # + IP + # + port
public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();
public void removeBeatInfo(String serviceName, String ip, int port) {
    BeatInfo beatInfo = dom2Beat.remove(buildKey(serviceName, ip, port));
    if (beatInfo == null) {
        return;
    }
    beatInfo.setStopped(true);
}
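
Why a flag is enough to cancel the heartbeat can be shown with a deterministic sketch. StoppableBeatSketch is hypothetical: a plain queue stands in for the scheduled executor so that the behaviour does not depend on timing. Each task re-enqueues its successor unless the flag is set, so setting the flag ends the chain at the next run.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch: a self-rescheduling task that a stopped flag cancels; not Nacos code. */
final class StoppableBeatSketch {

    /** Stand-in for BeatInfo. */
    static final class Beat {
        volatile boolean stopped;
        int sent; // number of heartbeats "sent"
    }

    /** Stand-in for the scheduled executor: tasks run in order when drained. */
    static final Deque<Runnable> queue = new ArrayDeque<>();

    static final class BeatTask implements Runnable {
        final Beat beat;

        BeatTask(Beat beat) {
            this.beat = beat;
        }

        @Override
        public void run() {
            if (beat.stopped) {
                return; // no send, no reschedule: the chain ends here
            }
            beat.sent++;
            queue.add(new BeatTask(beat)); // schedule the next heartbeat
        }
    }

    /** Runs at most maxRuns queued tasks and returns how many actually ran. */
    static int drain(int maxRuns) {
        int ran = 0;
        while (ran < maxRuns && !queue.isEmpty()) {
            queue.poll().run();
            ran++;
        }
        return ran;
    }
}
```

The already-queued task still runs once after the flag is set, but it returns immediately, which mirrors the isStopped check at the top of BeatTask.run.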

The deregisterService method of NamingProxy calls DELETE /nacos/v1/ns/instance.

// NamingProxy
public void deregisterService(String serviceName, Instance instance) throws NacosException {
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));

    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.DELETE);
}

3. Service discovery

Service query/subscription

The getAllInstances method of NacosNamingService, which returns all instances of a service, is used as the example. Its subscribe parameter selects the logic. When subscribe is true, the service is subscribed and the query goes through three layers of storage. When subscribe is false, the client requests the live registry from the server directly. Only the former is covered here, because it contains the logic of the latter. All query logic is handled by HostReactor.

// NacosNamingService
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    if (subscribe) {
        // subscribe=true, go through three levels of storage query, subscribe service
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                StringUtils.join(clusters, ","));
    } else {
        // subscribe=false to get the server registry directly from NamingProxy
        serviceInfo = hostReactor
                .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                        StringUtils.join(clusters, ","));
    }
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

HostReactor has three maps:

// HostReactor
// Service update tasks
private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
// Service registry
private final Map<String, ServiceInfo> serviceInfoMap;
// Services currently being updated
private final Map<String, Object> updatingMap;

  • serviceInfoMap: maps a service's unique identifier to its ServiceInfo. The identifier is groupName + @@ + serviceName + @@ + clusterName; the clusterName part can be empty.
  • updatingMap: holds the names of the services whose update is in progress.
  • futureMap: maps a service's unique identifier to the Future of its update task.

// HostReactor
private final FailoverReactor failoverReactor; // Failover handler

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    // 1. First check whether failover mode is on, which depends on a local configuration file
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // 2. Look up the ServiceInfo in serviceInfoMap
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    if (null == serviceObj) {
        // 2-1. Not in memory: query the server
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        // Update the ServiceInfo
        updatingMap.put(serviceName, new Object());
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
    } else if (updatingMap.containsKey(serviceName)) {
        // 2-2. The service is being updated: wait for the update to finish
        if (UPDATE_HOLD_INTERVAL > 0) {
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    // 3. Submit a task that refreshes the ServiceInfo periodically
    scheduleUpdateIfAbsent(serviceName, clusters);
    // 4. Return the ServiceInfo held in the in-memory map
    return serviceInfoMap.get(serviceObj.getKey());
}

The code above shows the whole query flow. The sections below focus on a few key points; failover is covered later.
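
The lookup order can be condensed into a sketch that leaves out the periodic refresh and the wait on in-progress updates. LookupOrderSketch is hypothetical: instance lists are plain strings, and a Function stands in for the HTTP query to the server.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch of the lookup order failover -> memory -> server; not Nacos code. */
final class LookupOrderSketch {

    final Map<String, List<String>> failoverSnapshot = new ConcurrentHashMap<>();
    final Map<String, List<String>> memory = new ConcurrentHashMap<>();
    final Function<String, List<String>> server; // stand-in for the HTTP query
    volatile boolean failoverSwitch;
    int serverCalls;

    LookupOrderSketch(Function<String, List<String>> server) {
        this.server = server;
    }

    List<String> lookup(String key) {
        // 1. In failover mode only the local snapshot is consulted.
        if (failoverSwitch) {
            return failoverSnapshot.get(key);
        }
        // 2. On a cache miss, ask the server once and cache the answer.
        if (!memory.containsKey(key)) {
            serverCalls++;
            memory.put(key, server.apply(key));
        }
        // 3. Always answer from memory.
        return memory.get(key);
    }
}
```

Repeated lookups for the same key reach the stand-in server only once; later freshness would come from the update sources described in the following sections.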

In-memory registry

The service registry, serviceInfoMap, is held in HostReactor. Where does the data in this in-memory registry come from?

// HostReactor
// Service registry
private final Map<String, ServiceInfo> serviceInfoMap;

Source 1: Initial update

On the first query for a service, NamingProxy is called to fetch the current result from the server, and the in-memory registry in HostReactor is updated with it.

// HostReactor.getServiceInfo
// Update the ServiceInfo
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);

The immediate-update logic is shown below. It calls GET /nacos/v1/ns/instance/list and then updates the in-memory registry through processServiceJson.

// HostReactor
private void updateServiceNow(String serviceName, String clusters) {
    try {
        updateService(serviceName, clusters);
    } catch (NacosException e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    }
}

public void updateService(String serviceName, String clusters) throws NacosException {
    // 1. Get the old ServiceInfo
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        // 2. GET /nacos/v1/ns/instance/list
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
        // 3. Update the registry
        if (StringUtils.isNotEmpty(result)) {
            processServiceJson(result);
        }
    } finally {
        // 4. Wake threads waiting on the old ServiceInfo (they wait because the service is in updatingMap)
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

The logic for processServiceJson is as follows, updating the registry and writing the latest ServiceInfo to the local disk.

// HostReactor
public ServiceInfo processServiceJson(String json) {
    ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    boolean changed = false;
    // If ServiceInfo already exists, compare the new ServiceInfo with the old ServiceInfo
    if (oldService != null) {
        // Update the registry
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        // ... (more logic omitted)
        Set<Instance> modHosts = new HashSet<Instance>();
        Set<Instance> newHosts = new HashSet<Instance>();
        Set<Instance> remvHosts = new HashSet<Instance>();
        if (newHosts.size() > 0) {
            changed = true;
        }
        if (remvHosts.size() > 0) {
            changed = true;
        }
        if (modHosts.size() > 0) {
            changed = true;
            // If instance corresponds to a heartbeat task, update the BeatInfo of the heartbeat task
            updateBeatInfo(modHosts);
        }
        serviceInfo.setJsonFromServer(json);
        // Publish an InstancesChangeEvent and write serviceInfo to the local disk for failover
        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts()));
            DiskCache.write(serviceInfo, cacheDir);
        }
    } else {
        // If ServiceInfo does not exist
        changed = true;
        // Update the registry, publish InstancesChangeEvent, and write serviceInfo to the local disk for failover
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}
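
The listing omits how the three host sets are filled. One plausible way to compute them is sketched below; HostDiffSketch is hypothetical and is not the Nacos implementation. It assumes each instance is keyed by "ip:port" and described by a string, so that a changed description under an unchanged key counts as a modification.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of diffing two instance lists; not the Nacos implementation. */
final class HostDiffSketch {

    final Set<String> added = new HashSet<>();
    final Set<String> removed = new HashSet<>();
    final Set<String> modified = new HashSet<>();

    /** Both maps go from "ip:port" to a description of the instance (an assumed encoding). */
    static HostDiffSketch diff(Map<String, String> oldHosts, Map<String, String> newHosts) {
        HostDiffSketch result = new HostDiffSketch();
        for (Map.Entry<String, String> e : newHosts.entrySet()) {
            String before = oldHosts.get(e.getKey());
            if (before == null) {
                result.added.add(e.getKey());       // only in the new list
            } else if (!before.equals(e.getValue())) {
                result.modified.add(e.getKey());    // same key, different description
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                result.removed.add(key);            // only in the old list
            }
        }
        return result;
    }

    /** True when any of the three sets is non-empty, mirroring the changed flag above. */
    boolean changed() {
        return !added.isEmpty() || !removed.isEmpty() || !modified.isEmpty();
    }
}
```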

Source 2: Periodic update

If futureMap holds no update Future for the service, a task is submitted to keep the service's registry entry up to date.

// HostReactor.getServiceInfo
// 3. Submit a task to update serviceInfo asynchronously
scheduleUpdateIfAbsent(serviceName, clusters);

// HostReactor
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}

public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

The focus is the logic of UpdateTask. Once a service has been queried, its UpdateTask keeps running; the server controls the interval between runs, which is 10 s.

public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private final String clusters;
        private final String serviceName;
        private int failCount = 0;
        @Override
        public void run() {
            // Delay before the next registry pull: 1 s by default, 10 s when supplied by the server
            long delayTime = DEFAULT_DELAY;
            try {
                // updateService Updates the service registry
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                if (serviceObj == null) {
                    // Get the real-time registry from the server and update it to memory
                    updateService(serviceName, clusters);
                    return;
                }
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    // Get the real-time registry from the server and update it to memory
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    refreshOnly(serviceName, clusters);
                }
                lastRefTime = serviceObj.getLastRefTime();
                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
                        .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }
                // Use the delay time defined by the server
                delayTime = serviceObj.getCacheMillis();
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
                // Schedule the next registry pull as a delayed task
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }
}
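
The delay expression in the finally block is an exponential backoff with a cap. The sketch below isolates it; UpdateBackoffSketch is hypothetical, and the 1 s value of DEFAULT_DELAY is taken from the comment in the listing above.

```java
/** Hypothetical sketch of the delay used when rescheduling the update task; not Nacos code. */
final class UpdateBackoffSketch {

    static final long DEFAULT_DELAY_MS = 1_000L;

    /** Doubles the delay per consecutive failure, capped at 60 x DEFAULT_DELAY_MS. */
    static long nextDelay(long delayMs, int failCount) {
        return Math.min(delayMs << failCount, DEFAULT_DELAY_MS * 60);
    }
}
```

With the server-supplied 10 s interval, one failure stretches the delay to 20 s and three or more failures hit the 60 s cap; a successful pull resets failCount and returns the delay to 10 s.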

Source 3: Server push

When HostReactor is constructed, a PushReceiver is created.

public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart,
        boolean pushEmptyProtection, int pollingThreadCount) {
    this.pushReceiver = new PushReceiver(this);
    // ...
}

PushReceiver handles the ServiceInfo messages pushed by the server. Its use of DatagramSocket shows that the server pushes to the client over UDP.

public class PushReceiver implements Runnable, Closeable {
    private ScheduledExecutorService executorService;
    private DatagramSocket udpSocket;
    private HostReactor hostReactor;
    private volatile boolean closed = false;

    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            this.udpSocket = new DatagramSocket();
            this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });
            this.executorService.execute(this);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] init udp socket failed", e);
        }
    }
}

PushReceiver's implementation of the Runnable interface follows. It too calls HostReactor's processServiceJson method to parse the message and update the registry.

@Override
public void run() {
    while (!closed) {
        try {
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // Waiting for the server to push...
            udpSocket.receive(packet);
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                // HostReactor processes packets to update the memory registry
                hostReactor.processServiceJson(pushPacket.data);
                // send ack to server
                ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
                        + "\"}";
            } else {
                // do nothing send ack only
                // ...
            }
            // Send an ACK packet to the server
            udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress()));
        } catch (Exception e) {
            if (closed) {
                return;
            }
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}
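
To make the push-and-ack exchange concrete, here is a self-contained sketch that plays both roles on the loopback interface using only the JDK. It is not the Nacos implementation: the class and method names are hypothetical, the payload is not compressed, and the ack body is a simplified stand-in for the real one.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

class UdpPushAckSketch {

    /** Client side: receives one push and replies with an ack to the sender's address. */
    static String receiveAndAck(DatagramSocket clientSocket) throws IOException {
        byte[] buffer = new byte[64 * 1024];
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
        clientSocket.receive(packet); // blocks until a push arrives (or the timeout expires)
        String pushed = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
        byte[] ack = "{\"type\": \"push-ack\"}".getBytes(StandardCharsets.UTF_8);
        clientSocket.send(new DatagramPacket(ack, ack.length, packet.getSocketAddress()));
        return pushed;
    }

    /** Runs one push/ack round trip on loopback and returns {pushed payload, ack}. */
    static String[] roundTrip(String payload) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket client = new DatagramSocket(0, loopback);
             DatagramSocket server = new DatagramSocket(0, loopback)) {
            client.setSoTimeout(2000);
            server.setSoTimeout(2000);
            // Server side: push to the UDP port the client announced when it subscribed.
            byte[] data = payload.getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(data, data.length, loopback, client.getLocalPort()));
            // Client side: handle the push and acknowledge it.
            String pushed = receiveAndAck(client);
            // Server side: read the acknowledgement.
            byte[] buf = new byte[1024];
            DatagramPacket ackPacket = new DatagramPacket(buf, buf.length);
            server.receive(ackPacket);
            String ack = new String(ackPacket.getData(), 0, ackPacket.getLength(), StandardCharsets.UTF_8);
            return new String[] {pushed, ack};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] result = roundTrip("{\"type\":\"service\",\"data\":\"...\"}");
        System.out.println("client received: " + result[0]);
        System.out.println("server received: " + result[1]);
    }
}
```

Because UDP gives no delivery guarantee, an ack like this is the only way the pushing side can tell that a packet arrived, which is presumably why the periodic pull remains in place alongside the push.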

To summarize, the client registry is updated in two main ways, pull and push:

  • Pull: A service query triggers the registry update, after which each service's UpdateTask refreshes its registry every 10 seconds
  • Push: The server pushes registry information to the client over UDP

failover

FailoverReactor is responsible for failover in client-side service discovery. When the failover switch on the local disk is turned on, Nacos lets the client load a registry stored on the local disk into memory and use it.

Both refreshing the failover switch and writing the failover registry to disk are implemented as scheduled tasks, which are submitted when the FailoverReactor is constructed.

public class FailoverReactor implements Closeable {
    // The local failover path
    private final String failoverDir;
    private final HostReactor hostReactor;
    private final ScheduledExecutorService executorService;
    // Failover memory registry
    // key=cluster+group+service
    private Map<String, ServiceInfo> serviceMap = new ConcurrentHashMap<String, ServiceInfo>();
    public FailoverReactor(HostReactor hostReactor, String cacheDir) {
        this.hostReactor = hostReactor;
        this.failoverDir = cacheDir + "/failover";
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.failover");
                return thread;
            }
        });
        // Submit the scheduled tasks
        this.init();
    }
}

Failover switch

SwitchRefresher handles refreshing the failover switch.

public void init() {
    // Refresh the failover switch from the local file every 5 seconds
    executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
}

SwitchRefresher determines whether failover is enabled from the content of the 00-00---000-VIPSRV_FAILOVER_SWITCH-000---00-00 file, whose path is built from failoverDir. A line containing 1 enables failover, and a line containing 0 disables it. When failover is enabled, FailoverFileReader loads the registry on disk into the in-memory serviceMap.

class SwitchRefresher implements Runnable {

    long lastModifiedMillis = 0L;

    @Override
    public void run() {
        try {
            // Switch file name: 00-00---000-VIPSRV_FAILOVER_SWITCH-000---00-00
            File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
            if (!switchFile.exists()) {
                switchParams.put("failover-mode", "false");
                return;
            }
            long modified = switchFile.lastModified();
            // Re-read the file only if it changed since the last run
            if (lastModifiedMillis < modified) {
                lastModifiedMillis = modified;
                String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
                        Charset.defaultCharset().toString());
                if (!StringUtils.isEmpty(failover)) {
                    String[] lines = failover.split(DiskCache.getLineSeparator());
                    for (String line : lines) {
                        String line1 = line.trim();
                        if ("1".equals(line1)) {
                            switchParams.put("failover-mode", "true");
                            // If failover is enabled, load the registry on the disk to the memory
                            new FailoverFileReader().run();
                        } else if ("0".equals(line1)) {
                            switchParams.put("failover-mode", "false");
                        }
                    }
                } else {
                    switchParams.put("failover-mode", "false");
                }
            }
        } catch (Throwable e) {
            NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
        }
    }
}
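
The switch logic above boils down to "re-read the file only when its modification time advances, then let each 1 or 0 line set the mode". A minimal standalone sketch of that idea follows, using only java.nio. The class name, the method names and the temporary file are hypothetical and stand in for the Nacos classes and the real switch file.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class FailoverSwitchSketch {
    private long lastModifiedMillis = 0L;
    private volatile boolean failoverMode = false;

    /** Empty content disables failover; otherwise each "1" or "0" line sets the mode in order. */
    static boolean parse(String content, boolean current) {
        if (content == null || content.isEmpty()) {
            return false;
        }
        boolean mode = current;
        for (String line : content.split("\\R")) {
            String trimmed = line.trim();
            if ("1".equals(trimmed)) {
                mode = true;
            } else if ("0".equals(trimmed)) {
                mode = false;
            }
        }
        return mode;
    }

    /** One refresh pass, meant to be called periodically by a scheduler. */
    boolean refresh(Path switchFile) throws IOException {
        if (!Files.exists(switchFile)) {
            failoverMode = false;
            return failoverMode;
        }
        long modified = Files.getLastModifiedTime(switchFile).toMillis();
        if (lastModifiedMillis < modified) {
            lastModifiedMillis = modified;
            String content = new String(Files.readAllBytes(switchFile), StandardCharsets.UTF_8);
            failoverMode = parse(content, failoverMode);
        }
        return failoverMode;
    }

    public static void main(String[] args) throws IOException {
        Path switchFile = Files.createTempFile("failover-switch", ".txt"); // hypothetical file
        Files.write(switchFile, "1\n".getBytes(StandardCharsets.UTF_8));
        System.out.println("failover enabled: " + new FailoverSwitchSketch().refresh(switchFile));
        Files.delete(switchFile);
    }
}
```

Checking the modification time first keeps the periodic task cheap: in the common case where nothing changed, no file content is read at all.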

Writing the memory registry to disk

DiskFileWriter is responsible for writing the memory registry to disk.

public void init() {
    // ...
    // Write the service registry in memory to the local disk once a day (first run after 30 minutes)
    executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);

    // 10 seconds later, the service registry in the memory is written to the local disk
    executorService.schedule(new Runnable() {
        @Override
        public void run() {
            // ...
            new DiskFileWriter().run();
            // ...
        }
    }, 10000L, TimeUnit.MILLISECONDS);
}

DiskFileWriter writes the serviceInfoMap held by HostReactor to disk.

class DiskFileWriter extends TimerTask {
    @Override
    public void run() {
        Map<String, ServiceInfo> map = hostReactor.getServiceInfoMap();
        for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
            ServiceInfo serviceInfo = entry.getValue();
            // ... Omit some filtering logic
            // One file per service under failoverDir
            DiskCache.write(serviceInfo, failoverDir);
        }
    }
}
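
As a rough illustration of what "one file per service" can look like, the sketch below writes a map of serialized services to a directory and reads it back. It is an assumption-laden stand-in, not DiskCache itself: the class name, the use of URL encoding for file names and the string-valued registry are all choices made for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

class RegistrySnapshotSketch {

    /** Writes each service's serialized form to its own file under dir. */
    static void writeAll(Map<String, String> registry, Path dir) throws IOException {
        Files.createDirectories(dir);
        for (Map.Entry<String, String> entry : registry.entrySet()) {
            // Encode the key so characters such as '@' are safe in a file name.
            String fileName = URLEncoder.encode(entry.getKey(), "UTF-8");
            Files.write(dir.resolve(fileName), entry.getValue().getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Loads every file under dir back into a key -> content map. */
    static Map<String, String> readAll(Path dir) throws IOException {
        Map<String, String> registry = new HashMap<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path file : files) {
                String key = URLDecoder.decode(file.getFileName().toString(), "UTF-8");
                registry.put(key, new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
            }
        }
        return registry;
    }

    /** Writes the registry to a fresh temporary directory and reads it back. */
    static Map<String, String> roundTrip(Map<String, String> registry) {
        try {
            Path dir = Files.createTempDirectory("failover-sketch");
            writeAll(registry, dir);
            return readAll(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> registry = new HashMap<>();
        registry.put("DEFAULT_GROUP@@nacos.test.3", "{\"hosts\":[]}");
        System.out.println(roundTrip(registry));
    }
}
```

The writer corresponds to the role of DiskFileWriter and the reader to the role of FailoverFileReader in the text above.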

Subscribe or query

User code calls the getAllInstances method of NacosNamingService. If subscribe=true, the call follows the subscribe logic; if subscribe=false, it follows the query logic.

// NacosNamingService
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
        boolean subscribe) throws NacosException {
    if (subscribe) {
        // subscribe=true: query through the three levels of storage and subscribe to the service
    } else {
        // subscribe=false: get the server registry directly through NamingProxy
    }
}

Both subscribe and query actually call GET /nacos/v1/ns/instance/list. The difference is that a subscribe request carries the client's UDP port number in the udpPort parameter, whereas a query request sends a UDP port of 0. How the server handles this is covered in the next chapter.

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));
    return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}
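
The only difference between the two request kinds is the udpPort value, which the sketch below makes visible by building the query string for both. It is a simplified illustration: the class name is hypothetical, the namespaceId and clientIP parameters are left out, the "serviceName" key is assumed to be the value behind CommonParams.SERVICE_NAME, and port 54321 is an arbitrary example.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class InstanceListRequestSketch {

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    /** Builds the path and query string of an instance list request. */
    static String buildQuery(String serviceName, String clusters, int udpPort, boolean healthyOnly) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", serviceName); // assumed parameter key
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("healthyOnly", String.valueOf(healthyOnly));
        StringJoiner query = new StringJoiner("&", "/nacos/v1/ns/instance/list?", "");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            query.add(encode(entry.getKey()) + "=" + encode(entry.getValue()));
        }
        return query.toString();
    }

    public static void main(String[] args) {
        String service = "DEFAULT_GROUP@@nacos.test.3";
        // Subscribe: announce the local UDP port so the server can push changes.
        System.out.println(buildQuery(service, "", 54321, false));
        // Plain query: udpPort=0 means "do not push to me".
        System.out.println(buildQuery(service, "", 0, false));
    }
}
```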

Service listening

A service subscription keeps the client's memory registry up to date.

In addition, user code can implement its own business logic by listening for service registry changes through the subscribe method of NacosNamingService.

// Example
nacosNamingService.subscribe("nacos.test.3", new AbstractEventListener() {
    @Override
    public Executor getExecutor() {
        return executor;
    }
    @Override
    public void onEvent(Event event) {
        System.out.println(((NamingEvent) event).getServiceName());
        System.out.println(((NamingEvent) event).getInstances());
    }
});

NacosNamingService internally delegates to HostReactor.

// NacosNamingService
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","),
            listener);
}

HostReactor first calls InstancesChangeNotifier to register the listener, then runs a getServiceInfo query (described above) to ensure that the registry information is initialized and that the scheduled task pulling the registry is started.

// HostReactor
public void subscribe(String serviceName, String clusters, EventListener eventListener) {
    // 1. InstancesChangeNotifier registers listeners
    notifier.registerListener(serviceName, clusters, eventListener);
    // 2. Query ServiceInfo
    getServiceInfo(serviceName, clusters);
}

InstancesChangeNotifier

InstancesChangeNotifier is responsible for registering listeners and calling them back.

The registerListener method stores the service identifier and corresponding listener into the map.

public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
    // Listener registry
    // key: unique service identifier groupName@@serviceName@@clusterName, value: listeners
    private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();

    private final Object lock = new Object();

    public void registerListener(String serviceName, String clusters, EventListener listener) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
        if (eventListeners == null) {
            // Double-checked creation of the listener set for this key
            synchronized (lock) {
                eventListeners = listenerMap.get(key);
                if (eventListeners == null) {
                    eventListeners = new ConcurrentHashSet<EventListener>();
                    listenerMap.put(key, eventListeners);
                }
            }
        }
        eventListeners.add(listener);
    }
}

In addition, InstancesChangeNotifier extends Subscriber, so it handles InstancesChangeEvent, the service instance change event.

// InstancesChangeNotifier
@Override
public void onEvent(InstancesChangeEvent event) {
    String key = ServiceInfo.getKey(event.getServiceName(), event.getClusters());
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (CollectionUtils.isEmpty(eventListeners)) {
        return;
    }
    for (final EventListener listener : eventListeners) {
        final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
        // If the listener supplies an executor, run the callback on it
        if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
            ((AbstractEventListener) listener).getExecutor().execute(new Runnable() {
                @Override
                public void run() {
                    listener.onEvent(namingEvent);
                }
            });
            continue;
        }
        // Otherwise run the callback on the current thread
        listener.onEvent(namingEvent);
    }
}
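
The register-then-dispatch pattern used by InstancesChangeNotifier can be sketched in a few lines of plain Java. This is an illustration under simplifying assumptions, not the Nacos class: the Listener type, the string events and the class name are hypothetical, and computeIfAbsent is swapped in for the explicit double-checked lock.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class ListenerRegistrySketch {

    /** Hypothetical listener: a callback plus an optional executor (null = caller's thread). */
    static final class Listener {
        final Executor executor;
        final Consumer<String> callback;

        Listener(Executor executor, Consumer<String> callback) {
            this.executor = executor;
            this.callback = callback;
        }
    }

    private final Map<String, Set<Listener>> listenerMap = new ConcurrentHashMap<>();

    void register(String key, Listener listener) {
        // computeIfAbsent is atomic on ConcurrentHashMap, so no explicit lock is needed here.
        listenerMap.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(listener);
    }

    void publish(String key, String event) {
        Set<Listener> listeners = listenerMap.get(key);
        if (listeners == null) {
            return;
        }
        for (Listener listener : listeners) {
            if (listener.executor != null) {
                // Hand the callback to the listener's own executor.
                listener.executor.execute(() -> listener.callback.accept(event));
            } else {
                listener.callback.accept(event);
            }
        }
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        registry.register("DEFAULT_GROUP@@nacos.test.3", new Listener(null, e -> System.out.println("got " + e)));
        registry.publish("DEFAULT_GROUP@@nacos.test.3", "instances changed");
    }
}
```

Letting each listener bring its own executor keeps a slow callback from blocking the thread that delivers events to the other listeners.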

As described above, updating the memory registry is what triggers the service instance change event.

HostReactor#processServiceJson updates the registry.

// HostReactor
public ServiceInfo processServiceJson(String json) {
    ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    boolean changed = false;
    // If ServiceInfo already exists, compare the new ServiceInfo with the old ServiceInfo
    if (oldService != null) {
        // ...
        // Publish InstancesChangeEvent and write serviceInfo to the local disk for failover
        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts()));
            DiskCache.write(serviceInfo, cacheDir);
        }
    } else {
        // If ServiceInfo does not exist
        changed = true;
        // Update the registry, publish InstancesChangeEvent, and write serviceInfo to the local disk for failover
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }
    return serviceInfo;
}
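
The comparison elided in the snippet above produces three collections (newHosts, remvHosts and modHosts), and an event is published only if at least one is non-empty. A minimal sketch of such a diff follows. It assumes each instance is identified by an "ip:port" key and described by an opaque string; both are simplifications for this example, and the class and field names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class HostDiffSketch {

    /** Outcome of comparing an old and a new snapshot of one service. */
    static final class Diff {
        final Set<String> added = new TreeSet<>();
        final Set<String> removed = new TreeSet<>();
        final Set<String> modified = new TreeSet<>();

        boolean changed() {
            return !added.isEmpty() || !removed.isEmpty() || !modified.isEmpty();
        }
    }

    /** Both maps go from an "ip:port" key to a description of the instance. */
    static Diff diff(Map<String, String> oldHosts, Map<String, String> newHosts) {
        Diff diff = new Diff();
        for (Map.Entry<String, String> entry : newHosts.entrySet()) {
            String old = oldHosts.get(entry.getKey());
            if (old == null) {
                diff.added.add(entry.getKey());
            } else if (!old.equals(entry.getValue())) {
                diff.modified.add(entry.getKey());
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                diff.removed.add(key);
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        Map<String, String> oldHosts = new HashMap<>();
        oldHosts.put("11.11.11.11:8888", "healthy=true");
        oldHosts.put("2.2.2.2:9999", "healthy=true");
        Map<String, String> newHosts = new HashMap<>();
        newHosts.put("11.11.11.11:8888", "healthy=false");
        Diff diff = diff(oldHosts, newHosts);
        System.out.println("added=" + diff.added + " removed=" + diff.removed + " modified=" + diff.modified);
        System.out.println("publish event: " + diff.changed());
    }
}
```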

conclusion

  • Service registration

    For ephemeral instances (the default, Instance.ephemeral=true), the client registers its instance information with the server via POST /nacos/v1/ns/instance.

    After that, the client sends a heartbeat to the server via PUT /nacos/v1/ns/instance/beat, every 5 seconds by default (set through the instance metadata key preserved.heart.beat.interval).

    If the server returns a RESOURCE_NOT_FOUND (20404) error to a heartbeat, the instance is not registered. In that case the client sends a registration request to the server from within the heartbeat task.

  • Service cancellation

    This is the reverse of service registration: the client first cancels the scheduled heartbeat task, then calls DELETE /nacos/v1/ns/instance on the server to remove the current instance from the service list.

  • Service discovery

    For service queries, the user can choose either the subscribe logic or the real-time query logic through the fourth parameter of NacosNamingService's getAllInstances method. subscribe=true goes through the service subscription process; subscribe=false queries the server's latest registry directly without subscribing. The former contains all the logic of the latter.

    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) 

    The service subscription process is as follows:

    1. When failover is enabled, the registry in the local file system is read and loaded into the serviceMap field of FailoverReactor, and queries read service information from that serviceMap.
    2. Normally, with failover disabled, the HostReactor.serviceInfoMap memory registry is read first.
    3. If the service is not found in HostReactor.serviceInfoMap, the client requests the Nacos server. This fetches the service's registration information and updates the HostReactor.serviceInfoMap memory registry; at the same time, the query carries the locally opened UDP port, which tells the server that the client subscribes to the service.
    4. For each subscribed service, the client makes sure an UpdateTask is running that periodically requests the latest registry from the server. The client's default pull interval is 1 second, but the server's response directs the client to pull once every 10 seconds.

    The memory registry is updated in three scenarios:

    1. On the first query for a service, the client calls GET /nacos/v1/ns/instance/list to fetch the real-time registry and updates the local memory registry
    2. After a service has been queried, its UpdateTask calls GET /nacos/v1/ns/instance/list every 10 seconds to fetch the real-time registry and update that service's entry
    3. The server pushes registry information to the client over UDP

    Service change awareness: InstancesChangeEvent is published when the memory registry changes. InstancesChangeNotifier handles this event and notifies all client code registered as listeners through NamingService's subscribe method.
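
The lookup order of the subscription process (failover snapshot first, then the memory registry, then the server) can be sketched as follows. This is only an outline under stated assumptions: the class and field names are hypothetical, services are represented as plain strings, and the server call is replaced by a function passed in by the caller.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class ServiceLookupSketch {
    final Map<String, String> failoverMap = new ConcurrentHashMap<>(); // loaded from disk
    final Map<String, String> memoryMap = new ConcurrentHashMap<>();   // in-memory registry
    volatile boolean failoverMode = false;
    private final Function<String, String> server; // stand-in for the HTTP request

    ServiceLookupSketch(Function<String, String> server) {
        this.server = server;
    }

    /** Returns the service for key, or null if failover is on and the snapshot lacks it. */
    String lookup(String key) {
        if (failoverMode) {
            // Level 1: failover is on, so only the disk snapshot is consulted.
            return failoverMap.get(key);
        }
        // Level 2: the in-memory registry.
        String cached = memoryMap.get(key);
        if (cached != null) {
            return cached;
        }
        // Level 3: ask the server and remember the answer.
        String fetched = server.apply(key);
        if (fetched != null) {
            memoryMap.put(key, fetched);
        }
        return fetched;
    }

    public static void main(String[] args) {
        ServiceLookupSketch lookup = new ServiceLookupSketch(key -> "from-server:" + key);
        System.out.println(lookup.lookup("nacos.test.3")); // fetched from the stand-in server
        System.out.println(lookup.lookup("nacos.test.3")); // served from the memory registry
    }
}
```

In the real client the memory registry is additionally refreshed by the periodic pull task and by server pushes, which this sketch leaves out.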