The first personal public account spark technology sharing, synchronous personal website Coolplayer.net, without my consent, all reprinting is prohibited

It should have been in this article a long time ago, but I thought it was too interesting, so I wrote a separate article.

Because UnsafeShuffleWriter involves unified memory management, you can refer to the previous article if you are not familiar with this area

The whole process

UnsafeShuffleWriter maintains a ShuffleExternalSorter, which is used to do external sort. In my last article, I explained what external sort is. External sort is to partially sort data and output data to disk. Merge global sort (ShuffleInMemorySorter); merge global sort (ShuffleInMemorySorter); merge global sort (ShuffleInMemorySorter) The sorted data is output to a segment of a temporary file after serialization and compression, and the seek position of each partition segment is recorded, so that the subsequent data of each partition can be read separately. The read stream can be read normally after decompression and deserialization.

The whole process is to continuously insert data into ShuffleInMemorySorter. If there is no memory, the ShuffleInMemorySorter allocates memory. If there is no memory, the ShuffleInMemorySorter allocates memory to a file.

Compare SortShuffleWriter with UnsafeShuffleWriter

The difference between UnsafeShuffleWriter SortShuffleWriter
The sorting way Finally, it is only the partition level sort The keys of the same partition are in order
aggregation No meal serialization, no aggregation Support the aggregation

Conditions for using UnsafeShuffleWriter

  • Aggregation or key sorting is not specified, because the key is not encoded into the sorting pointer, so there is only partition sorting

  • The original data is serialized first, and anti-sequence is no longer required. After the corresponding metadata is sorted, Serializer needs to support relocation and read the corresponding data at the specified location. KryoSerializer and Spark SQL custom serializers support this feature.

  • The number of partitions must be smaller than 16777216 because the partition number is represented by 24 bits.

  • Each partition uses 27 bits to represent record offset, so a record cannot be larger than this value.

Memory sort and output files

Let’s look at the example of sorting records. A standard sorting procedure requires storing a set of Pointers for records and using Quicksort to swap Pointers until all records are sorted. Due to the nature of sequential scanning, sorting usually yields a good cache hit ratio. However, sorting a set of Pointers has a low cache hit rate because each comparison requires dereferencing two Pointers that correspond to data in two random locations in memory.

So, how can we improve cache localization in sorting? One way to do this is to store the sort key of each record sequentially through Pointers. We use 8 bytes (partition ID as the key, and real pointer to the data) to represent a piece of data in a sort array. Each comparison sort operation only requires a linear lookup of each pair of pointer-keys without any random scans. If all records of the partion are sorted, it is good to directly sort the data in the data, which greatly improves performance.

Of course, UnsafeShuffleWriter uses RadixSort to sort data, which is very simple and I won’t introduce it. Under different clear can refer to this document at http://bubkoo.com/2014/01/15/sort-algorithm/radix-sort/

The memory allocated is recorded as a page in allocatedPages. The memory is free during spill. There is a currentPage that is currently in use.

Insert partionId + pageNumber + offset in page as an element each time a record is inserted into the page. LongArray RadixSort; after sorting, index the original data according to the pointer elements, so as to achieve partition level order.

Spill file, UnsafeShuffleInMemorySorter generated a data iterator that returns a row is sorted according to the partition id iterator, the iterator granularity of each element is a pointer, Corresponding to the PackedRecordPointer data structure, PackedRecordPointer [24 bit partition number][13 bit memory page number][27 bit offset in page] Then according to the pointer can get real record, at the very beginning to enter UnsafeShuffleExternalSorter has been serialized, so here is really into writing section array. Data of different partitons in a file is represented by fileSegment, and the corresponding information is stored in SpillInfo data structure.

Merge files

The partition index of each spill file is stored in the SpillInfo data structure. Before the Task ends, we perform a mergeSpills operation. If fastMergeEnabled and concatenation of compressed data is supported, the compressed data of the same partition can be directly connected together without decompression and deserialization. Decompression and buffer copying can be avoided by using an efficient data copying technology such as NIO’s transferTo.

Welcome to Spark technology sharing