Compiled by Aven (Flink community volunteer)

Abstract: This article is shared by Benchao Li, Apache Flink Committer and R&D engineer on ByteDance's architecture team. It introduces the application of Flink SQL at ByteDance in four parts:

  • Overall introduction
  • Practice and optimization
  • Exploring stream-batch unification
  • Future plans

First, the overall introduction

In December 2018, Blink was announced as open source, and Flink 1.9 was released about a year later, on August 22, 2019. Before the release of Flink 1.9, ByteDance had built an internal SQL platform based on the master branch. Two to three months later, in October, ByteDance released an internal streaming SQL platform based on the Blink planner of Flink 1.9 and promoted it internally. In the process, we ran into some interesting requirement scenarios as well as some strange bugs.

Flink SQL extensions based on 1.9

While the latest Flink releases already support SQL DDL, Flink 1.9 did not. ByteDance's internal DDL extensions on top of Flink 1.9 support the following syntax:

  • create table
  • create view
  • create function
  • add resource

Watermark definitions, which were not supported in Flink 1.9, are also supported through the DDL extension.
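As a rough sketch of the extended DDL (the exact internal syntax is not given in the article; the watermark clause and connector options below follow the style later adopted by open-source Flink and are only illustrative):

CREATE TABLE user_actions (
  user_id BIGINT,
  room_id BIGINT,
  ts TIMESTAMP(3),
  -- declare ts as the event-time attribute with a 5-second out-of-orderness bound
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',   -- illustrative connector options
  'topic' = 'user_actions',
  'format' = 'json'
);

-- register a UDF by class name
CREATE FUNCTION my_udf AS 'com.example.MyUdf';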

When we recommended that users write their jobs in SQL, we received a lot of feedback that SQL could not express complex business logic. Over time, we found that much of this so-called complex business logic amounted to making external RPC calls. ByteDance therefore built an RPC dimension table and sink for this scenario, so that users can read from and write to RPC services, which greatly expanded the application scenarios of SQL. FaaS is handled similarly to RPC. Internally, ByteDance added dimension table support for Redis/Abase/Bytable/ByteSQL/RPC/FaaS.
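A hedged sketch of how such an RPC dimension table might be used: the 'rpc' connector and its options are ByteDance-internal, so the names below are hypothetical, while the FOR SYSTEM_TIME AS OF clause is the standard Blink planner lookup Join syntax:

CREATE TABLE user_profile (
  user_id BIGINT,
  age INT,
  gender STRING
) WITH (
  'connector.type' = 'rpc',                      -- hypothetical internal connector
  'connector.service' = 'user.profile.service'   -- hypothetical service address option
);

-- enrich each action with profile fields via an RPC lookup at processing time
-- (a.proctime is a processing-time attribute declared on the source table)
SELECT a.user_id, a.room_id, p.age, p.gender
FROM user_actions AS a
JOIN user_profile FOR SYSTEM_TIME AS OF a.proctime AS p
ON a.user_id = p.user_id;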

Multiple connectors were also implemented for internal use:

  1. source: RocketMQ
  2. sink: RocketMQ/ClickHouse/Doris/LogHouse/Redis/Abase/Bytable/ByteSQL/RPC/Print/Metrics

PB, Binlog, and Bytes formats were also developed for these connectors.

Web-based SQL platform

In addition to the extensions to Flink itself, a SQL platform was launched within ByteDance that supports the following features:

  • The SQL editor
  • SQL parsing
  • SQL debugging
  • Custom UDF and Connector
  • Version control
  • Task management

Second, practice and optimization

In addition to the functional extensions, some optimizations have been made to address the shortcomings of Flink 1.9 SQL.

Window performance optimization

1. Window mini-batch support

Mini-batch is a signature feature of the Blink planner. Its main idea is to accumulate a batch of data and access state once per batch, which reduces the number of state accesses and the cost of serialization and deserialization. This optimization mainly helps in the RocksDB state backend scenario; with heap state, mini-batch brings little benefit. In some typical business scenarios, users reported roughly a 20-30% reduction in CPU overhead.
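For reference, mini-batch in the open-source Blink planner is controlled by configuration options such as the following (how they are set depends on the version, e.g. via TableConfig or SQL client session configuration); the window mini-batch described above is a ByteDance extension built on the same mechanism:

-- buffer records for up to 5 seconds or 5000 rows before touching state
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 5s;
SET table.exec.mini-batch.size = 5000;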

2. Extended window types

At present, SQL has three built-in window types: tumbling windows, sliding windows, and session windows. These three window semantics cannot cover every user scenario. For example, in a live-streaming scenario, an analyst wants to calculate UV (unique visitors) and GMV (gross merchandise volume) for each hour of a streamer's broadcast. Natural tumbling-window boundaries cannot express this, so ByteDance implemented some customized windows internally to meet these common needs.

-- my_window is a custom window
SELECT
  room_id,
  COUNT(DISTINCT user_id)
FROM MySource
GROUP BY
  room_id,
  my_window(ts, INTERVAL '1' HOURS)

3. Window offset

This is a fairly general feature that is supported in the DataStream API but not in SQL. Here is an interesting scenario: a user wants a weekly window, but without an offset the window boundaries fall on an unnatural week starting on Thursday, because few people would guess that January 1, 1970 was a Thursday. With offset support, windows aligned to the natural calendar week can be expressed correctly.

SELECT
  room_id,
  COUNT(DISTINCT user_id)
FROM MySource
GROUP BY
  room_id,
  TUMBLE(ts, INTERVAL '7' DAY, INTERVAL '3' DAY)

Dimension table optimization

1. Delayed Join

In dimension table Join scenarios, the dimension table changes frequently. In particular, a newly added dimension record may arrive after the fact data that references it, so the Join happens before the dimension exists, which often causes the Join to miss.

Therefore, users hope that when a Join misses, the data can be cached temporarily and retried later, with control over the number of attempts and the ability to customize the delayed-Join rules. This need exists not only within ByteDance; many users in the community have similar requirements.

Based on this scenario, a delayed-Join feature was implemented by adding an operator that supports delayed dimension table Joins. When a Join misses, the null result is not written to the local cache; instead, the input record is stored temporarily in state and retried based on a timer and the configured number of retries.
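The article does not show the configuration for this feature, so the options below are purely hypothetical placeholders that only illustrate the kind of knobs a delayed Join needs, namely a retry interval and a maximum number of attempts per missed key:

CREATE TABLE shop_dim (
  shop_id BIGINT,
  shop_name STRING
) WITH (
  'connector.type' = 'redis',               -- illustrative
  'lookup.delay-retry.interval' = '10s',    -- hypothetical: wait before retrying a missed key
  'lookup.delay-retry.max-attempts' = '3'   -- hypothetical: give up after three attempts
);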

2. Dimension table keyBy support

Looking at the topology, the Calc operator and the LookupJoin operator are chained together, because the lookup Join does not carry any key semantics.

When job parallelism is high, each dimension table Join subtask may receive records for any key, so each subtask's cache effectively has to cover the whole key space, which puts great pressure on the cache.

However, looking at the Join SQL, an equi-Join naturally has a hash property. A configuration option was therefore added: when it is enabled, the dimension table Join key is used as the hash condition to partition the data. This guarantees that the key spaces accessed by the downstream Join subtasks are independent of each other, which greatly improves the cache hit ratio after startup.

In addition to the above optimizations, there are two other dimension table optimizations currently under development.

  1. Broadcast dimension table: In some scenarios the dimension table is small and updated infrequently, but the QPS of Joins against it is extremely high. If every Join still accesses the external system, the pressure is very high. In addition, when a failover occurs, all local caches are lost, which puts enormous access pressure on the external system. The improved solution is to periodically scan the full dimension table and send it downstream hash-partitioned by Join key, proactively updating the cache of each dimension table Join subtask.
  2. Mini-batch dimension table: This mainly targets external storage that supports batched I/O requests, such as RPC, HBase, and Redis. Previously, requests were issued one by one; async I/O only solves the latency problem, not the throughput problem. By implementing a mini-batch version of the dimension table operator, the number of accesses to external storage is greatly reduced.

Join optimization

At present, Flink supports three Join modes: Interval Join, Regular Join, and Temporal Table Function Join.

The first two have stream-to-stream Join semantics, while the Temporal Table Function Join is a Join between a stream and a table: the right-hand stream is materialized into a table keyed by primary key, and the left-hand stream joins against that table. In this way, each left-hand record joins with at most one row and returns at most one result, rather than joining with every matching record.
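For reference, this is what a Temporal Table Function Join looks like in open-source Flink SQL; the Rates function has to be registered from the versioned table through the Table API first, which is exactly the missing-DDL limitation listed below:

-- Rates(o_proctime) returns, per currency, the latest rate as of the order's time
SELECT
  o_amount * r_rate AS yen_amount
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency;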

Comparing the three modes, it can be seen that each has its own defects:

  1. The current drawback of Interval Join is that the outer Join data it produces is out of order with respect to the watermark.
  2. Regular Join's biggest drawback is retract amplification (more on this later).
  3. Temporal Table Function Join has more problems, three in particular:
  • DDL is not supported
  • Outer Join semantics are not supported (a limitation tracked in FLINK-7865)
  • If data on the right side is interrupted, the watermark does not advance and downstream computation becomes incorrect (FLINK-18934)

ByteDance has made corresponding internal modifications to address these deficiencies.

Improve Checkpoint recovery capability

It is difficult for SQL jobs to recover from a checkpoint once the job changes.

SQL jobs are indeed weak at checkpoint recovery: sometimes they cannot be recovered even after changes that do not appear to affect the checkpoint. There are two main reasons for recovery failure:

  • First: operator IDs are generated automatically, and for some reason the generated IDs change.
  • Second: the computation logic of an operator has changed, i.e., the definition of the state inside the operator has changed.

Example 1: the parallelism changes and the job cannot be recovered.

The source is one of the most common stateful operators. If the chaining between the source and subsequent operators changes, the job becomes completely unrecoverable.

The upper left of the figure below shows the topology generated by a standard community build: the source and operators with the same parallelism are chained together, and the user cannot change this. However, operator parallelism is often adjusted; for example, if the source parallelism is changed from 100 to 50 while the Calc operator stays at 100, the chaining changes.

For this case, ByteDance modified Flink internally so that the user can configure the source not to be chained with downstream operators, even when the source parallelism is the same as that of the rest of the job.
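The article does not name this internal switch, so the setting below is a hypothetical placeholder; open-source Flink only offers the much coarser job-wide StreamExecutionEnvironment#disableOperatorChaining():

-- hypothetical ByteDance-internal option: never chain the source with downstream operators
SET table.exec.source.force-break-chain = true;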

Example 2: the DAG changes and the job cannot be recovered.

This is a special case. For the SQL shown above, the source has not changed and the three aggregations that follow are independent of each other, yet the state still cannot be recovered.

The job cannot be restored because of the operator ID generation rule. In SQL, an operator's ID depends on its upstream, its own configuration, and the number of downstream operators that can be chained with it. Adding a new metric introduces a new downstream Calc node, which changes the operator ID.

To handle this situation, a special configuration mode is supported: when generating operator IDs, users can choose to ignore the number of chainable downstream operators.

Example 3: aggregation metrics change and the state cannot be recovered.

This is the biggest and most complex user demand: users expect that after adding some aggregation metrics, the original metrics can still be recovered from the checkpoint.

The left side of the figure shows the operator logic generated from the SQL: the count, sum, sum, count, and distinct accumulators are stored in a ValueState as a BaseRow structure. DISTINCT is special and is additionally stored in a separate MapState.
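As an illustrative example (the column names are made up), an aggregation like the one below keeps its count/sum/sum/count accumulators together in one ValueState row, plus a separate MapState for the DISTINCT user_id set:

SELECT
  room_id,
  COUNT(*) AS pv,
  SUM(gift_amount) AS gmv,
  SUM(watch_duration) AS total_duration,
  COUNT(order_id) AS order_cnt,
  COUNT(DISTINCT user_id) AS uv
FROM MySource
GROUP BY room_id;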

As a result, if metrics are added or removed, the original state cannot be restored from the ValueState, because the state "schema" stored in the ValueState no longer matches the new "schema" (after the metrics were modified) and cannot be deserialized properly.

Before discussing the solution, let's review the normal recovery flow: first, the state serializer is restored from the checkpoint; then the new state definition is checked for compatibility against the original one. If they are compatible, the state is restored successfully; if they are incompatible, an exception is thrown and the task fails.

There is another incompatible case in which a migration is allowed (i.e., recovering state across two mismatched serializers); after the migration, recovery succeeds.

The following changes were made to this process:

  1. First, let the old and new serializers know about each other: an interface was added, and the state backend's resolve-compatibility process was modified so that the old serializer's information is passed to the new one and a migration plan can be obtained.
  2. Second, check whether the old and new serializers are compatible; if not, perform a migration: restore the state with the old serializer and write it back with the new serializer.
  3. Third, the aggregation code generation was adjusted: if an aggregation accumulator is found to be null, it is initialized.

With the above modifications, a job with newly added aggregation metrics can basically be recovered from the old checkpoint normally.

Third, exploring stream-batch unification

Business situation

ByteDance's technical team did a lot of exploration before promoting stream-batch unification to the business internally. The overall judgment was that the SQL layer can unify stream and batch semantics, but in practice many differences were found.

For example, session windows in stream computation, or windows based on processing time, have no counterpart in batch computation. Conversely, some complex over windows used in batch SQL have no corresponding implementation in stream computation.

However, these special scenarios probably account for only 10% of cases or fewer, so implementing stream-batch unification on top of SQL is feasible.

Stream-batch unified architecture

This is a very common architecture, similar to what most companies use. What are its flaws?

  1. Different data sources: the input to a batch task typically goes through a pre-processing job, offline or real-time, and is written to Hive after that layer of pre-processing, while a real-time task reads raw data directly from Kafka, possibly in JSON, Avro, or other formats. As a result, SQL that runs correctly as a batch task may fail to run, or produce wrong results, as a stream task.
  2. Different computing engines: batch tasks are based on Hive + Spark, while stream tasks are based on Flink. Implementation differences between the engines lead to inconsistent results. The engines also define UDFs with different APIs that are not interchangeable, so in most cases two sets of UDFs with the same functionality are maintained against the different APIs.

To address these problems, a Flink-based stream-batch unified architecture was proposed.

  1. Unified data sources: pre-processing is done by Flink; for the streaming path the result is written to MQ for downstream streaming Flink jobs to consume, and for the batch path it is written to Hive for batch Flink jobs to process.
  2. Unified computing engine: since both streaming and batch jobs are developed on Flink, the problem of divergent engines naturally disappears, and maintaining multiple sets of UDFs with the same functionality is also avoided.

The Flink-based stream-batch unified architecture:

Business benefits

  1. Unified SQL: a single set of SQL expresses both stream and batch computing scenarios, reducing development and maintenance work (see the sketch after this list).
  2. Reusable UDFs: streaming and batch computation can share a single set of UDFs, which is a clear win for business teams.
  3. Unified engine: the cost of learning for the business and of maintaining the architecture is much lower.
  4. Unified optimizations: most optimizations apply to both stream and batch computation; for example, planner and operator optimizations can be shared between streams and batches.
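As a sketch of what a single set of SQL means in practice, later open-source Flink versions expose an execution.runtime-mode switch so that the same statement can run as either a batch or a streaming job; the internal ByteDance mechanism may differ, but the idea is the same:

-- run the aggregation as a bounded batch job over the Hive data ...
SET execution.runtime-mode = batch;
INSERT INTO daily_uv SELECT dt, COUNT(DISTINCT user_id) FROM events GROUP BY dt;

-- ... or as an unbounded streaming job over the MQ data, with the query unchanged
SET execution.runtime-mode = streaming;
INSERT INTO daily_uv SELECT dt, COUNT(DISTINCT user_id) FROM events GROUP BY dt;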

Fourth, future work and planning

Optimizing retract amplification

What is Retract amplification?

In the figure above there are four tables: the first table goes through a Dedup (deduplication) operator and is then joined with the other three tables. The logic is fairly simple: when table A receives A1, the pipeline outputs (A1, B1, C1, D1).
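A hedged SQL sketch of this kind of pipeline (table and column names are illustrative); the deduplication uses the ROW_NUMBER pattern, which emits a retraction whenever a newer row replaces an older one, and the outer Joins produce the (null, ...) intermediate results described below:

-- keep only the latest row per id from table A (the Dedup step)
CREATE VIEW deduped_a AS
SELECT id, content
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY event_time DESC) AS rn
  FROM A
)
WHERE rn = 1;

-- join the deduplicated stream with B, C and D
SELECT a.content AS a_content, b.content AS b_content, c.content AS c_content, d.content AS d_content
FROM deduped_a AS a
FULL JOIN B AS b ON a.id = b.id
FULL JOIN C AS c ON a.id = c.id
FULL JOIN D AS d ON a.id = d.id;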

When table A then receives A2, the Dedup operator deduplicates the data and sends two operations downstream: -(A1) to retract A1 and +(A2) to add A2. When the first Join operator receives -(A1), it sends -(A1, B1) and +(null, B1) downstream (to preserve what it considers the correct semantics). When it then receives +(A2), it sends -(null, B1) and +(A2, B1) downstream. The number of operations thus doubles at this operator, keeps being amplified at each downstream operator, and the final output at the sink may be amplified by as much as 1000 times.

How to solve it?

The idea is to merge the two retract messages into a single changelog record and pass that between operators. After receiving the changelog record, an operator processes the change and sends only one changelog record downstream.

Future plans

1. Functional optimization
  • Checkpoint recovery for all types of aggregation metric changes
  • Window local-global aggregation
  • Fast emit for event time
  • Broadcast dimension table
  • Mini-batch support for more operators: dimension table, TopN, Join, etc.
  • Full compatibility with Hive SQL syntax
2. Business expansion
  • Push streaming SQL coverage further, toward 80%
  • Explore productized forms of stream-batch unification
  • Promote real-time data warehouse standardization