Author | Alan

Didi’s real-time computing engine has been smoothly upgraded from Flink-1.4 to Flink-1.10, and the upgrade is completely transparent to users. Compared with the old version, the new one is greatly improved in metrics, scheduling, and SQL engine optimization, and its performance and ease of use are much better.

This article describes the difficulties we ran into during the upgrade and how we thought about them, in the hope that it offers some inspiration.

I. Background

Prior to this upgrade, our main version was Flink-1.4.2, with some enhancements on top of the community version, offering both StreamSQL and the low-level API as services. The existing cluster has reached 1,500 physical machines, running more than 12,000 tasks and processing about 3 trillion records per day.

However, as the community evolved, and especially after Blink was merged into master, Flink gained many functional and architectural upgrades. We hoped to provide better stream computing services through a version upgrade. In February of this year, the milestone release Flink-1.10 came out, and we started work on the new version, embarking on a challenging upgrade.

II. New features of Flink-1.10

As the largest release in the Flink community to date, Flink-1.10 brings new features that address many pain points we had previously encountered.

1. Native DDL syntax and Catalog support

Flink SQL natively supports DDL syntax such as CREATE TABLE and CREATE FUNCTION, so metadata can be registered with SQL instead of code.

Catalogs are also supported. By default, InMemoryCatalog keeps the information temporarily in memory, and HiveCatalog integrates with the Hive Metastore. You can also implement the Catalog interface yourself for custom metadata management.
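
To make the idea of a catalog concrete, here is a minimal sketch of an in-memory metadata store. It is written in plain Python for illustration only and is not Flink's Catalog interface; the class name, method names, and the "connector" property are all assumptions.

```python
class InMemoryCatalogSketch:
    """Toy metadata store keyed by database and table name (not Flink's API)."""

    def __init__(self, default_database="default"):
        self._databases = {default_database: {}}

    def create_database(self, name):
        self._databases.setdefault(name, {})

    def create_table(self, database, table, schema, properties, ignore_if_exists=False):
        tables = self._databases[database]
        if table in tables:
            if ignore_if_exists:
                return
            raise ValueError(f"table already exists: {database}.{table}")
        # Copy the inputs so later changes by the caller do not leak into the catalog.
        tables[table] = {"schema": dict(schema), "properties": dict(properties)}

    def get_table(self, database, table):
        return self._databases[database][table]

    def list_tables(self, database):
        return sorted(self._databases[database])


catalog = InMemoryCatalogSketch()
catalog.create_table(
    "default", "orders",
    schema={"order_id": "BIGINT", "status": "INT"},
    properties={"connector": "kafka"},  # hypothetical property
)
tables = catalog.list_tables("default")
orders = catalog.get_table("default", "orders")
```

Because everything lives in a dictionary, the metadata disappears when the process exits, which is why a persistent catalog is needed for anything beyond a single session.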

2. Flink SQL enhancements

  • TopN and deduplication syntax based on ROW_NUMBER extends the scenarios StreamSQL can cover.
  • BinaryRow is used as the internal data exchange format, so data is built directly in binary form rather than as an array of objects. For example, when only one field of a record is used, only that part of the data needs to be deserialized, which reduces unnecessary serialization overhead.
  • Many new built-in functions were added, such as string handling functions and FIRST_VALUE/LAST_VALUE. They are more efficient than custom functions because they do not need conversion to external types.
  • MiniBatch optimization was added, improving task throughput through micro-batch processing.
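
The throughput gain of micro-batching comes mostly from touching state less often. The following is a minimal sketch of that idea for a per-key count, in plain Python rather than Flink code; the function names and the "state access" counter are assumptions made for illustration.

```python
from collections import defaultdict


def count_per_record(records, state):
    """Update state once for every input record; return the number of state accesses."""
    accesses = 0
    for key in records:
        state[key] = state.get(key, 0) + 1
        accesses += 1
    return accesses


def count_mini_batch(records, state, batch_size):
    """Pre-aggregate each batch in memory, then update state once per key per batch."""
    accesses = 0
    for start in range(0, len(records), batch_size):
        buffer = defaultdict(int)
        for key in records[start:start + batch_size]:
            buffer[key] += 1  # cheap in-memory pre-aggregation
        for key, partial in buffer.items():
            state[key] = state.get(key, 0) + partial
            accesses += 1
    return accesses


records = ["a", "b", "a", "a", "b", "c", "a", "a"]
plain_state, batched_state = {}, {}
plain_accesses = count_per_record(records, plain_state)
batched_accesses = count_mini_batch(records, batched_state, batch_size=4)
```

Both variants end with the same counts, but the batched one performs fewer state updates. The price is added latency, since a record waits until its batch is flushed.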

3. Optimized memory configuration

Managing Flink memory used to be a headache, especially with RocksDB. A TaskManager may host multiple RocksDB instances, and their memory usage is hard to estimate, which led to frequent memory overruns.

The new version adds several memory options. For example, state.backend.rocksdb.memory.fixed-per-slot makes it easy to cap the RocksDB memory used by each slot, avoiding the risk of OOM.
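
With a per-slot cap, the RocksDB budget of a TaskManager becomes simple arithmetic. The sketch below computes that rough upper bound and renders the two relevant configuration lines; the slot count and the 256 MB value are illustrative assumptions, not recommended settings.

```python
def rocksdb_budget_mb(slots_per_taskmanager, fixed_per_slot_mb):
    """Rough upper bound on RocksDB memory for one TaskManager with a per-slot cap."""
    return slots_per_taskmanager * fixed_per_slot_mb


def render_flink_conf(slots_per_taskmanager, fixed_per_slot_mb):
    """Render the corresponding configuration lines as text."""
    return "\n".join([
        f"taskmanager.numberOfTaskSlots: {slots_per_taskmanager}",
        f"state.backend.rocksdb.memory.fixed-per-slot: {fixed_per_slot_mb}mb",
    ])


budget = rocksdb_budget_mb(slots_per_taskmanager=4, fixed_per_slot_mb=256)
conf = render_flink_conf(slots_per_taskmanager=4, fixed_per_slot_mb=256)
```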

III. Challenges and responses

The biggest challenge of this upgrade was ensuring StreamSQL compatibility. StreamSQL exists to shield users from low-level details so they can focus on business logic, while we provide better service through version upgrades or even engine changes. Upgrading tasks smoothly is therefore the most basic requirement.

1. Making the internal patches compatible

Because the architecture changed enormously across so many versions, the internal patches could not be merged directly and had to be reimplemented on the new version. We first went through all historical commits, picked out the necessary changes, and reimplemented them on the new version so that it covers all existing functionality and supports every existing task.

For example:

  • Added or modified Connectors to support internal requirements, such as DDMQ (Didi’s open-source message queue) and access authentication.
  • Added new Format implementations, such as binlog and the internal log-collection formats.
  • Added ADD JAR syntax, so SQL tasks can reference external dependencies such as UDF JARs and custom Sources/Sinks.
  • Added SET syntax, so TableConfig can be set in SQL to guide execution plan generation.

2. StreamSQL syntax compatibility

In version 1.4, Flink SQL was still in its infancy and had no native DDL support, so we implemented a custom DDL syntax with ANTLR. In Flink 1.10, however, the community provides native DDL support, which differs considerably from our internal syntax. We had several options:

  • Drop support for the internal syntax and migrate all tasks to the new syntax. (This defeats the goal of a smooth migration and imposes a high learning cost on existing users.)
  • Modify Flink’s syntax parsing module (sql-parser) to support the internal syntax. (This is complicated to implement and hinders later version upgrades.)
  • Wrap a syntax conversion layer on top of the sql-parser, which parses the original SQL, extracts the useful information, and assembles it into community syntax by string concatenation.

In the end we chose the third option. It minimizes coupling with the engine and runs as a plug-in, so the existing logic can be reused in future engine upgrades, saving a lot of development cost.

For example, in the old version we used the json-path library for JSON parsing, and a table definition contained an expression such as $.status to indicate how a field should be extracted.

In the new version, native JSON parsing can use the ROW type to represent nested structures. When converting to the new syntax, we parse the original expressions into a tree and construct the new field types from it, then use computed columns to extract the fields of the original table so that the table structure stays the same as before. Type names and configuration properties are likewise mapped to community syntax.
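
The conversion from path expressions to a ROW type plus computed columns can be sketched as follows. This is an illustrative reconstruction in Python, not our conversion layer: the input format (column name, path, type), the table and column names, and the restriction to simple dotted paths are all assumptions, and the WITH clause holding connector properties is omitted.

```python
def parse_path(path):
    """Split a simple path such as '$.user.city' into ['user', 'city']."""
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    return path[2:].split(".")


def build_tree(columns):
    """Merge all paths into a nested dict; leaves hold the SQL type name."""
    tree = {}
    for _name, path, sql_type in columns:
        *parents, leaf = parse_path(path)
        node = tree
        for part in parents:
            node = node.setdefault(part, {})
        node[leaf] = sql_type
    return tree


def render_type(node):
    """Render a leaf type as-is and a nested dict as ROW<...>."""
    if isinstance(node, str):
        return node
    fields = ", ".join(f"`{key}` {render_type(value)}" for key, value in node.items())
    return f"ROW<{fields}>"


def convert(table, columns):
    """Build a CREATE TABLE statement by string concatenation."""
    tree = build_tree(columns)
    physical = [f"`{key}` {render_type(value)}" for key, value in tree.items()]
    computed = []
    for name, path, _sql_type in columns:
        parts = parse_path(path)
        if parts != [name]:  # nested or renamed field: expose it as a computed column
            reference = ".".join(f"`{part}`" for part in parts)
            computed.append(f"`{name}` AS {reference}")
    body = ",\n  ".join(physical + computed)
    return f"CREATE TABLE `{table}` (\n  {body}\n)"


ddl = convert("orders", [
    ("status", "$.status", "INT"),
    ("city", "$.user.city", "STRING"),
    ("uid", "$.user.id", "BIGINT"),
])
print(ddl)
```

In this sketch the two nested paths share a single `user` ROW column, and the original flat columns `city` and `uid` are kept as computed columns, so queries written against the old table shape continue to resolve.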

3. Compatibility testing

Last comes the testing phase, which must be thorough to guarantee that all tasks upgrade smoothly. Our original plan was to prepare a regression test that replays all existing tasks after swapping in the new configuration, but in practice this ran into many problems:

  • The test run is too long; a single run can take hours.
  • Problems are hard to locate, because they can occur at any stage of a task’s life cycle.
  • Computed results cannot be verified, that is, whether the semantics of the old and new versions are consistent.

We therefore split the test into multiple stages that follow the task submission process. A task enters the next stage only after passing every test in the current one, so problems surface early, their location is narrowed to the current stage, and testing becomes more efficient.

  • Conversion test: convert all tasks and check that the results meet expectations; typical scenarios are abstracted into unit tests.
  • Compilation test: make sure every task can be compiled into a JobGraph by the TablePlanner, stopping just before it is actually submitted to run.
  • Regression test: replay the tasks in the test environment with the configuration replaced, to confirm that they can be submitted and run.
  • Comparison test: submit sampled data as files to both the old and new versions and check that the results are identical. (Because some tasks do not produce deterministic results, we run the old version twice in a row to pick out the deterministic tasks as test cases.)
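
The comparison test, including the screening for deterministic tasks, can be sketched like this. The two "engines" are toy dictionaries of Python functions standing in for the old and new versions, and the deliberate regression in "max" is an assumption added so that the comparison has something to find.

```python
import itertools

_run_counter = itertools.count()

# Toy "tasks": each maps a list of sampled rows to a result.
OLD_ENGINE = {
    "sum": lambda rows: sum(rows),
    "max": lambda rows: max(rows),
    "run_id": lambda rows: next(_run_counter),  # differs on every run
}
# Simulated new engine with a deliberate regression in "max".
NEW_ENGINE = dict(OLD_ENGINE, max=lambda rows: min(rows))


def run(engine, task, rows):
    return engine[task](rows)


def select_deterministic(tasks, rows):
    """Run the old engine twice and keep only tasks whose two results agree."""
    return [t for t in tasks if run(OLD_ENGINE, t, rows) == run(OLD_ENGINE, t, rows)]


def compare(tasks, rows):
    """Return the tasks whose results differ between the old and new engine."""
    return [t for t in tasks if run(OLD_ENGINE, t, rows) != run(NEW_ENGINE, t, rows)]


sample = [3, 1, 2]
test_cases = select_deterministic(["sum", "max", "run_id"], sample)
mismatches = compare(test_cases, sample)
```

Screening first matters because a non-deterministic task would otherwise show up as a mismatch on every run and hide the real regressions.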

IV. Engine enhancements

In addition to compatibility with older versions, we have also enhanced the engine by incorporating features from newer versions.

1. Task-Load metric

We have always wanted to measure task load accurately; the backpressure metric can only roughly indicate whether a task has enough resources.

With the new Mailbox threading model, all mutually exclusive operations run in the task thread, so the load percentage of a task can be calculated accurately by measuring how much of the time the thread is occupied.

In the future, this metric can be used to recommend resources for tasks and keep task load at a healthy level.
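
The calculation behind such a metric is the ratio of busy time to wall-clock time over a reporting window. Here is a minimal sketch under that assumption; the class and method names are hypothetical, and timestamps are passed in explicitly so the example is deterministic.

```python
class TaskLoadMeter:
    """Accumulates the time a single task thread spends doing work."""

    def __init__(self):
        self.busy_ms = 0

    def record(self, start_ms, end_ms):
        """Register one span during which the thread was processing."""
        self.busy_ms += end_ms - start_ms

    def load(self, window_ms):
        """Fraction of the reporting window spent busy, capped at 1.0."""
        return min(1.0, self.busy_ms / window_ms)


meter = TaskLoadMeter()
# Busy spans inside a 1000 ms window; the gaps are idle time waiting for input.
for start, end in [(0, 200), (500, 800), (900, 1000)]:
    meter.record(start, end)
task_load = meter.load(window_ms=1000)
```

This only works when all work goes through one thread, which is the property the Mailbox model provides.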

2. Balanced SubTask scheduling

After FLIP-6, Flink changed its resource scheduling model: the --container parameter was removed and slots are requested on demand, so that no resources sit idle. However, this also causes a problem. The parallelism of the Source is often lower than the maximum parallelism of the job, and SubTasks are scheduled in the topological order of the DAG, so SourceTasks end up concentrated on a few TaskManagers and create hot spots.

We added a “minimum number of slots” setting, which makes the Flink session request that many slots right after it starts and keeps them from being released when idle. Together with the cluster.even-spread-slots parameter, this ensures that SubTasks are distributed evenly across all TaskManagers as long as there are enough slots.
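
The difference between filling TaskManagers one by one and spreading SubTasks evenly can be sketched as follows. This is a simplified model in Python, not the scheduler itself; the TaskManager names, slot counts, and tie-breaking rule are assumptions.

```python
from collections import Counter


def schedule_first_fit(subtasks, slots_per_tm):
    """Place each subtask on the first TaskManager that still has a free slot."""
    free = dict(slots_per_tm)
    placement = {}
    for subtask in subtasks:
        tm = next(name for name, count in free.items() if count > 0)
        free[tm] -= 1
        placement[subtask] = tm
    return placement


def schedule_evenly(subtasks, slots_per_tm):
    """Place each subtask on the TaskManager with free slots that hosts the fewest so far."""
    free = dict(slots_per_tm)
    used = {name: 0 for name in free}
    placement = {}
    for subtask in subtasks:
        candidates = [name for name in free if free[name] > 0]
        tm = min(candidates, key=lambda name: used[name])  # ties go to the first TaskManager
        free[tm] -= 1
        used[tm] += 1
        placement[subtask] = tm
    return placement


slots = {"tm-1": 4, "tm-2": 4, "tm-3": 4}
sources = [f"source-{i}" for i in range(4)]
packed = Counter(schedule_first_fit(sources, slots).values())
spread = Counter(schedule_evenly(sources, slots).values())
```

With four source SubTasks and twelve slots, first-fit puts every source on tm-1, while the even strategy places at most two on any TaskManager. Note that spreading only helps if all the slots already exist when scheduling starts, which is what the minimum-slots setting guarantees.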

3. Window function enhancement

Take TUMBLE(time_attr, INTERVAL '1' DAY) as an example. For a one-day window, the start and end times are fixed at 00:00 and 24:00 of each day, so a window that runs from 12:00 one day to 12:00 the next cannot be defined.

In code this can be done by specifying a window offset, but SQL did not support it. We added a parameter to TUMBLE for this purpose: TUMBLE(time_attr, INTERVAL '1' DAY, TIME '12:00:00') sets the offset to 12 hours.

Another scenario is counting UV for the current day while showing the result as of the current moment, for example by triggering the window computation every minute. In code, the trigger logic of a window can be customized with a Trigger, and Flink has some built-in Trigger implementations, such as ContinuousEventTimeTrigger and ContinuousProcessingTimeTrigger, that suit this scenario. So we added another optional parameter to the window function: in TUMBLE(time_attr, INTERVAL '1' DAY, INTERVAL '1' MINUTES), the last argument is the trigger period of the window.

By adding the offset and trigger-period parameters, TUMBLE(time_attr, size[, offset_time][, trigger_interval]), we extend what windows can do in SQL. Scenarios like those above can be developed directly in SQL, without writing code.
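
The effect of the two extra parameters can be illustrated with a little arithmetic on millisecond timestamps. This sketch is plain Python, not our implementation; the function names are assumptions, and aligning firing times to multiples of the interval is one possible choice.

```python
from datetime import datetime, timezone

MINUTE_MS = 60 * 1000
HOUR_MS = 60 * MINUTE_MS
DAY_MS = 24 * HOUR_MS


def to_ms(*args):
    """Milliseconds since the epoch for a UTC calendar time."""
    return int(datetime(*args, tzinfo=timezone.utc).timestamp() * 1000)


def window_start(ts_ms, size_ms, offset_ms=0):
    """Start of the tumbling window containing ts_ms, shifted by offset_ms."""
    return ts_ms - (ts_ms - offset_ms + size_ms) % size_ms


def next_fire(ts_ms, window_end_ms, interval_ms):
    """Next periodic firing time after ts_ms, never later than the window end."""
    aligned = ts_ms - ts_ms % interval_ms + interval_ms
    return min(aligned, window_end_ms)


event = to_ms(2020, 6, 1, 9, 30, 20)              # 09:30:20 on 1 June
plain_start = window_start(event, DAY_MS)          # window without offset
shifted_start = window_start(event, DAY_MS, 12 * HOUR_MS)
fire_at = next_fire(event, shifted_start + DAY_MS, MINUTE_MS)
```

For an event at 09:30:20, the plain daily window starts at midnight of the same day, the window with a 12-hour offset starts at 12:00 the previous day, and the next per-minute firing falls at 09:31:00.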

4. Reusing RexCall results

In many SQL scenarios the result of an earlier computation is used several times, for example parsing JSON into a Map and then extracting multiple fields from it.

Although a subquery makes it look as if the JSON is parsed only once, the optimizer merges the Projections and generates a RexCall chain for each field of the result table, so every extracted field ends up with its own copy of the parsing call.

As a result, the JSON parsing is executed three times. Splitting the query into two steps with a view does not help either, because the Planner optimizes it into the same form.

For a deterministic function (isDeterministic=true), the same input always yields the same result, so parsing the JSON three times is pointless. How can we optimize this so the result is reused?

During code generation, we store in the CodeGenContext a mapping from the unique digest of each RexCall to its result variable. When a function call with the same digest is encountered again, the existing result variable is reused, so the JSON is parsed only the first time and later uses take that first result.
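
The digest-based reuse can be sketched with a tiny code generator. It is a Python model of the idea, not Flink's code generator: expressions are nested tuples, the function names parse_json and get are hypothetical, and the set of deterministic functions is an assumption.

```python
DETERMINISTIC = {"parse_json", "get"}  # hypothetical functions assumed deterministic


class CodeGenContextSketch:
    def __init__(self):
        self.statements = []  # generated code, one assignment per line
        self.reusable = {}    # expression digest -> result variable


def digest(expr):
    """Canonical string for an expression; equal digests mean equal calls."""
    if isinstance(expr, str):
        return expr
    func, *operands = expr
    return f"{func}({', '.join(digest(op) for op in operands)})"


def generate(expr, ctx):
    """Emit code for expr and return the variable (or token) holding its value."""
    if isinstance(expr, str):
        return expr  # input field reference or literal
    key = digest(expr)
    if key in ctx.reusable:
        return ctx.reusable[key]  # same call seen before: reuse its result
    func, *operands = expr
    args = [generate(op, ctx) for op in operands]
    var = f"result_{len(ctx.statements)}"
    ctx.statements.append(f"{var} = {func}({', '.join(args)})")
    if func in DETERMINISTIC:
        ctx.reusable[key] = var  # only deterministic calls are safe to share
    return var


parsed = ("parse_json", "msg")
projection = [("get", parsed, "'a'"), ("get", parsed, "'b'"), ("get", parsed, "'c'")]
ctx = CodeGenContextSketch()
outputs = [generate(expr, ctx) for expr in projection]
print("\n".join(ctx.statements))
```

The generated code contains a single parse_json call whose result variable feeds all three field extractions. A non-deterministic function is never entered into the mapping, so each of its calls is still generated separately.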

V. Summary

After several months of hard work, the new version is in production and is now the default engine for StreamSQL; a task runs on the new version as soon as it is restarted. The pass rate of the compatibility tests reached 99.9%, so the upgrade is essentially transparent to users. Users new to StreamSQL can develop with the community SQL syntax, and existing tasks can adopt the new features by modifying some of their DML statements. The new version already supports many business scenarios in the company. For example, the company’s real-time data warehouse team, relying on the greater expressiveness and performance of the new version, serves a wide range of data requirements with stable operation and results consistent with the offline metrics.

The version upgrade is not the end of the story. As real-time computing develops, more and more teams in the company need Flink, which brings us new challenges, such as integrating with Hive to write results directly into Hive, or using Flink as a batch engine. These are the directions we are exploring and developing, and through continuous iteration we aim to provide users with simpler, easier-to-use stream computing services.