Preface

In this chapter, we look at the registry in Nacos 2.0 and compare it with version 1.4.1.

Nacos source code (7): 1.4.1 registry client

Nacos source code (8): 1.4.1 registry server

1. Model change

The client

The API the client calls does not change: the naming service implementation class is still NacosNamingService, and the service instance is still com.alibaba.nacos.api.naming.pojo.Instance.

public class Instance implements Serializable {
    private String instanceId;
    private String ip;
    private int port;
    private double weight = 1.0D;
    private boolean healthy = true;
    private boolean enabled = true;
    private boolean ephemeral = true;
    private String clusterName;
    private String serviceName;
    private Map<String, String> metadata = new HashMap<String, String>();
}

ServiceInfo remains the same. A group + service + cluster corresponds to a group of Instances.

public class ServiceInfo {
    // service name
    private String name;
    // group name
    private String groupName;
    // Clusters, comma separated
    private String clusters;
    // Instance list
    private List<Instance> hosts = new ArrayList<Instance>();
}

The container that stores the service registry changes from HostReactor to ServiceInfoHolder, which manages the failover registry and the in-memory registry. The remote-call logic that used to live in the 1.x HostReactor moves into NamingClientProxy.

public class ServiceInfoHolder implements Closeable {
    // key=groupName@@serviceName@@clusterName
    private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;

    private final FailoverReactor failoverReactor;
}

The 2.0 ServiceInfoHolder logic is simple.

// ServiceInfoHolder
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    String key = ServiceInfo.getKey(groupedServiceName, clusters);
    // 1. If failover is enabled, obtain the failover registry
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // 2. Memory registry
    return serviceInfoMap.get(key);
}
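For intuition, the cache key is just the grouped service name plus the cluster list, joined with the @@ separator shown in the comment on serviceInfoMap. A minimal sketch of the key layout (the example values are made up):

// Sketch of the key layout used by serviceInfoMap; values are illustrative only.
String groupName = "DEFAULT_GROUP";
String serviceName = "order-service";
String clusters = "c1,c2"; // may be empty

String groupedServiceName = groupName + "@@" + serviceName;   // "DEFAULT_GROUP@@order-service"
String key = clusters.isEmpty()
        ? groupedServiceName                                   // no cluster suffix
        : groupedServiceName + "@@" + clusters;                // "DEFAULT_GROUP@@order-service@@c1,c2"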

The server

Model

The server-side model changes from Service, Cluster, and Instance to Service, Instance, Client, and Connection.

The 2.0 Service is com.alibaba.nacos.naming.core.v2.pojo.Service.

public class Service {
    private final String namespace;
    private final String group;
    private final String name;
    private final boolean ephemeral;
}

The 2.0 Service differs from 1.x:

  • The ephemeral/persistent concept moves up from instances to the Service, determined by whether the first registered Instance is ephemeral.
  • 1.x uses Service to manage the Clusters under it; the 2.0 Service has no relationship with Cluster, which sinks down to Instance.
  • In 1.x, Service.name = groupName@@serviceName; 2.0 separates groupName from serviceName.

In 2.0 the container holding Services is still a ServiceManager, but it lives in the com.alibaba.nacos.naming.core.v2 package, and the Services it holds are singletons.

public class ServiceManager {
    private static final ServiceManager INSTANCE = new ServiceManager();
    // Singleton Service, see the equals and hashCode methods of Service
    private final ConcurrentHashMap<Service, Service> singletonRepository;
    // namespace- All subordinate services
    private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
}

Note the equals and hashCode methods of Service: namespace + group + name identifies a singleton Service on the server side.

@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (!(o instanceof Service)) {
        return false;
    }
    Service service = (Service) o;
    return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);
}

@Override
public int hashCode() {
    return Objects.hash(namespace, group, name);
}

The Cluster model is no longer used in 2.0. Cluster becomes an attribute in Instance. The server Instance model is InstancePublishInfo.

public class InstancePublishInfo implements Serializable {
    private String ip;
    private int port;
    private boolean healthy;
    private String cluster;
    // Metadata
    private Map<String, Object> extendDatum;
}

There is no direct relationship between Service and Instance. 2.0 adds a new Client model: each gRPC long connection corresponds to one Client, and each Client has a unique ID (clientId). The Client is responsible for managing the service instances that this client publishes (registers) and subscribes to.

/**
 * Nacos naming client.
 *
 * <p>The abstract concept of the client stored on the server of the Nacos naming module. It is used to store which
 * services the client has published and subscribed.
 */
public interface Client {
    // Client ID / connectionId of the gRPC connection
    String getClientId();
    // Whether the client is ephemeral
    boolean isEphemeral();
    // Client update time
    void setLastUpdatedTime();
    long getLastUpdatedTime();
    // Service instance register / deregister / query
    boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
    InstancePublishInfo removeServiceInstance(Service service);
    InstancePublishInfo getInstancePublishInfo(Service service);
    Collection<Service> getAllPublishedService();
    // Service subscribe / unsubscribe / query subscription
    boolean addServiceSubscriber(Service service, Subscriber subscriber);
    boolean removeServiceSubscriber(Service service);
    Subscriber getSubscriber(Service service);
    Collection<Service> getAllSubscribeService();
    // Generate client data to synchronize to other nodes
    ClientSyncData generateSyncData();
    // Whether the client has expired
    boolean isExpire(long currentTime);
    // Release resources
    void release();
}

ConnectionManager manages all connections.

@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {
    // clientid -> connection
    Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
}

Service registration

How is an Instance registered, and how is the relationship among the three models established?

In the traditional view of a registry, service registration registers an Instance under a Service, and service discovery finds Instances by Service. Nacos 2.0 does not associate an Instance directly with a Service.

Let's see how the relationship between Service and Instance is established by walking through the server-side registration flow.

InstanceRequestHandler receives InstanceRequest and is the entry point for service registration and deregistration requests.

@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
    private final EphemeralClientOperationServiceImpl clientOperationService;
    @Override
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            // register
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(service, request, meta);
            // deregister
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM);
        }
    }

    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }
    private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
    }
}

EphemeralClientOperationServiceImpl actually handles the service registration.

@Override
public void registerInstance(Service service, Instance instance, String clientId) {
    // 1. Ensure that the Service singleton exists. Note the equals and hashCode methods of Service
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    // 2. Locate the client based on the long connection ID. This relationship is stored when the connection is established
    Client client = clientManager.getClient(clientId);
    // 3. Client Instance model is converted to server Instance model
    InstancePublishInfo instanceInfo = getPublishInfo(instance);
    // 4. Add Instance to Client
    client.addServiceInstance(singleton, instanceInfo);
    client.setLastUpdatedTime();
    // 5. Establish the Service -> clientId relationship (and instance metadata) by publishing events
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    NotifyCenter
            .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}

Step 1: ServiceManager manages Service singletons

A map stores the singleton Services. Unlike 1.x, which needs a concatenated string key, the Service itself is the key because it overrides equals and hashCode.

// ServiceManager.java
// Singleton Service, see the equals and hashCode methods of Service
private final ConcurrentHashMap<Service, Service> singletonRepository;
// namespace- All subordinate services
private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
public Service getSingleton(Service service) {
    singletonRepository.putIfAbsent(service, service);
    Service result = singletonRepository.get(service);
    namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
    namespaceSingletonMaps.get(result.getNamespace()).add(result);
    return result;
}

Step 2: ConnectionBasedClientManager manages the mapping between the long-connection clientId and the Client model

The Client instance (ConnectionBasedClient) is saved into ConnectionBasedClientManager when the gRPC long connection is established; that code is omitted here.

@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
    private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();
    // Long connection setup, storage
    public boolean clientConnected(Client client) {
        if (!clients.containsKey(client.getClientId())) {
            clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client);
        }
        return true;
    }
  	// Query clients according to clientId
    public Client getClient(String clientId) {
        return clients.get(clientId);
    }
}

Step 3: The client Instance model is converted to the server-side Instance model, InstancePublishInfo

The original 1.x metadata becomes the 2.0 extendDatum, and properties such as enabled and weight are also moved into it.

// ClientOperationService
default InstancePublishInfo getPublishInfo(Instance instance) {
    InstancePublishInfo result = new InstancePublishInfo(instance.getIp(), instance.getPort());
    if (null != instance.getMetadata() && !instance.getMetadata().isEmpty()) {
        result.getExtendDatum().putAll(instance.getMetadata());
    }
    if (StringUtils.isNotEmpty(instance.getInstanceId())) {
        result.getExtendDatum().put(Constants.CUSTOM_INSTANCE_ID, instance.getInstanceId());
    }
    if (Constants.DEFAULT_INSTANCE_WEIGHT != instance.getWeight()) {
        result.getExtendDatum().put(Constants.PUBLISH_INSTANCE_WEIGHT, instance.getWeight());
    }
    if (!instance.isEnabled()) {
        result.getExtendDatum().put(Constants.PUBLISH_INSTANCE_ENABLE, instance.isEnabled());
    }
    String clusterName = StringUtils.isBlank(instance.getClusterName())
            ? UtilsAndCommons.DEFAULT_CLUSTER_NAME : instance.getClusterName();
    result.setHealthy(instance.isHealthy());
    result.setCluster(clusterName);
    return result;
}

Step 4: Establish the Client -> Service -> Instance relationship

ConnectionBasedClient's abstract parent class AbstractClient is responsible for storing the current client's service registrations, i.e. the relationship between Service and Instance. Note that a single client can register only one instance per service.

public abstract class AbstractClient implements Client {
    
    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
    
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            MetricsMonitor.incrementInstanceCount();
        }
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        return true;
    }
}

Step 5: Establish the relationship between Service and Client

Why do we have this step?

In fact, the steps above are already enough to answer service discovery queries (Service -> Instance).

For example, traverse all Clients in ConnectionBasedClientManager, then go through each Client's registered services (via AbstractClient), filter for the target service, and collect the final Instance list. The pseudocode is as follows:

for (Client c : ConnectionBasedClientManager.clients.values()) { // Iterate over all clients
  for (Entry e : c.publishers.entrySet()) { // Iterate over the service-instance registered under Client
    if (e.key == targetService) { // filter for the target service
      result.add(e.value); // collect the instance
    }
  }
}

Obviously the time complexity is too high. In addition, for service subscription, when a Service changes we need to find the Clients subscribing to it directly. So the relationship between Service and Client is indexed to speed up these queries.

Step 5 publishes a ClientRegisterServiceEvent, which ClientServiceIndexesManager listens to. ClientServiceIndexesManager maintains two indexes:

  • Service -> publishing clientIds
  • Service -> subscribing clientIds
// ClientServiceIndexesManager.java
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
private void handleClientOperation(ClientOperationEvent event) {
    Service service = event.getService();
    String clientId = event.getClientId();
    if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
        addPublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
        removePublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
        addSubscriberIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
        removeSubscriberIndexes(service, clientId);
    }
}

// Establish the relationship between Service and the publishing Client
private void addPublisherIndexes(Service service, String clientId) {
  publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
  publisherIndexes.get(service).add(clientId);
  NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}

Once this index relationship is established, ServiceChangedEvent is also triggered to represent service registry changes. There are two things to do immediately following registry changes: 1) notify subscriber clients and 2) synchronize Nacos cluster data. We’ll talk about these two things later.

What about persistent instance registration?

Registering a persistent instance goes through the Raft process. The rest of the logic is exactly the same as for temporary instances; only the models differ, e.g. the Client implementation is IpPortBasedClient.

The PersistentClientOperationServiceImpl#onInstanceRegister method runs when the Raft process applies the log to the current node.

// PersistentClientOperationServiceImpl
private void onInstanceRegister(Service service, Instance instance, String clientId) {
  // 1. Register service
  Service singleton = ServiceManager.getInstance().getSingleton(service);
  // 2. Register the client
  Client client = clientManager.computeIfAbsent(clientId, () -> new IpPortBasedClient(clientId, false));
  // 3. Convert to the server-side Instance model
  InstancePublishInfo instancePublishInfo = getPublishInfo(instance);
  // 4. Register instance
  client.addServiceInstance(singleton, instancePublishInfo);
  client.setLastUpdatedTime();
  // 5. Build index
  NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
}
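For completeness, here is a rough sketch of how the register call reaches onInstanceRegister through the consensus layer: the request is serialized and submitted as a write to the CP (Raft) protocol, and onInstanceRegister runs only when the committed log entry is applied on a node. The names below approximate the real classes and are not exact source.

// PersistentClientOperationServiceImpl (sketch, illustrative only)
public void registerInstance(Service service, Instance instance, String clientId) {
    // Wrap the registration into a store request
    InstanceStoreRequest request = new InstanceStoreRequest();
    request.setService(service);
    request.setInstance(instance);
    request.setClientId(clientId);
    // Submit a write to the Raft protocol; onInstanceRegister is invoked once the log is applied
    protocol.write(WriteRequest.newBuilder()
            .setGroup(group())                                          // naming persistent-service group
            .setData(ByteString.copyFrom(serializer.serialize(request)))
            .setOperation(DataOperation.ADD.name())
            .build());
}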

Service discovery

How do I get all instances based on service?

For service subscription, the client's subscribe call eventually stores the Service -> clientId relationship in ClientServiceIndexesManager as well; refer to the section above.

For service queries, although ClientServiceIndexesManager has established the Service -> Client relationship, walking from Service through Clients to Instances on every lookup is still slow. The pseudocode would be:

// 1. Cycle to register all clients of this service
for (String clientId : ClientServiceIndexesManager.publisherIndexes.get(targetService)) {
  Client client = ConnectionBasedClientManager.get(clientId);
    // 2. Get all instances of the service from the client
  result.addAll(client.publishers.values());
}

So a way to get the instance list directly from a Service, skipping the Client indirection, is also needed.

ServiceQueryRequestHandler is responsible for handling client queries for the Instances under a Service. There are three steps:

  • Query instance according to service
  • Query the service metadata from NamingMetadataManager based on the service
  • Instance filtering, including available filtering, health filtering, and protection mode logic
public class ServiceQueryRequestHandler extends RequestHandler<ServiceQueryRequest, QueryServiceResponse> {
    private final ServiceStorage serviceStorage;
    private final NamingMetadataManager metadataManager;
    public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {
        String namespaceId = request.getNamespace();
        String groupName = request.getGroupName();
        String serviceName = request.getServiceName();
        Service service = Service.newService(namespaceId, groupName, serviceName);
        String cluster = null == request.getCluster() ? "" : request.getCluster();
        boolean healthyOnly = request.isHealthyOnly();
        // 1. Query Instance according to service
        ServiceInfo result = serviceStorage.getData(service);
        // 2. Query the metadata of service based on service
        ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);
        // 3. Instance filter health? Available? , etc.
        result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true);
        return QueryServiceResponse.buildSuccessResponse(result);
    }
}

ServiceStorage queries the service's instances (ServiceInfo) following the pseudocode logic above and caches the query result.

// service-instance
private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;
// service-cluster
private final ConcurrentMap<Service, Set<String>> serviceClusterIndex;
public ServiceInfo getData(Service service) {
    // It exists in the cache
    return serviceDataIndexes.containsKey(service)
      ? serviceDataIndexes.get(service) 
      : getPushData(service);
}

public ServiceInfo getPushData(Service service) {
    ServiceInfo result = emptyServiceInfo(service);
    if (!ServiceManager.getInstance().containSingleton(service)) {
        return result;
    }
    // 1. Add instance to the result set
    result.setHosts(getAllInstancesFromIndex(service));
    // 2. Cache the service -> instance relationship
    serviceDataIndexes.put(service, result);
    return result;
}
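The getAllInstancesFromIndex call above is where the index and the Client model meet: it walks the clientIds that publish the service and collects each Client's InstancePublishInfo. A simplified sketch of that lookup (method names are close to, but not guaranteed to match, the real source; the conversion back to the API Instance model is left as a hypothetical parseInstance helper):

// Sketch: resolve a Service to its Instance list through the two indexes (illustrative, not exact source)
private List<Instance> getAllInstancesFromIndex(Service service) {
    List<Instance> result = new ArrayList<>();
    // 1. Service -> publishing clientIds (ClientServiceIndexesManager)
    for (String clientId : clientServiceIndexesManager.getAllClientsRegisteredService(service)) {
        // 2. clientId -> Client (ClientManager)
        Client client = clientManager.getClient(clientId);
        if (null == client) {
            continue;
        }
        // 3. Client -> InstancePublishInfo for this service, converted to the API Instance model
        InstancePublishInfo info = client.getInstancePublishInfo(service);
        if (null != info) {
            result.add(parseInstance(service, info)); // hypothetical conversion helper
        }
    }
    return result;
}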

2. Long connections

Registry client 2.0 uses gRPC instead of HTTP to establish a long connection with the server, but still retains support for the old HTTP client.

The NamingClientProxy interface is responsible for the underlying communication and invokes the server-side interface. There are three implementation classes:

  • NamingClientProxyDelegate: the proxy class. All NacosNamingService methods go through this delegate, which chooses HTTP or gRPC to call the server depending on the situation.
  • NamingGrpcClientProxy: Underlying communication is based on gRPC long connection.
  • NamingHttpClientProxy: the underlying communication is based on short-lived HTTP requests. It reuses old code that is basically the same as the 1.x NamingProxy.

Take client service registration as an example: the registerService call is delegated to NamingClientProxyDelegate.

// NacosNamingService.java
private NamingClientProxy clientProxy; // NamingClientProxyDelegate
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    clientProxy.registerService(serviceName, groupName, instance);
}

NamingClientProxyDelegate chooses a different protocol depending on whether the instance is ephemeral:

  • Temporary instance: gRPC
  • Persistent instance: HTTP
public class NamingClientProxyDelegate implements NamingClientProxy {
   private final NamingHttpClientProxy httpClientProxy;
   private final NamingGrpcClientProxy grpcClientProxy;
   @Override
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
      getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
    }
  // Ephemeral node: gRPC long connection; persistent node: HTTP short connection
  private NamingClientProxy getExecuteClientProxy(Instance instance) {
      return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
  }
}

Service query and subscription both go through gRPC.

// NamingClientProxyDelegate
// subscribe
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
  String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
  String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
  // 1. Obtain the serviceInfo corresponding to the service in the cache
  ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
  // 2. If not in the cache, subscribe to the service
  if (null == result) {
    result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
  }
  // 3. Enable the scheduled task to continuously update the subscription service registry
  serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
  // 4. Update the cache serviceInfo to notify the service change
  serviceInfoHolder.processServiceInfo(result);
  return result;
}

// query
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
                                           boolean healthyOnly) throws NacosException {
  return grpcClientProxy.queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}
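Step 4 calls serviceInfoHolder.processServiceInfo to refresh the local cache and fire change notifications. A simplified sketch of what that step does, under the assumption that it diffs the new ServiceInfo against the cached one, updates the map, and publishes an instance-change event (method and event names are approximations, not guaranteed to match the real source):

// ServiceInfoHolder (sketch, illustrative only)
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
    String serviceKey = serviceInfo.getKey();
    ServiceInfo oldService = serviceInfoMap.get(serviceKey);
    // 1. Update the in-memory registry
    serviceInfoMap.put(serviceKey, serviceInfo);
    // 2. If the instance list changed, notify subscribers and refresh the local disk cache
    if (isChangedServiceInfo(oldService, serviceInfo)) {           // diff helper, sketched
        NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
        DiskCache.write(serviceInfo, cacheDir);                    // failover/local cache file
    }
    return serviceInfo;
}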

3. Health check

Review version 1.x.

Temporary instances are stored in memory via the Distro protocol; clients send heartbeats to the registry to keep their instances healthy.

Raft protocol is used to store persistent instances. The server periodically establishes TCP connections with clients for health check.

Version 2.0 keeps the old logic for persistent instances. Persistent instances are rarely used, so we skip them here.

In 2.x, temporary instances no longer use heartbeats; health is judged by whether the long connection is alive.

The server probes proactively

ConnectionManager manages long connections for all clients.

Every 3s, detect all the clients that have not communicated for more than 20 seconds, and initiate ClientDetectionRequest to the client. If the client responds successfully within 1s, the detection passes; otherwise, execute unregister to remove the Connection.

// ConnectionManager
// clientId -> connection
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();

@PostConstruct
public void start() {
    RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                Set<Map.Entry<String, Connection>> entries = connections.entrySet();
                // Collect connections that have been idle for more than 20s
                Set<String> outDatedConnections = new HashSet<>();
                long now = System.currentTimeMillis();
                for (Map.Entry<String, Connection> entry : entries) {
                    Connection client = entry.getValue();
                    // ...
                    if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
                        outDatedConnections.add(client.getMetaInfo().getConnectionId());
                    }
                }
                // ...
                // Asynchronously send a ClientDetectionRequest to every connection that needs to be probed
                if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                    Set<String> successConnections = new HashSet<>();
                    final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                    for (String outDateConnectionId : outDatedConnections) {
                        try {
                            Connection connection = getConnection(outDateConnectionId);
                            if (connection != null) {
                                ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                                connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                    @Override
                                    public Executor getExecutor() {
                                        return null;
                                    }

                                    @Override
                                    public long getTimeout() {
                                        return 1000L;
                                    }

                                    @Override
                                    public void onResponse(Response response) {
                                        latch.countDown();
                                        // Record client ids that responded to the probe and refresh their activeTime
                                        if (response != null && response.isSuccess()) {
                                            connection.freshActiveTime();
                                            successConnections.add(outDateConnectionId);
                                        }
                                    }

                                    @Override
                                    public void onException(Throwable e) {
                                        latch.countDown();
                                    }
                                });
                            } else {
                                latch.countDown();
                            }
                        } catch (ConnectionAlreadyClosedException e) {
                            latch.countDown();
                        } catch (Exception e) {
                            latch.countDown();
                        }
                    }
                    latch.await(3000L, TimeUnit.MILLISECONDS);
                    // For clients that failed to respond, execute the unregister method
                    for (String outDateConnectionId : outDatedConnections) {
                        if (!successConnections.contains(outDateConnectionId)) {
                            unregister(outDateConnectionId);
                        }
                    }
                }
            } catch (Throwable e) {
                Loggers.REMOTE.error("Error occurs during connection check... ", e);
            }
        }
    }, 1000L, 3000L, TimeUnit.MILLISECONDS);
}

// Remove the connection
public synchronized void unregister(String connectionId) {
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        // ...
        remove.close();
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}

After the connection is removed, ConnectionBasedClientManager (which extends ClientConnectionEventListener) removes the corresponding Client and publishes a ClientDisconnectEvent.

// clientId -> client
private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();
@Override
public boolean clientDisconnected(String clientId) {
  ConnectionBasedClient client = clients.remove(clientId);
  if (null == client) {
    return true;
  }
  client.release();
  NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
  return true;
}

ClientDisconnectEvent triggers several things:

1) Distro protocol: sync removed client data

2) Index cache removal: the Service -> Client relationship in ClientServiceIndexesManager and the Service -> Instance relationship in ServiceStorage

3) Service Subscription: ClientDisconnectEvent Indirectly triggers the ServiceChangedEvent event to notify the client of service changes.
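As an illustration of point 2, a sketch of how ClientServiceIndexesManager reacts to the disconnect: it simply drops the clientId from both indexes for every service the client published or subscribed to (names are close to the real source, but treat this as illustrative):

// ClientServiceIndexesManager (sketch)
private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
    Client client = event.getClient();
    // Remove this clientId from the subscriber index of every service it subscribed to
    for (Service each : client.getAllSubscribeService()) {
        removeSubscriberIndexes(each, client.getClientId());
    }
    // Remove this clientId from the publisher index of every service it published
    for (Service each : client.getAllPublishedService()) {
        removePublisherIndexes(each, client.getClientId());
    }
}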

Does Connection need to be checked?

Whether a Connection needs to be probed depends on the lastActiveTime property in its metadata, which records the last time the connection was active.

public class ConnectionMeta {
    // Last active time
    long lastActiveTime;
}

If the client is constantly communicating with the server, the server does not need to probe it. See GrpcRequestAcceptor calling ConnectionManager#refreshActiveTime to refresh the connection's lastActiveTime.

// GrpcRequestAcceptor
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
    String type = grpcRequest.getMetadata().getType();
    // Request handler
    RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
    // Parse the request parameters
    Object parseObj = GrpcUtils.parse(grpcRequest);
  	// ..
    Request request = (Request) parseObj;
    try {
        Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
        RequestMeta requestMeta = new RequestMeta();
        requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
        requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
        requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
        requestMeta.setLabels(connection.getMetaInfo().getLabels());
        // Refresh the active time of the long link
        connectionManager.refreshActiveTime(requestMeta.getConnectionId());
        Response response = requestHandler.handleRequest(request, requestMeta);
        Payload payloadResponse = GrpcUtils.convert(response);
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    } catch (Throwable e) {
        // ...
    }
}

When the client subscribes to a service, it updates the service registry periodically. In 1.x it queried the server every 10 seconds, which is shorter than 20 seconds and thus never triggered the server-side health check. In 2.x, however, the client's registry UpdateTask interval was changed to 60s.

// UpdateTask
public void run() {
    long delayTime = DEFAULT_DELAY;
    try {
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
            return;
        }
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        if (serviceObj == null) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            serviceInfoHolder.processServiceInfo(serviceObj);
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }

        if (serviceObj.getLastRefTime() <= lastRefTime) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            return;
        }
        // Server controls cacheMillis=10s times 6 so 60s is updated once
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
    } finally {
        // Execute again after 60 seconds
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

That would mean the server has to health-check every long connection registered on the current node two or three times a minute. In fact that is not the case, as we'll see next.

Client-side reconnection

Not only does the server proactively check whether the client is alive, but also the client proactively checks whether the idle long connection is alive every five seconds.

When the long connection is down, the client initiates a reconnection to the server.

public final void start() throws NacosException {
    // Every 5s, send a HealthCheckRequest and reconnect if the response fails
    clientEventExecutor.submit(new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    // 5s timeout
                    ReconnectContext reconnectContext = reconnectionSignal.poll(keepAliveTime, TimeUnit.MILLISECONDS);
                    if (reconnectContext == null) {
                        // Idle for more than 5s
                        if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                            // HealthCheckRequest
                            boolean isHealthy = healthCheck();
                            if (!isHealthy) {
                                if (currentConnection == null) {
                                    continue;
                                }
                                rpcClientStatus.set(RpcClientStatus.UNHEALTHY);
                                reconnectContext = new ReconnectContext(null, false);
                            } else {
                                lastActiveTimeStamp = System.currentTimeMillis();
                                continue;
                            }
                        } else {
                            continue;
                        }
                    }
                    // Initiate the reconnect and select the next Nacos node
                    // ...
                    reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
                } catch (Throwable throwable) {
                    // Do nothing
                }
            }
        }
    });
}

To sum up, 2.x performs a two-way health check over the long connection to determine whether the peer is alive: the client initiates a check after 5s of idle time, and the server after 20s.

Client reconnection also needs to resolve issues such as re-registration and re-listening. So each time a client initiates these operations it will be recorded in memory and all registration and listening requests will be replayed when reconnected.

public class NamingGrpcConnectionEventListener implements ConnectionEventListener {
    // Service-instance registered by the current client
    private final ConcurrentMap<String, Set<Instance>> registeredInstanceCached = new ConcurrentHashMap<String, Set<Instance>>();
    // The current client subscribes to the service
    private final Set<String> subscribes = new ConcurrentHashSet<String>();
    
    @Override
    public void onConnected(a) {
        // 1. Re-subscribe
        redoSubscribe();
        // 2. Re-register
        redoRegisterEachService();
    }
}
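The redo methods themselves are not shown above. Conceptually they just iterate the two caches and replay the requests over the new connection; a simplified sketch of the re-registration half (assuming the cache key is the grouped service name and that a NamingGrpcClientProxy reference called clientProxy is available):

// Sketch: replay cached registrations after a reconnect (illustrative, not the exact source)
private void redoRegisterEachService() {
    for (Map.Entry<String, Set<Instance>> entry : registeredInstanceCached.entrySet()) {
        String groupedServiceName = entry.getKey();                   // e.g. "DEFAULT_GROUP@@order-service"
        String groupName = NamingUtils.getGroupName(groupedServiceName);
        String serviceName = NamingUtils.getServiceName(groupedServiceName);
        for (Instance instance : entry.getValue()) {
            try {
                // Re-send the register request over the new connection
                clientProxy.registerService(serviceName, groupName, instance);
            } catch (NacosException e) {
                // log and continue; the instance can be retried on the next reconnect
            }
        }
    }
}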

4. Client data synchronization

The client data synchronization still adopts the push-pull mode.

Pull: the client runs an UpdateTask every 60 seconds to query the server registry and refresh the local copy of a subscribed service. The logic is the same as 1.x; only the interval changed from 10s to 60s.

Push: when a Service changes, the server notifies all clients listening to that Service. The push now goes over the long connection instead of the UDP protocol used in 1.x.

A ServiceChangedEvent is triggered when a Service changes. NamingSubscriberServiceV2Impl listens for the ServiceChangedEvent and submits a PushDelayTask.

// NamingSubscriberServiceV2Impl
public void onEvent(Event event) {
    if (event instanceof ServiceEvent.ServiceChangedEvent) {
        ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
        Service service = serviceChangedEvent.getService();
        delayTaskEngine.addTask(service, new PushDelayTask(service, 500L));
    } 
    // ...
}

PushExecuteTask performs client data synchronization, first fetching ServiceInfo via ServiceStorage and updating the ServiceStorage cache.

// PushExecuteTask
public void run() {
    try {
        // 1. Update the service->instance cache in ServiceStorage and query the ServiceInfo
        PushDataWrapper wrapper = generatePushData();
        // 2. Loop all the subscription clients and send ServiceInfo to the subscription clients
        for (String each : getTargetClientIds()) {
            Client client = delayTaskEngine.getClientManager().getClient(each);
            Subscriber subscriber = delayTaskEngine.getClientManager().getClient(each).getSubscriber(service);
            delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                    new NamingPushCallback(each, subscriber, wrapper.getOriginalData()));
        }
    } catch (Exception e) {
        delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
    }
}

private PushDataWrapper generatePushData() {
  // Query ServiceInfo and update the ServiceStorage cache
  ServiceInfo serviceInfo = delayTaskEngine.getServiceStorage().getPushData(service);
  ServiceMetadata serviceMetadata = delayTaskEngine.getMetadataManager().getServiceMetadata(service).orElse(null);
  serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceInfo, serviceMetadata, false, true);
  return new PushDataWrapper(serviceInfo);
}

5. Cluster data synchronization

When a Client publishes or deregisters a Service, Instance information corresponding to the published Service is stored in the Client model.

public abstract class AbstractClient implements Client {
    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            MetricsMonitor.incrementInstanceCount();
        }
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        return true;
    }
    
    @Override
    public InstancePublishInfo removeServiceInstance(Service service) {
        InstancePublishInfo result = publishers.remove(service);
        if (null != result) {
            NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        }
        return result;
    }
}

The responsible node

After the registration information is written locally, a ClientChangedEvent is triggered. DistroClientDataProcessor only handles Clients that the current node is responsible for.

// DistroClientDataProcessor.java
private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        // The client is disconnected
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        // Add/modify the client
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

DistroProtocol loops over all other Nacos nodes and submits an asynchronous task for each; the task may be delayed by 1s (nacos.core.protocol.distro.data.sync_delay_ms) before it runs.

// DistroProtocol.java
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
            targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
}

The DELETE operation is handled by DistroSyncDeleteTask;

The CHANGE operation is handled by DistroSyncChangeTask.

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    private static final DataOperation OPERATION = DataOperation.CHANGE;
    // without callback
    @Override
    protected boolean doExecute() {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            return true;
        }
        return getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer());
    }
    // with callback
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer(), callback);
    }
    // Get DistroData from DistroClientDataProcessor
    private DistroData getDistroData(String type) {
        DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
        if (null != result) {
            result.setType(OPERATION);
        }
        return result;
    }
}

DistroClientDataProcessor builds the DistroData by fetching the Client from ClientManager in real time.

// DistroClientDataProcessor
public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}

AbstractClient provides DistroClientDataProcessor with everything the Client has registered: which namespaces, which groups, which services, and which instances.

// AbstractClient
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
    List<InstancePublishInfo> instances = new LinkedList<>();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        namespaces.add(entry.getKey().getNamespace());
        groupNames.add(entry.getKey().getGroup());
        serviceNames.add(entry.getKey().getName());
        instances.add(entry.getValue());
    }
    return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}

Finally, DistroClientTransportAgent wraps the data as a DistroDataRequest and calls the other Nacos nodes.

// DistroClientTransportAgent
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
    if (isNoExistTarget(targetServer)) {
        callback.onSuccess();
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    Member member = memberManager.find(targetServer);
    try {
        clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback));
    } catch (NacosException nacosException) {
        callback.onFailed(nacosException);
    }
}

The non-responsible node

The non-responsible node processes the Client data synchronized from the responsible node.

DistroClientDataProcessor handles the data synchronized from the responsible node.

// DistroClientDataProcessor
public boolean processData(DistroData distroData) {
  switch (distroData.getType()) {
    case ADD:
    case CHANGE:
      ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
        .deserialize(distroData.getContent(), ClientSyncData.class);
      handlerClientSyncData(clientSyncData);
      return true;
    case DELETE:
      String deleteClientId = distroData.getDistroKey().getResourceKey();
      clientManager.clientDisconnected(deleteClientId);
      return true;
    default:
      return false;
  }
}

private void handlerClientSyncData(ClientSyncData clientSyncData) {
  // 1. Save the ConnectionBasedClient. The isNative of this class of ConnectionBasedClient is false
  clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
  Client client = clientManager.getClient(clientSyncData.getClientId());
  // 2. Update Client information
  upgradeClient(client, clientSyncData);
}

Note that the implementation class of the Client is still the ConnectionBasedClient, but its isNative property is false, which is the main difference between a non-responsible node and a responsible node.

DistroClientDataProcessor's upgradeClient method updates the Client's registration information in the registry and publishes the corresponding events.

// DistroClientDataProcessor
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    Set<Service> syncedService = new HashSet<>();
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i);
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            client.addServiceInstance(singleton, instancePublishInfo);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}

DistroFilter?

In version 1.x, all client write requests go through the DistroFilter.

If hash(serviceName) % (size of the Nacos node list) == the index of the current node, the current node is the responsible node and processes the client write request.

// DistroMapper
public boolean responsible(String responsibleTag) {
    final List<String> servers = healthyList;
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        return true;
    }
    if (CollectionUtils.isEmpty(servers)) {
        return false;
    }
    int index = servers.indexOf(EnvUtil.getLocalAddress());
    int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
    if (lastIndex < 0 || index < 0) {
        return true;
    }
    
    int target = distroHash(responsibleTag) % servers.size();
    return target >= index && target <= lastIndex;
}
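A quick worked example with hypothetical values: suppose healthyList = ["10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848"] and the current node is 10.0.0.2:8848, so index = lastIndex = 1. If distroHash(serviceName) = 7, then target = 7 % 3 = 1, which falls within [index, lastIndex], so this node is responsible for the service and handles the write; any other node would forward the request. The same check in plain Java (distroHash is replaced by a hashCode-based stand-in):

// Hypothetical illustration of the 1.x responsibility rule
List<String> servers = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
String localAddress = "10.0.0.2:8848";
int index = servers.indexOf(localAddress);          // 1
int lastIndex = servers.lastIndexOf(localAddress);  // 1
int target = Math.abs("DEFAULT_GROUP@@order-service".hashCode()) % servers.size(); // stand-in for distroHash
boolean responsible = target >= index && target <= lastIndex;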

Otherwise, in 1.x the client request must be forwarded to the responsible node; after the responsible node processes it, the current node returns the response to the client.

In 2.x, the DistroFilter is no longer needed for client requests, because the client and the server hold a long connection. Whether the current Nacos node is the responsible node depends on the Client's isNative property: if the client registered directly over a connection to this Nacos node, the ConnectionBasedClient's isNative is true; if the Client was synced to this node via the Distro protocol, isNative is false.

public class ConnectionBasedClient extends AbstractClient {
    /**
     * {@code true} means this client is directly connect to current server. {@code false} means this client is synced
     * from other server.
     */
    private final boolean isNative;
}

To sum up, 2.x reduces the steps of write request forwarding in 1.x. The node on which the long connection is established is the responsible node, and the client will only send requests to this responsible node.
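In other words, the 2.x responsibility check degenerates into an isNative check on the Client. A minimal sketch of what that check looks like in the connection-based client manager (illustrative; treat the exact location and signature as an assumption):

// Sketch: a node is responsible for a Client only if that Client connected to it directly
@Override
public boolean isResponsibleClient(Client client) {
    return (client instanceof ConnectionBasedClient) && ((ConnectionBasedClient) client).isNative();
}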

Verify

Distro does not rely only on real-time synchronization on data change to keep cluster data consistent; scheduled background tasks also do this work.

In 1.x, the responsible node synchronizes the digest (MD5) of the list of instances of all services to the non-responsible node every 5s.

The non-responsible node compares the local service's MD5 with the peer's MD5; if a service has changed, it queries the responsible node for the data.

In 2.x this process changes: the responsible node sends the full Client data, and the non-responsible node periodically checks whether its synced Clients have expired, which removes the reverse query of 1.x.

  • The responsible node sends DistroData of type DataOperation=VERIFY to other nodes every 5s to keep the Client data of the non-responsible node from expiring.
public class DistroVerifyTimedTask implements Runnable {
    @Override
    public void run() {
        try {
            // 1. All other nodes
            List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
            for (String each : distroComponentHolder.getDataStorageTypes()) {
                // 2. Send DistroData of type VERIFY (for clients with isNative=true) to these nodes
                verifyForDataStorage(each, targetServer);
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
        }
    }
}
  • The non-responsible node scans Clients with isNative=false every 5s. If a Client's renewal time has not been refreshed by a VERIFY DistroData within 30 seconds, the synced Client data is removed.
private static class ExpiredClientCleaner implements Runnable {
    private final ConnectionBasedClientManager clientManager;
    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        for (String each : clientManager.allClientId()) {
            ConnectionBasedClient client = (ConnectionBasedClient) clientManager.getClient(each);
            if (null != client && isExpireClient(currentTime, client)) {
                clientManager.clientDisconnected(each);
            }
        }
    }

    private boolean isExpireClient(long currentTime, ConnectionBasedClient client) {
        // A synced (non-native) client whose lease has not been renewed within 30 seconds
        return !client.isNative() && currentTime - client.getLastRenewTime() > Constants.DEFAULT_IP_DELETE_TIMEOUT;
    }
}

Conclusion

Compared with 1.x, the main optimization in 2.x is to use long connections instead of short ones, and the registry model is adjusted around them.

  • Model:
    • Service: namespace + group + name identifies a singleton Service, managed by ServiceManager.
    • Instance: the instance model, InstancePublishInfo, managed by Client.
    • Client: one Client corresponds to one long connection. A Client holds the Services and Instances registered and subscribed by the corresponding client, thereby associating Service with Instance, and is managed by ClientManager.
    • Connection: a long connection. One Connection corresponds to one Client and is managed by ConnectionManager.
  • Model indexing: to speed up queries, two index services are provided:
    • ClientServiceIndexesManager: Service -> Client, the relationship between a Service and the Clients that publish it and the Clients that subscribe to it.
    • ServiceStorage: Service -> Instance, the relationship between a Service and the instances under it.

Long connections: not all client requests go through gRPC; persistent instance registration still goes through HTTP.

Health check:

  • 1.x: for temporary instances, the client periodically sends heartbeats to the server to keep the Instance alive.
  • 2.x: for temporary instances, the client holds a long connection with the server, and both sides perform health checks to confirm the Client is alive. The server probes connections idle for 20s with a detection request; if the client responds within 1s, the check passes. The client probes connections idle for 5s with a health check request; if the server responds within 3s, the check passes.
  • 2.x: if a Client is found to be unhealthy, the server removes it from ClientManager, triggering various events (cluster/client data synchronization).

Client data synchronization: still a combination of push and pull.

  • Pull: the client runs an UpdateTask every 60 seconds to query the server registry and refresh the local copy of a subscribed service. The logic is the same as 1.x; only the interval changed from 10s to 60s.

  • Push: Notifies all clients listening for Service changes.

Cluster data synchronization:

  • DistroFilter: 1.x uses DistroFilter to ensure a write request is processed only by the node responsible for that service. In 2.x, thanks to long connections, as long as ConnectionBasedClient.isNative = true the node the client connects to is that Client's responsible node, which removes the overhead of redirecting write requests to other nodes.
  • After the Client data of the responsible node is changed, the system synchronizes the full data of the Client to other non-responsible nodes. Non-responsible nodes update Client information in ClientManager.
  • To avoid stale data for isNative=false Clients on non-responsible nodes, the responsible node sends VERIFY data to the non-responsible nodes every 5s to renew these Clients' leases; this data contains the Clients' full data. Non-responsible nodes periodically scan Clients whose isNative is false and remove those whose lease has not been renewed within 30 seconds.