Series of articles:

SpringCloud source code series (1) – Registry Eureka initialization

SpringCloud source code series (2) – Registry Eureka service registration, renewal

SpringCloud source code series (3) – Registry Eureka fetching the registry

SpringCloud source code series (4) – Registry Eureka service offline, failure, self-protection mechanism

Eureka Server cluster

In a real production environment, where there might be dozens or hundreds of microservice instances, Eureka Server takes on a very high load and is typically deployed in clusters to ensure high availability of registries. Here’s a look at Eureka Server clusters.

Set up a Eureka Server cluster

Let’s set up a three-node Eureka-server cluster to see the effect.

1. Cluster configuration

Configure the following mappings in the local hosts file:

127.0.0.1 peer1
127.0.0.1 peer2
127.0.0.1 peer3

Modify the registry's application.yml to add three profiles, one for each of the three Eureka Server nodes.

Each node also acts as a client of the other Eureka Servers in the cluster, so it needs to fetch the registry and configure the addresses of the other Eureka Servers.

spring:
  application:
    name: sunny-register

---
spring:
  profiles: peer1
server:
  port: 8001

eureka:
  instance:
    hostname: peer1
  client:
    # Whether to register yourself with the registry
    register-with-eureka: false
    # Whether to fetch the registry
    fetch-registry: true
    service-url:
      defaultZone: http://peer1:8001/eureka,http://peer2:8002/eureka,http://peer3:8003/eureka


---
spring:
  profiles: peer2
server:
  port: 8002

eureka:
  instance:
    hostname: peer2
  client:
    # Whether to register yourself with the registry
    register-with-eureka: false
    # Whether to fetch the registry
    fetch-registry: true
    service-url:
      defaultZone: http://peer1:8001/eureka,http://peer2:8002/eureka,http://peer3:8003/eureka

---
spring:
  profiles: peer3
server:
  port: 8003

eureka:
  instance:
    hostname: peer3
  client:
    # Whether to register yourself with the registry
    register-with-eureka: false
    # Whether to fetch the registry
    fetch-registry: true
    service-url:
      defaultZone: http://peer1:8001/eureka,http://peer2:8002/eureka,http://peer3:8003/eureka

2. Start the cluster

Start all three registry instances, using spring.profiles.active to activate the corresponding cluster profile for each one.
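For example, assuming the registry application is packaged as sunny-register.jar (the jar name here is only an assumption for illustration), the three nodes could be started like this:

java -jar sunny-register.jar --spring.profiles.active=peer1
java -jar sunny-register.jar --spring.profiles.active=peer2
java -jar sunny-register.jar --spring.profiles.active=peer3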

After startup, visit http://peer1:8001/ to open the peer1 dashboard; you can see the other two replicas, peer2 and peer3, indicating that the cluster has three nodes.

3. Start the client

First the client configuration adds the cluster address:

eureka:
  client:
    serviceUrl:
      defaultZone: http://peer1:8001/eureka,http://peer2:8002/eureka,http://peer3:8003/eureka

Start several client instances, and after a while you will find the instances registered on all three Eureka Servers:

At this point the Eureka Server cluster is set up. You can see that the registry nodes synchronize with each other, and each of them can receive registration, renewal, and offline (cancel) requests; the nodes are peers.

Eureka Server cluster architecture

Generally speaking, data replication in a distributed system can be divided into master-slave replication and peer-to-peer replication.

1. Master/slave replication

Master-slave replication means that one replica is the master and all other replicas are slaves. All writes are committed to the master replica and then synchronized from the master to the slaves.

In master-slave mode, the master replica is the write bottleneck of the whole system, while the slave replicas help the master share the read load.

2. Peer replication

Peer-to-peer replication has no notion of primary and secondary replicas: any replica can accept writes, and the replicas synchronize updates with one another.

Because every replica can accept write requests, there is no single write bottleneck. However, precisely because every replica is writable, data synchronization and conflict handling between replicas become thorny problems.

3. Eureka Server cluster architecture

Eureka Server uses peer-to-peer replication. A client instance registers with one of the servers, and the registration is then synchronized to the other nodes.

Eureka Server starts by fetching the registry

As analyzed previously, when Eureka Server starts up and EurekaBootStrap initializes the DiscoveryClient, DiscoveryClient performs a full fetch of the registry from the other registries into its local cache.

Initialization ends with a call to registry.syncUp() to synchronize the registry, which registers the instances cached by DiscoveryClient into this Eureka Server's own registry.

Note that native Eureka defaults the registry synchronization retry count to 5, while Spring Cloud defaults it to 0, so the following configuration must be added for the startup synchronization to take effect.

eureka:
  server:
    registry-sync-retries: 5

The syncUp logic registers the instances cached locally by DiscoveryClient into this server's registry:
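For reference, syncUp looks roughly like this (abridged from PeerAwareInstanceRegistryImpl; logging and some exception handling are omitted):

@Override
public int syncUp() {
    int count = 0;
    // getRegistrySyncRetries() is the retry count configured above (5)
    for (int i = 0; (i < serverConfig.getRegistrySyncRetries()) && (count == 0); i++) {
        if (i > 0) {
            try {
                // Wait a while before retrying
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                break;
            }
        }
        // Applications that DiscoveryClient fetched from the other servers
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (isRegisterable(instance)) {
                    // Register the cached instance into the local registry; isReplication = true
                    register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                    count++;
                }
            }
        }
    }
    return count;
}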

Cluster Node Synchronization

1. Registration, renewal, and offline

As previously analyzed, client registration, renewal, and offline (cancel) operations are synchronized to the other nodes in the cluster. As you can see below, the replicateToPeers method is called to replicate the operation to the other cluster nodes.

/////////////////////// Register ///////////////////////
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    // If the instance carries no lease configuration, the default of 90 seconds is used
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Register the instance locally
    super.register(info, leaseDuration, isReplication);
    // Replicate to the other server nodes in the cluster
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

/////////////////////// Cancel (offline) ///////////////////////
public boolean cancel(final String appName, final String id, final boolean isReplication) {
    // Cancel the lease locally
    if (super.cancel(appName, id, isReplication)) {
        // Replicate to the other server nodes in the cluster
        replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

/////////////////////// Renew ///////////////////////
public boolean renew(final String appName, final String id, final boolean isReplication) {
    // Renew the lease via the parent class AbstractInstanceRegistry
    if (super.renew(appName, id, isReplication)) {
        // Synchronize the renewal to the other nodes in the cluster
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

2. Synchronize to other nodes

Take a look at the replicateToPeers method:

  • First it checks isReplication: if the request is itself a cluster replication operation, the count of replication operations performed in the last minute, numberOfReplicationsLastMin, is incremented by 1. isReplication is carried in the request header PeerEurekaNode.HEADER_REPLICATION (x-netflix-discovery-replication).
  • It then iterates over the cluster node list and replicates the instance operation to each node. As analyzed before, PeerEurekaNode represents a single Eureka Server, and PeerEurekaNodes represents the Eureka Server cluster.
  • The method that replicates instance operations to the cluster is replicateInstanceActionsToPeers, which invokes the corresponding PeerEurekaNode method based on the operation type.
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            // If it is a replication request from another server node, the number of cluster replications in the last minute +1
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        // If it is a request from a client, synchronize it to the other server nodes in the cluster
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // Synchronize to the other nodes in the cluster
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

private void replicateInstanceActionsToPeers(
        Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry;
        switch (action) {
            case Cancel:
                // Cancel (offline)
                node.cancel(appName, id);
                break;
            case Heartbeat:
                // Renew (heartbeat)
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                // Register
                node.register(info);
                break;
            case StatusUpdate:
                // Update the status
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                // Delete the overridden status
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

3. isReplication

The component PeerEurekaNode uses to communicate with the target Eureka Server is JerseyReplicationClient. This class overrides the addExtraHeaders method to add the request header PeerEurekaNode.HEADER_REPLICATION with the value true.

In this way, when other Eureka-servers receive the replication operation, they know that it is a synchronization operation from the cluster node and do not synchronize it to other nodes, thus avoiding an infinite loop.

@Override
protected void addExtraHeaders(Builder webResource) {
    webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
}

Cluster synchronization mechanism

The synchronization mechanism between Eureka Server cluster nodes is fairly involved. Imagine if Eureka Server synchronized to the other server nodes immediately for every incoming client request (registration, heartbeat, and so on): the peer-to-peer mode would then do nothing to spread the clients' write load, because every Eureka Server would end up handling the same number of requests. To avoid this, Eureka Server uses a three-layer queue and batched tasks for cluster synchronization. In short, client operations are put into a queue, a batch of operations is taken from the queue and sent to the other server nodes in one request, and the receiving node parses the batch and applies it locally. The details follow; a rough, runnable sketch of the batching idea is shown below.
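The sketch below is purely illustrative and uses none of Eureka's actual classes: the main thread drops operations onto an acceptor queue, a packer thread drains them into batches of at most 250, and a sender thread ships each batch in a single call.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BatchingSketch {

    static final BlockingQueue<String> acceptorQueue = new LinkedBlockingQueue<>();        // incoming operations
    static final BlockingQueue<List<String>> batchWorkQueue = new LinkedBlockingQueue<>(); // packed batches
    static final int MAX_BATCH = 250;

    public static void main(String[] args) throws InterruptedException {
        // Packer: drains single operations into batches of at most MAX_BATCH
        Thread packer = new Thread(() -> {
            try {
                while (true) {
                    List<String> batch = new ArrayList<>();
                    batch.add(acceptorQueue.take());             // wait for the first operation
                    acceptorQueue.drainTo(batch, MAX_BATCH - 1); // then grab up to 249 more
                    batchWorkQueue.add(batch);
                }
            } catch (InterruptedException ignored) { }
        });
        // Sender: ships each batch in one call (in Eureka this is one POST peerreplication/batch/ request)
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    List<String> batch = batchWorkQueue.take();
                    System.out.println("sending batch of " + batch.size() + " operations");
                }
            } catch (InterruptedException ignored) { }
        });
        packer.setDaemon(true);
        sender.setDaemon(true);
        packer.start();
        sender.start();

        // Simulate 1000 client operations arriving
        for (int i = 0; i < 1000; i++) {
            acceptorQueue.add("register-instance-" + i);
        }
        TimeUnit.SECONDS.sleep(1);
    }
}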

Cluster node PeerEurekaNode

During the initialization of Eureka-Server, EurekaBootStrap initializes PeerEurekaNodes representing the cluster, which constructs PeerEurekaNode according to the configured registry address. The PeerEurekaNode is the core component for synchronizing between clusters. Let’s use client registration as an example to see how synchronization works.

1. Register synchronization

replicateInstanceActionsToPeers calls the PeerEurekaNode register method to synchronize the register operation to the cluster node.

The node.register method:

  • The expiration time is calculated first: current time + lease duration (default 90 seconds)
  • Then the batch task dispatcher batchingDispatcher is asked to process the task; an InstanceReplicationTask is submitted whose execute method calls replicationClient to replicate the register operation to the target server
public void register(final InstanceInfo info) throws Exception {
    // Expiration time: current time + lease duration (default 90 seconds)
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    // Replicate to the target server
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}

Looking at the getLeaseRenewalOf method, it is marked as buggy. The method is supposed to return milliseconds, and the whole ternary expression is multiplied by 1000 at the end; in the original code the null branch is Lease.DEFAULT_DURATION_IN_SECS * 1000, so that branch gets multiplied by 1000 twice (90 * 1000 * 1000 ms, roughly 25 hours, instead of 90 seconds). In practice, however, info.getLeaseInfo() should never be null, so the bug has little impact.

private static int getLeaseRenewalOf(InstanceInfo info) {
    // bug: the null branch is Lease.DEFAULT_DURATION_IN_SECS * 1000, and the whole expression is multiplied by 1000 again
    return (info.getLeaseInfo() == null ? Lease.DEFAULT_DURATION_IN_SECS * 1000 : info.getLeaseInfo().getRenewalIntervalInSecs()) * 1000;
}

2. Construction of PeerEurekaNode

The batchingDispatcher is initialized in the constructor of PeerEurekaNode.

  • registry: the local registry
  • targetHost: the host of the target Eureka Server
  • replicationClient: the Jersey-based cluster replication client, with the request header PeerEurekaNode.HEADER_REPLICATION set to true
  • serviceUrl: the address of the target Eureka Server
  • maxProcessingDelayMs: the maximum processing delay in milliseconds, 30000 ms (30 seconds) by default; this matters for offline (cancel) operations
  • batcherName: the name of the batch processor
  • taskProcessor: a ReplicationTaskProcessor wrapping targetHost and replicationClient; it handles the submission of batched tasks
  • batchingDispatcher: the batch task dispatcher; it sends tasks to the target Eureka Server in batches to avoid making one request per task
  • nonBatchingDispatcher: the non-batching task dispatcher, which submits tasks one by one
public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl, HttpReplicationClient replicationClient, EurekaServerConfig config) {
    this(registry, targetHost, serviceUrl, replicationClient, config, BATCH_SIZE, MAX_BATCHING_DELAY_MS, RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS);
}

PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                 HttpReplicationClient replicationClient, EurekaServerConfig config,
                                 int batchSize, long maxBatchingDelayMs,
                                 long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
    this.registry = registry;
    // Cluster node host
    this.targetHost = targetHost;
    this.replicationClient = replicationClient;

    // Cluster node address
    this.serviceUrl = serviceUrl;
    this.config = config;
    // The maximum delay is 30 seconds by default
    this.maxProcessingDelayMs = config.getMaxTimeForReplication();

    // Batch processor name
    String batcherName = getBatcherName();

    // Copy the task handler
    ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
    // Batch task dispatcher
    this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
            batcherName,
            // Maximum capacity of the replication pool. The default value is 10000
            config.getMaxElementsInPeerReplicationPool(),
            batchSize,                    // 250
            // The default maximum number of threads used for synchronization is 20
            config.getMaxThreadsForPeerReplication(),
            maxBatchingDelayMs,           // 500
            serverUnavailableSleepTimeMs, // 1000
            retrySleepTimeMs,             // 100
            taskProcessor
    );
    // A single task dispatcher
    this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
            targetHost,
            config.getMaxElementsInStatusReplicationPool(),
            config.getMaxThreadsForStatusReplication(),
            maxBatchingDelayMs,
            serverUnavailableSleepTimeMs,
            retrySleepTimeMs,
            taskProcessor
    );
}

The batch dispatcher TaskDispatcher

The batchingDispatcher is created by calling the TaskDispatchers.createBatchingTaskDispatcher method.

First look at the createBatchingTaskDispatcher parameters and their default values; the analysis below uses these parameters:

  • id: the name of the batch dispatcher
  • maxBufferSize: the maximum size of the buffer pool, 10000 by default
  • workloadSize: the workload size, i.e. the maximum number of tasks in one batch, 250 by default
  • workerCount: the number of worker threads in the thread pool, 20 by default
  • maxBatchingDelay: the maximum batching delay in milliseconds, 500 ms by default
  • congestionRetryDelayMs: the congestion retry delay in milliseconds, 1000 ms by default
  • networkFailureRetryMs: the network failure retry delay in milliseconds, 100 ms by default
  • taskProcessor: the task processor, ReplicationTaskProcessor

Take a look at this method:

  • First, an acceptor executor AcceptorExecutor is created; its main parameters are buffer- and time-related
  • Then a task executor TaskExecutors is created; its main parameters are the number of worker threads, the task processor, and the acceptor executor. It can be thought of as the executor that ultimately submits the batch tasks
  • Finally, a task dispatcher TaskDispatcher is created. Its process method shows that tasks submitted to the dispatcher are actually handed to the AcceptorExecutor

From this you can see that the task submitted via batchingDispatcher.process() during registration is actually handed to the acceptor executor acceptorExecutor. The TaskDispatcher is composed of AcceptorExecutor and TaskExecutors, and the core dispatching logic lives in these two components.

public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id, int maxBufferSize, int workloadSize,
                                                                         int workerCount, long maxBatchingDelay, long congestionRetryDelayMs,
                                                                         long networkFailureRetryMs, TaskProcessor<T> taskProcessor) {
    // Acceptor executor AcceptorExecutor
    final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
            id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
    );

    // Task executors TaskExecutors, workerCount = 20
    final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);

    return new TaskDispatcher<ID, T>() {
        @Override
        public void process(ID id, T task, long expiryTime) {
            // The task is handed to acceptorExecutor
            acceptorExecutor.process(id, task, expiryTime);
        }

        @Override
        public void shutdown() {
            acceptorExecutor.shutdown();
            taskExecutor.shutdown();
        }
    };
}

The receiver executor AcceptorExecutor

Let’s look at the constructor for creating AcceptorExecutor:

  • Based on congestionRetryDelayMs and networkFailureRetryMs, a traffic shaper TrafficShaper is created, which is mainly used to calculate a compensation delay
  • Then a background thread acceptorThread is created; the task it runs is AcceptorRunner, whose main job is to pack tasks into batches
  • Finally, some monitoring and statistics objects are registered
AcceptorExecutor(String id, int maxBufferSize, int maxBatchingSize, long maxBatchingDelay, long congestionRetryDelayMs, long networkFailureRetryMs) {
    // Batch processor name
    this.id = id;
    // Maximum buffer size: 10000
    this.maxBufferSize = maxBufferSize;
    // Maximum number of tasks per batch: 250
    this.maxBatchingSize = maxBatchingSize;
    // Maximum batching delay: 500 ms
    this.maxBatchingDelay = maxBatchingDelay;
    // Traffic shaper
    // congestionRetryDelayMs: congestion retry delay, 1000 ms
    // networkFailureRetryMs: network failure retry delay, 100 ms
    this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs);

    // The acceptor thread runs in the background
    ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
    this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
    this.acceptorThread.setDaemon(true);
    this.acceptorThread.start();

    // Monitoring / statistics
    final double[] percentiles = {50.0, 95.0, 99.0, 99.5};
    final StatsConfig statsConfig = new StatsConfig.Builder()
            .withSampleSize(1000)
            .withPercentiles(percentiles)
            .withPublishStdDev(true)
            .build();
    final MonitorConfig config = MonitorConfig.builder(METRIC_REPLICATION_PREFIX + "batchSize").build();
    this.batchSizeMetric = new StatsTimer(config, statsConfig);
    try {
        Monitors.registerObject(id, this);
    } catch (Throwable e) {
        logger.warn("Cannot register servo monitor for this object", e);
    }
}

Then look at AcceptorExecutor's fields, which define several queues and containers used for handling batch tasks; we will see how they are used later.

AcceptorExecutor makes extensive use of the following classes and queue features:

  • LinkedBlockingQueue: a linked-list based single-ended blocking queue; elements enter at the tail and leave from the head
  • Deque: a double-ended queue; elements can be added to or removed from both the head and the tail
  • Semaphore: a permit must be acquired (acquire or tryAcquire) before entering the critical section, and is released afterwards; multiple threads may enter as long as permits are available. Note that the number of permits here is 0, which means acquire can only succeed after release has been called to put a permit in (a small standalone demonstration of this handshake follows the field list below).
// Queue to receive tasks
private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
// Queue for retry tasks
private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();
// Background receiver thread
private final Thread acceptorThread;
// The task container to be processed
private final Map<ID, TaskHolder<ID, T>> pendingTasks = new HashMap<>();
// Queue in process
private final Deque<ID> processingOrder = new LinkedList<>();

// The semaphore of a single queue request
private final Semaphore singleItemWorkRequests = new Semaphore(0);
// Single task queue
private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();

// Batch queue request semaphore
private final Semaphore batchWorkRequests = new Semaphore(0);
// Batch task queue
private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();
// time regulator
private final TrafficShaper trafficShaper;
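To make the zero-permit handshake concrete, here is a small standalone demo that is not Eureka code: the packer thread can only do work after a "worker" has ordered it by releasing a permit, which mirrors how batchWorkRequests is used between AcceptorRunner and the task executors.

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreHandshake {

    // Starts with 0 permits: tryAcquire can only succeed after someone calls release()
    static final Semaphore batchWorkRequests = new Semaphore(0);

    public static void main(String[] args) throws InterruptedException {
        Thread packer = new Thread(() -> {
            try {
                while (true) {
                    if (batchWorkRequests.tryAcquire(1)) {
                        System.out.println("permit acquired, packing one batch");
                    } else {
                        TimeUnit.MILLISECONDS.sleep(100); // nobody has asked for work yet
                    }
                }
            } catch (InterruptedException ignored) { }
        });
        packer.setDaemon(true);
        packer.start();

        // A "worker" asks for work by releasing a permit, just like requestWorkItems() does
        batchWorkRequests.release();
        TimeUnit.SECONDS.sleep(1);
    }
}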

TaskDispatcher calls acceptorExecutor.process to forward the task to AcceptorExecutor, which adds the task to the tail of acceptorQueue.

void process(ID id, T task, long expiryTime) {
    acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
    acceptedTasks++;
}

The receiver task AcceptorRunner

The task is added to acceptorQueue, so where is it processed? That is the job of AcceptorRunner. This task is fairly complex, so the full code is shown first and then analyzed.

class AcceptorRunner implements Runnable {
    @Override
    public void run() {
        long scheduleTime = 0;
        while (!isShutdown.get()) {
            try {
                // Move the tasks of the reprocessQueue and acceptorQueue to pendingTasks
                drainInputQueues();

                // The number of tasks waiting to be processed
                int totalItems = processingOrder.size();

                long now = System.currentTimeMillis();
                if (scheduleTime < now) {
                    // transmissionDelay() normally returns 0
                    scheduleTime = now + trafficShaper.transmissionDelay();
                }
                if (scheduleTime <= now) {
                    // Assign batch work: pack tasks from pendingTasks into batchWorkQueue (up to 250 tasks)
                    assignBatchWork();
                    // If pendingTasks still has unexpired tasks left, move them to singleItemWorkQueue
                    assignSingleItemWork();
                }

                // If no worker is requesting data or there is a delay injected by the traffic shaper,
                // sleep for some time to avoid tight loop.
                if (totalItems == processingOrder.size()) {
                    Thread.sleep(10);
                }
            } catch (InterruptedException ex) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery AcceptorThread error", e);
            }
        }
    }

    private boolean isFull() {
        // pendingTasks.size() >= 10000, i.e. pendingTasks can hold at most 10000 tasks
        return pendingTasks.size() >= maxBufferSize;
    }

    private void drainInputQueues() throws InterruptedException {
        do {
            // Drain reprocessQueue: move the tasks from the reprocessQueue into pendingTasks
            drainReprocessQueue();
            // Drain acceptorQueue: move the acceptorQueue tasks into pendingTasks
            drainAcceptorQueue();

            if (isShutdown.get()) {
                break;
            }
            // If all queues are empty, block for a while on the acceptor queue
            if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                // Wait up to 10 milliseconds for a task to be placed in the acceptorQueue
                TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                if (taskHolder != null) {
                    // A task arrived, so pendingTasks will no longer be empty
                    appendTaskHolder(taskHolder);
                }
            }
            // If pendingTasks is empty but acceptorQueue or reprocessQueue is not, keep looping.
            // When all tasks have been handed out, reprocessQueue, acceptorQueue and pendingTasks are all empty,
            // and the loop keeps waiting for tasks to enter the acceptorQueue, 10 milliseconds at a time.
        } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
    }

    private void drainAcceptorQueue() {
        while (!acceptorQueue.isEmpty()) {
            // Move the acceptorQueue task to pendingTasks
            appendTaskHolder(acceptorQueue.poll());
        }
    }

    private void drainReprocessQueue() {
        long now = System.currentTimeMillis();
        while (!reprocessQueue.isEmpty() && !isFull()) {
            // Fetch the task from the tail of the reprocessQueue
            TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
            ID id = taskHolder.getId();
            if (taskHolder.getExpiryTime() <= now) {
                // The task expired
                expiredTasks++;
            } else if (pendingTasks.containsKey(id)) {
                // pendingTasks already contains a task with this ID
                overriddenTasks++;
            } else {
                // Place the reprocessQueue task into pendingTasks
                pendingTasks.put(id, taskHolder);
                // Add to the head of the processingOrder queue; reprocessQueue holds failed retries, so it has a higher priority
                processingOrder.addFirst(id);
            }
        }
        if (isFull()) {
            queueOverflows += reprocessQueue.size();
            // If pendingTasks is full, clear the reprocessQueue
            reprocessQueue.clear();
        }
    }

    private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
        if (isFull()) {
            // When pendingTasks is full, remove an element
            pendingTasks.remove(processingOrder.poll());
            queueOverflows++;
        }
        // Place the task from the acceptorQueue into pendingTasks
        TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
        if (previousTask == null) {
            // Add the task ID to the tail of the processingOrder queue
            processingOrder.add(taskHolder.getId());
        } else {
            // A task with the same ID already exists and is overridden
            overriddenTasks++;
        }
    }

    void assignSingleItemWork() {
        if (!processingOrder.isEmpty()) {
            if (singleItemWorkRequests.tryAcquire(1)) {
                long now = System.currentTimeMillis();
                while (!processingOrder.isEmpty()) {
                    ID id = processingOrder.poll();
                    TaskHolder<ID, T> holder = pendingTasks.remove(id);
                    if (holder.getExpiryTime() > now) {
                        // Move the pendingTasks task to singleItemWorkQueue
                        singleItemWorkQueue.add(holder);
                        return;
                    }
                    expiredTasks++;
                }
                singleItemWorkRequests.release();
            }
        }
    }

    void assignBatchWork() {
        // There are enough tasks for a batch
        if (hasEnoughTasksForNextBatch()) {
            if (batchWorkRequests.tryAcquire(1)) {
                long now = System.currentTimeMillis();
                // A maximum of 250 tasks per batch
                int len = Math.min(maxBatchingSize, processingOrder.size());
                List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
                // Move tasks from pendingTasks into holders;
                // even if there are 500 tasks in the queue, a batch contains at most 250
                while (holders.size() < len && !processingOrder.isEmpty()) {
                    ID id = processingOrder.poll();
                    TaskHolder<ID, T> holder = pendingTasks.remove(id);
                    if (holder.getExpiryTime() > now) {
                        holders.add(holder);
                    } else {
                        expiredTasks++;
                    }
                }
                if (holders.isEmpty()) {
                    batchWorkRequests.release();
                } else {
                    batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                    // Add to the batch queue
                    batchWorkQueue.add(holders);
                }
            }
        }
    }

    // Whether there are enough tasks for the next batch
    private boolean hasEnoughTasksForNextBatch() {
        if (processingOrder.isEmpty()) {
            return false;
        }
        if (pendingTasks.size() >= maxBufferSize) {
            return true;
        }

        // Peek a task ID from the head of processingOrder, then read the task from pendingTasks.
        // Note that peek() only reads the head element; it does not remove it.
        TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
        // Determine whether the time since the task was submitted exceeds the maximum batching delay (500 ms)
        long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
        return delay >= maxBatchingDelay;
    }
}

First look at its run method:

1. The tasks in the queue are moved to the container to be processed

drainInputQueues moves tasks from the input queues (reprocessQueue and acceptorQueue) into the pendingTasks container.

drainReprocessQueue moves tasks from reprocessQueue to pendingTasks:

  • If pendingTasks is already full (10,000 tasks), reprocessQueue is simply cleared. Does discarding those tasks matter?
  • Otherwise, if reprocessQueue is not empty, tasks are fetched one by one from its tail:
    • If a task has expired it is discarded, meaning its lease duration (90 seconds) has passed. Take instance registration: if synchronization fails several times and the task is then discarded, does that mean the other servers will never learn about the registered instance? We will come back to this later.
    • If pendingTasks already contains a task with the same ID, the retry task is discarded
    • Otherwise, the task is added to pendingTasks and its ID is added to the head of processingOrder
    • Note that tasks are taken one by one from the tail of reprocessQueue and put at the head of processingOrder, so in the end they keep the same order in processingOrder as they had in reprocessQueue

drainAcceptorQueue moves the tasks in the acceptor queue to pendingTasks:

  • If acceptorQueue is not empty, tasks are fetched from the head of the queue
  • If pendingTasks is full, the task ID at the head of processingOrder is polled and the corresponding task is removed from pendingTasks
  • Otherwise, the task is added to pendingTasks and, if no task with the same ID already exists, the task ID is appended to the tail of the processingOrder queue
  • Note that tasks are fetched from the head of acceptorQueue and placed at the tail of processingOrder, so they keep the same order in processingOrder as they had in acceptorQueue

processingOrder determines the order in which tasks are processed, with the head processed first, which means reprocessQueue has a higher priority than acceptorQueue. pendingTasks is a key-value container, which makes it easy to look up tasks quickly by ID.

private void drainAcceptorQueue() {
    while (!acceptorQueue.isEmpty()) {
        // Move the acceptorQueue task to pendingTasks
        appendTaskHolder(acceptorQueue.poll());
    }
}

private void drainReprocessQueue() {
    long now = System.currentTimeMillis();
    while (!reprocessQueue.isEmpty() && !isFull()) {
        // Fetch the task from the tail of the reprocessQueue
        TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
        ID id = taskHolder.getId();
        if (taskHolder.getExpiryTime() <= now) {
            // The task expired
            expiredTasks++;
        } else if (pendingTasks.containsKey(id)) {
            // pendingTasks already contains a task with this ID
            overriddenTasks++;
        } else {
            // Place the reprocessQueue task into pendingTasks
            pendingTasks.put(id, taskHolder);
            // Add to the head of the processingOrder queue; reprocessQueue holds failed retries, so it has a higher priority
            processingOrder.addFirst(id);
        }
    }
    if (isFull()) {
        queueOverflows += reprocessQueue.size();
        // If pendingTasks is full, clear the reprocessQueue
        reprocessQueue.clear();
    }
}

private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
    if (isFull()) {
        // When pendingTasks is full, remove an element
        pendingTasks.remove(processingOrder.poll());
        queueOverflows++;
    }
    // Place the task from the acceptorQueue into pendingTasks
    TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
    if (previousTask == null) {
        // Add the task ID to the tail of the processingOrder queue
        processingOrder.add(taskHolder.getId());
    } else {
        // A task with the same ID already exists and is overridden
        overriddenTasks++;
    }
}

2. Next, trafficShaper is used to obtain a compensation delay, which mainly pushes back the scheduling cycle when a previous submission failed because of congestion or a network error. This will be analyzed later when task submission failure is covered.

long now = System.currentTimeMillis();
if (scheduleTime < now) {
    // transmissionDelay() normally returns 0
    scheduleTime = now + trafficShaper.transmissionDelay();
}

3. Pack tasks

Then comes assignBatchWork, which packs tasks into batches:

  • First, hasEnoughTasksForNextBatch is called to determine whether there are enough tasks to form a batch. Note that it checks whether the task at the head of processingOrder has waited longer than the maximum batching delay maxBatchingDelay (500 ms); in other words, batches are packed roughly every 500 milliseconds.
  • Before packing, a permit must be acquired from the batchWorkRequests semaphore. Since the default permit count is 0, batchWorkRequests.release() must have been called somewhere beforehand to put a permit in, otherwise nothing would be packed here.
  • The maximum number of tasks in one batch is 250.
  • The task IDs of the batch are polled from the head of processingOrder and the tasks are removed from pendingTasks; expired tasks are discarded.
  • If the batch ends up empty, batchWorkRequests.release() is called to return the permit; otherwise the batch is added to the batch work queue batchWorkQueue and, note, the permit is not released.
void assignBatchWork() {
    // There are enough tasks for a batch
    if (hasEnoughTasksForNextBatch()) {
        // Acquire a permit
        if (batchWorkRequests.tryAcquire(1)) {
            long now = System.currentTimeMillis();
            // A maximum of 250 tasks per batch
            int len = Math.min(maxBatchingSize, processingOrder.size());
            List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
            // Move tasks from pendingTasks into holders;
            // even if there are 500 tasks in the queue, a batch contains at most 250
            while (holders.size() < len && !processingOrder.isEmpty()) {
                ID id = processingOrder.poll();
                TaskHolder<ID, T> holder = pendingTasks.remove(id);
                if (holder.getExpiryTime() > now) {
                    holders.add(holder);
                } else {
                    expiredTasks++;
                }
            }
            if (holders.isEmpty()) {
                batchWorkRequests.release();
            } else {
                batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                // Add to the batch queue
                batchWorkQueue.add(holders);
            }
        }
    }
}

// Whether there are enough tasks for the next batch
private boolean hasEnoughTasksForNextBatch() {
    if (processingOrder.isEmpty()) {
        return false;
    }
    if (pendingTasks.size() >= maxBufferSize) {
        return true;
    }

    // Peek a task ID from the head of processingOrder, then read the task from pendingTasks.
    // Note that peek() only reads the head element; it does not remove it.
    TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
    // Determine whether the time since the task was submitted exceeds the maximum batching delay (500 ms)
    long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
    return delay >= maxBatchingDelay;
}

assignSingleItemWork

  • If processingOrder is not empty and a permit is acquired from the singleItemWorkRequests semaphore, the remaining tasks in processingOrder are taken out and placed into the single-item work queue singleItemWorkQueue
  • In other words, after a batch (up to 250 tasks) has been packed, any tasks still left in processingOrder are moved into the singleItemWorkQueue
void assignSingleItemWork() {
    if (!processingOrder.isEmpty()) {
        if (singleItemWorkRequests.tryAcquire(1)) {
            long now = System.currentTimeMillis();
            while (!processingOrder.isEmpty()) {
                ID id = processingOrder.poll();
                TaskHolder<ID, T> holder = pendingTasks.remove(id);
                if (holder.getExpiryTime() > now) {
                    // Move the pendingTasks task to singleItemWorkQueue
                    singleItemWorkQueue.add(holder);
                    return;
                }
                expiredTasks++;
            }
            singleItemWorkRequests.release();
        }
    }
}

Task processor TaskExecutors

The previous section showed tasks being packed into batchWorkQueue and singleItemWorkQueue. The component that takes the tasks off these two queues and processes them is TaskExecutors.

1. Create TaskExecutors

Create TaskExecutors by following the methods:

  • The class that processes batch tasks is BatchWorkerRunnable, which mainly handles tasks from the batch task queue batchWorkQueue
  • The class that processes single tasks is SingleTaskWorkerRunnable, which mainly handles tasks from the single-item task queue singleItemWorkQueue
  • TaskExecutors creates a group of worker threads: batchExecutors has 20 by default (it is not clear why the JDK's existing thread pools are not used), while singleItemExecutors has only one worker thread by default.
static <ID, T> TaskExecutors<ID, T> singleItemExecutors(final String name, int workerCount, final TaskProcessor<T> processor, final AcceptorExecutor<ID, T> acceptorExecutor) {
    final AtomicBoolean isShutdown = new AtomicBoolean();
    final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
    registeredMonitors.put(name, metrics);
    // workerCount = 1
    return new TaskExecutors<>(idx -> new SingleTaskWorkerRunnable<>("TaskNonBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor), workerCount, isShutdown);
}

////////////////////////////////////////////////

static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name, int workerCount, final TaskProcessor<T> processor, final AcceptorExecutor<ID, T> acceptorExecutor) {
    final AtomicBoolean isShutdown = new AtomicBoolean();
    final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
    registeredMonitors.put(name, metrics);
    // BatchWorkerRunnable Batch task processing
    return new TaskExecutors<>(idx -> new BatchWorkerRunnable<>("TaskBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor), workerCount, isShutdown);
}

////////////////////////////////////////////////

private final List<Thread> workerThreads;

TaskExecutors(WorkerRunnableFactory<ID, T> workerRunnableFactory, int workerCount, AtomicBoolean isShutdown) {
    this.isShutdown = isShutdown;
    // Set of worker threads
    this.workerThreads = new ArrayList<>();

    // Create workerCount worker threads (20 for batch executors, 1 for single-item executors)
    ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
    for (int i = 0; i < workerCount; i++) {
        WorkerRunnable<ID, T> runnable = workerRunnableFactory.create(i);
        Thread workerThread = new Thread(threadGroup, runnable, runnable.getWorkerName());
        workerThreads.add(workerThread);
        workerThread.setDaemon(true);
        workerThread.start();
    }
}

2. BatchWorkerRunnable

Look at batch processing tasks:

  • First, getWork fetches a batch of tasks. It calls taskDispatcher.requestWorkItems(), which actually returns the dispatcher's batchWorkQueue and calls batchWorkRequests.release() to put a permit into the semaphore, so that the AcceptorRunner described earlier can acquire the permit and pack a batch of tasks
  • If batchWorkQueue contains no batch yet, the while loop keeps polling until a batch is obtained. In this way BatchWorkerRunnable and AcceptorRunner form a wait/notify style mechanism through the semaphore: BatchWorkerRunnable releases a permit to "order" a batch of tasks from AcceptorRunner
  • Once the batch of tasks is obtained, processor (the ReplicationTaskProcessor) is called to process it
  • If the processing result is Congestion or TransientError (a transient transfer failure), the batch must be reprocessed: taskDispatcher.reprocess is called to put the batch into the reprocessing queue reprocessQueue
static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

    BatchWorkerRunnable(String workerName, AtomicBoolean isShutdown, TaskExecutorMetrics metrics, TaskProcessor<T> processor, AcceptorExecutor<ID, T> acceptorExecutor) {
        super(workerName, isShutdown, metrics, processor, acceptorExecutor);
    }

    @Override
    public void run() {
        try {
            while (!isShutdown.get()) {
                // Get a batch of tasks
                List<TaskHolder<ID, T>> holders = getWork();
                metrics.registerExpiryTimes(holders);
                // Extract the ReplicationTask from each TaskHolder
                List<T> tasks = getTasksOf(holders);
                // processor => the replication task processor ReplicationTaskProcessor
                ProcessingResult result = processor.process(tasks);
                switch (result) {
                    case Success:
                        break;
                    case Congestion:
                    case TransientError:
                        // If the network is congested or fails, the batch of tasks is reprocessed
                        taskDispatcher.reprocess(holders, result);
                        break;
                    case PermanentError:
                        logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                }
                metrics.registerTaskResult(result, tasks.size());
            }
        } catch (InterruptedException e) {
            // Ignore
        } catch (Throwable e) {
            // Safe-guard, so we never exit this loop in an uncontrolled way.
            logger.warn("Discovery WorkerThread error", e);
        }
    }

    private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
        // Request the batchWorkQueue (this also releases a batchWorkRequests permit)
        BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
        List<TaskHolder<ID, T>> result;
        do {
            result = workQueue.poll(1, TimeUnit.SECONDS);
            // Loop until a batch of tasks is fetched
        } while (!isShutdown.get() && result == null);
        return (result == null) ? new ArrayList<>() : result;
    }
}
BlockingQueue<TaskHolder<ID, T>> requestWorkItem() {
    singleItemWorkRequests.release();
    return singleItemWorkQueue;
}

BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
    batchWorkRequests.release();
    return batchWorkQueue;
}

3. Task reprocessing

If processing fails, reprocess adds the batch of tasks to the reprocessing queue reprocessQueue and registers the failure with the traffic shaper trafficShaper.

void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
    // Add to reprocessing queue reprocessQueue
    reprocessQueue.addAll(holders);
    replayedTasks += holders.size();
    // Register the failure with the traffic shaper
    trafficShaper.registerFailure(processingResult);
}

4. TrafficShaper

Recall the beginning of AcceptorRunner's run method: before packing tasks, trafficShaper is consulted to calculate a compensation delay.

long now = System.currentTimeMillis();
if (scheduleTime < now) {
    // transmissionDelay() normally returns 0
    scheduleTime = now + trafficShaper.transmissionDelay();
}
if (scheduleTime <= now) {
    // Assign batch work: pack tasks from pendingTasks into batchWorkQueue (up to 250 tasks)
    assignBatchWork();
    // If pendingTasks still has unexpired tasks left, move them to singleItemWorkQueue
    assignSingleItemWork();
}

TrafficShaper:

  • registerFailure records the time of the most recent failure
  • Then look at transmissionDelay. For example, if the last congestion failure happened 500 milliseconds ago, transmissionDelay returns 1000 - 500 = 500; scheduleTime then becomes greater than now, so no batch is packed for the moment.
  • To summarize: if the last batch submission failed because of congestion, packing resumes 1000 ms after that failure; if it failed because of a network error, packing resumes 100 ms after that failure. A small worked example follows the TrafficShaper code below.
TrafficShaper(long congestionRetryDelayMs, long networkFailureRetryMs) {
    // 1000
    this.congestionRetryDelayMs = Math.min(MAX_DELAY, congestionRetryDelayMs);
    // 100
    this.networkFailureRetryMs = Math.min(MAX_DELAY, networkFailureRetryMs);
}

void registerFailure(ProcessingResult processingResult) {
    if (processingResult == ProcessingResult.Congestion) {
        // The time of the last batch submission failure caused by congestion
        lastCongestionError = System.currentTimeMillis();
    } else if (processingResult == ProcessingResult.TransientError) {
        // The time of the last batch submission failure caused by a network error
        lastNetworkFailure = System.currentTimeMillis();
    }
}

// Calculate the transmission delay
long transmissionDelay() {
    if (lastCongestionError == -1 && lastNetworkFailure == -1) {
        return 0;
    }

    long now = System.currentTimeMillis();
    if (lastCongestionError != -1) {
        // Congestion delay
        long congestionDelay = now - lastCongestionError;
        if (congestionDelay >= 0 && congestionDelay < congestionRetryDelayMs) {
            return congestionRetryDelayMs - congestionDelay;
        }
        lastCongestionError = -1;
    }

    if (lastNetworkFailure != -1) {
        // Network failure delay
        long failureDelay = now - lastNetworkFailure;
        if (failureDelay >= 0 && failureDelay < networkFailureRetryMs) {
            return networkFailureRetryMs - failureDelay;
        }
        lastNetworkFailure = -1;
    }
    return 0;
}
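As a worked example of the arithmetic above (the numbers are made up for illustration and assume the default congestionRetryDelayMs of 1000 ms):

public class TrafficShaperExample {
    public static void main(String[] args) {
        long congestionRetryDelayMs = 1000;
        long now = System.currentTimeMillis();
        // Pretend a batch submission failed with 503 (congestion) 400 ms ago
        long lastCongestionError = now - 400;

        long congestionDelay = now - lastCongestionError;      // ~400 ms since the failure
        long delay = congestionRetryDelayMs - congestionDelay; // ~600 ms still to wait
        // AcceptorRunner would set scheduleTime = now + delay, so no batch is packed for that long
        System.out.println("no batch will be packed for another " + delay + " ms");
    }
}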

5. SingleTaskWorkerRunnable

Single-task processing is similar to batch processing, except that the synchronization operations are sent one by one; tasks that fail are likewise put back into the reprocessing queue.

If the microservice cluster has many instances, Eureka's constant polling lets it use batching as much as possible, so single-task processing is more of a supplement to batch processing. For reference, the single-task loop has roughly the shape shown below.
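The following is abridged and slightly simplified from TaskExecutors.SingleTaskWorkerRunnable; the exact metrics calls are omitted:

static class SingleTaskWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {
    @Override
    public void run() {
        try {
            while (!isShutdown.get()) {
                // Returns singleItemWorkQueue and releases a singleItemWorkRequests permit
                BlockingQueue<TaskHolder<ID, T>> workQueue = taskDispatcher.requestWorkItem();
                // Wait until a single task is available
                TaskHolder<ID, T> taskHolder;
                while ((taskHolder = workQueue.poll(1, TimeUnit.SECONDS)) == null) {
                    if (isShutdown.get()) {
                        return;
                    }
                }
                // Process one task at a time with the ReplicationTaskProcessor
                ProcessingResult result = processor.process(taskHolder.getTask());
                if (result == ProcessingResult.Congestion || result == ProcessingResult.TransientError) {
                    // Failed tasks go back to the reprocessing queue, just like failed batches
                    taskDispatcher.reprocess(taskHolder, result);
                }
            }
        } catch (InterruptedException e) {
            // Ignore
        } catch (Throwable e) {
            logger.warn("Discovery WorkerThread error", e);
        }
    }
}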

The replication task processor ReplicationTaskProcessor

Batch tasks are submitted by ReplicationTaskProcessor, which uses replicationClient to submit them. The endpoint for batch submission is POST peerreplication/batch/; from this entry point we can see how Eureka Server receives batch tasks.

public ProcessingResult process(List<ReplicationTask> tasks) {
    // Wrap the tasks into a ReplicationList
    ReplicationList list = createReplicationListOf(tasks);
    try {
        // Submit the batch: POST peerreplication/batch/
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        int statusCode = response.getStatusCode();
        if (!isSuccess(statusCode)) {
            if (statusCode == 503) {
                return ProcessingResult.Congestion;
            } else {
                return ProcessingResult.PermanentError;
            }
        } else {
            // Handle the results of the batch tasks
            handleBatchResponse(tasks, response.getEntity().getResponseList());
        }
    } catch (Throwable e) {
        if (maybeReadTimeOut(e)) {
            return ProcessingResult.Congestion;
        } else if (isNetworkConnectException(e)) {
            return ProcessingResult.TransientError;
        } else {
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}

Receive replication synchronization requests

The batch submission endpoint is easy to find: it is the batchReplication method of PeerReplicationResource.

As you can see, it simply iterates over the batch of tasks and, based on each task's operation type, calls the corresponding XxxResource interface. For registration, for example, applicationResource.addInstance is called to complete the instance registration.

@Path("/{version}/peerreplication")
@Produces({"application/xml", "application/json"})
public class PeerReplicationResource {
    private static final Logger logger = LoggerFactory.getLogger(PeerReplicationResource.class);

    private static final String REPLICATION = "true";

    private final EurekaServerConfig serverConfig;
    private final PeerAwareInstanceRegistry registry;

    @Inject
    PeerReplicationResource(EurekaServerContext server) {
        this.serverConfig = server.getServerConfig();
        this.registry = server.getRegistry();
    }

    public PeerReplicationResource() {
        this(EurekaServerContextHolder.getInstance().getServerContext());
    }

    @Path("batch")
    @POST
    public Response batchReplication(ReplicationList replicationList) {
        try {
            ReplicationListResponse batchResponse = new ReplicationListResponse();
            for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
                try {
                    // dispatch Dispatches tasks
                    batchResponse.addResponse(dispatch(instanceInfo));
                } catch (Exception e) {
                    batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                }
            }
            return Response.ok(batchResponse).build();
        } catch (Throwable e) {
            return Response.status(Status.INTERNAL_SERVER_ERROR).build();
        }
    }

    private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
        ApplicationResource applicationResource = createApplicationResource(instanceInfo);
        InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);

        String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
        String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
        String instanceStatus = toString(instanceInfo.getStatus());

        Builder singleResponseBuilder = new Builder();
        // According to different types of processing
        switch (instanceInfo.getAction()) {
            case Register: // Register
                singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
                break;
            case Heartbeat: // Renew (heartbeat)
                singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
                break;
            case Cancel: // Cancel (offline)
                singleResponseBuilder = handleCancel(resource);
                break;
            case StatusUpdate: // Status changes
                singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
                break;
            case DeleteStatusOverride:
                singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
                break;
        }
        return singleResponseBuilder.build();
    }

    private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
        // addInstance
        applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
        return new Builder().setStatusCode(Status.OK.getStatusCode());
    }

    private static Builder handleCancel(InstanceResource resource) {
        // cancelLease
        Response response = resource.cancelLease(REPLICATION);
        return new Builder().setStatusCode(response.getStatus());
    }

    private static Builder handleHeartbeat(EurekaServerConfig config, InstanceResource resource, String lastDirtyTimestamp, String overriddenStatus, String instanceStatus) {
        Response response = resource.renewLease(REPLICATION, overriddenStatus, instanceStatus, lastDirtyTimestamp);
        int responseStatus = response.getStatus();
        Builder responseBuilder = new Builder().setStatusCode(responseStatus);

        if ("false".equals(config.getExperimental("bugfix.934"))) {
            if (responseStatus == Status.OK.getStatusCode() && response.getEntity() != null) {
                responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
            }
        } else {
            if ((responseStatus == Status.OK.getStatusCode() || responseStatus == Status.CONFLICT.getStatusCode())
                    && response.getEntity() != null) {
                responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
            }
        }
        return responseBuilder;
    }

    private static Builder handleStatusUpdate(ReplicationInstance instanceInfo, InstanceResource resource) {
        Response response = resource.statusUpdate(instanceInfo.getStatus(), REPLICATION, toString(instanceInfo.getLastDirtyTimestamp()));
        return new Builder().setStatusCode(response.getStatus());
    }

    private static Builder handleDeleteStatusOverride(ReplicationInstance instanceInfo, InstanceResource resource) {
        Response response = resource.deleteStatusUpdate(REPLICATION, instanceInfo.getStatus(),
                instanceInfo.getLastDirtyTimestamp().toString());
        return new Builder().setStatusCode(response.getStatus());
    }

    private static <T> String toString(T value) {
        if (value == null) {
            return null;
        }
        return value.toString();
    }
}

Cluster data synchronization conflicts

A key problem in Peer to Peer mode is resolving data replication conflicts, because replication between peer nodes cannot guarantee that every operation succeeds. Eureka mainly relies on lastDirtyTimestamp and the heartbeat to perform the final data repair. Here is how Eureka deals with data conflicts.

1. First, look at the renewal method renewLease

  • In the renewal method renewLease, if lastDirtyTimestamp is not null and syncing when timestamps differ is allowed (enabled by default), the validateDirtyTimestamp method validates lastDirtyTimestamp.
  • If lastDirtyTimestamp equals the lastDirtyTimestamp of the local instance, the data is consistent and the renewal succeeds with OK (200).
  • If lastDirtyTimestamp is greater than the lastDirtyTimestamp of the local instance, the replicated instance is newer; there is a data conflict and NOT_FOUND (404) is returned.
  • If lastDirtyTimestamp is smaller than the lastDirtyTimestamp of the local instance, the replicated instance is older; there is a data conflict, and CONFLICT (409) is returned along with the local instance.
@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    // Call registry renew to renew the service
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }
    
    Response response;
    // If it is a copy operation, check lastDirtyTimestamp
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        // Store the overridden status since the validation found out the node that replicates wins
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {
        response = Response.ok().build();
    }
    return response;
}

private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication) {
    InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
    if (appInfo != null) {
        // If lastDirtyTimestamp of the replicated instance is not the same as lastDirtyTimestamp of the local instance
        if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
            Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
            if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
                // If lastDirtyTimestamp of the replicated instance is greater than that of the local instance,
                // there is a data conflict: 404 is returned and the replicating node must register the instance again
                return Response.status(Status.NOT_FOUND).build();
            } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
                // In the case of replication, send the current instance info in the registry for the
                // replicating node to sync itself with this one.
                if (isReplication) {
                    // If lastDirtyTimestamp of the local instance is greater than that of the replicated instance, CONFLICT (409) is returned
                    // Notice that the local instance appInfo is put into the response entity
                    return Response.status(Status.CONFLICT).entity(appInfo).build();
                } else {
                    return Response.ok().build();
                }
            }
        }
    }
    return Response.ok().build();
}
Copy the code
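
To make the three outcomes easier to scan, here is a minimal, hypothetical sketch (not Eureka source code; the class, method, and timestamp values are made up purely for illustration) of how the lastDirtyTimestamp comparison in validateDirtyTimestamp maps to the returned statuses:

public class DirtyTimestampConflictSketch {

    // Returns the HTTP status a replicating peer would get back for a renewal, per the rules above.
    static int resolveRenewConflict(long localDirtyTimestamp, long replicatedDirtyTimestamp) {
        if (replicatedDirtyTimestamp == localDirtyTimestamp) {
            return 200; // data consistent, renewal succeeds
        }
        if (replicatedDirtyTimestamp > localDirtyTimestamp) {
            return 404; // the replicated copy is newer: NOT_FOUND, the sending node re-registers the instance here
        }
        return 409;     // the local copy is newer: CONFLICT, the local instance is returned for the sender to sync
    }

    public static void main(String[] args) {
        System.out.println(resolveRenewConflict(1000L, 1000L)); // 200
        System.out.println(resolveRenewConflict(1000L, 1005L)); // 404
        System.out.println(resolveRenewConflict(1005L, 1000L)); // 409
    }
}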

2. Next, look at how PeerReplicationResource handles heartbeats

  • First, the renewal method renewLease is called to renew the lease
  • If the returned status is OK or CONFLICT, the local instance is returned in the responseEntity
private static Builder handleHeartbeat(EurekaServerConfig config, InstanceResource resource, String lastDirtyTimestamp, String overriddenStatus, String instanceStatus) {
    // Call renewLease
    Response response = resource.renewLease(REPLICATION, overriddenStatus, instanceStatus, lastDirtyTimestamp);
    int responseStatus = response.getStatus();
    Builder responseBuilder = new Builder().setStatusCode(responseStatus);

    if ("false".equals(config.getExperimental("bugfix.934"))) {
        if(responseStatus == Status.OK.getStatusCode() && response.getEntity() ! =null) { responseBuilder.setResponseEntity((InstanceInfo) response.getEntity()); }}else {
        if((responseStatus == Status.OK.getStatusCode() || responseStatus == Status.CONFLICT.getStatusCode()) && response.getEntity() ! =null) {
            // Local instance appInfo is returned to the client on successful renewal or CONFLICTresponseBuilder.setResponseEntity((InstanceInfo) response.getEntity()); }}return responseBuilder;
}
Copy the code

3. PeerEurekaNode sends the heartbeat

After the batch request returns, ReplicationTaskProcessor processes each task's response; for a heartbeat task whose response indicates failure, it calls back the task's handleFailure method.

  • If the returned status is 404 (NOT_FOUND), the instance is re-registered (a register task is also committed to the queue), so the data is synchronized through re-registration.
  • For other statuses such as 409 (CONFLICT), if syncing when timestamps differ is enabled, the instance returned by the peer server is registered locally to synchronize the data.
public void heartbeat(final String appName, final String id, final InstanceInfo info, 
			final InstanceStatus overriddenStatus, boolean primeConnection) throws Throwable {
    ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
        @Override
        public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
        	// Send heartbeat to cluster Server
            return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        }

        @Override
        public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
            super.handleFailure(statusCode, responseEntity);
            if (statusCode == 404) {
                logger.warn("{}: missing entry.", getTaskName());
                if (info != null) {
                    // Replication returned 404: register again
                    register(info);
                }
            } else if (config.shouldSyncWhenTimestampDiffers()) {
                // 409 (CONFLICT): synchronize the instance returned by the peer server to the local registry
                InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
                if (peerInstanceInfo != null) {
                    syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                }
            }
        }
    };
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}

private void syncInstancesIfTimestampDiffers(String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) {
    try {
        if (infoFromPeer != null) {
            // Register the peer server's instance locally for data synchronization
            registry.register(infoFromPeer, true);
        }
    } catch (Throwable e) {
        logger.warn("Exception when trying to set information from peer :", e);
    }
}
Copy the code

So far, we can conclude that Eureka Server achieves replication and eventual synchronization of cluster data by comparing lastDirtyTimestamp during heartbeat (renewal) operations.

Replication between peer nodes does not guarantee that every operation succeeds, so Eureka adopts eventual consistency: cluster data is eventually repaired and synchronized through heartbeats, but synchronization between cluster nodes may lag. For example, if a registration fails to replicate to one peer, that peer will return 404 (NOT_FOUND) on the next heartbeat replication, which triggers a re-registration that repairs the missing data.

A diagram summarizes cluster synchronization

The following is a summary of synchronization between Eureka-Server cluster nodes:

  • First, the Eureka-Server cluster uses Peer To Peer (peer-to-peer) replication: every server can receive write requests, and the servers synchronize data with each other.
  • Data synchronization uses a multi-layer task queue + batch processing mechanism (see the sketch after this list):
    • After receiving a client request such as register, cancel (offline), or renew, Eureka-Server invokes each PeerEurekaNode to replicate the operation
    • PeerEurekaNode encapsulates the operation into an InstanceReplicationTask (instance replication task) and dispatches it with batchingDispatcher (a TaskDispatcher)
    • Inside batchingDispatcher, the task is put into AcceptorExecutor's acceptorQueue
    • AcceptorExecutor has a background worker thread (AcceptorRunner) that keeps polling, moving tasks from the acceptorQueue and reprocessQueue into the processing queue (processingOrder + pendingTasks)
    • Tasks in the processing queue are then packaged into batches of up to 250 and placed in the batchWorkQueue; any tasks still left in the processing queue are placed in the singleItemWorkQueue
    • Once the tasks are packaged, TaskExecutors' worker threads BatchWorkerRunnable and SingleTaskWorkerRunnable process the tasks in batchWorkQueue and singleItemWorkQueue respectively
    • The worker threads use the replication task processor (ReplicationTaskProcessor) to submit tasks: batch tasks are submitted to the peer node's batch interface (peerreplication/batch/), while a single task is submitted to the interface of the corresponding operation
    • A task whose submission is blocked (congestion) or fails is placed in the reprocessQueue and polled again by the AcceptorRunner, but any task that has expired (older than 90 seconds) is discarded
  • Synchronization on the other Eureka-Servers:
    • After receiving a batch replication request, the other Eureka-Server iterates over the batch task list and, based on the operation type (Register, Heartbeat, Cancel, etc.), invokes the corresponding Resource interface to handle it
    • For a renewal operation, the lastDirtyTimestamp of the replicated instance is compared with the lastDirtyTimestamp of the local instance; if they are equal, the data is considered consistent
    • If the replicated instance's lastDirtyTimestamp > the local instance's lastDirtyTimestamp, 404 (NOT_FOUND) is returned, asking the replicating node to re-send a registration for the instance
    • If the replicated instance's lastDirtyTimestamp < the local instance's lastDirtyTimestamp, the local instance's data is newer; 409 (CONFLICT) and the local instance are returned, and the replicating node overwrites its local copy with the returned instance
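
As a rough illustration of the "accept queue + batch worker" mechanism in the list above, here is a minimal, self-contained sketch. It is not Eureka's implementation (the class SimpleBatchingDispatcher and its methods are hypothetical), and it ignores the reprocess queue and expiry handling; it only shows how tasks can be accepted on the caller's thread, packaged into batches of up to 250 by a background worker, and submitted in bulk:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SimpleBatchingDispatcher {

    private static final int MAX_BATCH_SIZE = 250;

    // Plays the role of the acceptorQueue: callers enqueue replication tasks here.
    private final BlockingQueue<String> acceptorQueue = new LinkedBlockingQueue<>();

    // Client-facing side: enqueue a replication task (represented here by a simple task name).
    public void process(String task) {
        acceptorQueue.offer(task);
    }

    // Background worker: drain the accept queue, package up to 250 tasks, and submit them as one batch.
    public void startWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    List<String> batch = new ArrayList<>(MAX_BATCH_SIZE);
                    // Wait for at least one task, then drain up to the batch limit.
                    String first = acceptorQueue.poll(1, TimeUnit.SECONDS);
                    if (first == null) {
                        continue;
                    }
                    batch.add(first);
                    acceptorQueue.drainTo(batch, MAX_BATCH_SIZE - 1);
                    submitBatch(batch);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "batch-worker");
        worker.setDaemon(true);
        worker.start();
    }

    // Stand-in for the replication task processor submitting to a peer's batch interface.
    private void submitBatch(List<String> batch) {
        System.out.println("Submitting batch of " + batch.size() + " replication tasks");
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleBatchingDispatcher dispatcher = new SimpleBatchingDispatcher();
        dispatcher.startWorker();
        for (int i = 0; i < 600; i++) {
            dispatcher.process("heartbeat-" + i);
        }
        Thread.sleep(3000); // give the worker time to flush the queued batches
    }
}

In the real mechanism described above, a second queue (the reprocessQueue) feeds failed or blocked tasks back to the same worker, and tasks that have expired are dropped before packaging.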

Here’s another diagram to summarize cluster synchronization: