Abstract: This paper is shared by Zheng Zhisheng, head of Bilibili Real-time big data platform. Based on the pain point analysis of Bilibili real-time computing, the architecture and practice of Bilibili Saber real-time computing platform are introduced in detail. This sharing mainly focuses on the following four aspects:

I. Pain points of real-time computing; ii. Saber platform evolution; iii. Cases and practices combined with AI; iv. Future development and thinking

Important: Click “PPT” to download all Flink Forward Asia PPT.

I. Pain points of real-time computing

1. The pain points

Every business unit has a need for real-time computing when conducting business research and development. In the early stage, it was difficult to develop without the support of platform system. Because of the different language types and systems of different business departments, management and maintenance were very difficult. Secondly, Bilibili has a lot of BI analysis tasks such as user growth and channel launch analysis. It is also necessary to clean the real-time data of the real-time data warehouse. In addition, bilibili, as a content-oriented video website, also has a strong demand for real-time computing in AI recommendation scenarios.

2. Common pain points

  • High development threshold: Based on the underlying real-time engine to do development, need to pay attention to more things. Including environment configuration, language foundation, and coding process also need to consider the reliability of data, code quality, etc. Secondly, there are various types of real-time engines in the market, which makes it difficult for users to choose.

  • High O&M costs: The o&M costs are mainly reflected in two aspects. The first is poor operation stability. Early teams have Spark clusters and YARN clusters, which makes job stability and fault tolerance difficult to manage. Second, the lack of a unified alarm monitoring system, business teams need to work repeatedly, such as calculation delay, interruption, fluctuation, failover, etc.

  • AI real-time engineering is difficult: Bilibili client’s home page recommendation page relies on the support of AI system, and encountered many problems in AI machine learning in the early stage. Machine learning is a system where algorithms and engineering intersect. Engineering focuses on efficiency and code reuse, while algorithm pays more attention to feature extraction and model output. In fact, the AI team has to undertake a lot of engineering work, which to some extent restricts the implementation of experiments. In addition, the language system and framework system of THE AI team differ greatly, so the engineering is an infrastructure system, which needs to be improved to speed up the PROCESS of AI and reduce the engineering input of algorithm personnel.

3. Streaming computing platform based on Apache Flink

To solve the above problems, Bilibili hopes to build a streaming computing platform based on Apache Flink according to the following three requirements.

  • First, you need to provide SQL-like programming. Bilibili extends SQL, called BSQL. BSQL extends the upper layer of Flink’s underlying SQL, the SQL syntax layer.

  • Second, DAG drag-and-drop programming, on the one hand, users can build their own pipelines through the sketchpad, on the other hand, users can also use the native Jar way to code.

  • Third, the integrated escrow operation and maintenance of operations.

** Covered scenarios: ** Bilibili streaming computing platform mainly covers four scenarios.

  • AI engineering direction, solve the advertising, search, recommended streaming Joiner and dimension table Joiner;

  • Real-time computing feature support, Player and CDN quality control support. Including live broadcast, PCU, lag rate, CDN quality, etc.

  • User growth, that is, how to use real-time computing to analyze channels and adjust the effect of channel delivery;

  • Real-time ETL, including Boss real-time broadcasting, real-time large screen, kanban, etc.

Saber’s platform evolution

1. Platform architecture

The real-time platform is composed of real-time transmission and real-time computing. The bottom layer of the platform uniformly manages metadata, blood relationship, permissions, operation and maintenance, etc. Real-time transmission is mainly responsible for the data into the big data system. Real-time computing provides support for various application scenarios based on BSQL.

As shown in the following figure, APP logs, database binlogs, server logs, or system logs are transmitted in real time. Bilibili’s internal Lancer system handles data landing to Kafka or HDFS. The computing system mainly builds a SET of BSQL based on Saber and manages the scheduling based on YARN.

The upper core builds the running pool based on Flink. The upper layer applies to multiple dimension tables, including MySQL, Redis, and HBase. The State part also extends MapDB and Redis on the basis of RocksDB. Flink requires IO density, which is a very troublesome problem, because the resource scheduling system of Flink has memory and CPU, but the IO unit is not managed uniformly. If a job has a strong DEMAND for I/O, a large amount of CPU or memory resources must be allocated, which may not be sufficient for I/O expansion. Therefore, in essence, Bilibili at this stage transfers the State of IO intensive resources to Redis for relief. After BSQL calculation, data is transferred to real-time data warehouse, such as Kafka, HBase, ES, MySQL, TiDB. Eventually to AI or BI, reports, and log center.

2. Development architecture design

(1) Development architecture diagram: as shown on the left side of the figure below. The top layer is Saber-Streamer, which does job submission and API management. The next layer is the BSQL layer, mainly for SQL extension and parsing, including custom operators and personality operators. Just below that is the run tense, and below that is the engine layer. Run tenses mainly manage the upper and lower levels of engine layer jobs. Bilibili’s early engine was Spark Streaming and later extended Flink, reserving some of the engine layer extensions in the development architecture. The lowest layer is the status storage layer, and the indicator monitoring module is on the right.

(2) Platform Design Guidelines: The Saber platform system design team focused on its boundaries and specifications and guidelines, with the following four key points. The first is to abstract the Streaming workflows. Second, data standardization ensures schema integrity. The third is the generic BSQL parsing layer. The fourth is engineering efficiency.

  • Streaming workflows: Below is an abstraction of a computational model. The essence of a big data computing engine is that data input gets output through a function, so function is essentially a Transform capable of DAG conversion. The abstract form of flow calculation expected by Saber platform is to provide the corresponding Source, which is a DAG of Transform in the calculation process, and finally has the output of Sink.

Normalize semantically in the above abstraction process. That is, the final input and output are given specifications, and the underlying job is submitted through Json representation. In the absence of an interface, you can pull up a job directly using Json.

  • Let the data speak: Data abstraction. The data in the calculation process is reported through data integration. There is a unified platform entry for data integration reporting. Users first need to build a data source of input on the platform. After a user selects a data source, the platform distributes it to Kafka, HBase, and Hive. During the distribution process, users are required to define a Schema. So during data integration, you can easily manage the input language’s Schema. During calculation, the user selects the Input Source, such as an HBase table or a Kafka table, and the Schema is already strongly constrained. Users can output the result table or index by means of BSQL or DAG provided by the platform.

  • BSQL universal design: BSQL follows the design idea of Streaming workflows, and the core work revolves around Source, Transform and Sink. Transform mainly relies on Flink SQL, so BSQL is mainly assembled on Source and Sink, and DDL is supported. DDL here refers to aliyun external data for expansion. In addition, BSQL optimizes the calculation process. For example, it adopts the bucket + hash policy to clean data skew in operator calculation. For the distinct class count, HyperLogLog of Redis is used for imprecise calculation.

  • BSQL parsing model: The BSQL parsing model topology is expanded as shown in the following figure. When a user submits an SQL, the goal is to convert the SQL into a tree. You can then obtain the SqlNode node. SqlNode nodes have a lot of metadata information. Table parser is implemented in the case of SqlNode tree, and different SqlNode nodes are converted into Flink corresponding Streamers for mapping.

  • BSQL execution flow: The user submits THE SQL, BSQL first validates and builds the SQL tree. Verification and construction mainly extract table names and field information, and extract schema from the metadata database to verify the standardization, integrity and validity of SQL. After validation, the input and result tables are registered with Flink’s run tense, which also includes the refinement of UDF and watermark information. In addition, the platform has some extensions to SQL. The third block is the core work of the extension, converting the extended subtree in the SQL tree to the new node, and then committing the SQL DAG to run on Flink.

  • Effect display -DAG: DAG product display, as shown in the following figure, includes the design of parallelism, log, and alarm output of monitoring indicators.

  • Effect -BSQL: Users write SQL according to the schema of the input source of the selected table. Finally, select the corresponding UDF to commit to the corresponding cluster.

  • Effect display – Job debugging: The following figure shows the job debugging supported by the platform. If there is only SQL development without job debugging, it is painful for users. Therefore, the platform supports job debugging SQL through file upload and online sampling.

  • Effect Demonstration – Operation and maintenance: The platform provides users with some monitoring indicators, indicators that can be customized by users and some customized indicators of special SQL implemented by Bilibili. The following figure shows part of the queue in action.

Third, combining AI case practice

1.AI – Machine learning status quo

The AI system has Offline and Online processes. Online (Online training) Do A/B experiments according to the flow, and make recommendations according to the effects of different experiments. At the same time, each experiment needs to have a corresponding model pushed to the line. The AI’s pain point is Offline. Offline trains by streaming. The following figure shows the early stage of Offline flow training. Users need to build streams and live joins of streams to produce live Label streams. The real-time instance stream is produced by the join of stream, dimension table and characteristic information. However, the early related engineering services have single point problems, and the maintenance cost brought by service quality and stability is also very high, resulting in the large input of AI in the construction of early pipelines.

2. Drawbacks and pain points

  • Data timeliness: Data timeliness cannot be guaranteed. Much of the data is computed offline, but many features are very time-sensitive.
  • Engineering quality: single point engineering is not conducive to service expansion and stability assurance.
  • Engineering efficiency: Each experiment has a high threshold, which requires Label production, Features calculation and Instance splicing. Behind the recommendation of different business lines and different scenes, algorithm students do engineering work. They don’t have the same language, which makes the engineering language very confusing. In addition, the flow and batch are inconsistent, and the training of the model in the real-time environment and the offline batch environment is very different. The logic behind it is similar, which leads to the doubling of personnel investment.

3. Engineering of model training

Construct a data calculation Pipeline based on Saber-BSQL and Flink engine, which greatly simplifies the construction of Instance stream. Its core needs to solve the following three problems: Streaming Join Streaming (SJoin), Streaming Join Table (DJoin) and real-time Feature.

  • SJoin- Engineering Background: Large volume of traffic, such as the traffic recommended on the homepage of Bilibili. Click Join to display AI, resulting in clicks and displays from the whole site. In addition, there are not only dual-stream Join, but also three-stream or above Join, such as advertising display stream, click stream, search query stream, etc. Thirdly, ETL is cleaned differently by different joins. If it cannot be expressed in SQL, it is necessary to provide users with a general extension to solve the problem of customized ETL cleaning for different services before joining. Fourth, atypical A Left Join B On time-based Window model. After mainstream A joins successfully in the window time, it needs to wait for the end of the window time before spitting out data, which extends the stay time of mainstream A in the window. This scenario is critical, and bilibili requires similar scenarios not only for advertising, AI, search, but also for live streaming. Since AI machine learning requires uniform positive and negative samples to ensure training effects, the fourth problem is a strong requirement.

  • SJoin- Project scale: Joiner based on online real-time recommendation. The peak QPS of the original feed stream and click stream is 15W and 2W, respectively. The peak QPS of the Join output is 10W and the peak byte volume is 200 M/s. The number of keyState queries remained at the peak of 60w, including read, write, and exist. In the window of one hour, the key quantity of Timer is 15W * 3600 = 5.4 billion, and the RocksDBState quantity is 200M * 3600 = 700G. In practice, the use of native Flink at this scale will encounter more performance problems, such as the earlier version of Flink 1.3.*, which will have poor stability.

  • SJoin- Technical pain point: Below is Flink’s internal topology with WindowOperator. When the user opens a Window, each record is a Window. The first problem is the huge amount of window allocation, QPS and window allocation are basically constant. The second problem is that the Timer Service opens a window for each record. In the early native Flink, it was a memory queue, and there were many problems in the memory queue. In the early days of the underlying queue, the single thread mechanism, data Cache in memory, there are many problems.

Firstly, Timer performance is poor and memory consumption is large. Second, Value RocksDB State can cause traffic jitter in Compact. Similar to HBase, multi-level Compact causes performance jitter and write magnification. Third, when the restart traffic is too large, the recovery cycle of Window and Keystate cannot be controlled because there are only memory queues in the early stage of Timer. Loading a large amount of data from disks takes a long time, and the recovery service takes a long time.

  • Sjoin-optimization idea: Firstly, Timer optimization upgrade. When the community didn’t have a better solution early on, Bilibili tried to develop PersistentTimerManager and later upgraded Flink to use Rocksdb-based Timers. Second, enable Redis as ValueState to improve State stability. Thirdly, the SQL syntax is extended to support SQL semantics in the scenario of atypical A Left Join B On time-based Window.

  • SJoin optimization – Self-developed Timer: Realize the overflow of memory data to disk when it reaches Max. The bottom layer uses MapDB to do disk overwrite. Disk overwrite principle is LSM model, also has the problem of data jitter. Since the window is one hour, it means that data is managed by State in the unit of one hour. As shown on the right side of the figure below, when the hour from 0 o ‘clock to 1 o ‘clock is over, the data will only be written because the record will be spit out after an hour. From 1 o ‘clock to 2 o ‘clock, the data will be written to the new State. The State from 0 o ‘clock to 1 o ‘clock has reached the window time, and the data will be spit out. Self-developed Timer solves the problem of data reading and writing and jitter well. However, the self-developed Timer lacks the CheckPoint mechanism. If the disk on the node fails, State data will be lost.

  • SJoin optimization -RocksDBTimer: Upgrade Flink to introduce rockSDB-based Timer. The architecture after upgrade is shown in the following figure. The data gets topic-feed and topic-click data from Kafka, first performs a layer of cleaning, and then goes into the custom Joiner Operator. The operator does two things: spit the mainstream data into Redis, and then Redis makes the State. Meanwhile, it registers the Key store that needs to open a window into the Timer Service. Next, use the Timer Service’s native CheckPoint to start the incremental CheckPoint process. When the OnTimer reaches the time, it can spit out the data. This scheme meets the requirements of SJoin in high-throughput operation.

  • SJoin optimization – Adding KVStore: The original Flink State cannot meet requirements and jitter occurs when the Value and I/O requirements are high. Therefore, jitter occurs in the RocksDBState. Bilibili has tried a variety of improvements. When the 1-hour window is opened, the data volume is about 700G, and the total flow of the dual-stream 1-hour window reaches TB level. A distributed KVStore is used to store about 700 GB of data.

  • SJoin optimization – Extends SQL syntax: The functional appeal of extended SQL is to display the stream and wait for the 1-hour window. When the click stream arrives, it does not immediately spit out the data completed by the Join, but wait for the end of the window and then spit out. Therefore, the SQL syntax is extended. Although it is not universal at present, it can meet the AI needs of many departments. Select * from A left (global)Time delay join B on A.x =B.x where A.x =xx. It brings great benefits to users.

There are two key points for SQL semantic extension. The definition of SQL semantics extends JoinType on top through Calcite. Start by expanding the SQL into an SQL tree. One node of the SQL tree is left (global)The time delay the join. Extract the subtree and customize the logical transformation rules. StreamingJoinRute is defined here to convert the subtree to a new node. With the asynchronous IO capabilities provided by Flink, the step-trees are converted into Streaming tables and registered into the Flink environment. SQL expressions are supported through the above procedure.

  • DJoin- Engineering Background: Bilibili has different requirements for dimension table data. For example, some dimension table data is very large and the unit is T. At this time, Redis storage will cause waste. And some dimension table data is very small, real-time characteristics. At the same time, dimension table data update granularity is different, can be updated by day, update by hour, update by minute, etc.

In addition, dimension table performance requirements are very high. Because the AI scene will conduct a lot of experiments, for example, if a feature is good, many models will be opened and different parameters will be adjusted for experiments. Under single operation, the more experimental groups, the higher QPS, the higher RT requirements. The storage medium of different dimension table is different, which has significant influence on stability. There are two scenarios in the study. The equivalent is relatively small, can use Redis storage, good stability. However, HBase CP architecture cannot guarantee stability.

  • DJoin- Project optimization: Syntax support is required for SQL of dimension table Join. For Cache optimization, when users write multiple SQL dimension tables to Join, they need to extract keys from multiple SQL dimension tables and query the dimension tables by merging requests to improve I/O and optimize traffic balancing. Second, KV storage supports different scenarios, such as JDBC and KV. In KV scenario, Redis is used for real-time update and query of 100 G level. T level when multiple HBase clusters are used, for example, two sets of HBase are used. The Failover+LoadBalance mode ensures that the 99 line RT is less than 20ms to improve stability.

  • DJoin- Syntax extension: DJoin syntax extension is similar to SJoin syntax extension. It transforms SQL tree subtrees and extends AsyncIO to realize dimension tables.

  • DJoin-HBase High availability: HBase is used to store data when the dimension table data reaches T level. HBase HA Adopts two HBase clusters and Failover AB mode. There are two things to consider. The first is the data update mechanism. Data can be updated by hour or by day. HFile BulkLoad is imported in serial mode and Interval is imported at intervals. After data is imported, data is preheated to ensure the stability of two HBase clusters. The second is the data query mechanism. Hystrix is introduced to implement service fusing and rollback. When the availability of cluster A decreases, data is dynamically switched to cluster B based on THE RT quality of CLUSTER AB to ensure data traffic balance.

The following figure shows the HBase dual-cluster architecture. The right side is offline. In days, a DAG is pulled up through the scheduling framework for calculation. DAG output passes through HBase Sink with two layers of serial data to ensure that data is written from A to B. In the run-time, through Flink and AsyncIO, through two layers of HystrixClient. The Layer-1 HystrixClient collects THE RT communication quality of HBase from the Layer-2 HystrixClient and dynamically distributes the traffic to two HBase clusters based on the RT communication quality. When the stability of cluster A is good, traffic flows through cluster A. If jitter occurs in cluster A, A certain ratio of traffic is dynamically switched to cluster B based on the failure rate.

4. Real-time Pipeline for model training

The whole system solved the AI model training pre-generated data to the model Pipeline. Show and click to implement Joiner through BSQL solution. Real-time feature data is calculated by BSQL, and offline data is solved by offline scheduling. The Join of dimension table will form Pipeline through BSQL, so as to give machine learning team Instances of flow, training model, output model.

Iv. Future development and thinking

1.Saber- Complete basic functions

As more and more people use platforms, basic operations are critical. The Saber platform will improve SQL IDE development by providing richer version management, online and offline, task debugging, resource management, basic operations, and more. At the same time, operation and maintenance of enrichment operations. The operations include SLA, online approval, priority, various system monitoring indicators, user-defined indicator alarms, and job OP operations.

2.Saber- Application ability enhancement

Saber application capabilities will evolve toward AI. For example, in the engineering aspect of model training, the concept of experimental dimension will be introduced to pull UP SQL Pipeline through experiments. At the same time will do model training students unified stream, batch SQL reuse. And conduct model experiment effect, evaluation, warning and so on. Real-time feature engineering will support multi-feature composite computing, covering feature calculation, storage, query and other scenarios.