This series of articles is based on JerryLead's SparkInternals, supplemented with my own understanding, annotations, and source code excerpts for learning purposes. Comparing against current Spark, the core design has not changed much, so the original material is still worth referencing.

Overview

Once you have the system, the first thing to do is deploy it. So what processes are started on each node after a successful deployment?

Deployment diagram

From the deployment diagram we can see:

  • The whole cluster is divided into Master nodes and Worker nodes, which are equivalent to Master and Slave nodes of Hadoop.

  • The Master daemon resides on the Master node and is responsible for managing all Worker nodes.

  • The Worker daemon resides on each Worker node and is responsible for communicating with the Master node and managing executors.

  • The driver process runs the application's main() function and creates the SparkContext. An application is a Spark driver program written by the user, such as WordCount.scala. If the driver program runs on the Master node, for example

    ./bin/run-example SparkPi 10

then SparkPi is the driver running on the Master. On a YARN cluster, the driver may instead be scheduled onto a Worker node (such as Worker Node 2 in the figure above). You can also run the driver program directly on your own PC, for example from Eclipse, using

val sc = new SparkContext("spark://master:7077", "AppName")

// feng: since Spark 2.x, SparkSession is the unified entry point
val spark = SparkSession
 .builder()
 .appName("AppName")
 .master("spark://master:7077")
 .config("spark.sql.warehouse.dir", "D:\\spark-warehouse")
 .getOrCreate()

to connect to the master. In this case the driver runs on your own PC, but this is not recommended, because the PC and the Workers may not be on the same LAN, and communication between the driver and the executors may be slow.

  • One or more ExecutorBackend processes exist on each Worker. Each process contains an Executor object that holds a thread pool, and each thread can execute a task.
// Executor.scala — the Executor uses a thread pool to run tasks
  private val threadPool = {
    val threadFactory = new ThreadFactoryBuilder()
      .setDaemon(true)
      .setNameFormat("Executor task launch worker-%d")
      .setThreadFactory(new ThreadFactory {
        override def newThread(r: Runnable): Thread =
          new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
      })
      .build()
    // Unbounded (cached) thread pool
    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
  }

  // Called when a task is launched
  def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    // class TaskRunner extends Runnable
    val tr = new TaskRunner(context, taskDescription)
    runningTasks.put(taskDescription.taskId, tr)
    // execute in the thread pool
    threadPool.execute(tr)
  }
  • Each Application contains a driver and multiple Executors. All tasks running in each Executor belong to the same Application.

  • In the Standalone deployment mode, ExecutorBackend is instantiated as the CoarseGrainedExecutorBackend process.

    Each Worker in my deployed cluster runs only one CoarseGrainedExecutorBackend process; I have not found out how to configure multiple CoarseGrainedExecutorBackend processes. (There should be multiple processes when multiple applications are running, which I have not tested yet.)

    To learn more about the relationship between Worker and Executor, see Spark Executor Driver resource scheduling summary written by @oopsoutofMemory.

    feng: notes on Spark Executor Driver resource scheduling: 1. Each time the Worker handles LaunchExecutor it creates one CoarseGrainedExecutorBackend process (Executor and CoarseGrainedExecutorBackend have a one-to-one relationship). 2. Executor allocation has two modes: 1) spreadOutApps spreads executors across workers to maximize load balancing and parallelism; 2) !spreadOutApps packs executors onto as few workers as possible to satisfy the app as quickly as possible, at the cost of parallelism and load balancing (a simplified sketch of the two strategies follows the ExecutorRunner code below).

  • The Worker controls the starting and stopping of CoarseGrainedExecutorBackend through an ExecutorRunner object.

// Worker.scala
// The Worker controls the start/stop of CoarseGrainedExecutorBackend via ExecutorRunner objects
val executors = new HashMap[String, ExecutorRunner]
def receive = {
    case LaunchExecutor(...) =>
        val manager = new ExecutorRunner(...)   // create an ExecutorRunner
        executors(appId + "/" + execId) = manager
        manager.start()                         // an internal thread starts the executor process
        coresUsed += cores_
        memoryUsed += memory_
        // after the local worker state is updated and the executor is started, notify the master
        sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    ...
}
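Regarding the spreadOutApps / !spreadOutApps note above, here is a minimal sketch of the two allocation strategies. It is illustrative only, not the real Master.scheduleExecutorsOnWorkers: WorkerSlot, freeCores, and the greedy loops are assumptions made for the example.

// Simplified sketch of the two executor-allocation strategies (illustrative only,
// not the actual Master.scheduleExecutorsOnWorkers implementation).
case class WorkerSlot(id: String, var freeCores: Int)

def assignCores(workers: Seq[WorkerSlot], coresNeeded: Int, spreadOut: Boolean): Map[String, Int] = {
  val assigned = scala.collection.mutable.Map[String, Int]().withDefaultValue(0)
  var remaining = coresNeeded
  if (spreadOut) {
    // Round-robin: one core at a time across all usable workers -> better load balancing / parallelism
    var i = 0
    while (remaining > 0 && workers.exists(_.freeCores > 0)) {
      val w = workers(i % workers.size)
      if (w.freeCores > 0) {
        w.freeCores -= 1; assigned(w.id) += 1; remaining -= 1
      }
      i += 1
    }
  } else {
    // Pack: exhaust one worker before moving to the next -> the app is satisfied with fewer workers
    for (w <- workers if remaining > 0) {
      val take = math.min(w.freeCores, remaining)
      w.freeCores -= take; assigned(w.id) += take; remaining -= take
    }
  }
  assigned.toMap
}

With spreadOut = true the cores are handed out round-robin across workers; with spreadOut = false each worker is exhausted before the next one is used.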

With the deployment diagram in mind, let’s first give an example of a job and then take a look at how jobs are generated and run.

feng: after the Spark cluster starts, only the master and worker processes exist. Once an application is submitted, the driver (one application corresponds to one driver) triggers the launch of ExecutorBackend processes on the workers, and these processes host the executors.

The Master starts the driver on a Worker (Master.scala):
// There are multiple drivers waiting to start
private val waitingDrivers = new ArrayBuffer[DriverInfo]
/**
 * schedule() allocates the currently available resources to the waiting apps.
 * It is called every time the cluster resources change and reallocates based on the
 * latest state of the cluster. Such changes include registering a driver, launching
 * a driver on a worker, and launching executors on a worker.
 */
private def schedule(): Unit = {
  ...
  // Send a LaunchDriver command to the chosen Worker, which then starts the driver
  // (the master sends the LaunchDriver message to the Worker)
  launchDriver(worker, driver)
    // => worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  startExecutorsOnWorkers()
}
// Worker.scala
 def receive = {
    case LaunchDriver(driverId, driverDesc) =>   // the worker receives the master's message and starts the driver
      val driver = new DriverRunner(...)
      drivers(driverId) = driver                 // drivers: HashMap[String, DriverRunner]
      driver.start()                             // an internal thread starts the driver process
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
}

Master and Worker startup

Startup commands

start-all.sh
    # internally calls:
    spark-config.sh    # load the configuration
    start-master.sh    # starts the Master (new Master object)
    start-slaves.sh    # starts the slaves (new Worker objects)

Master

// Master.scala
main()
  new MasterArguments(argStrings, conf)            // parse the configuration
  startRpcEnvAndEndpoint()                         // start the Master endpoint
    RpcEnv.create()                                // create the RPC environment, starting the Netty server
    receivers.offer(data)                          // receivers: LinkedBlockingQueue[EndpointData]
    MessageLoop.run()                              // a thread consumes the messages in receivers and
                                                   // dispatches them; here it calls the Master's onStart()
    endpoint.onStart()

Master.onStart()
  new MasterWebUI(this, webUiPort).bind()          // start the master web UI
  // start a thread that periodically sends CheckForWorkerTimeOut to itself to remove workers whose heartbeat timed out
  Runnable.run() { self.send(CheckForWorkerTimeOut) }
  // depending on RECOVERY_MODE (ZOOKEEPER / FILESYSTEM / CUSTOM), create the persistence engine
  // (persistenceEngine) and the leader election agent (leaderElectionAgent)
  zkFactory.createPersistenceEngine()              // creates the zk directories holding app, driver and worker info
  zkFactory.createLeaderElectionAgent(this)        // start the leader election
    new LeaderLatch(zk, WORKING_DIR).start()
  // when the Master's role changes it is notified: electedLeader() or revokedLeadership() is called
  Master.electedLeader()
    self.send(ElectedLeader)                       // send ElectedLeader to itself

receive()
  case ElectedLeader =>
    // read the app, driver and worker info back from zk
    zk.getChildren.forPath(WORKING_DIR).asScala.filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T])
    registerApplication(app)                       // re-register every app read from zk
    driver.send(MasterChanged(self, masterWebUiUrl))   // notify the drivers that the master has changed
    self.send(CompleteRecovery)                    // later sends itself the CompleteRecovery message
  case CompleteRecovery =>
    workers.filter & apps.filter & drivers.filter  // filter out / remove invalid components
    schedule()
      launchDriver()
      startExecutorsOnWorkers()
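The receivers / MessageLoop pattern above (shared by Master and Worker) is easier to see in a stripped-down form. Below is a minimal sketch, not Spark's actual Dispatcher/Inbox: a message queue consumed by a dedicated thread that first invokes onStart() and then dispatches every message to receive().

import java.util.concurrent.LinkedBlockingQueue

// Minimal sketch of the "queue + message loop" pattern; Spark's real Dispatcher/Inbox is more involved.
trait Endpoint {
  def onStart(): Unit
  def receive: PartialFunction[Any, Unit]
}

class MessageLoop(endpoint: Endpoint) {
  private case object OnStart                         // internal control message
  private val receivers = new LinkedBlockingQueue[Any]()
  receivers.offer(OnStart)                            // ensure onStart() runs before any user message

  private val loopThread = new Thread(() => {
    while (true) {
      receivers.take() match {                        // blocks until a message arrives
        case OnStart => endpoint.onStart()
        case msg     => endpoint.receive.applyOrElse(msg, (m: Any) => println(s"unhandled: $m"))
      }
    }
  })
  loopThread.setDaemon(true)
  loopThread.start()

  def send(msg: Any): Unit = receivers.offer(msg)     // self.send(...) just enqueues the message
}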

Worker

// Worker.scala
main()
  new WorkerArguments(argStrings, conf)            // parse the configuration
  startRpcEnvAndEndpoint()                         // start the Worker endpoint
    receivers.offer(data)                          // receivers: LinkedBlockingQueue[EndpointData]
    MessageLoop.run()                              // a thread consumes the messages in receivers and
                                                   // dispatches them; here it calls the Worker's onStart()
    endpoint.onStart()

Worker.onStart()
  new ExternalShuffleService().startIfEnabled()    // a separate service that serves shuffle map outputs to other compute nodes
  new WorkerWebUI(this, workDir, webUiPort).bind() // web UI
  registerWithMaster()
    tryRegisterAllMasters()
      new Runnable { run() { sendRegisterMessageToMaster() } }       // send the registration message to each master
    new Runnable { run() { Option(self).foreach(_.send(RegisterWorker)) } }  // periodically retry registration
      cancelLastRegistrationRetry()                // already registered: cancel the retry task
      sendRegisterMessageToMaster(RegisterWorker)  // otherwise keep sending registrations asynchronously, within the retry limit

// Master.scala
receive()
  case RegisterWorker =>
    workerRef.send(MasterInStandby)                                  // this master is in standby
    workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))      // the worker id is already registered
    persistenceEngine.addWorker(new WorkerInfo)                      // otherwise persist the worker info (e.g. in ZooKeeper)
    workerRef.send(RegisteredWorker())
    schedule()                                                       // reschedule and allocate resources

// Worker.scala
  case RegisteredWorker =>
    changeMaster()                                 // update the master info
    self.send(SendHeartbeat)                       // a thread asynchronously sends SendHeartbeat to itself,
    sendToMaster(Heartbeat(workerId, self))        // which is forwarded to the master
      // master => workerInfo.lastHeartbeat = System.currentTimeMillis()
      // master => worker.send(ReconnectWorker(masterUrl))  // the master has no record of this worker
      //           (e.g. the worker restarted or newly joined), so it asks the worker to re-register
    self.send(WorkDirCleanup)                      // only if spark.worker.cleanup.enabled = true
    masterRef.send(WorkerLatestState)              // report the worker's current executors/drivers; the master compares
                                                   // them with its own state and, if inconsistent, tells the worker to
                                                   // kill the corresponding executor/driver
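The periodic self-messages above (SendHeartbeat, CheckForWorkerTimeOut, WorkDirCleanup) all follow the same pattern: a scheduled task that keeps sending a message at a fixed interval. A minimal sketch of that pattern, with the Heartbeat message and the forwardToMaster callback as illustrative placeholders rather than Spark's API:

import java.util.concurrent.{Executors, TimeUnit}

// Illustrative sketch: a worker-like component that periodically builds a heartbeat
// and forwards it to the master; not the actual Worker implementation.
class HeartbeatSender(workerId: String, heartbeatIntervalMs: Long, forwardToMaster: Any => Unit) {
  case class Heartbeat(workerId: String)

  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = {
    scheduler.scheduleAtFixedRate(
      () => forwardToMaster(Heartbeat(workerId)),   // like self.send(SendHeartbeat) then sendToMaster(...)
      0, heartbeatIntervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = scheduler.shutdown()
}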

The spark-submit process

A review of the spark-submit flow:

spark-submit
    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"SparkSubmit. Main () submit(kill// Reflection calls the main method of the custom class mainClass."main", new Array[String](0).getClass).invoke(null, Childargs.toarray) // Custom sparkSession creates a New SparkContext in the main method of the class sparkSession.getOrCreate(SparkContext.getOrCreate(sparkConf)) val (backend, ts) = SparkContext.createTaskScheduler(this, master, DeployMode) / / here will be according to the master of the spark - submit parameter selection mode, standalone, for example new StandaloneSchedulerBackend (new TaskSchedulerImpl (sc), sc, MasterUrls) TaskSchedulerImpl. The initialize () / / initializes the FIFO scheduling pool/FAIR ts. The start () backend. The start () new StandaloneAppClient().start() new ClientEndpoint().onstart () // tryRegisterAllMasters() registerMasterThreadPool.submit(new Runnable {run(masterRef.send(RegisterApplication(appDescription, Self)))}) / / speculate execution speculationScheduler scheduleWithFixedDelay (new Runnable {run (checkSpeculatableTasks ())} //Master receive()caseSend (RegisteredApplication(app.id, Self)) schedule() // launchDriver(worker, driver) Worker.new DriverRunner().start() // the Worker receives the message and starts DriverRunner StartExecutorsOnWorkers () / / calculate the corresponding to the worker need to assign cores, whether spreadOutApps concentration distribution executor scheduleExecutorsOnWorkers (app, usableWorkers, SpreadOutApps) allocateWorkerResourceToExecutors () / / distribution of real executor launchExecutor () Worker.endpoint. send(LaunchExecutor()) worker.new ExecutorRunner().start() // the worker receives the message to start ExecutorRunner Exec. Application. Driver. Send (ExecutorAdded ()) / / StandaloneAppClient receive registration information the receive ()caseRegisteredApplication() // Records that the app is registeredCopy the code

Job example

We use GroupByTest in the examples package provided with Spark. Assume that the command is run on the Master node

/* Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] */
bin/run-example GroupByTest 100 10000 1000 36

The GroupByTest code is as follows

package org.apache.spark.examples
import java.util.Random
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

/** Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] */
object GroupByTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("GroupBy Test")
    var numMappers = 100
    var numKVPairs = 10000
    var valSize = 1000
    var numReducers = 36

    val sc = new SparkContext(sparkConf)

    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
      val ranGen = new Random
      var arr1 = new Array[(Int.Array[Byte])](numKVPairs)
      for (i <- 0 until numKVPairs) {
        val byteArr = new Array[Byte](valSize)
        ranGen.nextBytes(byteArr)
        arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
      }
      arr1
    }.cache
    // Enforce that everything has been calculated and in cache
    pairs1.count

    println(pairs1.groupByKey(numReducers).count)

    sc.stop()
  }
}


After reading the code, the job execution flow in the user’s mind looks like this:

The process itself is simple; here we estimate the data sizes and execution results:

  1. Initialize SparkConf().
  2. Initialize numMappers=100, numKVPairs=10,000, valSize=1000, numReducers=36.
  3. Initialize SparkContext. This important step establishes the driver, creating the various communication endpoints and objects the driver needs.

Spark 2.x uses Netty-based RPC communication; Akka was dropped because it is not well suited to transferring large packets and streaming data. See Why Spark uses Netty communication framework instead of Akka.

  4. Each mapper generates an arr1: Array[(Int, Array[Byte])] of length numKVPairs. Each Array[Byte] has length valSize and each Int is a randomly generated integer. Size(arr1) = numKVPairs * (4 + valSize) ≈ 10MB, so Size(pairs1) = numMappers * Size(arr1) ≈ 1000MB. All numerical results here are approximate.
  5. Each mapper caches its arr1 array into memory.
  6. An action, count(), is then executed to count the number of elements in arr1 across all mappers: numMappers * numKVPairs = 1,000,000. The main purpose of this step is to force each mapper's arr1 to be cached into memory.
  7. groupByKey is then run on the cached pairs1. The number of reducers (partitions) produced by groupByKey is numReducers. In theory, if hash(Key) is evenly distributed, each reducer receives numMappers * numKVPairs / numReducers = 27,777 records of type <Int, Array[Byte]>, with size Size(pairs1) / numReducers ≈ 27MB.
  8. Each reducer receives <Int, Array[Byte]> records, groups the records with the same Int together, and obtains <Int, List(Array[Byte], Array[Byte], ..., Array[Byte])>.
  9. Finally, count sums the number of records across all reducers; the result is the number of distinct Ints in pairs1. (A back-of-envelope check of these numbers follows below.)
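A quick back-of-envelope check of the numbers above (rough estimates only; JVM object headers and other overhead are ignored):

// Rough size estimates for the GroupByTest job (ignoring JVM object overhead)
val numMappers = 100
val numKVPairs = 10000
val valSize = 1000
val numReducers = 36

val sizeArr1MB       = numKVPairs * (4 + valSize) / 1e6   // ≈ 10 MB per mapper
val sizePairs1MB     = numMappers * sizeArr1MB            // ≈ 1000 MB in total
val totalRecords     = numMappers * numKVPairs            // 1,000,000 records counted by pairs1.count
val recsPerReducer   = totalRecords / numReducers         // ≈ 27,777 records per reducer
val sizePerReducerMB = sizePairs1MB / numReducers         // ≈ 27 MB per reducer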

Job logical execution diagram

The actual job execution is more complex than this mental model: Spark needs to build the logical execution plan (the data dependency graph of the RDDs), divide it into stages to produce the physical DAG, and then generate and execute the tasks. Let's first analyze the job's logical execution plan:

Using RDD.toDebugString you can see the entire Logical plan (data dependencies for RDD) as follows

  MapPartitionsRDD[3] at groupByKey at GroupByTest.scala:51 (36 partitions)
    ShuffledRDD[2] at groupByKey at GroupByTest.scala:51 (36 partitions)
      FlatMappedRDD[1] at flatMap at GroupByTest.scala:38 (100 partitions)
        ParallelCollectionRDD[0] at parallelize at GroupByTest.scala:38 (100 partitions)
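For reference, the lineage above can be printed from the driver by calling toDebugString on the last RDD before the action runs, for example:

// Print the RDD lineage (logical plan) before triggering the action
val grouped = pairs1.groupByKey(numReducers)
println(grouped.toDebugString)
println(grouped.count)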

The graph is:

Note that data in the partition shows the results that should be computed for each partition. It does not mean that these results are stored in memory at the same time.

According to the above analysis, it can be seen that:

  • The user first creates an array of the integers 0–99: 0 until numMappers.
  • parallelize() produces the initial ParallelCollectionRDD, with each partition containing one integer i.
  • After transformation (in this case flatMap) is executed on the RDD, FlatMappedRDD is generated, where each partition contains an Array[(Int, Array[Byte])].
  • When the first count() is executed, the count is executed on each partition, the result is sent to the driver, and the sum is executed on the driver side.
/**
 * RDD.scala
 * Return the number of elements in the RDD.
 * Utils.getIteratorSize _ is executed inside each partition
 */
  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
  • Since FlatMappedRDD is cached into memory, the partitions inside it are colored differently.
  • groupByKey() generates the next two RDDs; why exactly these are generated is discussed in a later chapter.
  • If a job requires shuffle, ShuffledRDD is generated. The relationship between this RDD and the previous RDD is similar to that between mapper output data in Hadoop and reducer input data.
  • MapPartitionsRDD contains the groupByKey() result.
  • Finally, each value in MapPartitionsRDD (that is, Array[Byte]) is converted into an Iterable type (the signature of the groupByKey variant used here is shown after this list).
  • The final count() executes in the same way as the first count().
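For reference, the groupByKey overload used in this job takes an explicit partition count and yields Iterable values; its signature in Spark's PairRDDFunctions is:

// PairRDDFunctions: groupByKey with an explicit number of partitions
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]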

The logical execution diagram describes the data flow of the job: which transformations () the job goes through, which RDDS are generated in the process, and the dependencies between RDDS.

Job Physical execution diagram

Logical execution plans represent data dependencies, not task execution graphs. In Hadoop, users face tasks directly: mapper and reducer have clear responsibilities, one processes blocks of input and the other aggregates the results. The entire data flow in Hadoop is fixed; users only fill in the map() and reduce() functions. Spark's data processing flow is more complex and its data dependencies are more flexible, so it is hard to unify the data flow and the physical tasks. Spark therefore separates the data flow from the execution of concrete tasks and uses an algorithm to convert the logical execution plan into a physical plan of tasks. The conversion algorithm is discussed in later chapters.

For this job, let’s draw its physical execution DAG as follows:

GroupByTest generates two jobs. The first job is generated by the first action (pairs1.count).

  • The entire job consists of only one stage.
  • Stage 0 contains 100 ResultTasks.
  • Each task computes flatMap to produce its partition of FlatMappedRDD, then executes the action count() to count the number of records in that partition. For example, partition 99 contains only 9 records.
  • Since pairs1 is declared to be cached, after a task computes its FlatMappedRDD partitions it caches them in the executor's memory.
  • After the tasks finish, the driver collects each task's result and performs sum().
  • Job 0 ends.

The second job is triggered by pairs1.groupByKey(numReducers).count. Analyze the job:

  • The job consists of two stages.
  • Stage 1 contains 100 ShuffleMapTasks. Each task reads a portion of pairs1 from the cache and partitions it, similar to what a mapper does in Hadoop, and finally writes the partitioned output to local disk.
  • Stage 0 contains 36 ResultTasks. Each task fetches the data it needs to process (shuffle read) and performs the aggregation and the subsequent mapPartitions() operation while fetching. Finally, count() is computed to obtain the result.
  • After the tasks finish, the driver collects each task's result and performs sum().
  • Job 1 ends.

As you can see, the physical execution plan is not simple. Unlike MapReduce, an application in Spark may contain multiple jobs, each job contains multiple stages, and each stage contains multiple tasks. How jobs, stages, and tasks are divided is introduced in the following chapters.

Discussion

At this point, we have an idea of the overall system and job generation and execution, and we have also explored features such as cache. The following sections will discuss the core functions of the system involved in job generation and execution, including:

  1. How the logical execution plan is generated
  2. How the physical execution plan is generated
  3. How jobs are submitted and scheduled
  4. How tasks are generated, executed, and their results handled
  5. How shuffle works
  6. The cache mechanism
  7. The broadcast mechanism

Reference documentation

  1. SparkInternals
  2. Spark Executor Driver Resource scheduling overview
  3. Why Spark uses Netty communication framework instead of Akka
  4. In-depth understanding of Spark 2.1 principle and source code analysis