This series of articles is based on JerryLead's SparkInternals. This article adds the author's own understanding, annotations, and some source code for learning purposes. After comparing with the current code, the core parts have not changed much and are still worth referencing.

The Cache and Checkpoint

The cache mechanism is an important feature that distinguishes Spark from Hadoop: it lets applications that repeatedly access the same data (such as iterative algorithms and interactive applications) run faster. Unlike Hadoop MapReduce jobs, Spark's logical/physical execution graph may be large, the computing chain in a task may be long, and computing certain RDDs may be time-consuming. In that case, if a task fails, its entire computing chain has to be recomputed. It is therefore worthwhile to checkpoint RDDs that are expensive to compute: if a downstream RDD fails, data can be read back directly from the checkpointed RDD.

The Cache mechanism

Take the GroupByTest example (with its FlatMappedRDD) from the first chapter: once FlatMappedRDD is cached, later jobs can read it directly from the cache. The cache thus enables duplicate data to be shared between jobs in the same application.

Logical execution diagram:

Question: Which RDDs need to be cached?

Those that will be reused (but that are not too large).

Q: How do users specify which RDD to cache?

Since the user only deals with the driver program, the only option is to call rdd.cache() on the RDDs the user can see. "Visible" refers to the RDDs produced by a transformation(); some RDDs generated internally by Spark during a transformation cannot be cached directly. For example, the ShuffledRDD and MapPartitionsRDD produced inside reduceByKey() cannot be cached directly by the user.

rdd.cache() is called in the driver program.
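A minimal usage sketch (the SparkContext sc and the input path are illustrative, not from the original example):

// Sketch: marking a visible RDD for caching in the driver program
val words  = sc.textFile("hdfs://namenode:8020/data/input.txt").flatMap(_.split(" "))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)  // counts is visible; the RDDs inside reduceByKey() are not

counts.cache()     // same as counts.persist(StorageLevel.MEMORY_ONLY)

counts.count()     // the first job computes the partitions and caches them
counts.collect()   // later jobs in the same application read the cached partitions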

Before looking at the implementation, imagine how caching might be done: when a task gets the first record of a partition of an RDD, it checks whether that RDD is to be cached; if so, it hands that record and all subsequently computed records to the local blockManager's memoryStore, and if the memoryStore cannot hold them, they spill to the diskStore.

The implementation is basically as expected, except that the decision is made when the RDD partition is about to be computed (rather than after the first record has been computed). If the partition is to be cached, the partition is computed first and then cached into memory. The cache uses only memory; writing to disk is what checkpoint does.

After rdd.cache() is called, the RDD becomes a persistRDD with StorageLevel MEMORY_ONLY. persistRDD tells the driver that it needs to be persisted.

In fact, sparkContext.persistRDD() is called to add the RDD to the persistentRdds map (a ConcurrentMap[Int, RDD[_]]); no separate persistRDD object is generated.
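In the Spark source this corresponds roughly to the following (paraphrased and abbreviated from RDD.scala and SparkContext.scala):

// RDD.scala (abbreviated)
def persist(newLevel: StorageLevel): this.type = {
  ...
  sc.persistRDD(this)       // register this RDD in the driver's persistentRdds map
  storageLevel = newLevel   // e.g. MEMORY_ONLY for cache()
  this
}
def cache(): this.type = persist(StorageLevel.MEMORY_ONLY)

// SparkContext.scala (abbreviated)
private[spark] def persistRDD(rdd: RDD[_]): Unit = {
  persistentRdds(rdd.id) = rdd   // persistentRdds: ConcurrentMap[Int, RDD[_]]
}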

The cache read/write path, expressed in code:

rdd.iterator()
=> if (storageLevel != StorageLevel.NONE)
     getOrCompute(split, context)             // read the cached RDD, or computeOrReadCheckpoint
   else
     computeOrReadCheckpoint(split, context)  // if already checkpointed, read from the first parent RDD via its iterator()
       if (isCheckpointedAndMaterialized)
         firstParent[T].iterator(split, context)
       else
         compute(split, context)

// getOrCompute(), for the non-checkpointed path:
// fetch the block if it exists, otherwise recompute and persist it
SparkEnv.get.blockManager.getOrElseUpdate(RDDBlockId(id, partition.index), storageLevel)
  getLocalValues(blockId)                     // local lookup
    memoryStore.getValues(blockId)
    diskStore.getBytes(blockId)               // or read from disk
  getRemoteValues[T](blockId)                 // remote fetch
    blockManagerMaster.getLocations(blockId)  // ask the driver for the block's locations
    // a block has a set of locations (replicas); return as soon as one fetch succeeds
    blockTransferService.fetchBlockSync(host, port, executorId, blockId).nioByteBuffer()
  doPutIterator()                             // nothing found above: compute and store
    memoryStore.putIteratorAsValues(computeOrReadCheckpoint())
      entries.put(blockId, entry)             // LinkedHashMap[BlockId, MemoryEntry[_]]
    if (space not enough && level.useDisk)
      diskStore.put(blockId) { channel => Channels.newOutputStream(channel) ... }
    reportBlockStatus(blockId, putBlockStatus)  // tell BlockManagerMaster on the driver to update the block info
      blockLocations.put(blockId, locations)    // on the driver: JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
    replicate(doGetLocalBytes(blockId, info) ...)  // replicate the block to other nodes
      blockTransferService.uploadBlockSync()
      // spark.storage.maxReplicationFailures (default 1): max allowed replication failures

When rdd.iterator() is called to compute a partition of the RDD, cacheManager first gets a blockId indicating which partition of which RDD is to be stored. This blockId is of type RDDBlockId (the memoryStore may also hold other data such as task results, so blockId types are used to distinguish different kinds of data). Then blockManager is checked to see whether the partition has already been checkpointed. If it has, the task has been run before, so the partition does not need to be recomputed: all records of the partition are read directly from the checkpoint into an ArrayBuffer called elements. If the partition has not been checkpointed, the partition is computed first and all its records are put into elements. Finally, elements is handed to blockManager for caching.

A few notes against the current (Spark 2.x) code:

1. I cannot find a place in the code that "first gets a blockId from cacheManager"; I can only see the immutable field val id: Int = sc.newRddId() and val blockId = RDDBlockId(id, partition.index), which is the natural way to build the blockId used to look up the cache, rather than asking for a fresh id.
2. The data type returned is BlockResult (containing an Iterator), not an ArrayBuffer called elements.
3. The computed data is placed into the entries LinkedHashMap[BlockId, MemoryEntry[_]] (or directly on disk), independent of any elements ArrayBuffer.
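For reference, the Spark 2.x entry point looks roughly like this (paraphrased and abbreviated from RDD.getOrCompute()):

private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  // the blockId is built directly from the RDD id and the partition index
  val blockId = RDDBlockId(id, partition.index)
  var readCachedBlock = true
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
    // only evaluated when the block is found neither locally nor remotely
    readCachedBlock = false
    computeOrReadCheckpoint(partition, context)
  }) match {
    case Left(blockResult) => ...   // a BlockResult wrapping an Iterator, as noted above
    case Right(iter)       => ...   // the block could not be stored; use the computed iterator directly
  }
}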

blockManager stores elements (the partition) in the LinkedHashMap[BlockId, Entry] managed by memoryStore. If the partition is larger than memoryStore's storage limit (60% of the heap by default), it simply returns saying it cannot be stored. Otherwise, some earlier-cached RDD partitions are dropped to make room for the new partition: if enough space can be freed, the new partition is added to the LinkedHashMap; if not, it returns saying it cannot be stored. Note that partitions belonging to the same RDD as the new partition are never dropped, and the partition that was cached earliest is dropped first. (What about the promised LRU replacement algorithm?)
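The drop policy can be pictured with a simplified sketch. This is illustrative code for the policy described above, not Spark's actual MemoryStore:

import scala.collection.mutable

case class BlockId(rddId: Int, split: Int)

class TinyMemoryStore(maxMemory: Long) {
  // insertion order is preserved, so the head is the earliest-cached block
  private val entries = mutable.LinkedHashMap[BlockId, Long]()   // blockId -> size in bytes
  private def used: Long = entries.values.sum

  /** Try to store a block, dropping earlier-cached blocks of OTHER RDDs if needed. */
  def put(id: BlockId, size: Long): Boolean = {
    if (size > maxMemory) return false                  // bigger than the whole store: refuse
    val victims = mutable.ArrayBuffer[BlockId]()
    var freed = 0L
    val it = entries.iterator
    while (used - freed + size > maxMemory && it.hasNext) {
      val (candidate, candidateSize) = it.next()
      if (candidate.rddId != id.rddId) {                // never drop blocks of the same RDD
        victims += candidate
        freed += candidateSize
      }
    }
    if (used - freed + size > maxMemory) return false   // still not enough room
    victims.foreach(entries.remove)                     // drop earliest-cached blocks first
    entries(id) = size
    true
  }
}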

(Memory space management still needs more reading; to be supplemented later.)

Question: How is the cached RDD read?

If a cached RDD is used in a later computation (typically the next job of the same application), the task reads the cached RDD directly from the memoryStore of blockManager. When rdd.iterator() is called to compute a partition of the RDD, blockManager checks whether the partition has been cached. If the partition is cached locally, it is read from the local memoryStore via blockManager.getLocalValues(). If the partition was cached by the blockManager of another node, it is fetched from that node via blockManager.getRemoteValues(); the read process is described below.

Getting the storage location of cached partitions: after a partition is cached, the blockManager on the node where it resides notifies the BlockManagerMaster on the driver that this partition of the RDD has been cached (blockManager.getOrElseUpdate() calls reportBlockStatus). This information is stored in the blockLocations: HashMap of BlockManagerMasterEndpoint. When a task needs a cached RDD, blockManager.getRemoteBytes() calls blockManagerMaster.getLocations(blockId) to ask for the partition's storage location; the query is sent to the driver (where BlockManagerMasterEndpoint lives), and the driver looks up blockLocations and sends the location information back (see the sketch below).

Reading cached partitions on other nodes: once the task has the location information of the cached partition, it sends a GetBlock(blockId) request to the target node via connectionManager. The target node receives the request, reads the cached partition from the memoryStore of its local blockManager, and sends it back.

Note: Spark 2.x no longer has a connectionManager for data transfer; instead, a ShuffleClient is used to read the data in asynchronous Java NIO mode.
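The driver-side bookkeeping described above can be pictured with a small illustrative sketch (the real class is BlockManagerMasterEndpoint; the names here are simplified):

import scala.collection.mutable

case class BlockManagerId(executorId: String, host: String, port: Int)
case class RDDBlockId(rddId: Int, splitIndex: Int)

class LocationRegistry {
  // blockId -> set of block managers (replicas) that currently hold the block
  private val blockLocations = mutable.HashMap[RDDBlockId, mutable.HashSet[BlockManagerId]]()

  // an executor reports a newly cached block (cf. reportBlockStatus above)
  def updateBlockInfo(blockId: RDDBlockId, holder: BlockManagerId): Unit =
    blockLocations.getOrElseUpdate(blockId, mutable.HashSet.empty) += holder

  // a task asks where a cached partition lives (cf. getLocations(blockId) above)
  def getLocations(blockId: RDDBlockId): Seq[BlockManagerId] =
    blockLocations.get(blockId).map(_.toSeq).getOrElse(Seq.empty)
}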

Checkpoint

Question: Which RDDs require checkpoint?

RDDs that take a long time or a large amount of computation to obtain, and RDDs whose computing chain is too long or that depend on many other RDDs. Actually, saving the output of a ShuffleMapTask to local disk is also a kind of checkpoint, but its main purpose is to partition the output data.

Question: When to checkpoint?

The cache mechanism caches a partition directly into memory each time it is computed. Checkpoint, however, does not store the result of the first computation; instead, it starts a dedicated job after the original job ends to do the checkpointing. This means an RDD that needs a checkpoint is computed twice. Therefore, when using rdd.checkpoint(), it is recommended to also call rdd.cache(), so that the second job does not have to recompute the RDD and can simply read it from the cache and write it to disk. In fact, Spark provides rdd.persist(StorageLevel.DISK_ONLY), which is like caching to disk: the RDD is stored on disk when it is computed for the first time. However, this kind of persist and checkpoint differ in many ways, discussed later.
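A minimal sketch of this recommended pattern (the SparkContext sc and the paths are illustrative):

sc.setCheckpointDir("hdfs://namenode:8020/tmp/spark-checkpoints")

val expensive = sc.textFile("hdfs://namenode:8020/data/logs")
  .flatMap(_.split(" "))
  .map(w => (w, 1))
  .reduceByKey(_ + _)

expensive.cache()        // or expensive.persist(StorageLevel.DISK_ONLY)
expensive.checkpoint()   // only marks the RDD; nothing is written yet
expensive.count()        // job 1 computes (and caches) the RDD; the follow-up checkpoint
                         // job then reads the cache instead of recomputing it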

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    ...
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler,
      localProperties.get)
    progressBar.foreach(_.finishAll())
    // The checkpoint operation is performed after runJob
    rdd.doCheckpoint()
  }

  /** Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD. */
  def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    ...
    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    // Runs another job; writePartitionToCheckpointFile is mainly serialization and IO
    sc.runJob(originalRDD,
      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

    if (originalRDD.partitioner.nonEmpty) {
      // Serialize the partitioner and save it as well
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }
    val newRDD = new ReliableCheckpointRDD[T](
      sc, checkpointDirPath.toString, originalRDD.partitioner)
    newRDD
  }

Question: How is checkpoint implemented?

An RDD needs to go through the stages [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ] before it is checkpointed.

// Spark 2.2 has three phases
private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}

//RDDCheckpointData.scala
// The member attribute cpState defaults to the checkpoint state Initialized
protected var cpState = Initialized

final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }
    // Goes back into the parent RDD; calls ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir) shown above
    val newRDD = doCheckpoint()
    // Update our state and truncate the RDD lineage
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)  // Type ReliableCheckpointRDD[T]
      cpState = Checkpointed
      rdd.markCheckpointed()
    }
  }
  // markCheckpointed() clears the dependencies, truncating the lineage
  private[spark] def markCheckpointed(): Unit = {
    clearDependencies()
    partitions_ = null
    deps = null // Forget the constructor argument for dependencies too
  }

The states below differ slightly in Spark 2.2; it is best to compare them with the code above.

Initialized: the driver program uses rdd.checkpoint() to specify which RDDs need to be checkpointed. After that, the RDD is managed by RDDCheckpointData. The user also needs to set the checkpoint storage path, usually on HDFS.

Marked for checkpointing: after initialization, RDDCheckpointData marks the RDD as MarkedForCheckpoint.

Checkpointing in progress: finalRdd.doCheckpoint() is called after each job finishes (a sketch of doCheckpoint() follows the state descriptions below). finalRdd scans back up the computing chain and marks each RDD that needs a checkpoint as CheckpointingInProgress, then broadcasts the configuration files needed for writing to the file system (such as core-site.xml for HDFS) to the blockManager on the other worker nodes. After that, a job is started to do the actual checkpointing: rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf)).

Checkpointed: after the checkpointing job finishes, markCheckpointed() clears all of the RDD's dependencies and sets the RDD's state to Checkpointed. Then an extra dependency is imposed on the RDD: its parent RDD is set to a CheckpointRDD, which is responsible for reading the checkpoint files from the file system and generating the RDD's partitions.
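The entry point for the "checkpointing in progress" pass above looks roughly like this in Spark 2.x (paraphrased and abbreviated from RDD.doCheckpoint()); note how it recurses into the parents before checkpointing itself:

private[spark] def doCheckpoint(): Unit = {
  ...
  if (!doCheckpointCalled) {
    doCheckpointCalled = true
    if (checkpointData.isDefined) {
      if (checkpointAllMarkedAncestors) {
        // checkpoint parents first, because our lineage is truncated once we checkpoint ourselves
        dependencies.foreach(_.rdd.doCheckpoint())
      }
      checkpointData.get.checkpoint()   // leads to writeRDDToCheckpointDirectory() shown earlier
    } else {
      dependencies.foreach(_.rdd.doCheckpoint())   // keep scanning up the computing chain
    }
  }
}

The imposed parent is then visible in RDD.dependencies (also paraphrased from the Spark 2.x source): once checkpointRDD is defined, the original lineage is replaced by a single OneToOneDependency on the CheckpointRDD.

final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
  }
}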

Interestingly, I checkpointed two RDDs in the driver program, and only one (result, below) was checkpointed successfully; pairs2 was not checkpointed. I do not know whether this is a bug or whether only downstream RDDs are checkpointed on purpose:

val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'),
    (4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
val pairs1 = sc.parallelize(data1, 3)

val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
val pairs2 = sc.parallelize(data2, 2)

pairs2.checkpoint

val result = pairs1.join(pairs2)
result.checkpoint

Question: How is a checkpointed RDD read?

When runJob() is called, finalRDD's partitions() is called first to determine how many tasks there will be at the end. rdd.partitions() checks (via RDDCheckpointData, which manages checkpointed RDDs) whether the RDD has already been checkpointed; if it has, the RDD's partitions, an Array[Partition], are returned directly.

//eg:rdd.count()
=> sc.runJob(this, Utils.getIteratorSize _).sum
=> runJob(rdd, func, 0 until rdd.partitions.length)

//in rdd.partitions
final def partitions: Array[Partition] = {
    // Check cpRDD saved by ReliableCheckpointRDD
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions
        partitions_.zipWithIndex.foreach { case (partition, index) =>
          require(partition.index == index,
            s"partitions($index).partition == ${partition.index}, but it should equal $index")
        }
      }
      partitions_
    }
  }

When rdd.iterator() is called to compute a partition of the RDD, computeOrReadCheckpoint(split: Partition) is also called to check whether the RDD has been checkpointed. If it has, the iterator() of the RDD's parent, i.e. CheckpointRDD.iterator(), is called; CheckpointRDD reads the files on the file system and generates the partitions of the RDD. This explains why a parent CheckpointRDD was so trickily added to the checkpointed RDD.
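computeOrReadCheckpoint() itself is short (paraphrased from the Spark 2.x source):

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    // firstParent is the CheckpointRDD added as the new parent; it reads the files back
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}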

runJob() 
    ...
    task.runTask()
        func(context, rdd.iterator(partition, context)) //ResultTask
        writer.write(rdd.iterator(partition, context) //ShuffleMapTask
            rdd.iterator()
                if (storageLevel != StorageLevel.NONE) {
                  getOrCompute(split, context)
                } else {
                  computeOrReadCheckpoint(split, context)
                }

Q: What is the difference between cache and checkpoint?

Tathagata Das has an answer to this question: There is a significant difference between cache and checkpoint. Cache materializes the RDD and keeps it in memory and/or disk (actually only memory). But the lineage (also known as the computing chain) of the RDD (that is, the seq of operations that generated the RDD) will be remembered, so that if there are node failures and parts of the cached RDDs are lost, they can be regenerated. However, checkpoint saves the RDD to an HDFS file and actually forgets the lineage completely. This allows long lineages to be truncated and the data to be saved reliably in HDFS (which is naturally fault tolerant by replication).

To dig deeper, rdd.persist(StorageLevel.DISK_ONLY) is also different from checkpoint. The former persists RDD partitions to disk, but the partitions are managed by blockManager. Once the driver program finishes, that is, once the executor process CoarseGrainedExecutorBackend stops, blockManager stops as well and the RDD cached to disk is cleared (the local folders used by the whole blockManager are deleted). Checkpoint, on the other hand, persists the RDD to HDFS or a local folder; unless it is removed manually, the checkpointed RDD stays there and can be used by the next driver program, whereas a cached RDD cannot be used by other driver programs.

// In Spark 2.2, removing the checkpoint data of a checkpointed RDD:
//object ReliableRDDCheckpointData
 /** Clean up the files associated with the checkpoint data for this RDD. */
  def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
    checkpointPath(sc, rddId).foreach { path =>
      path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
    }
  }

Discussion

When Hadoop MapReduce executes a job, it keeps persisting data: once after each task finishes and once after each job finishes (written to HDFS). Tasks constantly swap between memory and disk during execution. Ironically, tasks in Hadoop are so simple-minded that if something goes wrong midway they must be recomputed from scratch; for example, if half of the shuffled data has already been written to disk, the next run still has to redo the entire shuffle. Spark's advantage is that it avoids persisting by default and instead relies on pipelining, caching, and so on. If you suspect that a job may fail, you can manually checkpoint some critical RDDs; if the job does fail, data is read directly from the checkpoint. The only downside is that checkpointing requires running the job twice.

Example

I haven’t found an official checkpoint example yet. Here’s one:

package internals

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object groupByKeyTest {

   def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("GroupByKey").setMaster("local")
    val sc = new SparkContext(conf) 
    sc.setCheckpointDir("/Users/xulijie/Documents/data/checkpoint")
     
    val data = Array[(Int, Char)]((1, 'a'), (2, 'b'),
                                  (3, 'c'), (4, 'd'),
                                  (5, 'e'), (3, 'f'),
                                  (2, 'g'), (1, 'h'))
    val pairs = sc.parallelize(data, 3)
	
	pairs.checkpoint
	pairs.count
	
	val result = pairs.groupByKey(2)

	result.foreachWith(i => i)((x, i) => println("[PartitionIndex " + i + "]" + x))
	
	println(result.toDebugString)
   }
}
Copy the code