Preface

  • Recently, I have been studying the Nacos source code and how it works internally
  • This time, I will briefly explain how Nacos designs its heartbeat mechanism in the AP cluster architecture

Nacos client heartbeat reporting source code

  • When the client registers an ephemeral instance with the Nacos server, it also starts a scheduled task, BeatTask, which by default sends a heartbeat to the server every 5 seconds.
public class NacosNamingService implements NamingService {
    // ...
    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
            // Create the scheduled task that sends heartbeats
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        // Register the current client instance with the Nacos server
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }
    // ...
}
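The heartbeat period passed to BeatInfo comes from `instance.getInstanceHeartBeatInterval()`. As far as I know, Nacos 1.x resolves it, together with the 15 s / 30 s server-side thresholds, from instance metadata with built-in defaults. The sketch below models that lookup with a plain `Map`; the class and method names are hypothetical, and the `preserved.*` key names are an assumption to double-check against the Nacos version you use.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not the Nacos Instance class: reads a timing from instance
// metadata and falls back to a default when the entry is absent.
class HeartbeatTimings {
    // Assumed Nacos 1.x metadata keys; verify against your version.
    static final String HEART_BEAT_INTERVAL = "preserved.heart.beat.interval";
    static final String HEART_BEAT_TIMEOUT = "preserved.heart.beat.timeout";
    static final String IP_DELETE_TIMEOUT = "preserved.ip.delete.timeout";

    // Defaults matching the 5 s / 15 s / 30 s figures used in this article.
    static final long DEFAULT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5);
    static final long DEFAULT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15);
    static final long DEFAULT_DELETE_MS = TimeUnit.SECONDS.toMillis(30);

    // Returns the metadata value in milliseconds, or the default if the key is missing or blank.
    static long valueOrDefault(Map<String, String> metadata, String key, long defaultMs) {
        String raw = metadata.get(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultMs;
        }
        return Long.parseLong(raw.trim());
    }

    static long heartBeatInterval(Map<String, String> metadata) {
        return valueOrDefault(metadata, HEART_BEAT_INTERVAL, DEFAULT_INTERVAL_MS);
    }

    static long heartBeatTimeout(Map<String, String> metadata) {
        return valueOrDefault(metadata, HEART_BEAT_TIMEOUT, DEFAULT_TIMEOUT_MS);
    }

    static long ipDeleteTimeout(Map<String, String> metadata) {
        return valueOrDefault(metadata, IP_DELETE_TIMEOUT, DEFAULT_DELETE_MS);
    }

    public static void main(String[] args) {
        Map<String, String> metadata = new HashMap<>();
        System.out.println(heartBeatInterval(metadata)); // 5000
        metadata.put(HEART_BEAT_INTERVAL, "3000");
        System.out.println(heartBeatInterval(metadata)); // 3000
        System.out.println(ipDeleteTimeout(metadata)); // 30000
    }
}
```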
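  • BeatReactor, the client-side heartbeat reporter, runs BeatTask on a scheduled thread pool of daemon threads and periodically sends heartbeat HTTP requests to the server. Each run schedules the next one, using the interval returned by the server when it provides one.
  • If the server no longer has the instance (for example, because it deleted it after more than 30 seconds without a heartbeat), it answers the heartbeat with RESOURCE_NOT_FOUND and the client registers the instance again.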
public class BeatReactor {
    // ...
    public BeatReactor(NamingProxy serverProxy, int threadCount) {
        this.serverProxy = serverProxy;
        // Build a scheduled thread pool whose threads are daemon threads
        executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }

    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //fix #1733
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

    class BeatTask implements Runnable {

        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            long nextTime = beatInfo.getPeriod();
            try {
                // Send the heartbeat request to the Nacos server
                JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.getIntValue("clientBeatInterval");
                boolean lightBeatEnabled = false;
                if (result.containsKey(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.getBooleanValue(CommonParams.LIGHT_BEAT_ENABLED);
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
                    nextTime = interval;
                }
                int code = NamingResponseCode.OK;
                if (result.containsKey(CommonParams.CODE)) {
                    code = result.getIntValue(CommonParams.CODE);
                }
                // The server no longer knows this instance, so rebuild it and register it again
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        // Re-register the instance with the server
                        serverProxy.registerService(beatInfo.getServiceName(),
                                NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                    }
                }
            } catch (NacosException ne) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                        JSON.toJSONString(beatInfo), ne.getErrCode(), ne.getErrMsg());
            }
            // Schedule the next heartbeat
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
}
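To see the control flow of BeatReactor and BeatTask without the Nacos dependencies, here is a simplified sketch: a daemon scheduled executor, a task that re-schedules itself after every beat, and re-registration when the server reports the instance as not found. `Server`, `BeatResult` and all class names are hypothetical stand-ins, not Nacos APIs; the server-suggested interval and the light-beat flag are left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the BeatReactor/BeatTask pattern; not Nacos code.
class HeartbeatLoopSketch {

    enum BeatResult { OK, NOT_FOUND }

    // Hypothetical stand-in for the client's server proxy.
    interface Server {
        BeatResult sendBeat(String instanceKey);

        void register(String instanceKey);
    }

    static final class BeatTask implements Runnable {
        private final ScheduledExecutorService executor;
        private final Server server;
        private final String instanceKey;
        private final long periodMs;
        private volatile boolean stopped;

        BeatTask(ScheduledExecutorService executor, Server server, String instanceKey, long periodMs) {
            this.executor = executor;
            this.server = server;
            this.instanceKey = instanceKey;
            this.periodMs = periodMs;
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            if (stopped) {
                return;
            }
            try {
                if (server.sendBeat(instanceKey) == BeatResult.NOT_FOUND) {
                    // The server has dropped the instance, so register it again.
                    server.register(instanceKey);
                }
            } catch (RuntimeException e) {
                // A failed beat is only reported; the next one is still scheduled.
                System.err.println("beat failed: " + e.getMessage());
            }
            if (!stopped && !executor.isShutdown()) {
                // Re-arm as a one-shot task instead of using scheduleAtFixedRate.
                executor.schedule(this, periodMs, TimeUnit.MILLISECONDS);
            }
        }
    }

    // One daemon thread, so pending beats never keep the JVM alive.
    static ScheduledExecutorService newBeatExecutor() {
        return Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "beat-sender-sketch");
            t.setDaemon(true);
            return t;
        });
    }

    // Runs three beats against a fake server that has lost the instance once;
    // returns how many times the instance was re-registered.
    static int runDemo() {
        AtomicInteger beats = new AtomicInteger();
        AtomicInteger registrations = new AtomicInteger();
        CountDownLatch threeBeats = new CountDownLatch(3);
        Server fake = new Server() {
            @Override
            public BeatResult sendBeat(String key) {
                // Only the first beat finds the instance missing.
                BeatResult result = beats.getAndIncrement() == 0 ? BeatResult.NOT_FOUND : BeatResult.OK;
                threeBeats.countDown();
                return result;
            }

            @Override
            public void register(String key) {
                registrations.incrementAndGet();
            }
        };
        ScheduledExecutorService executor = newBeatExecutor();
        BeatTask task = new BeatTask(executor, fake, "demo-instance", 10);
        executor.schedule(task, 10, TimeUnit.MILLISECONDS);
        try {
            threeBeats.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            task.stop();
            executor.shutdownNow();
        }
        return registrations.get();
    }

    public static void main(String[] args) {
        System.out.println("re-registrations: " + runDemo()); // re-registrations: 1
    }
}
```

Scheduling a fresh one-shot task after each beat, rather than using a fixed-rate schedule, lets every beat choose its own delay, which is how BeatTask can adopt the clientBeatInterval returned by the server.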

Nacos server heartbeat health check source code

  • After a client registers an instance, the Nacos server runs a scheduled task, ClientBeatCheckTask, every 5 seconds. It checks how long ago each instance sent its last heartbeat: if more than 15 seconds ago, the instance's health status is set to false; if more than 30 seconds ago, the server deletes the instance by calling its own delete API over HTTP. When the client's next heartbeat arrives, the server reports the instance as not found and the client registers it again.
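Because the thresholds are only evaluated every 5 seconds, detection is not instant. A worked example, assuming the checks fire at t = 0, 5, 10, ... seconds and the last heartbeat arrived at t = 0: at t = 15 s the silence is exactly 15 s, which does not exceed the timeout (the source uses a strict `>`), so the instance is first marked unhealthy at t = 20 s and deleted at t = 35 s. In general the silence at detection falls in (15 s, 20 s] and (30 s, 35 s]. The hypothetical helpers below reproduce that arithmetic.

```java
// Hypothetical helpers for the timing rules described above; not Nacos code.
class BeatTimeoutTimeline {

    enum State { HEALTHY, UNHEALTHY, DELETED }

    // Uses strict ">" like ClientBeatCheckTask: exactly 15 s of silence is still healthy.
    static State classify(long nowMs, long lastBeatMs, long heartBeatTimeoutMs, long ipDeleteTimeoutMs) {
        long silenceMs = nowMs - lastBeatMs;
        if (silenceMs > ipDeleteTimeoutMs) {
            return State.DELETED;
        }
        if (silenceMs > heartBeatTimeoutMs) {
            return State.UNHEALTHY;
        }
        return State.HEALTHY;
    }

    // Assumes checks run at t = 0, period, 2 * period, ... and returns the first
    // check time at which (check time - lastBeat) > timeout.
    static long firstCheckExceeding(long lastBeatMs, long checkPeriodMs, long timeoutMs) {
        long deadline = lastBeatMs + timeoutMs;
        return (Math.floorDiv(deadline, checkPeriodMs) + 1) * checkPeriodMs;
    }

    public static void main(String[] args) {
        // Last heartbeat at t = 0, checks every 5 s.
        System.out.println(firstCheckExceeding(0, 5_000, 15_000)); // 20000
        System.out.println(firstCheckExceeding(0, 5_000, 30_000)); // 35000
        System.out.println(classify(20_000, 0, 15_000, 30_000)); // UNHEALTHY
    }
}
```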

Note: In a Nacos cluster, only one Nacos node is responsible for checking the heartbeats of a given service's instances; that node then synchronizes the instance information to the other Nacos nodes.

/**
 * Check and update statues of ephemeral instances, remove them if they have been expired.
 *
 * @author nkorange
 */
public class ClientBeatCheckTask implements Runnable {

    private Service service;

    public ClientBeatCheckTask(Service service) {
        this.service = service;
    }

    // ...

    @Override
    public void run() {
        try {
            // Only the node responsible for this service performs the check
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }
            List<Instance> instances = service.allIPs(true);

            // If the last heartbeat of an instance is more than 15 seconds old,
            // set its health status to false and publish the service change
            for (Instance instance : instances) {
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG.info(
                                    "{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                    instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
                                    UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(),
                                    instance.getLastBeat());
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }

            // If the last heartbeat of an instance is more than 30 seconds old, delete it via an HTTP request
            for (Instance instance : instances) {
                if (instance.isMarked()) {
                    continue;
                }
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }
    // ...
}
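One pass of ClientBeatCheckTask can be modeled without Nacos as follows: a map of instances keyed by a hypothetical id, a `responsible` flag standing in for `DistroMapper.responsible()`, and the same two passes (flag unhealthy instances, then delete expired ones). Pushing changes, publishing events, the `marked` flag and the HTTP self-call are deliberately left out, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory model of one ClientBeatCheckTask pass; not Nacos code.
class BeatCheckSweepSketch {
    static final long HEART_BEAT_TIMEOUT_MS = 15_000;
    static final long IP_DELETE_TIMEOUT_MS = 30_000;

    static final class InstanceState {
        final long lastBeatMs;
        boolean healthy = true;

        InstanceState(long lastBeatMs) {
            this.lastBeatMs = lastBeatMs;
        }
    }

    // Returns the ids of deleted instances; unhealthy ones stay in the map.
    static List<String> sweep(Map<String, InstanceState> instances, long nowMs, boolean responsible) {
        List<String> deleted = new ArrayList<>();
        if (!responsible) {
            return deleted; // another node owns this service
        }
        // Pass 1: flag instances whose silence exceeds the heartbeat timeout.
        for (InstanceState inst : instances.values()) {
            if (nowMs - inst.lastBeatMs > HEART_BEAT_TIMEOUT_MS && inst.healthy) {
                inst.healthy = false; // Nacos also pushes the change and publishes an event here
            }
        }
        // Pass 2: remove instances whose silence exceeds the delete timeout.
        Iterator<Map.Entry<String, InstanceState>> it = instances.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, InstanceState> entry = it.next();
            if (nowMs - entry.getValue().lastBeatMs > IP_DELETE_TIMEOUT_MS) {
                deleted.add(entry.getKey());
                it.remove(); // Nacos instead calls its own delete API over HTTP
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        Map<String, InstanceState> instances = new LinkedHashMap<>();
        instances.put("a", new InstanceState(95_000)); // 5 s of silence
        instances.put("b", new InstanceState(80_000)); // 20 s of silence
        instances.put("c", new InstanceState(60_000)); // 40 s of silence
        System.out.println(sweep(instances, 100_000, true)); // [c]
        System.out.println(instances.get("a").healthy + " " + instances.get("b").healthy); // true false
    }
}
```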
  • In a Nacos cluster, the heartbeat check for a given service runs on only one node. The DistroMapper.responsible() method hashes the service name to decide which node that is.
/**
 * Distro mapper, judge which server response input service.
 *
 * @author nkorange
 */
@Component("distroMapper")
public class DistroMapper extends MemberChangeListener {
    // ...
    /**
     * Judge whether current server is responsible for input service.
     *
     * @param serviceName service name
     * @return true if input service is response, otherwise false
     */
    public boolean responsible(String serviceName) {
        final List<String> servers = healthyList;
        if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
            return true;
        }
        if (CollectionUtils.isEmpty(servers)) {
            // means distro config is not ready yet
            return false;
        }
        int index = servers.indexOf(EnvUtil.getLocalAddress());
        int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
        if (lastIndex < 0 || index < 0) {
            return true;
        }
        int target = distroHash(serviceName) % servers.size();
        return target >= index && target <= lastIndex;
    }

    private int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }
    // ...
}
  • The algorithm hashes the service name and takes the result modulo the number of healthy Nacos nodes; the node at that index performs the heartbeat check for the service.
  • The catch is that the modulo depends on the number of healthy nodes: if a Nacos node goes down, the node count changes and services are reassigned. The cluster nodes therefore need to synchronize their health states so that every node computes the same assignment.
  • Design principle of node state synchronization in the Nacos cluster architecture
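The ownership rule and the reassignment problem can be checked with a small simulation. The sketch applies the same `distroHash(...) % servers.size()` rule as the source to a hypothetical three-node list, lets you confirm that each service has exactly one responsible node, and counts how many services move when one node drops out of the healthy list. Addresses, service names and class names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Re-creates the distroHash()/responsible() arithmetic on a plain list of addresses.
// Illustrative only; this is not the Nacos DistroMapper.
class DistroOwnershipSketch {

    // Same formula as the source: "% Integer.MAX_VALUE" keeps Math.abs away from
    // Integer.MIN_VALUE, so the result is never negative.
    static int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    static boolean responsible(String serviceName, List<String> healthyServers, String localAddress) {
        int index = healthyServers.indexOf(localAddress);
        int lastIndex = healthyServers.lastIndexOf(localAddress);
        if (lastIndex < 0 || index < 0) {
            return true; // same fallback as the original code
        }
        int target = distroHash(serviceName) % healthyServers.size();
        return target >= index && target <= lastIndex;
    }

    // How many nodes in the list consider themselves responsible for the service.
    static int countResponsible(String serviceName, List<String> healthyServers) {
        int owners = 0;
        for (String node : healthyServers) {
            if (responsible(serviceName, healthyServers, node)) {
                owners++;
            }
        }
        return owners;
    }

    static String owner(String serviceName, List<String> healthyServers) {
        return healthyServers.get(distroHash(serviceName) % healthyServers.size());
    }

    // Number of services whose owner differs between two healthy-node lists.
    static int ownerChanges(List<String> services, List<String> before, List<String> after) {
        int changed = 0;
        for (String service : services) {
            if (!owner(service, before).equals(owner(service, after))) {
                changed++;
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        List<String> cluster = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        List<String> afterFailure = Arrays.asList("10.0.0.1:8848", "10.0.0.3:8848");
        List<String> services = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            services.add("service-" + i);
        }
        System.out.println("owners of service-0: " + countResponsible("service-0", cluster));
        System.out.println("services that move: " + ownerChanges(services, cluster, afterFailure));
    }
}
```

Every service owned by the failed node has to move, and because the divisor changes from 3 to 2, many others move too. This is why all nodes must agree on the healthy-node list: if two nodes saw different lists, a service could be checked by two nodes or by none.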

Final words

  • Learn with an open mind and make progress together