Apache Flink Contributor Yang Kete/Wu Chong

This article is compiled from the Flink-Forward-China-2018 conference held in Beijing on December 20. Looney, Apache Flink Committer, joined Alibaba after graduating with a master’s degree in 2011. She has been involved in the design and development of several core computing platform projects, including search engine, scheduling system, monitoring and analysis, etc. Currently, I am responsible for the development and optimization of Blink SQL. Chong Wu: Cloud Ye, Apache Flink Committer, has been working on Flink Table & SQL for three years since Flink V1.0, and is currently working on Alibaba Blink SQL project team.

This paper mainly introduces the challenges and practices of constructing a unified data processing engine based on Flink Streaming from the following five aspects.

  • Why unify batch and stream

  • What is a unified SQL processing engine

  • How to unify batch and stream

  • performance

  • Future plans

First, why to unify batch and flow

Companies usually have a traditional batch system to compute some reports every day. With more and more real-time requirements, they may use Storm, Flink, and Spark to do stream calculations, while running a batch on the side to compute a result in the granularity of hours or days to realize the verification of both sides. This is the classic Lambda architecture.

However, this architecture has many problems, for example, users need to learn the development mode of two sets of engines, and operation and maintenance need to maintain two sets of systems simultaneously. More importantly, we need to maintain two sets of processes, one set of incremental processes and one set of full processes, and there has to be some self-consistency between the two sets of processes, they have to be consistent. This consistency can be a challenge in itself as the business becomes more complex.

This is also the problem that Flink SQL hopes to solve. It hopes that through Flink SQL, whether developers or operation and maintenance personnel only need to learn a set of engines, they can solve all kinds of big data problems. Nor is it limited to batch or stream computing, or even more. For example: support high latency batch processing to achieve OLAP effect, use SQL syntax to do complex event processing (CEP) directly, use Table API or SQL to support machine learning, etc. There’s a lot of room to imagine with SQL.

What is a unified SQL processing engine

So what is a unified SQL processing engine? What are the challenges on the road to a unified SQL engine?

From the user’s point of view, the phrase is “one piece of code, one result”. In other words, you only need to write one piece of code, and the end result of the stream and batch is the same. The advantage of this is that users no longer have to guarantee consistency between incremental and full processes; this consistency will be guaranteed by Flink.

From the perspective of development, in fact, more attention is paid to the unification of the underlying architecture, such as whether some technical modules are universal enough, whether the stream and batch mode can be reused as much as possible. Is a well-designed and efficient data structure widely applicable to many modules of an engine?

User perspective

First we have a user’s score table USER_SCORES, which contains the user’s name, score, and time of score. Using this table to do a very simple statistic, take the total score of each user, and what is the most recent time to get this score? From the perspective of stream computing and batch processing, whether it is to make an offline report or to continuously produce calculation results in real time, their SQL is exactly the same, it is a simple GroupBy group, sum, find Max.

As shown above, there is a source data table with names, scores, and event times.

For batch processing, you can get the final result directly through such an SQL, and the result will only be displayed once, because the output will not be finished until the data has been consumed.

SQL is exactly the same for stream processing. Let’s say the flow task starts at 12:01, at which point no message has been received, so it doesn’t output anything. As time passes, the first score message from Julie is received, and Julie 7 12:01 is printed. When 12:04 is reached, the output is updated to Julie 8 12:03 because another score message for Julie has been received. This may not be the right result for the final result, but at least as of 12:04, it is the right result. When the time advances to the current time (say 12:08), all the generated messages have been consumed, and you can see that the output is exactly the same as the batch result.

This is “one code, one result”. In fact, from the user’s point of view, streaming computing and batch processing are not different in the correctness of the results, but in the timeliness of the results.

The development perspective

What does engine unification mean for developers? This diagram is the current Flink architecture diagram, the top Table API and SQL. Before execution, the API for DataSet or DataStream is translated according to the execution environment. There are major differences between the two apis, so we can zoom in and see. For example, the DataSet API is the Flink batch API, which has its own optimizer. Under the DataStream API, however, there are some relatively straightforward translations. Then, at run time, they also rely on different tasks. On the DataStream side, the Stream Task is run, along with various operators. Batch tasks and drivers are run in Batch processing. This is mainly the difference in execution mode.

How much can I share in the code? For example, to implement an INNER JOIN, if you want to implement an INNER JOIN on a stream at the current code distance, the first step is to convert the two inputs into two DataStream, and then connect the two inputs, and then perform keyBy JOIN key. Finally, a KeyedCoProcessOperator is implemented to complete the join operation. On the DataSet side, however, you will find the API is different because there is an optimizer underneath the DataSet. In other words, some oF the apis for DataSet are declarative, while DataStream’s are imperative. From this example, for us developers, the API for implementing JOIN in stream computation or batch processing is quite different, so it is difficult for us to reuse some code, even design reuse. This is the API difference, after the Runtime, the big difference is the Stream Task and Batch Task difference.

In a classic pull mode, an executor will start executing, which can be understood as a program entry, and it will request the final result from the last node. The last node (summing node) requests data from the previous node (filtering node), and then from the previous node to the source node. The source node reads the data itself and passes it down until the last node computes the sum and returns it to the program entry. This is very much like a function call stack.

In push mode, when the program starts executing, we can directly find the head of a DAG node, all the data and control information by the head node sends down, control flow and data flow, the equivalent of it at the same time do a function call, and the data is sent to the next operator, eventually achieve the summation operator.

As you can see from this simple example, the two execution modes are quite different, which can cause a lot of problems with runtime unification, but they actually accomplish similar functions.

How to unify the flow and batch

With all the challenges we’ve faced in deeply unifying streams and batches, how do we do it?

01 dynamic table

First, SQL is increasingly recognized as the universal language for big data processing, not only because SQL is a very easy language to express, but also because SQL is a very suitable language for stream batch unification. But in traditional SQL, SQL always works on tables, not streams. How do you make SQL work on streams that have the same semantics and results as a batch? That was our first problem. In order to solve this problem, we proposed two theoretical bases of “duality of flow table” and “dynamic table” with the community. Here, we will not expand the theoretical basis, interested students can go to the official website to read the article. All you need to know is that only on the basis of these two theories can the semantics of streaming SQL be guaranteed to be the same as those of batch, and the results will be the same.

02 Architecture Improvement

The figure shows some architectural improvements. The architectural improvements focus on the middle two layers. In the Runtime layer, we have enhanced the existing Operator framework to support batch operators. On top of Runtime, we propose a Query Processor layer, including Query optimization and Query execution. Table API-SQL is no longer translated to DataStream and DataSet, but is put on top of Query Processor.

Unified Operator framework

In the Runtime layer, the first implementation is the unification of the Runtime DAG layer, based on the unification of the DAG layer, then to build the flow operator and the batch operator. To unify the lowest level APIS for streams and batches, a unified Operator layer abstraction is introduced. Batch operators are implemented based on StreamOperator interface instead of Deriver interface. Both streams and batches are implemented using the same Operator API.

In addition, for batch scenarios, we have extended the Operator framework to allow additional optimizations for batches.

The first point is that the Operator can independently choose the input side, such as hashJoin. The hashJoin of a batch will generally finish the build side first, build the hash table first, and then process the probe side on the other side.

The second is a more flexible Chaining strategy. The default Chaining strategy of the StreamOperator can only chain single-input operators together. But in some batch scenarios, we want to be able to Chaining multiinput operators as well. For example, we want to Chaining two Join operators together so that shuffling of data between the two Join operators can be avoided.

We have started a discussion in the community about the unified Operator framework, and those who are interested should follow this discussion link.

Unified query processing

Then we move on to the unified Query Processor layer, which follows the same parsing and optimization path for both stream computation and batch SQL. At the parsing level, where SQL and Table API code is parsed into logical plans, streams and batches reuse exactly the same code. The same optimizer is then used for optimizing laminar flow and batch, where all optimization rules are pluggable. Streams and batches share most of the optimized rules, and only a few rules are stream specific or batch specific. Then, after optimization, you get a physical plan, which is translated into the ultimate Execution DAG, which is the Stream Operator we just talked about, and runs distributed.

03 Unification of optimizers

At the optimizer level, it follows the 80/20 rule that 80% of optimization rules are shared between streams and batches, such as column clipping, partition clipping, conditional pushing, and so on. In addition, 20% of the optimization rules are unique to the stream batch, and an interesting rule is found through our research.

Sort is not supported by streams, so sort can be understood as batch specific. For example, some sort merge JOIN optimizations and sort AGG optimizations are implemented.

And some of the rules that are unique to the flow side are state specific. Why is that? Because current flow jobs run an SQL job in production, RocksDB’s StateBackend is typically used. RocksDB StateBackend has a major bottleneck because every operation you do involves serialization and deserialization, so State operations can be a bottleneck for stream operations. Therefore, how to optimize a stream job is often to think about how to save State operations, how to reduce the size of State, and how to avoid storing duplicate State data. These are the immediate directions of current flow computing task optimization.

Here is a more advanced optimization rule for stream and batch sharing. This is a simplified TBCH13 query. There is a user table Customer and an order table Orders. The Customer table will join the orders table according to custKey. Then, after join, the custkey was used to group the orders and the number of orders appeared was counted.

The order number of each customer is counted, but the order data is stored in the Orders table, so we need to join this table. This query is parsed and the resulting logical plan is the middle graph. You can see that the Customer table and the Orders table are joined, followed by an AGG. There is a problem, however, that the Customer table and the Orders table are both very large tables with hundreds of millions of ranks. In batch processing, building a hash table for this join requires a large amount of buffers and even disk drops, which may result in poor join performance. The same is true for stream processing, where all the customer and Orders tables need to be stored in state. The larger the state, the worse the stream processing performance.

So how to save and avoid the amount of data is the direction of the query optimization. We noticed that Customer itself has a primary key, custKey, and the final AGG also conducts aggregate statistics for CUSTkey. In fact, we can do aggregation statistics on the Orders table first, first count the number of orders from each custkey of each user, and then make a join with the Customer table, that is to say, push agG down to the point before join. The Orders table is compressed from 1.5 billion to 100 million and then joins. This is a huge performance optimization for both streams and batches. We tested the flow scenario in 14 minutes, up from six hours.

The purpose of this example is to show that SQL has been around for decades, there are a lot of great people working in this area for years, and there are a lot of mature optimizations. These optimizations, based on the unified model of stream batch, can be taken directly to stream. We no longer need to develop and study a set of optimization rules for the flow, so as to achieve twice the result with half the effort.

04 Unification of basic data structures

In Flink, a data structure called Row was passed between operators working in streams and batches. But Row has several typical problems.

  1. For example, if you have a Row of data, the first one is an integer and the other two are strings. So there’s an Int in the row and then there’s two strings. But we know that Java has some overhead on objects, some extra space.

  2. There is also the overhead of boxing and unboxing for the main type of access.

  3. The hashcode, deserialization, and deserialization methods of a Row are iterated over each element of the array in the Row, which involves a lot of extra overhead of virtual function calls.

  4. The last point is that for some slightly more advanced data structures, such as sort, and some hashtables in AGG Join, the binary data structure of HashMap, based on the encapsulation of Row, is difficult to achieve extreme efficiency.

So to address these problems, we also proposed a new data structure, BinaryRow, which is completely based on binary structure design. BinaryRow is divided into fixed and variable lengths, starting with a null bit interval that records whether each field is null. And then fixed-length data types like int, long, double, we’re going to store that in the fixed-length area, and then variable length data like string, we’re going to store its variable length data in the variable length area, and then we’re going to store its pointer and its length in the fixed length area. When storing data, each block in BinaryRow is 8-byte aligned. Why octet alignment? On the one hand, in order to achieve faster random Access, the position of a field can be located directly without going through it from the beginning. The other side of the coin is better CPU caching.

BinaryRow has one important advantage: delayed deserialization. For example, binary data from the network or binary data from state will not be deserialized immediately, but will be wrapped into BinaryRow and deserialized when needed. This saves a lot of unnecessary deserialization operations and improves performance. After testing, the data structure not only performed very well in batch processing, but also doubled its performance in stream processing.

05 Runtime sharing

1-dimensional table association

Dimensional table association, as you probably know, is when a stream joins an external database. We will take the ID of the stream data to lookup the dimension table, the lookup process, we will achieve a synchronous mode or asynchronous mode. We know that DataStream supports asynchronous IO interfaces, but DataSet does not. However, since we have unified the Operator layer, the batch can directly reuse the Operator implementation of the stream. Although in traditional batch processing, if you want to query the dimension table, scan the dimension table first and then JOIN it. However, if the dimension table is particularly large and the probe end is particularly small, it may not be cost-effective, and the lookup method may be more efficient. Therefore, it also makes up for a shortcoming of batch in some scenarios.

2 streams of microbatch processing

To avoid frequent manipulation of state, we introduced a micro-batch mechanism on the stream. This is done by inserting micro-batch events into the data stream. Then in the Aggregate Operator, when we receive the data, we store it or Aggregate it directly into the binary hash table (cached in memory). Then, when the micro-batch event is received, the BinaryHashMap is triggered, the cached results are flushed into state, and the final result is printed. Here the BinaryHashMap is fully and batch multiplexed. The flow side did not have to rebuild, and the performance was improved tenfold.

Four, performance performance

We first tested the performance of a batch, with tPC-H to do a test. Compared with Flink1.6.0, this graph is a comparison of the time per query with 1T of data, so the smaller the time, the better its performance. We can see that Blink is much better than Flink1.6 in every query. Average performance is ten times faster than flare 1.6. The flow also managed to overcome all TBCH queries with a unified architecture. It’s worth noting that this is something no other engine can do right now. And in this year’s Tmall Double 11 flow calculation, reached a throughput of 1.7 billion TPS. This high performance was achieved thanks to the unified stream batch architecture we talked about today.

5. Future plans

We will continue to explore some combination of stream and batch, because stream and batch are not black and white, not batch is batch, stream is stream, there is a lot of space between stream and batch worth exploring. For example, a job might be one part of a stream job that runs all the time, and one part of a batch job that is scheduled at an interval, and they run in convergence. For example, after a batch job runs, how to seamlessly migrate it to a stream job is the direction of our future research.

For more information, please visit the Apache Flink Chinese community website