Flink manages memory explicitly and stores objects in serialized form, supporting both on-heap and off-heap memory. In this respect Flink is very similar to Spark’s MEMORY_ONLY_SER storage level, except that Spark can still store raw objects on the heap, whereas Flink works only with serialized objects.

Flink Memory Management Overview

Flink memory refers primarily to the memory resources provided by the TaskManager at runtime. A TaskManager consists of several internal components:

The actor system that communicates with processes such as the JobManager, the IOManager that spills data to disk and reads it back when memory runs low, and the MemoryManager that manages memory. The actor system and the MemoryManager both require a lot of memory. Accordingly, Flink divides the TaskManager's runtime JVM heap into three zones: Network Buffers, MemoryManager, and Free. (In streaming mode only the Network Buffers and Free zones exist, because operators do not need to cache large amounts of input data at once.)
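The three-zone split described above can be illustrated with a small sketch. The fractions and zone names below are illustrative assumptions for the example, not Flink's actual defaults or internal API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Illustrative split of a TaskManager heap into the three zones described above.
 * The fractions passed in are example values, not Flink's defaults.
 */
public class HeapZones {

    static Map<String, Long> split(long heapBytes, double networkFraction, double managedFraction) {
        Map<String, Long> zones = new LinkedHashMap<>();
        long network = (long) (heapBytes * networkFraction); // Network Buffers zone
        long managed = (long) (heapBytes * managedFraction); // MemoryManager zone
        zones.put("networkBuffers", network);
        zones.put("memoryManager", managed);
        zones.put("free", heapBytes - network - managed);    // remaining Free zone
        return zones;
    }

    public static void main(String[] args) {
        System.out.println(split(1_000_000_000L, 0.25, 0.5));
    }
}
```

In streaming mode, per the text above, the MemoryManager zone would simply be absent and its share would fall into Free.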

Flink official documentation, detailed memory model: ci.apache.org/projects/fl…

The following table lists the memory components described above, along with the Flink configuration options that affect the size of each component:

| Component | Configuration options | Description |
| --- | --- | --- |
| Framework heap memory | taskmanager.memory.framework.heap.size | JVM heap memory dedicated to the Flink framework (advanced option) |
| Task heap memory | taskmanager.memory.task.heap.size | JVM heap memory dedicated to the Flink application, running operators and user code |
| Managed memory | taskmanager.memory.managed.size, taskmanager.memory.managed.fraction | Native memory managed by Flink, reserved for sorting, hash tables, caching of intermediate results, and the RocksDB state backend |
| Framework off-heap memory | taskmanager.memory.framework.off-heap.size | Off-heap direct or native memory dedicated to the Flink framework (advanced option) |
| Task off-heap memory | taskmanager.memory.task.off-heap.size | Off-heap direct (or native) memory dedicated to the Flink application (operators) |
| Network memory | taskmanager.memory.network.min, taskmanager.memory.network.max, taskmanager.memory.network.fraction | Direct memory reserved for record exchange between tasks (e.g. buffers used for network transfer) |
| JVM metaspace | taskmanager.memory.jvm-metaspace.size | Metaspace size of the Flink JVM process |
| JVM overhead | taskmanager.memory.jvm-overhead.min, taskmanager.memory.jvm-overhead.max, taskmanager.memory.jvm-overhead.fraction | Native memory reserved for other JVM overhead: thread stacks, code cache, garbage collection, etc.; a capped, fractionally sized component of total process memory |
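Several of these components (network memory, JVM overhead) are sized as a fraction of total memory, clamped between a configured min and max. A minimal sketch of that sizing rule, using example values rather than Flink's actual defaults or implementation:

```java
/**
 * Illustrative sketch of the fraction-with-min/max sizing rule used by options such as
 * taskmanager.memory.network.{min,max,fraction}: the component gets fraction * total,
 * clamped into [min, max]. The numbers used below are example inputs only.
 */
public class FractionSizing {

    static long size(long totalBytes, double fraction, long minBytes, long maxBytes) {
        long derived = (long) (totalBytes * fraction);
        // Clamp the derived size into the configured [min, max] range
        return Math.max(minBytes, Math.min(maxBytes, derived));
    }

    public static void main(String[] args) {
        // 4 GiB total with a 10% fraction, clamped into [64 MiB, 1 GiB]
        System.out.println(size(4L << 30, 0.1, 64L << 20, 1L << 30));
    }
}
```

For very small total memory the min bound wins; for very large totals the max bound caps the component.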

Flink Network Memory Transfer

The original Ververica article on this topic has been deleted, so I can only quote this link.

1. Determine whether buffer data is currently available, based on the Buffers list obtained from the InputGate.
2. Lock the current operator and add the requested memory in batches to the Buffers resource.
3. Skip the header of the protocol packet according to the packet parsing, and add the BufferConsumer unit consumed from the Buffer.
4. Update the status information of the consumed buffers.
5. Add a checkpoint barrier to each buffer.
6. Each requested Buffer packet carries a fixed sequence number in its header, depending on the packet format; the sequence numbers are stored in a priority queue, and downstream consumption is notified according to the message number at the head of that queue.
7. Notify downstream operators of the messages that can be consumed.
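Step 6 above, ordering buffers by the sequence number in their header via a priority queue, can be sketched as follows. The class and field names are illustrative, not Flink's internal API:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class SequencedBuffers {

    /** A network buffer whose header carries a fixed sequence number (illustrative). */
    static class SequencedBuffer {
        final int sequenceNumber;
        final String payload;

        SequencedBuffer(int sequenceNumber, String payload) {
            this.sequenceNumber = sequenceNumber;
            this.payload = payload;
        }
    }

    /** Orders received buffers by sequence number and yields their payloads in that order. */
    static List<String> drainInOrder(Collection<SequencedBuffer> received) {
        PriorityQueue<SequencedBuffer> queue =
                new PriorityQueue<>(Comparator.comparingInt((SequencedBuffer b) -> b.sequenceNumber));
        queue.addAll(received);
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll().payload); // notify downstream in sequence-number order
        }
        return out;
    }
}
```

Even if buffers arrive out of order, the priority queue guarantees the downstream notification order follows the sequence numbers.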

ResultSubpartition.java

```java
/**
 * Adds the given buffer.
 *
 * <p>The request may be executed synchronously or asynchronously, depending on the
 * implementation.
 *
 * <p><strong>IMPORTANT:</strong> Before adding a new {@link BufferConsumer}, the previously
 * added one must be completed. For performance reasons this is only enforced when data is
 * read. Priority events can be added while the previous buffer consumer is still open, in
 * which case the open buffer consumer is overtaken.
 *
 * @param bufferConsumer the buffer to add (transferring ownership to this writer)
 * @param partialRecordLength the length of bytes to skip in order to start with a complete
 *     record, from position index 0 of the underlying {@link MemorySegment}
 * @return true if the operation succeeded and the bufferConsumer was enqueued
 * @throws IOException if an error occurs while adding the buffer
 */
public abstract boolean add(BufferConsumer bufferConsumer, int partialRecordLength)
        throws IOException;
```

PipelinedSubpartition.java

```java
private boolean add(BufferConsumer bufferConsumer, int partialRecordLength, boolean finish) {
    checkNotNull(bufferConsumer);

    final boolean notifyDataAvailable;
    int prioritySequenceNumber = -1;
    // Lock on the buffers queue of this subpartition
    synchronized (buffers) {
        if (isFinished || isReleased) {
            bufferConsumer.close();
            return false;
        }

        // Add the bufferConsumer; if it is a priority event, remember its sequence number
        if (addBuffer(bufferConsumer, partialRecordLength)) {
            prioritySequenceNumber = sequenceNumber;
        }
        // Update the statistics and the backlog of this subpartition
        updateStatistics(bufferConsumer);
        increaseBuffersInBacklog(bufferConsumer);
        notifyDataAvailable = finish || shouldNotifyDataAvailable();

        isFinished |= finish;
    }

    if (prioritySequenceNumber != -1) {
        // Notify the read view of a priority event (e.g. a checkpoint barrier)
        notifyPriorityEvent(prioritySequenceNumber);
    }
    if (notifyDataAvailable) {
        // Notify the downstream read view that data can be consumed
        notifyDataAvailable();
    }
    return true;
}
```

BufferConsumer accepts the network data transmitted from upstream operators and parses it, according to the wire protocol, into the format Flink needs internally.

```java
/**
 * Not-thread-safe class for producing {@link Buffer}.
 *
 * <p>It reads the data written by a {@link BufferBuilder}. Although it is not thread safe and
 * can only be used by a single thread, that thread may be different from the thread that
 * writes to the {@link BufferBuilder}. The pattern is simple: one thread writes data to the
 * {@link BufferBuilder}, while the {@link BufferConsumer} can read it from a different thread.
 */
@NotThreadSafe
public class BufferConsumer implements Closeable { }
```
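The one-writer/one-reader pattern that this Javadoc describes can be sketched in a few lines: the writer appends to a shared segment and publishes its write position, while a single (possibly different) reader thread keeps its own reader index and only reads up to the published position. All names here are illustrative, not Flink's internal API:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Minimal sketch of the BufferBuilder/BufferConsumer split (illustrative names only). */
public class BuilderConsumerSketch {

    private final byte[] segment = new byte[64];
    private final AtomicInteger writerPosition = new AtomicInteger(); // published by the writer

    /** Called by the writing thread only. */
    public void write(byte[] data) {
        int pos = writerPosition.get();
        System.arraycopy(data, 0, segment, pos, data.length);
        writerPosition.set(pos + data.length); // publish the new write position
    }

    private int readerPosition; // owned by the single consuming thread

    /** Called by the (single) reading thread only; returns everything written so far. */
    public byte[] poll() {
        int available = writerPosition.get() - readerPosition;
        byte[] out = new byte[available];
        System.arraycopy(segment, readerPosition, out, 0, available);
        readerPosition += available;
        return out;
    }
}
```

The key point mirrored from the Javadoc: the consumer itself is single-threaded, but that thread need not be the one doing the writing, because the write position is the only shared state.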
```java
/**
 * A pipelined in-memory only subpartition, which can be consumed once.
 *
 * <p>Whenever {@link ResultSubpartition#add(BufferConsumer, int)} adds a finished
 * {@link BufferConsumer} or a second {@link BufferConsumer} (in which case we will assume the
 * first one is finished), we will
 * {@link PipelinedSubpartitionView#notifyDataAvailable() notify} a read view created via
 * {@link ResultSubpartition#createReadView(BufferAvailabilityListener)} of new data
 * availability. Except by calling {@link #flush()} explicitly, we always only notify when the
 * first finished buffer turns up, and the reader then has to drain the buffers via
 * {@link #pollBuffer()} until its return value shows that no more buffers are available. This
 * results in a buffer queue that is either empty or has an unfinished {@link BufferConsumer}
 * left, from which the notifications will eventually start again.
 *
 * <p>Explicitly calling {@link #flush()} forces this
 * {@link PipelinedSubpartitionView#notifyDataAvailable() notification} for any
 * {@link BufferConsumer} present in the queue.
 */
public class PipelinedSubpartition extends ResultSubpartition
        implements CheckpointedResultSubpartition, ChannelStateHolder {

    private static final Logger LOG = LoggerFactory.getLogger(PipelinedSubpartition.class);

    // ------------------------------------------------------------------------

    /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
    private final ArrayDeque<BufferConsumer> buffers = new ArrayDeque<>();

    /** The current number of non-event buffers in this subpartition. */
    @GuardedBy("buffers")
    private int buffersInBacklog;

    /** The read view to consume this subpartition. */
    private PipelinedSubpartitionView readView;

    /** Flag indicating whether the subpartition has been finished. */
    private boolean isFinished;

    @GuardedBy("buffers")
    private boolean flushRequested;

    /** Flag indicating whether the subpartition has been released. */
    private volatile boolean isReleased;

    /** The total number of buffers (both data and event buffers). */
    private long totalNumberOfBuffers;

    /** The total number of bytes (both data and event buffers). */
    private long totalNumberOfBytes;

    /** Snapshot of the in-flight buffers, taken while a checkpoint is in progress. */
    private final List<Buffer> inflightBufferSnapshot = new ArrayList<>();
}
```
```java
/**
 * Returns a retained copy with a separate reader index. This allows reading from the same
 * {@link MemorySegment} twice.
 *
 * <p>WARNING: the newly returned {@link BufferConsumer} copies its reader index from the
 * original buffer. In other words, data consumed before the copy is made will not be visible
 * to the returned copy.
 *
 * @return a retained copy of self with a separate reader index
 */
public BufferConsumer copy() {
    return new BufferConsumer(
            buffer.retainBuffer(), writerPosition.positionMarker, currentReaderPosition);
}
```
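The semantics of copy(), a shared, reference-counted buffer with per-copy reader indexes, can be sketched as follows. The class and field names are illustrative, not Flink's:

```java
/** Sketch of copy-with-separate-reader-index semantics (illustrative names only). */
public class RefCountedReader {

    /** The underlying bytes, shared between all copies via a reference count. */
    static class SharedBuffer {
        final byte[] data;
        int refCount = 1;

        SharedBuffer(byte[] data) {
            this.data = data;
        }

        SharedBuffer retain() {
            refCount++; // each copy keeps the buffer alive
            return this;
        }
    }

    final SharedBuffer buffer;
    int readerPosition; // each reader advances independently

    RefCountedReader(SharedBuffer buffer, int readerPosition) {
        this.buffer = buffer;
        this.readerPosition = readerPosition;
    }

    byte read() {
        return buffer.data[readerPosition++];
    }

    /** Shares the bytes, bumps the ref count, and starts from the current reader position. */
    RefCountedReader copy() {
        return new RefCountedReader(buffer.retain(), readerPosition);
    }
}
```

As in the Javadoc above, a copy cannot see data consumed before it was made: it starts at the original's current reader position, and the two then advance independently.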

To be continued.. QwQ