
This article is shared from the Huawei Cloud community post “[Cloud in Co-create] A Hands-On Guide to Flink, the Unified Stream/Batch Distributed Real-Time Processing Engine”, author: About the rabbit.

Apache Flink is an open source stream processing framework for distributed, high-performance stream processing applications. Flink provides real-time computing with high throughput and exactly-once semantics, and it also supports batch data processing. Unlike many other data processing engines on the market, it uses stream-based computing to implement batch processing.

I. Flink principle and architecture

Flink overview

Flink is implemented mainly in Java. It supports both real-time stream processing and batch processing, and it treats batch data as an extreme case of streaming data, namely a stream that ends. It also supports iterative computation, memory management, and program optimization.

Among the data processing engines on the market, both Flink and Spark support streaming and batch processing. However, Spark's design is based on batch processing and simulates stream computation with micro-batches. Flink, on the other hand, is stream-based and simulates batch processing with streams.

Flink key mechanisms

Four mechanisms: state, time, checkpoint, and window

Flink has four key mechanisms, which are discussed in more detail later. Here we introduce their basic concepts and main uses. The most important is the state mechanism. Flink is a stateful stream computing engine: because it computes over streams, its nodes need to store intermediate results, and preserving this state also enables fault-tolerant recovery. State is closely tied to the second mechanism, the checkpoint. A checkpoint stores Flink's state, effectively taking a snapshot that Flink can use to recover from failures. Third, because Flink is a stream computing engine, its data is generated continuously and without bounds, so a mechanism is needed to segment the data, and time serves as the point of segmentation. Flink also needs to know from which point in time to resume after a failure, so time is another key mechanism. Finally, the window is what Flink uses to divide data into segments, which makes aggregation convenient.

Flink core concept

The biggest difference between Flink and other streaming computing engines is state management.

Flink provides built-in state management and stores the state of running jobs inside Flink rather than in an external system. This has two benefits:

  • It reduces the dependence of the computing engine on external systems, making deployment, operation and maintenance easier.

  • This provides a significant performance boost.

Flink Runtime architecture

Flink's runtime architecture can be divided into three layers, from bottom to top. The bottom layer covers Flink's deployment modes. Flink can be installed on a single machine or on a cluster, and it can also be deployed in the cloud. In most cases it is installed as a cluster, which supports two modes: Standalone, which uses Flink's own resource scheduler, and YARN, which provides a dedicated resource manager.

The middle layer is Flink's runtime. This computing engine supports both streaming and batch processing and executes programs submitted through the upper-layer APIs. Two core APIs sit on top of it: the DataStream API and the DataSet API. Flink handles streaming data with the DataStream API and batch data with the DataSet API, but both run on the same engine. On top of these two APIs, Flink provides higher-level, more abstract APIs. The more abstract an API is, the less expressive it is, but the more concise and convenient it is for data processing. At the top, the Table API and SQL target relational operations and provide a unified interface for relational queries. On top of the streaming API, Flink also provides a complex event processing (CEP) API, which detects patterns in event streams, such as a sequence of events with a defined start, order, and end. Flink additionally provides a machine learning library (FlinkML) and a graph processing API (Gelly).

Flink Core Concepts – DataStream

DataStream: Flink uses the DataStream class to represent streaming data in a program. You can think of a DataStream as an immutable collection of data that can contain duplicates. The number of its elements can be unbounded.

As the figure shows, operators such as keyBy can be applied to a DataStream, and each transformation produces a new stream. keyBy in particular produces a KeyedStream. Window operators can then be applied to the KeyedStream, and much of the stream processing in a Flink program is built this way.

Flink core concept – DataSet

DataSet: Flink can apply transformations (e.g., filter, map, join, group) to datasets, which can be created by reading files or from local collections. Results are returned through a sink, which can write the data to (distributed) files or to standard output (such as a command-line terminal).

Flink program

A Flink program consists of a Source, Transformations, and a Sink. The source reads data and supports HDFS, Kafka, text, and other inputs. Transformations convert the data. The sink is responsible for the final output and supports HDFS, Kafka, text, and other outputs. The data flowing between these parts is called a stream.
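The plain-Java analogy below makes the source, transformation, and sink structure concrete. It deliberately does not use Flink's API. The in-memory list that stands in for the source, the word-count transformation, and all class and method names are illustrative assumptions.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy of a Flink program's three parts (not the Flink API).
final class SourceTransformSink {

    // Source: an in-memory collection of text lines stands in for Kafka/HDFS/etc.
    static Stream<String> source(List<String> lines) {
        return lines.stream();
    }

    // Transformation: split lines into words and count each word
    // (comparable in spirit to flatMap followed by keyBy and sum).
    static Map<String, Long> transform(Stream<String> lines) {
        return lines
                .flatMap(line -> Stream.of(line.toLowerCase().split("\\s+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
    }

    // Sink: hand every result to an output function (stdout, a file writer, ...).
    static void sink(Map<String, Long> counts, Consumer<String> out) {
        counts.forEach((word, n) -> out.accept(word + "," + n));
    }

    static Map<String, Long> run(List<String> lines) {
        return transform(source(lines));
    }

    public static void main(String[] args) {
        sink(run(List.of("to be or", "not to be")), System.out::println);
    }
}
```

In a real Flink job, the three parts would be a source connector, operators such as flatMap and keyBy, and a sink connector.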

Flink data source


Batch processing:

Files: HDFS, local file system, MapR file system; Text, CSV, Avro, and Hadoop input formats

JDBC, HBase, and collections

Stream processing:

Files, socket streams, Kafka, RabbitMQ, Flume, collections, and custom sources (implement your own SourceFunction and emit records with collect)

Flink program execution diagram

Flink uses a master-slave architecture: a JobManager and TaskManagers are started when the cluster starts. The JobManager side also contains two further components. The ResourceManager is mainly responsible for resource scheduling and management. The Dispatcher receives jobs submitted by clients and hands each one to a JobManager. Let’s look at the execution process in detail.

First, the user submits a Flink program, which is translated into a logical dataflow graph. The client submits this logical graph to the JobManager, along with the JAR packages and dependencies. The JobManager turns the logical dataflow graph into a physical dataflow graph. The physical graph is the executable form and is used to place tasks on TaskManagers. The resources of each TaskManager are divided into task slots. Tasks in a slot run as threads inside the TaskManager's JVM, and each slot occupies a portion of the TaskManager's resources, mainly memory. Task slots do not allocate CPU resources, so CPU is not isolated between slots.

Flink Operation Process (I)

Users first submit a Flink program to the JobClient, which processes, analyzes, and optimizes it and submits it to the JobManager. Finally, the TaskManagers run the tasks.

The JobManager converts the logical dataflow graph submitted by the JobClient into a physical dataflow graph and assigns the resulting tasks to TaskManagers. Each TaskManager processes the tasks it receives and reports their status to the JobManager, which then reports the results back to the JobClient.

The JobClient is the bridge between the user's program and the JobManager. It receives the program, parses and optimizes its execution plan, and then submits the plan to the JobManager. Flink has three main types of operators:

Source operators: read data from sources such as files, sockets, and Kafka.

Transformation operators: convert data, such as map, flatMap, and reduce.

Sink operators: write data out, for example to HDFS, MySQL, or Kafka.

A complete Flink program – Java

Flink data processing

Apache Flink supports both batch and stream processing and can also be used for event-driven applications.

First, Flink is a pure stream computing engine, and its basic data model is the data stream. A stream can be unbounded, which is stream processing in the usual sense. It can also be bounded, which is batch processing. This is why Flink's architecture supports both stream and batch processing.

Second, one of Flink’s advantages is its support for stateful computing. If the result of processing an event (or a piece of data) depends only on the content of that event, the processing is called stateless. If the result also depends on previously processed events, it is called stateful.
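The plain-Java sketch below illustrates the stateless/stateful distinction without Flink. All names in it are hypothetical. The stateless function's output depends only on the current event. The running per-key counter's output also depends on earlier events. In a Flink job, a map like the counter's would roughly correspond to keyed state that the framework manages and checkpoints.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (not Flink code) of stateless vs. stateful processing.
final class StatelessVsStateful {

    // Stateless: the output depends only on the current event.
    static String stateless(String event) {
        return event.toUpperCase();
    }

    // Stateful: keeps a running count per key, so the output for an event
    // also depends on the events processed before it.
    static final class RunningCount {
        private final Map<String, Integer> countPerKey = new HashMap<>();

        int process(String key) {
            return countPerKey.merge(key, 1, Integer::sum);  // returns the updated count
        }
    }

    static List<Integer> countAll(List<String> keys) {
        RunningCount state = new RunningCount();
        List<Integer> out = new ArrayList<>();
        for (String k : keys) {
            out.add(state.process(k));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(stateless("click"));                    // CLICK, regardless of history
        System.out.println(countAll(List.of("a", "b", "a", "a")));  // [1, 1, 2, 3]
    }
}
```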

Bounded and unbounded streams

Unbounded stream: has a defined start but no defined end. The data source produces data endlessly. An unbounded stream must be processed continuously, that is, each event must be handled promptly after it is read. You cannot wait until all the data has arrived, because the input is infinite and will never be complete. Processing unbounded data often requires ingesting events in a particular order, such as the order in which they occurred, so that you can reason about the completeness of the results.

Bounded stream: has a defined start and end. A bounded stream can be processed after all of its data has been read. All the data in a bounded stream can be sorted, so ordered ingestion is not required. Bounded stream processing is often referred to as batch processing.

Batch example

Batch processing is a special case of stream processing. In stream processing, we define a sliding or tumbling window over the data and produce a result each time the window slides or tumbles. Batch processing is different: we define a global window to which all records belong. For example, a simple Flink program can count visitors to a website every hour, grouped by region, using a one-hour window.

If you know that the input data is finite, you can implement batch processing by replacing the hourly window with a single global window.

If the input is finite, an equivalent DataSet (batch) program that groups the visits by region and sums them produces the same result.
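The visitor-count example can be sketched in plain Java (not the Flink API) to show why batch is a special case of streaming. The Visit class, the epoch-aligned one-hour windows, and the "region@windowStart" key format are assumptions made for this illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (not the Flink API): visits per region per hour versus
// visits per region over a single global window, as a batch job would compute.
final class HourlyVsGlobal {

    // Hypothetical record type: a visit with an event-time timestamp in ms.
    static final class Visit {
        final String region;
        final long timestampMs;

        Visit(String region, long timestampMs) {
            this.region = region;
            this.timestampMs = timestampMs;
        }
    }

    static final long HOUR_MS = 3_600_000L;

    // Streaming view: one count per (region, one-hour tumbling window).
    // Keys look like "eu@3600000", i.e. region @ window start (epoch-aligned ms).
    static Map<String, Integer> hourlyCounts(List<Visit> visits) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Visit v : visits) {
            long windowStart = v.timestampMs - Math.floorMod(v.timestampMs, HOUR_MS);
            counts.merge(v.region + "@" + windowStart, 1, Integer::sum);
        }
        return counts;
    }

    // Batch view: every record falls into the same global window,
    // so only the region matters.
    static Map<String, Integer> globalCounts(List<Visit> visits) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Visit v : visits) {
            counts.merge(v.region, 1, Integer::sum);
        }
        return counts;
    }
}
```

On a finite input, summing a region's hourly counts gives exactly its global count. In that sense, a global window over bounded data reproduces the batch result.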

Flink batch processing model

Flink supports both stream and batch processing with a single underlying engine.

On top of the stream processing engine, Flink has the following mechanisms:

Checkpoint and state mechanisms: for fault-tolerant, stateful processing;

Watermark mechanism: used to implement the event-time clock;

Windows and triggers: used to limit the scope of calculations and define when to emit results.

On top of the same streaming engine, Flink also has another set of mechanisms for efficient batch processing:

  • Backtracking for scheduling and recovery: introduced by Microsoft Dryad and now used in almost all batch processors;

  • Special in-memory data structures for hashing and sorting: part of the data can be spilled from memory to disk when needed;

  • Optimizer: minimizes the time needed to produce results.

Streaming and batch mechanisms

The two sets of mechanisms correspond to their respective APIs (the DataStream API and the DataSet API). When creating a Flink job, you cannot mix the two APIs to take advantage of all of Flink's capabilities at once.

Flink supports two relational APIs, the Table API and SQL. Both are unified APIs for batch and stream processing. Queries execute with the same semantics and produce the same results on unbounded real-time data streams and on bounded historical data streams.

  • The Table API/SQL is becoming the primary API for analytical use cases in a unified stream-batch fashion.

  • The DataStream API is the primary API for data-driven applications and data pipelines.

II. Flink’s Time and Window

Time background

Time handling is critical in stream processing. Take counting as an example. Event stream data (such as server logs, web page clicks, and transactions) is generated continuously, and we need to group events by key and count the events for each key at regular intervals. This is a typical requirement of big data applications.

Time classification in stream processing

In stream processing, the system's processing time is often used as the time of an event. However, processing time is assigned to the event by the system rather than being part of the event. Network delays and other factors mean it cannot accurately reflect the order in which events actually occurred.

In actual scenarios, the time of each event can be divided into three types:

  • Event Time, the time when the event occurs;

  • Ingestion Time, the time when the event arrives in the stream processing system;

  • Processing Time, the time when the event is processed by the system.

An example of the three times

For example, a log record enters Flink at 2019-11-12 10:00:00.123 and reaches the window operator at 2019-11-12 10:00:01.234. The content of the log is:

2019-11-02 18:37:15.624 INFO Fail over to rm2

2019-11-02 18:37:15.624 is the Event Time;

2019-11-12 10:00:00.123 is the Ingestion Time;

2019-11-12 10:00:01.234 is the Processing Time.
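The gaps between the three timestamps in this example can be computed directly. The sketch below uses java.time. It assumes all three timestamps are in the same time zone, which the log line does not actually state.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Gaps between the Event Time, Ingestion Time and Processing Time of the log above.
// Assumes the three timestamps share one (unspecified) time zone.
final class ThreeTimes {
    static final LocalDateTime EVENT_TIME = LocalDateTime.parse("2019-11-02T18:37:15.624");
    static final LocalDateTime INGESTION_TIME = LocalDateTime.parse("2019-11-12T10:00:00.123");
    static final LocalDateTime PROCESSING_TIME = LocalDateTime.parse("2019-11-12T10:00:01.234");

    // How long after the event occurred the record reached Flink.
    static Duration eventToIngestion() {
        return Duration.between(EVENT_TIME, INGESTION_TIME);
    }

    // How long the record spent inside Flink before reaching the window operator.
    static Duration ingestionToProcessing() {
        return Duration.between(INGESTION_TIME, PROCESSING_TIME);
    }

    public static void main(String[] args) {
        System.out.println(eventToIngestion());      // PT231H22M44.499S
        System.out.println(ingestionToProcessing()); // PT1.111S
    }
}
```

The large first gap (about 9.6 days) is what makes event time and processing time diverge so sharply for this record.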

The difference between the three times

In practice, the order in which events actually occur differs from the order in which the system processes them. These differences are mainly caused by network delays and processing time, as shown in the figure:

The x-axis represents event time and the y-axis represents processing time. Ideally, plotting event time against processing time would give a straight line at 45 degrees. In real applications, however, processing time lags behind event time by varying amounts, so events arrive in an order that differs from the order in which they occurred.

Time semantics supported by Flink

Processing Time is the system time of the machine on which the operator processes the event. It provides the best performance and lowest latency.

Event Time is the time at which the data was generated on its originating device. It is already embedded in the data record before the record enters Flink.

Ingestion Time is the time at which the event data enters Flink.

Window overview

Stream computing engines are designed to process infinite datasets, that is, ever-growing and essentially unbounded data. A window is the means of cutting this infinite data into finite chunks for processing. Windows are at the heart of infinite stream processing: they split an infinite stream into buckets of finite size on which calculations can be performed.

Window types

Windows can be divided into two categories based on what drives them:

  • CountWindow: data-driven; generates a window based on a specified number of data items, regardless of time.

  • TimeWindow: time-driven; generates windows based on time.

Apache Flink is a distributed computing framework that natively supports processing infinite streams. In Flink, windows split an infinite stream into finite ones, and a window can be either a time window or a count window.
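A count window is simple to model. The plain-Java sketch below is not Flink's countWindow API. It emits the sum of every group of size consecutive elements and ignores time entirely. In this sketch, elements left over at the end form an incomplete window that never fires.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not the Flink API) of a tumbling count window:
// a result is emitted every time `size` elements have arrived, regardless of time.
final class CountWindowSketch {

    static List<Integer> tumblingCountSums(List<Integer> values, int size) {
        List<Integer> results = new ArrayList<>();
        int sum = 0;
        int count = 0;
        for (int v : values) {
            sum += v;
            count++;
            if (count == size) {   // window is full: emit its result and start a new one
                results.add(sum);
                sum = 0;
                count = 0;
            }
        }
        return results;            // a trailing partial window is never emitted
    }
}
```

For example, tumblingCountSums(List.of(1, 2, 3, 4, 5, 6, 7), 3) emits [6, 15], and the final 7 waits for two more elements.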

TimeWindow classification

Time windows can be divided into three categories based on how the windows are formed: tumbling windows, sliding windows, and session windows.

Tumbling window

Slices the data into windows of a fixed length. Features: time-aligned, fixed window length, no overlap.

Application scenario: suitable for BI statistics (aggregation for each time period).

For example, suppose you want to sum the values output by a sensor. A one-minute tumbling window collects the values of the last minute and outputs the total at the end of that minute, as shown in the figure below.
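A tumbling window assigns each element to exactly one window. The plain-Java sketch below (not Flink's API) sums sensor values per one-minute window. It assumes millisecond timestamps and epoch-aligned windows with no offset.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of tumbling windows: each element belongs to exactly one
// window [start, start + size), with windows aligned to multiples of the size.
final class TumblingWindowSketch {

    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the arithmetic correct for negative timestamps as well.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // timestamps[i] is the event time of values[i]; returns windowStart -> sum.
    static Map<Long, Integer> sumPerWindow(long[] timestamps, int[] values, long sizeMs) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            sums.merge(windowStart(timestamps[i], sizeMs), values[i], Integer::sum);
        }
        return sums;
    }
}
```

With sizeMs = 60_000, readings at 0s, 30s, and 59.999s land in the same window, and a reading at exactly 60s starts the next one.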

The sliding window

A sliding window is a generalized form of the fixed (tumbling) window. It is defined by a fixed window length and a slide interval. Features: time-aligned, fixed window length, windows can overlap.

Application scenario: statistics over the most recent period (for example, calculating the failure rate of an interface over the last 5 minutes to decide whether to raise an alarm).

Example: a one-minute sliding window computes the sum of the values from the last minute, but it slides every half minute and outputs a result each time, as shown in the following figure.
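Because sliding windows overlap, one element can belong to several windows: size / slide of them when the slide divides the size. The plain-Java sketch below assumes epoch-aligned windows. It enumerates the windows that contain a timestamp, which is a common way to assign sliding windows, but it is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of sliding windows [start, start + size) that start every `slide` ms.
final class SlidingWindowSketch {

    // Start times of every sliding window that contains timestamp ts.
    static List<Long> windowStarts(long ts, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slideMs);   // latest window start <= ts
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    // Adds each value to every window that contains its timestamp.
    static Map<Long, Integer> sumPerWindow(long[] timestamps, int[] values, long sizeMs, long slideMs) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            for (long start : windowStarts(timestamps[i], sizeMs, slideMs)) {
                sums.merge(start, values[i], Integer::sum);
            }
        }
        return sums;
    }
}
```

With a one-minute window sliding every 30 seconds, an event at 45s belongs to the windows starting at 30s and at 0s, so it contributes to two results.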

The session window

A session window groups a series of events separated by gaps no longer than a specified timeout, similar to a session in a web application. If no new data arrives for that length of time, the current window closes, and the next event starts a new window. Features: windows are not time-aligned.
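The plain-Java sketch below models session windows: it sorts event timestamps and starts a new session whenever the gap to the previous event exceeds the timeout. The {first, last, count} output format is an assumption of this illustration. The sketch does not reproduce Flink's merging window assigner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of session windows split by an inactivity gap.
final class SessionWindowSketch {

    // Each session is returned as {firstTimestamp, lastTimestamp, eventCount}.
    static List<long[]> sessions(long[] timestamps, long gapMs) {
        long[] ts = timestamps.clone();
        Arrays.sort(ts);                       // sessions are defined on event time
        List<long[]> result = new ArrayList<>();
        if (ts.length == 0) {
            return result;
        }
        long first = ts[0], last = ts[0], count = 1;
        for (int i = 1; i < ts.length; i++) {
            if (ts[i] - last > gapMs) {        // silence longer than the gap: close the session
                result.add(new long[] {first, last, count});
                first = ts[i];
                count = 0;
            }
            last = ts[i];
            count++;
        }
        result.add(new long[] {first, last, count});
        return result;
    }
}
```

With a 5s gap, events at 1s, 2s, 2.5s, 9s, and 20s form three sessions: [1s, 2.5s] with three events, then [9s] and [20s] on their own.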

Code definition

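In Flink's DataStream API, a one-minute tumbling window on a keyed stream can be defined with window(TumblingEventTimeWindows.of(Time.minutes(1))). Older Flink versions also offer the shorthand timeWindow(Time.minutes(1)).

A one-minute sliding window that slides every half minute (i.e., 30 seconds) can be defined with window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30))), or with timeWindow(Time.minutes(1), Time.seconds(30)) in older versions.
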
III. Flink’s Watermark

Out-of-order problem

In stream processing, an event takes time to travel from where it is generated, through the source, to an operator. In most cases, data reaches the operator in the order in which the events were generated. However, network delays, distribution, and other factors can still put it out of order. Out of order means that the sequence in which Flink receives events does not strictly follow the events' event time.

Once events are out of order, window computation cannot be triggered by event time alone. We cannot be sure whether all the data has arrived, yet we cannot wait indefinitely. A mechanism is needed to guarantee that a window is triggered for computation after a certain point. This mechanism is the watermark.

Out-of-order example

Example: an app records all of a user's clicks and sends the logs back (if the network is poor, it saves them locally and sends them later). User A performs an action in the app at 11:02 and user B at 11:03. However, user A's network is unstable, so the log upload is delayed. As a result, we receive user B's 11:03 message first and user A's 11:02 message afterward, which is out of order.


For infinite datasets there is no direct way to measure data completeness. This is why the watermark exists: a concept built around event time that characterizes how complete a data stream is. If events were measured by processing time, everything would arrive in order and no watermark would be needed. In other words, event time introduces the out-of-order problem, and the watermark is used to solve it. Since we cannot wait indefinitely for delayed elements, the watermark tells the operator when the window must be triggered and that delayed messages should no longer be expected.

  • A watermark is a mechanism for measuring the progress of event time.

    Watermarks are used together with windows to handle out-of-order events correctly.

  • A watermark in the data stream indicates that all data with timestamps less than the watermark has already arrived, so window execution is triggered by watermarks.

  • A watermark can be understood as a delayed trigger. We set a delay time t. The system keeps track of the maximum event time, maxEventTime, among the data that has arrived, and treats all data with event time less than maxEventTime - t as having arrived. Any window whose end time is less than or equal to maxEventTime - t is triggered.

    Watermarks let the application trade off latency against result correctness.
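The maxEventTime - t rule can be sketched in a few lines of plain Java. The class is hypothetical. It is similar in spirit to Flink's bounded-out-of-orderness watermark strategy but is neither its API nor its exact arithmetic. Because the rule is based on the maximum event time seen, the watermark never moves backwards when an older event arrives.

```java
// Plain-Java sketch of the "maxEventTime - t" watermark rule (hypothetical class).
final class WatermarkSketch {
    private final long maxDelayMs;                 // the allowed delay t
    private long maxEventTime = Long.MIN_VALUE;    // largest event time seen so far

    WatermarkSketch(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    // Observe one event and return the current watermark.
    long onEvent(long eventTimeMs) {
        maxEventTime = Math.max(maxEventTime, eventTimeMs);   // out-of-order events don't lower it
        return maxEventTime - maxDelayMs;
    }
}
```

With t = 2s, events at 7s, 6s, and 12s produce watermarks of 5s, 5s, and 10s. The late 6s event does not pull the watermark back.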

The principle of Watermark

How does Flink ensure that event-time windows have processed all their data before they are destroyed?

That is what the watermark does. A watermark carries a monotonically increasing timestamp t. Watermark(t) declares that all data with timestamps less than or equal to t has arrived and that no more data with such timestamps will come. The window can therefore be safely triggered and destroyed.

When Flink receives data, it generates watermarks according to a rule. The watermark is the maximum event time of all the data received so far minus an allowed delay N, so it is derived from the timestamps carried by the data. When the watermark passes the end of a window that has not yet fired, that window is triggered. Because event time is carried by the data, a window that has not fired will never fire if no new data arrives during the run.

In the figure above, the maximum allowed delay is set to 2s. The watermark for the event with timestamp 7s is therefore 5s, and the watermark for the event with timestamp 12s is 10s. Suppose window 1 covers 1s-5s and window 2 covers 6s-10s. The arrival of the event with timestamp 7s then produces the watermark that triggers window 1, and the arrival of the event with timestamp 12s produces the watermark that triggers window 2.

A watermark acts as the “closing time” for the windows before it. Once it is reached, the window is considered to contain all of its data and is closed. Until the watermark reaches that point, the window does not close, no matter how far wall-clock time advances.
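The figure's example can be replayed with a small plain-Java simulation. It assumes a 2s delay and two windows, 1s-5s and 6s-10s, with inclusive ends as in the text. A window fires once the watermark reaches its end. The arrival order and the class name are hypothetical, and this models the behavior without using Flink.

```java
import java.util.Arrays;

// Plain-Java replay of the example: delay 2s, window 1 = [1s, 5s], window 2 = [6s, 10s].
final class WindowFiringSketch {

    static final long MAX_DELAY = 2;                       // seconds
    static final long[][] WINDOWS = {{1, 5}, {6, 10}};     // {start, inclusive end}

    // For each window, the timestamp of the event whose arrival triggered it,
    // or -1 if the window never fired.
    static long[] firingEvents(long[] arrivals) {
        long[] firedBy = new long[WINDOWS.length];
        Arrays.fill(firedBy, -1);
        long maxEventTime = Long.MIN_VALUE;
        for (long ts : arrivals) {
            maxEventTime = Math.max(maxEventTime, ts);
            long watermark = maxEventTime - MAX_DELAY;
            for (int w = 0; w < WINDOWS.length; w++) {
                if (firedBy[w] == -1 && watermark >= WINDOWS[w][1]) {
                    firedBy[w] = ts;   // watermark reached the window end: compute and close it
                }
            }
        }
        return firedBy;
    }
}
```

For arrivals 2, 4, 3, 7, 9, 6, 12 the result is {7, 12}, matching the text. If the stream stops after 2, 4, 3, neither window ever fires, because no newer data advances the watermark.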

Delayed data

Watermarks can handle out-of-order data, but in the real world a perfect watermark is either unattainable or too expensive. In practice Watermark(t) is only an approximation, and there is a small probability of receiving data with timestamps earlier than t after it. Flink calls such data “late elements.” You can specify the maximum allowed lateness of a window with the allowedLateness method on the windowed stream.

Delayed data processing mechanism

Delayed events are a special case of out-of-order events. They differ from ordinary out-of-order events in that they arrive later than the watermark anticipated, so their windows have already closed when they arrive.

When the delayed event occurs, the window has been closed and the calculation result has been produced. There are three ways to deal with this situation:

  • Reactivate the closed window and recalculate to correct the result.

  • Collect delayed events for separate processing.

  • Treat delayed events as error messages and discard them.

Flink’s default is to discard delayed events. The other two approaches use Allowed Lateness (reactivating the window and recalculating) and Side Output (collecting delayed events separately).

Side Output mechanism

The Side Output mechanism places delayed events in a separate branch of the data stream. This branch is a by-product of the window computation that users can retrieve and process specially.

Getting delayed data with Side Output:

After allowedLateness is set, late data within the allowed lateness can still trigger window output. Data that arrives even later can be obtained through Flink's side output mechanism. Define an OutputTag, pass it to sideOutputLateData on the windowed stream, and read the late elements from the window result with getSideOutput.

Allowed Lateness mechanism

The Allowed Lateness mechanism lets users set a maximum allowed lateness. After a window closes, Flink keeps the window's state until the allowed lateness has passed. Delayed events that arrive during that period are not discarded and, by default, trigger a recalculation of the window. Keeping window state requires extra memory. In addition, each delayed event may trigger a full window computation when the window uses the ProcessWindowFunction API. Allowing too much lateness, or receiving too many delayed events, is therefore costly.
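The three possible outcomes for an element can be modeled in plain Java. The sketch assumes tumbling windows [start, start + size) whose last timestamp is start + size - 1, and a watermark equal to the maximum event time minus a fixed delay. A window fires when the watermark reaches its last timestamp. Its state is kept until the watermark passes that timestamp plus the allowed lateness, after which the element goes to the side output (or is dropped). The enum and method names are hypothetical. This is a model, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of late-element handling with allowed lateness and a side output.
final class LatenessSketch {

    enum Outcome { ON_TIME, LATE_REFIRES_WINDOW, SIDE_OUTPUT }

    static List<Outcome> classify(long[] arrivals, long sizeMs, long maxDelayMs, long allowedLatenessMs) {
        List<Outcome> outcomes = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (long ts : arrivals) {
            long windowEnd = ts - Math.floorMod(ts, sizeMs) + sizeMs;   // exclusive end
            long lastTimestamp = windowEnd - 1;
            if (watermark < lastTimestamp) {
                outcomes.add(Outcome.ON_TIME);                 // window has not fired yet
            } else if (watermark < lastTimestamp + allowedLatenessMs) {
                outcomes.add(Outcome.LATE_REFIRES_WINDOW);     // state still kept: recompute, emit update
            } else {
                outcomes.add(Outcome.SIDE_OUTPUT);             // too late: dropped or sent to side output
            }
            watermark = Math.max(watermark, ts - maxDelayMs);  // watermark only moves forward
        }
        return outcomes;
    }
}
```

With 10s windows, a 1s delay, and 5s of allowed lateness, the arrivals 3s, 12s, 8s, 17s, 5s are classified as on time, on time, late-but-refiring, on time, and side output. With zero allowed lateness, the 8s element would also go to the side output.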

IV. Flink fault tolerance

Flink fault tolerance mechanism

To support fault-tolerant recovery and to restore state when a program restarts, Flink saves state through checkpoints or savepoints.

  • Checkpoint mechanism. This mechanism ensures that a running real-time program can recover by itself even if it encounters an unexpected exception. Checkpointing is transparent: the user does not need to be aware of it.

  • Savepoint mechanism. A savepoint is a global image of the program state at a certain point in time. After the program is upgraded or its parallelism is changed, it can be restarted from the saved state. A savepoint is a snapshot of checkpoint state taken at a specific time.


How does Flink guarantee exactly-once? It uses checkpoints to reset the system to a correct state after a failure. Flink's state is saved through the checkpoint mechanism, which periodically creates distributed snapshots to back up the program's state.

Checkpoint mechanism

Flink's fault tolerance is based on asynchronous, lightweight distributed snapshots, which capture the state of all tasks/operators at a consistent point. Flink periodically injects checkpoint barriers into the input streams, and the barriers divide the data into the portions that belong to each checkpoint. When a failure occurs, Flink restores the state of all operators from the last completed snapshot to ensure data consistency.

For applications with small state, snapshots are lightweight and can be taken frequently with relatively little impact on task performance. During a checkpoint, state data is stored in a configurable location, usually the JobManager's memory or HDFS.
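The core idea of checkpoint-based recovery can be modeled in plain Java. A snapshot records the source position and the operator state at the same point. After a failure, both are restored and the input is replayed from that position, so each record affects the state exactly once. The class, the checkpoint interval, and the simulated crash are hypothetical. Barriers, distribution, and asynchronous snapshots are deliberately left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of checkpoint-based recovery (not Flink's implementation).
final class CheckpointSketch {

    // A consistent snapshot: how far the source had read, plus a copy of the state.
    static final class Snapshot {
        final int sourceOffset;
        final Map<String, Integer> counts;

        Snapshot(int sourceOffset, Map<String, Integer> counts) {
            this.sourceOffset = sourceOffset;
            this.counts = new HashMap<>(counts);   // copy so later updates don't leak in
        }
    }

    // Counts the input, checkpointing every `interval` records and simulating
    // one crash right after record number `failAt` (use -1 for no crash).
    static Map<String, Integer> run(List<String> input, int interval, int failAt) {
        Snapshot last = new Snapshot(0, new HashMap<>());
        Map<String, Integer> counts = new HashMap<>();
        int offset = 0;
        boolean failed = false;
        while (offset < input.size()) {
            counts.merge(input.get(offset), 1, Integer::sum);
            offset++;
            if (offset % interval == 0) {
                last = new Snapshot(offset, counts);      // checkpoint completes
            }
            if (!failed && offset == failAt) {
                failed = true;                            // crash: in-memory state is lost
                counts = new HashMap<>(last.counts);      // restore state from the snapshot
                offset = last.sourceOffset;               // rewind the source to the same point
            }
        }
        return counts;
    }
}
```

Running the input a, b, a, c, a with a checkpoint every two records and a crash after the third record still yields {a=3, b=1, c=1}, the same as a run without failure. Restoring the state without rewinding the source, or the reverse, would lose or double-count records.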

Checkpoint configuration

Flink does not enable checkpoints by default. You must enable them in the program by calling enableCheckpointing(n) on the execution environment, where n is the checkpoint interval in milliseconds.

Exactly-once and at-least-once semantics

Exactly-once: for scenarios with strict data requirements where neither data loss nor duplication is allowed. Flink's performance is relatively lower in this mode.

At-least-once: for scenarios that require low latency and high throughput but have weaker data consistency requirements.

Flink uses exactly-once mode by default. The mode can be changed with the setCheckpointingMode() method.


Checkpoint timeout

This parameter sets the upper limit on checkpoint execution time. If a checkpoint takes longer than this threshold, Flink aborts it and treats it as timed out.

This value can be set with the setCheckpointTimeout method. The default is 10 minutes.


Minimum interval between checkpoints

This setting specifies the minimum pause between two checkpoints. When large state makes checkpoints slow, checkpoints can otherwise pile up, with the application triggering one checkpoint after another. This consumes a large amount of computing resources and hurts the performance of the entire application. The pause can be set with setMinPauseBetweenCheckpoints.


Maximum number of checkpoints for parallel execution

This setting specifies how many checkpoints may run at the same time (setMaxConcurrentCheckpoints). By default, only one checkpoint can be in progress. Allowing several concurrent checkpoints can improve overall checkpoint efficiency.
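The interval, minimum pause, and maximum-concurrency settings can be read as three conditions that must all hold before a new checkpoint starts. The plain-Java function below is a deliberately simplified, hypothetical model of that reading. It is not Flink's checkpoint coordinator, which handles more cases than this.

```java
// Hypothetical, simplified model of how the checkpoint interval, the minimum pause
// and the maximum number of concurrent checkpoints could gate a new checkpoint.
final class CheckpointTriggerSketch {

    static boolean canTrigger(long nowMs, long lastTriggerMs, long lastCompletionMs,
                              int inFlight, long intervalMs, long minPauseMs, int maxConcurrent) {
        boolean intervalElapsed = nowMs - lastTriggerMs >= intervalMs;   // regular schedule
        boolean pauseElapsed = nowMs - lastCompletionMs >= minPauseMs;   // breathing room after the last one
        boolean slotFree = inFlight < maxConcurrent;                     // default limit is 1
        return intervalElapsed && pauseElapsed && slotFree;
    }
}
```

With a 10s interval and a 5s minimum pause, suppose a checkpoint started at 0s and finished at 8s. The next one is not allowed at 10s, because only 2s have passed since completion. It is allowed from 13s on.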


External checkpoint

Externalized checkpoints persist the periodic checkpoint data to an external system. This data is not cleaned up when the job is stopped. It is kept on the external storage medium, and the job can be recovered from it. In older Flink versions this is enabled with enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) on the checkpoint configuration.


How does the job recover data?

On one hand, Flink can retain checkpoints on external media when a job is canceled. On the other hand, Flink has another mechanism, the savepoint.

A savepoint is a special implementation of a checkpoint that is built on the checkpoint mechanism. Savepoints are triggered manually by the user, and the results are persisted to a specified storage path. Their purpose is to help users save the system's state data during cluster upgrades and maintenance. Stopping a job for maintenance, or terminating it normally to update the application, could otherwise leave the application unable to return to its original computation state, which would make end-to-end exactly-once guarantees impossible.

The Savepoint and Checkpoint

Checkpoints focus on fault tolerance. If a Flink job fails unexpectedly and restarts, it can resume directly from the latest checkpoint without affecting the correctness of the job logic. Savepoints focus on maintenance. When a Flink job must be manually restarted, upgraded, migrated, or A/B tested, its entire state is first written to reliable storage and then restored from the savepoint after the maintenance.

Savepoints are created via the checkpoint mechanism, so savepoints are essentially special checkpoints.

Checkpoints are designed for the Flink runtime itself. Flink triggers them periodically and cleans them up automatically, without user intervention. Savepoints are user-oriented and are triggered and cleaned up entirely according to the user's needs.

  • Trigger and management: checkpoints are triggered and managed automatically by Flink; savepoints are triggered and managed manually by the user.

  • Purpose: checkpoints are mainly used to recover tasks quickly when exceptions occur, such as timeouts caused by network jitter; savepoints are for planned backups, for example before code changes or parallelism adjustments.

  • Characteristics: checkpoints are lightweight, used for automatic failure recovery, and cleared by default after the job stops; savepoints are persistent, stored in a standard format, and tolerate code or configuration changes, and recovery from them is triggered manually.

State storage mode – MemoryStateBackend

Constructor:

MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots)

Storage mode:

State: TaskManager memory; Checkpoint: JobManager memory.

Capacity limits:

The maximum size of a single state (maxStateSize) defaults to 5 MB; maxStateSize must not exceed akka.framesize (default 10 MB); the total size must not exceed the JobManager's memory.

Recommended scenarios: local testing; almost stateless jobs, such as ETL.

State storage mode – FsStateBackend

Constructor:

FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots)

Storage mode:

State: TaskManager memory;

Checkpoint: an external file system (local or HDFS).

Capacity limits:

  • The total state on a single TaskManager must not exceed its memory;

  • The total size must not exceed the capacity of the configured file system.

Recommended scenarios: jobs with regular state, such as minute-level window aggregations and joins; jobs that require HA. Suitable for production.

State storage mode – RocksDBStateBackend

Constructor:

RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing)

Storage mode:

State: a KV database on the TaskManager (memory + disk);

Checkpoint: an external file system (local or HDFS).

Capacity limits:

The total state on a single TaskManager must not exceed its memory + disk;

The maximum size of a single key or value is 2 GB;

The total size must not exceed the capacity of the configured file system.

Recommended scenarios: jobs with very large state, such as day-level window aggregations; jobs that require HA; jobs without strict state read/write performance requirements. Suitable for production.


This chapter mainly describes Flink's architecture and technical principles, and how a Flink program runs. A key point is how Flink's stream processing differs from batch processing. In the long run, the DataStream API is expected to fully subsume the DataSet API through bounded DataStreams.
