This blog series summarizes and shares cases drawn from real business environments, and provides Spark source code walkthroughs together with practical guidance. Please stay tuned for further installments. Copyright: this series on Spark source code interpretation and commercial practice belongs to the author (Qin Kaixin). Reprinting is prohibited; you are welcome to learn from it.

Spark Business Environment in Practice and Advanced Tuning series

  • Spark Business Environment in Practice: Spark's built-in RPC communication mechanism and the RpcEnv infrastructure
  • Spark Business Environment in Practice: Analysis of the Spark event listener bus workflow
  • Spark Business Environment in Practice: Analysis of the Spark storage system infrastructure
  • Spark Business Environment in Practice: Analysis of the execution flow of Spark's multiple MessageLoop threads
  • Spark Business Environment in Practice: Stage division algorithm and optimal task scheduling in Spark's two-level scheduling system
  • Spark Business Environment in Practice: Spark delay scheduling of tasks and scheduling Pool architecture analysis
  • Spark Business Environment in Practice: Detailed analysis of AppendOnlyMap, the task-level cache structure for aggregation and sorting
  • Spark Business Environment in Practice: Design of the external sorter ExternalSorter in the Spark Shuffle process
  • Spark Business Environment in Practice: Design of the external sorter ShuffleExternalSorter in the Shuffle process
  • Spark Business Environment in Practice: Design analysis of SortShuffleWriter, a unified storage service of ShuffleManager
  • Spark Business Environment in Practice: Design analysis of UnsafeShuffleWriter, a unified storage service of ShuffleManager
  • Spark Business Environment in Practice: Design analysis of BypassMergeSortShuffleWriter, a unified storage service of ShuffleManager
  • Spark Business Environment in Practice: Design analysis of data fetching in the Shuffle Reader
  • Spark Business Environment in Practice: StreamingContext startup process and DStream template source code analysis
  • Spark Business Environment in Practice: Analysis of the ReceiverTracker and BlockGenerator data stream receiving process

1 ShuffleExternalSorter: where the external sorter stands

1. Basic Shuffle data structures

1.1 Inheritance hierarchy of the memory buffers

The figure above shows the data structures Spark uses for task-level caching. Their roles are as follows:

  • AppendOnlyMap: encapsulates the basic in-memory operations a task needs: insert, update, aggregate, and sort.

  • SizeTrackingAppendOnlyMap: an AppendOnlyMap that also samples its own size and produces a size estimate.

  • PartitionedAppendOnlyMap:

    (1) Overrides the partitionedDestructiveSortedIterator method of the WritablePartitionedPairCollection trait. That method calls AppendOnlyMap's destructiveSortedIterator to sort the underlying array and returns an iterator over the sorted data.

    (2) Overrides the insert method of the WritablePartitionedPairCollection trait so that every element is stored under the composite key (partition, key).

  • PartitionedPairBuffer: an in-memory buffer for sequential inserts only, with no updating or aggregating. That is, it has no changeValue (aggregation) or update operations.
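To make the composite key idea concrete, here is a minimal sketch. It assumes a plain HashMap as the backing store; `PartitionedMapSketch` is a hypothetical name and not the Spark class. It only illustrates inserting under (partition, key) and iterating sorted by partition ID first, then optionally by key.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for PartitionedAppendOnlyMap (not the Spark class).
final class PartitionedMapSketch[K, V] {
  private val data = mutable.HashMap.empty[(Int, K), V]

  // Same idea as insert(partition, key, value): the stored key is (partition, key).
  def insert(partition: Int, key: K, value: V): Unit =
    data((partition, key)) = value

  // Sort by partition ID first, then by key when a key ordering is supplied.
  // "Destructive" here only means the map is emptied once the sorted view is taken.
  def partitionedDestructiveSortedIterator(
      keyOrdering: Option[Ordering[K]]): Iterator[((Int, K), V)] = {
    val byPartition = Ordering.by[((Int, K), V), Int](_._1._1)
    val full: Ordering[((Int, K), V)] = keyOrdering match {
      case Some(ord) =>
        new Ordering[((Int, K), V)] {
          def compare(a: ((Int, K), V), b: ((Int, K), V)): Int = {
            val c = byPartition.compare(a, b)
            if (c != 0) c else ord.compare(a._1._2, b._1._2)
          }
        }
      case None => byPartition
    }
    val sorted = data.toVector.sorted(full)
    data.clear()
    sorted.iterator
  }
}
```

Passing `None` as the key ordering groups records by partition only, which is enough when the user has not asked for sorted keys.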

2. The role of ShuffleManager

2.1 ShuffleManager logical architecture design

ShuffleManager is a trait whose only implementation class is SortShuffleManager; HashShuffleManager was removed in Spark 2.0. So what is its main job? It manages the shuffle. SortShuffleManager relies on the storage system to carry map task (MapTask) output through the Shuffle process: the data is first buffered in memory (for example in an AppendOnlyMap) and then spilled to disk. This process involves complex operations such as sorting and aggregation.

3. The elegant design of the ExternalSorter architecture

3.1 ExternalSorter function analysis

As shown in the logical architecture figure of Section 2.1, ExternalSorter is one of the underlying components of SortShuffleManager and provides the following functions.

  • (1) Store map task output in the JVM heap, organized by partition. If an aggregation function is specified, the data is aggregated as well.
  • (2) Use a partitioner to group keys into partitions, then optionally sort the keys within each partition using a custom comparator.
  • (3) Write all partitions into a single file. An index file locates each partition so that the Reduce side can fetch its partition during shuffle.
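Point (3) can be sketched as follows. The sketch assumes the index simply stores cumulative byte offsets derived from the per-partition lengths; `IndexSketch` and its methods are hypothetical names used for illustration only.

```scala
// Hypothetical sketch: derive an offset index from per-partition segment lengths.
object IndexSketch {
  // Cumulative offsets: partition i occupies bytes [offsets(i), offsets(i + 1)).
  def offsets(lengths: Array[Long]): Array[Long] =
    lengths.scanLeft(0L)(_ + _)

  // Returns (start offset, length) of the segment a given reducer has to read.
  def segment(lengths: Array[Long], reduceId: Int): (Long, Long) = {
    val off = offsets(lengths)
    (off(reduceId), off(reduceId + 1) - off(reduceId))
  }
}
```

With lengths of 10, 0 and 25 bytes, reducer 2 would read 25 bytes starting at offset 10, and the empty partition 1 costs nothing to skip.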

3.2 Key members of ExternalSorter

  • aggregator: the aggregator that aggregates the output data of map tasks.

  • partitioner: the partitioner that assigns the output data of a map task to partitions by key.

  • ordering: an Ordering implementation that sorts the output data of a map task by key.

  • map: new PartitionedAppendOnlyMap[K, C]. When an aggregator is set, the map side uses this data structure to aggregate intermediate results in memory before spilling them to disk.

       Data structures to store in-memory objects before we spill. Depending on whether we have an
       Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
       store them in an array buffer.
  • buffer: new PartitionedPairBuffer[K, C]. When no aggregator is set, the map side uses this data structure to store intermediate results in memory before spilling them to disk.

  • keyComparator: the default comparator, which orders keys within a partition by their hash code.

  • spills: new ArrayBuffer[SpilledFile], the list of files spilled so far.

     private[this] case class SpilledFile(
     file: File,
     blockId: BlockId,
     serializerBatchSizes: Array[Long],
     elementsPerPartition: Array[Long])
  • _peakMemoryUsedBytes: peak size of the in-memory data structure, updated in maybeSpillCollection.

       Peak size of the in-memory data structure observed so far, in bytes
  • initialMemoryThreshold: the initial memory threshold for the tracked collection, determined by SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024).

  • myMemoryThreshold: initialized to initialMemoryThreshold and used by maybeSpill to decide whether to spill.
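The default keyComparator described above can be sketched like this. It is a minimal illustration under the assumption that null keys are treated as hash code 0; `HashComparatorSketch` is a hypothetical name.

```scala
import java.util.Comparator

// Hypothetical sketch of a comparator that orders keys by hash code only.
object HashComparatorSketch {
  def byHashCode[K]: Comparator[K] = new Comparator[K] {
    override def compare(a: K, b: K): Int = {
      val h1 = if (a == null) 0 else a.hashCode()
      val h2 = if (b == null) 0 else b.hashCode()
      java.lang.Integer.compare(h1, h2)
    }
  }
}
```

Note that such a comparator is only a partial ordering: two different keys with the same hash code (for example "Aa" and "BB") compare as equal, so any merge built on it still has to check real key equality.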

3.3 ExternalSorter core design – memory buffering

When a Spark map task finishes, its data is written to disk. Before that write happens, however, Spark sorts or aggregates the map task output in memory. Note: in memory, not on disk. The source code below makes this clear.

  • The mergeValue function merges a new value into the result aggregated so far.

  • The createCombiner function creates the initial value of the aggregation.

  • The update function is applied to every record. If the key already has an aggregated value (hadValue is true), it calls mergeValue to merge the new value into that result; otherwise it calls createCombiner with the value to create the initial aggregate.

  • The (partition index, key) pair is passed as the key to AppendOnlyMap's changeValue method.

  • The maybeSpillCollection function spills to disk when necessary.

  • insertAll: the entry point through which MapTask data is written into the memory buffer.

    def insertAll(records: Iterator[Product2[K, V]]): Unit = {
      val shouldCombine = aggregator.isDefined

      if (shouldCombine) {
        // Combine values in-memory first using our AppendOnlyMap
        val mergeValue = aggregator.get.mergeValue
        val createCombiner = aggregator.get.createCombiner
        var kv: Product2[K, V] = null
        val update = (hadValue: Boolean, oldValue: C) => {
          if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
        }
        while (records.hasNext) {
          addElementsRead()
          kv = records.next()
          map.changeValue((getPartition(kv._1), kv._1), update)
          maybeSpillCollection(usingMap = true)  // ====> may spill to disk
        }
      } else {
        // Stick values into our buffer
        while (records.hasNext) {
          addElementsRead()
          val kv = records.next()
          buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
          maybeSpillCollection(usingMap = false)  // ====> may spill to disk
        }
      }
    }
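The update closure in insertAll can be tried in isolation. The following is a minimal sketch under stated assumptions: a plain mutable HashMap replaces the AppendOnlyMap, partitions are assigned by hash code modulo numPartitions, and `CombineSketch`, its `changeValue` and its `insertAll` are hypothetical helpers, not Spark APIs.

```scala
import scala.collection.mutable

// Hypothetical sketch of the createCombiner / mergeValue pattern.
object CombineSketch {
  // Mini version of changeValue: applies update(hadValue, oldValue) to one key.
  def changeValue[K, C](map: mutable.Map[K, C], key: K, update: (Boolean, C) => C): C = {
    val newValue = map.get(key) match {
      case Some(old) => update(true, old)
      case None      => update(false, null.asInstanceOf[C]) // old value is ignored here
    }
    map(key) = newValue
    newValue
  }

  def insertAll[K, V, C](
      records: Iterator[(K, V)],
      numPartitions: Int,
      createCombiner: V => C,
      mergeValue: (C, V) => C): mutable.Map[(Int, K), C] = {
    val map = mutable.HashMap.empty[(Int, K), C]
    var kv: (K, V) = null
    // The closure reads the current record through the captured variable kv.
    val update = (hadValue: Boolean, oldValue: C) =>
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    while (records.hasNext) {
      kv = records.next()
      val partition = java.lang.Math.floorMod(kv._1.hashCode(), numPartitions)
      changeValue(map, (partition, kv._1), update)
    }
    map
  }
}
```

For a word count, createCombiner would be the identity on the first count and mergeValue would be addition, so three records ("a", 1), ("b", 1), ("a", 1) collapse into two entries.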

3.4 ExternalSorter core design – deciding when to spill

  • maybeSpillCollection decides whether the in-memory data must be spilled. ExternalSorter buffers data in PartitionedAppendOnlyMap and PartitionedPairBuffer. With a small data volume this is not a problem, but with a large data volume the system would run out of memory (OOM) if nothing were spilled.

  • maybeSpillCollection also controls how often data is written to disk. Spilling too frequently may reduce disk I/O efficiency.

  • PartitionedAppendOnlyMap and PartitionedPairBuffer both sample their own memory usage and provide a size estimate (estimateSize).

         private def maybeSpillCollection(usingMap: Boolean): Unit = {
          var estimatedSize = 0L
          if (usingMap) {
            estimatedSize = map.estimateSize()
            if (maybeSpill(map, estimatedSize)) {
              map = new PartitionedAppendOnlyMap[K, C]
            }
          } else {
            estimatedSize = buffer.estimateSize()
            if (maybeSpill(buffer, estimatedSize)) {
              buffer = new PartitionedPairBuffer[K, C]
            }
          }
      
          if (estimatedSize > _peakMemoryUsedBytes) {
            _peakMemoryUsedBytes = estimatedSize
          }
        }
  • maybeSpill: when the estimated size reaches myMemoryThreshold and not enough additional memory can be acquired, the collection is spilled to disk.

    /**
     * Spill some data to disk to release memory, which will be called by TaskMemoryManager
     * when there is not enough memory for the task.
     */
    protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
      var shouldSpill = false
      if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
        // Claim up to double our current memory from the shuffle memory pool
        val amountToRequest = 2 * currentMemory - myMemoryThreshold
        val granted = acquireMemory(amountToRequest)
        myMemoryThreshold += granted
        // If we were granted too little memory to grow further (either tryToAcquire returned 0,
        // or we already had more memory than myMemoryThreshold), spill the current collection
        shouldSpill = currentMemory >= myMemoryThreshold
      }
      shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
      // Actually spill
      if (shouldSpill) {
        _spillCount += 1
        logSpillage(currentMemory)
        spill(collection)  // ====> the actual spill
        _elementsRead = 0
        _memoryBytesSpilled += currentMemory
        releaseMemory()
      }
      shouldSpill
    }
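The threshold arithmetic in maybeSpill is easier to follow as a pure function. Below is a sketch under stated assumptions: `SpillDecisionSketch`, `Decision` and the `grant` parameter are hypothetical, with `grant` standing in for acquireMemory by returning how many of the requested bytes the memory manager hands out.

```scala
// Hypothetical, side-effect-free sketch of the spill decision.
object SpillDecisionSketch {
  final case class Decision(shouldSpill: Boolean, newThreshold: Long)

  def decide(
      elementsRead: Long,
      currentMemory: Long,
      threshold: Long,
      forceSpillAfter: Long,
      grant: Long => Long): Decision = {
    var newThreshold = threshold
    var shouldSpill = false
    // Only check every 32 elements, and only once the estimate reaches the threshold.
    if (elementsRead % 32 == 0 && currentMemory >= threshold) {
      // Try to grow the threshold to twice the current memory usage.
      val amountToRequest = 2 * currentMemory - threshold
      newThreshold += grant(amountToRequest)
      // Spill only if the grant was too small to cover the current usage.
      shouldSpill = currentMemory >= newThreshold
    }
    Decision(shouldSpill || elementsRead > forceSpillAfter, newThreshold)
  }
}
```

With a 5 MB threshold and a 6 MB estimate, the request is 7 MB. If all of it is granted the threshold becomes 12 MB and nothing is spilled; if nothing is granted the threshold stays at 5 MB and the collection is spilled.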

3.4 ExternalSorter core design – spilling in-memory data to disk

This part is quite interesting, because I am going to go into a lot of detail about Spark's "Nine Swords" of memory usage. Here we go:

3.4.1 First move: PartitionedAppendOnlyMap

Because PartitionedAppendOnlyMap is backed by a hash table, the spill proceeds as follows:

  • (1) First, partitionedDestructiveSortedIterator is called, which compacts the elements toward the low indexes of the underlying array.
  • (2) Then the elements are sorted with the specified comparator, first by partition ID, then by key.
  • (3) Finally an Iterator is returned, which is used to iterate over the in-memory data and write it out.
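The three steps above can be sketched as follows. This is a simplified illustration that assumes the hash table is an array of entries in which empty slots are null, and it sorts a copy of the occupied prefix rather than sorting in place; `CompactSortSketch` is a hypothetical name.

```scala
// Hypothetical sketch of compact ==> sort ==> return iterator.
object CompactSortSketch {
  type Entry = ((Int, String), Int) // ((partition, key), value)

  def destructiveSorted(slots: Array[Entry]): Iterator[Entry] = {
    // (1) Compact: move every non-null entry to the lowest free index.
    var newIndex = 0
    var i = 0
    while (i < slots.length) {
      if (slots(i) != null) {
        slots(newIndex) = slots(i)
        if (i != newIndex) slots(i) = null
        newIndex += 1
      }
      i += 1
    }
    // (2) Sort the occupied prefix by partition ID, then by key
    //     (the tuple ordering on (Int, String) does exactly that).
    val sorted = slots.take(newIndex).sortBy(_._1)
    // (3) Hand back an iterator over the sorted entries.
    sorted.iterator
  }
}
```

The method is called destructive because the slot array no longer works as a hash table afterwards: entries have been moved away from the positions their hash codes point to.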

3.4.2 Second move: ExternalSorter

Notice what happens here: spill calls PartitionedAppendOnlyMap's destructiveSortedWritablePartitionedIterator, which relies on the first of the Nine Swords: compact ==> sort ==> return iterator.

      override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
        val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
        val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
        spills += spillFile
      } 

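3.4.3 Third move: spillMemoryIteratorToDisk

  • (1) The in-memory iterator is written to disk record by record.

  • (2) A unique blockId and file are created through diskBlockManager, and a disk writer is obtained to start writing the file.

  • (3) Records are written in partition order, and the number of elements in each partition is recorded.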

    private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
        : SpilledFile = {
      // Because these files may be read during shuffle, their compression must be controlled by
      // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
      // createTempShuffleBlock here; see SPARK-3426 for more context.
      val (blockId, file) = diskBlockManager.createTempShuffleBlock()

      // These variables are reset after each flush
      var objectsWritten: Long = 0
      val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
      val writer: DiskBlockObjectWriter =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)

      // List of batch sizes (bytes) in the order they are written to disk
      val batchSizes = new ArrayBuffer[Long]

      // How many elements we have in each partition
      val elementsPerPartition = new Array[Long](numPartitions)

      // Flush the disk writer's contents to disk, and update relevant variables.
      // The writer is committed at the end of this process.
      def flush(): Unit = {
        val segment = writer.commitAndGet()
        batchSizes += segment.length
        _diskBytesSpilled += segment.length
        objectsWritten = 0
      }

      var success = false
      try {
        while (inMemoryIterator.hasNext) {
          val partitionId = inMemoryIterator.nextPartition()
          require(partitionId >= 0 && partitionId < numPartitions,
            s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
          inMemoryIterator.writeNext(writer)
          elementsPerPartition(partitionId) += 1
          objectsWritten += 1

          if (objectsWritten == serializerBatchSize) {
            flush()
          }
        }
        if (objectsWritten > 0) {
          flush()
        } else {
          writer.revertPartialWritesAndClose()
        }
        success = true
      } finally {
        if (success) {
          writer.close()
        } else {
          // This code path only happens if an exception was thrown above before we set success;
          // close our stuff and let the exception be thrown further
          writer.revertPartialWritesAndClose()
          if (file.exists()) {
            if (!file.delete()) {
              logWarning(s"Error deleting ${file}")
            }
          }
        }
      }

      SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
    }
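The bookkeeping inside the write loop (batches and per-partition counts) can be reproduced without any disk I/O. The sketch below assumes that a batch "size" is counted in records, whereas the real code records the byte length of each committed segment; `SpillBatchSketch` is a hypothetical name.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of the flush bookkeeping, with record counts instead of bytes.
object SpillBatchSketch {
  // Returns (records per flushed batch, records per partition).
  def spill(partitionIds: Iterator[Int], numPartitions: Int, batchSize: Int)
      : (Array[Long], Array[Long]) = {
    val batchSizes = new ArrayBuffer[Long]
    val elementsPerPartition = new Array[Long](numPartitions)
    var objectsWritten = 0L

    // Close the current batch and start counting the next one.
    def flush(): Unit = {
      batchSizes += objectsWritten
      objectsWritten = 0
    }

    while (partitionIds.hasNext) {
      val p = partitionIds.next()
      require(p >= 0 && p < numPartitions,
        s"partition Id: $p should be in the range [0, $numPartitions)")
      elementsPerPartition(p) += 1
      objectsWritten += 1
      if (objectsWritten == batchSize) flush()
    }
    // Flush the last, possibly partial, batch.
    if (objectsWritten > 0) flush()
    (batchSizes.toArray, elementsPerPartition)
  }
}
```

These two arrays correspond to the serializerBatchSizes and elementsPerPartition fields of SpilledFile, which a reader later needs in order to find batch and partition boundaries inside the spill file.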

3.5 ExternalSorter core design – merging the memory buffer and the spill files into the final output

ExternalSorter's writePartitionedFile now takes the stage. By merging the in-memory data with the multiple spill files, each MapTask produces only one final block file.

Note that the data of every partition is written to this final file ordered by partition ID and key. In other words, each MapTask generates only one disk file. Before Spark 1.6, with hash-based shuffle, a MapTask would generate N bucket files (N being the number of Reduce tasks).

3.5.1 Let’s look at the code snippet for writePartitionedFile:

  • Case (1): nothing has been spilled to disk, so the in-memory data is sorted and written out directly.

  • Case (2): spill files exist on disk, so they are read back, merged with the in-memory data, and written once more into the final block file.

    /**
     * Write all the data added into this ExternalSorter into a file in the disk store. This is
     * called by the SortShuffleWriter.
     */
    def writePartitionedFile(
        blockId: BlockId,
        outputFile: File): Array[Long] = {

      // Track location of each range in the output file
      val lengths = new Array[Long](numPartitions)
      val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
        context.taskMetrics().shuffleWriteMetrics)

      if (spills.isEmpty) {
        // Case where we only have in-memory data
        val collection = if (aggregator.isDefined) map else buffer
        val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
        while (it.hasNext) {
          val partitionId = it.nextPartition()
          while (it.hasNext && it.nextPartition() == partitionId) {
            it.writeNext(writer)
          }
          val segment = writer.commitAndGet()
          lengths(partitionId) = segment.length
        }
      } else {
        // We must perform merge-sort; get an iterator by partition and write everything directly.
        for ((id, elements) <- this.partitionedIterator) {
          if (elements.hasNext) {
            for (elem <- elements) {
              writer.write(elem._1, elem._2)  // ====> write the merged records
            }
            val segment = writer.commitAndGet()
            lengths(id) = segment.length
          }
        }
      }

      writer.close()
      context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
      context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
      context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)

      lengths
    }

The partitionedIterator method behind writePartitionedFile

When nothing has been spilled, partitionedIterator uses groupByPartition to produce an IteratorForPartition for each partition. Otherwise it merges the spill files with the in-memory data:

    /**
     * Return an iterator over all the data written to this object, grouped by partition and
     * aggregated by the requested aggregator. For each partition we then have an iterator over its
     * contents, and these are expected to be accessed in order (you can't "skip ahead" to one
     * partition without reading the previous one). Guaranteed to return a key-value pair for each
     * partition, in order of partition ID.
     *
     * For now, we just merge all the spilled files in once pass, but this can be modified to
     * support hierarchical merging.
     * Exposed for testing.
     */
    def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
      val usingMap = aggregator.isDefined
      val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
      if (spills.isEmpty) {  // ====> highlight: check again whether anything was spilled
        // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
        // we don't even need to sort by anything other than partition ID
        if (!ordering.isDefined) {
          // The user hasn't requested sorted keys, so only sort by partition ID, not key
          groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
        } else {
          // We do need to sort by both partition ID and key
          groupByPartition(destructiveIterator(
            collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
        }
      } else {
        // Merge spilled and in-memory data
        merge(spills, destructiveIterator(
          collection.partitionedDestructiveSortedIterator(comparator)))  // ====> highlight
      }
    }

The groupByPartition method, which creates one IteratorForPartition per partition ID:

    /**
     * Given a stream of ((partition, key), combiner) pairs *assumed to be sorted by partition ID*,
     * group together the pairs for each partition into a sub-iterator.
     *
     * @param data an iterator of elements, assumed to already be sorted by partition ID
     */
    private def groupByPartition(data: Iterator[((Int, K), C)])
        : Iterator[(Int, Iterator[Product2[K, C]])] =
    {
      val buffered = data.buffered
      (0 until numPartitions).iterator.map(p => (p, new IteratorForPartition(p, buffered)))
    }

The IteratorForPartition class:

    /**
     * An iterator that reads only the elements for a given partition ID from an underlying buffered
     * stream, assuming this partition is the next one to be read. Used to make it easier to return
     * partitioned iterators from our in-memory collection.
     */
    private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)])
      extends Iterator[Product2[K, C]]
    {
      override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId

      override def next(): Product2[K, C] = {
        if (!hasNext) {
          throw new NoSuchElementException
        }
        val elem = data.next()
        (elem._1._2, elem._2)
      }
    }
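Since both pieces only depend on a BufferedIterator, they can be lifted out of ExternalSorter and run on their own. The following sketch does that, with numPartitions passed as an explicit parameter; `GroupByPartitionSketch` is a hypothetical wrapper object.

```scala
// Hypothetical standalone version of groupByPartition and IteratorForPartition.
object GroupByPartitionSketch {
  // Reads elements from the shared stream only while they belong to partitionId.
  final class IteratorForPartition[K, C](
      partitionId: Int,
      data: BufferedIterator[((Int, K), C)]) extends Iterator[(K, C)] {
    override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId

    override def next(): (K, C) = {
      if (!hasNext) throw new NoSuchElementException
      val elem = data.next()
      (elem._1._2, elem._2)
    }
  }

  // data must already be sorted by partition ID.
  def groupByPartition[K, C](data: Iterator[((Int, K), C)], numPartitions: Int)
      : Iterator[(Int, Iterator[(K, C)])] = {
    val buffered = data.buffered
    (0 until numPartitions).iterator.map(p => (p, new IteratorForPartition(p, buffered)))
  }
}
```

All sub-iterators share one underlying stream, which is why they have to be consumed in partition order: a partition's iterator only sees its records once every earlier partition has been drained. Empty partitions still appear in the result, with an empty iterator.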

The merge method, which combines the spill files with the in-memory data:

   * Merge a sequence of sorted files, giving an iterator over partitions and then over elements
   * inside each partition. This can be used to either write out a new file or return data to
   * the user.
   *
   * Returns an iterator over all the data written to this object, grouped by partition. For each
   * partition we then have an iterator over its contents, and these are expected to be accessed
   * in order (you can't "skip ahead" to one partition without reading the previous one).
   * Guaranteed to return a key-value pair for each partition, in order of partition ID.

Peaches and plums do not speak, yet a path forms beneath them. In the end, merge returns an iterator, which makes it easy to write the final file in partition order:

    private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
        : Iterator[(Int, Iterator[Product2[K, C]])] = {
      val readers = spills.map(new SpillReader(_))  // ====> read the spill files back from disk
      val inMemBuffered = inMemory.buffered
      (0 until numPartitions).iterator.map { p =>
        val inMemIterator = new IteratorForPartition(p, inMemBuffered)
        val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
        if (aggregator.isDefined) {
          // Perform partial aggregation across partitions
          (p, mergeWithAggregation(
            iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
        } else if (ordering.isDefined) {
          // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
          // sort the elements without trying to merge them
          (p, mergeSort(iterators, ordering.get))
        } else {
          (p, iterators.iterator.flatten)  // ====> highlight: simple concatenation, then output
        }
      }
    }
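The mergeSort branch combines several iterators that are each already sorted by key. A minimal sketch of such a k-way merge is shown below, using a priority queue keyed on each iterator's head element. It takes an implicit Ordering instead of a Comparator, and `MergeSortSketch` is a hypothetical name, so treat it as an illustration of the technique rather than Spark's exact code.

```scala
import scala.collection.mutable

// Hypothetical sketch of a k-way merge over sorted iterators.
object MergeSortSketch {
  def mergeSort[K, C](iterators: Seq[Iterator[(K, C)]])(implicit ord: Ordering[K])
      : Iterator[(K, C)] = {
    type Iter = BufferedIterator[(K, C)]
    // PriorityQueue dequeues the largest element, so the ordering is reversed
    // to always get the iterator whose head key is the smallest.
    val heap = new mutable.PriorityQueue[Iter]()(
      Ordering.by[Iter, K](_.head._1)(ord).reverse)
    heap.enqueue(iterators.map(_.buffered).filter(_.hasNext): _*)

    new Iterator[(K, C)] {
      override def hasNext: Boolean = heap.nonEmpty

      override def next(): (K, C) = {
        if (!hasNext) throw new NoSuchElementException
        val first = heap.dequeue()
        val pair = first.next()
        // Re-insert the iterator so it is ordered by its new head element.
        if (first.hasNext) heap.enqueue(first)
        pair
      }
    }
  }
}
```

Only one element per input iterator is held in memory at any time, which is what lets the merge stream through spill files of arbitrary size.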

3.6 Summary

After a Spark map task finishes, its output must be persisted. Two cases arise:

  • Spill files exist: merge the spill files with the data in memory and write the result to disk.
  • No spill files exist: sort the data in memory and write it to disk.

The actual process is as follows:

4 Closing remarks

After several drafts, this article is finally written. It still needs further improvement.

Qin Kaixin in Shenzhen