This article was first published on the author's Jianshu blog: www.jianshu.com/u/204b8aaab…

Version    Date          Note
1.0        2021.12.20    First published

0. Foreword

When I first encountered Flink, it was through talks by leading companies in the industry that use it to process huge amounts of data. In that scenario, the question of how to avoid the StopTheWorld side effects of JVM GC kept nagging at me. It was only after using Flink and reading the relevant source code (based on 1.14.0) that I finally got some answers, which I share with you in this article.

1. The shortcomings of JVM memory management

In addition to the StopTheWorld mentioned above, the JVM’s memory management causes the following problems:

  • Memory waste: a Java object is stored in memory in three parts: object header, instance data, and alignment padding. The object header occupies 32 bits on a 32-bit implementation and 64 bits on a 64-bit one. In addition, to improve overall efficiency, the JVM does not pack data tightly but aligns it to multiples of 8 bytes: even if you only need 1 byte, 7 bytes of padding are added automatically (see the sketch after this list).
  • Cache misses: as is well known, the CPU has L1, L2, and L3 caches. When the CPU reads data from memory, it also loads the neighboring data into the cache, exploiting the principle of program locality: data the CPU has accessed recently is likely to be accessed again soon, and so is the data adjacent to it. However, as mentioned above, Java objects are not stored contiguously on the heap, so when the CPU reads an object from the JVM heap, the adjacent memory pulled into the cache is often not what the CPU needs for its next computation. The CPU then idles while waiting for data to be read from memory (the speeds differ by orders of magnitude). If the data happens to have been swapped out to disk, it is even worse.
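To make the object-header and padding overhead concrete, one way to inspect it (purely an illustration, not something Flink does) is the OpenJDK JOL tool; the sketch below assumes the org.openjdk.jol:jol-core dependency is on the classpath:

import org.openjdk.jol.info.ClassLayout;

public class LayoutDemo {
    // A 1-byte payload still costs a full object header plus alignment padding.
    static class OneByte {
        byte value;
    }

    public static void main(String[] args) {
        // Prints field offsets, the header bytes, and the "loss due to the next object alignment".
        System.out.println(ClassLayout.parseClass(OneByte.class).toPrintable());
    }
}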

2. The evolution of Flink's approach

Prior to V0.10, Flink managed memory on the heap: memory was allocated with Unsafe and referenced through byte arrays, and the application layer maintained its own type information to retrieve the data. But problems remained:

  • When the heap is very large, the JVM takes a long time to start up, and a Full GC can last minutes.
  • Low I/O efficiency: writing data from the heap to disk or the network requires at least one extra memory copy.

So after V0.10, Flink introduced off-heap memory management (see the Jira ticket Add an off-heap variant of the managed memory). Besides solving the heap-memory problems above, it brings other benefits:

  • Off-heap memory can be shared between processes, which lets Flink implement some convenient failover behavior.

Of course, everything has two sides, and the downside is:

  • Allocating short-lived objects in off-heap memory is more expensive than allocating them on the heap.
  • Troubleshooting is more complicated when something goes wrong with off-heap memory.

A similar implementation can also be found in Spark: its MemoryPool supports both on-heap and off-heap memory modes (see MemoryMode.scala for details). Kafka has a similar idea, storing its messages through Java NIO's ByteBuffer.
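For reference, the JDK mechanism behind that approach looks like this. The snippet below is a plain-JDK illustration (not Flink or Kafka code): a direct ByteBuffer is allocated outside the Java heap, so the GC does not move it and it can be handed to the OS for I/O without an extra copy:

import java.nio.ByteBuffer;

public class DirectBufferDemo {
    public static void main(String[] args) {
        // Allocated outside the Java heap; the backing memory is freed
        // once the buffer object itself becomes unreachable.
        ByteBuffer direct = ByteBuffer.allocateDirect(1024);
        System.out.println(direct.isDirect()); // true

        direct.putInt(42);
        direct.flip();
        System.out.println(direct.getInt());   // 42
    }
}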

3. Source code analysis

In general, Flink's implementation in this area is fairly clear: much like an operating system, it has data structures such as memory segments and memory pages.

3.1 Memory segments

The main implementation is MemorySegment. Before V1.12, MemorySegment was merely an abstraction with two implementations: HybridMemorySegment and HeapMemorySegment. As development went on, it turned out that HeapMemorySegment was essentially unused while HybridMemorySegment was used everywhere. So, to optimize performance (avoiding a lookup in the virtual function table on every call to determine which implementation to invoke), HeapMemorySegment was removed and HybridMemorySegment was merged into MemorySegment, which yielded roughly a 2.7x improvement in call speed. For details, see the blog post Off-heap Memory in Apache Flink and the Curious JIT Compiler and the Jira ticket Don't explicitly use HeapMemorySegment in raw format serde.

MemorySegment is responsible for referencing and reading/writing its segment of memory. It supports primitive types well, whereas complex types need to be serialized externally. The implementation is relatively simple, as can be seen from its field declarations. The only thing worth mentioning is LITTLE_ENDIAN: different CPU architectures use different byte orders. PowerPC uses Big Endian, storing the most significant byte at the lowest address, while x86 uses Little Endian, storing the least significant byte at the lowest address.
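To make the byte-order point concrete, here is a plain-JDK sketch (independent of Flink) showing how the same integer is laid out under the two byte orders and how to query the order of the current platform:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class EndiannessDemo {
    public static void main(String[] args) {
        // The same int value 1 laid out under the two byte orders.
        ByteBuffer big = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(1);
        ByteBuffer little = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(1);

        System.out.println(Arrays.toString(big.array()));    // [0, 0, 0, 1]
        System.out.println(Arrays.toString(little.array())); // [1, 0, 0, 0]

        // What the current platform uses, e.g. LITTLE_ENDIAN on x86.
        System.out.println(ByteOrder.nativeOrder());
    }
}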

To be honest, I was a little shocked when I read this part of the MemorySegment code. I had written Java for years with almost no awareness of the underlying hardware, and did not expect Java code to have to account for CPU architecture differences.
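Before looking at how Flink uses MemorySegment internally, here is a minimal sketch of its read/write API for primitive types, based on my reading of the 1.14 code (MemorySegmentFactory and the put/get methods on MemorySegment); treat it as an illustration rather than canonical usage:

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class SegmentDemo {
    public static void main(String[] args) {
        // Wrap an existing on-heap byte array as a segment...
        MemorySegment wrapped = MemorySegmentFactory.wrap(new byte[128]);

        // ...or allocate a fresh unpooled segment of a fixed size.
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(128);

        // Primitive types are written and read at absolute offsets, much like a ByteBuffer.
        segment.putInt(0, 42);
        segment.putLong(4, 123456789L);
        System.out.println(segment.getInt(0));   // 42
        System.out.println(segment.getLong(4));  // 123456789
        System.out.println(wrapped.size());      // 128

        segment.free(); // release the segment when it is no longer needed
    }
}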

How are MemorySegments used within Flink? We can look at a test case, testPagesSer in BinaryRowDataTest: data is first written into a BinaryRowData via a BinaryRowWriter, and the row is then written to a RandomAccessOutputView backed by MemorySegments via a BinaryRowDataSerializer:

    @Test
    public void testPagesSer() throws IOException {
        MemorySegment[] memorySegments = new MemorySegment[5];
        ArrayList<MemorySegment> memorySegmentList = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            memorySegments[i] = MemorySegmentFactory.wrap(new byte[64]);
            memorySegmentList.add(memorySegments[i]);
        }

        {
            // multi memorySegments
            String str = "La la la la la la I'm the happy painter, la la la la la la I'm the happy painter," + "La la la la la LA I am the happy painter.";
            BinaryRowData row = new BinaryRowData(1);
            BinaryRowWriter writer = new BinaryRowWriter(row);
            writer.writeString(0, fromString(str));
            writer.complete();

            RandomAccessOutputView out = new RandomAccessOutputView(memorySegments, 64);
            BinaryRowDataSerializer serializer = new BinaryRowDataSerializer(1);
            serializer.serializeToPages(row, out);

            BinaryRowData mapRow = serializer.createInstance();
            mapRow =
                    serializer.mapFromPages(
                            mapRow, new RandomAccessInputView(memorySegmentList, 64));
            writer.reset();
            writer.writeString(0, mapRow.getString(0));
            writer.complete();
            assertEquals(str, row.getString(0).toString());

            BinaryRowData deserRow =
                    serializer.deserializeFromPages(
                            new RandomAccessInputView(memorySegmentList, 64));
            writer.reset();
            writer.writeString(0, deserRow.getString(0));
            writer.complete();
            assertEquals(str, row.getString(0).toString());
        }
     // ignore some code
    }

3.2 Memory pages

By default, a MemorySegment corresponds to a 32 KB chunk of memory. In stream processing it is easy to encounter data larger than 32 KB, which then has to span multiple MemorySegments. For this, Flink provides a memory-page abstraction that holds multiple MemorySegment instances, so that framework developers can quickly write memory-related code without having to worry about individual MemorySegments.

The abstractions are DataInputView and DataOutputView, which read and write data respectively.
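As a rough sketch of how these views are used, here is an example with the byte-array-backed implementations DataOutputSerializer and DataInputDeserializer (the paged, MemorySegment-backed views follow the same read/write contract); it assumes flink-core is on the classpath and illustrates the API rather than Flink-internal usage:

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class ViewDemo {
    public static void main(String[] args) throws Exception {
        // A DataOutputView backed by a growable byte array.
        DataOutputSerializer out = new DataOutputSerializer(64);
        out.writeInt(42);
        out.writeUTF("hello flink");

        // The matching DataInputView implementation reads the bytes back.
        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        System.out.println(in.readInt());  // 42
        System.out.println(in.readUTF());  // hello flink
    }
}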

Next, let's look at some real code, taking our most common usage, the KafkaProducer sink, as an example:

|-- KafkaProducer#invoke                      // the serialized value is specified here
 \-- KeyedSerializationSchema#serializeValue  // serializes the record's value

Let's pick one implementation to look at, taking TypeInformationKeyValueSerializationSchema as an example:

|-- TypeInformationKeyValueSerializationSchema#deserialize  // an implementation class of KeyedSerializationSchema
|-- DataInputDeserializer#setBuffer  // an implementation of DataInputView that stores data in an internal byte array; oddly, MemorySegment is not used here
|-- TypeSerializer#deserialize       // implemented per type; reads the data back from the DataInputView

Actually, this is not the best example: KeyedSerializationSchema has been marked as deprecated, and the community now prefers KafkaSerializationSchema. The main reason is that the KeyedSerializationSchema abstraction is not a good fit for Kafka: whenever Kafka adds new fields to its Record, the interface is hard to extend, because it only exposes keys, values, and topics.
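To see why the newer interface is easier to extend, here is a rough sketch of a KafkaSerializationSchema implementation: it returns a full ProducerRecord, so it can populate any field Kafka supports (key, value, topic, partition, timestamp, headers). The class name, topic handling, and UTF-8 encoding below are my own choices for illustration:

import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class StringKafkaSerializationSchema implements KafkaSerializationSchema<String> {

    private final String topic;

    public StringKafkaSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        // Because we build the ProducerRecord ourselves, we control key, value,
        // partition, timestamp and (if needed) headers in one place.
        byte[] value = element.getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(topic, null, timestamp, null, value);
    }
}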

Turning to KafkaSerializationSchema, we can look at a typical implementation, KafkaSerializationSchemaWrapper, where the part we care about is easy to find:

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
        byte[] serialized = serializationSchema.serialize(element);
        final Integer partition;
        if (partitioner != null) {
            partition = partitioner.partition(element, null, serialized, topic, partitions);
        } else {
            partition = null;
        }

        final Long timestampToWrite;
        if (writeTimestamp) {
            timestampToWrite = timestamp;
        } else {
            timestampToWrite = null;
        }

        return new ProducerRecord<>(topic, partition, timestampToWrite, null, serialized);
    }

The serializationSchema field is declared with the interface type SerializationSchema. It has quite a few implementations, many of which correspond to the formats in the DataStream and SQL APIs. Let's continue following TypeInformationSerializationSchema:

@Public
public class TypeInformationSerializationSchema<T>
        implements DeserializationSchema<T>, SerializationSchema<T> {

    // ignore some fields

    /** The serializer for the actual de-/serialization. */
    private final TypeSerializer<T> serializer;
    ...

Here we see the familiar TypeSerializer again. As mentioned above, its implementations interact with DataInputView and DataOutputView for the different types, providing serialization and deserialization capabilities. This is also visible in its method signatures:

    /**
     * Serializes the given record to the given target output view.
     *
     * @param record The record to serialize.
     * @param target The output view to write the serialized data to.
     * @throws IOException Thrown, if the serialization encountered an I/O related error. Typically
     *     raised by the output view, which may have an underlying I/O channel to which it delegates.
     */
    public abstract void serialize(T record, DataOutputView target) throws IOException;

    /**
     * De-serializes a record from the given source input view.
     *
     * @param source The input view from which to read the data.
     * @return The deserialized element.
     * @throws IOException Thrown, if the de-serialization encountered an I/O related error.
     *     Typically raised by the input view, which may have an underlying I/O channel from which it reads.
     */
    public abstract T deserialize(DataInputView source) throws IOException;

    /**
     * De-serializes a record from the given source input view into the given reuse record instance
     * if mutable.
     *
     * @param reuse The record instance into which to de-serialize the data.
     * @param source The input view from which to read the data.
     * @return The deserialized element.
     * @throws IOException Thrown, if the de-serialization encountered an I/O related error.
     *     Typically raised by the input view, which may have an underlying I/O channel from which it reads.
     */
    public abstract T deserialize(T reuse, DataInputView source) throws IOException;

    /**
     * Copies exactly one record from the source input view to the target output view. Whether this
     * operation works on binary data or partially de-serializes the record to determine its length
     * (such as for records of variable length) is up to the implementer. Binary copies are
     * typically faster. A copy of a record containing two integer numbers (8 bytes total) is most
     * efficiently implemented as {@code target.write(source, 8);}.
     *
     * @param source The input view from which to read the record.
     * @param target The target output view to which to write the record.
     * @throws IOException Thrown if any of the two views raises an exception.
     */
    public abstract void copy(DataInputView source, DataOutputView target) throws IOException;
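To get a feel for this contract, here is a small usage sketch of my own (not from the Flink test suite) that pairs an existing serializer, StringSerializer, with the byte-array-backed views shown earlier:

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class TypeSerializerDemo {
    public static void main(String[] args) throws Exception {
        // Serialize a value into a DataOutputView backed by a growable byte array.
        DataOutputSerializer out = new DataOutputSerializer(32);
        StringSerializer.INSTANCE.serialize("hello flink", out);

        // Read it back through the matching DataInputView implementation.
        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        System.out.println(StringSerializer.INSTANCE.deserialize(in)); // hello flink
    }
}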

So how exactly is TypeSerializer#deserialize called? Such details are not the focus of this article; we only show the call chain here, and interested readers can follow it through the code:

|-- TypeSerializer#deserialize
|-- StreamElementSerializer#deserialize
|-- TypeInformationKeyValueSerializationSchema#deserialize
|-- KafkaDeserializationSchema#deserialize
|-- KafkaFetcher#partitionConsumerRecordsHandler  // by this point it is clear: this object is created by FlinkKafkaConsumer

3.3 Buffer pools

Another interesting class is LocalBufferPool, which encapsulates MemorySegments. NetworkBuffer wraps the data exchanged over the network. When a ResultPartition starts writing data, it needs to request Buffer resources from the LocalBufferPool.

Write logic:

|-- Task#constructor                                      // constructs the task
|-- NettyShuffleEnvironment#createResultPartitionWriters  // creates the writers for the result partitions
|-- ResultPartitionFactory#create
 \-- ResultPartitionFactory#createBufferPoolFactory       // creates a simple BufferPoolFactory here
|-- PipelinedResultPartition#constructor
|-- BufferWritingResultPartition#constructor
|-- SortMergeResultPartition#constructor or BufferWritingResultPartition#constructor
|-- ResultPartition#constructor
 \-- ResultPartition#setup                                // registers the buffer pool with this result partition

In addition, NetworkBuffer extends Netty's AbstractReferenceCountedByteBuf. This means the classic reference-counting algorithm is used: when a buffer is no longer needed, it is released and recycled.
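As an illustration of that reference-counting contract, here is a plain Netty sketch (using an Unpooled ByteBuf rather than Flink's NetworkBuffer): each logical owner retains the buffer, and the memory is only reclaimed or returned to the pool once every owner has released it:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RefCountDemo {
    public static void main(String[] args) {
        // A freshly allocated buffer starts with a reference count of 1.
        ByteBuf buffer = Unpooled.buffer(64);
        System.out.println(buffer.refCnt()); // 1

        // A second owner (e.g. another downstream channel) retains it.
        buffer.retain();
        System.out.println(buffer.refCnt()); // 2

        // Each owner releases the buffer when done; only when the count
        // drops to 0 is the underlying memory actually reclaimed.
        buffer.release();
        buffer.release();
        System.out.println(buffer.refCnt()); // 0
    }
}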

4. Other

4.1 Related Flink Jira

Here is a list of the JIRAs I referred to while writing this article:

  • Add an off-heap variant of the managed memory: issues.apache.org/jira/browse…
  • Separate type specific memory segments: issues.apache.org/jira/browse…
  • Investigate potential out-of-memory problems due to managed unsafe memory allocation: issues.apache.org/jira/browse…
  • Adjust GC Cleaner for unsafe memory and Java 11: issues.apache.org/jira/browse…
  • FLIP-49 Unified Memory Configuration for TaskExecutors: issues.apache.org/jira/browse…
  • Don't explicitly use HeapMemorySegment in raw format serde: issues.apache.org/jira/browse…
  • Refactor HybridMemorySegment: issues.apache.org/jira/browse…
  • Use flink's buffers in netty: issues.apache.org/jira/browse…
  • Add copyToUnsafe, copyFromUnsafe and equalTo to MemorySegment: issues.apache.org/jira/browse…