This article has participated in the weekend study program, click the link to see the details of the link

One, foreword

Look at the source code: catch small, the main process, and then details.

2. To guess with skill:

  1. See method name (English name)
  2. See the comments
  3. Start with unit tests

eurekaRunning the core process:

  1. Service Registration:eureka clienteureka serverRegistration process
  2. Service discovery:eureka clienteureka serverThe process of obtaining the registry
  3. Service heartbeat:eureka clientTiming toeureka serverSend renewal notification (heartbeat)
  4. Service instance removal: The service is offline or faulty
  5. Communications:HTTP
  6. Current limiting
  7. Self-protection: Automatically identifies whether a network fault occurs
  8. serverThe cluster:eureka-serverBetween the mutual registration, multi-level queue task batch mechanism

eureka server: Provides the functions of the registry.

Prerequisites: Eureka-Server is a Web application that can be started in Tomcat as a WAR package.





(1) source code starting point

Source code can be divided into two points:

  1. eureka-serverbuild.gradle: configuration required for various dependencies and builds
  2. eureka-serverweb.xml:webApply the core, define variouslistenerfilter


1) from thebuild.gradleYou can see

File path: Eureka-server /build.gradle

As you can see, the dependent modules are:

  1. Eureka-client: In cluster mode, each server is also a client and can register with each other.

  2. Eureka-core: The core role of the registry. It receives service registration requests, provides service discovery functions, maintains heartbeat (renewal requests), and removes failed service instances.

  3. Jersey: RESTful HTTP framework, communication between Eureka client and Eureka Server, is based on Jersey.

    For example, eureka-client-jersey2, eureka-core-Jersey2

  4. Others can be found at build.gradle


2) fromweb.xmlYou can see

File path: eureka – server/SRC/main/webapp/WEB – INF/WEB. XML

fromweb.xmlWe can see:

  1. Listener: initializes web applications. For example, start a background thread to load a configuration file

    Corresponding eureka – server, is com.net flix. Eureka. EurekaBootStrap

  2. Filter: The request is processed


The core of thefilterThere are four:

Of course, each filter also defines a corresponding URL match in web.xml.

  • StatusFilter: Is responsible for state-related processing logic
  • ServerRequestAuthFilter: processing logic related to authentication
  • RateLimitingFilter: Is responsible for the processing logic related to traffic limiting
  • GzipEncodingEnforcingFilter:gzip, compression related

The last one is Jersey’s ServletContainer: Receives all requests as an entry point to the request

<filter>
    <filter-name>jersey</filter-name>
    <filter-class>com.sun.jersey.spi.container.servlet.ServletContainer</filter-class>. .</filter>
Copy the code


(2) Some concepts

Read the source code to prepare for some of Eureka’s concepts:

  1. RegisterThe service registry
  2. RenewThe service contract
  3. Fetch RegistriesGet service registration list information
  4. CancelService offline
  5. EvictionService to eliminate





What have you learned from the source code

What Fan Fan feels “learned.”

Tips: Method names and variable names are worth learning everyday.

(1) The use of thread pools

Learn how others use thread pools so you can use them when writing your own architecture.

Source thread pool:

  1. Scheduling thread pool
  2. Heartbeat thread pool
  3. Cache flushes the thread pool
  4. Nodes synchronize the registry thread pool

Detailed analysis, as follows:

  1. Scheduling thread pool

    / / location: com.net flix. Discovery. DiscoveryClient. Java
    private final ScheduledExecutorService scheduler;
    // Support scheduling thread pool, core thread number 2, using Google toolkit thread factory, background thread
    scheduler = Executors.newScheduledThreadPool(
        2.new ThreadFactoryBuilder()
        .setNameFormat("DiscoveryClient-%d")
        .setDaemon(true)
        .build());
    Copy the code
  2. Heartbeat thread pool

    / / location: com.net flix. Discovery. DiscoveryClient. Java
    private final ThreadPoolExecutor heartbeatExecutor;
    // Support heartbeat thread pool: core thread number 1, default maximum thread number 5, and thread lifetime is 0, background thread
    heartbeatExecutor = new ThreadPoolExecutor(
        1.5.0, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadFactoryBuilder()
        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
        .setDaemon(true)
        .build()
    Copy the code
  3. Cache flushes the thread pool

    / / location: com.net flix. Discovery. DiscoveryClient. Java
    private final ThreadPoolExecutor cacheRefreshExecutor;
    // Support cache refresh thread pool: core thread number 1, the default maximum number of threads 5, and thread lifetime is 0, background thread
    cacheRefreshExecutor = new ThreadPoolExecutor(
        1.5.0, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadFactoryBuilder()
        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
        .setDaemon(true)
        .build()
    ); 
    Copy the code
  4. Nodes synchronize the registry thread pool

    / / location: com.net flix. Eureka. Cluster. PeerEurekaNodes. Java
    private ScheduledExecutorService taskExecutor;
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                thread.setDaemon(true);
                returnthread; }}}Copy the code


(2)ConcurrentHashMapuse

/ / location: com.net flix. Eureka. Registry. ResponseCacheImpl. Java
// 1. Read-only cache
// Expiration policy: Passive expiration
private final ConcurrentMap<Key, Value> readOnlyCacheMap = 
    new ConcurrentHashMap<Key, Value>();

Read/write cache: use Guava's cache
// Expiration policy: Active expiration
private final LoadingCache<Key, Value> readWriteCacheMap = 
    CacheBuilder.newBuilder()
    .initialCapacity(1000)
    .expireAfterWrite(180, TimeUnit.SECONDS) // Expiration time: 180 seconds by default
    .removalListener(new RemovalListener<Key, Value>() {
        @Override
        public void onRemoval(RemovalNotification<Key, Value> notification) {
            Key removedKey = notification.getKey();
            if (removedKey.hasRegions()) {
                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
            }
        }
    })
    .build(new CacheLoader<Key, Value>() {
        @Override
        public Value load(Key key) throws Exception {
            if (key.hasRegions()) {
                Key cloneWithNoRegions = key.cloneWithoutRegions();
                regionSpecificKeys.put(cloneWithNoRegions, key);
            }
            Value value = generatePayload(key);
            returnvalue; }});Copy the code


(3) Circular queue

The com.net flix. Eureka. Registry. AbstractInstanceRegistry. Two circular queue defined in Java.

// The most recently registered queue
private final CircularQueue<Pair<Long, String>> recentRegisteredQueue;
// The queue was recently cancelled
private final CircularQueue<Pair<Long, String>> recentCanceledQueue;
Copy the code
private class CircularQueue<E> extends ConcurrentLinkedQueue<E> {
    private int size = 0;

    public CircularQueue(int size) {
        this.size = size;
    }
    
    @Override
    public boolean add(E e) {
        this.makeSpaceIfNotAvailable();
        return super.add(e);
    }
    
    private void makeSpaceIfNotAvailable(a) {
        if (this.size() == size) {
            this.remove(); }}public boolean offer(E e) {
        this.makeSpaceIfNotAvailable();
        return super.offer(e); }}Copy the code


(4) The use of locks

The use of locks are:

  1. Read-write lockReentrantReadWriteLock
  2. synchronizeduse
  3. ReentrantLockuse


Detailed use, as follows:

  1. Read-write lockReentrantReadWriteLock
/ / location: com.net flix. Eureka. Registry. AbstractInstanceRegistry. Java
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock read = readWriteLock.readLock();
private final Lock write = readWriteLock.writeLock();

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        / / read lock
        read.lock();
    } finally {
        / / releaseread.unlock(); }}Copy the code
  1. synchronizeduse
/ / location: com.net flix. Eureka. Registry. AbstractInstanceRegistry. Java
protected final Object lock = new Object();

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    synchronized(lock) { ... . }}Copy the code
  1. ReentrantLockuse
/ / location: com.net flix. Discovery. DiscoveryClient. Java
private final Lock fetchRegistryUpdateLock = new ReentrantLock();

// Try locking
if (fetchRegistryUpdateLock.tryLock()) {
    try{... . }finally {
        / / releases the lockfetchRegistryUpdateLock.unlock(); }}else {
    // Failed to obtain the lock
    logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
Copy the code


(5) Use of timer

It is found that eureka uses a large number of timers:

  • Timer is one of the earlier implementations of the JDK and can implement fixed-cycle tasks, as well as deferred tasks.

  • A Timer starts an asynchronous thread to execute a task that is due. The task can be scheduled for execution only once, or it can be periodically repeated multiple times.

Example code for how Timer is used is as follows:

Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run(a) {
        // do something}},10000.1000);  // A scheduled task is scheduled 10 seconds later
Copy the code

As can be seen, the task is implemented by the TimerTask class. TimerTask is an abstract class that implements the Runnable interface. Timer is responsible for scheduling and executing the TimerTask.

Let’s take a look at the Timer internals:

public class Timer {
    // Run O(1), add O(logn), cancel O(logn)
    private final TaskQueue queue = new TaskQueue();
    // Create another thread, task processing, will poll queue
    private final TimerThread thread = new TimerThread(queue);
    public Timer(String name) { thread.setName(name); thread.start(); }}Copy the code

TimerIt has a number of design flaws, so it is not recommended:

  • Timer is a single-threaded mode. If a TimerTask takes a long time to execute, the scheduling of other tasks will be affected.

  • The task scheduling of Timer is based on the absolute time of the system. If the system time is incorrect, problems may occur.

  • If the TimerTask execution is abnormal, the Timer does not catch it, causing the thread to terminate and other tasks to never execute.





Third, direct hate source code

(1) Initialization

When eureka-Server is initialized, this listener is triggered:

In eureka – server/SRC/main/webapp/WEB – INF/WEB. XML definition

<listener>
    <listener-class>com.netflix.eureka.EurekaBootStrap</listener-class>
</listener>
Copy the code

The listener’s initialization method, contextInitialized(), is as follows:

/ / positioning: eureka - core/SRC/main/Java/com/netflix/had been/EurekaBootStrap. Java
@Override
public void contextInitialized(ServletContextEvent event) {
    try {
        // 1. Initialize environment variables
        initEurekaEnvironment();
        // 2. Initialize context
        initEurekaServerContext();

        ServletContext sc = event.getServletContext();
        sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
    } catch (Throwable e) {
        logger.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e); }}Copy the code

The current source point can be divided into two points:

  1. Initialize the environment variable initEurekaEnvironment()

  2. Initialize context initEurekaServerContext()

1) Initialize environment variablesinitEurekaEnvironment()

The main process steps are as follows:

  1. Create ConcurrentCompositeConfiguration: represents the so-called configuration, including the eureka need all of the configuration

    1.1 At creation time, the clear() method is first called

    After 1.2, fireEvent() releases an event (EVENT_CLEAR)

  2. toConcurrentCompositeConfiguration configThe instance added a bunch of different configurations

    2.1 Initializing the Data Center Configuration. If no data Center configuration is configured, the DEFAULT data Center is used

    2.2 Initializing the Eureka environment. If the eureka environment is not configured, the default environment is Test

Enter the method, view the corresponding source:

/ / positioning: eureka - core/SRC/main/Java/com/netflix/had been/EurekaBootStrap. Java
protected void initEurekaEnvironment(a) throws Exception {
    logger.info("Setting the eureka configuration..");

    // 1. It is important that:
    String dataCenter = ConfigurationManager.getConfigInstance().getString(EUREKA_DATACENTER);
    
    // 2. Other configurations. . }Copy the code

Enter theConfigurationManager.getConfigInstance(), can be seen:

/ / location: com.net flix. Config. ConfigurationManager. Java
public static AbstractConfiguration getConfigInstance(a) {
    if (instance == null) {
        synchronized (ConfigurationManager.class) {
            if (instance == null) { instance = getConfigInstance(Boolean.getBoolean(DynamicPropertyFactory.DISABLE_DEFAULT_CONFIG)); }}}return instance;
}
Copy the code

ConfigurationManager: Uses the double check and volatile singleton pattern for thread safety.

Go to the getConfigInstance() method:

/ / location: com.net flix. Config. ConfigurationManager. Java
private static AbstractConfiguration getConfigInstance(boolean defaultConfigDisabled) {
    if (instance == null && !defaultConfigDisabled) {
        / / 1. Create ConcurrentCompositeConfiguration
        instance = createDefaultConfigInstance();
        registerConfigBean();
    }
    return instance;
}

private static AbstractConfiguration createDefaultConfigInstance(a) {
    // Create the main configuration
    ConcurrentCompositeConfiguration config = new ConcurrentCompositeConfiguration();  
    
    // Load other configurations. .return config;
}
/ / location: com.net flix. Config. ConcurrentCompositeConfiguration. Java
// 1.1. When creating the main configuration, it is cleaned up first
public ConcurrentCompositeConfiguration(a) {
    clear();
}
public final void clear(a) {
    // 1.2 Release event
    fireEvent(EVENT_CLEAR, null.null.true); . . fireEvent(EVENT_CLEAR,null.null.false); . . }Copy the code

2) Initialize the contextinitEurekaServerContext()

The main process steps are as follows:

  1. Step 1: Loadeureka-serverConfiguration file
  2. Step 2: Initializeeureka-serverThe inside of theeureka-client(Used with otherseureka-serverNodes register and communicate)
  3. Step 3: Deal with registration related matters
  4. Step 4: ProcesspeerThe node
  5. Step 5: Finisheureka-serverContext (contextThe construction of)
  6. Step 6: Deal with some of the aftermath from the neighboringeurekaThe node copies registration information
  7. Step 7: Register all monitors
protected void initEurekaServerContext(a) throws Exception {
    // Step 1: Load eureka-server configuration file
    EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();

    // Step 2: Initialize eureka-client inside eureka-server
    // Register and communicate with other Eureka-server nodes.
    ApplicationInfoManager applicationInfoManager = null;

    // Step 3: Handle registration related matters
    PeerAwareInstanceRegistry registry;

    // Step 4: Process the peer related nodes
    PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes();

    // Step 5: Complete the construction of eureka-server context
    serverContext = new DefaultEurekaServerContext();

    // Step 6: Do some cleaning up and copy the registration information from the adjacent Eureka node
    int registryCount = registry.syncUp();
    registry.openForTraffic(applicationInfoManager, registryCount);

    // Step 7: Register all monitors
    EurekaMonitors.registerAllStats();
}
Copy the code

Enter the main process below:


Step 1: Loadeureka-serverConfiguration file

Loading eureka-server.properties

  1. Created a DefaultEurekaServerConfig object, create called when the init () method

    DefaultEurekaServerConfig provides access to configuration items of each method, by hard-coding configuration item name, gets the value of the configuration items for DynamicPropertyFactory The DynamicPropertyFactory is retrieved from the ConfigurationManager, so it also contains the values of all configuration items.

// 1. init()
private void init(a) {
    // Get the current environment: eureka. Environment
    String env = ConfigurationManager.getConfigInstance().getString(
        EUREKA_ENVIRONMENT, TEST);
    // Set the current property
    ConfigurationManager.getConfigInstance().setProperty(
        ARCHAIUS_DEPLOYMENT_ENVIRONMENT, env);
    // Load the eureka configuration file name: eureka-server
    String eurekaPropsFile = EUREKA_PROPS_FILE.get();
    // 2. Load configuration: eureka-server.properties
    ConfigurationManager.loadCascadedPropertiesFromResources(eurekaPropsFile);
}
Copy the code
  1. Load the configuration from Eureka-server. properties into a Properties object and put the configuration from the Properties object into the ConfigurationManager. Now you have all the configuration in ConfigurationManager.

    Eureka-server. properties is not configured by default, so the default values are read.


Step 2: Initializeeureka-serverThe inside of theeureka-client
ApplicationInfoManager applicationInfoManager = null;

if (eurekaClient == null) {
    // 1. Load the Eureka-client. properties configuration to the ConfigurationManager
    EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
        ? new CloudInstanceConfig()
        : new MyDataCenterInstanceConfig();

    // 2. InstanceInfo (the instance itself information of the service instance) is built during this time.
    // Build ApplicationInfoManager based on EurekaInstanceConfig and InstanceInfo
    // ApplicationInfoManager: Manages service instances
    applicationInfoManager = new ApplicationInfoManager(
        instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

    // 3. Create a configuration item for Eureka-client
    EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
    
    // 4. Create EurekaClient, of which DiscoveryClient is a subclass
    eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
    applicationInfoManager = eurekaClient.getApplicationInfoManager();
}
Copy the code
  1. Load service instance-related configurations: Load the Eureka-client.properties configuration into the ConfigurationManager and supply EurekaInstanceConfig

    See PropertiesInstanceConfig for details

    Problem: This is not initializationeureka-server? Why loadeureka-client

    A Eureka-server is also a Eureka-client because it may register with other Eureka-servers to form a cluster of Eureka-Servers.

  2. ApplicationInfoManager is built based on EurekaInstanceConfig and InstanceInfo

  3. Example Create a Eureka-client configuration item

  4. Focus: CreateEurekaClient.DiscoveryClientIs its subclasses

    / / location: com.net flix. Discovery. DiscoveryClient. JavaDiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { ... .// 1. Read EurekaClientConfig, including TransportConfig
        clientConfig = config;
        transportConfig = config.getTransportConfig();
    
        // 2. Save EurekaInstanceConfig and InstanceInfoinstanceInfo = myInfo; . .// 3. Check whether the registry needs to be captured
        if(config.shouldFetchRegistry()) {... . }// 3. Whether to register with Eureka
        if(config.shouldRegisterWithEureka()) {... . }try {
            // 4. Support scheduling thread pool, core thread number 2, using Google toolkit thread factoryscheduler = Executors.newScheduledThreadPool(... ...). ;The number of core threads is 1, the default maximum number of threads is 5, and the thread lifetime is 0
            heartbeatExecutor = newThreadPoolExecutor(... ...). ;Support cache refresh thread pool: 1 core thread, default maximum number of threads 5, and thread lifetime 0
            cacheRefreshExecutor = newThreadPoolExecutor(... ...). ;// create EurekaTransport:
            // A component that supports network communication between the underlying Eureka-client and eureka-server
            eurekaTransport = newEurekaTransport(); . . }// 8. Fetch the registry
        // The code is not well written
        if(clientConfig.shouldFetchRegistry() && ! fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }
    
        // 9. Initialize the scheduling taskinitScheduledTasks(); . . }Copy the code

    The steps are as follows:

    1. readEurekaClientConfig, includingTransportConfig
    2. saveEurekaInstanceConfigInstanceInfo
    3. Whether to register and crawl the registry, if not, release some resources
    4. Thread pools that support scheduling
    5. Thread pools that support heartbeats
    6. Thread pools that support cache flushing
    7. createEurekaTransport: Supports low-leveleureka-clienteureka-serverA component that communicates with a network
    8. Fetching the registry
    9. Example Initialize the scheduling task:
      1. If you want to crawl the registry, a scheduled task will be registered and executed in the scheduled thread pool according to the set crawl interval (default is 30 seconds)CacheRefreshThread
      2. If you want toeureka-serverTo register, start a scheduled task, send heartbeat at regular intervals, executeHeartbeatThread
      3. A service instance copy propagator is created to schedule itself as a scheduled task
      4. A listener for a service instance’s state change is created, and if configured, it is registered
Step 3: Deal with registration related matters
// Registry: the service instance registry aware of the Eureka-Server cluster
PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
    registry = newAwsInstanceRegistry(... ...). ; . . }else {
    / / implementation class
    registry = new PeerAwareInstanceRegistryImpl(
        eurekaServerConfig,
        eurekaClient.getEurekaClientConfig(),
        serverCodecs,
        eurekaClient
    );
}
Copy the code
Step 4: ProcesspeerThe node
PeerEurekaNodes: Represents the Eureka Server cluster
Peers is a cluster of peers. Peers is an instance of a cluster of peersPeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(... ...). ;Copy the code

There is a line comment on the initialization method that shows what peer means:

/**
Initializes Eureka, including syncing up with other Eureka peers and publishing the registry.
*/
Copy the code
Step 5: Finisheureka-serverContext construction
1. Create a context
serverContext = newDefaultEurekaServerContext(.. ...). ;// 2. Put the context into the holder for other use
EurekaServerContextHolder.initialize(serverContext);
// 3. Initialize context
serverContext.initialize();
Copy the code

Servercontext.initialize (); :

public void initialize(a) throws Exception {
    logger.info("Initializing ...");
    // 1. Used to update the eureka-server cluster information
    // Make the current Eureka-server aware of other Eureka-servers
    // Principle: There is a scheduled scheduling task in the background to periodically update the eureka-server cluster information
    peerEurekaNodes.start();
    // 2. Initialize the registry
    // Fetch the registry information of all Eureka-servers in the eureka-server cluster and store it in the local registry
    registry.init(peerEurekaNodes);
    logger.info("Initialized");
}
Copy the code
Step 6: Deal with the aftermath from the adjacenteurekaThe node copies registration information
// Copy the registry information from a neighboring Eureka-server node
// If the copy fails, go to the next one
int registryCount = registry.syncUp();
Copy the code
Step 7: Register all monitors
EurekaMonitors.registerAllStats();
Copy the code





(2)eureka-serverHow do I complete service registration?

Starting from the unit test, step by step to see eurek-Server complete service registration.

Environment preparation, as shown in figure:

Copy eureka-client.properties of resources in Eureka-server to Resources in Eureka-core

Then you can DEBUG happily, locating the code as follows:

Eureka-core module, test package:
// com.netflix.eureka.resources.ApplicationResourceTest.java
@Test
public void testGoodRegistration(a) throws Exception {
    // 1. Generate a service instance
    InstanceInfo noIdInfo = InstanceInfoGenerator.takeOne();
    // 2. Send an HTTP request - add an instance
    Response response = applicationResource.addInstance(noIdInfo, false+"");
    assertThat(response.getStatus(), is(204));
}
Copy the code

InstanceInfo, the main data in the service instance:

  • hostname(hostname),ipAddress, port number,urladdress
  • lease(Lease) information: heartbeat interval, last heartbeat time, service registration time, service start time


Soul question: to put it bluntly, register for what?

  1. Tell the registry,clientThe information of
  2. Synchronize the registry information to the local cache


Main concern step 2 🙁ApplicationResource.addInstance())

/ / location:
public Response addInstance(InstanceInfo info,
              @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    // Validates a bunch of InstanceInfo parameters. .// Handle data center, region, etc. .// Actually register and check if it is cluster mode
    registry.register(info, "true".equals(isReplication));

    // On success, HTTP status code 204 is returned
    return Response.status(204).build();
}
Copy the code

Register with the following code:

/ / location: com.net flix. Eureka. Registry. PeerAwareInstanceRegistryImpl. Java
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if(info.getLeaseInfo() ! =null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    
    // 1. Invoke the parent method to register
    super.register(info, leaseDuration, isReplication);
    
    // 2. Register with peer
    replicateToPeers(Action.Register, info.getAppName(), 
                     info.getId(), info, null, isReplication);
}
Copy the code
  1. registeredAbstractInstanceRegistry.register()
/ / location:
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        // With read locks, multiple service instances can be registered simultaneously
        read.lock();
        
        // Get the Map by AppNameMap<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); . .// Get the lease for the service instance from gMap with instanceId. . }finally {
        // Release the read lockread.unlock(); }}Copy the code

Memory registry, that is:Map<String, Lease<InstanceInfo>>

Instance information for each service is savedCopy the code





(3)eureka-serverMulti-level caching of the registry

The code is as follows:

/ / location: com.net flix. Eureka. Resources. ApplicationResource. Java
public Response getApplication(...). {... .// Build the cache key
    Key cacheKey = new Key(
        Key.EntityType.Application,
        appName,
        keyType,
        CurrentRequestVersion.get(),
        EurekaAccept.fromString(eurekaAccept)
    );

    // Important: caching mechanism
    String payLoad = responseCache.get(cacheKey);
    
    / / return
}
Copy the code

Enter responseCache. Get (cacheKey) with the following code:

/ / location: com.net flix. Eureka. Registry. ResponseCacheImpl. Java
String get(final Key key, boolean useReadOnlyCache) {
    // Important, get the value
    Value payload = getValue(key, useReadOnlyCache);
    if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
        return null;
    } else {
        returnpayload.getPayload(); }}// Important: Multi-level cache
Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            // 1. Get it from the read-only cache
            final Value currentPayload = readOnlyCacheMap.get(key);
            if(currentPayload ! =null) {
                payload = currentPayload;
            } else {
                It is not in the read-only cache. It is retrieved from the read/write cache
                payload = readWriteCacheMap.get(key);
                // And put the data in the read-only cachereadOnlyCacheMap.put(key, payload); }}else {
            // 2. Fetch from the read/write cachepayload = readWriteCacheMap.get(key); }}catch (Throwable t) {
        logger.error("Cannot get value for key :" + key, t);
    }
    return payload;
}
Copy the code

To sum up, as shown in the figure:





(4)eureka-serverRegistry multi-level cache expiration mechanism

Expiration mechanisms fall into two categories:

  1. Active Expiration: The active notification expires when a service instance is registered, offline, or faulty
  2. Timed expiration:
    1. Timer: scheduled scheduling
    2. Expiration judgment


Read-only cache and read/write cache expire.

  1. Read-only cache (readOnlyCacheMap)

    1. Execute a timing scheduler (TimeTask), default 30 seconds.
    2. rightreadOnlyCacheMapreadWriteCacheMapThe data in aHashThe comparison:
      • Inconsistent, willreadWriteCacheMapData synchronization toreadOnlyCacheMapIn the.
/ / location: com.net flix. Eureka. Registry. ResponseCacheImpl. Java
private final java.util.Timer timer = new java.util.Timer("Eureka-CacheFillTimer".true);

if (shouldUseReadOnlyResponseCache) {
    timer.schedule(getCacheUpdateTask(),
   new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)  + responseCacheUpdateIntervalMs),
                   responseCacheUpdateIntervalMs);
}
Copy the code
  1. Read and write cache (readWriteCacheMap)

There are two kinds of expiration:

  1. Cache component expired, usingguavacache:
/ / location: com.net flix. Eureka. Registry. ResponseCacheImpl. Java
Read/write cache: use Guava's cache
// Expiration policy: Active expiration
private final LoadingCache<Key, Value> readWriteCacheMap = 
    CacheBuilder.newBuilder()
    .initialCapacity(1000)
    .expireAfterWrite(180, TimeUnit.SECONDS) // Expiration time: 180 seconds by default.Copy the code
  1. Active notification: when service instances change, register, go offline, or fail
// For example, when registering
/ / location: com.net flix. Eureka. Registry. AbstractInstanceRegistry. Java
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {... .// This cache is updated when the service instance registration information changesinvalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); . . }Copy the code

Summary, as shown in the figure:





(5) Automatic fault perception and automatic removal of service instances

eureka-serverBy the heartbeateureka-clientWhether or not they are alive.

If no heartbeat is received within a certain period of time, the Eureka-client is considered to be down. In this case, the corresponding client status is modified and removed.

If Eureka wanted to be able to detect the heartbeat on a regular basis, there would have to be a regular task to judge.

When eureka-server is started, automatic awareness is enabled. The code is as follows:

/ / location: com.net flix. Eureka. EurekaBootStrap. Java
// 1. Open the registry when initializing Eureka
protected void initEurekaServerContext(a) throws Exception {... .// Open the registry and receive the requestregistry.openForTraffic(applicationInfoManager, registryCount); . . }/ / location: com.net flix. Eureka. Registry. PeerAwareInstanceRegistryImpl. Java
/ / 2.
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {... ./ / key points:
    super.postInit();
}


/ / location: com.net flix. Eureka. Registry. AbstractInstanceRegistry. Java
/ / 3.
protected void postInit(a) {
    // 1. Start the scheduler: run every 60 seconds and count the heartbeat times within one minuterenewsLastMin.start(); . .// 2. By default, the service instance is scheduled every 60 seconds to determine whether the lease of the service instance has expired
    evictionTimer.schedule(new EvictionTask(), 60.60);
}
Copy the code

Next, analyze the new EvictionTask() scheduling task:

/ / location: com.net flix. Eureka. Registry. AbstractInstanceRegistry. Java
class EvictionTask extends TimerTask {
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
    @Override
    public void run(a) {
        try {
            // Get the compensation time
            long compensationTimeMs = getCompensationTimeMs();
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e); }}// 1. Obtain the compensation time
    // Why compensate for time: maybe there was a JVM GC problem or a system clock problem that didn't execute the task exactly 60 seconds
    long getCompensationTimeMs(a) {
        // Get the current time
        long currNanos = getCurrentTimeNano();
        // getAndSet: sets the new value and returns the old value
        / / such as:
        // The first time: 10:00:00, 10:00:00 will be set to lastExecutionNanosRef
        // Second time: 10:01:00 set to lastExecutionNanosRef
        long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
        if (lastNanos == 0l) {
            return 0l;
        }

        // Number of past seconds = 10:01:00-10:00:00 = 60s
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);

        // Use this 60s - configured 60s = 0s
        long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
        return compensationTime <= 0l ? 0l : compensationTime;
    }

    long getCurrentTimeNano(a) {
        returnSystem.nanoTime(); }}// 2
public void evict(long additionalLeaseMs) {
    // Whether to delete the faulty service instance
    if(! isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled.");
        return;
    }
    // 2.1. List of expired service instances to determine whether they are expired
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry 
         : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if(leaseMap ! =null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();

                // Whether the lease has expired
                // Determine the lease for each service instance if the last heartbeat time of a service instance is now
                // If the value exceeds 90 × 2 = 180s, the service instance is considered expired and faulty
                if(lease.isExpired(additionalLeaseMs) && lease.getHolder() ! =null) { expiredLeases.add(lease); }}}}// 2.2. Calculate the removal task: Do not remove too many service instances at once
    // Suppose there are 20 service instances and 6 service instances are unavailable.. .// Calculate: can remove 3

    if (toEvict > 0) {
        // There are 6 service instances to be removed, but a maximum of 3 service instances can be removed
        // Out of the 6 service instances, 3 service instances are randomly selected to remove
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Select at random
            int next = i + random.nextInt(expiredLeases.size() - i);
            / /... .

            // Remove: call the service offline method
            internalCancel(appName, id, false); }}}Copy the code

The key,Eureka BUG

Determine if lease has expired: isExpired()

/ / location: com.net flix. Eureka. Lease. Lease. Java
/** Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to + duration more than what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will not be fixed. */
public boolean isExpired(long additionalLeaseMs) {

    // Whether the current time is greater than the last heartbeat time + 90s(renewal duration) + 92s(compensation time)
    return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
/ / example:
// Current time: 20:02:32
// Judgment: 20:02:32 > 19:56:20 + 90 + 92s = 19:59:32

If the current time is more than 90s later than the last heartbeat, it means that the heartbeat has not been updated in the 90s
// The server instance has not updated its heartbeat in the 90s
// It is assumed that the service instance is down

// 19:55:00 -> If there is no heartbeat at 19:56:30, the service instance is considered faulty
// 19:56:30 -> If there is no heartbeat at 19:56:30, the service instance is not considered down
// 20:00:00 -> 19:56:30 + 90s = 19:58:00

// Therefore, we need to wait for 2 90s, when there is no heartbeat, the service instance will be considered dead, and the service instance will be offline, 90s, 180s
// If there is no heartbeat for 3 minutes, the service instance is considered down and offline

// There is a bug in the source code, 90 * 2 = 180s, then the service instance will go offline
// If the multilevel cache fails, synchronization will take 30 seconds, and the service will re-fetch the incremental registry for 30 seconds

// After a service instance has failed, it may take several minutes, 4-5 minutes, for other services to sense it
Copy the code





(6)eureka-serverSelf-protection mechanism for network faults

At some point in time, a large number of service instances expire (no heartbeat is received), theneuerka-serverDo I want to remove all of these instances?

The answer is: No Because there may be a network fault (network partition) on the machine where the Eureka-Server resides, which causes that the eureka-server cannot receive the heartbeat from the client.

The general plan is as follows:

When deleting the service, it will judge whether to enable the service self-protection, and the code is as follows:

/ / location: com.net flix. Eureka. Registry. AbstractInstanceRegistry. Java
public void evict(long additionalLeaseMs) {
    // Whether to delete the faulty service instance
    if(! isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled.");
        return; }}public boolean isLeaseExpirationEnabled(a) {
    // The default is true
    if(! isSelfPreservationModeEnabled()) {// The self preservation mode is disabled, hence allowing the instances to expire.
        return true;
    }
    // Dynamically determine whether to enable self-protection mechanism:
    / / numberOfRenewsPerMinThreshold expect heartbeat count: the expected service instance 1 minute How many heartbeat sending
    GetNumOfRenewsInLastMin () Actual heartbeats: The number of heartbeats sent by all service instances in the last minute
    // Expected heartbeats < actual heartbeats: return true to clean up the service instance
    // Expected heartbeats > Actual heartbeats: returns false and does not clean up the service instance
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
Copy the code

Why count the heartbeat of all service instances? If a service sends heartbeats frequently, isn’t this mechanism broken?


According to the code above, there are still two problems:

  1. If you calculate the expected number of heartbeats per minute (numberOfRenewsPerMinThreshold)?
  2. How are the actual heartbeats calculated in the last minute?

1) How to calculate the expected number of heartbeats per minute?

Calculate the expected number of heartbeats per minute when:

  1. When initializing, give a default value

  2. Schedule update

  3. Service instance status changed: registered, offline, faulty

  4. At initialization, the default value is given. EurekaBootStrap is the class to initiate initialization with a line of registry. OpenForTraffic (to enable fault checking) code.

/ / location: com.net flix. Eureka. EurekaBootStrap. Java
protected void initEurekaServerContext(a) throws Exception {... .// Start a background thread task that periodically checks if the service instance is downregistry.openForTraffic(applicationInfoManager, registryCount); . . }/ / location: com.net flix. Eureka. Registry. PeerAwareInstanceRegistryImpl. Java
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {

    // Heartbeat takes 30 seconds. Expected heartbeat per minute = Number of registrations x 2
    this.expectedNumberOfRenewsPerMin = count * 2;
    // Minimum expected heartbeat per minute = Expected heartbeat per minute x 2
    this.numberOfRenewsPerMinThreshold =
        (int) (this.expectedNumberOfRenewsPerMin * 0.85); . . }Copy the code

It’s not good to hardcode count * 2 here. If the heartbeat interval is changed, this is a BUG.

  1. Timing scheduler
/ / location: com.net flix. Eureka. Registry. PeerAwareInstanceRegistryImpl. Java
private Timer timer = new Timer(
            "ReplicaAwareInstanceRegistry - RenewalThresholdUpdater".true);
private void scheduleRenewalThresholdUpdateTask(a) {
    // Run every 15 minutes
    timer.schedule(new TimerTask() {
        @Override
        public void run(a) { updateRenewalThreshold(); }},15 * 60 * 1000.15 * 60 * 1000);
}

// Do the following:
private void updateRenewalThreshold(a) {
    try {
        // Synchronize, pull the registry from other Eureka-servers. .synchronized (lock) {
            // Calculate again according to the pulled registry
            if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold)
                || (!this.isSelfPreservationModeEnabled())) {
                this.expectedNumberOfRenewsPerMin = count * 2;
                this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold()); }}... . }catch(Throwable e) { ... . }}Copy the code
  1. Service instance status changed: registered, offline, faulty
// 1
/ / location: com.net flix. Eureka. Registry. AbstractInstanceRegistry. Java
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {... .synchronized (lock) {
        if (this.expectedNumberOfRenewsPerMin > 0) {
            // Once a service instance is registered, + 2
            // Indicates a heartbeat interval of 30 seconds
            this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
            // Expected heartbeat per minute × 0.85
            this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * 0.85); }}... . }// 2
/ / location: com.net flix. Eureka. Registry. PeerAwareInstanceRegistryImpl. Java
public boolean cancel(final String appName, final String id,
                      final boolean isReplication) {
    if (super.cancel(appName, id, isReplication)) {
        // Synchronize to other servers
        replicateToPeers(Action.Cancel, appName, id, null.null, isReplication);
        synchronized (lock) {
            if (this.expectedNumberOfRenewsPerMin > 0) {
                // If a service goes offline, then -2
                this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
                this.numberOfRenewsPerMinThreshold =
                    (int) (this.expectedNumberOfRenewsPerMin * 0.85); }}return true;
    }
    return false;
}


// 3
// It is a BUG
Copy the code


2) How is the actual heartbeat count in the last minute calculated?

Renew ()

/ / location: com.net flix. Eureka. Registry. AbstractInstanceRegistry. Java
private final MeasuredRate renewsLastMin;
public boolean renew(String appName, String id, boolean isReplication) {... .// Heartbeat count + 1
    renewsLastMin.increment();
}

/ / location: com.net flix. Eureka. Util. MeasuredRate. Java
private final AtomicLong lastBucket = new AtomicLong(0);  // Last minute statistics
private final AtomicLong currentBucket = new AtomicLong(0); // Current minute statistics
this.timer = new Timer("Eureka-MeasureRateTimer".true);
public synchronized void start(a) {
    if(! isActive) { timer.schedule(new TimerTask() {
            @Override
            public void run(a) {
                try {
                    // Once every minute (60s)
                    // Save the number of times in one minute, and update this time to 0
                    lastBucket.set(currentBucket.getAndSet(0));
                } catch (Throwable e) {
                    logger.error("Cannot reset the Measured Rate", e);
                }
            }
        }, sampleInterval, sampleInterval);

        isActive = true; }}public void increment(a) {
    currentBucket.incrementAndGet();
}
Copy the code





(7) Cluster mechanism – registry synchronization and high availability

Eureka-server enables high cluster availability: it is required to register with each other and then synchronize the list of service instances with each other.

The peer to peer mode is different from the master-slave mode.

The general plan is as follows:

Registry synchronization, timing is divided into:

  1. eureka-serverInitialization time
  2. Synchronize when there is service registration, offline, and heartbeat


  1. eureka-serverInitialization time

When eureka-server is started, the peer node will be processed with the following code:

/ / location: com.net flix. Eureka. EurekaBootStrap. Java
protected void initEurekaServerContext(a) throws Exception {... .// Step 4: Process the peer related nodesPeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes( registry, eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, applicationInfoManager ); . .// Step 6: Copy the registry information from a neighboring Eureka-server node
    int registryCount = registry.syncUp();
}

/ / location: com.net flix. Eureka. Cluster. PeerEurekaNodes. Java
// Periodically updates the eureka-server cluster information
public void start(a) {... .try {
        // The url in the configuration file to refresh the eureka-server list
        updatePeerEurekaNodes(resolvePeerUrls());
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run(a) {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e); }}};// Every 10 minutes
        private ScheduledExecutorService taskExecutor;
        taskExecutor.scheduleWithFixedDelay(
            peersUpdateTask,
            serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
            serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
            TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL: "+ node.getServiceUrl()); }}@Override
public int syncUp(a) {
    // Copy entire entry from neighboring DS node
    int count = 0;
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); 
         i++) {
        if (i > 0) {
            try {
                // If you do not get the registry from your local eureka-client for the first time
                The local Eureka-client has not fetched the registry from any other Eureka-server
                // So try again now and wait 30 seconds
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break; }}// Take all the registries and process (registration). . }return count;
}
Copy the code


  1. Synchronization in case of service registration, offline, heartbeat, failure
/ / 1. Registration
/ / location: com.net flix. Eureka. Registry. PeerAwareInstanceRegistryImpl. Java
public void register(final InstanceInfo info, final boolean isReplication) {... ./ / register
    super.register(info, leaseDuration, isReplication);
    // Synchronize registration information to peer
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), 
                     info, null, isReplication);
}


/ / 2. Offline
@Override
public boolean cancel(final String appName, final String id,
                      final boolean isReplication) {
    if (super.cancel(appName, id, isReplication)) {
        // Synchronize registration information to peer
        replicateToPeers(Action.Cancel, appName, id, null.null, isReplication); . .return true;
    }
    return false;
}


/ / 3. The heartbeat
public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) {
        // Synchronize registration information to peer
        replicateToPeers(Action.Heartbeat, appName, id, null.null, isReplication);
        return true;
    }
    return false;
}
Copy the code

The replicateToPeers() method performs the following action:

/ / location: com.net flix. Eureka. Registry. PeerAwareInstanceRegistryImpl. Java
private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            caseCancel: ... .caseHeartbeat: ... .caseRegister: ... .caseStatusUpdate: ... .caseDeleteStatusOverride: ... . }}catch(Throwable t) { ... . }}Copy the code





(8) Registry synchronization between clusters – Layer 3 queue task processing mechanism

Eureka-server batch processing mechanism for synchronization tasks.

Key points:

  1. Mechanism for cluster synchronization:eureka-clienteureka-serverSend the request, thiseureka-serverThe request is synchronized to all the otherseureka-serverGet up there. The resteureka-serverWill only execute locally (isReplicationCheck), will not be synchronized again.
  2. An asynchronous batch mechanism for data synchronization: three queues, the first queue (acceptorQueue) : is pure write; Second queue (processingOrder) : is used to split queues according to time and size; The third queue (batchWorkQueue) : Used to place batch tasks (asynchronous batch mechanism)


For example, peerEureanode.register (InstanceInfo info) is called after registration.

The code is as follows:

/ / location: com.net flix. Eureka. Cluster. PeerEurekaNode. Java
private final TaskDispatcher<String, ReplicationTask> batchingDispatcher;
// 1. Receive the request and encapsulate it into a task
public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(
        // Encapsulate the task. ...). ; }/ / enter batchingDispatcher. The process ()
/ / location: com.net flix. Eureka. Util. Batcher. TaskDispatchers. Java
// 2. Process tasks
public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(...). {... .return new TaskDispatcher<ID, T>() {
        @Override
        public void process(ID id, T task, long expiryTime) {
            // Receive queue, pure write
            acceptorExecutor.process(id, task, expiryTime);
        }

        @Override
        public void shutdown(a) { acceptorExecutor.shutdown(); taskExecutor.shutdown(); }}; }/ / location: com.net flix. Eureka. Util. Batcher. AcceptorExecutor. Java
// 3. Receive the task and process it
class AcceptorExecutor<ID.T> {... ./ / processing
    void process(ID id, T task, long expiryTime) {
        // Put the task into the queue
        acceptorQueue.add(newTaskHolder<ID, T>(id, task, expiryTime)); acceptedTasks++; }... . }// create a background thread
class AcceptorExecutor<ID.T> {
    private final Thread acceptorThread;
    AcceptorExecutor(...) {
        // Create a thread
        ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
        this.acceptorThread = new Thread(threadGroup, 
                                         new AcceptorRunner(), "TaskAcceptor-" + id);
        this.acceptorThread.setDaemon(true);
        this.acceptorThread.start(); }}// 5. Make a batch. By default, 3 batches are processed
class AcceptorExecutor<ID.T> {
    void assignBatchWork(a) {... . batchWorkQueue.add(holders); . . }}/ / location: com.net flix. Eureka. Cluster. PeerEurekaNode. Java
// 6. Process batch requests
ReplicationTaskProcessor taskProcessor = 
    new ReplicationTaskProcessor(targetHost, replicationClient);
Copy the code

Summary: