SpringCloud source analysis: exploring how Eureka works

This article explains how Eureka is implemented by reading its source code. It focuses on Eureka's overall design and implementation rather than walking through every detail of the source.

The analysis is based on Spring Cloud Hoxton and Eureka 1.9.

Eureka is divided into Eureka Client and Eureka Server. Multiple Eureka Server nodes form a Eureka cluster. Services are registered with Eureka Server through Eureka Client.

The CAP theorem states that a distributed system cannot simultaneously guarantee C (consistency), A (availability), and P (partition tolerance). Because partition tolerance must be guaranteed in a distributed system, the trade-off is between A and C. ZooKeeper guarantees CP, while Eureka guarantees AP. Why? In a registry scenario, availability matters more than consistency. Registry data does not change often: only when a service is released, a machine goes online or offline, or a service scales out or in. Eureka therefore chooses AP: even when something goes wrong, it returns possibly stale data so that services remain callable as far as possible, rather than letting a registry problem make services unavailable, which would cost far more than it gains. For the same reason, all Eureka nodes are peers (a decentralized architecture with no master/slave distinction). A failed node does not affect the normal nodes, and the remaining nodes can still provide registration and query services.

Eureka Client

With Eureka 1.9, simply adding the spring-cloud-starter-netflix-eureka-client dependency is enough: even without the @EnableDiscoveryClient or @EnableEurekaClient annotation, the service registers itself with the Eureka cluster.

The client logic is mainly implemented in com.netflix.discovery.DiscoveryClient, and EurekaClientAutoConfiguration builds its subclass CloudEurekaClient.

Scheduled tasks

The DiscoveryClient#initScheduledTasks method sets scheduled tasks, including CacheRefreshThread, HeartbeatThread, and InstanceInfoReplicator.
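To make the scheduling concrete, here is a minimal sketch that wires three periodic tasks onto a plain ScheduledExecutorService. It is an assumption-based simplification, not DiscoveryClient's code: the real client wraps CacheRefreshThread and HeartbeatThread in TimedSupervisorTask (which adds timeouts and back-off) and runs InstanceInfoReplicator on its own executor, while the hypothetical ClientScheduler below only counts how often each task fires.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for DiscoveryClient#initScheduledTasks.
class ClientScheduler {
    final AtomicInteger cacheRefreshes = new AtomicInteger();
    final AtomicInteger heartbeats = new AtomicInteger();
    final AtomicInteger replicationChecks = new AtomicInteger();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);

    // Intervals are parameters so a demo can use milliseconds instead of the real 30 s / 40 s defaults.
    void start(long fetchInterval, long renewInterval, long replicationDelay, TimeUnit unit) {
        // CacheRefreshThread: pull the registry from the server periodically.
        scheduler.scheduleWithFixedDelay(cacheRefreshes::incrementAndGet, 0, fetchInterval, unit);
        // HeartbeatThread: renew this instance's lease periodically.
        scheduler.scheduleWithFixedDelay(heartbeats::incrementAndGet, renewInterval, renewInterval, unit);
        // InstanceInfoReplicator: after an initial delay, check whether local info changed.
        scheduler.scheduleWithFixedDelay(replicationChecks::incrementAndGet, replicationDelay, renewInterval, unit);
    }

    void shutdown() {
        scheduler.shutdownNow(); // cancel all periodic tasks
    }

    public static void main(String[] args) throws InterruptedException {
        ClientScheduler client = new ClientScheduler();
        client.start(10, 10, 20, TimeUnit.MILLISECONDS);
        Thread.sleep(100);
        client.shutdown();
        System.out.println("refreshes=" + client.cacheRefreshes.get()
                + ", heartbeats=" + client.heartbeats.get()
                + ", checks=" + client.replicationChecks.get());
    }
}
```

Using scheduleWithFixedDelay rather than scheduleAtFixedRate means a slow registry fetch pushes back the next one instead of letting runs pile up.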

Registry synchronization

The service registration information is cached in the DiscoveryClient#localRegionApps variable. CacheRefreshThread periodically reads the latest service registration information from the Eureka Server and updates the local cache. The call chain is CacheRefreshThread -> DiscoveryClient#refreshRegistry -> DiscoveryClient#fetchRegistry. If there are multiple Eureka Server nodes, the Client synchronizes data with the first Server node configured in eureka.client.service-url.defaultZone; if that node fails, it switches to the second node, and so on.
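The failover order can be sketched as a loop over the comma-separated defaultZone list. This is a simplification under stated assumptions: the real RetryableEurekaHttpClient also quarantines failing endpoints and limits the number of attempts, and the ZoneFailover class and its `request` callback are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of "try the first defaultZone server, then the next one".
class ZoneFailover {
    // Split eureka.client.service-url.defaultZone into an ordered list of server URLs.
    static List<String> parseDefaultZone(String defaultZone) {
        List<String> urls = new ArrayList<>();
        for (String part : defaultZone.split(",")) {
            String url = part.trim();
            if (!url.isEmpty()) {
                urls.add(url);
            }
        }
        return urls;
    }

    // `request` performs one call against one server and throws on failure.
    static <T> T fetchWithFailover(List<String> servers, Function<String, T> request) {
        RuntimeException lastError = null;
        for (String server : servers) {
            try {
                return request.apply(server); // the first server that answers wins
            } catch (RuntimeException e) {
                lastError = e; // remember the failure and fall through to the next server
            }
        }
        throw new IllegalStateException("All Eureka servers failed", lastError);
    }

    public static void main(String[] args) {
        List<String> servers = parseDefaultZone(
                "http://server1:8761/eureka/,http://server2:8761/eureka/");
        String result = fetchWithFailover(servers, url -> {
            if (url.startsWith("http://server1")) {
                throw new RuntimeException("connection refused");
            }
            return "registry from " + url;
        });
        System.out.println(result); // registry from http://server2:8761/eureka/
    }
}
```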

As DiscoveryClient#fetchRegistry shows, there are two ways to synchronize data.

(1) Full synchronization is implemented by DiscoveryClient#getAndStoreFullRegistry, which fetches the complete registry from the Server and replaces DiscoveryClient#localRegionApps with it. Client requests to the Server go through the EurekaHttpClient interface. Its implementation class EurekaHttpClientDecorator delegates each request, through the RequestExecutor interface, to another EurekaHttpClient implementation, and provides an execute method that subclasses implement to add extra processing; this processing applies to every EurekaHttpClient method, much like AOP. The subclass RetryableEurekaHttpClient#execute picks an address configured in eureka.client.service-url.defaultZone, builds a client through TransportClientFactory#newClient (here the RestTemplateTransportClientFactory), and that client sends the actual request.
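The delegation pattern is easier to see in a cut-down form. All names below are hypothetical: RegistryHttpClient, RegistryClientDecorator, and RetryingRegistryClient only mimic the roles of EurekaHttpClient, EurekaHttpClientDecorator, and RetryableEurekaHttpClient, and the real interfaces have many more methods and different signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical one-method stand-in for EurekaHttpClient.
interface RegistryHttpClient {
    String getApplications();
}

// Mimics EurekaHttpClientDecorator: every client method funnels through execute(),
// so a subclass can wrap all calls in one place, much like an AOP advice.
abstract class RegistryClientDecorator implements RegistryHttpClient {
    interface RequestExecutor<R> {
        R execute(RegistryHttpClient delegate);
    }

    protected abstract <R> R execute(RequestExecutor<R> requestExecutor);

    @Override
    public String getApplications() {
        return execute(delegate -> delegate.getApplications());
    }
}

// Mimics RetryableEurekaHttpClient: tries one delegate per server until one succeeds.
class RetryingRegistryClient extends RegistryClientDecorator {
    private final List<RegistryHttpClient> candidates;
    final List<Integer> attempts = new ArrayList<>();

    RetryingRegistryClient(List<RegistryHttpClient> candidates) {
        this.candidates = candidates;
    }

    @Override
    protected <R> R execute(RequestExecutor<R> requestExecutor) {
        for (int i = 0; i < candidates.size(); i++) {
            attempts.add(i);
            try {
                return requestExecutor.execute(candidates.get(i));
            } catch (RuntimeException e) {
                // this server failed; try the next candidate
            }
        }
        throw new IllegalStateException("Retry limit reached");
    }

    public static void main(String[] args) {
        RegistryHttpClient down = () -> { throw new RuntimeException("down"); };
        RegistryHttpClient up = () -> "apps";
        RetryingRegistryClient client = new RetryingRegistryClient(List.of(down, up));
        System.out.println(client.getApplications() + " after attempts " + client.attempts);
        // apps after attempts [0, 1]
    }
}
```

Because every public method routes through execute, the retry policy is written once instead of once per endpoint, which is the AOP-like effect described above.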

(2) Incremental synchronization is implemented by DiscoveryClient#getAndUpdateDelta, which calls the Server interface apps/delta via HTTP GET to obtain the latest ADDED, MODIFIED, and DELETED operations and applies them to the local cache. If incremental synchronization fails, the client falls back to a full synchronization.
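A simplified sketch of merging ADDED/MODIFIED/DELETED operations into a local cache. The Change class and the map layout (appName to instanceId to status) are assumptions for illustration; the real client works on Applications/InstanceInfo objects and, after applying a delta, also compares a registry hash code with the Server's to decide whether a full fetch is needed.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of merging an apps/delta response into a local cache.
class DeltaCache {
    enum ActionType { ADDED, MODIFIED, DELETED }

    static class Change {
        final ActionType action;
        final String appName;
        final String instanceId;
        final String status; // ignored for DELETED

        Change(ActionType action, String appName, String instanceId, String status) {
            this.action = action;
            this.appName = appName;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    // appName -> (instanceId -> status)
    final Map<String, Map<String, String>> apps = new ConcurrentHashMap<>();

    void applyDelta(List<Change> delta) {
        for (Change change : delta) {
            switch (change.action) {
                case ADDED:
                case MODIFIED:
                    // both simply put the latest copy of the instance into the cache
                    apps.computeIfAbsent(change.appName, k -> new ConcurrentHashMap<>())
                            .put(change.instanceId, change.status);
                    break;
                case DELETED:
                    Map<String, String> instances = apps.get(change.appName);
                    if (instances != null) {
                        instances.remove(change.instanceId);
                    }
                    break;
            }
        }
    }

    public static void main(String[] args) {
        DeltaCache cache = new DeltaCache();
        cache.applyDelta(List.of(
                new Change(ActionType.ADDED, "ORDER", "10.0.0.1:8080", "UP"),
                new Change(ActionType.ADDED, "ORDER", "10.0.0.2:8080", "UP")));
        cache.applyDelta(List.of(
                new Change(ActionType.MODIFIED, "ORDER", "10.0.0.1:8080", "DOWN"),
                new Change(ActionType.DELETED, "ORDER", "10.0.0.2:8080", null)));
        System.out.println(cache.apps); // {ORDER={10.0.0.1:8080=DOWN}}
    }
}
```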

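Configuration: eureka.client.fetch-registry: whether to fetch the registry periodically, default true; eureka.client.registry-fetch-interval-seconds: the synchronization interval in seconds, default 30.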

Heartbeat

The call chain is HeartbeatThread -> DiscoveryClient#renew -> EurekaHttpClient#sendHeartBeat, which calls the Server interface apps/{appName}/{instanceId} via HTTP PUT. appName is the service's spring.application.name, and instanceId is built from the service's IP address and port.

Note: if the Server returns the NOT_FOUND status, the client registers again.
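The renew-or-re-register rule can be sketched as follows. The Transport interface and its status-code return values are hypothetical stand-ins for EurekaHttpClient#sendHeartBeat and #register, and treating 204 (No Content) as a successful registration is an assumption of this sketch; only the paths (PUT apps/{appName}/{instanceId}, POST apps/{appName}) come from the text above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the renew-or-re-register rule in DiscoveryClient#renew.
class HeartbeatSketch {
    // Hypothetical transport: each call returns an HTTP status code.
    interface Transport {
        int put(String path);
        int post(String path);
    }

    static final int OK = 200;
    static final int NO_CONTENT = 204;
    static final int NOT_FOUND = 404;

    private final Transport transport;
    private final String appName;
    private final String instanceId;

    HeartbeatSketch(Transport transport, String appName, String instanceId) {
        this.transport = transport;
        this.appName = appName;
        this.instanceId = instanceId;
    }

    boolean renew() {
        int status = transport.put("apps/" + appName + "/" + instanceId); // heartbeat
        if (status == NOT_FOUND) {
            // The server holds no lease for this instance, so register it again.
            return transport.post("apps/" + appName) == NO_CONTENT;
        }
        return status == OK;
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        Transport fake = new Transport() {
            public int put(String path) { calls.add("PUT " + path); return NOT_FOUND; }
            public int post(String path) { calls.add("POST " + path); return NO_CONTENT; }
        };
        boolean ok = new HeartbeatSketch(fake, "ORDER-SERVICE", "10.0.0.1:8080").renew();
        System.out.println(ok + " " + calls);
        // true [PUT apps/ORDER-SERVICE/10.0.0.1:8080, POST apps/ORDER-SERVICE]
    }
}
```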

Configuration: eureka.client.register-with-eureka: whether the current application registers with the Eureka cluster, default true; eureka.instance.lease-renewal-interval-in-seconds: the heartbeat interval in seconds, default 30.

Registration

The call chain DiscoveryClient constructor -> DiscoveryClient#register calls the Server interface apps/{appName} via HTTP POST to send the current application's registration information to the Server. Configuration: eureka.client.register-with-eureka: whether the current application registers with the Eureka cluster, default true; eureka.client.should-enforce-registration-at-init: whether to register during initialization, default false.

InstanceInfoReplicator

The InstanceInfoReplicator task monitors whether the application's IP address or configuration has changed and, if so, registers the application again. Configuration: eureka.client.initial-instance-info-replication-interval-seconds: the delay, in seconds, before the first check runs, default 40.
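A hypothetical sketch of the change detection: a version counter is bumped whenever local instance data changes, and the periodic task re-registers only when the counter has moved since the last registration. The real InstanceInfoReplicator relies on a dirty flag on InstanceInfo and also rate-limits on-demand updates; ReplicatorSketch and its methods are illustrative only.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: re-register only when local instance info has changed.
class ReplicatorSketch {
    private volatile String ipAddr;
    // Bumped on every local change; starts at 1 so the first run registers the instance.
    private final AtomicLong changeVersion = new AtomicLong(1);
    private long registeredVersion = 0; // only touched by the single scheduler thread
    final AtomicInteger registrations = new AtomicInteger();

    ReplicatorSketch(String ipAddr) {
        this.ipAddr = ipAddr;
    }

    // Called by whatever detects local changes, e.g. a new IP address.
    void updateIp(String newIp) {
        if (!Objects.equals(ipAddr, newIp)) {
            ipAddr = newIp;
            changeVersion.incrementAndGet();
        }
    }

    // Body of the periodic task.
    void run() {
        long current = changeVersion.get();
        if (current != registeredVersion) {
            registrations.incrementAndGet(); // stands in for DiscoveryClient#register
            registeredVersion = current;
        }
    }

    public static void main(String[] args) {
        ReplicatorSketch replicator = new ReplicatorSketch("10.0.0.1");
        replicator.run();                 // first run: initial registration
        replicator.run();                 // nothing changed: no call
        replicator.updateIp("10.0.0.2");
        replicator.run();                 // IP changed: re-register
        System.out.println(replicator.registrations.get()); // 2
    }
}
```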

Going offline

EurekaClientAutoConfiguration configures the destroy method of CloudEurekaClient:

```java
@Bean(destroyMethod = "shutdown")
```

The DiscoveryClient#shutdown method handles going offline: it cancels the scheduled tasks, calls the unregister method (which calls the Server interface apps/{appName}/{id} via HTTP DELETE), and cancels the monitoring tasks.

Eureka Server

@EnableEurekaServer imports EurekaServerMarkerConfiguration, which registers an EurekaServerMarkerConfiguration.Marker bean. EurekaServerAutoConfiguration only takes effect when this Marker exists in the Spring context, and it then builds the Server-side components.

The Eureka Server also uses DiscoveryClient to pull service registration information from other Server nodes or register itself with the Eureka cluster.

Synchronization at startup

When a Server is started, it obtains service registration information from neighboring Server nodes and synchronizes the information to its own memory.

The Server's service registration information is stored in the AbstractInstanceRegistry#registry variable, of type ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>. The outer Map key is appName and the inner Map key is instanceId. A Lease represents the contract maintained between the Client and the Server, and InstanceInfo stores the actual service registration information, such as instanceId, appName, ipAddr, and port.
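The nested-map layout can be modelled in a few lines. This is a hypothetical simplification: RegistryModel and its nested InstanceInfo and Lease are not Eureka's classes, and time is passed in explicitly so the expiry logic is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of AbstractInstanceRegistry#registry.
class RegistryModel {
    static class InstanceInfo {
        final String appName;
        final String instanceId;
        final String ipAddr;
        final int port;

        InstanceInfo(String appName, String instanceId, String ipAddr, int port) {
            this.appName = appName;
            this.instanceId = instanceId;
            this.ipAddr = ipAddr;
            this.port = port;
        }
    }

    // Plays the role of Lease<T>: the holder plus renewal bookkeeping.
    static class Lease<T> {
        final T holder;
        final long durationMs;
        volatile long lastUpdateMs;

        Lease(T holder, long durationMs, long nowMs) {
            this.holder = holder;
            this.durationMs = durationMs;
            this.lastUpdateMs = nowMs;
        }

        void renew(long nowMs) {
            lastUpdateMs = nowMs;
        }

        boolean isExpired(long nowMs) {
            return nowMs > lastUpdateMs + durationMs;
        }
    }

    // Outer key: appName; inner key: instanceId.
    final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<>();

    void register(InstanceInfo info, long durationMs, long nowMs) {
        registry.computeIfAbsent(info.appName, k -> new ConcurrentHashMap<>())
                .put(info.instanceId, new Lease<>(info, durationMs, nowMs));
    }

    // Returns false when no lease exists (the server would answer NOT_FOUND).
    boolean renew(String appName, String instanceId, long nowMs) {
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> lease = (gMap == null) ? null : gMap.get(instanceId);
        if (lease == null) {
            return false;
        }
        lease.renew(nowMs);
        return true;
    }

    public static void main(String[] args) {
        RegistryModel model = new RegistryModel();
        model.register(new InstanceInfo("ORDER", "10.0.0.1:8080", "10.0.0.1", 8080), 90_000, 0);
        System.out.println(model.renew("ORDER", "10.0.0.1:8080", 30_000)); // true
        System.out.println(model.renew("ORDER", "10.0.0.9:8080", 30_000)); // false
    }
}
```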

EurekaServerBootstrap is the Server-side bootstrap class. EurekaServerInitializerConfiguration implements the Lifecycle interface, and its start method calls eurekaServerBootstrap.contextInitialized to finish initializing the Server. The call chain is eurekaServerBootstrap.contextInitialized -> EurekaServerBootstrap#initEurekaServerContext -> PeerAwareInstanceRegistryImpl#syncUp -> AbstractInstanceRegistry#register. PeerAwareInstanceRegistryImpl#syncUp calls DiscoveryClient#getApplications to get the registration information of all services from the neighboring Server node, then calls AbstractInstanceRegistry#register to store it in the AbstractInstanceRegistry#registry variable.

AbstractInstanceRegistry#register

```java
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        // ...
        // #1
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            // ...
            // #2
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                registrant = existingLease.getHolder();
            }
        } else {
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    // #3
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // #4
        gMap.put(registrant.getId(), lease);
        // ...
        registrant.setActionType(ActionType.ADDED);
        // #5
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
```

#1, #2: if the service already has a Lease and the existing lastDirtyTimestamp is larger, the existing registration information is kept. #3: updates numberOfRenewsPerMinThreshold, which is used by self-preservation mode. #4: builds a new Lease and adds it to the AbstractInstanceRegistry#registry cache. #5: adds the change to recentlyChangedQueue, from which the apps/delta interface obtains the latest change operations.
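The timestamp rule in #2 and the carry-over in #4 can be isolated into a small function. This is a hypothetical reduction of AbstractInstanceRegistry#register: locking, counters, the delta queue, and cache invalidation are left out, and using the current time as serviceUpTimestamp for a first registration is a simplification of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical reduction of AbstractInstanceRegistry#register: only #2 and #4 are kept.
class RegisterConflictSketch {
    static class Info {
        final String id;
        final long lastDirtyTimestamp;
        final String status;

        Info(String id, long lastDirtyTimestamp, String status) {
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
            this.status = status;
        }
    }

    static class Lease {
        final Info holder;
        long serviceUpTimestamp;

        Lease(Info holder) {
            this.holder = holder;
        }
    }

    final Map<String, Lease> gMap = new ConcurrentHashMap<>(); // instanceId -> lease

    Lease register(Info registrant, long nowMs) {
        Lease existing = gMap.get(registrant.id);
        if (existing != null && existing.holder.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            // #2: the copy already in the registry is newer, so keep it
            registrant = existing.holder;
        }
        Lease lease = new Lease(registrant);
        // #4: carry over the original "service up" time across re-registrations
        lease.serviceUpTimestamp = (existing != null) ? existing.serviceUpTimestamp : nowMs;
        gMap.put(registrant.id, lease);
        return lease;
    }

    public static void main(String[] args) {
        RegisterConflictSketch registry = new RegisterConflictSketch();
        registry.register(new Info("10.0.0.1:8080", 200, "UP"), 1_000);
        // A stale copy (older lastDirtyTimestamp) arrives, e.g. via a delayed replication.
        Lease lease = registry.register(new Info("10.0.0.1:8080", 100, "DOWN"), 2_000);
        System.out.println(lease.holder.status + " " + lease.serviceUpTimestamp); // UP 1000
    }
}
```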

Serving requests

The Server exposes its HTTP interfaces through ApplicationsResource, ApplicationResource, and InstanceResource.

AbstractInstanceRegistry implements the business logic for operations such as cancel, register, renew, statusUpdate, and deleteStatusOverride. PeerAwareInstanceRegistryImpl#replicateToPeers replicates these operations to the other nodes to keep the cluster's data in sync. Its last parameter, isReplication, decides whether replication is needed: when a Server node receives an operation replicated from another Server node, it must not replicate it any further, otherwise the nodes would keep updating each other in a loop. The parameter comes from the HTTP request header x-netflix-discovery-replication, which is true only for replication requests sent by other Server nodes.
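The loop-prevention rule can be shown with a toy in-memory cluster. ReplicationNode is hypothetical: in Eureka the flag travels as the x-netflix-discovery-replication header and replication goes through PeerEurekaNode, but the decision "replicate only if this request did not itself come from a peer" is the one sketched here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy cluster showing why the isReplication flag prevents endless re-replication.
class ReplicationNode {
    final String name;
    final Set<String> registry = new HashSet<>();
    final List<ReplicationNode> peers = new ArrayList<>();
    int receivedCalls = 0;

    ReplicationNode(String name) {
        this.name = name;
    }

    // isReplication plays the role of the x-netflix-discovery-replication header.
    void register(String instanceId, boolean isReplication) {
        receivedCalls++;
        registry.add(instanceId);
        replicateToPeers(instanceId, isReplication);
    }

    private void replicateToPeers(String instanceId, boolean isReplication) {
        if (isReplication) {
            return; // came from a peer, which already fans out to every node: stop here
        }
        for (ReplicationNode peer : peers) {
            peer.register(instanceId, true); // forwarded copies are marked as replication
        }
    }

    public static void main(String[] args) {
        ReplicationNode a = new ReplicationNode("server1");
        ReplicationNode b = new ReplicationNode("server2");
        ReplicationNode c = new ReplicationNode("server3");
        a.peers.addAll(List.of(b, c));
        b.peers.addAll(List.of(a, c));
        c.peers.addAll(List.of(a, b));
        a.register("order-1", false); // a client registers with server1
        for (ReplicationNode n : List.of(a, b, c)) {
            // each node prints [order-1] calls=1
            System.out.println(n.name + " " + n.registry + " calls=" + n.receivedCalls);
        }
    }
}
```

Without the isReplication check, server2 would forward the registration back to server1, server1 to server2 again, and so on until the stack overflows.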

Data consistency

PeerAwareInstanceRegistryImpl#replicateToPeers gets the addresses of the other Server nodes through PeerEurekaNodes#getPeerEurekaNodes; the PeerEurekaNodes#peerEurekaNodes variable maintains the information of all Server nodes.

The peersUpdateTask task of PeerEurekaNodes periodically obtains the latest list of Server node addresses from DNS or the configuration file and updates PeerEurekaNodes#peerEurekaNodes. Configuration: eureka.server.peer-eureka-nodes-update-interval-ms: the interval, in milliseconds, for refreshing the Server node list; the default is 10 minutes.

PeerEurekaNode manages one Server node and replicates register, cancel, heartbeat, and other operations to it. It does this asynchronously through task queues, maintaining two TaskDispatchers: batchingDispatcher and nonBatchingDispatcher. The PeerEurekaNode constructor calls TaskDispatchers#createBatchingTaskDispatcher to build the TaskDispatcher:

```java
public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                         int maxBufferSize,
                                                                         int workloadSize,
                                                                         int workerCount,
                                                                         long maxBatchingDelay,
                                                                         long congestionRetryDelayMs,
                                                                         long networkFailureRetryMs,
                                                                         TaskProcessor<T> taskProcessor) {
    final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
            id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
    );
    final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
    return new TaskDispatcher<ID, T>() {
        public void process(ID id, T task, long expiryTime) {
            acceptorExecutor.process(id, task, expiryTime);
        }

        public void shutdown() {
            acceptorExecutor.shutdown();
            taskExecutor.shutdown();
        }
    };
}
```

The TaskDispatcher dispatches tasks: expired tasks are discarded, and if two tasks have the same ID, the earlier one is dropped. AcceptorExecutor merges tasks and groups them into batches, TaskExecutors hand the batched tasks to a TaskProcessor, and ReplicationTaskProcessor actually processes them and can re-execute failed tasks. ReplicationTaskProcessor#process(List<ReplicationTask> tasks) handles a batch by combining the tasks into a single request that is sent to the downstream Server interface peerreplication/batch/. The task class is ReplicationTask, which provides a handleFailure method that is invoked when the downstream Server returns a statusCode outside the [200, 300) range.

If the downstream Server returns 503 or an I/O exception occurs, the task is re-executed through TaskDispatcher#reprocess to guarantee eventual consistency. For other exceptions, the error is only logged and the task is not retried.

Configuration: eureka.server.max-elements-in-peer-replication-pool: the maximum number of pending replication tasks, default 10000.
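The acceptor behaviour described above (drop expired tasks, let a newer task replace an older one with the same ID, cap the batch size, requeue on transient failure) can be approximated single-threaded. BatchingAcceptorSketch is hypothetical: the real AcceptorExecutor and TaskExecutors run on their own threads and apply the congestion and network-failure retry delays, and treating maxBufferSize as the counterpart of max-elements-in-peer-replication-pool and discarding the oldest task on overflow are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded simplification of AcceptorExecutor's merge-and-batch logic.
class BatchingAcceptorSketch {
    static class Task {
        final String id;
        final String payload;
        final long expiryTime;

        Task(String id, String payload, long expiryTime) {
            this.id = id;
            this.payload = payload;
            this.expiryTime = expiryTime;
        }
    }

    // Keeps arrival order; putting an existing id replaces the older task in place.
    private final Map<String, Task> pending = new LinkedHashMap<>();
    private final int maxBufferSize; // cf. eureka.server.max-elements-in-peer-replication-pool
    private final int workloadSize;  // maximum tasks per batch

    BatchingAcceptorSketch(int maxBufferSize, int workloadSize) {
        this.maxBufferSize = maxBufferSize;
        this.workloadSize = workloadSize;
    }

    void process(Task task) {
        if (!pending.containsKey(task.id) && pending.size() >= maxBufferSize) {
            // Buffer full: this sketch discards the oldest pending task.
            Iterator<String> oldest = pending.keySet().iterator();
            oldest.next();
            oldest.remove();
        }
        pending.put(task.id, task);
    }

    // Drains up to workloadSize non-expired tasks into one batch (one request downstream).
    List<Task> nextBatch(long nowMs) {
        List<Task> batch = new ArrayList<>();
        Iterator<Task> it = pending.values().iterator();
        while (it.hasNext() && batch.size() < workloadSize) {
            Task task = it.next();
            it.remove();
            if (task.expiryTime > nowMs) {
                batch.add(task); // expired tasks are dropped silently
            }
        }
        return batch;
    }

    // On 503 or an I/O error the batch is queued again; a newer task with the same id wins.
    void reprocess(List<Task> batch) {
        for (Task task : batch) {
            pending.putIfAbsent(task.id, task);
        }
    }

    public static void main(String[] args) {
        BatchingAcceptorSketch acceptor = new BatchingAcceptorSketch(3, 2);
        acceptor.process(new Task("heartbeat#i1", "v1", 100));
        acceptor.process(new Task("heartbeat#i2", "v1", 5));   // expires at t=5
        acceptor.process(new Task("heartbeat#i1", "v2", 100)); // replaces v1
        acceptor.process(new Task("heartbeat#i3", "v1", 100));
        for (Task t : acceptor.nextBatch(10)) {
            System.out.println(t.id + " " + t.payload); // heartbeat#i1 v2, then heartbeat#i3 v1
        }
    }
}
```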

Note the PeerEurekaNode#heartbeat method, whose replication task implements handleFailure:

```java
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
    super.handleFailure(statusCode, responseEntity);
    if (statusCode == 404) {
        logger.warn("{}: missing entry.", getTaskName());
        if (info != null) {
            logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                    getTaskName(), info.getId(), info.getStatus());
            register(info);
        }
    }
    // ...
}
```

If the downstream Server node cannot find the service's registration information, it returns 404, and the sending node must register the service with it again. This is key to keeping data consistent across Server nodes.

Suppose a client is registered with the Eureka cluster server1, server2, server3. Let's examine two scenarios. Scenario 1: during client startup, server1 receives the client's registration but goes down before replicating it to server2. What happens then? The client keeps sending heartbeats periodically. Its heartbeat to server1 fails, so it moves on to server2, which returns 404 (NOT_FOUND), and the client registers again.

Scenario 2: a network partition separates server3 from server1 and server2, and the client registers with the Eureka cluster while the partition lasts. After the network recovers, how does server3 get the data? When server1 replicates the client's heartbeat to server3, server3 returns 404, so server1 re-registers the client with server3, and the data becomes consistent.

Proactive expiration

The AbstractInstanceRegistry#deltaRetentionTimer task periodically removes expired incremental operations from recentlyChangedQueue. Configuration: eureka.server.delta-retention-timer-interval-in-ms: the interval for clearing expired incremental operations, default 30 seconds; eureka.server.retention-time-in-m-s-in-delta-queue: how long an incremental operation is retained, default 3 minutes.

The AbstractInstanceRegistry#evictionTimer task periodically removes expired service registrations from AbstractInstanceRegistry#registry. A compensation time is added when deciding whether a lease has expired: the task computes the time elapsed since its previous run, and if that exceeds the eureka.server.eviction-interval-timer-in-ms value, the excess is added as compensation. The number of services removed per run is capped at registrySize - (int) (registrySize * renewal-percent-threshold), that is, 15% of the registered services with the default threshold, and Eureka picks the expired services to remove at random. Configuration: eureka.server.eviction-interval-timer-in-ms: the interval for cleaning up expired services, default 60 seconds; eureka.instance.lease-expiration-duration-in-seconds: the lease expiration time, default 90; eureka.server.renewal-percent-threshold: default 0.85.
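A sketch of the three eviction rules (compensation time, a per-run cap, random selection), with times passed in explicitly. EvictionSketch is hypothetical; the cap registrySize - (int) (registrySize * renewalPercentThreshold) follows the reading of AbstractInstanceRegistry#evict given above, and the real method additionally returns immediately when self-preservation has disabled lease expiration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the eviction rules described above (times passed in explicitly).
class EvictionSketch {
    static class Lease {
        final String id;
        final long lastRenewalMs;
        final long durationMs; // cf. eureka.instance.lease-expiration-duration-in-seconds

        Lease(String id, long lastRenewalMs, long durationMs) {
            this.id = id;
            this.lastRenewalMs = lastRenewalMs;
            this.durationMs = durationMs;
        }

        boolean isExpired(long nowMs, long additionalLeaseMs) {
            return nowMs > lastRenewalMs + durationMs + additionalLeaseMs;
        }
    }

    // Extra grace time when this run started later than evictionIntervalMs after the previous one.
    static long compensationMs(long lastRunMs, long nowMs, long evictionIntervalMs) {
        if (lastRunMs == 0) {
            return 0; // first run: nothing to compensate
        }
        return Math.max(0, (nowMs - lastRunMs) - evictionIntervalMs);
    }

    static List<Lease> selectForEviction(List<Lease> all, long nowMs, long additionalLeaseMs,
                                         double renewalPercentThreshold, Random random) {
        List<Lease> expired = new ArrayList<>();
        for (Lease lease : all) {
            if (lease.isExpired(nowMs, additionalLeaseMs)) {
                expired.add(lease);
            }
        }
        int registrySize = all.size();
        int evictionLimit = registrySize - (int) (registrySize * renewalPercentThreshold);
        int toEvict = Math.min(expired.size(), evictionLimit);
        // Partial Fisher-Yates shuffle: move toEvict randomly chosen expired leases to the front.
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(expired.size() - i);
            Collections.swap(expired, i, next);
        }
        return new ArrayList<>(expired.subList(0, toEvict));
    }

    public static void main(String[] args) {
        List<Lease> all = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            // half of the instances stopped renewing at t=0, the rest renewed at t=150 s
            all.add(new Lease("instance-" + i, i < 10 ? 0 : 150_000, 90_000));
        }
        long extra = compensationMs(1_000, 71_000, 60_000); // run was 10 s late
        List<Lease> evicted = selectForEviction(all, 200_000, extra, 0.85, new Random(42));
        System.out.println("compensation=" + extra + "ms, evicted=" + evicted.size());
        // compensation=10000ms, evicted=3
    }
}
```

With 20 registered instances and the default 0.85, only 3 of the 10 expired instances are removed in this run; the rest are left for later runs, which limits the damage if the expirations were caused by a network problem rather than dead instances.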

Self-protection mechanism

PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask periodically updates numberOfRenewsPerMinThreshold, which is used to decide whether to enter self-preservation mode. In self-preservation mode, the AbstractInstanceRegistry#evictionTimer task returns immediately without removing expired services.

numberOfRenewsPerMinThreshold is calculated in PeerAwareInstanceRegistryImpl#updateRenewsPerMinThreshold:

```java
protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}
```

In this formula, expectedNumberOfClientsSendingRenews is the total number of registered services; 60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds() is the number of heartbeats one client sends per minute, since expected-client-renewal-interval-seconds is the client's heartbeat interval; and renewalPercentThreshold is the self-preservation threshold factor. So numberOfRenewsPerMinThreshold is the minimum number of heartbeats the Server expects to receive per minute, and when the actual number falls below it, the Server enters self-preservation mode. Eureka then assumes there is a network problem between the clients and the registry (such as a network failure or clients frequently starting and stopping) and removes no services; it leaves self-preservation mode only after the problem is resolved. In this way, services remain callable to the greatest extent possible.

PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled() determines whether the Server is currently in self-preservation mode. It checks whether renewsLastMin is greater than numberOfRenewsPerMinThreshold, where AbstractInstanceRegistry#renewsLastMin counts the heartbeats received in the last minute. Configuration: eureka.server.enable-self-preservation: whether to enable the self-preservation mechanism, default true; eureka.server.expected-client-renewal-interval-seconds: the interval, in seconds, at which Clients are expected to send heartbeats, default 30; eureka.server.renewal-percent-threshold: the self-preservation threshold factor, default 0.85.
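A worked example of the threshold arithmetic, using the formula from updateRenewsPerMinThreshold and the comparison described for isLeaseExpirationEnabled. With 10 registered clients, a 30-second heartbeat interval, and the 0.85 factor, the threshold is 10 * (60 / 30) * 0.85 = 17 heartbeats per minute. The SelfPreservationSketch class is hypothetical and only restates that logic as plain functions.

```java
// Hypothetical restatement of the self-preservation threshold logic as plain functions.
class SelfPreservationSketch {
    // Same formula as updateRenewsPerMinThreshold shown above.
    static int renewsPerMinThreshold(int expectedClients, int expectedClientRenewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        return (int) (expectedClients
                * (60.0 / expectedClientRenewalIntervalSeconds)
                * renewalPercentThreshold);
    }

    // Eviction is allowed only if self-preservation is off or enough heartbeats arrived.
    static boolean leaseExpirationEnabled(boolean selfPreservationEnabled, int threshold, long renewsLastMin) {
        if (!selfPreservationEnabled) {
            return true;
        }
        return threshold > 0 && renewsLastMin > threshold;
    }

    public static void main(String[] args) {
        int threshold = renewsPerMinThreshold(10, 30, 0.85);
        System.out.println(threshold);                                   // 17
        System.out.println(leaseExpirationEnabled(true, threshold, 18)); // true: evict normally
        System.out.println(leaseExpirationEnabled(true, threshold, 12)); // false: self-preservation
    }
}
```

In the 12-heartbeat case, fewer than 85% of the expected heartbeats arrived, so the eviction task skips its run and expired instances stay in the registry.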

Status updates

InstanceInfo maintains two state variables: status and overriddenStatus. status is the state published by the Eureka Client, while overriddenStatus is a state set manually or through a tool. The Server provides the interface apps/{appName}/{instanceId}/status, which changes an instance's status and overriddenStatus so that the service state can be changed actively. Note that this does not change the state on the Client side, only the state stored in the Server's service registry. When handling a Client's registration or heartbeat, the Server overrides status with overriddenStatus. When the Eureka Client fetches registration information, it calls DiscoveryClient#shuffleInstances to filter out instances whose status is not InstanceStatus.UP, so an instance can be taken out of service without shutting it down.

InstanceInfo also maintains the lastDirtyTimestamp variable, which records when the service registration information was last updated. In InstanceResource, statusUpdate and deleteStatusUpdate can take a lastDirtyTimestamp, and renewLease, which handles heartbeats, takes a lastDirtyTimestamp parameter; the validateDirtyTimestamp method checks it as follows:

  1. If the lastDirtyTimestamp in the request equals the lastDirtyTimestamp in the current registration information, the request succeeds.
  2. If it is greater than the one in the current registration information, the NOT_FOUND status is returned, meaning the registration information held for the Client is out of date and the Client needs to register again.
  3. If it is smaller than the one in the current registration information, the CONFLICT (409) status is returned to signal a data conflict, together with this node's registration information for the service.

If the heartbeat was sent by the Client, the Client ignores a 409 response (DiscoveryClient#renew). If the heartbeat was replicated from another Server node, the sending Server node uses the returned service registration information to update its own copy (PeerEurekaNode#heartbeat).
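The three rules, and how a replicating Server node reacts to each outcome, can be condensed into a small hypothetical sketch. DirtyTimestampSketch follows the rules as listed above; it is not InstanceResource#validateDirtyTimestamp itself, and the action strings are illustrative only.

```java
// Hypothetical sketch of the three lastDirtyTimestamp rules and how a replicating peer reacts.
class DirtyTimestampSketch {
    static final int OK = 200;
    static final int NOT_FOUND = 404;
    static final int CONFLICT = 409;

    // Receiving side: compare the timestamp sent with the request against the registry's copy.
    static int validate(long requestLastDirty, long registryLastDirty) {
        if (requestLastDirty == registryLastDirty) {
            return OK;
        }
        if (requestLastDirty > registryLastDirty) {
            return NOT_FOUND; // the caller's data is newer: it should register again
        }
        return CONFLICT; // the registry's data is newer: it is returned to the caller
    }

    // Sending side for a replicated heartbeat: what the sending Server node does next.
    static String onReplicatedHeartbeat(int status) {
        switch (status) {
            case OK:
                return "done";
            case NOT_FOUND:
                return "re-register the instance on the peer";
            case CONFLICT:
                return "update local registration from the peer's copy";
            default:
                return "log or retry, depending on the error";
        }
    }

    public static void main(String[] args) {
        int status = validate(100, 200);
        System.out.println(status + " -> " + onReplicatedHeartbeat(status));
        // 409 -> update local registration from the peer's copy
    }
}
```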

Configuration: eureka.client.filter-only-up-instances: whether to keep only instances in the UP state, default true; eureka.server.sync-when-timestamp-differs: whether to synchronize data when the timestamps differ, default true.

That concludes our look at Eureka. As we have seen, Eureka's design and implementation are relatively simple but very practical. I hesitated for a while before reading the Eureka source code (after all, the open-source Eureka 2.0 effort was abandoned), but after a period of in-depth study I learned a lot. I hope this article gives readers interested in Eureka a useful path for studying it further.
