Lin Jia is the head of real-time business at the NetEase Interactive Entertainment Billing Data Center, the lead developer of the real-time development framework JFlink-SDK and the real-time business platform JFlink, and a Flink code contributor.

This article is based on a talk by Lin Jia, who is in charge of real-time business at the NetEase Interactive Entertainment Billing Data Center. It explains why the NetEase data center chose Flink and TiDB for its real-time business and how the two are used together.

Today, from a development perspective, I will talk about why the NetEase data center chose Flink and TiDB for its real-time business.

First of all, TiDB is a distributed HTAP database. Its key features include one-click horizontal scaling, strong consistency with multi-replica data safety, distributed transactions, and real-time OLAP. It is compatible with the MySQL protocol and ecosystem, which makes migration easy and keeps operation and maintenance costs low. Flink is one of the most popular open source computing frameworks today. For real-time data processing, its high throughput, low latency, and Exactly Once guarantee provide convenient support for the real-time business of NetEase Games.

What kind of business value can Flink on TiDB create? This article answers that question by starting from a story about real-time accumulation.

Let’s start with a real-time accumulation story

Anyone who works with online services will recognize the data above. It is a classic online real-time business table, which can also be seen as a log or some other kind of monotonically increasing data. It contains timestamps, accounts, items purchased, quantities purchased, and so on. To analyze such data, a real-time computing framework such as Flink can bucket it, for example by grouping by user ID and by item and then bucketing by time, which eventually produces the following continuous data.
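The bucketing described above can be sketched in a few lines of plain Python. This is only an illustration of the grouping logic, not Flink code: the sample events, the 60-second bucket width, and the names `bucketize` and `BUCKET_SECONDS` are all assumptions made for the example.

```python
from collections import defaultdict

# Hypothetical purchase events: (epoch_seconds, account, item, quantity).
events = [
    (0, 1000, "A", 1),
    (30, 1000, "A", 2),
    (70, 1000, "A", 5),
    (20, 2000, "B", 1),
]

BUCKET_SECONDS = 60  # assumed bucket width


def bucketize(rows, width=BUCKET_SECONDS):
    """Sum the quantity per (account, item, bucket_start)."""
    totals = defaultdict(int)
    for ts, account, item, qty in rows:
        bucket_start = ts - ts % width  # align the timestamp to its bucket
        totals[(account, item, bucket_start)] += qty
    return dict(totals)


buckets = bucketize(events)
for key in sorted(buckets):
    print(key, buckets[key])
```

Each output row is keyed by account, item, and the start of the time bucket, which is the shape of the continuous statistics that would then be written to TiDB.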

Suppose this data is written continuously into TiDB, and TiDB also holds the existing online dimension tables, such as account information and item information. A JOIN between these tables can then quickly turn the fact statistics into meaningful time-series data. Fed into a visualization application, this data can reveal many different things.

The whole process seems simple and perfect, with Flink solving computing problems and TiDB solving mass storage problems. But is this really the case?

Anyone who has actually worked with online data may have run into problems such as:

  • Multiple data sources: the data comes from the external systems of various business parties. Some of it is stored in databases, some must be collected as logs, and some must be fetched through REST interfaces.

  • Varying data formats: each business or channel has a completely different data format, such as JSON or URL-encoded strings.

  • Out-of-order arrival: data does not arrive in order.

To address these problems, we introduced Flink. In the data center, we built a framework called JFlink-SDK on top of Flink. It turns ETL, out-of-order processing, group aggregation, and other common requirements into configurable modules. By configuring the online data sources, we can then compute statistics or fact data, which is finally written into TiDB, a store that can hold a large amount of data.

However, when Flink processes this data, it uses CheckPoint to save the current computing state for fault recovery. Suppose a commit happens between two saves, that is, a result has already been flushed to TiDB, and then a failure occurs. Flink automatically falls back to the last CheckPoint, which is the last correct state. At this point, the 4 pieces of data in the figure are recalculated and may be written to TiDB again.

If the data is a cumulative value, it is mistakenly accumulated twice. This is one of the problems that can occur with Flink on TiDB.
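The double accumulation can be reproduced with a toy model. In the sketch below, a Python dictionary stands in for the cumulative table in TiDB, and the function `accumulate` is a hypothetical non-idempotent write; nothing here is Flink or TiDB API.

```python
# In-memory stand-in for a cumulative table in TiDB: key -> running total.
table = {}


def accumulate(key, delta):
    """Non-idempotent write, comparable to `total = total + delta`."""
    table[key] = table.get(key, 0) + delta


# Account 1000 buys item A twice, with quantities 1 and 2.
records = [(("1000", "A"), 1), (("1000", "A"), 2)]

# First attempt: both records reach the sink, then the job fails
# before the next checkpoint completes.
for key, qty in records:
    accumulate(key, qty)

# Recovery: the job restores the last checkpoint and replays the same records.
for key, qty in records:
    accumulate(key, qty)

print(table[("1000", "A")])  # 6, although the correct total is 3
```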

Flink’s guarantee of accuracy

How does Flink guarantee accuracy? First, you need to understand Flink’s CheckPoint mechanism. A CheckPoint is similar to a transaction savepoint in MySQL. It is used to save intermediate state during real-time data processing.

CheckPoint comes in two modes, At Least Once and Exactly Once, but even Exactly Once does not solve the double-counting problem described above. For example, suppose data is read from Kafka and, as in the fact table above, the account is 1000, the purchased item is A, and the purchase quantities are 1 and 2. Flink partitions the records into buckets by key with keyBy, which is comparable to GROUP BY in MySQL, aggregates them with an aggregation function, and then flushes the result to TiDB through the sink.

Saving of computed state

Flink relies on the CheckPoint mechanism for Exactly Once. Consider a relatively simple execution plan (DAG) with a single source whose data passes through a map operator and is then flushed to a TiDB sink. This plan is linear, and checkpointing is carried out by inserting CheckPoint barriers into the data stream. Wherever a barrier arrives in the linear plan, it triggers the operator there to save its state.

The process starts at the source, which saves its own state first. For Kafka, this means saving the current consumption offset. Once that node has been saved, the state of the next operator is saved. Assuming the map here performs the bucket calculation, its state is the data accumulated in the buckets so far.

After that, the CheckPoint barrier reaches the sink, and the sink also stores its state. Once an operator has stored its state, it reports to the Job Manager (equivalent to the master) that its part of the CheckPoint is complete.

When the master confirms that all subtasks of the distributed job have completed the CheckPoint, it sends out a Complete message. As the model above shows, this is essentially 2PC, the distributed two-phase commit protocol: each distributed subtask commits its own transaction, and then the transaction as a whole is committed. The saved state is stored in RocksDB. In the event of a failure, the state is recovered from RocksDB and the whole pipeline resumes computing from that point.
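The barrier flow described above can be modelled with a small simulation. The sketch below is a toy model in plain Python, not Flink code: the classes `Operator` and `Coordinator`, the `BARRIER` marker, and the three update functions are all hypothetical. It assumes a linear source, map, sink pipeline in which the source state is an offset, the map state is a running sum, and the sink state is a count of written records.

```python
class Operator:
    def __init__(self, name, update):
        self.name = name
        self.state = 0
        self._update = update  # (state, value) -> new state

    def process(self, value):
        self.state = self._update(self.state, value)
        return value

    def snapshot(self):
        return self.state


class Coordinator:
    """Plays the role of the Job Manager in this toy model."""

    def __init__(self, operator_count):
        self.operator_count = operator_count
        self.pending = {}    # checkpoint_id -> {operator name: snapshot}
        self.completed = {}  # checkpoints acknowledged by every operator

    def ack(self, checkpoint_id, name, snapshot):
        acks = self.pending.setdefault(checkpoint_id, {})
        acks[name] = snapshot
        if len(acks) == self.operator_count:
            self.completed[checkpoint_id] = self.pending.pop(checkpoint_id)


BARRIER = object()  # marker injected into the stream


def run(stream, operators, coordinator):
    checkpoint_id = 0
    for element in stream:
        if element is BARRIER:
            checkpoint_id += 1
            # The barrier visits the operators in pipeline order and
            # triggers a snapshot at each of them.
            for op in operators:
                coordinator.ack(checkpoint_id, op.name, op.snapshot())
        else:
            for op in operators:
                element = op.process(element)


operators = [
    Operator("source", lambda state, value: state + 1),  # offset
    Operator("map", lambda state, value: state + value),  # running sum
    Operator("sink", lambda state, value: state + 1),     # records written
]
coordinator = Coordinator(len(operators))
run([1, 2, BARRIER, 4], operators, coordinator)
print(coordinator.completed[1])
```

The checkpoint only moves from `pending` to `completed` once every operator has acknowledged it, which mirrors the "all subtasks confirmed" step of the two-phase protocol.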

Exactly Once semantic support

Looking back at Exactly Once, can the above methods really achieve Exactly Once? Not really, but why does Flink officially call it Exactly Once? Here’s why.

As the code above shows, Exactly Once for CheckPoint is not an end-to-end guarantee. It only covers Flink’s internal operators. Therefore, when computed results are written to TiDB, end-to-end Exactly Once cannot be guaranteed unless TiDB takes part in Flink’s CheckPoint process.

Kafka supports this kind of semantics because it exposes a 2PC interface. Users can call this interface directly to control the 2PC process of a Kafka transaction and tie it to the CheckPoint mechanism, which avoids such errors.

But what if you can’t control it manually?

Suppose the account is still 1000 and the purchase of item A is written to the cumulative table in TiDB. SQL of the following form is generated: INSERT ... VALUES ... ON DUPLICATE KEY UPDATE. When a CheckPoint occurs, can we be sure that this statement has been executed on TiDB?

Without special handling, there is no guarantee whether the statement has been executed or not. If it has not been executed, an error is reported, the job returns to the last CheckPoint, and everyone is happy: nothing was counted, nothing was added, nothing is double counted, so the result is correct. However, if the statement has already been written and the job then goes back to the previous CheckPoint, the write is repeated and the quantity of 3 is added a second time.

To solve this problem, Flink provides an interface for implementing a SinkFunction manually and controlling the beginning of a transaction, pre-commit, commit, and rollback.

The CheckPoint mechanism is essentially 2PC. When the distributed operators save their internal state, this corresponds to the pre-commit phase. For Kafka, similarly, the Kafka transaction can be pre-committed at this point. Once the Job Manager (master) has confirmed that the CheckPoint state of all operators has been saved, the transaction is committed.

If any other operator fails, a rollback is required so that the transaction is never committed on the remote side. With a 2PC SinkFunction plus a sink that supports full XA semantics, Exactly Once can be achieved.
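The lifecycle of such a transactional sink can be illustrated with a toy model. The class `ToyTwoPhaseSink` below is hypothetical, and its method names only loosely mirror the begin, pre-commit, commit, and abort steps of Flink’s Java `TwoPhaseCommitSinkFunction`. It is a Python sketch of the idea under those assumptions, not the Flink API, and the "external system" is just a list.

```python
class ToyTwoPhaseSink:
    def __init__(self):
        self.committed = []      # what the external system holds durably
        self.open_txn = []       # writes of the current transaction
        self.pre_committed = {}  # checkpoint_id -> staged writes

    def invoke(self, value):
        self.open_txn.append(value)

    def pre_commit(self, checkpoint_id):
        # Called when the checkpoint barrier reaches the sink.
        self.pre_committed[checkpoint_id] = self.open_txn
        self.open_txn = []  # implicitly begins the next transaction

    def commit(self, checkpoint_id):
        # Called after the coordinator confirms the whole checkpoint.
        self.committed.extend(self.pre_committed.pop(checkpoint_id))

    def abort(self):
        # Called on failure: drop everything that is not yet committed.
        self.open_txn = []
        self.pre_committed.clear()


sink = ToyTwoPhaseSink()
sink.invoke(1)
sink.invoke(2)
sink.pre_commit(1)
sink.commit(1)

sink.invoke(4)  # written after checkpoint 1 ...
sink.abort()    # ... then a failure rolls the open transaction back

sink.invoke(4)  # the record is replayed after recovery
sink.pre_commit(2)
sink.commit(2)
print(sink.committed)  # [1, 2, 4]
```

Because the uncommitted write of `4` is discarded on abort, the replay does not produce a duplicate. This only works when the external system lets the client control the commit, which is the limitation discussed next.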

However, not all sinks support the two-phase commit protocol. For example, TiDB uses two-phase commit internally to manage and coordinate its transactions, but at present it does not expose the protocol for users to control manually.

Idempotent calculation

So how can we make sure that the results landing in TiDB are Exactly Once from the business point of view? The answer is At Least Once combined with a unique key.

How should the unique key be chosen? If a record carries a unique identifier, that is the natural choice. For example, when a table with a unique ID is synchronized to another table through Flink, the classic approach is to deduplicate on its primary key with INSERT IGNORE or REPLACE INTO. For logs, attributes specific to the log file can be used. If Flink computes an aggregate, the aggregation key plus the window boundary, or a value derived in some other idempotent way, can serve as the unique key of the final result.

In this way, writing the result becomes reentrant. With reentrant writes and the rollback capability of CheckPoint, Flink combined with TiDB can write Exactly Once results.
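A minimal sketch of such a reentrant write follows. It uses Python’s built-in `sqlite3` module purely as a stand-in for TiDB so that the example runs anywhere; the table `purchase_stats`, its columns, and the helper `write_window_result` are assumptions for illustration. The unique key is the aggregation key (account, item) plus the window start, and the write is a REPLACE INTO, as mentioned above.

```python
import sqlite3

# SQLite stands in for TiDB here; the schema is hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE purchase_stats (
        account      INTEGER,
        item         TEXT,
        window_start INTEGER,
        total        INTEGER,
        PRIMARY KEY (account, item, window_start)
    )
    """
)


def write_window_result(account, item, window_start, total):
    """Idempotent write: the same window result can be written repeatedly."""
    conn.execute(
        "REPLACE INTO purchase_stats VALUES (?, ?, ?, ?)",
        (account, item, window_start, total),
    )
    conn.commit()


write_window_result(1000, "A", 0, 3)
write_window_result(1000, "A", 0, 3)  # replay after a rollback

rows = conn.execute("SELECT * FROM purchase_stats").fetchall()
print(rows)  # [(1000, 'A', 0, 3)]
```

The replayed write overwrites the row instead of adding to it, so a rollback to an earlier CheckPoint cannot inflate the total.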

Flink on TiDB

Our internal JFlink framework encapsulates Flink. What does it do to connect Flink with TiDB? This section covers that in more detail.

Data connector design

First, the design of the data connector. Flink was relatively slow to support TiDB and relational databases in general: the Flink JDBC connector only appeared in Flink 1.11, which is not that long ago.

Currently, when TiDB is the data source and the data is processed in Flink, we mainly rely on the CDC tools provided by TiDB, which listen for changes in TiDB and write them to Kafka. Kafka is a classic streaming data pipeline, so the data is then consumed from Kafka and processed by Flink.

However, not all services can use the CDC approach. For example, complex filter conditions may need to be applied when the data is captured, some configuration tables may need to be read periodically, or some external configuration may have to be read before the sharding layout is known. In such cases, a custom source may be required.

JFlink encapsulates sliced reading of tables that are monotonic in a business field. Monotonic means that the table has some field that changes monotonically, or that the table is append-only.

In terms of implementation, the JFlink TiDB connector sits between TiDB and Flink and uses a connector to establish a connection with TiDB. The data is then fetched by an asynchronous thread and placed into a blocking queue. The blocking queue is mainly used for flow control.

Flink’s main thread blocks on the queue, waiting for its non-empty signal. When the signal arrives, the data is pulled out and passed through a deserializer to become a stream object for the real-time processing framework, where it can be fed into the various modular UDFs downstream. Flink’s CheckPoint mechanism makes At Least Once semantics for the source very simple to implement.

We already have a major premise: the table is monotonic in some field. So when the data is read slice by slice, we can record the current slice position. If a failure occurs, the whole stream falls back to the last CheckPoint, and the source falls back to the last saved slice position. This guarantees that no data is missed, which gives the source At Least Once semantics.
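The source design above can be sketched as follows. This is a simplified Python model, not JFlink code: `TABLE` stands in for a monotonic TiDB table, `fetch_slice` for a hypothetical query of the form "rows with id greater than the cursor, ordered by id, limited to one slice", and `SliceSource` for the connector. A background thread reads slices into a bounded queue, the consumer blocks on the queue, and the saved position is what a CheckPoint would store.

```python
import queue
import threading

# Stand-in for a monotonic table: rows ordered by an increasing id.
TABLE = [(i, f"row-{i}") for i in range(1, 11)]
END = object()  # end-of-data marker, used only to let the demo terminate


def fetch_slice(after_id, limit):
    """Hypothetical slice query: ids greater than after_id, in order."""
    return [row for row in TABLE if row[0] > after_id][:limit]


class SliceSource:
    def __init__(self, start_after=0, slice_size=3, capacity=4):
        self.position = start_after  # last id handed downstream
        self.slice_size = slice_size
        self.buffer = queue.Queue(maxsize=capacity)  # bounded: flow control
        threading.Thread(
            target=self._reader, args=(start_after,), daemon=True
        ).start()

    def _reader(self, cursor):
        while True:
            rows = fetch_slice(cursor, self.slice_size)
            if not rows:
                self.buffer.put(END)
                return
            for row in rows:
                self.buffer.put(row)  # blocks while the queue is full
            cursor = rows[-1][0]

    def poll(self):
        row = self.buffer.get()  # blocks until data is available
        if row is END:
            return None
        self.position = row[0]
        return row

    def snapshot(self):
        return self.position


source = SliceSource()
delivered = [source.poll()[0] for _ in range(4)]   # ids 1 to 4
checkpoint = source.snapshot()                     # position 4 is saved
delivered += [source.poll()[0] for _ in range(2)]  # ids 5 and 6, then a failure

# Recovery: a new source resumes from the checkpointed position.
restored = SliceSource(start_after=checkpoint)
row = restored.poll()
while row is not None:
    delivered.append(row[0])
    row = restored.poll()

print(delivered)  # ids 5 and 6 appear twice; no id is missing
```

Rows 5 and 6 are delivered twice, which is exactly the At Least Once behaviour: duplicates are possible after a rollback, but nothing is lost.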

For the sink, Flink officially provides a JDBC sink, as it does for the source. However, the official JDBC sink implementation is relatively simple and uses synchronous batch insert semantics.

Synchronous batch insertion is rather conservative. When the data volume is large and strict first-come, first-committed ordering is not required, synchronous commits do not perform very well. Asynchronous commits improve performance considerably: they take full advantage of TiDB as a distributed database that handles highly concurrent small transactions well, which helps raise QPS.

The principle behind our sink is actually very simple. Let’s first look at the official implementation. There, Flink’s main thread writes to a buffer. When the buffer is full, it switches pages and starts a thread that writes the data to TiDB synchronously.

Our improvement is to use a blocking queue for flow control and to write data into buffer pages. As soon as a buffer page is full, an asynchronous thread is started to flush it immediately, which improves QPS when FIFO semantics are not required. In practice, this raised QPS from around 30,000 with the official implementation to close to 100,000.

However, implementing At Least Once semantics for the sink is more complicated. For the sink to be At Least Once with respect to CheckPoint, it must be clean by the time the CheckPoint completes, that is, all of its data must have been flushed out. This requires coordinating the CheckPoint thread with the normal flush threads and the page-switching threads: when a CheckPoint is triggered, all data is flushed before the CheckPoint is allowed to complete. Once the CheckPoint has completed, the sink is guaranteed to be clean, which means that all earlier data has been written to TiDB correctly.
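The combination of asynchronous page flushing and a "clean sink at CheckPoint" rule can be sketched like this. The class `AsyncPagedSink` and the callable `write_batch` are hypothetical, and a semaphore is used here in place of the blocking queue for flow control. It is a Python model of the idea under those assumptions, not the JFlink implementation.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


class AsyncPagedSink:
    def __init__(self, write_batch, page_size=3, workers=4, max_inflight=8):
        self._write_batch = write_batch  # hypothetical: writes one page
        self._page_size = page_size
        self._page = []
        self._pool = ThreadPoolExecutor(max_workers=workers)
        self._inflight = []
        # Flow control: at most max_inflight pages may be pending at once.
        self._slots = threading.BoundedSemaphore(max_inflight)

    def invoke(self, record):
        self._page.append(record)
        if len(self._page) >= self._page_size:
            self._flush_page()

    def _flush_page(self):
        if not self._page:
            return
        page, self._page = self._page, []
        self._slots.acquire()  # blocks the caller when too many pages are pending
        future = self._pool.submit(self._write_batch, page)
        future.add_done_callback(lambda _: self._slots.release())
        self._inflight.append(future)

    def snapshot_state(self):
        """On checkpoint: leave nothing buffered and nothing in flight."""
        self._flush_page()
        done, _ = wait(self._inflight)
        self._inflight = []
        for future in done:
            future.result()  # re-raise write errors so the checkpoint fails

    def close(self):
        self._pool.shutdown(wait=True)


written = []
lock = threading.Lock()


def write_batch(page):
    with lock:  # pages are written concurrently, so guard the shared list
        written.extend(page)


sink = AsyncPagedSink(write_batch)
for i in range(8):
    sink.invoke(i)
sink.snapshot_state()  # after this call, every record has been written
sink.close()
print(sorted(written))
```

Pages may complete in any order, which is why this design only fits workloads that do not need FIFO semantics. The checkpoint hook is what restores the At Least Once guarantee.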

After optimization, a QPS of about 100K was achieved. Our test environment consisted of about three physical machines with PD, TiKV, and TiDB nodes deployed together.

The business scenario

At present, TiDB and Flink are used together in many application scenarios at the billing data center of the technical center, such as:

  • Real-time formatting and storage of massive business log data;

  • Analysis and statistics based on massive data;

  • Payment path analysis based on a real-time TiDB/Kafka dual-stream join;

  • Data map;

  • Time series data.

As these scenarios show, Flink on TiDB is widely used in the business layer of the NetEase data center. As the saying goes, “Peaches and plums do not speak, yet a path forms beneath them.” Since Flink is used so widely, this path has proved to be very valuable.