Concepts

1. Programs and data flows

Parallel data streams

Task and operator chains

2. Distributed execution

Master, worker, client

Workers, slots, resources

3. Time and Windows

Time

4. State and fault tolerance

Checkpoints for fault tolerance

State backends

5. Batch on streaming

Programs and data flows

The basic building blocks of a Flink program are streams and transformations (note that data sets are also streams internally). A stream is an intermediate result, whereas a transformation is an operation that takes one or more streams as input and computes one or more result streams from them.

When executed, Flink programs are mapped to streaming dataflows consisting of streams and transformation operators. Each dataflow starts with one or more sources and ends in one or more sinks. The dataflows resemble arbitrary directed acyclic graphs (DAGs). (Special forms of cycles are permitted via iteration constructs, but we gloss over this here for simplicity.)

In most cases there is a one-to-one correspondence between the transformations in the program and the operators in the dataflow. Sometimes, however, one transformation may consist of multiple transformation operators.
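
As a sketch of these building blocks, the following minimal DataStream program (Flink 1.x Java API; the host, port, and the particular transformation are illustrative assumptions) wires one source, one transformation, and one sink into a small dataflow:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SimpleDataflow {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Source: produces the initial stream of text lines (host and port are placeholders).
            DataStream<String> lines = env.socketTextStream("localhost", 9999);

            // Transformation: takes one stream as input and computes a result stream.
            DataStream<String> upperCased = lines.map(String::toUpperCase);

            // Sink: terminates the dataflow by printing the result stream.
            upperCased.print();

            env.execute("source -> map -> sink");
        }
    }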

 

Parallel data streams

Programs in Flink are inherently parallel and distributed. During execution, a stream is split into one or more stream partitions, and each operator is split into one or more operator subtasks. The operator subtasks are independent of one another and execute in different threads, possibly on different machines or containers.

The number of operator subtasks is the parallelism of that particular operator. The parallelism of a stream is always that of its producing operator. Different operators of the same program may have different levels of parallelism.
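
As a small sketch (the values and operations are illustrative; env is assumed to be the StreamExecutionEnvironment from the example above), parallelism can be set per job and per operator:

    // Default parallelism for all operators that do not set one explicitly.
    env.setParallelism(2);

    DataStream<String> lines = env.socketTextStream("localhost", 9999); // this source runs with parallelism 1

    lines
        .map(String::toUpperCase)
        .setParallelism(4)   // this map operator is split into four parallel subtasks
        .print()
        .setParallelism(1);  // the print sink runs as a single subtask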

 

Streams can transport data between two operators in a one-to-one (or forwarding) pattern, or in a redistributing pattern:

  • One-to-one streams (for example between the source and the map() operators) preserve the partitioning and ordering of the elements. That means subtask[1] of the map() operator will see the same elements, in the same order, as they were produced by subtask[1] of the source operator.
  • Redistributing streams (for example between map() and keyBy/window, as well as between keyBy/window and the sink) change the partitioning of streams. Each operator subtask sends data to different target subtasks, depending on the selected transformation. Examples are keyBy() (which re-partitions by hashing the key), broadcast(), or rebalance() (which re-partitions randomly). In a redistributing exchange, the ordering among elements is only preserved within each pair of sending and receiving subtasks (for example, subtask[1] of map() and subtask[2] of keyBy/window). A short sketch of these patterns follows this list.
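
A short sketch of these partitioning patterns (Flink 1.x Java API; the socket source and the choice of key are illustrative assumptions):

    DataStream<String> words = env.socketTextStream("localhost", 9999);

    // One-to-one (forwarding): each map subtask reads from exactly one source subtask,
    // preserving partitioning and element order.
    DataStream<String> lowerCased = words.map(String::toLowerCase);

    // Redistributing: keyBy() re-partitions the stream by hashing the key.
    lowerCased.keyBy(word -> word).print();

    // Redistributing: broadcast() sends every element to every downstream subtask.
    lowerCased.broadcast().print();

    // Redistributing: rebalance() redistributes elements round-robin.
    lowerCased.rebalance().print();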

Task and operator chains

For distributed execution, Flink chains operator subtasks together into tasks. Each task is executed by one thread. Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread handover and buffering, increases overall throughput, and decreases latency. The chaining behavior can be configured in the APIs.

The sample dataflow in the figure below is executed with five subtasks, and hence with five parallel threads.
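
A sketch of how the chaining behavior can be influenced in the DataStream API (the operations themselves are illustrative):

    lines
        .filter(line -> !line.isEmpty())   // eligible for chaining with neighboring operators
        .map(String::trim)
        .startNewChain()                   // begin a new chain starting with this map operator
        .map(String::toUpperCase)
        .disableChaining()                 // never chain this operator with any other
        .print();

    // Alternatively, disable chaining for the whole job:
    env.disableOperatorChaining();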

Distributed execution

Master, worker, client

The Flink runtime consists of two types of processes:

  • The master processes (also called JobManagers) coordinate the distributed execution. They schedule tasks, coordinate checkpoints, coordinate recovery on failures, and so on. There is always at least one master process. A high-availability setup will have multiple master processes, one of which is always the leader and the others are standby.
  • The worker processes (also called TaskManagers) execute the tasks (or more specifically, the subtasks) of a dataflow, and buffer and exchange the data streams. There must always be at least one worker process.

The master and worker processes can be started in various ways: directly on the machines as a standalone cluster, in containers, or managed by resource frameworks such as YARN. Workers connect to the masters, announce themselves as available, and get work assigned.

The client is not part of the runtime and program execution, but is used to prepare and send a dataflow to the master. After that, the client can disconnect or stay connected to receive progress reports. The client runs either as part of the Java/Scala program that triggers the execution, or in a command line process.

 

Workers, slots, resources

Each worker (TaskManager) is a JVM process and may execute one or more subtasks in separate threads. To control how many tasks a worker accepts, a worker has so-called task slots (at least one).

Each task slot represents a fixed subset of the TaskManager's resources. A TaskManager with three slots, for example, dedicates one-third of its managed memory to each slot. Slotting the resources means that a subtask does not compete for managed memory with subtasks from other jobs, but instead has a certain amount of reserved managed memory. Note that there is no CPU isolation here; slots currently only separate the managed memory of tasks.

By adjusting the number of task slots, users can define how subtasks are isolated from each other. Having one slot per TaskManager means that each task group runs in a separate JVM (which can be started in a separate container, for example). Having multiple slots means that more subtasks share the same JVM. Tasks in the same JVM share TCP connections (via multiplexing) and heartbeat messages. They may also share data sets and data structures, reducing the per-task overhead.

 

By default, Flink allows subtasks to share slots even if they are subtasks of different tasks, as long as they are from the same job. The result is that one slot may hold an entire pipeline of the job. Allowing this slot sharing has two main benefits:

  • A Flink cluster needs exactly as many task slots as the highest parallelism used in the job. There is no need to calculate how many tasks (with varying parallelism) a program contains in total.
  • It is easier to get better resource utilization. Without slot sharing, the non-intensive source/map() subtasks would block as many resources as the resource-intensive window subtasks. With slot sharing, increasing the base parallelism from two to six yields full utilization of the slotted resources, while making sure that the heavy subtasks are fairly distributed among the TaskManagers.

The slot sharing behavior can be controlled in the APIs to prevent sharing where it is not desired. The mechanism for this is the resource group, which defines which (sub)tasks may share a slot.
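
A sketch of using resource groups through the DataStream API; the group name "heavy" and the operations are illustrative assumptions:

    DataStream<String> parsed = lines
        .map(String::trim);                // stays in the "default" slot sharing group

    parsed
        .map(String::toUpperCase)
        .slotSharingGroup("heavy")         // this operator (and downstream operators, by inheritance) use the "heavy" group
        .print();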

As a rule of thumb, a good default number of task slots would be the number of CPU cores. With hyper-threading, each slot then takes two or more hardware thread contexts.

 

Time and Windows

Aggregating events (e.g., counts, sums) works differently on streams than in batch processing. For example, it is impossible to first count all elements in a stream and then return the count, since streams are in general infinite (unbounded). Instead, aggregations on streams (counts, sums, etc.) are scoped by windows, such as "count over the last 5 minutes" or "sum of the last 100 elements".

Windows can be time driven (e.g., every 30 seconds) or data driven (e.g., every 100 elements). One typically distinguishes different types of windows, such as tumbling windows (no overlap), sliding windows (with overlap), and session windows (punctuated by a gap of inactivity).

 

More examples of windows can be found below.
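
For illustration, a sketch of time-driven and data-driven windows on a keyed stream; counts is assumed to be a DataStream<Tuple2<String, Integer>> of (word, 1) pairs:

    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Tumbling window (no overlap): sums per word over fixed 30-second windows.
    counts.keyBy(t -> t.f0)
          .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
          .sum(1);

    // Sliding window (with overlap): 5-minute windows, evaluated every minute.
    counts.keyBy(t -> t.f0)
          .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1)))
          .sum(1);

    // Data-driven window: every 100 elements per key.
    counts.keyBy(t -> t.f0)
          .countWindow(100)
          .sum(1);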

Time

When referring to time in a streaming program (for example to define windows), one can refer to different notions of time:

  • Event time is the time at which an event was created. It is usually described by a timestamp in the event, for example attached by the producing sensor or the producing service. Flink accesses event timestamps via timestamp assigners (see the sketch after this list).
  • Ingestion time is the time at which an event enters the Flink dataflow at the source operator.
  • Processing time is the local time of each operator that performs a time-based operation.
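
A sketch of selecting a notion of time and assigning event-time timestamps (Flink 1.x API; the Event class, its timestamp field, and the five-second out-of-orderness bound are assumptions):

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Choose event time (the alternatives are IngestionTime and ProcessingTime).
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Event event) {
                return event.timestamp;  // the event-time timestamp carried inside the event
            }
        });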

 

State and fault tolerance

While many operations in a dataflow simply look at one individual event at a time (for example an event parser), some operations remember information across multiple events (for example window operators). These operations are called stateful.

The state of stateful operations is maintained in what can be thought of as an embedded key/value store. The state is partitioned and distributed strictly together with the streams that are read by the stateful operators. Hence, access to the key/value state is only possible on keyed streams, after a keyBy(), and is restricted to the values associated with the current event's key. Aligning the keys of streams and state makes sure that all state updates are local operations, guaranteeing consistency without transaction overhead. This alignment also allows Flink to redistribute the state and adjust the stream partitioning transparently.
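
A sketch of a stateful operator using keyed state; the Event class, its userId field, and the counting logic are illustrative assumptions:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Counts events per key; the ValueState is scoped to the key of the current event.
    public class RunningCount extends RichFlatMapFunction<Event, Tuple2<String, Long>> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void flatMap(Event event, Collector<Tuple2<String, Long>> out) throws Exception {
            Long current = count.value();                    // null on the first event for this key
            long updated = (current == null ? 0L : current) + 1;
            count.update(updated);
            out.collect(Tuple2.of(event.userId, updated));
        }
    }

    // Keyed state is only accessible on a keyed stream, after keyBy():
    events.keyBy(e -> e.userId).flatMap(new RunningCount());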

 

Checkpoints for fault tolerance

Flink achieves fault tolerance using a combination of stream replay and checkpointing. Checkpoints define consistent points in the streams and state from which a streaming dataflow can be resumed while maintaining consistency (exactly-once processing semantics). The events and state updates since the last checkpoint are replayed from the input streams.

The checkpoint interval is a means of trading off the overhead of fault tolerance during execution against the recovery time (the number of events that need to be replayed).
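
A sketch of enabling checkpoints and tuning the interval through the API (the values are illustrative):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;

    // Trigger a checkpoint every 60 seconds with exactly-once semantics.
    env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

    // Bound the fault-tolerance overhead during execution: leave at least
    // 30 seconds of processing between the end of one checkpoint and the start of the next.
    CheckpointConfig config = env.getCheckpointConfig();
    config.setMinPauseBetweenCheckpoints(30_000);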

 

State backends

The exact data structure in which the key/value index is stored depends on the chosen state backend. One state backend stores data in an in-memory hash map, while another uses RocksDB as the key/value store. In addition to defining the data structure that holds the state, the state backends also implement the logic to take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint.
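
A sketch of choosing a state backend through the API (Flink 1.x style; the checkpoint URI is an illustrative assumption, and the RocksDB backend requires the flink-statebackend-rocksdb dependency):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;

    // Keeps working state on the JVM heap and snapshots it to a file system on checkpoints.
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

    // Or: keeps the key/value state in an embedded RocksDB instance on local disk
    // (this constructor may throw an IOException, so call it where that can be handled).
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));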

Batch on streaming

Flink executes batch programs as a special case of streaming programs, where the streams are bounded (finite number of elements). A data set is treated internally as a stream of data. The concepts above thus apply to batch programs in the same way as they apply to streaming programs, with a few exceptions (a minimal DataSet sketch follows this list):

  • Programs in the DataSet API do not use checkpoints. Recovery happens by fully replaying the streams. That is possible because the inputs are bounded. This pushes the cost more towards recovery, but makes regular processing cheaper because it avoids checkpoints.
  • Stateful operations in the DataSet API use simplified in-memory/out-of-core data structures instead of key/value indexes.
  • The DataSet API introduces special synchronous (superstep-based) iterations that can only be used on bounded streams.
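
To make the batch special case concrete, here is a minimal DataSet sketch (the input elements are illustrative); the bounded input is what makes full replay on recovery possible:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class BatchExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // A bounded input: internally treated as a finite stream of data.
            DataSet<String> words = env.fromElements("flink", "batch", "streaming");

            words.map(String::toUpperCase)
                 .print();   // for DataSet programs, print() also triggers execution
        }
    }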