This article was first published at www.yuque.com/17sing

Version  Date      Note
1.0      2022.3.8  First published

The analysis is based on Flink 1.14 code.

0. Foreword

I recently ran into a strange phenomenon: a full-extraction job that had always run normally in production started failing, and the log was filled with java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container XXXX (HOSTNAME:PORT) timed out.

The scenario is a full extraction from Oracle into Hive: terabytes of data flow through Kafka, and one partition is created per day based on a time field. The job consumes the Kafka data and writes it into Hive using the Table API.
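For context, here is a minimal sketch of that kind of job, assuming a JSON topic and invented table, topic and field names (none of them come from the actual production job): Kafka is consumed through the Table API and streamed into a Hive table partitioned by day.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class KafkaToHiveSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // The streaming file sink only commits files on checkpoints.
        tEnv.getConfig().getConfiguration()
                .setString("execution.checkpointing.interval", "1 min");

        // Register a Hive catalog (catalog name, database and conf dir are placeholders).
        tEnv.registerCatalog("hive", new HiveCatalog("hive", "default", "/etc/hive/conf"));
        tEnv.useCatalog("hive");

        // Kafka source table, declared with the default (Flink) dialect.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
                "CREATE TEMPORARY TABLE kafka_src ("
                        + "  id BIGINT, payload STRING, event_time TIMESTAMP(3)"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'oracle_full_dump',"
                        + "  'properties.bootstrap.servers' = 'kafka:9092',"
                        + "  'scan.startup.mode' = 'earliest-offset',"
                        + "  'format' = 'json')");

        // Hive sink table partitioned by day: every distinct dt value becomes a
        // partition directory, i.e. a Bucket in the filesystem sink discussed below.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS ods_table ("
                        + "  id BIGINT, payload STRING"
                        + ") PARTITIONED BY (dt STRING) STORED AS ORC TBLPROPERTIES ("
                        + "  'partition.time-extractor.timestamp-pattern' = '$dt 00:00:00',"
                        + "  'sink.partition-commit.policy.kind' = 'metastore,success-file')");

        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
                "INSERT INTO ods_table "
                        + "SELECT id, payload, DATE_FORMAT(event_time, 'yyyy-MM-dd') AS dt FROM kafka_src");
    }
}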

1. Troubleshooting approach

By the time this problem reached me, some colleagues had already gone through a round of troubleshooting. A web search typically tells you that YARN is under too much pressure or the network is temporarily unstable, and that raising heartbeat.timeout can alleviate the symptom; they did, but the problem was not resolved.

Another common suggestion is that GC is too frequent and memory should be increased. After tuning the memory there was some effect (it took longer for the problem to appear), so it clearly had something to do with the code.

Since the previous version had no problems synchronizing data, I started looking for recent code changes, but after several rounds of searching I found nothing suspicious, which was starting to make me nervous. So I had the on-site colleagues switch back to the previous version and rerun the full extraction, and the phenomenon still occurred.

At this point I became a little suspicious of the production environment itself, such as the characteristics of the data, but the on-site colleagues told me there was nothing special about the data. So I had them take a heap dump of the running process and loaded it into an analysis tool, which showed a huge number of org.apache.flink.streaming.api.functions.sink.filesystem.Bucket objects.

So we look at the definition of a Bucket object:


/**
 * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
 *
 * <p>For each incoming element in the {@code StreamingFileSink}, the user-specified {@link
 * BucketAssigner} is queried to see in which bucket this element should be written to.
 */
@Internal
public class Bucket<IN, BucketID> {

Good grief: one object per directory. At this point I already doubted the "nothing special about the data" answer from the on-site colleagues, but to confirm it I still traced through the code:

|-- HiveTableSink
|   \-- createStreamSink
|-- StreamingFileSink
|   \-- initializeState
|-- StreamingFileSinkHelper
|   \-- constructor
|-- HadoopPathBasedBulkFormatBuilder
|   \-- createBuckets
|-- Buckets
|   \-- onElement
|   \-- getOrCreateBucketForBucketId
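To see why this matters, here is a toy model (not the Flink source; the class is invented) of what getOrCreateBucketForBucketId boils down to: one object is created and cached per distinct bucket id, i.e. per partition directory, so a large time span translates directly into a large in-memory map.

import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

public class BucketGrowthDemo {

    // Stand-in for org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.
    static class FakeBucket {
        final String bucketId;
        FakeBucket(String bucketId) { this.bucketId = bucketId; }
    }

    public static void main(String[] args) {
        Map<String, FakeBucket> activeBuckets = new HashMap<>();

        // Simulate a backfill whose records span a little more than three years,
        // with one daily partition (= one bucket id) per calendar day.
        LocalDate day = LocalDate.of(2019, 1, 1);
        LocalDate end = LocalDate.of(2022, 3, 1);
        while (day.isBefore(end)) {
            // The first time a bucket id is seen, a Bucket is created and kept in
            // memory until it becomes inactive.
            activeBuckets.computeIfAbsent(day.toString(), FakeBucket::new);
            day = day.plusDays(1);
        }

        // Prints 1155: more than a thousand Bucket objects per sink subtask.
        System.out.println("Active buckets held in memory: " + activeBuckets.size());
    }
}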

After walking through the code, I had a rough idea of what was going on. I asked whether the time span of the data being synchronized was particularly large, and the on-site colleagues confirmed it spanned more than three years. With daily partitions, that is on the order of a thousand partition directories, and therefore a thousand-plus Bucket objects held in memory by each sink subtask. I suggested reducing the time span per run or coarsening the partition granularity, and the problem was finally resolved by splitting the full extraction into smaller slices.

2. Curiosity after problem solving

If each directory produces a Bucket, then any long-running streaming job would sooner or later hit the same problem. Such an obvious issue must have been anticipated by the community, and my curiosity kept me searching for an answer until I came across this code:

    public void commitUpToCheckpoint(final long checkpointId) throws IOException {
        final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
                activeBuckets.entrySet().iterator();

        LOG.info(
                "Subtask {} received completion notification for checkpoint with id={}.",
                subtaskIndex,
                checkpointId);

        while (activeBucketIt.hasNext()) {
            final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
            bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

            if (!bucket.isActive()) {
                // We've dealt with all the pending files and the writer for this bucket is not
                // currently open. Therefore this bucket is currently inactive and we can remove
                // it from our state.
                activeBucketIt.remove();
                notifyBucketInactive(bucket);
            }
        }
    }

When a checkpoint completes, commitUpToCheckpoint decides, for each Bucket, whether to remove it from the in-memory data structure based on whether that Bucket is still active.

So what is active? The code is short:

    boolean isActive() {
        return inProgressPart != null
                || !pendingFileRecoverablesForCurrentCheckpoint.isEmpty()
                || !pendingFileRecoverablesPerCheckpoint.isEmpty();
    }

The next step is to clarify the three triggers.

2.1 inProgressPart != null

This field is of type InProgressFileWriter, and whether it is non-null is determined by the FileSystem sink's rolling policy (RollingPolicy):


/**
 * The policy based on which a {@code Bucket} in the {@code Filesystem Sink} rolls its currently
 * open part file and opens a new one.
 */
@PublicEvolving
public interface RollingPolicy<IN, BucketID> extends Serializable {

    /**
     * Determines if the in-progress part file for a bucket should roll on every checkpoint.
     *
     * @param partFileState the state of the currently open part file of the bucket.
     * @return {@code True} if the part file should roll, {@link false} otherwise.
     */
    boolean shouldRollOnCheckpoint(final PartFileInfo<BucketID> partFileState) throws IOException;

    /**
     * Determines if the in-progress part file for a bucket should roll based on its current state,
     * e.g. its size.
     *
     * @param element the element being processed.
     * @param partFileState the state of the currently open part file of the bucket.
     * @return {@code True} if the part file should roll, {@link false} otherwise.
     */
    boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState, IN element)
            throws IOException;

    /**
     * Determines if the in-progress part file for a bucket should roll based on a time condition.
     *
     * @param partFileState the state of the currently open part file of the bucket.
     * @param currentTime the current processing time.
     * @return {@code True} if the part file should roll, {@link false} otherwise.
     */
    boolean shouldRollOnProcessingTime(
            final PartFileInfo<BucketID> partFileState, final long currentTime) throws IOException;
}


These three methods determine, under different conditions, whether the currently open part file should be rolled (closed); a minimal custom policy is sketched after the list:

  • shouldRollOnCheckpoint: checked before each checkpoint is taken.
  • shouldRollOnEvent: checked as each element arrives, based on the current state of the part file, for example whether its size has exceeded the configured limit.
  • shouldRollOnProcessingTime: checked periodically, to decide whether the file has been open for too long and should therefore be closed.
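To make the contract concrete, below is a minimal custom policy sketch with an invented class name and illustrative thresholds; Flink itself ships ready-made implementations such as DefaultRollingPolicy and OnCheckpointRollingPolicy, so writing your own is rarely necessary.

import java.io.IOException;

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

// Minimal sketch: roll on every checkpoint, when the in-progress part file
// exceeds 128 MB, or when it has been open for more than 15 minutes.
public class SizeAndTimeRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

    private static final long MAX_PART_SIZE = 128L * 1024 * 1024;      // 128 MB
    private static final long ROLLOVER_INTERVAL_MS = 15L * 60 * 1000;  // 15 minutes

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException {
        // Closing the part file at every checkpoint lets the Bucket become inactive sooner.
        return true;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
        return partFileState.getSize() > MAX_PART_SIZE;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime)
            throws IOException {
        return currentTime - partFileState.getCreationTime() > ROLLOVER_INTERVAL_MS;
    }
}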

2.2 pendingFileRecoverablesForCurrentCheckpoint isNotEmpty

Entries are added to this list when an in-progress part file is closed, which is again driven by the RollingPolicy, so it needs no further explanation.

2.3 pendingFileRecoverablesPerCheckpoint isNotEmpty

This builds on pendingFileRecoverablesForCurrentCheckpoint: a map is used to record, for each checkpointId, the List<InProgressFileWriter.PendingFileRecoverable> of files waiting to be committed for that checkpoint.
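Sketched as code, the bookkeeping behind 2.2 and 2.3 has roughly the following shape (field names follow the article; the exact declarations and checkpoint handling in Flink's Bucket class may differ):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;

public class PendingFileStateSketch {

    // Files closed since the last checkpoint, waiting to be attached to the next one (2.2).
    private final List<InProgressFileWriter.PendingFileRecoverable>
            pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();

    // checkpointId -> files that will be committed once that checkpoint completes (2.3).
    private final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>>
            pendingFileRecoverablesPerCheckpoint = new TreeMap<>();

    void onCheckpoint(long checkpointId) {
        // On a checkpoint, the "current" list is parked under the checkpoint id and reset;
        // both collections being empty is part of the Bucket's isActive() check above.
        pendingFileRecoverablesPerCheckpoint.put(
                checkpointId, new ArrayList<>(pendingFileRecoverablesForCurrentCheckpoint));
        pendingFileRecoverablesForCurrentCheckpoint.clear();
    }
}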

2.4 Inactive Bucket

An inactive Bucket is therefore one whose files have all been closed and committed through checkpoints. The check is generally performed at two points:

  1. When a Task is restored, the previous state is read back from the StateBackend and each Bucket is checked
  2. After a checkpoint completes, a check is performed

When a Bucket becomes inactive, an inactive notification is sent, informing downstream consumers that the data for that partition has been committed and is now readable. For details, see the issue: Partition commit is delayed when records keep coming.

3. Clean architecture in FileSystemConnector

With the above understood, I noticed the proposal FLIP-115: Filesystem connector in Table. Guided by this proposal, I skimmed the relevant source code and found that its implementation is also a good example of clean architecture.

Having analyzed the source code above, let's now look at the abstractions, their responsibilities, and how they are layered:

|-- HiveTableSink                    # Table API level; exposed to users, who can call it directly
    |-- StreamingFileSink            # Streaming API level; can also be used directly, sits underneath the Table API
        |-- StreamingFileSinkHelper  # integrates the TimeService logic so Buckets can be closed periodically, and distributes data to Buckets; also used by AbstractStreamingWriter, whose comments recommend reusing it for later RichSinkFunction or StreamOperator implementations
        |-- BucketsBuilder           # the concrete class in this scenario is HadoopPathBasedBulkFormatBuilder, which ties together the concrete Buckets and BucketWriter implementations
            |-- Buckets              # manages the life cycle of Buckets; it holds several key member objects
                |-- BucketWriter     # corresponds to the concrete FileSystem and output Format
                |-- RollingPolicy    # the rolling policy discussed earlier; not covered further
                |-- BucketAssigner   # decides which Bucket each element is written to, e.g. by key or by date
                |-- BucketFactory    # responsible for creating each Bucket

Because responsibilities are divided at a fine granularity, the data-flow logic is decoupled from the concrete external implementations. A few examples (a small composition sketch follows the list):

  1. If we wanted to write to Hive from a DSL of our own, we would only need to write a HiveDSLSink similar to HiveTableSink.
  2. If a data warehouse (or data lake) keeps adding support for new underlying file systems, then once the first version of the code is in place, only the corresponding BucketWriter and FileSystem need to be implemented.
  3. If a data warehouse (or data lake) keeps adding support for new Formats, then once the first version of the code is in place, only the corresponding BucketWriter needs to be implemented.
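To make the composition concrete, here is a hedged sketch at the DataStream API level, with an invented path, date pattern and thresholds: the BucketAssigner and RollingPolicy are simply plugged into the builder, while the core Buckets logic stays untouched.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// A small composition sketch: the BucketAssigner decides which directory (Bucket)
// each element goes to, the RollingPolicy decides when part files are closed,
// and the core Buckets logic stays the same no matter what is plugged in.
public class ComposedSinkSketch {

    public static StreamingFileSink<String> build() {
        return StreamingFileSink
                .forRowFormat(new Path("hdfs:///tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))   // one Bucket per day
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(128 * 1024 * 1024)
                                .build())
                .build();
    }
}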

With this design, the core logic rarely needs to change, and the parts that change frequently are isolated, so the quality of the whole module is much easier to guarantee.