1. Configuration

  • taskmanager.network.sort-shuffle.min-parallelism

    Core option. It sets the boundary between Hash Shuffle and Sort Shuffle: if the parallelism (the number of subpartitions of a result partition) is greater than or equal to this value, Sort Shuffle is used. The default value is Integer.MAX_VALUE, which means Hash Shuffle is always used.

  • taskmanager.network.sort-shuffle.min-buffers

    The minimum number of network buffers used by each Sort Shuffle result partition. The default is 64, and 2048 is recommended for production. With the larger value, the network memory configuration may also need to be increased.

  • taskmanager.network.blocking-shuffle.compression.enabled

    Whether compression is enabled for blocking shuffle data.

2. Initial creation

The ResultPartition is created in the create() method of ResultPartitionFactory. There are three types of ResultPartition: PipelinedResultPartition, SortMergeResultPartition and BoundedBlockingResultPartition. PipelinedResultPartition is used in streaming mode, and the other two are used in batch mode. A SortMergeResultPartition is created under the following branch condition:

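else if (type == ResultPartitionType.BLOCKING
        || type == ResultPartitionType.BLOCKING_PERSISTENT) {
    if (numberOfSubpartitions >= sortShuffleMinParallelism) {
        // ... create a SortMergeResultPartition
    } else { /* ... create a BoundedBlockingResultPartition */ }
}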
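To make the rule concrete, here is a minimal Java sketch of the selection logic. It is not Flink code: the PartitionType enum and choosePartition() are hypothetical, and the partition types are reduced to the three discussed here. It shows how the default min-parallelism of Integer.MAX_VALUE keeps blocking partitions on Hash Shuffle.

```java
// Hypothetical model of the branch in ResultPartitionFactory.create().
// PartitionType and choosePartition() are illustrative names, not Flink classes.
class ShuffleSelection {

    enum PartitionType { PIPELINED, BLOCKING, BLOCKING_PERSISTENT }

    static String choosePartition(
            PartitionType type, int numberOfSubpartitions, int sortShuffleMinParallelism) {
        if (type == PartitionType.PIPELINED) {
            return "PipelinedResultPartition";
        }
        // BLOCKING or BLOCKING_PERSISTENT: the parallelism decides sort vs. hash shuffle
        if (numberOfSubpartitions >= sortShuffleMinParallelism) {
            return "SortMergeResultPartition";
        }
        return "BoundedBlockingResultPartition";
    }

    public static void main(String[] args) {
        // With the default Integer.MAX_VALUE, blocking partitions never use sort shuffle
        System.out.println(choosePartition(PartitionType.BLOCKING, 1000, Integer.MAX_VALUE));
        // Lowering min-parallelism to 128 switches this 1000-way partition to sort shuffle
        System.out.println(choosePartition(PartitionType.BLOCKING, 1000, 128));
    }
}
```

The first call prints BoundedBlockingResultPartition and the second prints SortMergeResultPartition.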

3. Member variables

  • NUM_WRITE_BUFFER_BYTES

    An int constant: the total size of the buffers used for writing data. It is currently fixed at 16 MB and cannot be configured.

  • resultFile

    A PartitionedFile, the persistent file of sort-merge shuffle. It consists of two files, .shuffle.data and .shuffle.index, which are placed under the temporary (TMP) directory.

The data file is divided into multiple regions. Within each region, the data of a given subpartition is stored contiguously. Each index entry is a (long, int) pair: the long is the file offset and the int is the number of buffers.
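A small sketch of how such an index can be addressed. It assumes, which the text above does not state, that entries are stored region by region and, inside each region, in subpartition order, with 12 bytes per (long, int) entry. IndexFileSketch, entryPosition() and writeEntry() are hypothetical names, and a heap ByteBuffer stands in for the .shuffle.index file.

```java
import java.nio.ByteBuffer;

// Hypothetical (long offset, int numBuffers) index layout, assuming region-major order
// and subpartition order inside each region. Not the actual PartitionedFile code.
class IndexFileSketch {

    static final int INDEX_ENTRY_SIZE = Long.BYTES + Integer.BYTES; // 8 + 4 = 12 bytes

    // Byte position of the entry for (region, subpartition) in the index file.
    static long entryPosition(int region, int subpartition, int numSubpartitions) {
        return ((long) region * numSubpartitions + subpartition) * INDEX_ENTRY_SIZE;
    }

    // Writes one entry into an in-memory stand-in for the index file.
    static void writeEntry(ByteBuffer index, int region, int subpartition,
                           int numSubpartitions, long fileOffset, int numBuffers) {
        int pos = (int) entryPosition(region, subpartition, numSubpartitions);
        index.putLong(pos, fileOffset);
        index.putInt(pos + Long.BYTES, numBuffers);
    }

    public static void main(String[] args) {
        int numSubpartitions = 3;
        int numRegions = 2;
        ByteBuffer index = ByteBuffer.allocate(numRegions * numSubpartitions * INDEX_ENTRY_SIZE);

        // Region 1, subpartition 2 starts at data-file offset 4096 and spans 5 buffers
        writeEntry(index, 1, 2, numSubpartitions, 4096L, 5);

        int pos = (int) entryPosition(1, 2, numSubpartitions); // (1 * 3 + 2) * 12 = 60
        System.out.println(index.getLong(pos) + " " + index.getInt(pos + Long.BYTES)); // 4096 5
    }
}
```

Fixed-size entries are what make this kind of direct addressing possible: a reader can seek straight to the entry of one subpartition without scanning the whole index.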

  • writeSegments

    List<MemorySegment>: the buffers taken from the network buffer pool for writing data. Their number depends on numRequiredBuffer, which ResultPartitionFactory derives from the shuffle type; for Sort Shuffle its value comes from taskmanager.network.sort-shuffle.min-buffers.

// SortMergeResultPartition: number of write buffers
int expectedWriteBuffers = NUM_WRITE_BUFFER_BYTES / networkBufferSize;
if (networkBufferSize > NUM_WRITE_BUFFER_BYTES) {
    expectedWriteBuffers = 1;
}

int numRequiredBuffer = bufferPool.getNumberOfRequiredMemorySegments();
int numWriteBuffers = Math.min(numRequiredBuffer / 2, expectedWriteBuffers);

// ResultPartitionFactory: where the required number of buffers comes from
int numRequiredBuffers =
        !type.isPipelined() && numberOfSubpartitions >= sortShuffleMinParallelism
                ? sortShuffleMinBuffers
                : numberOfSubpartitions + 1;

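Plugging numbers into the snippet above shows how the two limits interact. The sketch copies the arithmetic into a standalone class (WriteBufferMath is a hypothetical name). It uses 32 KB as the network buffer size, Flink's usual default for taskmanager.memory.segment-size, and the min-buffers values 64 and 2048 from section 1.

```java
// Standalone copy of the write-buffer arithmetic; NUM_WRITE_BUFFER_BYTES = 16 MB as stated
// in the text. The inputs in main() are example values.
class WriteBufferMath {

    static final int NUM_WRITE_BUFFER_BYTES = 16 * 1024 * 1024;

    static int numWriteBuffers(int numRequiredBuffer, int networkBufferSize) {
        int expectedWriteBuffers = NUM_WRITE_BUFFER_BYTES / networkBufferSize;
        if (networkBufferSize > NUM_WRITE_BUFFER_BYTES) {
            expectedWriteBuffers = 1; // a single buffer already exceeds 16 MB
        }
        // at most half of the required buffers are used for writing
        return Math.min(numRequiredBuffer / 2, expectedWriteBuffers);
    }

    public static void main(String[] args) {
        int bufferSize = 32 * 1024; // 32 KB network buffers
        System.out.println(numWriteBuffers(64, bufferSize));   // 32: limited by 64 / 2
        System.out.println(numWriteBuffers(2048, bufferSize)); // 512: limited by 16 MB / 32 KB
    }
}
```

With the default of 64 buffers, the write buffers stay well below 16 MB (32 × 32 KB = 1 MB), which is one reason the larger production value is recommended.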
  • networkBufferSize

    An int: the size of each network buffer, and therefore of each write buffer. Its value is the page size, which is set by taskmanager.memory.segment-size.

  • fileWriter

    PartitionedFileWriter: writes the output file of this ResultPartition.

  • subpartitionOrder

    int[]: the order in which subpartitions are written to the data file.

  • readScheduler

    SortMergeResultPartitionReadScheduler: the scheduler that reads the partition data.

  • numBuffersForSort

    int: the number of buffers available to unicastSortBuffer and broadcastSortBuffer.

  • broadcastSortBuffer

    SortBuffer: the sort buffer used for broadcast records (broadcastRecord).

  • unicastSortBuffer

    SortBuffer: the sort buffer used for non-broadcast records (emitRecord).

4. Write the shuffle file

Following the earlier analysis of data sending and receiving, data transmission starts with the collect() method of RecordWriterOutput:

RecordWriterOutput.collect()
    ->pushToRecordWriter()
        ->RecordWriter.emit()
            ->ResultPartitionWriter.emitRecord()
                ->SortMergeResultPartition.emitRecord()

4.1. Obtain the SortBuffer

Depending on whether the record is broadcast, the corresponding sort buffer is obtained (and created if needed):

SortBuffer sortBuffer = isBroadcast ? getBroadcastSortBuffer() : getUnicastSortBuffer();

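The getUnicastSortBuffer() method does two things: 1. it flushes the broadcast sort buffer; 2. it returns the current unicast sort buffer if it is still usable, and otherwise creates a new PartitionSortedBuffer and returns it. See section 5 for details.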

private SortBuffer getUnicastSortBuffer() throws IOException {
    flushBroadcastSortBuffer();

    if (unicastSortBuffer != null && !unicastSortBuffer.isFinished()) {
        return unicastSortBuffer;
    }

    unicastSortBuffer =
            new PartitionSortedBuffer(
                    lock, bufferPool, numSubpartitions, networkBufferSize,
                    numBuffersForSort, subpartitionOrder);
    return unicastSortBuffer;
}

4.2. Append Data

This step appends the record to the SortBuffer obtained in the previous step. If append() succeeds, the method returns immediately. If append() returns false because the sort buffer cannot hold the record, execution continues with the steps below.

if (sortBuffer.append(record, targetSubpartition, dataType)) {
    return;
}

Inside PartitionSortedBuffer.append(), false is returned when not enough buffers can be allocated:

// return false directly if it can not allocate enough buffers for the given record
if (!allocateBuffersForRecord(totalBytes)) {
    return false;
}

If the buffers are available, append() first writes the index entry (the metadata) and then the record data:

// write the index entry and record or event data
writeIndex(targetChannel, totalBytes, dataType);
writeRecord(source);

4.3. Insufficient buffer: the sort buffer is empty

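This step follows 4.2 when append() fails. If the sort buffer holds no data (hasRemaining() is false), the record could not fit even into an empty sort buffer, so it is too large. The sort buffer is finished and released, and the record is written separately by writeLargeRecord():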

if (!sortBuffer.hasRemaining()) {
    // the record can not be appended to the free sort buffer because it is too large
    sortBuffer.finish();
    sortBuffer.release();
    writeLargeRecord(record, targetSubpartition, dataType, isBroadcast);
    return;
}

4.4. Insufficient buffer: the sort buffer still holds data

Continuing from 4.3: if the sort buffer is full but still holds data that has not yet been written to the shuffle file, the buffered data is first flushed to the shuffle file, and emit() is then called again for the current record:

flushSortBuffer(sortBuffer, isBroadcast);
emit(record, targetSubpartition, dataType, isBroadcast);
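Steps 4.2 to 4.4 together form a small decision loop inside emit(). The following is a simplified Java model of that loop, not the Flink implementation: ToySortBuffer, the file list and the size-only records are hypothetical, and the per-subpartition sorting performed on flush is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the emit() control flow from 4.2-4.4. ToySortBuffer and the
// "file" list are hypothetical stand-ins; records are modeled only by their byte size.
class EmitFlowSketch {

    static final class ToySortBuffer {
        final int capacity;
        final List<Integer> records = new ArrayList<>();
        int used;

        ToySortBuffer(int capacity) { this.capacity = capacity; }

        // Like SortBuffer.append(): returns false when the record does not fit.
        boolean append(int recordBytes) {
            if (used + recordBytes > capacity) {
                return false;
            }
            records.add(recordBytes);
            used += recordBytes;
            return true;
        }

        boolean hasRemaining() { return !records.isEmpty(); }
    }

    final List<String> file = new ArrayList<>(); // what reaches the shuffle file
    final int sortBufferCapacity;
    ToySortBuffer sortBuffer;

    EmitFlowSketch(int sortBufferCapacity) {
        this.sortBufferCapacity = sortBufferCapacity;
        this.sortBuffer = new ToySortBuffer(sortBufferCapacity);
    }

    void emit(int recordBytes) {
        if (sortBuffer.append(recordBytes)) { // 4.2: the record fits
            return;
        }
        if (!sortBuffer.hasRemaining()) {     // 4.3: empty buffer, record too large
            file.add("large:" + recordBytes);
            sortBuffer = new ToySortBuffer(sortBufferCapacity);
            return;
        }
        flushSortBuffer();                    // 4.4: flush buffered data, then retry
        emit(recordBytes);
    }

    void flushSortBuffer() {
        for (int r : sortBuffer.records) {
            file.add("sorted:" + r);
        }
        sortBuffer = new ToySortBuffer(sortBufferCapacity);
    }

    public static void main(String[] args) {
        EmitFlowSketch p = new EmitFlowSketch(100);
        p.emit(40);
        p.emit(50);
        p.emit(30);  // does not fit: flush 40 and 50, then buffer 30
        p.emit(500); // does not fit: flush 30, then write as a large record
        System.out.println(p.file); // [sorted:40, sorted:50, sorted:30, large:500]
    }
}
```

The recursion in emit() terminates because after a flush the buffer is empty, so the retry either succeeds (4.2) or falls into the large-record path (4.3).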

5. About sorting

PartitionSortedBuffer is a sort buffer backed by an internal list of MemorySegments. Index entries and record data share the same segment list. The relevant member variables are:

/** A segment list as a joint buffer which stores all records and index entries. */
@GuardedBy("lock")
private final ArrayList<MemorySegment> segments = new ArrayList<>();

/** Addresses of the first record's index entry for each subpartition. */
private final long[] firstIndexEntryAddresses;

/** Addresses of the last record's index entry for each subpartition. */
private final long[] lastIndexEntryAddresses;

/** Array index in the segment list of the current available buffer for writing. */
private int writeSegmentIndex;

/** Next position in the current available buffer for writing. */
private int writeSegmentOffset;

5.1. Requesting segments

Call path: emit() -> append() -> allocateBuffersForRecord(). When the current segments do not have enough space, new segments are requested from the bufferPool. The segment list is initially empty, so segments must be requested before the first write. A single record may also span more than one segment. writeSegmentOffset is the write position in the current segment; if the remaining space is enough, the data is written directly:

int availableBytes =
    writeSegmentIndex == segments.size() ? 0 : bufferSize - writeSegmentOffset;

// return directly if current available bytes is adequate
if (availableBytes >= numBytesRequired) {
    return true;
}
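The check above only decides whether the current segment suffices. When it does not, the number of additional segments a record needs follows from a ceiling division. The helper below is hypothetical and written for illustration only: extraSegmentsNeeded() and its parameters are assumed names, and the actual allocation loop in PartitionSortedBuffer is not reproduced here.

```java
// Hypothetical helper: how many extra segments a record needs, reusing the
// availableBytes check shown above.
class SegmentAllocationSketch {

    static int extraSegmentsNeeded(int numBytesRequired, int bufferSize, int numSegments,
                                   int writeSegmentIndex, int writeSegmentOffset) {
        int availableBytes =
                writeSegmentIndex == numSegments ? 0 : bufferSize - writeSegmentOffset;
        if (availableBytes >= numBytesRequired) {
            return 0; // the current segment is enough
        }
        // ceiling division over the bytes that do not fit into the current segment
        return (numBytesRequired - availableBytes + bufferSize - 1) / bufferSize;
    }

    public static void main(String[] args) {
        int bufferSize = 32 * 1024;
        // Empty segment list: even a 100-byte record needs one segment
        System.out.println(extraSegmentsNeeded(100, bufferSize, 0, 0, 0));
        // 1 KB left in the current segment: a 1000-byte record fits
        System.out.println(extraSegmentsNeeded(1000, bufferSize, 1, 0, bufferSize - 1024));
        // 1 KB left, 40 KB record: the remaining 39 KB needs 2 more segments
        System.out.println(extraSegmentsNeeded(40 * 1024, bufferSize, 1, 0, bufferSize - 1024));
    }
}
```

The three calls print 1, 0 and 2. The first case corresponds to the empty initial segment list mentioned above, where writeSegmentIndex equals segments.size().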

5.2. writeIndex

On disk, the index and the data are stored in separate files. PartitionedFile defines their suffixes as follows:

public static final String DATA_FILE_SUFFIX = ".shuffle.data";

public static final String INDEX_FILE_SUFFIX = ".shuffle.index";

The writeIndex method of PartitionSortedBuffer writes an index entry into a segment, in the following steps.

5.2.1 Obtain the currently available segment

writeSegmentIndex records the position, in the segments list, of the segment currently being written:

MemorySegment segment = segments.get(writeSegmentIndex);

5.2.2. Write the index to the segment

The index is a single long (64 bits): the high 32 bits hold the record length and the low 32 bits hold the data type. The length, an int, is cast to long and shifted left by 32 bits with <<, and the ordinal of the data type is OR-ed into the low 32 bits:

// record length takes the high 32 bits and data type takes the low 32 bits
segment.putLong(writeSegmentOffset, ((long) numRecordBytes << 32) | dataType.ordinal());
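The packing can be checked in isolation. In this sketch, DataType is a hypothetical two-value stand-in for the real data type enum, so its ordinals are illustrative only. Note the (long) cast: without it, shifting an int by 32 would shift by 0, because Java masks int shift distances to 5 bits, and the length would collide with the type bits.

```java
// Packs (record length, data type) into one long the same way as the line above.
// DataType is a hypothetical stand-in; its ordinals are illustrative only.
class IndexFieldSketch {

    enum DataType { DATA_BUFFER, EVENT_BUFFER }

    static long encode(int numRecordBytes, DataType dataType) {
        return ((long) numRecordBytes << 32) | dataType.ordinal();
    }

    static int lengthOf(long index) {
        return (int) (index >>> 32); // high 32 bits
    }

    static DataType typeOf(long index) {
        return DataType.values()[(int) index]; // low 32 bits
    }

    public static void main(String[] args) {
        long index = encode(1500, DataType.EVENT_BUFFER);
        System.out.println(Long.toHexString(index));               // 5dc00000001
        System.out.println(lengthOf(index) + " " + typeOf(index)); // 1500 EVENT_BUFFER
    }
}
```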

5.2.3. Update the address of the subpartition's last index entry

This step updates the last index entry address of the corresponding subpartition. lastIndexEntryAddresses is an array with one element per subpartition, holding the address of that subpartition's most recent index entry. An index entry address is also a long: the high 32 bits are the index of the segment in the segments list, and the low 32 bits are the offset within that segment. This addressing is the basis for the subsequent sorting.

// segment index takes the high 32 bits and segment offset takes the low 32 bits
long indexEntryAddress = ((long) writeSegmentIndex << 32) | writeSegmentOffset;

long lastIndexEntryAddress = lastIndexEntryAddresses[channelIndex];
lastIndexEntryAddresses[channelIndex] = indexEntryAddress;

5.2.4. Link the previous index entry of the subpartition to the new one

This step makes the previous index entry of the subpartition point to the new one. If there is no previous entry (lastIndexEntryAddress is negative), the new address is stored directly in firstIndexEntryAddresses, marking the current record as the first record of the subpartition.

if (lastIndexEntryAddress >= 0) {
    // link the previous index entry of the given channel to the new index entry
    segment = segments.get(getSegmentIndexFromPointer(lastIndexEntryAddress));
    segment.putLong(getSegmentOffsetFromPointer(lastIndexEntryAddress) + 8, indexEntryAddress);
} else {
    firstIndexEntryAddresses[channelIndex] = indexEntryAddress;
}

getSegmentIndexFromPointer and getSegmentOffsetFromPointer extract, respectively, the index of the segment in the list and the offset within the segment, following the address layout described in 5.2.3:

private int getSegmentIndexFromPointer(long value) {
    return (int) (value >>> 32);
}

private int getSegmentOffsetFromPointer(long value) {
    return (int) (value);
}

In getSegmentOffsetFromPointer(lastIndexEntryAddress) + 8, the 8 is 8 bytes (64 bits): the size of the length/type field described in 5.2.2. The 8 bytes that follow it in the previous index entry hold the address of the next index entry, so writing the new address there links the previous entry to the current one. This space is reserved when each index entry is written: the member variable INDEX_ENTRY_SIZE is 4 + 4 + 8, i.e. the record length and data type (8 bytes together) plus 8 bytes reserved for the address of the next entry (see 5.2.5).

5.2.5. Update the write position

This step updates writeSegmentOffset, the write position within the current segment. The offset advances by INDEX_ENTRY_SIZE at once, i.e. by two 64-bit fields: one for the length/type field and one for the next-entry address used when linking entries in 5.2.4. When the segment is full, writing moves on to the next segment.

// move the write position forward so as to write the corresponding record
updateWriteSegmentIndexAndOffset(INDEX_ENTRY_SIZE);

private void updateWriteSegmentIndexAndOffset(int numBytes) {
    writeSegmentOffset += numBytes;

    // using the next available free buffer if the current is full
    if (writeSegmentOffset == bufferSize) {
        ++writeSegmentIndex;
        writeSegmentOffset = 0;
    }
}
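Putting 5.2.1 to 5.2.5 together, the following Java toy model shows how index entries of different subpartitions, written interleaved, end up as one linked chain per subpartition across several segments. It is a simplified sketch, not PartitionSortedBuffer itself: ByteBuffer stands in for MemorySegment, record bytes are not stored, the 64-byte segment size is an arbitrary example, and recordLengths() is a hypothetical reader that stops at lastIndexEntryAddresses because the next field of the last entry is never written.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of per-subpartition index chains; only index entries are stored.
class IndexChainSketch {

    static final int INDEX_ENTRY_SIZE = 4 + 4 + 8; // length + type + next-entry address
    static final int BUFFER_SIZE = 64;              // 4 index entries per segment

    final List<ByteBuffer> segments = new ArrayList<>();
    final long[] firstIndexEntryAddresses;
    final long[] lastIndexEntryAddresses;
    int writeSegmentIndex;
    int writeSegmentOffset;

    IndexChainSketch(int numSubpartitions) {
        firstIndexEntryAddresses = new long[numSubpartitions];
        lastIndexEntryAddresses = new long[numSubpartitions];
        Arrays.fill(firstIndexEntryAddresses, -1L);
        Arrays.fill(lastIndexEntryAddresses, -1L);
    }

    void writeIndex(int channelIndex, int numRecordBytes, int dataType) {
        if (writeSegmentIndex == segments.size()) {
            segments.add(ByteBuffer.allocate(BUFFER_SIZE)); // "request" a new segment
        }
        ByteBuffer segment = segments.get(writeSegmentIndex);
        segment.putLong(writeSegmentOffset, ((long) numRecordBytes << 32) | dataType);

        long indexEntryAddress = ((long) writeSegmentIndex << 32) | writeSegmentOffset;
        long lastIndexEntryAddress = lastIndexEntryAddresses[channelIndex];
        lastIndexEntryAddresses[channelIndex] = indexEntryAddress;

        if (lastIndexEntryAddress >= 0) {
            // link the previous entry of this channel to the new entry
            segments.get(segmentIndex(lastIndexEntryAddress))
                    .putLong(segmentOffset(lastIndexEntryAddress) + 8, indexEntryAddress);
        } else {
            firstIndexEntryAddresses[channelIndex] = indexEntryAddress;
        }

        writeSegmentOffset += INDEX_ENTRY_SIZE; // as in updateWriteSegmentIndexAndOffset
        if (writeSegmentOffset == BUFFER_SIZE) {
            ++writeSegmentIndex;
            writeSegmentOffset = 0;
        }
    }

    // Walks the chain of one channel and collects the record lengths in write order.
    List<Integer> recordLengths(int channelIndex) {
        List<Integer> lengths = new ArrayList<>();
        long address = firstIndexEntryAddresses[channelIndex];
        while (address >= 0) {
            ByteBuffer segment = segments.get(segmentIndex(address));
            lengths.add((int) (segment.getLong(segmentOffset(address)) >>> 32));
            if (address == lastIndexEntryAddresses[channelIndex]) {
                break; // the last entry's next field was never written
            }
            address = segment.getLong(segmentOffset(address) + 8);
        }
        return lengths;
    }

    static int segmentIndex(long address) { return (int) (address >>> 32); }

    static int segmentOffset(long address) { return (int) address; }

    public static void main(String[] args) {
        IndexChainSketch buffer = new IndexChainSketch(2);
        int[] channels = {0, 1, 0, 0, 1, 0}; // interleaved writes, 6 entries over 2 segments
        for (int i = 0; i < channels.length; i++) {
            buffer.writeIndex(channels[i], 100 + i, 0);
        }
        System.out.println(buffer.recordLengths(0)); // [100, 102, 103, 105]
        System.out.println(buffer.recordLengths(1)); // [101, 104]
    }
}
```

Although the entries are interleaved in memory, each chain yields its subpartition's records in write order, including the links that cross from the first segment into the second.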

5.3. writeRecord

To be continued