
This article takes a look at Eureka’s TaskDispatcher, which PeerEurekaNode uses to replicate registry operations to its peer Eureka servers.


```java
public class PeerEurekaNode {

    public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                          HttpReplicationClient replicationClient, EurekaServerConfig config) {
        this(registry, targetHost, serviceUrl, replicationClient, config,
                BATCH_SIZE, MAX_BATCHING_DELAY_MS, RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS);
    }

    /* For testing */ PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                     HttpReplicationClient replicationClient, EurekaServerConfig config,
                                     int batchSize, long maxBatchingDelayMs,
                                     long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
        this.registry = registry;
        this.targetHost = targetHost;
        this.replicationClient = replicationClient;

        this.serviceUrl = serviceUrl;
        this.config = config;
        this.maxProcessingDelayMs = config.getMaxTimeForReplication();

        String batcherName = getBatcherName();
        ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
        this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
                batcherName,
                config.getMaxElementsInPeerReplicationPool(),
                batchSize,
                config.getMaxThreadsForPeerReplication(),
                maxBatchingDelayMs,
                serverUnavailableSleepTimeMs,
                retrySleepTimeMs,
                taskProcessor
        );
        this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
                targetHost,
                config.getMaxElementsInStatusReplicationPool(),
                config.getMaxThreadsForStatusReplication(),
                maxBatchingDelayMs,
                serverUnavailableSleepTimeMs,
                retrySleepTimeMs,
                taskProcessor
        );
    }

    // ...
}
```
  • statusUpdate
```java
/**
 * Send the status information of the ASG represented by the instance.
 *
 * <p>
 * ASG (Autoscaling group) names are available for instances in AWS and the
 * ASG information is used for determining if the instance should be
 * registered as {@link InstanceStatus#DOWN} or {@link InstanceStatus#UP}.
 *
 * @param asgName
 *            the asg name if any of this instance.
 * @param newStatus
 *            the new status of the ASG.
 */
public void statusUpdate(final String asgName, final ASGStatus newStatus) {
    long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
    nonBatchingDispatcher.process(
            asgName,
            new AsgReplicationTask(targetHost, Action.StatusUpdate, asgName, newStatus) {
                public EurekaHttpResponse<?> execute() {
                    return replicationClient.statusUpdate(asgName, newStatus);
                }
            },
            expiryTime
    );
}
```
The ASG-level statusUpdate submits its task to nonBatchingDispatcher, keyed by the ASG name.

  • cancel
```java
public void cancel(final String appName, final String id) throws Exception {
    long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
    batchingDispatcher.process(
            taskId("cancel", appName, id),
            new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                @Override
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.cancel(appName, id);
                }

                @Override
                public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                    super.handleFailure(statusCode, responseEntity);
                    if (statusCode == 404) {
                        logger.warn("{}: missing entry.", getTaskName());
                    }
                }
            },
            expiryTime
    );
}
```
cancel, like the other instance-level operations, is submitted to batchingDispatcher, here under the task ID taskId("cancel", appName, id).


eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/

```java
/**
 * Base class for all replication tasks.
 */
abstract class ReplicationTask {

    private static final Logger logger = LoggerFactory.getLogger(ReplicationTask.class);

    protected final String peerNodeName;
    protected final Action action;

    ReplicationTask(String peerNodeName, Action action) {
        this.peerNodeName = peerNodeName;
        this.action = action;
    }

    public abstract String getTaskName();

    public Action getAction() {
        return action;
    }

    public abstract EurekaHttpResponse<?> execute() throws Throwable;

    public void handleSuccess() {
    }

    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
        // ...
    }
}
```
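ReplicationTask is the base class for all replication tasks. Subclasses provide getTaskName() and execute(); handleSuccess() and handleFailure() are hooks they may override.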
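To see how such a base class is meant to be used, here is a minimal, self-contained sketch of the same template-method pattern: an abstract task with default success and failure hooks, and an anonymous subclass that supplies execute() and refines handleFailure(), much as PeerEurekaNode does for cancel. SimpleTask, Response, the task-name format and the in-memory LOG list are hypothetical stand-ins, not Eureka types.

```java
import java.util.ArrayList;
import java.util.List;

public class ReplicationTaskSketch {

    // Hypothetical stand-in for EurekaHttpResponse: only the status code matters here.
    static final class Response {
        final int statusCode;
        Response(int statusCode) { this.statusCode = statusCode; }
    }

    // Hypothetical, simplified analogue of ReplicationTask (not the Eureka class).
    abstract static class SimpleTask {
        // Messages are collected in a list instead of a logger so the behavior is observable.
        static final List<String> LOG = new ArrayList<>();

        protected final String peerNodeName;
        protected final String action;

        SimpleTask(String peerNodeName, String action) {
            this.peerNodeName = peerNodeName;
            this.action = action;
        }

        abstract String getTaskName();

        abstract Response execute();

        void handleSuccess() {
        }

        // Default failure hook shared by every task.
        void handleFailure(int statusCode) {
            LOG.add(getTaskName() + " failed with " + statusCode);
        }
    }

    // Builds a cancel-like task: the anonymous subclass supplies execute() and refines handleFailure().
    static SimpleTask cancelTask(String peer, String appName, String id, int simulatedStatus) {
        return new SimpleTask(peer, "Cancel") {
            @Override
            String getTaskName() {
                return appName + "/" + id + ":" + action + "@" + peerNodeName;
            }

            @Override
            Response execute() {
                // A real task would send the request to the peer; here the status is simulated.
                return new Response(simulatedStatus);
            }

            @Override
            void handleFailure(int statusCode) {
                super.handleFailure(statusCode);
                if (statusCode == 404) {
                    LOG.add(getTaskName() + ": missing entry.");
                }
            }
        };
    }

    // Runs a task and calls the success or failure hook depending on the status code.
    static void run(SimpleTask task) {
        Response response = task.execute();
        if (response.statusCode >= 200 && response.statusCode < 300) {
            task.handleSuccess();
        } else {
            task.handleFailure(response.statusCode);
        }
    }

    public static void main(String[] args) {
        run(cancelTask("peer1", "APP", "i-1", 404));
        SimpleTask.LOG.forEach(System.out::println);
    }
}
```

The base class owns the common fields and default behavior; each concrete operation only has to say how to send its request and, optionally, how to react to specific failures.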


eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/

```java
/**
 * Base {@link ReplicationTask} class for instance related replication requests.
 *
 * @author Tomasz Bak
 */
public abstract class InstanceReplicationTask extends ReplicationTask {

    /**
     * For cancel request there may be no InstanceInfo object available so we need to store app/id pair
     * explicitly.
     */
    private final String appName;
    private final String id;
    private final InstanceInfo instanceInfo;
    private final InstanceStatus overriddenStatus;

    private final boolean replicateInstanceInfo;

    // ...
}
```

InstanceReplicationTask is the base for instance-related replication tasks. PeerEurekaNode uses it for register, heartbeat, statusUpdate, deleteStatusOverride and cancel, and submits all of these to batchingDispatcher. The exception is the ASG-level statusUpdate(asgName, newStatus) shown earlier, which uses AsgReplicationTask and is submitted to nonBatchingDispatcher.


eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/

```java
/**
 * Task dispatcher takes task from clients, and delegates their execution to a configurable number of workers.
 * The task can be processed one at a time or in batches. Only non-expired tasks are executed, and if a newer
 * task with the same id is scheduled for execution, the old one is deleted. Lazy dispatch of work (only on demand)
 * to workers, guarantees that data are always up to date, and no stale task processing takes place.
 * <h3>Task processor</h3>
 * A client of this component must provide an implementation of {@link TaskProcessor} interface, which will do
 * the actual work of task processing. This implementation must be thread safe, as it is called concurrently by
 * multiple threads.
 * <h3>Execution modes</h3>
 * To create non batched executor call {@link TaskDispatchers#createNonBatchingTaskDispatcher(String, int, int, long, long, TaskProcessor)}
 * method. Batched executor is created by {@link TaskDispatchers#createBatchingTaskDispatcher(String, int, int, int, long, long, TaskProcessor)}.
 *
 * @author Tomasz Bak
 */
public interface TaskDispatcher<ID, T> {

    void process(ID id, T task, long expiryTime);

    void shutdown();
}
```
TaskDispatcher accepts tasks and dispatches them to workers. Its key guarantees are that only tasks that have not expired are executed, and that when a newer task with the same ID arrives before the old one has run, the old one is dropped. There are two kinds of dispatcher: nonBatchingDispatcher and batchingDispatcher.
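These two guarantees (expired tasks are skipped, and a newer task replaces an older one with the same ID) can be sketched single-threaded with an insertion-ordered map. This is a simplified illustration, not Eureka's implementation: LatestWinsBuffer, Holder and the example IDs are hypothetical, and there are no worker threads.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestWinsDispatcherSketch {

    // A task plus the wall-clock time after which it must not run.
    static final class Holder<T> {
        final T task;
        final long expiryTime;
        Holder(T task, long expiryTime) { this.task = task; this.expiryTime = expiryTime; }
    }

    // Hypothetical single-threaded analogue of the TaskDispatcher contract.
    static final class LatestWinsBuffer<ID, T> {
        // Insertion-ordered: re-putting an existing key keeps its original position.
        private final Map<ID, Holder<T>> pending = new LinkedHashMap<>();
        int overridden = 0;
        int expired = 0;

        void process(ID id, T task, long expiryTime) {
            if (pending.put(id, new Holder<>(task, expiryTime)) != null) {
                overridden++; // an older task with the same ID is replaced
            }
        }

        // Returns the tasks still valid at 'now', in submission order, and empties the buffer.
        List<T> drain(long now) {
            List<T> ready = new ArrayList<>();
            for (Holder<T> h : pending.values()) {
                if (h.expiryTime > now) {
                    ready.add(h.task);
                } else {
                    expired++;
                }
            }
            pending.clear();
            return ready;
        }
    }

    public static void main(String[] args) {
        LatestWinsBuffer<String, String> buf = new LatestWinsBuffer<>();
        buf.process("heartbeat#APP/i-1", "hb v1", 1_000);
        buf.process("cancel#APP/i-2", "cancel", 500);
        buf.process("heartbeat#APP/i-1", "hb v2", 1_000); // supersedes "hb v1"
        System.out.println(buf.drain(600));               // prints [hb v2]; cancel expired at 500
    }
}
```

Because LinkedHashMap keeps the original insertion position on re-put, a superseding task takes over the old task's place in line instead of going to the back.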


eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/

```java
public class TaskDispatchers {

    public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
                                                                                int maxBufferSize,
                                                                                int workerCount,
                                                                                long maxBatchingDelay,
                                                                                long congestionRetryDelayMs,
                                                                                long networkFailureRetryMs,
                                                                                TaskProcessor<T> taskProcessor) {
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, 1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }

            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }

    public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                             int maxBufferSize,
                                                                             int workloadSize,
                                                                             int workerCount,
                                                                             long maxBatchingDelay,
                                                                             long congestionRetryDelayMs,
                                                                             long networkFailureRetryMs,
                                                                             TaskProcessor<T> taskProcessor) {
        final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
        );
        final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
        return new TaskDispatcher<ID, T>() {
            @Override
            public void process(ID id, T task, long expiryTime) {
                acceptorExecutor.process(id, task, expiryTime);
            }

            @Override
            public void shutdown() {
                acceptorExecutor.shutdown();
                taskExecutor.shutdown();
            }
        };
    }
}
```

TaskDispatchers provides one factory method per dispatcher type, and both build an AcceptorExecutor. createNonBatchingTaskDispatcher passes a maxBatchingSize of 1 and uses TaskExecutors.singleItemExecutors; createBatchingTaskDispatcher passes the caller's workloadSize (PeerEurekaNode's BATCH_SIZE, 250 by default) and uses TaskExecutors.batchExecutors.
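Since both dispatchers share AcceptorExecutor and differ mainly in maxBatchingSize, the effect of that one parameter can be sketched with the slicing rule assignBatchWork uses, Math.min(maxBatchingSize, processingOrder.size()). This hypothetical helper ignores expiry, the maxBatchingDelay wait and worker demand; it only shows how many tasks each unit of work would carry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BatchSizingSketch {

    // Each unit of work takes Math.min(maxBatchingSize, remaining) IDs from the head of the queue.
    static List<Integer> batchSizes(int pending, int maxBatchingSize) {
        Deque<Integer> processingOrder = new ArrayDeque<>();
        for (int i = 0; i < pending; i++) {
            processingOrder.add(i);
        }
        List<Integer> sizes = new ArrayList<>();
        while (!processingOrder.isEmpty()) {
            int len = Math.min(maxBatchingSize, processingOrder.size());
            for (int i = 0; i < len; i++) {
                processingOrder.poll();
            }
            sizes.add(len);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(600, 250)); // batching: prints [250, 250, 100]
        System.out.println(batchSizes(3, 1));     // non-batching: prints [1, 1, 1]
    }
}
```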


eureka-core-1.8.8-sources.jar!/com/netflix/eureka/util/batcher/

```java
class AcceptorExecutor<ID, T> {
    // ...
    private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
    private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();
    // ...

    void process(ID id, T task, long expiryTime) {
        acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
        // ...
    }

    void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
        reprocessQueue.addAll(holders);
        replayedTasks += holders.size();
        trafficShaper.registerFailure(processingResult);
    }

    void reprocess(TaskHolder<ID, T> taskHolder, ProcessingResult processingResult) {
        reprocessQueue.add(taskHolder);
        // ...
    }
    // ...
}
```
process() puts new tasks on acceptorQueue; reprocess() puts tasks that failed with a retryable error back on reprocessQueue.
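Both entry points are called from other threads (callers submit through process(), worker threads call reprocess()), while a single acceptor thread consumes the queues. The sketch below, with a hypothetical Acceptor and Holder, shows that the java.util.concurrent queues make this hand-off safe without extra locking: four producer threads submit concurrently and no task is lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;

public class InputQueuesSketch {

    // Hypothetical, simplified task holder.
    static final class Holder {
        final String id;
        final long expiryTime;
        Holder(String id, long expiryTime) { this.id = id; this.expiryTime = expiryTime; }
    }

    // Hypothetical acceptor with the same two input queues as AcceptorExecutor.
    static final class Acceptor {
        final BlockingQueue<Holder> acceptorQueue = new LinkedBlockingQueue<>();  // new work
        final BlockingDeque<Holder> reprocessQueue = new LinkedBlockingDeque<>(); // retries

        void process(String id, long expiryTime) {
            acceptorQueue.add(new Holder(id, expiryTime));
        }

        void reprocess(List<Holder> holders) {
            reprocessQueue.addAll(holders);
        }
    }

    // Starts 'threads' producers that each call process() 'perThread' times,
    // waits for them, then drains the queue the way a single consumer would.
    static int submitConcurrently(int threads, int perThread) {
        Acceptor acceptor = new Acceptor();
        List<Thread> producers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int tid = t;
            Thread producer = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    acceptor.process("t" + tid + "-" + i, Long.MAX_VALUE);
                }
            });
            producers.add(producer);
            producer.start();
        }
        try {
            for (Thread producer : producers) {
                producer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        List<Holder> drained = new ArrayList<>();
        acceptor.acceptorQueue.drainTo(drained);
        return drained.size();
    }

    public static void main(String[] args) {
        System.out.println(submitConcurrently(4, 1000)); // prints 4000
    }
}
```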


```java
    // AcceptorExecutor constructor (excerpt):
    ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
    this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
    // ...

    class AcceptorRunner implements Runnable {
        @Override
        public void run() {
            long scheduleTime = 0;
            while (!isShutdown.get()) {
                try {
                    drainInputQueues();

                    int totalItems = processingOrder.size();

                    long now = System.currentTimeMillis();
                    if (scheduleTime < now) {
                        scheduleTime = now + trafficShaper.transmissionDelay();
                    }
                    if (scheduleTime <= now) {
                        assignBatchWork();
                        assignSingleItemWork();
                    }

                    // If no worker is requesting data or there is a delay injected by the traffic shaper,
                    // sleep for some time to avoid tight loop.
                    if (totalItems == processingOrder.size()) {
                        // ...
                    }
                } catch (InterruptedException ex) {
                    // Ignore
                } catch (Throwable e) {
                    // Safe-guard, so we never exit this loop in an uncontrolled way.
                    // ...
                }
            }
        }
        // ...
    }
```
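The acceptor thread runs AcceptorRunner in a loop: it calls drainInputQueues(), asks the traffic shaper how long to back off after recent failures, and once that delay has passed calls assignBatchWork() and assignSingleItemWork().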
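The back-off gate in AcceptorRunner can be sketched as follows. The TrafficShaper here is a hypothetical simplification, not Eureka's class: it remembers the time of the last Congestion or TransientError and asks the acceptor to wait congestionRetryDelayMs or networkFailureRetryMs from then (taking the longer of the two if both apply). Time is passed in explicitly instead of read from the clock so the behavior is reproducible; AcceptorGate reproduces the two scheduleTime checks from run().

```java
public class TrafficShaperSketch {

    enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

    // Hypothetical, simplified traffic shaper: back off for a fixed time after a failure.
    static final class TrafficShaper {
        private final long congestionRetryDelayMs;
        private final long networkFailureRetryMs;
        private long lastCongestionError = -1;
        private long lastNetworkFailure = -1;

        TrafficShaper(long congestionRetryDelayMs, long networkFailureRetryMs) {
            this.congestionRetryDelayMs = congestionRetryDelayMs;
            this.networkFailureRetryMs = networkFailureRetryMs;
        }

        void registerFailure(ProcessingResult result, long now) {
            if (result == ProcessingResult.Congestion) {
                lastCongestionError = now;
            } else if (result == ProcessingResult.TransientError) {
                lastNetworkFailure = now;
            }
        }

        // Remaining back-off at 'now'; 0 means work may be assigned right away.
        long transmissionDelay(long now) {
            long delay = 0;
            if (lastCongestionError != -1) {
                delay = Math.max(delay, congestionRetryDelayMs - (now - lastCongestionError));
            }
            if (lastNetworkFailure != -1) {
                delay = Math.max(delay, networkFailureRetryMs - (now - lastNetworkFailure));
            }
            return delay;
        }
    }

    // The two scheduleTime checks from AcceptorRunner.run(), with time passed in.
    static final class AcceptorGate {
        private final TrafficShaper shaper;
        private long scheduleTime = 0;

        AcceptorGate(TrafficShaper shaper) { this.shaper = shaper; }

        boolean mayAssign(long now) {
            if (scheduleTime < now) {
                scheduleTime = now + shaper.transmissionDelay(now);
            }
            return scheduleTime <= now;
        }
    }

    public static void main(String[] args) {
        TrafficShaper shaper = new TrafficShaper(1000, 100);
        AcceptorGate gate = new AcceptorGate(shaper);
        System.out.println(gate.mayAssign(1));    // true: no failures yet
        shaper.registerFailure(ProcessingResult.Congestion, 10);
        System.out.println(gate.mayAssign(20));   // false: backing off until t = 1010
        System.out.println(gate.mayAssign(1010)); // true
    }
}
```

Once scheduleTime is pushed into the future, the gate does not recompute it until it has passed, so one failure produces one uninterrupted back-off window.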


```java
        private void drainInputQueues() throws InterruptedException {
            do {
                drainReprocessQueue();
                drainAcceptorQueue();

                if (!isShutdown.get()) {
                    // If all queues are empty, block for a while on the acceptor queue
                    if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                        TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                        if (taskHolder != null) {
                            appendTaskHolder(taskHolder);
                        }
                    }
                }
            } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
        }
```

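drainInputQueues() calls drainReprocessQueue() and then drainAcceptorQueue(). If both input queues and pendingTasks are empty, it blocks on acceptorQueue for up to 10 ms, and it keeps looping until both input queues are empty and at least one task is pending.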

  • drainAcceptorQueue
```java
        private void drainAcceptorQueue() {
            while (!acceptorQueue.isEmpty()) {
                appendTaskHolder(acceptorQueue.poll());
            }
        }

        private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
            if (isFull()) {
                // ...
            }
            TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
            if (previousTask == null) {
                processingOrder.add(taskHolder.getId());
            } else {
                overriddenTasks++;
            }
        }
```

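drainAcceptorQueue() moves tasks from acceptorQueue into pendingTasks, a map keyed by task ID. If a task with the same ID is already pending, the new one replaces it in place and overriddenTasks is incremented; otherwise the ID is appended to processingOrder.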
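A minimal single-threaded sketch of this bookkeeping, assuming a plain HashMap and ArrayDeque in place of Eureka's fields: pendingTasks holds the latest task per ID and processingOrder holds each pending ID once, in arrival order. What happens when the buffer is full is elided in the excerpt above; this sketch assumes the oldest pending task is dropped and counted, and that policy is an assumption rather than something the excerpt shows.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class PendingTasksSketch {

    // Hypothetical single-threaded analogue of the pendingTasks / processingOrder pair.
    static final class Pending<ID, T> {
        final Map<ID, T> pendingTasks = new HashMap<>();      // latest task per ID
        final Deque<ID> processingOrder = new ArrayDeque<>(); // each pending ID once, oldest first
        private final int maxBufferSize;
        int overriddenTasks = 0;
        int queueOverflows = 0;

        Pending(int maxBufferSize) {
            this.maxBufferSize = maxBufferSize;
        }

        boolean isFull() {
            return pendingTasks.size() >= maxBufferSize;
        }

        void appendTaskHolder(ID id, T task) {
            if (isFull()) {
                // Assumed overflow policy for this sketch: drop the oldest pending task.
                pendingTasks.remove(processingOrder.poll());
                queueOverflows++;
            }
            T previous = pendingTasks.put(id, task);
            if (previous == null) {
                processingOrder.add(id); // new ID joins the back of the line
            } else {
                overriddenTasks++;       // same ID: newer payload, same position
            }
        }
    }

    public static void main(String[] args) {
        Pending<String, String> p = new Pending<>(3);
        p.appendTaskHolder("a", "a1");
        p.appendTaskHolder("b", "b1");
        p.appendTaskHolder("a", "a2"); // replaces a1; "a" keeps its place
        p.appendTaskHolder("c", "c1");
        p.appendTaskHolder("d", "d1"); // buffer full: oldest ("a") is dropped
        System.out.println(p.processingOrder + " overridden=" + p.overriddenTasks
                + " overflows=" + p.queueOverflows);
        // prints [b, c, d] overridden=1 overflows=1
    }
}
```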

  • drainReprocessQueue
```java
        private void drainReprocessQueue() {
            long now = System.currentTimeMillis();
            while (!reprocessQueue.isEmpty() && !isFull()) {
                TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
                ID id = taskHolder.getId();
                if (taskHolder.getExpiryTime() <= now) {
                    expiredTasks++;
                } else if (pendingTasks.containsKey(id)) {
                    overriddenTasks++;
                } else {
                    pendingTasks.put(id, taskHolder);
                    processingOrder.addFirst(id);
                }
            }
            if (isFull()) {
                queueOverflows += reprocessQueue.size();
                reprocessQueue.clear();
            }
        }
```

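drainReprocessQueue() takes tasks from the tail of reprocessQueue (pollLast). A task is dropped if it has expired or if a newer task with the same ID is already pending; otherwise it goes back into pendingTasks and its ID is pushed to the front of processingOrder (addFirst), so retries run before new work. Taking from the tail and pushing to the front keeps the retried tasks in their original relative order. If the buffer fills up, whatever remains in reprocessQueue is discarded and counted in queueOverflows.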
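The effect of pollLast() plus addFirst() is easiest to see on a small example. In this hypothetical sketch (buffer limits omitted), a failed batch [x, y, old, z] is handed back while a fresh task and a newer submission for y are already pending: old has expired, y has been superseded, and x and z return to the front in their original order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;

public class ReprocessOrderSketch {

    // Hypothetical, simplified task holder.
    static final class Holder {
        final String id;
        final long expiryTime;
        Holder(String id, long expiryTime) { this.id = id; this.expiryTime = expiryTime; }
    }

    final LinkedBlockingDeque<Holder> reprocessQueue = new LinkedBlockingDeque<>();
    final Map<String, Holder> pendingTasks = new HashMap<>();
    final Deque<String> processingOrder = new ArrayDeque<>();
    int expiredTasks = 0;
    int overriddenTasks = 0;

    // A freshly submitted task joins the back of the line.
    void pend(Holder h) {
        pendingTasks.put(h.id, h);
        processingOrder.add(h.id);
    }

    void reprocess(List<Holder> holders) {
        reprocessQueue.addAll(holders);
    }

    // Same decision order as drainReprocessQueue (buffer-size checks omitted).
    void drainReprocessQueue(long now) {
        while (!reprocessQueue.isEmpty()) {
            Holder h = reprocessQueue.pollLast();
            if (h.expiryTime <= now) {
                expiredTasks++;                 // too old to retry
            } else if (pendingTasks.containsKey(h.id)) {
                overriddenTasks++;              // a newer task with this ID is already waiting
            } else {
                pendingTasks.put(h.id, h);
                processingOrder.addFirst(h.id); // retries jump to the front
            }
        }
    }

    public static void main(String[] args) {
        ReprocessOrderSketch s = new ReprocessOrderSketch();
        s.pend(new Holder("fresh", 1_000));
        s.pend(new Holder("y", 1_000)); // newer submission for "y"
        s.reprocess(List.of(new Holder("x", 1_000), new Holder("y", 1_000),
                new Holder("old", 50), new Holder("z", 1_000)));
        s.drainReprocessQueue(100);
        System.out.println(s.processingOrder); // prints [x, z, fresh, y]
    }
}
```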

assign work

```java
        void assignSingleItemWork() {
            if (!processingOrder.isEmpty()) {
                if (singleItemWorkRequests.tryAcquire(1)) {
                    long now = System.currentTimeMillis();
                    while (!processingOrder.isEmpty()) {
                        ID id = processingOrder.poll();
                        TaskHolder<ID, T> holder = pendingTasks.remove(id);
                        if (holder.getExpiryTime() > now) {
                            singleItemWorkQueue.add(holder);
                            return;
                        }
                        expiredTasks++;
                    }
                    singleItemWorkRequests.release();
                }
            }
        }

        void assignBatchWork() {
            if (hasEnoughTasksForNextBatch()) {
                if (batchWorkRequests.tryAcquire(1)) {
                    long now = System.currentTimeMillis();
                    int len = Math.min(maxBatchingSize, processingOrder.size());
                    List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
                    while (holders.size() < len && !processingOrder.isEmpty()) {
                        ID id = processingOrder.poll();
                        TaskHolder<ID, T> holder = pendingTasks.remove(id);
                        if (holder.getExpiryTime() > now) {
                            holders.add(holder);
                        } else {
                            expiredTasks++;
                        }
                    }
                    if (holders.isEmpty()) {
                        batchWorkRequests.release();
                    } else {
                        // ...
                        batchWorkQueue.add(holders);
                    }
                }
            }
        }
```
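Both methods assign work only when a worker has asked for it, that is, when tryAcquire on singleItemWorkRequests or batchWorkRequests succeeds. They take IDs from the head of processingOrder (so retried tasks go first), skip expired tasks, and hand the rest to singleItemWorkQueue (one task) or batchWorkQueue (up to maxBatchingSize tasks).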
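The request/assign handshake for batches can be sketched with java.util.concurrent.Semaphore. This is a simplified, single-threaded illustration with hypothetical names: pendingTasks maps IDs to expiry times, and the sketch assumes (this is not shown in the excerpts above) that requestWorkItems() releases one permit before returning the queue, so a batch is only built once a worker has asked for one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

public class AssignBatchWorkSketch {

    final Map<String, Long> pendingTasks = new HashMap<>(); // id -> expiryTime
    final Deque<String> processingOrder = new ArrayDeque<>();
    final Semaphore batchWorkRequests = new Semaphore(0);
    final BlockingQueue<List<String>> batchWorkQueue = new LinkedBlockingQueue<>();
    final int maxBatchingSize;
    int expiredTasks = 0;

    AssignBatchWorkSketch(int maxBatchingSize) { this.maxBatchingSize = maxBatchingSize; }

    void add(String id, long expiryTime) {
        pendingTasks.put(id, expiryTime);
        processingOrder.add(id);
    }

    // Worker side (assumed handshake): announce demand, then poll the returned queue.
    BlockingQueue<List<String>> requestWorkItems() {
        batchWorkRequests.release();
        return batchWorkQueue;
    }

    // Acceptor side: build a batch only if some worker asked for one.
    void assignBatchWork(long now) {
        if (processingOrder.isEmpty() || !batchWorkRequests.tryAcquire(1)) {
            return;
        }
        int len = Math.min(maxBatchingSize, processingOrder.size());
        List<String> batch = new ArrayList<>(len);
        while (batch.size() < len && !processingOrder.isEmpty()) {
            String id = processingOrder.poll();
            long expiry = pendingTasks.remove(id);
            if (expiry > now) {
                batch.add(id);
            } else {
                expiredTasks++;
            }
        }
        if (batch.isEmpty()) {
            batchWorkRequests.release(); // nothing to hand over: keep the request open
        } else {
            batchWorkQueue.add(batch);
        }
    }

    public static void main(String[] args) {
        AssignBatchWorkSketch a = new AssignBatchWorkSketch(2);
        a.add("t1", 100);
        a.add("t2", 5);
        a.add("t3", 100);
        a.add("t4", 100);
        a.assignBatchWork(10); // no worker has asked yet: nothing happens
        BlockingQueue<List<String>> q = a.requestWorkItems();
        a.assignBatchWork(10);
        System.out.println(q.poll()); // prints [t1, t3] (t2 had expired)
    }
}
```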


```java
    abstract static class WorkerRunnable<ID, T> implements Runnable {
        final String workerName;
        final AtomicBoolean isShutdown;
        final TaskExecutorMetrics metrics;
        final TaskProcessor<T> processor;
        final AcceptorExecutor<ID, T> taskDispatcher;

        WorkerRunnable(String workerName,
                       AtomicBoolean isShutdown,
                       TaskExecutorMetrics metrics,
                       TaskProcessor<T> processor,
                       AcceptorExecutor<ID, T> taskDispatcher) {
            this.workerName = workerName;
            this.isShutdown = isShutdown;
            this.metrics = metrics;
            this.processor = processor;
            this.taskDispatcher = taskDispatcher;
        }

        String getWorkerName() {
            return workerName;
        }
    }
```
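WorkerRunnable is the common base of the worker threads. It holds the worker name, the shared shutdown flag, the metrics, the TaskProcessor, and a reference back to the AcceptorExecutor (taskDispatcher) from which it pulls work.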


```java
    // In AcceptorExecutor:
    private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();

    static class SingleTaskWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

        SingleTaskWorkerRunnable(String workerName,
                                 AtomicBoolean isShutdown,
                                 TaskExecutorMetrics metrics,
                                 TaskProcessor<T> processor,
                                 AcceptorExecutor<ID, T> acceptorExecutor) {
            super(workerName, isShutdown, metrics, processor, acceptorExecutor);
        }

        @Override
        public void run() {
            try {
                while (!isShutdown.get()) {
                    BlockingQueue<TaskHolder<ID, T>> workQueue = taskDispatcher.requestWorkItem();
                    TaskHolder<ID, T> taskHolder;
                    while ((taskHolder = workQueue.poll(1, TimeUnit.SECONDS)) == null) {
                        if (isShutdown.get()) {
                            return;
                        }
                    }
                    // ...
                    if (taskHolder != null) {
                        ProcessingResult result = processor.process(taskHolder.getTask());
                        switch (result) {
                            case Success:
                                break;
                            case Congestion:
                            case TransientError:
                                taskDispatcher.reprocess(taskHolder, result);
                                break;
                            case PermanentError:
                                logger.warn("Discarding a task of {} due to permanent error", workerName);
                        }
                        metrics.registerTaskResult(result, 1);
                    }
                }
            } catch (InterruptedException e) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery WorkerThread error", e);
            }
        }
    }
```
SingleTaskWorkerRunnable asks the acceptor for work via requestWorkItem() and polls one TaskHolder<ID, T> at a time from singleItemWorkQueue.
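A runnable sketch of this hand-off with one real worker thread, assuming that requestWorkItem() releases a permit which the acceptor's assignSingleItemWork() then acquires. The class, the 100 ms poll interval and the string tasks are hypothetical; the worker loop mirrors the shape of SingleTaskWorkerRunnable.run(), including the shutdown check inside the poll loop.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class SingleItemWorkerSketch {

    final Semaphore singleItemWorkRequests = new Semaphore(0);
    final BlockingQueue<String> singleItemWorkQueue = new LinkedBlockingQueue<>();
    final AtomicBoolean isShutdown = new AtomicBoolean(false);
    final List<String> processed = new CopyOnWriteArrayList<>();

    // Worker side (assumed handshake): signal demand, then return the queue to poll.
    BlockingQueue<String> requestWorkItem() {
        singleItemWorkRequests.release();
        return singleItemWorkQueue;
    }

    // Acceptor side: hand over a task only if a worker has asked for one.
    boolean assignSingleItemWork(String task) {
        if (singleItemWorkRequests.tryAcquire()) {
            singleItemWorkQueue.add(task);
            return true;
        }
        return false;
    }

    // Worker loop shaped like SingleTaskWorkerRunnable.run(); "processing" just records the task.
    void runWorker() {
        try {
            while (!isShutdown.get()) {
                BlockingQueue<String> workQueue = requestWorkItem();
                String task;
                while ((task = workQueue.poll(100, TimeUnit.MILLISECONDS)) == null) {
                    if (isShutdown.get()) {
                        return;
                    }
                }
                processed.add(task);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Feeds tasks one at a time, each only after the worker has requested work, then shuts down.
    static List<String> runDemo(List<String> tasks) {
        SingleItemWorkerSketch s = new SingleItemWorkerSketch();
        Thread worker = new Thread(s::runWorker, "worker-0");
        worker.start();
        try {
            for (String task : tasks) {
                while (!s.assignSingleItemWork(task)) {
                    Thread.sleep(1); // no outstanding request yet
                }
            }
            long deadline = System.currentTimeMillis() + 5_000;
            while (s.processed.size() < tasks.size() && System.currentTimeMillis() < deadline) {
                Thread.sleep(1);
            }
            s.isShutdown.set(true);
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return s.processed;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(List.of("register", "heartbeat", "cancel")));
    }
}
```

Because the worker only releases a permit when it is about to poll, the acceptor never queues more work than there are idle workers, which is what lets newer same-ID tasks keep replacing older ones in pendingTasks until the last moment.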


```java
    // In AcceptorExecutor:
    private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();

    static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {

        BatchWorkerRunnable(String workerName,
                            AtomicBoolean isShutdown,
                            TaskExecutorMetrics metrics,
                            TaskProcessor<T> processor,
                            AcceptorExecutor<ID, T> acceptorExecutor) {
            super(workerName, isShutdown, metrics, processor, acceptorExecutor);
        }

        @Override
        public void run() {
            try {
                while (!isShutdown.get()) {
                    List<TaskHolder<ID, T>> holders = getWork();
                    metrics.registerExpiryTimes(holders);

                    List<T> tasks = getTasksOf(holders);
                    ProcessingResult result = processor.process(tasks);
                    switch (result) {
                        case Success:
                            break;
                        case Congestion:
                        case TransientError:
                            taskDispatcher.reprocess(holders, result);
                            break;
                        case PermanentError:
                            logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                    }
                    metrics.registerTaskResult(result, tasks.size());
                }
            } catch (InterruptedException e) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery WorkerThread error", e);
            }
        }

        private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
            BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
            List<TaskHolder<ID, T>> result;
            do {
                result = workQueue.poll(1, TimeUnit.SECONDS);
            } while (!isShutdown.get() && result == null);
            return (result == null) ? new ArrayList<>() : result;
        }

        private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
            List<T> tasks = new ArrayList<>(holders.size());
            for (TaskHolder<ID, T> holder : holders) {
                tasks.add(holder.getTask());
            }
            return tasks;
        }
    }
```
BatchWorkerRunnable does the same through requestWorkItems(), polling a whole List<TaskHolder<ID, T>> batch from batchWorkQueue.

Both workers handle the ProcessingResult the same way:

```java
switch (result) {
    case Success:
        break;
    case Congestion:
    case TransientError:
        taskDispatcher.reprocess(holders, result);
        break;
    case PermanentError:
        logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
}
```
Congestion and TransientError hand the tasks back through reprocess() so they are retried; PermanentError drops them with a warning; Success needs no further action.
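A compact, hypothetical sketch of that routing, with plain lists standing in for reprocess() and the warning log. Congestion deliberately falls through to TransientError, and every other branch ends in break so that a Success never slides into the retry path.

```java
import java.util.ArrayList;
import java.util.List;

public class ResultRoutingSketch {

    enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

    // Stand-ins for reprocess() and the warning log.
    final List<String> retried = new ArrayList<>();
    final List<String> discarded = new ArrayList<>();

    void handle(List<String> batch, ProcessingResult result) {
        switch (result) {
            case Success:
                break;                   // nothing more to do
            case Congestion:             // falls through: same handling as TransientError
            case TransientError:
                retried.addAll(batch);   // handed back for another attempt
                break;
            case PermanentError:
                discarded.addAll(batch); // Eureka logs a warning and drops the tasks
                break;
        }
    }

    public static void main(String[] args) {
        ResultRoutingSketch r = new ResultRoutingSketch();
        r.handle(List.of("a"), ProcessingResult.Success);
        r.handle(List.of("b", "c"), ProcessingResult.Congestion);
        r.handle(List.of("d"), ProcessingResult.TransientError);
        r.handle(List.of("e"), ProcessingResult.PermanentError);
        System.out.println("retried=" + r.retried + " discarded=" + r.discarded);
        // prints retried=[b, c, d] discarded=[e]
    }
}
```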


Eureka implements its own TaskDispatcher, in two flavors: nonBatchingDispatcher and batchingDispatcher.

The dispatched tasks are subclasses of ReplicationTask (mostly InstanceReplicationTask). ReplicationTask defines the common properties and leaves two abstract methods, public abstract String getTaskName() and public abstract EurekaHttpResponse<?> execute() throws Throwable. PeerEurekaNode supplies execute() in anonymous subclasses that issue the corresponding request, such as register or cancel.

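Scheduling is keyed by task ID and supports priority. A newer task with the same ID replaces the one still waiting in pendingTasks; a task already handed to a worker is not affected. When processing fails with a retryable error (Congestion or TransientError), the tasks go to reprocessQueue and are then put back into pendingTasks at the front of processingOrder, that is, with the highest priority, unless they have expired or been superseded in the meantime.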


