Preface

This chapter analyzes the Nacos 1.4.1 registry server, focusing on temporary instances (instance.ephemeral=true, the default).

  • Server models: ServiceManager, Service, Cluster, and Instance
  • Distro protocol: AP protocol serving the registry
  • Service query: UDP listening and protection mode
  • Nacos Cluster management: Cluster initialization, cluster health check
  • How Distro protocol handles write requests
  • Service registration: node local data update, cluster data synchronization, UDP push client
  • Client heartbeat: Handling heartbeat timeout
  • Cluster data synchronization: how Nacos ensures that the data on each node is consistent

1. Server model

Logically, the naming service is modeled as follows.
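A rough sketch of the hierarchy, pieced together from the classes analyzed below:

ServiceManager
  └── namespace
        └── groupName@@serviceName → Service
              └── clusterName → Cluster
                    └── ephemeral / persistent Instance sets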

The main difference from the client is that a client-side NamingService is created for a namespace of a specific Nacos cluster, and the namespace the client uses does not change during its lifetime. From the server-side implementation point of view, the model looks as follows.

ServiceManager is a singleton that manages the namespace → (groupName@@serviceName) → Service mapping. The key of the inner map is groupName@@serviceName.

@Component
public class ServiceManager implements RecordListener<Service> {
    // namespace - groupName@@serviceName - Service
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
}
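For illustration, a minimal sketch of how that key is formed; NamingUtils.getGroupedName is the helper the Nacos API module provides, and the group/service names here are made up:

// groupName@@serviceName, e.g. "DEFAULT_GROUP@@order-service" (example names)
String key = NamingUtils.getGroupedName("order-service", "DEFAULT_GROUP");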

com.alibaba.nacos.naming.core.Service stores the namespace, the group, and all of the service's information.

// com.alibaba.nacos.naming.core.Service
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record.RecordListener<Instances> {
    // owning namespace
    private String namespaceId;
}
// com.alibaba.nacos.api.naming.pojo.Service
public class Service implements Serializable {
    // groupName@@serviceName
    private String name;
    // Service protection threshold. When most services go offline, the current registry node is considered faulty and all instances, including unhealthy ones, are returned
    private float protectThreshold = 0.0F;
    // group
    private String groupName;
}

In addition, com.alibaba.nacos.naming.core.Service manages all of its Clusters.

// Cluster registry where key is the Cluster name
private Map<String, Cluster> clusterMap = new HashMap<>();

Cluster manages all persistent and temporary instances.

// com.alibaba.nacos.naming.core.Cluster
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    // persistent instances
    private Set<Instance> persistentInstances = new HashSet<>();
    // ephemeral (temporary) instances
    private Set<Instance> ephemeralInstances = new HashSet<>();
    // the Service it belongs to
    private Service service;
}
// com.alibaba.nacos.api.naming.pojo.Cluster
public class Cluster implements Serializable {
    /** * Name of belonging service. */
    private String serviceName;
    /** * Name of cluster. */
    private String name;
}

Instance is a service instance, the unit in which a client service is registered.

// com.alibaba.nacos.naming.core.Instance
public class Instance extends com.alibaba.nacos.api.naming.pojo.Instance implements Comparable {
    // Last heartbeat time
    private volatile long lastBeat = System.currentTimeMillis();
    // namespace
    private String tenant;
}
// com.alibaba.nacos.api.naming.pojo.Instance
public class Instance implements Serializable {
    // Unique identifier = Group + Cluster + Service + IP address + port
    private String instanceId;
    private String ip;
    private int port;
    private double weight = 1.0D;
    private boolean healthy = true;
    private boolean enabled = true;
    // Whether the node is temporary
    private boolean ephemeral = true;
    // Owning cluster
    private String clusterName;
    // Service name = group + service
    private String serviceName;
    // metadata
    private Map<String, String> metadata = new HashMap<String, String>();
}

2. Distro protocol

The Nacos registry uses the Distro protocol, an AP protocol.

From the client's perspective, the server nodes are peers: read and write requests can be sent to any node. If a node fails to process a request, the client selects another node and retries. Client requests include service registration, service query, service subscription, and heartbeat requests.

// NamingProxy#reqApi
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
        String method) throws NacosException {
    NacosException exception = new NacosException();
    // Select a random node as the first request node
    Random random = new Random(System.currentTimeMillis());
    int index = random.nextInt(servers.size());
    for (int i = 0; i < servers.size(); i++) {
      String server = servers.get(index);
      try {
        return callServer(api, params, body, server, method);
      } catch (NacosException e) {
        exception = e;
      }
      // An exception occurs. Select the next node to try the request
      index = (index + 1) % servers.size();
    }
   // ...
}

On the server side, the following operations are handled:

  • Read: Each node in the cluster stores the full data set. Any node can process read requests and returns the data in its own registry, regardless of whether that data is up to date. (GET /nacos/v1/ns/instance/list)
  • Write: Writes are partitioned by service; each node is responsible for a subset of services. From the service dimension, the node in charge of a service is called its responsible node. After a client sends a write request to an arbitrary server, that server checks whether it is the responsible node: if yes, it processes the request; if not, it forwards the request to the responsible node. (DistroFilter)
  • Client heartbeat: If the server does not receive a client heartbeat for a long time, the service instance is taken offline (ClientBeatCheckTask, ClientBeatProcessor). If the server receives a heartbeat but the instance does not exist, it performs the registration logic (BeatReactor, PUT /nacos/v1/ns/instance/beat). Note: the temporary-instance registry is maintained by the client actively sending heartbeats, while persistent instances are maintained through active server-side health checks; only temporary instance registration is considered here.
  • Cluster management: Each server node proactively sends health checks to the other nodes; a node that responds successfully is regarded as healthy by the sender. The health check is also responsible for synchronizing cluster membership (MemberInfoReportTask).
  • Data synchronization: On a per-service basis, the responsible node asynchronously synchronizes data to the other nodes after processing a write request, so that the registry information of all nodes converges (DistroProtocol).

3. Cluster management

Cluster management is a general feature of Nacos, not unique to registries.

ServerMemberManager is responsible for Nacos cluster management.

@Component(value = "serverMemberManager")
public class ServerMemberManager implements ApplicationListener<WebServerInitializedEvent> {
    // All Nacos nodes
    private volatile ConcurrentSkipListMap<String, Member> serverList;
    // How Nacos discovers its own cluster nodes
    private MemberLookup lookup;
    // The current Nacos node
    private volatile Member self;
    // Set of node addresses in the health state
    private volatile Set<String> memberAddressInfos = new ConcurrentHashSet<>();
    // Cluster member information broadcast task
    private final MemberInfoReportTask infoReportTask = new MemberInfoReportTask();
    private volatile boolean isInIpList = true;
    private int port;
    private String localAddress;
}

Member stands for Nacos cluster node.

public class Member implements Comparable<Member>, Cloneable {
    // ip
    private String ip;
    // port
    private int port = -1;
    // state
    private volatile NodeState state = NodeState.UP;
    // extended information
    private Map<String, Object> extendInfo = Collections.synchronizedMap(new TreeMap<>());
    // ip:port
    private String address = "";
    // Number of consecutive health check failures
    private transient int failAccessCnt = 0;
}

NodeState: node status.

public enum NodeState {
    STARTING,
    UP,
    SUSPICIOUS,
    DOWN,
    ISOLATION,
}

Only the UP, SUSPICIOUS, and DOWN states are actually used.

  • UP: The health check passes.
  • SUSPICIOUS: The health check fails and the number of failures is less than a certain threshold.
  • DOWN: indicates that the health check fails and the number of failures exceeds a certain threshold.

1. Initialize the cluster

When a new node starts, a MemberLookup is needed to initialize the in-memory cluster list ServerMemberManager.serverList, so that subsequent heartbeats can be sent to join the cluster.

// ServerMemberManager
protected void init() throws NacosException {
    this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);
    this.localAddress = InetUtils.getSelfIP() + ":" + port;
    this.self = MemberUtil.singleParse(this.localAddress);
    this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
    serverList.put(self.getAddress(), self);
    // register MembersChangeEvent publisher IPChangeEvent subscriber/publisher
    registerClusterEvent();
    // Initialize the cluster list
    initAndStartLookup();
}

There are three MemberLookup implementation classes:

  • StandaloneMemberLookup: when the node is started in standalone mode, it simply uses itself as the cluster list.
  • FileConfigMemberLookup: uses the contents of the {nacos.home}/conf/cluster.conf configuration file as the cluster list (a sample file is shown after this list).
  • AddressServerMemberLookup: uses an external address server that provides service discovery for the Nacos cluster to initialize the cluster list; it requests http://{address.server.domain}:{address.server.port}/{address_server_url} to obtain cluster.conf.
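As a reference, a sample cluster.conf with assumed addresses (one Nacos node address per line):

# {nacos.home}/conf/cluster.conf
192.168.16.101:8848
192.168.16.102:8848
192.168.16.103:8848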

ServerMemberManager selects the appropriate MemberLookup implementation based on the current situation and executes the start method.

// ServerMemberManager
private void initAndStartLookup() throws NacosException {
    this.lookup = LookupFactory.createLookUp(this);
    this.lookup.start();
}

Take FileConfigMemberLookup as an example. Its start method reads {nacos.home}/conf/cluster.conf and calls back memberManager.memberChange(members). In addition, the JDK's WatchService is used to watch the file, so that when cluster.conf changes, the cluster list is reloaded and the MemberManager callback is triggered again.

// FileConfigMemberLookup
@Override
public void start() throws NacosException {
    if (start.compareAndSet(false, true)) {
        readClusterConfFromDisk();
        try {
            // Use the JDK WatchService to watch the file
            WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
        } catch (Throwable e) {
            Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
        }
    }
}

private void readClusterConfFromDisk() {
    // 1. Read {nacos.home}/conf/cluster.conf to load tmpMembers
    Collection<Member> tmpMembers = new ArrayList<>();
    try {
        List<String> tmp = EnvUtil.readClusterConf();
        tmpMembers = MemberUtil.readServerConf(tmp);
    } catch (Throwable e) {
        Loggers.CLUSTER
                .error("nacos-XXXX [serverlist] failed to get serverlist from disk! , error : {}", e.getMessage());
    }
    // 2. this.memberManager.memberChange(members)
    afterLookup(tmpMembers);
}

The memberChange method of ServerMemberManager updates the in-memory list of Nacos nodes and publishes a MembersChangeEvent.

// Set of node addresses in the health state
private volatile Set<String> memberAddressInfos = new ConcurrentHashSet<>();
// All Nacos nodes
private volatile ConcurrentSkipListMap<String, Member> serverList;
synchronized boolean memberChange(Collection<Member> members) {
    boolean isContainSelfIp = members.stream()
            .anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));
    if (isContainSelfIp) {
        isInIpList = true;
    } else {
        isInIpList = false;
        members.add(this.self);
    }

    boolean hasChange = members.size() != serverList.size();
    ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
    Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
    for (Member member : members) {
        final String address = member.getAddress();

        if (!serverList.containsKey(address)) {
            hasChange = true;
        }

        tmpMap.put(address, member);
        if (NodeState.UP.equals(member.getState())) {
            tmpAddressInfo.add(address);
        }
    }
    serverList = tmpMap;
    memberAddressInfos = tmpAddressInfo;

    Collection<Member> finalMembers = allMembers();

    if (hasChange) {
        MemberUtil.syncToFile(finalMembers);
        Event event = MembersChangeEvent.builder().members(finalMembers).build();
        NotifyCenter.publishEvent(event);
    }

    return hasChange;
}

2. Cluster health check

ServerMemberManager implements the ApplicationListener interface and listens for the WebServerInitializedEvent.

After Tomcat is started, it calls back to ServerMemberManager and starts a MemberInfoReportTask that broadcasts information about the current node.

// ServerMemberManager
// Cluster member information broadcast task
private final MemberInfoReportTask infoReportTask = new MemberInfoReportTask();
public void onApplicationEvent(WebServerInitializedEvent event) {
    getSelf().setState(NodeState.UP);
    if (!EnvUtil.getStandaloneMode()) {
        GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
    }
    EnvUtil.setPort(event.getWebServer().getPort());
    EnvUtil.setLocalAddress(this.localAddress);
}

MemberInfoReportTask runs every 2 seconds and calls POST /v1/core/cluster/report to send the current node's information (Member) to one of the other cluster nodes, selected in round-robin order (including DOWN nodes). This both synchronizes the current node's information and serves as a health check.

class MemberInfoReportTask extends Task {
    private final GenericType<RestResult<String>> reference = new GenericType<RestResult<String>>() {
    };
    private int cursor = 0;

    @Override
    protected void executeBody() {
        // Get all nodes except the current one, including DOWN ones
        List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();
        if (members.isEmpty()) {
            return;
        }
        // Round-robin selection
        this.cursor = (this.cursor + 1) % members.size();
        Member target = members.get(cursor);
        // Call /v1/core/cluster/report to send getSelf()'s Member information to the peer
        final String url = HttpUtils
                .buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT, "/cluster/report");

        try {
            asyncRestTemplate
                    .post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version),
                            Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {
                                // Communication succeeded
                                @Override
                                public void onReceive(RestResult<String> result) {
                                    if (result.ok()) {
                                        // Business success
                                        MemberUtil.onSuccess(ServerMemberManager.this, target);
                                    } else {
                                        // Business failure
                                        MemberUtil.onFail(ServerMemberManager.this, target);
                                    }
                                }

                                // Communication failed
                                @Override
                                public void onError(Throwable throwable) {
                                    MemberUtil.onFail(ServerMemberManager.this, target, throwable);
                                }
                            });
        } catch (Throwable ex) {
            Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(),
                    ExceptionUtil.getAllExceptionMsg(ex));
        }
    }

    // Schedule the next run 2s later
    @Override
    protected void after() {
        GlobalExecutor.scheduleByCommon(this, 2_000L);
    }
}

If the current node's request to another node's /v1/core/cluster/report succeeds (both communication and business succeed), MemberUtil.onSuccess is executed: the peer Member is set to UP, and a MembersChangeEvent is triggered if the state changed.

// MemberUtil
public static void onSuccess(final ServerMemberManager manager, final Member member) {
    final NodeState old = member.getState();
    manager.getMemberAddressInfos().add(member.getAddress());
    member.setState(NodeState.UP);
    member.setFailAccessCnt(0);
    if (!Objects.equals(old, member.getState())) {
        manager.notifyMemberChange();
    }
}

If the current node's request to another node's /v1/core/cluster/report fails (communication or business failure), the peer Member is first set to SUSPICIOUS. Occasional failed health checks do not mark the member DOWN and unavailable; a SUSPICIOUS node still participates in the Distro protocol and takes care of part of the registry's write requests (see DistroFilter below). Only after more than three consecutive failures, or a connection-refused error, is the member set to DOWN. Finally, the MembersChangeEvent event is published.

public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
    manager.getMemberAddressInfos().remove(member.getAddress());
    final NodeState old = member.getState();
    // Occasionally fail, set it to an intermediate state, between UP and DOWN, and allow participation in Distro protocol as a healthy node
    member.setState(NodeState.SUSPICIOUS);
    member.setFailAccessCnt(member.getFailAccessCnt() + 1);
    // The node fails for three consecutive times or connection refused sets the node to DOWN
    int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);

    if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils
            .containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {
        member.setState(NodeState.DOWN);
    }
    if (!Objects.equals(old, member.getState())) {
        manager.notifyMemberChange();
    }
}

Let’s look at how the peer node being health checked handles /v1/core/cluster/report requests.

It calls the update method of the MemberManager.

// NacosClusterController
@PostMapping(value = {"/report"})
public RestResult<String> report(@RequestBody Member node) {
    if (!node.check()) {
        return RestResultUtils.failedWithMsg(400, "Node information is illegal");
    }
    node.setState(NodeState.UP);
    node.setFailAccessCnt(0);
    boolean result = memberManager.update(node);
    return RestResultUtils.success(Boolean.toString(result));
}

MemberManager updates the Member information in the serverList and publishes the MembersChangeEvent event. The health check is bidirectional, updating the cluster node status in memory for both the requesting and responding nodes.

// MemberManager
public boolean update(Member newMember) {
    String address = newMember.getAddress();
    // Member not in the configuration file will not be added to the cluster
    if (!serverList.containsKey(address)) {
        return false;
    }
    serverList.computeIfPresent(address, (s, member) -> {
        if (NodeState.DOWN.equals(newMember.getState())) {
            memberAddressInfos.remove(newMember.getAddress());
        }
        boolean isPublishChangeEvent = MemberUtil.isBasicInfoChanged(newMember, member);
        newMember.setExtendVal(MemberMetaDataConstants.LAST_REFRESH_TIME, System.currentTimeMillis());
        MemberUtil.copy(newMember, member);
        if (isPublishChangeEvent) {
            notifyMemberChange();
        }
        return member;
    });
    return true;
}

In summary, every 2s each Nacos node selects another node in round-robin order and reports its own node information, updating the Member information in the serverList on both sides. If the health check on the peer fails, the peer node is marked SUSPICIOUS, indicating that it may be offline; if more than three consecutive health checks fail, the peer node is marked DOWN. The health check is bidirectional: each node both actively initiates and passively receives health checks.

4. Service subscription/query

Service subscription and query go through GET /nacos/v1/ns/instance/list; the logic is in the com.alibaba.nacos.naming.controllers.InstanceController#doSrvIpxt method.

The difference between a subscription and an ordinary query is the udpPort sent by the client: if udpPort is 0, it is a plain query; if udpPort is greater than 0, it is a subscription.

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
  ObjectNode result = JacksonUtils.createEmptyJsonNode();
  // 1. Locate Service Obtain Service based on namespace + groupName @@servicename
  Service service = serviceManager.getService(namespaceId, serviceName);
  long cacheMillis = switchDomain.getDefaultCacheMillis();
  try {
    // Add a new udp push client
    if (udpPort > 0 && pushService.canEnablePush(agent)) {
      pushService
        .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                   pushDataSource, tid, app);
      cacheMillis = switchDomain.getPushCacheMillis(serviceName); // 10s
    }
  } catch (Exception e) {
    Loggers.SRV_LOG.error();
    cacheMillis = switchDomain.getDefaultCacheMillis();
  }
  if (service == null) {
    // ...
    result.replace("hosts", JacksonUtils.createEmptyArrayNode());
    return result;
  }
  // service.enabled=false Throws an exception
  checkIfDisabled(service);

  // 2. Service Locates Instance
  List<Instance> srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
  if (CollectionUtils.isEmpty(srvedIPs)) {
    // ...
    result.set("hosts", JacksonUtils.createEmptyArrayNode());
    return result;
  }
  // For instance grouping, healthy and unhealthy
  Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
  ipMap.put(Boolean.TRUE, new ArrayList<>());
  ipMap.put(Boolean.FALSE, new ArrayList<>());
  for (Instance ip : srvedIPs) {
    ipMap.get(ip.isHealthy()).add(ip);
  }
  // 3. Protected mode
  double threshold = service.getProtectThreshold();
  if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
    ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
    ipMap.get(Boolean.FALSE).clear();
  }

  // 4. Result assembly
  ArrayNode hosts = JacksonUtils.createEmptyArrayNode();

  for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
    List<Instance> ips = entry.getValue();
    if (healthyOnly && !entry.getKey()) {
      continue;
    }
    for (Instance instance : ips) {
      if (!instance.isEnabled()) {
        continue;
      }
      ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
      ipObj.put("ip", instance.getIp());
      ipObj.put("port", instance.getPort());
      // ...
      hosts.add(ipObj);
    }
  }
  result.replace("hosts", hosts);
  // ...
  return result;
}

The query logic is fairly long. The main flow: locate the Service by namespace, group, and service name; locate the Clusters by cluster name; and return the Instance list inside those Clusters.

// ServiceManager.java
// namespace - groupName@@serviceName - Service
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
public Service getService(String namespaceId, String serviceName) {
    if (serviceMap.get(namespaceId) == null) {
        return null;
    }
    return chooseServiceMap(namespaceId).get(serviceName);
}
public Map<String, Service> chooseServiceMap(String namespaceId) {
  	return serviceMap.get(namespaceId);
}
// Service.java
// Cluster registry where key is the Cluster name
private Map<String, Cluster> clusterMap = new HashMap<>();
public List<Instance> srvIPs(List<String> clusters) {
  if (CollectionUtils.isEmpty(clusters)) {
    clusters = new ArrayList<>();
    clusters.addAll(clusterMap.keySet());
  }
  return allIPs(clusters);
}
public List<Instance> allIPs(List<String> clusters) {
  List<Instance> result = new ArrayList<>();
  for (String cluster : clusters) {
    Cluster clusterObj = clusterMap.get(cluster);
    if (clusterObj == null) {
      continue;
    }
    result.addAll(clusterObj.allIPs());
  }
  return result;
}

Client registers UDP listener

InstanceController#doSrvIpxt registers a listener in the server-side PushService regardless of whether the queried service exists. When the service changes, the server pushes the change to the client over UDP. udpPort is the client's UDP port number, passed in when the client initiates the query, as described in the previous chapter.

@Autowired
private PushService pushService;
// InstanceController#doSrvIpxt
try {
  // Add a new udp push client
  if (udpPort > 0 && pushService.canEnablePush(agent)) {
    pushService
      .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                 pushDataSource, tid, app);
    cacheMillis = switchDomain.getPushCacheMillis(serviceName); // 10s
  }
} catch (Exception e) {
  cacheMillis = switchDomain.getDefaultCacheMillis();
}
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
    // The first key is namespace+groupService and the second key is pushClient.tostring
    private static ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap<>();
}

InstanceController#doSrvIpxt also instructs the client to pull the service registry at an interval of cacheMillis=10s; see the UpdateTask for client-side service discovery in the previous chapter.
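For reference, a minimal sketch (not Nacos's actual client code) of the client side of this UDP channel, using java.net.DatagramSocket/DatagramPacket and java.nio.charset.StandardCharsets: the client opens a socket, reports its local port as udpPort in the query, and then blocks on receive() waiting for pushes.

DatagramSocket udpSocket = new DatagramSocket(0);   // ephemeral local port, reported to the server as udpPort
byte[] buffer = new byte[64 * 1024];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
udpSocket.receive(packet);                          // blocks until the server pushes a service change
String pushData = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);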

Protected mode

InstanceController#doSrvIpxt applies a common piece of logic when assembling the instance list. When a large number of instances of a service are offline (instance.healthy=false), protected mode kicks in: the server assumes a network partition has occurred and returns all instances, including unhealthy ones, to the client. This is a representative feature of AP-style registries such as Eureka.

What counts as "a large number"?

Each Service maintains a protectThreshold used to determine whether a large number of instances are offline. The default value is 0.

public class Service implements Serializable {
    // Service protection threshold. When most services go offline, the current registry node is considered faulty and all instances, including unhealthy ones, are returned
    private float protectThreshold = 0.0F;
}

If healthy instances (ipMap.get(Boolean.TRUE).size()) / total instances (srvedIPs.size()) <= protectThreshold, the registry is considered faulty and enters protected mode, returning all instances under the service. With the default value of 0, all instances of a service are returned only when every instance of that service has lost contact with Nacos.

// InstanceController#doSrvIpxt
// For instance grouping, healthy and unhealthy
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
for (Instance ip : srvedIPs) {
  ipMap.get(ip.isHealthy()).add(ip);
}
// 3. Protected mode
double threshold = service.getProtectThreshold();
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
  ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
  ipMap.get(Boolean.FALSE).clear();
}
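A worked example with assumed numbers: suppose protectThreshold is set to 0.5 and a service has 10 instances, 4 of them healthy.

float protectThreshold = 0.5F;             // assumed; the default is 0
int healthyCount = 4, totalCount = 10;     // assumed
boolean protectionOn = (float) healthyCount / totalCount <= protectThreshold; // 0.4 <= 0.5 -> true
// When protection is on, unhealthy instances are returned together with the healthy ones.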

5. Write requests

For client write requests (such as service registration), the server nodes are peers from the client's point of view, and any node can respond to the request.

On the server side, however, not all write requests are handled by the current node.

Endpoints that handle client write requests, such as /v1/ns/instance for service registration, whose handler methods are annotated with @CanDistro, pass through DistroFilter.

// InstanceController
@CanDistro
@PostMapping
public String register(HttpServletRequest request) throws Exception {
    // ...
}

ControllerMethodsCache returns the RequestMapping-annotated handler method to DistroFilter based on the request path, request method, and request parameters. DistroMapper then locates the responsible node based on the serviceName (groupName@@serviceName).

If the current node is the responsible node, the subsequent logic proceeds; otherwise, the current node forwards the write request to the responsible node and responds to the client with the responsible node's response.

public class DistroFilter implements Filter {
    @Autowired
    private DistroMapper distroMapper;
    @Autowired
    private ControllerMethodsCache controllerMethodsCache;

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
            throws IOException, ServletException {
        ReuseHttpRequest req = new ReuseHttpServletRequest((HttpServletRequest) servletRequest);
        HttpServletResponse resp = (HttpServletResponse) servletResponse;
        try {
            String path = new URI(req.getRequestURI()).getPath();
            String serviceName = req.getParameter(CommonParams.SERVICE_NAME);
            // Find the method according to the request
            Method method = controllerMethodsCache.getMethod(req);
            // serviceName version adaptation: use groupName@@serviceName
            String groupName = req.getParameter(CommonParams.GROUP_NAME);
            String groupedServiceName = ... ;
            // If the method is annotated with @CanDistro and the current node is not responsible for groupedServiceName
            if (method.isAnnotationPresent(CanDistro.class) && !distroMapper.responsible(groupedServiceName)) {
                String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER);
                if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) {
                    resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
                            "receive invalid redirect request from peer " + req.getRemoteAddr());
                    return;
                }
								// Get the target node that is actually responsible for the service
                final String targetServer = distroMapper.mapSrv(groupedServiceName);
								// Assemble the request parameters
                List<String> headerList = new ArrayList<>(16);
                Enumeration<String> headers = req.getHeaderNames();
                while (headers.hasMoreElements()) {
                    String headerName = headers.nextElement();
                    headerList.add(headerName);
                    headerList.add(req.getHeader(headerName));
                }

                final String body = IoUtils.toString(req.getInputStream(), Charsets.UTF_8.name());
                final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());

                // Request the actual responsible node
                RestResult<String> result = HttpClient
                        .request("http://" + targetServer + req.getRequestURI(), headerList, paramsValue, body,
                                PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, Charsets.UTF_8.name(), req.getMethod());
                String data = result.ok() ? result.getData() : result.getMessage();
                try {
                    // Get the response message from the responsible node to respond to the client
                    WebUtils.response(resp, data, result.getCode());
                } catch (Exception ignore) {
                    Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(groupedServiceName) + urlString);
                }
            } else {
                // The current node is responsible for groupedServiceName
                OverrideParameterRequestWrapper requestWrapper = OverrideParameterRequestWrapper.buildRequest(req);
                requestWrapper.addParameter(CommonParams.SERVICE_NAME, groupedServiceName);
                filterChain.doFilter(requestWrapper, resp);
            }
        } catch (AccessControlException e) {
            resp.sendError(HttpServletResponse.SC_FORBIDDEN, "access denied: " + ExceptionUtil.getAllExceptionMsg(e));
        } catch (NoSuchMethodException e) {
            resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED,
                    "no such api:" + req.getMethod() + ":" + req.getRequestURI());
        } catch (Exception e) {
            resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                    "Server failed,"+ ExceptionUtil.getAllExceptionMsg(e)); }}}Copy the code

The key is how DistroMapper decides which node is responsible for which service.

@Component("distroMapper")
public class DistroMapper extends MemberChangeListener {
    // Health node
    private volatile List<String> healthyList = new ArrayList<>();
    // Switch the service
    private final SwitchDomain switchDomain;
    // Node management
    private final ServerMemberManager memberManager;
}

DistroMapper maintains a list of healthy cluster nodes internally, which is updated when a MembersChangeEvent is received. As described in the cluster management section, MembersChangeEvent is triggered when the health status of a cluster node changes.

Focus on the onEvent method, which selects nodes in the UP and SUSPICIOUS states as Distro's healthy nodes.

//DistroMapper
public void onEvent(MembersChangeEvent event) {
    List<String> list = MemberUtil.simpleMembers(MemberUtil.selectTargetMembers(event.getMembers(),
            member -> NodeState.UP.equals(member.getState()) || NodeState.SUSPICIOUS.equals(member.getState())));
    Collections.sort(list);
    Collection<String> old = healthyList;
    healthyList = Collections.unmodifiableList(list);
}

If hash(serviceName) % healthyList.size() equals the current node's index in healthyList, the current node is considered the responsible node for that service (the code uses indexOf and lastIndexOf and checks index <= target <= lastIndex).

//DistroMapper
// Health node
private volatile List<String> healthyList = new ArrayList<>();
public boolean responsible(String serviceName) {
    final List<String> servers = healthyList;
    // If Distro is disabled, or the node is started standalone, the current node handles the write request itself
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        return true;
    }
    if (CollectionUtils.isEmpty(servers)) {
        return false;
    }
    // Servers subscript of the current node
    int index = servers.indexOf(EnvUtil.getLocalAddress());
    int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
    if (lastIndex < 0 || index < 0) {
        return true;
    }
    // hash(serviceName) % servers.size()
    int target = distroHash(serviceName) % servers.size();
    return target >= index && target <= lastIndex;
}
private int distroHash(String serviceName) {
  return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
}
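A minimal sketch with made-up addresses and service name, showing the routing rule that both responsible and mapSrv rely on (uses java.util.Arrays/List):

List<String> healthyList = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848"); // assumed
String serviceName = "DEFAULT_GROUP@@order-service";                                          // assumed
int hash = Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
int target = hash % healthyList.size();
System.out.println("responsible node: " + healthyList.get(target));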

The same logic applies to obtaining the address of the responsible node from serviceName.

//DistroMapper
// Health node
private volatile List<String> healthyList = new ArrayList<>();
public String mapSrv(String serviceName) {
    final List<String> servers = healthyList;
    if (CollectionUtils.isEmpty(servers) || !switchDomain.isDistroEnabled()) {
        return EnvUtil.getLocalAddress();
    }
    try {
        int index = distroHash(serviceName) % servers.size();
        return servers.get(index);
    } catch (Throwable e) {
        return EnvUtil.getLocalAddress();
    }
}

6. Service registration

With that in mind, take a look at the logic for service registration, POST /v1/ns/instance.

// InstanceController
@CanDistro
@PostMapping
public String register(HttpServletRequest request) throws Exception {
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    final Instance instance = parseInstance(request);
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

The ServiceManager registers an Instance in two steps: ensure that the Service exists, then add the Instance to the Service.

// ServiceManager
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // 1. If the Service is registered for the first time, the Service is created and managed by the ServiceManager
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Get the Service instance
    Service service = getService(namespaceId, serviceName);
    // 2. Add Instance to Service
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

Create a service

Because service instances are maintained under a Service, the first step is to ensure that the Service exists in the ServiceManager's serviceMap.

// ServiceManager
// namespace - groupName@@serviceName - Service
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
    createServiceIfAbsent(namespaceId, serviceName, local, null);
}
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    Service service = getService(namespaceId, serviceName);
    // The serviceMap does not have a corresponding service instance before executing the subsequent logic
    if (service == null) {
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        // ...
				// Core logic
        putServiceAndInit(service);
         // Non-temporary node, persisting Service; Temporary nodes do not persist services
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

Ignore the non-temporary (local=false) logic and focus on the putServiceAndInit method. Here the Service is written into the serviceMap (with no Instance yet), the Service's init method starts heartbeat checking for the service's clients, and finally listeners are registered with the ConsistencyService for both the temporary and the persistent keys of the service.

// ServiceManager
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
private void putServiceAndInit(Service service) throws NacosException {
    // 1. Write data to the serviceMap memory
    putService(service);
    // 2. Enable heartbeat detection on the client
    service.init();
    // 3. Listen
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
}
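For reference, a sketch of the two keys being listened on; the exact prefixes come from KeyBuilder, so treat the layout in the comments as indicative rather than authoritative (namespace and service name are made up):

// ephemeral:  com.alibaba.nacos.naming.iplist.ephemeral.{namespaceId}##{groupName@@serviceName}
// persistent: com.alibaba.nacos.naming.iplist.{namespaceId}##{groupName@@serviceName}
String ephemeralKey = KeyBuilder.buildInstanceListKey("public", "DEFAULT_GROUP@@order-service", true);
String persistentKey = KeyBuilder.buildInstanceListKey("public", "DEFAULT_GROUP@@order-service", false);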

Client heartbeat checking is covered later; first, let's see what this listener does.

ConsistencyService defines the common operations of a KV store: put, remove, get, and listen.

public interface ConsistencyService {
    void put(String key, Record value) throws NacosException;
    void remove(String key) throws NacosException;
    Datum get(String key) throws NacosException;
    void listen(String key, RecordListener listener) throws NacosException;
    void unListen(String key, RecordListener listener) throws NacosException;
    boolean isAvailable();
}

The implementation classes of ConsistencyService fall into two categories.

One type is the delegate class, which decides, based on the key pattern, which ConsistencyService implementation actually handles the request. For example, DelegateConsistencyServiceImpl chooses between the ephemeral and the persistent implementation depending on whether the key matches the temporary-node pattern.

@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {
    // Raft CP PersistentServiceProcessor
    private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService;
    // Distro AP DistroConsistencyServiceImpl
    private final EphemeralConsistencyService ephemeralConsistencyService;
    @Override
    public void put(String key, Record value) throws NacosException {
        mapConsistencyService(key).put(key, value);
    }
    @Override
    public void listen(String key, RecordListener listener) throws NacosException {
        // ...
        mapConsistencyService(key).listen(key, listener);
    }
		// ...
    private ConsistencyService mapConsistencyService(String key) {
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }
}

The other type is the actual implementation classes.

PersistentServiceProcessor implements consistency for persistent services based on JRaft; as seen earlier in the configuration center, it distinguishes consistent writes and (linearizable) reads. For the registry, persistent instances go through the Raft consistency service.

// PersistentServiceProcessor
@Override
public void put(String key, Record value) throws NacosException {
    final BatchWriteRequest req = new BatchWriteRequest();
    Datum datum = Datum.createDatum(key, value);
    req.append(ByteUtils.toBytes(key), serializer.serialize(datum));
    final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
            .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build();
    try {
        protocol.write(request);
    } catch (Exception e) {
        throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
    }
}

@Override
public Datum get(String key) throws NacosException {
  final List<byte[]> keys = new ArrayList<>(1);
  keys.add(ByteUtils.toBytes(key));
  final ReadRequest req = ReadRequest.newBuilder().setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP)
    .setData(ByteString.copyFrom(serializer.serialize(keys))).build();
  try {
    Response resp = protocol.getData(req);
    if (resp.getSuccess()) {
      BatchReadResponse response = serializer
        .deserialize(resp.getData().toByteArray(), BatchReadResponse.class);
      final List<byte[]> rValues = response.getValues();
      return rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getDatumTypeFromKey(key));
    }
    throw new NacosException(ErrorCode.ProtoReadError.getCode(), resp.getErrMsg());
  } catch (Throwable e) {
    throw new NacosException(ErrorCode.ProtoReadError.getCode(), e.getMessage());
  }
}

DistroConsistencyServiceImpl implements consistency based on the Distro protocol. It handles temporary instances, which disappear when the service goes offline, so the data is stored only in memory. The listener here is the Service, which implements the RecordListener interface.

// DistroConsistencyServiceImpl
private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();
@Override
public void listen(String key, RecordListener listener) throws NacosException {
  if (!listeners.containsKey(key)) {
    listeners.put(key, new ConcurrentLinkedQueue<>());
  }
  if (listeners.get(key).contains(listener)) {
    return;
  }
  listeners.get(key).add(listener);
}

To sum up the Service creation process so far: it mainly creates a Service instance in the ServiceManager's in-memory map, starts client heartbeat checking for the service, and finally registers the Service with DistroConsistencyServiceImpl as a listener for instance changes.

Register the instance

The second step in Service registration is to update the list of instances within the Service, adding new instances to the list of instances.

// ServiceManager
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        // Get the current Instance list from the underlying store, add the newly added Instance and return it
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // Write to the underlying storage
        consistencyService.put(key, instances);
    }
}

Focus on the implementation of ConsistencyService.put. For temporary instance registration, the implementation class is DistroConsistencyServiceImpl.

// DistroConsistencyServiceImpl
public void put(String key, Record value) throws NacosException {
    // Write data
    onPut(key, value);
    // Write data to all Member
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
}

Local update

First, the onPut method writes the data to the underlying storage. DataStore is an in-memory KV store, and Datum encapsulates the KV structure. A task is then submitted through the Notifier to notify listeners of the service instance change.
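For context, a simplified sketch of the two classes mentioned here (fields abridged; see the real Nacos classes for the full versions):

// Simplified sketch of the in-memory KV store
public class DataStore {
    private final Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
    public void put(String key, Datum value) { dataMap.put(key, value); }
    public Datum get(String key) { return dataMap.get(key); }
}
// Simplified sketch of the KV envelope
public class Datum<T extends Record> {
    public String key;
    public T value;
    public AtomicLong timestamp = new AtomicLong(0L);
}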

// DistroConsistencyServiceImpl
private volatile Notifier notifier = new Notifier();
private final DataStore dataStore;
public void onPut(String key, Record value) {
    // 1. If it is a temporary node, write it to the memory map
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }
    if (!listeners.containsKey(key)) {
        return;
    }
    // 2. Add a key change task and notify listeners later
    notifier.addTask(key, DataOperation.CHANGE);
}

Notifier is a simple producer-consumer model implementing the Runnable interface; when a service changes, it invokes the corresponding RecordListener.

// DistroConsistencyServiceImpl.Notifier
// Task queue
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
// Production task
public void addTask(String datumKey, DataOperation action) {
  // ...
  tasks.offer(Pair.with(datumKey, action));
}
// Consume tasks
@Override
public void run() {
  for (;;) {
    try {
      Pair<String, DataOperation> pair = tasks.take();
      handle(pair);
    } catch (Throwable e) {
      // log and continue
    }
  }
}
// Call the listener
private void handle(Pair<String, DataOperation> pair) {
    String datumKey = pair.getValue0();
    DataOperation action = pair.getValue1();
    for (RecordListener listener : listeners.get(datumKey)) {
      if (action == DataOperation.CHANGE) {
        listener.onChange(datumKey, dataStore.get(datumKey).value);
      }
      if (action == DataOperation.DELETE) {
        listener.onDelete(datumKey);
      }
    }
}

Service implements the RecordListener interface and receives a callback whenever the Instances in the underlying storage change. It updates the clusterMap in memory and pushes the change via UDP to all clients listening on the current service.

// Service
public void onChange(String key, Instances value) throws Exception {
    // ...
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    // ...
}
// Cluster registry where key is the Cluster name
private Map<String, Cluster> clusterMap = new HashMap<>();
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
  Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
  for (Instance instance : instances) {
    // ...
  }
  for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
    List<Instance> entryIPs = entry.getValue();
    clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
  }
  setLastModifiedMillis(System.currentTimeMillis());
  // UDP pushes Service changes to the client
  getPushService().serviceChanged(this);
}

The UDP push logic is straightforward; for details, see the registration of the UDP listening client during service query earlier in this chapter, and the client-side handling of UDP pushes in the previous chapter.

Cluster Data Synchronization

After DistroConsistencyServiceImpl writes data to the underlying storage, the written data is pushed to every other node in the cluster with a delay of 1s (nacos.naming.distro.taskDispatchPeriod / 2 = 2s / 2 = 1s). (This means that if a client senses a change in the service registry and immediately queries another node in the cluster, it may get inconsistent data.)

// DistroConsistencyServiceImpl
public void put(String key, Record value) throws NacosException {
    // Write data
    onPut(key, value);
    // Write data to all Member
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
}

DistroProtocol loops over all other cluster members (including unhealthy ones) and submits a DistroDelayTask for each of them.

// DistroProtocol
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                each.getAddress());
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    }
}

The subsequent process is quite long; in essence, the responsible node re-queries the underlying storage by key to obtain the latest data and then calls the /v1/ns/distro/datum interface of each node.

When a node receives the data synchronization request, it ultimately calls DistroConsistencyServiceImpl's processData method, which, after converting the parameters, calls the local update method onPut described in the previous section.

// DistroConsistencyServiceImpl
public boolean processData(DistroData distroData) {
    DistroHttpData distroHttpData = (DistroHttpData) distroData;
    Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
    onPut(datum.key, datum.value);
    return true;
}

7. Client heartbeat

Heartbeat timeout detection task

During service registration, each Service starts a scheduled task that checks whether the Instances of the current Service send heartbeats on time. The task is started when the Service's init method is called and runs every 5s.

// Service
private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);
// Cluster registry where key is the Cluster name
private Map<String, Cluster> clusterMap = new HashMap<>();
public void init() {
    // Submit the client heartbeat detection task every 5s
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    // ...
}

ClientBeatCheckTask is the client heartbeat timeout detection task. It loops through all temporary instances: if no heartbeat has been received within 15 seconds, the Instance is marked unhealthy (note that the DataStore is not updated); if no heartbeat has been received within 30 seconds, DELETE /v1/ns/instance is called on the current node to delete the instance. (The DELETE /v1/ns/instance flow is the reverse of registering an instance; it likewise updates the Service's instance list.)

public class ClientBeatCheckTask implements Runnable {
    private Service service;

    @Override
    public void run() {
        try {
            // Distro: if the current node is not responsible for the service, do nothing
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }
            // All temporary instances
            List<Instance> instances = service.allIPs(true);
            // If no heartbeat has been received for 15s, mark the instance as unhealthy
            for (Instance instance : instances) {
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            // UDP push to clients
                            getPushService().serviceChanged(service);
                        }
                    }
                }
            }
            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }
            // If no heartbeat has been received for more than 30s, delete the instance from the registry
            for (Instance instance : instances) {
                if (instance.isMarked()) {
                    continue;
                }
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // Call DELETE /v1/ns/instance on the current node via HTTP
                    deleteIp(instance);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }
}

The two heartbeat timeout thresholds come from the Instance's metadata: preserved.heart.beat.timeout (default 15s) and preserved.ip.delete.timeout (default 30s), both in milliseconds.

// Instance
public long getInstanceHeartBeatTimeOut() {
    return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_TIMEOUT,
            Constants.DEFAULT_HEART_BEAT_TIMEOUT);
}

public long getIpDeleteTimeout() {
    return getMetaDataByKeyWithDefault(PreservedMetadataKeys.IP_DELETE_TIMEOUT,
            Constants.DEFAULT_IP_DELETE_TIMEOUT);
}

Instance metadata can be set at service registration time or at the console in JSON format or in k1=v1,k2=v2 format.
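For example, a client could tune these thresholds per instance through metadata at registration time. A sketch using the Java client API; the service name, address, and values are made up, and namingService is an already-built NamingService:

Instance instance = new Instance();
instance.setIp("192.168.1.10");
instance.setPort(8080);
Map<String, String> metadata = new HashMap<>();
metadata.put(PreservedMetadataKeys.HEART_BEAT_INTERVAL, "8000");   // heartbeat every 8s instead of 5s
metadata.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, "20000");   // mark unhealthy after 20s instead of 15s
metadata.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT, "40000");    // delete after 40s instead of 30s
instance.setMetadata(metadata);
namingService.registerInstance("order-service", "DEFAULT_GROUP", instance);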

Handle client heartbeat

The client heartbeat goes through PUT /nacos/v1/ns/instance/beat. Because the heartbeat is a write operation (it updates the last heartbeat time of an instance in memory), the handler is annotated with @CanDistro and is processed by the responsible node in the cluster.

// com.alibaba.nacos.naming.controllers.InstanceController
@CanDistro
@PutMapping("/beat")
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Tell the client the default heartbeat interval (5 seconds)
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    // Extract clientBeat, namespace, serviceName, clusterName, ... from the request
   	String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
      clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // 1. If the service is not registered, perform registration logic
    if (instance == null) {
        // If lightBeatEnabled=true and the client is not registered when the heartbeat is sent, the client must initiate the registration
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        // Server register logic
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        // ...
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
       // ...
    }
    // 2. Update instance health status
    service.processClientBeat(clientBeat);
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    // If the instance sets preserved.heart.beat.interval in its metadata, that heartbeat interval takes precedence
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    // Server controls whether to allow light Beat. Default is true
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
Copy the code

What happens if the client sends a heartbeat but the server never received a registration request for that instance?

Recall from the previous chapter that when the client sends a heartbeat and the server responds with code = NamingResponseCode.RESOURCE_NOT_FOUND, the client initiates a registration request.

But it depends on lightBeatEnabled.

lightBeatEnabled=false: light beat is disabled and the client sends the full heartbeat body. When the server finds that the client is not registered, it deserializes the beat request parameter into an RsInfo and uses it to register the client.

lightBeatEnabled=true, the default, means the client does not send the full heartbeat body. When the server finds that the client is not yet registered, it returns code = NamingResponseCode.RESOURCE_NOT_FOUND.

In summary, as described in the previous chapter, by default when the server receives a heartbeat for an instance it has never seen registered, it does not register the instance itself; it requires the client to initiate a registration request.

What is the interval between heartbeats?

As mentioned in the previous chapter, by default the client initiates a heartbeat request every 5s.

From the server's point of view, the heartbeat interval (like the heartbeat timeout thresholds) can be controlled through Instance metadata with key = preserved.heart.beat.interval; the server-side default is 5s.

// Instance
public long getInstanceHeartBeatInterval() {
    return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL,
            Constants.DEFAULT_HEART_BEAT_INTERVAL);
}
}
Copy the code

What does the server do with a heartbeat?

The client heartbeat is processed by the Service, and the RsInfo input contains key information such as the client IP address, port, and cluster.

// Service
public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
Copy the code

It submits a ClientBeatProcessor task that is executed asynchronously and immediately; the task updates the health status and last heartbeat time of the Instance corresponding to the heartbeat. If the health status changes, listening clients are notified over UDP.

// ClientBeatProcessor
public void run() {
  Service service = this.service;
  String ip = rsInfo.getIp();
  String clusterName = rsInfo.getCluster();
  int port = rsInfo.getPort();
  Cluster cluster = service.getClusterMap().get(clusterName);
  // Get all temporary nodes
  List<Instance> instances = cluster.allIPs(true);
  for (Instance instance : instances) {
    // Find the heartbeat instance
    if (instance.getIp().equals(ip) && instance.getPort() == port) {
      // Update the last heartbeat time
      instance.setLastBeat(System.currentTimeMillis());
      if (!instance.isMarked()) {
        // Update health status
        if (!instance.isHealthy()) {
          instance.setHealthy(true);
          // Udp push client
          getPushService().serviceChanged(service);
        }
      }
    }
  }
}
Copy the code

Note that the heartbeat handler does not go through the ConsistencyService to update the Instance, so the underlying store (the DataStore) is not updated.

8. Cluster data synchronization

Whether it is service registration (POST /v1/ns/instance) or service deregistration (DELETE /v1/ns/instance), the responsible node asynchronously synchronizes the changed registration information to the other, non-responsible nodes.
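
Concretely, on the responsible node this write path goes through the ephemeral consistency service: the change is first applied locally (DataStore plus listeners), and a Distro sync task is then queued asynchronously. Below is a rough sketch of DistroConsistencyServiceImpl#put based on the 1.4.x source; the exact delay constant may differ.

// DistroConsistencyServiceImpl (sketch)
public void put(String key, Record value) throws NacosException {
    // 1. Apply the change locally: update the DataStore and notify listeners (Service/ServiceManager)
    onPut(key, value);
    // 2. Schedule an asynchronous Distro sync of this key to the other cluster nodes
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}
Copy the code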

However, during heartbeat processing, changes to an Instance's health status are not synchronized across the cluster (except when the heartbeat has timed out for 30s, in which case the Instance is removed via DELETE /v1/ns/instance). Even on the responsible node itself, the health status of the Instance held by the ServiceManager can differ from the one in the DataStore. Why is that?

Data inconsistency between cluster nodes: a 15s heartbeat timeout may just be temporary network jitter, so marking the instance unhealthy does not need to be synchronized to other nodes. Only after a 30s timeout is the instance considered truly offline and actually removed from the Service, by executing the real deregistration process via DELETE /v1/ns/instance (ServiceManager update / DataStore update / cluster synchronization).

Data inconsistency inside a node: every node serves query requests from the ServiceManager, and the Instance in the Service is already marked healthy=false, which is what clients should see. The DataStore is Nacos-internal logic, used only for cluster data synchronization.
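
For reference, the DataStore is essentially an in-memory key-to-Datum map. The sketch below is simplified from the 1.4.x naming module; the real class offers additional helpers such as batch get and key listing.

// Simplified sketch of com.alibaba.nacos.naming.consistency.ephemeral.distro.DataStore
public class DataStore {
    // Distro key (e.g. the instance-list key of a service) -> Datum (the serialized Instances)
    private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);

    public void put(String key, Datum value) {
        dataMap.put(key, value);
    }

    public Datum remove(String key) {
        return dataMap.remove(key);
    }

    public Datum get(String key) {
        return dataMap.get(key);
    }
}
Copy the code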

In addition, to ensure the consistency of data on all nodes, a scheduled task, DistroVerifyTask, is actually submitted during the construction of DistroProtocol. The responsible node synchronizes its responsible service information to other nodes every 5s. This DistroData is of type VERIFY and contains only a summary (MD5) of the list of instances that the service contains.

public class DistroVerifyTask implements Runnable {
    private final ServerMemberManager serverMemberManager;
    private final DistroComponentHolder distroComponentHolder;
    
    @Override
    public void run() {
        List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            verifyForDataStorage(each, targetServer);
        }
    }
    
    private void verifyForDataStorage(String type, List<Member> targetServer) {
        DistroData distroData = distroComponentHolder.findDataStorage(type).getVerifyData();
        distroData.setType(DataOperation.VERIFY);
        for (Member member : targetServer) {
            distroComponentHolder.findTransportAgent(type).syncVerifyData(distroData, member.getAddress());
        }
    }
}
Copy the code

Other non-responsible nodes receive VERIFY Distro data via PUT /v1/ns/distro/checksum.

The receiving node handles it in DistroConsistencyServiceImpl#onReceiveChecksums: it compares the received checksums with the data in its own DataStore to work out which services need to be updated and which need to be deleted.

Services to be deleted are removed from both the DataStore and the ServiceManager.

For services to be updated, the node calls GET /v1/ns/distro/datum on the responsible node to fetch the service registry information (read from that node's DataStore), then updates the registration information in its own DataStore and ServiceManager.

// DistroConsistencyServiceImpl
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
  // Compare the checksums sent by the responsible node with the local DataStore to group keys into update/remove lists
  // Service to be updated
  List<String> toUpdateKeys = new ArrayList<>();
  // Service to be deleted
  List<String> toRemoveKeys = new ArrayList<>();

  // Delete dataStore&ServiceManager
  for (String key : toRemoveKeys) {
    onRemove(key);
  }

  // Update the dataStore&ServiceManager memory registry with a secondary request to GET /v1/ns/distro/datum
  DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, server);
  distroKey.getActualResourceTypes().addAll(toUpdateKeys);
  DistroData remoteData = distroProtocol.queryFromRemote(distroKey);
  if (null != remoteData) {
    processData(remoteData.getContent());
  }
}
Copy the code

conclusion

  • The server model includes: ServiceManager manages the mapping from namespace + group + service name to Service; Service manages the mapping from cluster name to Cluster; Cluster manages its underlying lists of persistent/temporary instances.

  • Nacos cluster management: the Nacos cluster is initialized through MemberLookup, usually from the ${nacos.home}/conf/cluster.conf configuration file. Every 2 seconds each node executes POST /v1/core/cluster/report to send its own node information to a random node in the cluster (including nodes in the DOWN state), both to synchronize the current node's information and to perform a health check.

    The health check is bidirectional: each node both initiates and receives health checks. If a health check fails, the peer node is marked SUSPICIOUS, meaning it may be offline but can still participate in the Distro protocol to handle write requests. If more than three consecutive health checks fail, the peer node is marked DOWN.

  • Distro write requests: DistroFilter intercepts client write requests such as service registration and client heartbeat (identified by the @CanDistro annotation). It hashes the groupServiceName request parameter to check whether the service is managed by the current node. If not, the request is forwarded to the responsible node and that node's response is returned to the client; if it is managed by the current node, the request proceeds to the Controller.

  • Service subscription/query: under the Distro protocol every node in the cluster stores all data, so each node can serve read requests from its own registry, regardless of whether that data is the newest. A service query locates the Service managed by the server-side ServiceManager based on the namespace and groupServiceName provided by the client. In addition: 1. if udpPort is greater than 0, the subscription registers the client's UDP listening information in an in-memory Map, and the client is notified over UDP when the registry changes; 2. when a large proportion of a service's instances go offline (controlled by service.protectThreshold, 0 by default), protection mode is enabled: the server assumes it has been partitioned and returns all instances, including unhealthy ones, to the client as healthy.

  • Service registration: goes through the DistroFilter and can only be handled by the responsible node. Three things are done: 1. update the registry (in-memory Map) of the current node; 2. sync the updated Service to other nodes (the Distro protocol states that every node can serve query requests, so every node holds the full data set); 3. push the change over UDP to clients listening to the service.

  • Client heartbeat: also passes through the DistroFilter. Every 5s the client sends a heartbeat request PUT /nacos/v1/ns/instance/beat to the server, and the server updates the in-memory instance's health status and last heartbeat time. Each Service on the server runs a heartbeat timeout detection task every 5s: if no heartbeat has been received for 15 seconds the Instance is marked unhealthy; if no heartbeat has been received for 30 seconds the Instance is removed. The heartbeat thresholds can be set in the console or in the Instance metadata at service registration time.

    | Meaning | Instance metadata key | Default value |
    | ------- | --------------------- | ------------- |
    | Interval between client heartbeats (ms) | preserved.heart.beat.interval | 5000 |
    | Heartbeat timeout: mark instance unhealthy (ms) | preserved.heart.beat.timeout | 15000 |
    | Heartbeat timeout: remove instance (ms) | preserved.ip.delete.timeout | 30000 |
  • Cluster data synchronization: **when service registration or deregistration occurs (including removal after a 30-second heartbeat timeout)**, the responsible node synchronizes the changed service data to the other, non-responsible nodes. If the server detects that a client's heartbeat has timed out for 15 seconds (but less than 30), it only marks the instance unhealthy on the current responsible node and does not synchronize the unhealthy status to other nodes; if a heartbeat arrives again (between 15 and 30 seconds), the instance is marked healthy again, still without synchronization. In addition, the responsible node periodically (every 5 seconds by default, nacos.core.protocol.distro.data.verify_interval_ms = 5000 ms) sends the MD5 of each of its services' instance lists to the other nodes; if another node detects an MD5 change, it queries the responsible node and updates its local data.