In the previous two articles, we covered the basic concepts of Nacos and how to persist configuration to MySQL when Nacos is used as a configuration center. In this article, we look at Nacos as a service registry.

Environment introduction:

  • JDK 1.8
  • Nacos server 1.4.2
  • spring-boot 2.3.5.RELEASE
  • spring-cloud Hoxton.SR8
  • spring-cloud-alibaba 2.2.5.RELEASE

Nacos service architecture

The platform is built on Spring Boot. The following figure shows where Nacos sits in the service architecture:

Middleware with functions similar to Nacos includes Eureka, ZooKeeper, Consul, and etcd. One of Nacos's biggest distinguishing features is that it supports both AP and CP modes, using the Raft protocol to provide consistency in CP mode.

Nacos client

Let’s take the spring-cloud-starter-alibaba-nacos-discovery dependency as an example:

<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

It is a standard Spring Cloud starter, so we can start by looking at its META-INF/spring.factories file to find its auto-configuration classes:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\
  com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
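To see how such a file is consumed, the sketch below reads the EnableAutoConfiguration entry with java.util.Properties, which handles the trailing-backslash line continuations. Spring's SpringFactoriesLoader also loads this file as a properties file, but the class and method names here (SpringFactoriesSketch, classesFor) are hypothetical, and the sample content in main is abbreviated from the listing above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: lists the classes registered under one key of a spring.factories file.
final class SpringFactoriesSketch {

    static final String ENABLE_AUTO_CONFIGURATION =
            "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    static List<String> classesFor(String factoriesContent, String key) {
        Properties props = new Properties();
        try {
            // Properties.load joins lines ending in '\' and drops the leading
            // whitespace of each continuation line
            props.load(new StringReader(factoriesContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> classes = new ArrayList<>();
        String value = props.getProperty(key);
        if (value == null) {
            return classes; // key not present in the file
        }
        for (String name : value.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                classes.add(trimmed);
            }
        }
        return classes;
    }

    public static void main(String[] args) {
        // Abbreviated version of the Nacos discovery spring.factories
        String content = ENABLE_AUTO_CONFIGURATION + "=\\\n"
                + "  com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\\\n"
                + "  com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration\n";
        for (String cls : classesFor(content, ENABLE_AUTO_CONFIGURATION)) {
            System.out.println(cls);
        }
    }
}
```

Running main prints the two Nacos configuration class names, one per line.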

Auto-configuration classes are usually named after the middleware followed by AutoConfiguration, so NacosDiscoveryAutoConfiguration is the natural class to look at first.

Service registration client

Add the dependency

Nacos service registration is initiated by the client and completed during Spring startup through an extension point provided by Spring. First, import the spring-cloud-starter-alibaba-nacos-discovery dependency, the same one shown earlier.


Analysis of the source code

For a Spring Boot component, we first look for its META-INF/spring.factories file, which was listed earlier.


Analyzing these classes shows that NacosServiceRegistryAutoConfiguration is the core configuration class for service registration. It defines three core beans:

  • NacosServiceRegistry
  • NacosRegistration
  • NacosAutoServiceRegistration

NacosAutoServiceRegistration

NacosAutoServiceRegistration implements automatic registration of the service with Nacos. It extends the abstract class AbstractAutoServiceRegistration.

AbstractAutoServiceRegistration implements the ApplicationContextAware and ApplicationListener<WebServerInitializedEvent> interfaces. Its onApplicationEvent method is called once the embedded web server has started and the application context is ready:

public void onApplicationEvent(WebServerInitializedEvent event) {
   bind(event);
}

The work is actually done in bind(event):

public void bind(WebServerInitializedEvent event) {
   ApplicationContext context = event.getApplicationContext();
   if (context instanceof ConfigurableWebServerApplicationContext) {
      if ("management".equals(((ConfigurableWebServerApplicationContext) context)
            .getServerNamespace())) {
         return;
      }
   }
   this.port.compareAndSet(0, event.getWebServer().getPort());
   this.start();
}

The start() method is then called

public void start() {
	if (!isEnabled()) {
		if (logger.isDebugEnabled()) {
			logger.debug("Discovery Lifecycle disabled. Not starting");
		}
		return;
	}

	// only initialize if nonSecurePort is greater than 0 and it isn't already running
	// because of containerPortInitializer below
	if (!this.running.get()) {
		this.context.publishEvent(
				new InstancePreRegisteredEvent(this, getRegistration()));
		register();
		if (shouldRegisterManagement()) {
			registerManagement();
		}
		this.context.publishEvent(
				new InstanceRegisteredEvent<>(this, getConfiguration()));
		this.running.compareAndSet(false, true);
	}
}
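To make the control flow concrete, here is a hypothetical, framework-free model of bind() and start(). Events from the management context are skipped, compareAndSet keeps only the first non-zero port, and the running flag stops a second event from registering again. AutoRegistrationSketch and its members are illustrative names only; the real classes publish Spring events and delegate to a ServiceRegistry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of AbstractAutoServiceRegistration's bind()/start(); not the Spring Cloud class.
final class AutoRegistrationSketch {

    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    final List<String> events = new ArrayList<>();
    final List<Integer> registeredPorts = new ArrayList<>();

    // Plays the role of onApplicationEvent -> bind(event)
    void onWebServerInitialized(int webServerPort, boolean fromManagementContext) {
        if (fromManagementContext) {
            return; // the separate management server's event is ignored
        }
        port.compareAndSet(0, webServerPort); // only the first non-zero port is kept
        start();
    }

    void start() {
        if (!running.get()) {
            events.add("InstancePreRegisteredEvent");
            registeredPorts.add(port.get()); // stands in for register()
            events.add("InstanceRegisteredEvent");
            running.compareAndSet(false, true);
        }
    }

    int port() {
        return port.get();
    }
}
```

Feeding it two web-server events registers the instance once, with the port from the first event.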

Finally, register() is called, which internally calls serviceRegistry.register() to complete the service registration:

private final ServiceRegistry<R> serviceRegistry;

protected void register() {
   this.serviceRegistry.register(getRegistration());
}

NacosServiceRegistry

The main job of NacosServiceRegistry is to register the instance with the Nacos server, which it does in its register method:

public void register(Registration registration) {

   if (StringUtils.isEmpty(registration.getServiceId())) {
      log.warn("No service to register for nacos client...");
      return;
   }
   // By default, this returns a com.alibaba.nacos.client.naming.NacosNamingService instance created via reflection
   NamingService namingService = namingService();
   // Get the serviceId; by default it is the value of spring.application.name
   String serviceId = registration.getServiceId();
   // Get group, DEFAULT_GROUP by default
   String group = nacosDiscoveryProperties.getGroup();

   // Build the Instance object from the registration
   Instance instance = getNacosInstanceFromRegistration(registration);

   try {
      // Register an instance
      namingService.registerInstance(serviceId, group, instance);
      log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
            instance.getIp(), instance.getPort());
   }
   catch (Exception e) {
      log.error("nacos registry, {} register failed... {},", serviceId,
            registration.toString(), e);
      // rethrow a RuntimeException if the registration is failed.
      // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
      rethrowRuntimeException(e);
   }
}

The actual registration is delegated to namingService.registerInstance(serviceId, group, instance):

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    serverProxy.registerService(groupedServiceName, groupName, instance);
}
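registerInstance schedules heartbeats only for ephemeral instances, through beatReactor.addBeatInfo. The sketch below approximates that with a ScheduledExecutorService and the 5-second default interval. It is a simplification under stated assumptions: the real BeatReactor reschedules each BeatTask using the interval returned by the server, whereas this hypothetical HeartbeatSketch uses a fixed rate and "sending" is just a callback.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical heartbeat scheduler for ephemeral instances (not Nacos's BeatReactor).
final class HeartbeatSketch implements AutoCloseable {

    // Default interval of 5 s; callers (e.g. tests) may pass a shorter period
    static final long DEFAULT_PERIOD_MILLIS = 5000L;

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "heartbeat-sketch");
                thread.setDaemon(true); // do not keep the JVM alive just for beats
                return thread;
            });

    // Sends a beat for the grouped service name (e.g. "DEFAULT_GROUP@@demo") every periodMillis
    void addBeat(String groupedServiceName, long periodMillis, Consumer<String> sendBeat) {
        scheduler.scheduleAtFixedRate(() -> sendBeat.accept(groupedServiceName),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void addBeat(String groupedServiceName, Consumer<String> sendBeat) {
        addBeat(groupedServiceName, DEFAULT_PERIOD_MILLIS, sendBeat);
    }

    @Override
    public void close() {
        scheduler.shutdownNow(); // cancels all scheduled beats
    }
}
```

A fixed-rate schedule keeps the sketch short; the self-rescheduling task used by Nacos lets the server adjust the beat interval at runtime, which a fixed rate cannot do.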

For ephemeral instances, beatReactor.addBeatInfo() schedules a task that sends heartbeat data to the server every 5 seconds by default. The registration request itself is then sent by serverProxy.registerService(groupedServiceName, groupName, instance):

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);
    
    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
    
    // POST: /nacos/v1/ns/instance for service registration
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
    
}
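The request that registerService sends is an ordinary form POST. The sketch below assembles a subset of the same parameters and encodes them as an application/x-www-form-urlencoded body, without any network call. The parameter names come from the code above; the "group@@service" naming rule is an assumption modeled on NamingUtils.getGroupedName, and RegisterRequestSketch is a hypothetical class.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical builder for the form body of POST /nacos/v1/ns/instance (subset of parameters).
final class RegisterRequestSketch {

    // Assumed to mirror NamingUtils.getGroupedName: "groupName@@serviceName"
    static String groupedName(String serviceName, String groupName) {
        return groupName + "@@" + serviceName;
    }

    static Map<String, String> buildParams(String namespaceId, String serviceName, String groupName,
                                           String ip, int port, double weight, boolean ephemeral) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order in the body
        params.put("namespaceId", namespaceId);
        params.put("serviceName", groupedName(serviceName, groupName));
        params.put("groupName", groupName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("weight", String.valueOf(weight));
        params.put("ephemeral", String.valueOf(ephemeral));
        return params;
    }

    // Encodes key=value pairs joined by '&', URL-encoding both sides
    static String formBody(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : params.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(encode(e.getKey())).append('=').append(encode(e.getValue()));
        }
        return sb.toString();
    }

    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }
}
```

Note that the "@@" separator is URL-encoded as %40%40 in the body.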

Service registration: the server side

As a service registry, Nacos can run in either AP or CP mode while it maintains the registry's service list. Here is a simplified data model of that service list:
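In code, that nesting (namespace, then grouped service name, then cluster, then instance) can be approximated as follows. This is a simplified, hypothetical sketch: Nacos 1.x's ServiceManager does keep a namespace-to-service map and each Service keeps a cluster map, but the classes below are trimmed stand-ins rather than the real ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed model of the registry's in-memory layout.
final class RegistryModelSketch {

    static final class Instance {
        final String ip;
        final int port;
        Instance(String ip, int port) { this.ip = ip; this.port = port; }
    }

    static final class Cluster {
        final List<Instance> instances = new ArrayList<>();
    }

    static final class Service {
        final Map<String, Cluster> clusterMap = new ConcurrentHashMap<>();
    }

    // namespaceId -> ("group@@service" -> Service)
    final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    // Creates namespace, service and cluster entries on demand, then adds the instance
    void register(String namespaceId, String groupedServiceName, String clusterName, Instance instance) {
        Service service = serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(groupedServiceName, name -> new Service());
        Cluster cluster = service.clusterMap.computeIfAbsent(clusterName, c -> new Cluster());
        synchronized (cluster) {
            cluster.instances.add(instance);
        }
    }

    // Counts instances across all clusters of one service
    int instanceCount(String namespaceId, String groupedServiceName) {
        Map<String, Service> services = serviceMap.get(namespaceId);
        if (services == null || !services.containsKey(groupedServiceName)) {
            return 0;
        }
        int count = 0;
        for (Cluster cluster : services.get(groupedServiceName).clusterMap.values()) {
            synchronized (cluster) {
                count += cluster.instances.size();
            }
        }
        return count;
    }
}
```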

Registration on the server builds Instance objects much as the client-side NacosServiceRegistry does. Returning to the source code, let's go straight to the server-side /nacos/v1/ns/instance endpoint, which is handled by InstanceController#register.

The service registry

InstanceController#register mainly parses the request parameters and then calls serviceManager.registerInstance; a response of ok means the registration succeeded.

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    
    final Instance instance = parseInstance(request);
    
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
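The parameter handling at the top of InstanceController#register can be sketched without a servlet container: an optional namespace with a default, a required service name, and a format check. This is a hypothetical approximation. The "public" default namespace and the rule that the name must look like "groupName@@serviceName" are assumptions based on Nacos 1.4's Constants and NamingUtils.checkServiceNameFormat, and a plain Map stands in for HttpServletRequest.

```java
import java.util.Map;

// Hypothetical sketch of request-parameter handling in InstanceController#register.
final class RegisterParamsSketch {

    static final String DEFAULT_NAMESPACE_ID = "public"; // assumed default namespace

    static String optional(Map<String, String> request, String key, String defaultValue) {
        String value = request.get(key);
        return (value == null || value.trim().isEmpty()) ? defaultValue : value.trim();
    }

    static String required(Map<String, String> request, String key) {
        String value = request.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Param '" + key + "' is required.");
        }
        return value.trim();
    }

    // Rejects names that do not contain the "@@" separator between group and service
    static void checkServiceNameFormat(String combinedName) {
        if (combinedName.split("@@").length <= 1) {
            throw new IllegalArgumentException(
                    "Param 'serviceName' is illegal, expected 'groupName@@serviceName'");
        }
    }

    // Returns {namespaceId, serviceName}
    static String[] parse(Map<String, String> request) {
        String namespaceId = optional(request, "namespaceId", DEFAULT_NAMESPACE_ID);
        String serviceName = required(request, "serviceName");
        checkServiceNameFormat(serviceName);
        return new String[] {namespaceId, serviceName};
    }
}
```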

registerInstance creates an empty service if none exists yet, fetches it, and then adds the instance:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    
    Service service = getService(namespaceId, serviceName);
    
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

The instance is added in addInstance():

@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    
    Service service = getService(namespaceId, serviceName);
    
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        
        consistencyService.put(key, instances);
    }
}

consistencyService.put(key, instances) refreshes all the instances of the service. Because the consistencyService field is injected with @Resource(name = "consistencyDelegate"), the call goes to the put method of DelegateConsistencyServiceImpl, which is where the choice between AP and CP mode is made:

@Override
public void put(String key, Record value) throws NacosException {
    mapConsistencyService(key).put(key, value);
}

// Selection of Distro or CP mode: AP mode uses Distro protocol while CP mode uses Raft protocol.
private ConsistencyService mapConsistencyService(String key) {
    return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
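The routing decision depends only on the key, because the ephemeral flag is baked into the key when it is built. The sketch below shows that idea. The prefix strings are assumptions modeled on KeyBuilder in Nacos 1.4 and may not match every version exactly; ConsistencyRoutingSketch and its nested interface are hypothetical.

```java
// Hypothetical sketch: the instance-list key encodes the ephemeral flag, and routing reads it back.
final class ConsistencyRoutingSketch {

    // Assumed constants, modeled on Nacos 1.4's KeyBuilder
    static final String INSTANCE_LIST_KEY_PREFIX = "com.alibaba.nacos.naming.iplist.";
    static final String EPHEMERAL_KEY_PREFIX = "ephemeral.";
    static final String NAMESPACE_KEY_CONNECTOR = "##";

    static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        return INSTANCE_LIST_KEY_PREFIX + (ephemeral ? EPHEMERAL_KEY_PREFIX : "")
                + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName;
    }

    static boolean matchEphemeralKey(String key) {
        return key.startsWith(INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX);
    }

    interface ConsistencyService {
        String name();
    }

    // Mirrors mapConsistencyService: ephemeral keys go to Distro (AP), the rest to Raft (CP)
    static ConsistencyService route(String key, ConsistencyService ephemeral, ConsistencyService persistent) {
        return matchEphemeralKey(key) ? ephemeral : persistent;
    }
}
```

Encoding the mode in the key lets every later component (stores, notifiers, sync tasks) decide the protocol from the key alone, without carrying the instance around.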

AP mode

By default, Nacos uses AP mode, implemented with the Distro protocol. The interface is EphemeralConsistencyService (implemented by DistroConsistencyServiceImpl), and node information is stored mainly through its put method:

@Override
public void put(String key, Record value) throws NacosException {
    // Store the data locally (in memory)
    onPut(key, value);
    // Notify other service nodes
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}

onPut stores the data and then queues a notification task:

public void onPut(String key, Record value) {
    
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        // Store the data in the in-memory DataStore
        dataStore.put(key, datum);
    }
    
    if (!listeners.containsKey(key)) {
        return;
    }
    
    notifier.addTask(key, DataOperation.CHANGE);
}

notifier.addTask puts the change into the blocking queue tasks via tasks.offer(Pair.with(datumKey, action)). The Notifier#run loop consumes that queue asynchronously, so the write path does not wait for listeners:

public class Notifier implements Runnable {
    
    @Override
    public void run() {
        Loggers.DISTRO.info("distro notifier started");
        
        for (; ; ) {
            try {
                Pair<String, DataOperation> pair = tasks.take();
                handle(pair);
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
    
    private void handle(Pair<String, DataOperation> pair) {
        // Some code omitted: datumKey and action are read from the pair, and count is initialized
        for (RecordListener listener : listeners.get(datumKey)) {
            count++;
            try {
                if (action == DataOperation.CHANGE) {
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }
                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }
    }
}
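The store-then-notify pattern above can be reduced to a small, runnable sketch: onPut writes to an in-memory map and enqueues the key, and one background thread drains a BlockingQueue and calls the listeners. DistroNotifierSketch and its methods are hypothetical names; the real code uses DataStore, Notifier and RecordListener, and queues (key, action) pairs rather than bare keys.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;

// Hypothetical store-then-notify pipeline in the spirit of Distro's DataStore + Notifier.
final class DistroNotifierSketch {

    private final Map<String, String> dataStore = new ConcurrentHashMap<>();
    private final Map<String, List<BiConsumer<String, String>>> listeners = new ConcurrentHashMap<>();
    private final BlockingQueue<String> tasks = new LinkedBlockingQueue<>();

    // Starts the single consumer thread (daemon, so it never blocks JVM exit)
    void start() {
        Thread notifier = new Thread(this::runNotifier, "notifier-sketch");
        notifier.setDaemon(true);
        notifier.start();
    }

    void listen(String key, BiConsumer<String, String> listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Like onPut: store the value, then queue a change task only if the key has listeners
    void onPut(String key, String value) {
        dataStore.put(key, value);
        if (!listeners.containsKey(key)) {
            return;
        }
        tasks.offer(key);
    }

    private void runNotifier() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String key = tasks.take(); // blocks until a change is queued
                for (BiConsumer<String, String> listener : listeners.get(key)) {
                    try {
                        // listeners read the latest stored value, like dataStore.get(datumKey).value
                        listener.accept(key, dataStore.get(key));
                    } catch (RuntimeException e) {
                        System.err.println("listener failed for key " + key + ": " + e);
                    }
                }
            } catch (InterruptedException e) {
                return; // stop the loop when interrupted
            }
        }
    }
}
```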

listener.onChange(datumKey, dataStore.get(datumKey).value) is then called. The listener here is actually the Service object, whose onChange method is:

public void onChange(String key, Instances value) throws Exception {
    
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    
    for (Instance instance : value.getInstanceList()) {
        
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    
    recalculateChecksum();
}
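The weight handling in onChange is easy to miss inside the loop. Pulled out into a hypothetical helper, it reads as follows: weights above 10000 are capped, positive weights below 0.01 are raised to 0.01, and a weight of 0 is left alone, so an instance can still be given zero weight.

```java
// Hypothetical helper isolating the weight normalization done in Service#onChange.
final class WeightSketch {

    static final double MAX_WEIGHT = 10000.0D;
    static final double MIN_POSITIVE_WEIGHT = 0.01D;

    static double normalize(double weight) {
        if (weight > MAX_WEIGHT) {
            return MAX_WEIGHT; // cap very large weights
        }
        if (weight < MIN_POSITIVE_WEIGHT && weight > 0.0D) {
            return MIN_POSITIVE_WEIGHT; // tiny positive weights are raised to the floor
        }
        return weight; // 0 (and any other in-range value) is unchanged
    }
}
```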

The updateIPs method updates the service instance information to memory in the registry and notifies subscribers of the current service over UDP.

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
    
    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }
            
            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }
            
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG.warn(
                        "cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                        instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }
            
            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }
            
            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }
    
    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        // Update the list of services
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }
    
    setLastModifiedMillis(System.currentTimeMillis());
    // Push service subscriber messages
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();
    
    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }
    
    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
            stringBuilder.toString());
    
}
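The first half of updateIPs is a grouping step. The hypothetical sketch below isolates it: null entries are skipped, instances without a cluster get the default cluster name, and instances are bucketed by cluster. Known clusters start with an empty list, so a cluster that lost all its instances is still passed to the later update and gets cleared. "DEFAULT" is assumed to match UtilsAndCommons.DEFAULT_CLUSTER_NAME.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the cluster-grouping step in Service#updateIPs.
final class ClusterGroupingSketch {

    static final String DEFAULT_CLUSTER_NAME = "DEFAULT"; // assumed default cluster name

    static final class Instance {
        final String ip;
        String clusterName;
        Instance(String ip, String clusterName) { this.ip = ip; this.clusterName = clusterName; }
    }

    static Map<String, List<Instance>> groupByCluster(Iterable<String> knownClusters, List<Instance> instances) {
        Map<String, List<Instance>> ipMap = new HashMap<>();
        // Every known cluster gets an entry, even if no instance belongs to it any more
        for (String cluster : knownClusters) {
            ipMap.put(cluster, new ArrayList<>());
        }
        for (Instance instance : instances) {
            if (instance == null) {
                continue; // malformed entry, skipped as in the original
            }
            if (instance.clusterName == null || instance.clusterName.isEmpty()) {
                instance.clusterName = DEFAULT_CLUSTER_NAME;
            }
            ipMap.computeIfAbsent(instance.clusterName, c -> new ArrayList<>()).add(instance);
        }
        return ipMap;
    }
}
```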

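CP mode

CP mode is implemented with the Raft protocol and applies to persistent (non-ephemeral) instances; it is not the default. The implementation class is PersistentConsistencyServiceDelegateImpl, which, with the original Raft implementation, hands the write to RaftConsistencyServiceImpl.

Let's look at its put method: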

public void put(String key, Record value) throws NacosException {
    checkIsStopWork();
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
    }
}

raftCore.signalPublish(key, value) does the main work. Its steps are:

  • If the current node is not the Leader, it forwards the request to the Leader for processing.
  • If it is the Leader, it first calls onPublish(datum, peers.local()). That method persists the data to a file (also recording the current term via raftStore.updateTerm(local.term.get())) and then updates memory asynchronously with NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());.
  • It sends the datum to the other Nacos nodes through their /raft/datum/commit endpoint to synchronize the instance information.
  • A majority (quorum) check is implemented with new CountDownLatch(peers.majorityCount()): the publish succeeds only if at least N/2 + 1 nodes (integer division, Leader included) acknowledge it before the timeout.
public void signalPublish(String key, Record value) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        
        final RaftPeer leader = getLeader();
        
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }
    
    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }
        
        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
        
        onPublish(datum, peers.local());
        
        final String content = json.toString();
        
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            final String url = buildUrl(server, API_ON_PUB);
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                datum.key, server, result.getCode());
                        return;
                    }
                    latch.countDown();
                }
                
                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                }
                
                @Override
                public void onCancel() {
                }
            });
        }
        
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }
        
        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}
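The quorum wait at the end of signalPublish can be tried out in isolation. In this hypothetical sketch the latch starts at majorityCount = n / 2 + 1, the leader counts itself, each accepting peer counts down from a worker thread, and the publish succeeds only if the latch reaches zero before the timeout. Peers are simulated by a Predicate instead of HTTP calls to /raft/datum/commit.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical model of the majority wait in RaftCore#signalPublish.
final class MajorityPublishSketch {

    static int majorityCount(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    static boolean publish(String leader, List<String> allServers, Predicate<String> peerAccepts,
                           long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(majorityCount(allServers.size()));
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, allServers.size()));
        try {
            for (String server : allServers) {
                if (server.equals(leader)) {
                    latch.countDown(); // the leader has already applied the datum locally
                    continue;
                }
                // Simulated asynchronous commit request to a follower
                pool.execute(() -> {
                    if (peerAccepts.test(server)) {
                        latch.countDown();
                    }
                });
            }
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With three nodes the majority is 2, so the leader plus one accepting follower is enough, while a three-node cluster where no follower answers fails after the timeout.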

Deciding between AP and CP mode

If a client registers with ephemeral=true, the Nacos cluster handles that instance in AP mode using Distro. If it registers with ephemeral=false, the cluster handles it in CP mode using Raft. AP and CP therefore coexist in one cluster, and which one applies to a given instance depends on the attribute the client sends at registration. In Spring Cloud Alibaba this attribute comes from spring.cloud.nacos.discovery.ephemeral, which defaults to true.

Nacos source debugging

Nacos startup file

To find the Nacos startup class, we first need to find the jar that gets launched.

Then we extract target/nacos-server.jar:

Decompression command:

# Extract the jar package (a jar is a zip archive)
unzip nacos-server.jar

# Check the contents of MANIFEST.MF
cat META-INF/MANIFEST.MF
Manifest-Version: 1.0
Implementation-Title: nacos-console 1.4.2
Implementation-Version: 1.4.2
Archiver-Version: Plexus Archiver
Built-By: xiweng.yy
Spring-Boot-Layers-Index: BOOT-INF/layers.idx
Specification-Vendor: Alibaba Group
Specification-Title: nacos-console 1.4.2
Implementation-Vendor-Id: com.alibaba.nacos
Spring-Boot-Version: 2.5.0-RC1
Implementation-Vendor: Alibaba Group
Main-Class: org.springframework.boot.loader.PropertiesLauncher
Spring-Boot-Classpath-Index: BOOT-INF/classpath.idx
Start-Class: com.alibaba.nacos.Nacos
Spring-Boot-Classes: BOOT-INF/classes/
Spring-Boot-Lib: BOOT-INF/lib/
Created-By: Apache Maven 3.6.3
Build-Jdk: 1.8.0_231
Specification-Version: 1.4.2
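The same two entries can be read programmatically with java.util.jar.Manifest. Main-Class is the Spring Boot launcher that the JVM runs, and Start-Class is the application class the launcher then invokes. The ManifestSketch class is hypothetical; for a real jar, new java.util.jar.JarFile("nacos-server.jar").getManifest() returns the same Manifest object without extracting anything.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

// Hypothetical helper: reads Main-Class and Start-Class from a MANIFEST.MF stream.
final class ManifestSketch {

    // Returns {Main-Class, Start-Class}; either may be null if absent
    static String[] mainAndStartClass(InputStream manifestStream) {
        try {
            Attributes attrs = new Manifest(manifestStream).getMainAttributes();
            return new String[] {attrs.getValue("Main-Class"), attrs.getValue("Start-Class")};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String[] mainAndStartClass(String manifestText) {
        return mainAndStartClass(new ByteArrayInputStream(manifestText.getBytes(StandardCharsets.UTF_8)));
    }
}
```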

The Start-Class entry, com.alibaba.nacos.Nacos, is the startup class of this Spring Boot project.

Nacos debugging

Using the startup class com.alibaba.nacos.Nacos, we can start and debug Nacos in IDEA. For a single-node run, add the JVM option -Dnacos.standalone=true.
