System Architecture Design

Before explaining the architecture of the scheduling system, let's first go over the common terms used in scheduling systems.

1. Terminology

DAG: Directed Acyclic Graph. Tasks in a workflow are assembled as a directed acyclic graph; topological traversal starts from the nodes with zero in-degree (no incoming edges) and proceeds until no successor nodes remain. Here is an example:

DAG sample
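To make the traversal just described concrete, here is a minimal, generic sketch of topological traversal over a DAG (Kahn's algorithm) in plain Java. It is only illustrative and is not EasyScheduler's actual code; the node names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Generic topological traversal of a DAG (Kahn's algorithm); illustrative only. */
public class DagTraversalSketch {

    static List<String> topologicalOrder(Map<String, List<String>> successors) {
        // compute the in-degree of every node
        Map<String, Integer> inDegree = new HashMap<>();
        successors.forEach((node, next) -> {
            inDegree.putIfAbsent(node, 0);
            next.forEach(n -> inDegree.merge(n, 1, Integer::sum));
        });

        // start from the "zero entry" nodes (no incoming edges)
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((node, d) -> { if (d == 0) ready.add(node); });

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.poll();
            order.add(node); // a scheduler would submit the task here
            for (String next : successors.getOrDefault(node, List.of())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next); // all predecessors done, node becomes runnable
                }
            }
        }
        return order; // traversal ends when no successor nodes remain
    }

    public static void main(String[] args) {
        Map<String, List<String>> dag = Map.of(
                "A", List.of("B", "C"),
                "B", List.of("D"),
                "C", List.of("D"),
                "D", List.of());
        System.out.println(topologicalOrder(dag)); // e.g. [A, B, C, D] or [A, C, B, D]
    }
}
```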

Process definition: Visual DAG formed by dragging and dropping task nodes and establishing associations between task nodes

Process instance: A process instance is an instantiation of a process definition and can be generated manually or by scheduled scheduling

Task instance: A task instance is an instantiation of a task node in a process definition, which identifies the execution status of a specific task

Task types: SHELL, SQL, SUB_PROCESS, PROCEDURE, MR, SPARK, PYTHON, and DEPENDENT are currently supported, and dynamic plug-in extensions are planned. Note: a SUB_PROCESS is itself a separate process definition that can be started and executed independently

Scheduling mode: The system supports scheduled scheduling based on cron expressions as well as manual scheduling. The following command types are supported: start workflow, start execution from the current node, resume fault-tolerant workflow, resume suspended process, start execution from the failed node, backfill, schedule, rerun, pause, stop, and resume waiting thread. Among these, resume fault-tolerant workflow and resume waiting thread are used internally by the scheduler and cannot be invoked externally

Timing: The system uses the Quartz distributed scheduler and supports visual configuration of cron expressions
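As an illustration of how a cron-based schedule is registered with Quartz, here is a small self-contained sketch. The job class and identity names are hypothetical; EasyScheduler's own scheduling classes and wiring differ.

```java
import org.quartz.CronScheduleBuilder;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

/** Small Quartz cron-trigger sketch; names are hypothetical, not EasyScheduler classes. */
public class CronSchedulingSketch {

    /** In EasyScheduler the triggered job would enqueue a command for the Master to process. */
    public static class DemoJob implements Job {
        @Override
        public void execute(JobExecutionContext context) {
            System.out.println("triggered at " + context.getFireTime());
        }
    }

    public static void main(String[] args) throws SchedulerException {
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

        JobDetail job = JobBuilder.newJob(DemoJob.class)
                .withIdentity("demoJob", "demoGroup")
                .build();

        // cron expression: fire every day at 02:00
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("demoTrigger", "demoGroup")
                .withSchedule(CronScheduleBuilder.cronSchedule("0 0 2 * * ?"))
                .build();

        scheduler.scheduleJob(job, trigger);
        scheduler.start();
    }
}
```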

Dependencies: The system not only supports dependencies between simple predecessor and successor nodes in a DAG, but also provides task-dependent nodes to support custom task dependencies across processes

Priority: Supports setting the priority of process instances and task instances. If no priority is set, execution defaults to first-in, first-out order

Email alarm: Supports email notification of SQL task query results, process instance run results, and fault-tolerance alarms

Failure policy: For tasks running in parallel, two failure policies are provided when one of them fails. Continue means the status of the parallel running tasks is ignored and the process runs to the end before failing; End means that once a task fails, the system kills the parallel tasks that are still running and the process ends in failure

Backfill (data complement): refill of historical data, supporting both parallel and serial backfill over a date interval

2. System architecture

2.1 System architecture diagram

System Architecture Diagram

2.2 Architecture Description

  • MasterServer

    MasterServer adopts a distributed, decentralized (centerless) design. It is mainly responsible for splitting DAG tasks, monitoring task submission, and monitoring the health status of other MasterServers and WorkerServers. When the MasterServer service starts, it registers an ephemeral node with ZooKeeper and performs fault tolerance by watching changes to these ephemeral nodes.

    The service mainly includes:
    • Distributed Quartz is the distributed scheduling component responsible for starting and stopping scheduled tasks. When Quartz triggers a task, a thread pool inside the Master handles the task's subsequent operations

    • The MasterSchedulerThread is a scanning thread that periodically scans the command table in the database and performs different business operations based on the command type

    • MasterExecThread is mainly responsible for DAG task splitting, task submission monitoring, and the logical processing of the various command types

    • MasterTaskExecThread is responsible for the persistence of tasks

  • WorkerServer

    WorkerServer also adopts a distributed, decentralized design. It is mainly responsible for executing tasks and providing logging services. When the WorkerServer service starts, it registers an ephemeral node with ZooKeeper and maintains a heartbeat.

    The service includes:
    • FetchTaskThread is mainly responsible for continuously fetching tasks from the Task Queue and invoking, via TaskScheduleThread, the executor that corresponds to each task type (a generic sketch of this fetch-and-dispatch loop follows this list).

    • LoggerServer is an RPC service that provides the functions of viewing, refreshing, and downloading log fragments
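Below is a heavily simplified sketch of the fetch-and-dispatch loop mentioned above. The interfaces and the task encoding are invented for illustration and do not match EasyScheduler's actual FetchTaskThread/TaskScheduleThread code.

```java
/** Simplified fetch-and-dispatch loop; interfaces and task encoding are invented for illustration. */
public class FetchTaskLoopSketch {

    /** Stand-in for the task queue; returns e.g. "SHELL:echo hello", or null when empty. */
    interface TaskQueue {
        String poll();
    }

    /** Stand-in for the per-type executor that a TaskScheduleThread would run. */
    interface TaskExecutor {
        void execute(String payload);
    }

    static TaskExecutor executorFor(String taskType) {
        // choose the executor according to the task type
        switch (taskType) {
            case "SHELL":  return payload -> System.out.println("run shell: " + payload);
            case "SQL":    return payload -> System.out.println("run sql: " + payload);
            case "PYTHON": return payload -> System.out.println("run python: " + payload);
            default:       return payload -> System.out.println("unsupported task type: " + taskType);
        }
    }

    static void fetchLoop(TaskQueue queue) throws InterruptedException {
        while (true) {
            String task = queue.poll();
            if (task == null) {          // queue empty: back off briefly and try again
                Thread.sleep(1000L);
                continue;
            }
            String[] parts = task.split(":", 2);
            executorFor(parts[0]).execute(parts[1]);
        }
    }
}
```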

  • ZooKeeper

    ZooKeeper: The MasterServer and WorkerServer nodes in the system use ZooKeeper for cluster management and fault tolerance. The system also implements event monitoring and distributed locks based on ZooKeeper. We originally implemented queues on Redis as well, but we wanted EasyScheduler to depend on as few components as possible, so the Redis implementation was eventually removed.

  • Task Queue

    Provides task queue operations, also implemented on top of ZooKeeper. Because each queue entry stores only a small amount of information, there is no need to worry about excessive data accumulating in the queue; in fact, we have tested the queue with millions of entries, with no impact on system stability or performance.
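For illustration, a ZooKeeper-backed queue can be sketched with Apache Curator as below. The paths and payload format are assumptions for this sketch, not EasyScheduler's actual queue implementation.

```java
import java.util.Collections;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

/** Illustrative ZooKeeper-backed queue: enqueue as sequential znodes, dequeue the smallest. */
public class ZkTaskQueueSketch {
    private static final String QUEUE_PATH = "/easyscheduler/tasks_queue"; // hypothetical path

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // enqueue: each task becomes a small sequential child node
        client.create().creatingParentsIfNeeded()
              .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
              .forPath(QUEUE_PATH + "/task-", "1_10001_1_77".getBytes());

        // dequeue: take the child with the smallest sequence number
        List<String> children = client.getChildren().forPath(QUEUE_PATH);
        if (!children.isEmpty()) {
            Collections.sort(children);
            String head = children.get(0);
            byte[] data = client.getData().forPath(QUEUE_PATH + "/" + head);
            client.delete().forPath(QUEUE_PATH + "/" + head);
            System.out.println("dequeued: " + new String(data));
        }
        client.close();
    }
}
```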

  • Alert

    Provides interfaces for storing and querying the two types of alarm data and for sending notifications. Two notification channels are available: email and **SNMP (not yet implemented)**.

  • API

    The API interface layer is mainly responsible for processing requests from the front-end UI layer. It exposes RESTful APIs to provide services externally. Interfaces include creating, defining, querying, modifying, publishing, taking offline, manually starting, stopping, pausing, resuming, and executing from a given node, among others.

  • UI

    The front-end pages of the system provide its various visual operation interfaces. See the system user manual for details.

2.3 Architecture design idea

1. Decentralization vs centralization
Centralized design

The centralized design concept is relatively simple. Nodes in a distributed cluster can be divided into two roles according to the division of labor:

Master-slave roles

  • The Master role distributes tasks and monitors the health of the Slaves; it dynamically balances tasks across the Slaves so that no Slave node is either overwhelmed or left idle.
  • The Worker role is mainly responsible for executing tasks and maintaining a heartbeat with the Master so that the Master can assign tasks to it.

Problems with the centralized design:

  • If the Master has a problem, the cluster has no leader and collapses entirely. To solve this, most Master/Slave architectures adopt a master-standby design, with hot or cold standby and automatic or manual switchover, and more and more new systems provide automatic Master failover to improve availability.
  • Another problem concerns where the Scheduler runs. If it runs on the Master, although different tasks in a DAG can run on different machines, the Master can become overloaded. If the Scheduler runs on the Slave, all tasks in a DAG can only be submitted from a single machine, which puts great pressure on that Slave when there are many parallel tasks.
Decentralized design

  • In decentralized design, there is usually no concept of Master/Slave. All roles are the same and their status is equal. The global Internet is a typical decentralized distributed system.

  • The core design of decentralized design is that there is no “manager” that is different from other nodes in the whole distributed system, so there is no single point of failure. However, since there is no “manager” node, each node needs to communicate with other nodes to get the necessary machine information, and the unreliable communication of distributed system greatly increases the difficulty of realizing the above functions.

  • In fact, truly decentralized distributed systems are rare. Instead, dynamically centralized distributed systems keep emerging. In this architecture, the managers in the cluster are dynamically elected rather than preset, and when a cluster failure occurs, the nodes spontaneously hold a "meeting" to elect a new "manager" to lead the work. The most typical examples are ZooKeeper and Etcd (implemented in Go).

  • The decentralization of EasyScheduler means that Masters and Workers both register in ZooKeeper, so the Master cluster and Worker cluster have no fixed center, and a ZooKeeper distributed lock is used to elect one Master or Worker as the "manager" to execute tasks.

2. Distributed lock practice

EasyScheduler uses ZooKeeper distributed locks so that at any given moment only one Master performs Scheduler task submission, and only one Worker executes a given task.

  1. The core algorithm for acquiring the distributed lock is as follows:

Get the distributed lock process

  2. Flowchart of the Scheduler thread's distributed-lock implementation in EasyScheduler:

Obtain distributed lock process
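A minimal sketch of acquiring such a distributed lock with Apache Curator's InterProcessMutex is shown below. The connection string and lock path are assumptions; the actual EasyScheduler code may use a different recipe or path layout.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/** Minimal sketch of a ZooKeeper distributed lock with Apache Curator; paths are assumptions. */
public class MasterLockSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        InterProcessMutex lock = new InterProcessMutex(client, "/easyscheduler/lock/masters");
        lock.acquire();                 // blocks until this process holds the lock
        try {
            // only one Master Scheduler reaches this point at a time,
            // so it can safely take commands and submit tasks
            System.out.println("holding the master lock, submitting tasks...");
        } finally {
            lock.release();             // always release so other Masters can proceed
        }
        client.close();
    }
}
```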
3. Insufficient-thread loop-wait problem
  • If a DAG has no sub-processes and the number of entries in the Command table exceeds the threshold set by the thread pool, the process simply waits or fails.
  • If a large DAG nests many sub-processes, the situation shown in the following diagram produces a "deadlock" state:

Insufficient threads loop wait problem

Starting a new Master just to break the deadlock is unsatisfying, so we proposed the following three options to mitigate this risk:

  1. Count the total number of threads across all Masters, then count the number of threads each DAG needs, i.e. do a predictive calculation before a DAG process is executed. However, because the thread pools are spread across multiple Masters, the total thread count is hard to capture in real time.
  2. Check the single Master thread pool and let the thread fail if the pool is full.
  3. Add a resource-insufficient Command type: if the thread pool is insufficient, suspend the main process. Once the thread pool has a free thread again, a process that was suspended due to insufficient resources can be woken up and executed.

Note that the Master Scheduler thread fetches Commands in FIFO order.

So we chose the third way to solve the thread-shortage problem.
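A hedged sketch of what option 3 looks like in code is below. All names are hypothetical and the real implementation differs; it only illustrates the decision "suspend and record a recovery command instead of blocking or failing".

```java
import java.util.concurrent.ThreadPoolExecutor;

/** Hedged sketch of option 3; all names are hypothetical, not EasyScheduler code. */
public class ThreadShortageSketch {

    /**
     * Instead of blocking or failing when the Master thread pool is exhausted,
     * mark the process instance as "waiting thread" and record a recovery command,
     * so it can be woken up later when a thread frees up (commands are fetched FIFO).
     */
    static void trySubmitProcess(ThreadPoolExecutor masterExecPool,
                                 Runnable processInstanceRunnable,
                                 Runnable markWaitingAndCreateRecoverCommand) {
        boolean poolExhausted =
                masterExecPool.getActiveCount() >= masterExecPool.getMaximumPoolSize()
                        && masterExecPool.getQueue().remainingCapacity() == 0;
        if (poolExhausted) {
            markWaitingAndCreateRecoverCommand.run();
        } else {
            masterExecPool.execute(processInstanceRunnable);
        }
    }
}
```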

4. Fault-tolerant design

Fault tolerance can be divided into service-downtime fault tolerance and task retry; service-downtime fault tolerance can be further divided into Master fault tolerance and Worker fault tolerance

1. Fault tolerance for downtime

Service fault-tolerant design relies on the Watcher mechanism of ZooKeeper, and the implementation principle is shown in the figure below:

EasyScheduler fault tolerant design

The Master watches the ZooKeeper directories of other Masters and of Workers. If a remove event is observed, process-instance or task-instance fault tolerance is performed according to the specific business logic.

  • Master fault tolerance flow chart:

Master Fault tolerance flowchart

  • Worker fault tolerance flow chart:

Worker fault tolerance flow chart

The Master Scheduler thread takes over the task and resubmits it once it finds that the task instance is in the fault tolerant state.

Note: Due to network jitter, a node may briefly lose its heartbeat with ZooKeeper, which triggers a node remove event. For this case we take the simplest approach: once a node's connection to ZooKeeper times out, the Master or Worker service on it stops itself directly.
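To illustrate the Watcher-based detection of removed nodes, here is a small Curator-based sketch that listens for child-removed events under a hypothetical worker registry path. It is not EasyScheduler's actual fault-tolerance code.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;

/** Sketch of watching for removed ephemeral nodes with Curator; paths are assumptions. */
public class NodeRemoveWatcherSketch {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // watch the (hypothetical) worker registry; a Master would watch Masters and Workers alike
        PathChildrenCache cache = new PathChildrenCache(client, "/easyscheduler/workers", true);
        cache.getListenable().addListener((c, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                String deadNode = event.getData().getPath();
                // here the Master would grab a fault-tolerance lock and
                // re-dispatch the dead node's process/task instances
                System.out.println("node removed, starting fault tolerance for: " + deadNode);
            }
        });
        cache.start();

        Thread.sleep(Long.MAX_VALUE); // keep the watcher alive in this demo
    }
}
```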

2. Task failure retry

Here we first distinguish between task failure retry, process failure recovery, and process failure rerun:

  • Retry is task-level and performed automatically by the scheduling system. For example, if a Shell task's retry count is set to 3, the system tries to rerun the Shell task up to 3 times after it fails
  • Process failure recovery is process-level and manual, and recovery can only be performed from the failed node or from the current node
  • A process failure rerun is also process-level and is done manually from the start node

Moving on, we have divided the task nodes in the workflow into two types.

  • One is the service node, which corresponds to an actual script or processing statement, such as Shell node, MR node, Spark node, dependent node, etc.

  • The other is the logical node, which does not run an actual script or statement but only handles the logical processing of the overall process flow, such as the sub-process node.

You can set the number of retries for each service node. If a task node fails, the system automatically retries it until it succeeds or exceeds the configured retry count. Logical nodes themselves do not support failure retry, but the tasks inside a logical node do.

If a task in a workflow fails and reaches its maximum number of retries, the workflow fails and stops. The failed workflow can then be manually rerun or recovered from the failure
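As a simple illustration of the task-level retry semantics described above (the exact attempt accounting is an assumption; this is not EasyScheduler code):

```java
/** Toy illustration of task-level retry; attempt accounting is an assumption. */
public class TaskRetrySketch {

    interface TaskRunner {
        boolean run(); // true = the task node succeeded
    }

    /** Run the task once, then retry up to maxRetryTimes more times if it keeps failing. */
    static boolean runWithRetry(TaskRunner task, int maxRetryTimes) {
        for (int attempt = 0; attempt <= maxRetryTimes; attempt++) {
            if (task.run()) {
                return true;
            }
            System.out.println("task failed, attempts so far: " + (attempt + 1));
        }
        return false; // retries exhausted: the workflow would now fail and stop
    }

    public static void main(String[] args) {
        boolean ok = runWithRetry(() -> Math.random() > 0.7, 3);
        System.out.println(ok ? "task succeeded" : "task failed after retries");
    }
}
```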

5. Task priority design

In the early scheduling design there was no priority design and fair scheduling was adopted: a task submitted first might finish at the same time as one submitted later, and neither process nor task priority could be set. So we redesigned this, and the current design is as follows:

  • Tasks are processed from high to low priority as follows: different process instance priorities take precedence, then the same process instance priority, then the priority of tasks within the same process, and finally the submission order of tasks within the same process.
    • The specific implementation parses the priority from the task instance's JSON and saves the information as process instance priority_process instance ID_task priority_task ID in the ZooKeeper task queue. When entries are fetched from the task queue, string comparison yields the highest-priority task (see the sketch after the priority figures below).

      • The priority defined on a process accounts for the fact that some processes need to be handled before others; this priority can be configured when the process is started manually or started on schedule. There are five levels in total, namely HIGHEST, HIGH, MEDIUM, LOW, and LOWEST, as shown in the following figure:

Process priority configuration

      • Task priorities are also divided into five levels: HIGHEST, HIGH, MEDIUM, LOW, and LOWEST, as shown in the following figure:

Task priority configuration
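The following toy sketch shows how a composite priority string like the one described above orders tasks by plain string comparison. The numeric encoding of priority levels and the ID values are assumptions for illustration only.

```java
import java.util.PriorityQueue;

/** Toy sketch of ordering by the composite priority string; encodings and IDs are assumptions. */
public class TaskQueuePrioritySketch {

    // format: processInstancePriority_processInstanceId_taskPriority_taskId
    // assumption: a lower numeric value means higher priority (e.g. HIGHEST=0 ... LOWEST=4)
    static String queueKey(int processInstancePriority, long processInstanceId,
                           int taskPriority, long taskId) {
        return processInstancePriority + "_" + processInstanceId + "_" + taskPriority + "_" + taskId;
    }

    public static void main(String[] args) {
        PriorityQueue<String> queue = new PriorityQueue<>(); // natural string ordering
        queue.add(queueKey(2, 10001, 3, 77)); // MEDIUM process, LOW task
        queue.add(queueKey(0, 10002, 4, 78)); // HIGHEST process wins regardless of task priority
        queue.add(queueKey(2, 10001, 1, 79)); // same process, HIGH task comes before LOW task

        while (!queue.isEmpty()) {
            System.out.println(queue.poll()); // 0_10002_4_78, 2_10001_1_79, 2_10001_3_77
        }
        // note: plain string comparison only orders cleanly when the compared segments
        // have comparable widths; this is a simplification for illustration
    }
}
```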

6. Log access with Logback and gRPC
  • Because the Web (UI) and the Worker are not necessarily on the same machine, viewing logs cannot be done as if querying local files. There are two options:

  • Place the logs on the ES search engine

  • Obtain remote log information through gRPC communication

  • To keep EasyScheduler as lightweight as possible, gRPC was selected for remote access to log information.

gRPC remote access

  • We use a custom Logback FileAppender and Filter to generate one log file per task instance.
  • FileAppender is implemented as follows:
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.FileAppender;

/**
 * task log appender
 */
public class TaskLogAppender extends FileAppender<ILoggingEvent> {

    ...

    @Override
    protected void append(ILoggingEvent event) {

        if (currentlyActiveFile == null) {
            currentlyActiveFile = getFile();
        }
        String activeFile = currentlyActiveFile;
        // thread name: taskThreadName-processDefineId_processInstanceId_taskInstanceId
        String threadName = event.getThreadName();
        String[] threadNameArr = threadName.split("-");
        // logId = processDefineId_processInstanceId_taskInstanceId
        String logId = threadNameArr[1];
        ...
        super.subAppend(event);
    }
}

Logs are generated in the format of /process definition ID/process instance ID/task instance ID.log

  • Filter matches thread names starting with TaskLogInfo:

  • The TaskLogFilter implementation is as follows:

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;

/**
 * task log filter
 */
public class TaskLogFilter extends Filter<ILoggingEvent> {

    @Override
    public FilterReply decide(ILoggingEvent event) {
        if (event.getThreadName().startsWith("TaskLogInfo-")) {
            return FilterReply.ACCEPT;
        }
        return FilterReply.DENY;
    }
}

Conclusion

Starting from scheduling, this article has introduced the architectural principles and implementation ideas of EasyScheduler, a distributed workflow scheduling system for big data.