1. Background

Spring Cloud Eureka is a component of the Spring Cloud suite that integrates Netflix Eureka for service registration and discovery. Eureka is an open-source framework from Netflix. Like ZooKeeper and Consul, it is used for service registration and management. A Eureka deployment consists of multiple instances, which fall into two types: Eureka Server and Eureka Client. For ease of understanding, we further divide Eureka Clients into Service Providers and Service Consumers.

  • The Eureka Server provides service registration and discovery.
  • A Service Provider registers its service with Eureka so that Service Consumers can find it.
  • A Service Consumer obtains the list of registered services from Eureka and can then consume those services.

Unlike ZooKeeper, which favors consistency (CP), Eureka chooses AP: in terms of the CAP theorem, it guarantees availability and partition tolerance. How Eureka implements AP, and what benefits AP brings, are questions best answered from Eureka's source code. This article starts with the Eureka Server side of the source.

2. Eureka architecture diagram

3. Eureka core function points

  1. Service registration: The Eureka Client registers its service with the Eureka Server by sending a REST request that carries its metadata, such as IP address, port, health-check URL, and home page address. When the Eureka Server receives the registration request, it stores the metadata in a two-tier Map.
  2. Service renewal: After a service is registered, the Eureka Client sends heartbeats to keep telling the Eureka Server that the service is still available and should not be evicted. By default, the Eureka Client sends a heartbeat every 30 seconds to renew its lease.
  3. Service synchronization: Eureka Servers register with each other to form a Eureka Server cluster. Service information is synchronized between the servers to keep it consistent.
  4. Service invocation: After obtaining the service list, a service consumer can look up the addresses of other services in it and invoke them remotely. Eureka has the concepts of Regions and Zones; a Region can contain multiple Zones. During service invocation, service providers in the same Zone are preferred.
  5. Service fetching: When a Eureka Client starts, it sends a REST request to the Eureka Server to obtain the list of registered services. The list is cached locally by the Eureka Client for 30 seconds by default. For performance reasons, the Eureka Server also maintains a read-only cache of the service list that is updated every 30 seconds.
  6. Service eviction: Sometimes a service instance can no longer provide service because of a network fault or a similar problem, and it never sends an offline request to the Eureka Server. A service eviction mechanism is therefore required. When the Eureka Server starts, it creates a scheduled task that runs at a fixed interval (60 seconds by default) and removes from the current service list the instances that have not renewed their lease within the timeout (90 seconds by default).
  7. Self-protection: Over a short period, the Eureka Server tracks the proportion of renewals that fail to arrive. If it reaches a certain threshold, the Eureka Server enters self-protection mode. In this mode, the Eureka Server does not remove any microservice instances, and it leaves self-protection mode once renewals return to normal.
  8. Service offline: When a Eureka Client is about to shut down or restart, it does not want any requests to come in during this period. It therefore sends a REST request to the Eureka Server in advance to announce that it is going offline. After receiving the request, the Eureka Server sets the service status to DOWN and propagates the offline event.
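
To make the registration, renewal, and eviction steps above more concrete, here is a minimal Java sketch of a two-tier registry. It is not Eureka's actual implementation: SimpleRegistry, Lease, and all method names are hypothetical, and timestamps are passed in explicitly so the behavior is easy to follow. The outer map is keyed by application name and the inner map by instance ID.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of a two-tier registry: appName -> (instanceId -> lease).
final class SimpleRegistry {

    // A lease records how long it lasts and when the instance last renewed it.
    static final class Lease {
        final long durationMs;
        volatile long lastRenewalMs;

        Lease(long durationMs, long nowMs) {
            this.durationMs = durationMs;
            this.lastRenewalMs = nowMs;
        }

        boolean isExpired(long nowMs) {
            return nowMs > lastRenewalMs + durationMs;
        }
    }

    private final Map<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();

    // Registration: create the inner map on first use, then store the lease.
    void register(String appName, String instanceId, long durationMs, long nowMs) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())
                .put(instanceId, new Lease(durationMs, nowMs));
    }

    // Renewal (heartbeat): returns false if the instance is not registered.
    boolean renew(String appName, String instanceId, long nowMs) {
        Map<String, Lease> leases = registry.get(appName);
        Lease lease = (leases == null) ? null : leases.get(instanceId);
        if (lease == null) {
            return false;
        }
        lease.lastRenewalMs = nowMs;
        return true;
    }

    // Eviction: remove every expired lease and return how many were removed.
    int evict(long nowMs) {
        int removed = 0;
        for (Map<String, Lease> leases : registry.values()) {
            for (Map.Entry<String, Lease> e : leases.entrySet()) {
                if (e.getValue().isExpired(nowMs) && leases.remove(e.getKey(), e.getValue())) {
                    removed++;
                }
            }
        }
        return removed;
    }

    // Total number of registered instances across all applications.
    int size() {
        return registry.values().stream().mapToInt(Map::size).sum();
    }
}
```

With a 90-second lease, an instance that registers at t = 0 and never renews is removed by an eviction pass at t = 100 s, while an instance that renewed at t = 60 s survives that pass.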

4. Eureka Server code parsing

@EnableEurekaServer: anyone familiar with the Spring Boot startup process knows that this annotation automatically injects some configuration into the container.

@Import(EurekaServerMarkerConfiguration.class): imports the EurekaServerMarkerConfiguration configuration class into the IoC container.

Through Spring Boot's auto-configuration mechanism, the spring.factories file in the Eureka Server jar is located, and from spring.factories the EurekaServerAutoConfiguration configuration class is found. This configuration class does two things: (1) @Import(EurekaServerInitializerConfiguration.class) imports the Eureka Server initialization configuration class into the IoC container; (2) it loads the beans that Eureka Server needs into the container.

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {

   /**
    * List of packages containing Jersey resources required by the Eureka server.
    */
   private static final String[] EUREKA_PACKAGES = new String[] {
         "com.netflix.discovery", "com.netflix.eureka" };

   @Autowired
   private ApplicationInfoManager applicationInfoManager;

   @Autowired
   private EurekaServerConfig eurekaServerConfig;

   @Autowired
   private EurekaClientConfig eurekaClientConfig;

   @Autowired
   private EurekaClient eurekaClient;

   @Autowired
   private InstanceRegistryProperties instanceRegistryProperties;

   /**
    * A {@link CloudJacksonJson} instance.
    */
   public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson();

   @Bean
   public HasFeatures eurekaServerFeature() {
      return HasFeatures.namedFeature("Eureka Server",
            EurekaServerAutoConfiguration.class);
   }
   
   // Load EurekaController, through which Spring Cloud exposes endpoints for viewing Eureka Server information
   @Bean
   @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
   public EurekaController eurekaController() {
      return new EurekaController(this.applicationInfoManager);
   }

   static {
      CodecWrappers.registerWrapper(JACKSON_JSON);
      EurekaJacksonCodec.setInstance(JACKSON_JSON.getCodec());
   }

   @Bean
   public ServerCodecs serverCodecs() {
      return new CloudServerCodecs(this.eurekaServerConfig);
   }

   private static CodecWrapper getFullJson(EurekaServerConfig serverConfig) {
      CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getJsonCodecName());
      return codec == null ? CodecWrappers.getCodec(JACKSON_JSON.codecName()) : codec;
   }

   private static CodecWrapper getFullXml(EurekaServerConfig serverConfig) {
      CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getXmlCodecName());
      return codec == null ? CodecWrappers.getCodec(CodecWrappers.XStreamXml.class)
            : codec;
   }

   @Bean
   @ConditionalOnMissingBean
   public ReplicationClientAdditionalFilters replicationClientAdditionalFilters() {
      return new ReplicationClientAdditionalFilters(Collections.emptySet());
   }

   // Initialize the peer-aware instance registry
   @Bean
   public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
      this.eurekaClient.getApplications(); // force initialization
      return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
            serverCodecs, this.eurekaClient,
            this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
            this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
   }

   // Initialize the peer Eureka nodes
   @Bean
   @ConditionalOnMissingBean
   public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs, ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
      return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
            this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
            replicationClientAdditionalFilters);
   }
   
   
   // Eureka context
   @Bean
	@ConditionalOnMissingBean
	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
				registry, peerEurekaNodes, this.applicationInfoManager);
	}

	// Eureka Server bootstrap
	@Bean
	public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
		return new EurekaServerBootstrap(this.applicationInfoManager,
				this.eurekaClientConfig, this.eurekaServerConfig, registry,
				serverContext);
	}

	/**
	 * Register the Jersey filter.
	 * @param eurekaJerseyApp an {@link Application} for the filter to be registered
	 * @return a jersey {@link FilterRegistrationBean}
	 */
	// Jersey filter: wraps the Jersey ServletContainer so that it serves Eureka Server's RESTful API
	@Bean
	public FilterRegistrationBean<?> jerseyFilterRegistration(
			javax.ws.rs.core.Application eurekaJerseyApp) {
		FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
		bean.setFilter(new ServletContainer(eurekaJerseyApp));
		bean.setOrder(Ordered.LOWEST_PRECEDENCE);
		bean.setUrlPatterns(
				Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

		return bean;
	}

	/**
	 * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources
	 * required by the Eureka server.
	 * @param environment an {@link Environment} instance to retrieve classpath resources
	 * @param resourceLoader a {@link ResourceLoader} instance to get classloader from
	 * @return created {@link Application} object
	 */
    // Construct the Jersey application instance
	@Bean
	public javax.ws.rs.core.Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {

		ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
				false, environment);

		// Filter to include only classes that have a particular annotation.
		//
		provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
		provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));

		// Find classes in Eureka packages (or subpackages)
		//
		Set<Class<?>> classes = new HashSet<>();
		for (String basePackage : EUREKA_PACKAGES) {
			Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
			for (BeanDefinition bd : beans) {
				Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
						resourceLoader.getClassLoader());
				classes.add(cls);
			}
		}

		// Construct the Jersey ResourceConfig
		Map<String, Object> propsAndFeatures = new HashMap<>();
		propsAndFeatures.put(
				// Skip static content used by the webapp
				ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
				EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");

		DefaultResourceConfig rc = new DefaultResourceConfig(classes);
		rc.setPropertiesAndFeatures(propsAndFeatures);

		return rc;
	}

EurekaServerInitializerConfiguration, the initialization configuration class imported above:

@Configuration(proxyBeanMethods = false)
public class EurekaServerInitializerConfiguration
      implements ServletContextAware, SmartLifecycle, Ordered {

   private static final Log log = LogFactory
         .getLog(EurekaServerInitializerConfiguration.class);

   @Autowired
   private EurekaServerConfig eurekaServerConfig;

   private ServletContext servletContext;

   @Autowired
   private ApplicationContext applicationContext;

   @Autowired
   private EurekaServerBootstrap eurekaServerBootstrap;

   private boolean running;

   private int order = 1;

   @Override
   public void setServletContext(ServletContext servletContext) {
      this.servletContext = servletContext;
   }

   @Override
   public void start() {
      // Because this configuration class implements SmartLifecycle, Spring calls back start(), which launches a new thread
      new Thread(() -> {
         try {
            // TODO: is this class even needed now?
            // Context initialization
            eurekaServerBootstrap.contextInitialized(
                  EurekaServerInitializerConfiguration.this.servletContext);
            log.info("Started Eureka Server");

            publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
            EurekaServerInitializerConfiguration.this.running = true;
            publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
         }
         catch (Exception ex) {
            // Help!
            log.error("Could not initialize Eureka servlet context", ex);
         }
      }).start();
   }

   private EurekaServerConfig getEurekaServerConfig() {
      return this.eurekaServerConfig;
   }

   private void publish(ApplicationEvent event) {
      this.applicationContext.publishEvent(event);
   }

   @Override
   public void stop() {
      this.running = false;
      eurekaServerBootstrap.contextDestroyed(this.servletContext);
   }

   @Override
   public boolean isRunning() {
      return this.running;
   }

   @Override
   public int getPhase() {
      return 0;
   }

   @Override
   public boolean isAutoStartup() {
      return true;
   }

   @Override
   public void stop(Runnable callback) {
      callback.run();
   }

   @Override
   public int getOrder() {
      return this.order;
   }

}

The contextInitialized method for EurekaServerBootstrap:

public void contextInitialized(ServletContext context) {
   try {
      // Initialize the Eureka environment
      initEurekaEnvironment();
      // Initialize the EurekaServer context
      initEurekaServerContext();

      context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
   }
   catch (Throwable e) {
      log.error("Cannot bootstrap eureka server :", e);
      throw new RuntimeException("Cannot bootstrap eureka server :", e);
   }
}

	protected void initEurekaServerContext() throws Exception {
		// For backward compatibility
		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);
		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);

		if (isAws(this.applicationInfoManager.getInfo())) {
			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
					this.eurekaClientConfig, this.registry, this.applicationInfoManager);
			this.awsBinder.start();
		}

		EurekaServerContextHolder.initialize(this.serverContext);

		log.info("Initialized server context");

		// Copy the full registry from a neighboring Eureka node
		int registryCount = this.registry.syncUp();
		// Heartbeats are sent every 30 seconds by default, i.e. twice a minute.
		// Set this server's status to UP and start the scheduled eviction task, which runs every 60 seconds by default and removes clients whose leases have not been renewed
		this.registry.openForTraffic(this.applicationInfoManager, registryCount);

		// Register all monitoring statistics.
		EurekaMonitors.registerAllStats();
	}

The registry.syncUp() method:

@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;

    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        // Register the instance obtained from the peer node with this node
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
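
The retry logic in syncUp() can be read as: try up to N times, wait between attempts, and stop as soon as something was copied. The following is a simplified sketch of that loop only, with the peer fetch replaced by a hypothetical IntSupplier that returns the number of instances copied in one attempt. SyncRetry and its parameters are illustrative names, not Eureka API.

```java
import java.util.function.IntSupplier;

// Hypothetical helper that isolates the retry loop used during registry sync.
final class SyncRetry {

    // Calls attempt up to maxRetries times and stops at the first non-zero count.
    // Sleeps waitMs before every attempt except the first.
    static int syncUp(IntSupplier attempt, int maxRetries, long waitMs) {
        int count = 0;
        for (int i = 0; i < maxRetries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(waitMs);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up on further retries.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            count = attempt.getAsInt();
        }
        return count;
    }
}
```

If every attempt returns 0, the method returns 0 after maxRetries attempts, which corresponds to a server that starts with an empty registry.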

The register() method:

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    // Acquire the read lock
    read.lock();
    try {
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    // Renewal threshold = expected number of renewing clients * (60 / expected client renewal interval in seconds) * renewal percent threshold
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        recentRegisteredQueue.add(new Pair<Long, String>(
                System.currentTimeMillis(),
                registrant.getAppName() + "(" + registrant.getId() + ")"));
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        // Expire the cache
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
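
The threshold formula mentioned in the register() comments can be worked through with concrete numbers. The sketch below is an illustration, not Eureka's code: RenewalThreshold and its methods are hypothetical names, and the 0.85 ratio is an assumed example value. With 10 clients renewing every 30 seconds, the server expects 20 renewals per minute and the threshold becomes 17.

```java
// Hypothetical helper illustrating the renewal threshold arithmetic.
final class RenewalThreshold {

    // expected clients * renewals per client per minute * tolerated ratio, truncated to an int
    static int renewsPerMinThreshold(int expectedClients,
                                     int renewalIntervalSeconds,
                                     double percentThreshold) {
        return (int) (expectedClients * (60.0 / renewalIntervalSeconds) * percentThreshold);
    }

    // Simplified self-protection check: eviction is allowed only while the number of
    // renewals received in the last minute stays above the threshold.
    static boolean evictionAllowed(long renewsLastMinute, int threshold) {
        return renewsLastMinute > threshold;
    }

    public static void main(String[] args) {
        int threshold = renewsPerMinThreshold(10, 30, 0.85);
        System.out.println("threshold = " + threshold); // prints "threshold = 17"
        System.out.println(evictionAllowed(20, threshold)); // healthy: prints "true"
        System.out.println(evictionAllowed(12, threshold)); // too few renewals: prints "false"
    }
}
```

In the second case the server would stop evicting instances, on the assumption that the missing heartbeats are more likely caused by a network problem than by that many instances failing at once.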

The openForTraffic() method:

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    this.expectedNumberOfClientsSendingRenews = count;
    // Recompute the minimum number of renewals expected per minute
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // Start the scheduled eviction task, which runs every 60 seconds by default
    super.postInit();
}

The postInit() method:

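protected void postInit() {
    // Start the counter that tracks renewals received in the last minute
    renewsLastMin.start();
    // Cancel any existing eviction task before scheduling a new one
    if (evictionTaskRef.get() != null) {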
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
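
The cancel-then-reschedule pattern in postInit() can be sketched in isolation with java.util.Timer. This is a minimal illustration under assumptions: EvictionScheduler is a hypothetical class, and the eviction work is an arbitrary Runnable rather than Eureka's EvictionTask.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical scheduler showing how a periodic task can be replaced safely.
final class EvictionScheduler {

    // Daemon timer thread, so it does not keep the JVM alive on its own.
    private final Timer timer = new Timer("eviction-timer-sketch", true);
    private final AtomicReference<TimerTask> taskRef = new AtomicReference<>();

    // Cancels the previously scheduled task, if any, then schedules the new one
    // to run every intervalMs milliseconds after an initial delay of intervalMs.
    void schedule(Runnable eviction, long intervalMs) {
        TimerTask previous = taskRef.get();
        if (previous != null) {
            previous.cancel();
        }
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                eviction.run();
            }
        };
        taskRef.set(task);
        timer.schedule(task, intervalMs, intervalMs);
    }

    // Stops the timer and discards any scheduled tasks.
    void shutdown() {
        timer.cancel();
    }
}
```

Holding the current task in an AtomicReference means a second call to schedule() leaves only one active eviction task, which is the same reason the original code cancels the task found in evictionTaskRef first.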

The eurekaServerContext method of EurekaServerAutoConfiguration creates a DefaultEurekaServerContext object. This class has an initialize() method annotated with @PostConstruct, so it is called once the bean has been constructed and its dependencies injected.

@PostConstruct
@Override
public void initialize() {
    logger.info("Initializing ...");
    // Starts a single-thread scheduled executor: the peer node list is resolved once immediately and then refreshed periodically at the configured interval, so the cluster membership can be changed dynamically through configuration
    peerEurekaNodes.start();
    try {
        registry.init(peerEurekaNodes);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    logger.info("Initialized");
}

The peerEurekaNodes.start() method:

public void start() {
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        // Update the cluster node information for the first time
        updatePeerEurekaNodes(resolvePeerUrls());
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }
            }
        };
        // Schedule the task that updates cluster node information periodically
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL: {}", node.getServiceUrl());
    }
}

// Create a PeerEurekaNode from the peer's URL
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
        HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
        String targetHost = hostFromUrl(peerEurekaNodeUrl);
        if (targetHost == null) {
            targetHost = "host";
        }
        return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
    }

5. Summary

By analyzing the Eureka Server source code, we have seen how Eureka is integrated into Spring, along with the Eureka Server initialization process and its configuration. Next time, I will analyze the Eureka Client source code to see how Eureka implements AP and what other characteristics it has.