From the previous study, we know that Eureka-client proactively reports its communication address to Eureka-server by invoking the interface registered by the service.

Now the Eureka-server can obtain all eureka-clients’ communication addresses. How does the Eureka-client obtain other Eureka-clients’ communication addresses? Here’s the reveal.

How is eureka-client initialized

Let’s take a look at what happens when eureka-client is initialized. If you remember, one of the steps in the eureka-Server initialization process is to create a eurekaClient object.

Why do eurekaClient objects need to be created during eureka-server initialization?

If our Eureka-Server is deployed on only one server, and only one Eureka-Server is deployed, then of course there is no need to create eurekaClient. But once you need to deploy eureka-Server as a cluster, things get more complicated.

Because eureka servers need to register with each other in the cluster, each Eureka server is also a Eureka client.

Here is just a brief mention, specific content, later explain eureka-server cluster related knowledge will be supplemented. Here we just use the eureka-Server startup to see the eurekaClient initialization logic.

First post a small piece of code to see, you have a general impression in your mind, and then draw a flow chart to explain.

EurekaClient if (eurekaClient == null) {EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext()) ? new CloudInstanceConfig() : new MyDataCenterInstanceConfig(); ApplicationInfoManager = new applicationInfoManager (instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()); EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig(); // Create eurekaClient eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig); }Copy the code

Flow chart:

InstanceInfo, as mentioned in the previous chapter, contains the service IP, port number, service name, instance ID, and so on. When a service is registered, Eureka-Client sends the InstanceInfo information to eureka-Server.

Now you know where InstanceInfo was created: based on our own configuration file + some default configuration.

How does eureka-Client get the registry

This section describes how to create a DiscoveryClient object. During the creation of this object, Eureka-Client sends an HTTP request to Eureka-Server to request all registry information.

As usual, here is the source code sketch created by DiscoveryClient.

The main logic for pulling the registry is posted in the getAndStoreFullRegistry() method.

private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications apps = null; EurekaHttpResponse<Applications> httpResponse =... ; If (httpResponse. GetStatusCode () = = Status. OK. GetStatusCode ()) {/ / of the result of the request, access to services list apps = httpResponse. GetEntity (); } if (apps == null) {logger.error("... ); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, CurrentUpdateGeneration + 1)) {localRegionApps.set(this.filterandShuffle (apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else {logger.warn(" slightly...") ); }}Copy the code

The list of services is saved in AtomicReference

localRegionApps, and we’ll look at the contents of Applications via breakpoints later.

Before we do that, let’s take a look at what logic eureka-server does when it receives an HTTP request from a client. You might be thinking: What’s so hard about that? Can’t you just return the registry data? Hold your breath, let’s take a look at the source code to verify your guess is correct.

How does eureka-Server return the registry

Eureka – server receives an HTTP request code in eureka – ApplicationsResource under the core engineering. In Java, the directory structure is as follows:

The method of returning the list of services is getContainers(the parameters are omitted). Let’s take a look at the main logic inside the method through a simple code diagram.

In the processing logic of Eureka-Server, a set of multi-level caching mechanism is used. When the service list is returned, it is not directly returned to the registry, but first read from the read-only cache, if there is no cache data, then read from the read/write cache, if there is no read/write cache, then read from the registry.

Read-only cache: ConcurrentMap

readOnlyCacheMap
,>

Read and write cache: LoadingCache < Key, Value > readWriteCacheMap, based on mon. Com.google.com cache. LoadingCache implementation.

The cache reading process is shown as follows:

Now that caching is used, a new question arises: when do cached data expire?

1. CreatereadWriteCacheMapWhen specified, 180s automatically expire. (Timed expiration)

public class ResponseCacheImpl implements ResponseCache { private final LoadingCache<Key, Value> readWriteCacheMap; This.readwritecachemap = cacheBuilder.newBuilder ().initialCapacity(1000) Specified for - SECONDS. ExpireAfterWrite (serverConfig. GetResponseCacheAutoExpirationInSeconds (), TimeUnit. SECONDS)}Copy the code

Time allocation in DefaultEurekaServerConfig. In Java:

public class DefaultEurekaServerConfig implements EurekaServerConfig { @Override public long GetResponseCacheAutoExpirationInSeconds () {/ / the default configuration 180 return configInstance. GetIntProperty (namespace + "responseCacheAutoExpirationInSeconds", 180).get(); }}Copy the code

2. When a new service is registered, the cache will expire. (Active expiration)

Content in front of the, we explain the service registry, is actually called the eureka – server AbstractInstanceRegistry. Java register () method, then we ignore some details, including expiration cache it for a short period. Now let’s take a look.

Public abstract class AbstractInstanceRegistry implements InstanceRegistry {public void implements InstanceRegistry. {// Invoke the internal invalidateCache() method invalidateCache(registrant.getAppName(), registrant.getvipAddress (), registrant.getSecureVipAddress()); } private void invalidateCache {// call responseCache. Invalidate (appName, vipAddress, secureVipAddress); }}Copy the code

As you can see from invalidateCache(), we actually use the invalidate() method defined internally in responsecacheimpl.java.

public void invalidate(Key... {keys) for (Key Key: keys) {/ / cache the framework used to own removal method, clear the cache readWriteCacheMap. Invalidate (Key); / / a little... }}Copy the code

3.ResponseCacheImpl.javaA scheduled task is started at creation time and the cache expires every 30 seconds. (Passive expiration)

Public class ResponseCacheImpl implements ResponseCache {// Implements ResponseCacheImpl; {/ / boot read-only cache, the default true if (shouldUseReadOnlyResponseCache) {/ / to perform a specific task: GetCacheUpdateTask () timer.schedule(getCacheUpdateTask(), The new Date of task for the first time (((System. CurrentTimeMillis ()/responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + ResponseCacheUpdateIntervalMs), / / how many seconds after every, Repeat this task / / configuration in config: "responseCacheUpdateIntervalMs responseCacheUpdateIntervalMs (30 * 1000)); Private TimerTask getCacheUpdateTask() {return new TimerTask() {@override public void run() { / / all the cache in the traversal readOnlyCacheMap for (Key Key: readOnlyCacheMap keySet ()) {CurrentRequestVersion. Set (Key) getVersion ()); Value cacheValue = readWriteCacheMap.get(key); Value currentCacheValue = readOnlyCacheMap.get(key); If (cacheValue!) {// If (cacheValue!) {if (cacheValue! = currentCacheValue) { readOnlyCacheMap.put(key, cacheValue); }}}}; }}Copy the code

To sum up:

Finally, let’s take a look at what the registry data returned by Eureka-Server looks like.

What Applications might look like.

Now we are halfway through the eureka-client registry retrieval mechanism.

Why say half? Hold your breath and keep looking back.

Now that eureka-Client is able to retrieve the registry, it seems like there’s no problem, but if we think a little bit more, we’ll see where the problem is. For example, once a new service is registered with eureka-server, how does the Eureka-client that has previously acquired the registry synchronize the new data?

How does eureka-Client update the registry

In order to obtain the changed registry on eureka-server, a scheduled task is started during eureka-Client initialization to request the changed registry data from Eureka-server every 30 seconds.

InitScheduledTasks () initScheduledTasks() initScheduledTasks()

InitScheduledTasks () contains multiple scheduled tasks. For the moment, we only focus on the scheduled tasks for refreshing the registry. The rest will be used later.

The main logic code is as follows:

@Singleton public class DiscoveryClient implements EurekaClient { private final ScheduledExecutorService scheduler; Private void initScheduledTasks() {//... Scheduler. schedule(New TimedSupervisorTask("cacheRefresh", Scheduler, cacheRefreshExecutor, // How often the scheduler executes the scheduled task Default (30) was obtained from the default configuration registryFetchIntervalSeconds, / / time interval unit TimeUnit. SECONDS, expBackOffBound, Specific tasks / / logic new CacheRefreshThread ()), registryFetchIntervalSeconds, TimeUnit. SECONDS); }}Copy the code

Here we first take a look at eureka – where the client’s default configuration is defined in, is defined in DefaultEurekaClientConfig. In Java:

public class DefaultEurekaClientConfig implements EurekaClientConfig { @Override public int GetRegistryFetchIntervalSeconds () {/ / the default configuration: . How long is the interval timer return configInstance getIntProperty (namespace + REGISTRY_REFRESH_INTERVAL_KEY, 30). The get (); / / configuration key definitions: PropertyBasedClientConfigConstants. Java / / String REGISTRY_REFRESH_INTERVAL_KEY = ". The client. The refresh interval "; }}Copy the code

Having taken a look at the default configuration, let’s move on to the scheduled task of refreshing the registry.

Because the method calls inside the code are too complex, I will not post the source code here and use the code diagram instead.

The main method is getAndUpdateDelta(), post the source code and have a look:

private void getAndUpdateDelta(Applications applications) Applications delta = null; / / send the request for incremental registry EurekaHttpResponse < TAB > httpResponse. = eurekaTransport queryClient. getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) {getAndStoreFullRegistry(); } else if (slightly)... {// Merge the obtained registry data with local updateDelta(delta); // The combined results generate a HashCode(analyzed here later) String reconcileHashCode = getReconcileHashCode(applications); // Compare the HashCode returned by the HTTP request with the newly generated HashCode if (! ReconcileHashCode. Equals (delta. GetAppsHashCode () | | clientConfig. ShouldLogDeltaDiff ()) {/ / inconsistent results contrast, To pull the whole amount reconcileAndLogDifference registry (delta, reconcileHashCode); } }else{ //log.err(..) ; })Copy the code

As you can see from the source code, eureka-Client performs a local merge and validation process after getting the data. Let’s look at this section.

Registry data merge: After eureka-client obtains the registry data, it determines the change type of the service instance based on an ActionType. That is, it determines whether the service instance needs to be added to the local registry, deleted from the local registry, or updated information about a local service instance.

Take a look at the source code and understand it later with a flow chart:

Private void updateDelta(Applications delta) {for (Application app: delta.getRegisteredApplications()) { for (InstanceInfo instance : App.getinstances ()) {if (actionType.added. Equals (instance.getActionType())) {// ADDED applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } else if (ActionType) MODIFIED) equals (instance. GetActionType ())) {/ / modify applications. * * *. AddInstance (instance); } else if (ActionType. Does. Equals (instance. GetActionType ())) {/ / remove applications. * * * in this (instance); }}}}Copy the code

Data verification: After the Eureka-server updates the local registry, it hashes the local registry information to obtain a hash value. In addition, the Eureka-server returns data with a hash value.

Theoretically, after a round of updates, the registry data in Eureka-server and Eureka-client are exactly the same, so the hash results should be the same.

If no, it indicates that data synchronization between Eureka-client and Eureka-server is faulty. In this case, Eureka-Client requests all registry data from Eureka-server again. Then, the newly obtained data overwrites the local registry data to ensure that the data is consistent with that of eureka-server.

Combined with the flow chart, understand the merge and data verification process:

How the eureka-client periodically sends HTTP requests, and how the data is merged and validated.

How does the eureka-server interface work? Is the data still returned through multi-level caching?

How is eureka-Server incremental registry data maintained

Eureka-server still returns incremental registry data through multi-level caching mechanism. However, since registry information is constantly changing, Eureka-Server does not repeatedly return all registry information. Here eureka-Server uses a Queue to record the changed portion of the registry.

Each time a service is registered or a service is taken offline, the changed registry information is sent to a recentlyChangedQueue, and the data is fetched directly from the queue when needed.

Understand with flow chart:

RecentlyChangedQueue defined in AbstractInstanceRegistry. In Java, if you have impression, can remember the front on behalf of the registry ConcurrentHashMap is also defined here.

Let’s take a quick look at what the recentlyChangedQueue looks like:

Public abstract class AbstractInstanceRegistry implements InstanceRegistry {// Implements InstanceRegistry private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>(); Private static final Class RecentlyChangedItem {private long lastUpdateTime; private Lease<InstanceInfo> leaseInfo; }}Copy the code

Lease and InstanceInfo represent Lease information and service instance information respectively, as described earlier.

Let’s look at the logic that readWriteCacheMap uses to retrieve data from the Queue.

public class ResponseCacheImpl implements ResponseCache { private final AbstractInstanceRegistry registry; //readWriteCacheMap does not obtain data according to the specified key. Private Value generatePayload(Key Key) {// Get all registry data (all registry data) if (all_apps.equals (key.getName())) {skip.. Else if(all_apps_delta.equals (key.getName())){payload = getPayLoad(key, / / call AbstractInstanceRegistry. Java / / getApplicationDeltasFromMultipleRegions () method to obtain the queue data registry.getApplicationDeltasFromMultipleRegions(key.getRegions())); }}}Copy the code

Think about it, what’s the new problem with using queues to store data here? Data in the recentlyChangedQueue increases over time, causing the Eureka-Client to obtain duplicate data for multiple timed fetches.

There is no need to repeatedly acquire data that has been acquired before. After eureka-client obtains incremental registry data, it also needs to perform some local merge and verification work. As data increases, the efficiency of transferring, merging, and verifying data from the network decreases.

So Eureka-Server starts a scheduled task that cleans up the recentlyChangedQueue every 30 seconds to make sure that the data in the Queue is a service instance that has changed within 180 seconds.

Data flow chart of clearing queues for scheduled tasks:

Post a short snippet of simplified source code:

Public abstract class AbstractInstanceRegistry implements InstanceRegistry {// constructor protected Slightly AbstractInstanceRegistry (..) {/ / open timing task this. DeltaRetentionTimer. The schedule (getDeltaRetentionTask (), / / the default configuration 30 s serverConfig. GetDeltaRetentionTimerIntervalInMs (), serverConfig. GetDeltaRetentionTimerIntervalInMs ()); } private TimerTask getDeltaRetentionTask() {return new TimerTask() {@override public void run() { / / traverse the queue data Iterator < RecentlyChangedItem > it = recentlyChangedQueue. The Iterator (); While (it.hasnext ()) { If (it.next().getLastupdateTime () < system.currentTimemillis () - // Default 180s serverConfig.getRetentionTimeInMSInDeltaQueue()) { it.remove(); } else { break; }}}}; }}Copy the code

At this point, the entire core process of service registration and service discovery is fully understood. In the next article, we will look at how Eureka’s heartbeat mechanism is implemented.