Abstract:

On December 13-14, the “2017 Alibaba Double 11 Technology Twelve Lectures”, co-hosted by the cloud community and the Alibaba Technology Association, concluded successfully, focusing on the black technology behind the 2017 Double 11. This article is a compilation of the talk “Race against Time: Alibaba’s Real-time Big Data Technology to Support Double 11”. It explains Alibaba’s real-time big data and machine learning technology, how these technologies helped dozens of Alibaba business units upgrade their big data capabilities, and how they ultimately delivered outstanding results on Double 11. The content is as follows.


Speaker:
Dasha is a senior technical expert at Alibaba, responsible for Flink SQL in real-time computing. He is an Apache Flink Committer and previously worked at Facebook in the US.

Real-time computing in Alibaba

Since 1999, Alibaba has continuously expanded its business beyond e-commerce and spawned many products in fields such as finance, payment, logistics, and entertainment: the Taobao and Tmall e-commerce platforms, the Alimama advertising platform, Ant Financial’s Alipay, Alibaba Cloud, and the digital media and entertainment business. Today, Alibaba is not just an e-commerce platform but a huge application ecosystem. Alibaba is now the world’s largest e-commerce platform, with 25 subsidiaries and $550 billion in GMV last fiscal year. There are nearly 500 million users on Alibaba’s platform, equivalent to one third of China’s population, and nearly 10 million users transact through the platform every day.


Alibaba has become a huge commercial aircraft carrier, and along with its large number of users and applications, it inevitably generates a huge amount of data. At present, Alibaba’s data has reached the EB level and grows by PBs daily; the data processed by real-time computing each day is also at the PB level, with a daily peak processing rate of 100 GB/s. This year’s Double 11 hit a staggering 470 GB/s.


Real-time computing is widely used within Alibaba. With the growth of the new economy, technological innovation, and rising user expectations, the need for real-time computing keeps increasing; its defining characteristic is that the input data is continuously changing. Two examples illustrate the application scenarios of real-time computing at Alibaba:

1. Double 11 large screen

Every year on November 11, Alibaba aggregates valuable data to present to the media, and the GMV large screen is one such presentation. The whole GMV large screen is a very typical real-time computation: every transaction is aggregated and displayed on the screen. Transaction data is written to the database, aggregated in real time into HBase, and displayed on the large screen. The link in this process is very long, which poses many challenges for the application:


1) The screen requires second-level latency, which means the real-time computation needs sub-second delay
2) The huge data volume of Double 11 needs to be aggregated within a single job
3) Exactly-once semantics must guarantee the accuracy of the computed results
4) The system must be highly available, with no lag or downtime
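To make the pipeline concrete, here is a minimal Python sketch of the large-screen aggregation idea: each transaction updates a running GMV total, and deduplication by order ID stands in for exactly-once semantics. All names (`Transaction`, `GmvAggregator`) are illustrative, not Alibaba’s actual code.

```python
# Toy model of the GMV large-screen aggregation (illustrative names,
# not Alibaba's code): every transaction updates a running total.
from dataclasses import dataclass

@dataclass
class Transaction:
    order_id: int
    amount: float  # yuan

class GmvAggregator:
    """Running GMV total; deduplicating by order_id stands in for
    exactly-once, so a replayed transaction is counted only once."""
    def __init__(self):
        self.total = 0.0
        self.seen = set()

    def process(self, txn: Transaction) -> float:
        if txn.order_id not in self.seen:  # exactly-once guard
            self.seen.add(txn.order_id)
            self.total += txn.amount
        return self.total

agg = GmvAggregator()
# Order 1 is delivered twice, e.g. replayed after a failover.
for txn in [Transaction(1, 100.0), Transaction(2, 50.0), Transaction(1, 100.0)]:
    current = agg.process(txn)
```

In a real engine the deduplication is handled by checkpoint-based exactly-once delivery rather than an in-memory set, but the externally visible behavior is the same: the total on the screen never double-counts a transaction.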


This scenario has a very demanding SLA, requiring second-level latency and exact data accuracy, but the computation itself is not complicated. More complex applications are introduced next.

2. Real-time machine learning

Machine learning generally has two important components: features and models. Traditional machine learning collects features and trains models at a low frequency, which cannot adapt to changing application requirements. For example, on Double 11, commodity prices and promotion rules are completely different from normal times, so training on historical data alone cannot achieve optimal results. Only by collecting features in real time can model training converge to satisfactory results. To that end, we developed a real-time machine learning platform.



This real-time machine learning platform mainly consists of two parts: real-time feature computation and real-time model computation. The system faces a number of challenges, including the following:


1) Machine learning requires all kinds of metrics, and there are many data sources
2) There are multiple dimensions, such as the user dimension and the commodity dimension. Stacking dimensions, and even taking their Cartesian product, makes the final set of metrics massive and the state huge
3) Machine learning computation is complex and consumes a lot of CPU
4) Some data cannot fit in state, which requires external storage and a large amount of external I/O

3. Real-time A/B Testing

A user’s query can also change over time; a typical example is real-time A/B testing.
When tuning models, algorithm engineers try a variety of models, and different models have different computation patterns and methods, producing different results. Therefore, different queries often subscribe to the same real-time data; results are generated, and the model is iterated based on user feedback until the optimal model is found. The challenge of A/B testing is that algorithm engineers tend to compute a lot of metrics, and computing all of them in real time wastes a lot of resources.



In response to this challenge, we designed an A/B testing development framework. It synchronizes the metrics that algorithm engineers care about, aggregates them, and sends them to the Druid engine. Algorithm engineers then evaluate the strengths and weaknesses of different jobs’ results, and finally use Druid to run statistical analysis on the different metrics to build the model.


To sum up, real-time computing faces the following challenges within Alibaba:
1) A huge business with many scenarios leads to complex logic
2) A large amount of data, with many jobs and machines
3) Demanding requirements: low latency, data accuracy, and high throughput

Selection and optimization of Flink

To address these challenges, we investigated a number of computing frameworks and finally chose Flink, for the following reasons:
1. Flink introduces and designs State well; complex stateful logic such as JOIN can be described cleanly on top of it
2. Flink implements the Chandy-Lamport algorithm, which enables exactly-once semantics and high throughput at low latency.


However, Flink still had many shortcomings in State, the Chandy-Lamport implementation, and other areas, so Alibaba started a project named Blink.



Blink combines open-source Flink with Alibaba’s improvements and is mainly divided into two parts:
1. BlinkRuntime
Different companies use Flink differently in terms of storage, scheduling, and low-level optimization. This layer is not consistent with the community, and we call it BlinkRuntime.


2. Flink SQL
Native Flink offered only the relatively low-level DataStream API, so users had to write a lot of code. Moreover, the DataStream API itself had design problems, and every revision required users to modify all of their code. The Alibaba team therefore redesigned a SQL layer for stream computing and contributed it back to the community. We named it Flink SQL because we wanted to stay consistent with the community at the API layer and embrace the open-source ecosystem.

BlinkRuntime core optimization decryption

1. Deployment and model optimization




Optimization includes the following:
1) Solve the problem of large-scale deployment. A Flink cluster originally had only one JobMaster to manage all jobs. As the number of jobs grew, the single master became a bottleneck and could not accept more jobs. Therefore, we restructured the architecture so that each job has its own master.


2) In early Flink, a TaskManager managed many tasks, and a problem in one task could bring down the whole TaskManager, affecting other jobs. We enhanced job isolation by giving each job its own TaskManagers.


3) Introduce a ResourceManager. The ResourceManager communicates with the JobMaster to adjust resources dynamically in real time and achieve optimal cluster utilization.


4) We applied these optimizations not only to YARN clusters, but also to Mesos and Standalone deployments.


With this work, Flink can be applied to large-scale cluster deployments.

2. Incremental Checkpoint




Flink stores State in different ways: in memory or in external storage. For states as large as those in machine learning, memory cannot meet the storage requirements, so external storage is often needed. Early Flink had a design flaw: at every checkpoint, it compressed the entire state and wrote it to disk. As the state grows, each checkpoint reads and writes a huge amount of data, so a job’s checkpoint cannot finish within a minute; a failover then rolls back a large amount of work, causing long delays.


Therefore, we built incremental checkpointing. The idea is to checkpoint only the increments: since all historical checkpoint data has already been persisted, a later checkpoint only needs to store the data that changed. This makes checkpoints lightweight, allows them to complete in seconds, and reduces the failover delay.
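A minimal Python sketch of the incremental idea: only keys modified since the last checkpoint are written, and restoring replays the deltas in order. This is a toy model, not Flink’s actual state-backend implementation.

```python
# Toy model of incremental checkpointing: each checkpoint persists only
# the keys changed since the previous one; restore replays the deltas.
class IncrementalCheckpointer:
    def __init__(self):
        self.state = {}
        self.dirty = set()      # keys modified since the last checkpoint
        self.checkpoints = []   # persisted deltas, oldest first

    def put(self, key, value):
        self.state[key] = value
        self.dirty.add(key)

    def checkpoint(self):
        delta = {k: self.state[k] for k in self.dirty}
        self.checkpoints.append(delta)  # stand-in for writing to disk
        self.dirty.clear()
        return delta

    def restore(self):
        restored = {}
        for delta in self.checkpoints:  # replay deltas in order
            restored.update(delta)
        return restored

cp = IncrementalCheckpointer()
cp.put("a", 1); cp.put("b", 2)
first = cp.checkpoint()   # full state: both keys are new
cp.put("b", 3)
second = cp.checkpoint()  # only the changed key is written
```

The second checkpoint is tiny regardless of how large the total state has grown, which is why checkpoints can complete in seconds.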

3. Asynchronous I/O




We often keep data in external storage, so reading it requires I/O. The traditional approach uses synchronous I/O, where waiting for each result to return causes large delays and wastes CPU. We therefore designed async I/O, which issues reads from multiple threads asynchronously; when data arrives, the system invokes a callback to process it. If ordering must be preserved, we provide a buffer that temporarily holds data arriving early and emits it in batches once all earlier data has arrived. Depending on the buffer size, overall system performance can improve by tens to hundreds of times, greatly increasing per-machine CPU utilization and data throughput.
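The mechanism can be sketched with Python’s asyncio: requests are issued concurrently, and awaiting the tasks in input order plays the role of the ordering buffer. The `external_lookup` function is a hypothetical stand-in for a remote (e.g. HBase) read.

```python
# Sketch of async-IO with order preservation using asyncio.
# external_lookup is a hypothetical stand-in for a remote read.
import asyncio

async def external_lookup(key):
    await asyncio.sleep(0.01 * (key % 3))  # simulated, varying latency
    return key * 10

async def ordered_async_io(keys):
    # Issue all requests concurrently instead of waiting one by one...
    tasks = [asyncio.create_task(external_lookup(k)) for k in keys]
    # ...then emit results in the original input order: awaiting each
    # task in sequence acts as the ordering buffer.
    return [await t for t in tasks]

results = asyncio.run(ordered_async_io([3, 1, 2]))
```

Even though key 2’s lookup finishes later than key 1’s, the output preserves the input order, at the cost of briefly buffering the early results.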


Most of the optimizations described above have been pushed back into the community.

Flink SQL core function decryption

1. Alibaba completed 80% of the R&D work of Apache Flink SQL

Currently, 80% of Apache Flink SQL has been contributed by Alibaba, including 200 commits and over 100,000 lines of code. We moved to SQL because we found the low-level API very inconvenient for users to migrate to and bring online. So why SQL specifically? Here’s why:
1) SQL is a declarative language, well suited to describing the intent of a job.
2) SQL has a well-established optimization framework, so users can focus on business logic without worrying about State, which lowers the barrier to entry.
3) SQL is easy to understand and suitable for people from different fields.
4) The SQL API is very stable; engine upgrades do not require changes to users’ jobs.
5) Some applications require streaming updates plus batch validation. Running both batch and streaming through SQL brings huge benefits: with SQL used for batch computation as well, we can unify batch and stream processing on top of it.

2. Stream vs. batch

The core difference between the two is that data in a stream is infinite while data in a batch is finite. This leads to three further differences:
1) Stream processing never ends and keeps producing results; batch processing ends after returning one result. For example, after Double 11, a batch job computes the total amount spent by all buyers that day, while a stream job tracks the transaction amount in real time, recomputing continuously.
2) Stream computing must checkpoint and retain state; when a machine goes down, jobs roll back to the last checkpoint. Batch computing does not need this, since its input data is usually persisted.
3) Stream results are updated continuously. For example, a buyer’s total spend keeps changing, while the batch result is the fixed total for a whole day. Stream results change; batch results do not.
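The first and third differences can be illustrated with a toy Python example: the same “total spend” query returns one final answer in batch mode, but a continuously updated answer in stream mode.

```python
# The same "total spend" query in batch and stream form: batch returns
# one final result; the stream re-emits an updated result per event.
def batch_total(amounts):
    return sum(amounts)            # one answer, after the input ends

def stream_totals(amounts):
    total, emitted = 0, []
    for a in amounts:              # answer updated on every event
        total += a
        emitted.append(total)
    return emitted

purchases = [100, 50, 25]
final = batch_total(purchases)
updates = stream_totals(purchases)  # last update converges to `final`
```

If the stream's input were cut off at the same point where the batch input ends, the latest streamed result would equal the batch result; this is the intuition behind unifying the two.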

3. Query Configuration

In order to define when stream computation results are produced and how long state is retained, we designed the Query Configuration, which consists of two parts:
1. Latency SLA
Defines the delay from data generation to result presentation; for the Double 11 large screen, it is second-level.


2. State Retention/TTL
State in stream computing cannot live forever; the user sets a TTL (expiration time) to resolve this.
In this way, we eliminate the distinction between streams and batches and achieve unification. Next, we need to consider how to design streaming SQL.
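A minimal sketch of state TTL in Python (a toy model, not Flink’s state backend; the injectable clock exists purely to make the example deterministic): entries record their last update time and are dropped once the retention period has passed.

```python
# Toy model of state TTL: each entry records its last update time and is
# dropped once the retention period passes.
class TtlState:
    def __init__(self, ttl_seconds, clock):
        self.ttl = ttl_seconds
        self.clock = clock         # injectable time source, for testing
        self.entries = {}          # key -> (value, last_update_time)

    def put(self, key, value):
        self.entries[key] = (value, self.clock())

    def get(self, key):
        item = self.entries.get(key)
        if item is None:
            return None
        value, ts = item
        if self.clock() - ts > self.ttl:
            del self.entries[key]  # expired: drop to bound state size
            return None
        return value

now = [0]  # simulated time in seconds
state = TtlState(ttl_seconds=60, clock=lambda: now[0])
state.put("user_1", 42)
now[0] = 30
alive = state.get("user_1")    # within TTL
now[0] = 120
expired = state.get("user_1")  # past TTL, entry is gone
```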

4. Dynamic-table implements streaming SQL

The key problem is that in batch processing SQL operates on tables, but there are no tables in streaming data. Therefore, we introduced the dynamic table: a table whose contents change over time. A dynamic table is another representation of a stream; the two have duality, meaning they can be converted into each other without breaking data consistency. Here’s an example:



On the left of the figure is the input stream. For each incoming record, we update the dynamic table and then emit the table’s changes as a changelog. As data arrives, the two sides remain consistent, which shows that converting through a dynamic table loses neither semantics nor data.


This way, we can run SQL on the table: a stream is interpreted as a dynamic table, and a dynamic query over it produces a new table. It is worth mentioning that the dynamic table is a virtual layer and does not need to be materialized to storage. Let’s look at another example:
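The stream-to-table direction, and the changelog emitted back out, can be sketched in Python. An update to an existing key first retracts the old row (“-”) and then adds the new one (“+”); the function and tuple encoding are illustrative only.

```python
# Sketch of the stream -> dynamic table -> changelog round trip. Updating
# an existing key retracts the old row ("-") before adding the new one
# ("+"); the tuple encoding is illustrative only.
def apply_stream(rows):
    table = {}      # dynamic table: key -> latest value
    changelog = []  # changes emitted back out as a stream
    for key, value in rows:
        if key in table:
            changelog.append(("-", key, table[key]))  # retract old row
        table[key] = value
        changelog.append(("+", key, value))           # add new row
    return table, changelog

stream = [("mary", 1), ("bob", 1), ("mary", 2)]
table, changelog = apply_stream(stream)
```

Replaying the changelog against an empty table reproduces the table exactly, which is the duality the text describes: neither direction of the conversion loses data.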



As shown, we run a continuous query on the input stream; the continuous query converts the stream on the left into the stream on the right. In summary, dynamic tables give us strong support for running continuous SQL queries on streams.

5. Stream SQL should not exist on earth

As the example shows, with dynamic tables there is no need to invent a new streaming SQL, so we can conclude that a separate stream SQL should not exist. Keeping ANSI SQL semantics was our principle in building Flink SQL, and ANSI SQL is fully capable of describing stream SQL.

6. ANSI SQL function implementation

In addition, we needed to implement the full functionality of ANSI SQL. Internally, Alibaba has implemented everything the batch framework requires: DML, DDL, Query Configuration, UDF/UDTF/UDAF, JOIN, retraction, window aggregation, query optimization, and so on. Let me elaborate on a few of them:
1) JOIN
Streams and dynamic tables have duality. A SQL join that appears to join two tables is actually joining two streams. The underlying implementation is as follows:
A result is produced as soon as both sides have the matching data; for example, orders 5 and 6 arrive close together in time. Data from one side is stored in its state, and the opposite side’s state is queried; if no match exists yet, no output is produced until the matching data arrives. In short, the two streams have two states: each side’s data is saved and waits for the other side’s, and once both have arrived, the inner join produces its result. The diagram also introduces joins between a stream and an external table. In machine learning, a large amount of data lives in HBase, and joining against HBase is really joining against an external table. There are two modes:
A) Lookup: query the external table for results when the stream data arrives.
B) Send a version number to the external storage service, and the storage returns the result for that version.
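The two-state mechanism for a stream-stream inner join can be sketched in Python (a toy model; Flink keys and retains state differently): each side buffers its rows and probes the opposite side’s buffer on arrival.

```python
# Toy model of a stream-stream inner join: each side buffers rows in its
# own state and probes the opposite side's state when data arrives.
class StreamJoin:
    def __init__(self):
        self.left_state = {}   # join key -> rows seen on the left
        self.right_state = {}  # join key -> rows seen on the right

    def on_left(self, key, row):
        self.left_state.setdefault(key, []).append(row)
        # Emit a result for every matching row already buffered.
        return [(row, r) for r in self.right_state.get(key, [])]

    def on_right(self, key, row):
        self.right_state.setdefault(key, []).append(row)
        return [(l, row) for l in self.left_state.get(key, [])]

join = StreamJoin()
no_match = join.on_left(5, "order-5")    # no payment yet: wait in state
matched = join.on_right(5, "payment-5")  # joins with the buffered order
```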


It is worth noting that no new query syntax was designed or introduced for this functionality (it is fully covered by SQL:2011), and it works for batch computation as well.


2) Retraction
Retraction is an important concept in stream computing. Here is an example: computing word frequency.
As English text arrives, we compute the frequency of each word and then count how many words have each frequency. In “Hello World Bark”, each word occurs once, so the frequency table says three words have frequency 1. When the data is updated with another “Hello”, Hello’s frequency becomes 2, so we insert the row (frequency 2, count 1) into the table; but without further action, the count of words with frequency 1 is now wrong. The problem arises because stream data is continuously updated, so we need a mechanism to detect this and retract the stale row. In fact, the SQL Query Optimizer can determine when retraction is needed, transparently to the user. This is where SQL’s natural optimization framework is an advantage.
As shown above, the first scenario needs no retraction and the second one does; this is decided entirely by the optimization framework, not the user.
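The word-frequency example can be sketched in Python: when a word’s count changes, the stale (frequency, word-count) row is retracted before the new one is added, keeping the frequency-of-frequencies table correct. The function name is illustrative.

```python
# Sketch of the retraction example: word_count tracks per-word frequency,
# freq_count tracks how many words have each frequency. When a word's
# frequency changes, the stale freq_count row is retracted first.
from collections import Counter

def frequency_of_frequencies(words):
    word_count = Counter()
    freq_count = Counter()  # frequency -> number of words with it
    for w in words:
        old = word_count[w]
        if old > 0:
            freq_count[old] -= 1      # retract the stale row
            if freq_count[old] == 0:
                del freq_count[old]
        word_count[w] += 1
        freq_count[old + 1] += 1      # add the updated row
    return dict(freq_count)

# "Hello World Bark" then one more "Hello": without the retraction step
# the table would still claim three words have frequency 1.
result = frequency_of_frequencies(["hello", "world", "bark", "hello"])
```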


3) Window aggregation
Window aggregation is an important capability of Flink SQL. In this example, we aggregate data per hour. We also support sliding windows and session windows. Window aggregation is essentially small-scale batch processing along window boundaries.
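A tumbling (fixed, non-overlapping) window can be sketched in Python by bucketing events on their timestamp; this toy version sums each bucket independently and ignores watermarks and late data.

```python
# Toy tumbling-window aggregation: events are bucketed by their hour and
# each bucket is summed independently (watermarks/late data ignored).
def tumbling_window_sum(events, window_seconds=3600):
    windows = {}
    for timestamp, value in events:
        start = timestamp - timestamp % window_seconds  # window start
        windows[start] = windows.get(start, 0) + value
    return windows

events = [(10, 1), (3599, 2), (3600, 5)]  # (seconds, value)
result = tumbling_window_sum(events)      # first two share a window
```

Each window is a small, bounded batch, which is exactly the sense in which window aggregation is "batch processing in accordance with the window standard".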


4) Query Optimization
In addition to adding new features, we also did a lot of query optimization. For example, when joining a stream against a dimension table, we automatically rewrite the plan to use async state access in the final runtime implementation. We also merge multiple joins and apply micro-batching: without micro-batching, every single record would incur its own read and write I/O, whereas with micro-batching we can process thousands of records in two I/O operations. There are also join/aggregate pushdown and TopN optimizations.
As shown in the figure above, suppose we want the top three cities by sales volume. The user’s query can be executed in two ways:
A) For each incoming record, sort all saved cities and take the first three, consuming a large amount of storage and compute resources
B) The Query Optimizer automatically recognizes the query as TopN and keeps only the first three cities, greatly reducing computation and storage complexity
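Variant B can be sketched in Python: per-city totals are maintained incrementally, and each update answers the “top 3” query by partial selection rather than re-sorting every row from scratch. The real optimizer additionally prunes the rank state; this sketch (with illustrative names) only shows the idea.

```python
# Sketch of the TopN idea: per-city totals are maintained incrementally
# and each update answers the "top 3" query via partial selection rather
# than re-sorting every row. (The real optimizer also prunes the state.)
import heapq

class Top3Cities:
    def __init__(self):
        self.sales = {}  # city -> running sales total

    def update(self, city, amount):
        self.sales[city] = self.sales.get(city, 0) + amount
        return heapq.nlargest(3, self.sales.items(), key=lambda kv: kv[1])

top = Top3Cities()
top.update("hangzhou", 100)
top.update("beijing", 200)
top.update("shanghai", 150)
top.update("shenzhen", 50)
result = top.update("hangzhou", 120)  # hangzhou climbs into first place
```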

Alibaba real-time computing application

1. Alibaba Cloud Stream computing development platform




The platform lets users write SQL, feed in sample data, and check whether the output logic is correct. If so, they can deploy jobs to clusters through the platform and monitor the jobs’ running status. The platform covers the entire lifecycle of real-time computing, integrating development, debugging, release, deployment, operation, and maintenance, greatly accelerating users’ development and rollout. It is worth mentioning that most jobs for this year’s Double 11 were released through this platform. Through Alibaba Cloud, including the public and private clouds, the platform is also offered to small and medium-sized enterprises, allowing them to share Alibaba’s real-time computing capabilities.

2. Porsche, alibaba’s real-time machine learning platform




This platform provides a drag-and-drop UI for algorithm engineers, along with standard components from which they can build complex pipelines. After connecting components according to the rules, users generate a graph, which is optimized, translated into SQL, and can then be deployed online. The platform saves algorithm engineers the cost of learning SQL and is currently available mainly internally.

Double 11 real-time calculation summary




The figure above shows Alibaba’s real-time computing architecture: hundreds of machines at the bottom; resource management and storage deployed uniformly on top of them; then BlinkRuntime and Flink SQL. Users submit jobs via the StreamCompute and Porsche platforms, and hundreds of engineers within Alibaba have submitted thousands of Flink SQL jobs. That is the current state of Alibaba’s real-time computing.
With the help of real-time computing, Double 11 achieved a GMV of 168.2 billion yuan. The contribution of real-time computing is mainly reflected in the following points:


1. This Double 11 was the largest-concurrency event in Internet history. The real-time aggregation of hundreds of thousands of transactions and payments was all powered by Blink
2. Showing 10 billion yuan on the screen within 3 minutes and 1 second required not only the high throughput of the database but also the speed of real-time computing
3. The algorithm platform delivered good search and recommendation results, driving overall GMV growth