Ali Mei's note: Search offline data processing is a typical scenario that combines batch and real-time computation over massive amounts of data. Building on internal technology and open-source big-data storage and computing systems, Alibaba's search team built a search offline platform tailored to its own business and technical characteristics. In complex business scenarios it can batch-process billions of records in a single run and sustain real-time throughput of millions of TPS with second-level latency.

Background

What is search offline?

A typical product-search architecture is shown in the figure below. This article focuses on the offline data-processing system highlighted in that figure.

What does "offline" mean here? In Alibaba's search engineering system, we call the millisecond-level services that respond to user requests, such as the search engine, online scoring, and SearchPlanner, "online" services. By contrast, the systems that produce and process the data required by online services such as the search engine are collectively referred to as the "offline" system. The business characteristics of commodity search (massive data and complex business logic) mean that the offline system has been a big-data system from day one, with the following characteristics:

1. The task model distinguishes between full and incremental tasks.

1) A full run refers to generating all of a search business's data and delivering it to the online engine, generally once a day. This is done for two reasons: some business data is updated on a daily basis, and the engine needs the complete data set to build indexes and preprocess efficiently, which improves online serving performance.

2) An incremental run refers to propagating real-time data changes from upstream data sources to the online engine.

3) Performance requirements are high. Full runs need extremely high throughput so that billions of records can be processed within a few hours; incremental runs need to support tens of thousands of TPS with second-level real-time latency, and must also provide extremely high availability.

2. Diverse input and output data sources must be supported: databases and message queues such as MySQL, ODPS, and TT as inputs, and search, ranking, graph, recommendation, and other engines as outputs.

3. Data-processing capabilities such as multi-table joins and UDTF support are required, to make it easy to develop and onboard search services.

In the following sections we will see how the offline system architecture evolved around these characteristics as the search business changed.

Development profile

Alibaba's commodity search system started with Taobao search. The first-generation search system was born around early 2008, and the offline system went live along with it. Over many years of development the technical architecture has gone through several iterations, and both data-processing capability and business-support capability have improved continuously. The main technical architecture and features of search offline are described below in stages.

Taobao search stage

From 2008 to 2012 we focused on supporting the growth of Taobao search. As the number of products on Taobao kept increasing, we gradually introduced open-source big-data computing and storage frameworks such as Hadoop and HBase to build a distributed search offline system, which strongly supported the growth of the Taobao search business. At this stage, however, we supported fewer than five business lines within the Taobao family, with roughly ten developers invested in them, so overall efficiency was not high. Moreover, the framework code was tightly coupled with Taobao-specific business logic, with many specially tailored code paths, which made it hard to generalize the architecture and support other businesses.

★ Component & platform stage

Since the end of 2013, and especially over the last two years, with the reorganization of the Group's business lines and the implementation of the middle-platform strategy, the search offline system has had to support more and more different business teams (Fliggy, DingTalk, 1688, AE, Lazada, and others), and the demand for reusable technical frameworks, higher development efficiency, and platform-level support has grown ever stronger. On the other hand, the progress of big-data computing and storage technology, and especially the rapid development of streaming computing engines, provided fertile ground for a further evolution of the offline system's technical architecture.

Looking back, the evolution of the whole search offline system has advanced step by step along the two main lines of performance and efficiency, driven by both business and technology. It is a typical example of technology and business being tightly integrated and promoting each other's development.

Offline platform technology architecture

The previous section briefly reviewed the development history of the offline system and the evolution of its technical architecture. The following sections introduce the technical architecture of the offline platform, covering the platform's components and task flow as well as its computing and storage architecture.

Platform components and task flows

The figure above shows the structure of the offline platform's technical components; some of them are briefly described below:

  • Maat: a distributed task-scheduling platform based on Airflow, with major improvements in scheduling performance, FaaS-style executors, containerization, and API and scheduling extensions, providing significant performance and stability gains while remaining compatible with Airflow. The multiple Blink jobs of an offline task are given dependency relationships and scheduled through Maat.

  • Bahamut: Execution engine. As the core of the offline platform, Bahamut is responsible for creating, scheduling, and managing offline tasks.

  • Blink: Alibaba's internal version of Flink, with extensive optimization and re-engineering in large-scale distribution, SQL, the Table API, and batch processing. All computing tasks on the offline platform run as Blink jobs, both streaming and batch.

  • Soman: the UI module, connected to the Bahamut backend. It provides visual features such as task-information display and state management, and is the main entry point where users create applications and develop business logic.

  • Catalog: manages storage-table metadata, provides DDL capabilities for various data-source tables, and handles the application, release, and modification of storage resources on the offline platform.

  • Hippo: A distributed resource management and task scheduling service developed by Alibaba Search. Similar to Yarn, Hippo provides Docker management capability and mainly serves online systems.

  • Swift: a high-performance distributed message queue developed by Alibaba Search, supporting message throughput at the hundred-million level, with HDFS as the storage backend and a storage-compute separation architecture.

The following figure describes the whole flow of an offline task, from the data sources to the service data delivered to the engine. The flow is divided into three layers:

  • Data synchronization layer: synchronizes the full and incremental data of user-defined data-source tables into internal HBase tables. Each internal table has two column families, cf and d, which store the database mirror and the daily-updated data respectively (see the sketch after this list).

  • Data association and computing layer: based on the relationships defined among the data sources, data from different dimensions is joined together and passed to the user-defined UDTF for processing, producing the full and incremental data required by the engine.

  • Data interaction layer: provides the storage information of the full and incremental data and interacts with the Build module of the online services.
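As a minimal sketch of the synchronization layer's storage model, assuming the standard HBase client API, writing a database-mirror value and a daily-updated value into the two column families could look like the following. The table name, row key, and column names are illustrative, not the platform's actual schema.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class MirrorTableWriter {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             // "item_mirror" is a made-up internal table name for illustration.
             Table table = conn.getTable(TableName.valueOf("item_mirror"))) {

            Put put = new Put(Bytes.toBytes("item_12345")); // row key = item id
            // Column family "cf": mirror of the upstream database record.
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("title"),
                    Bytes.toBytes("Example product title"));
            // Column family "d": daily-updated data for the same item.
            put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("daily_sales"),
                    Bytes.toBytes("42"));
            table.put(put);
        }
    }
}
```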

A unified full and incremental computing model

So how do we hide the offline platform's internal technical details so that users only need to focus on implementing their business logic? Recalling the offline task concept introduced in the first section, an offline task consists of a full task and an incremental task that share the same business logic but run in different modes. To let users concentrate on developing business logic, we introduced the concept of the Business Table, which hides the platform's technical details and unifies the full and incremental computing logic.

Business Table: an abstract table composed of a full data table and/or an incremental stream table. The full table and the incremental stream table have the same schema and the same business meaning.

Based on business tables and data-processing components, users can develop a business logic graph, called the Business Graph, that describes the offline processing flow. The figure below shows an example: the red box at the top marks a business table containing only an ODPS full data source, and the red box at the bottom marks a business table composed of HDFS + Swift. Other combinations of business tables, such as MySQL + DRC and ODPS + Swift, are also supported. Common data-processing components such as Join and UDTF can also be seen in the figure; combining business tables with processing components is enough to describe the common offline business-processing logic.
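Conceptually, a business table simply pairs a full source with an optional incremental source behind one shared schema. The sketch below is purely illustrative; the class and field names are hypothetical and are not Bahamut's actual API:

```java
// Illustrative only: a business table pairs a full source and an optional
// incremental source that share one schema. Names are hypothetical.
public final class BusinessTable {
    private final String name;        // business meaning, e.g. "item"
    private final String schema;      // shared schema of the full and incremental sources
    private final String fullSource;  // e.g. an ODPS table or an HDFS path
    private final String incrSource;  // e.g. a Swift topic or a DRC stream; may be null

    public BusinessTable(String name, String schema, String fullSource, String incrSource) {
        this.name = name;
        this.schema = schema;
        this.fullSource = fullSource;
        this.incrSource = incrSource; // null means the table has no incremental part
    }

    public boolean hasIncrement() {
        return incrSource != null;
    }
    // getters omitted for brevity
}
```

For example, a MySQL + DRC business table would carry the MySQL table as its full source and the DRC change stream as its incremental source, while an ODPS-only business table would leave the incremental source empty.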

So how is a Business Graph turned into a real offline task? As the execution engine of the offline platform, Bahamut transforms the business description into executable offline tasks in the order Business Graph -> App Graph -> Job Graph -> (Blink Job / Maat Job), as follows:

1. Business Graph -> App Graph: two important things happen in this step:

1) Correctness validation: based on the node information in the Business Graph, we validate the legality of connections between nodes (for example, two input-source nodes cannot be connected directly), the correctness of node configurations (whether the database or ODPS configuration is correct), and the correctness of schema derivation.

2) Task layering and optimization: to execute both full and incremental processing uniformly in Blink stream mode, we store the input-source data in internal HBase tables and use Blink's dimension-table join directly to complete the data association. Therefore, whenever a Join or Merge component is encountered while traversing the nodes, an internal HTable node is inserted into the App Graph so that the data upstream of the Merge or Join is first synchronized into HBase.

2. App Graph -> Job Graph: a Job Graph is a configuration DAG of Blink/Maat tasks. Each node carries configuration information and can be converted directly into a computing or scheduling task in the subsequent step.

1) Blink JobGraph: the App Graph is traversed starting from the data-source business-table nodes. Whenever an internal HTable node is encountered, two synchronization-layer Blink JobGraphs (one full, one incremental) are generated. After all synchronization-layer JobGraphs have been generated, two association-processing-layer Blink JobGraphs (full and incremental) are generated with all internal HTables and queues as input.

① Synchronization layer: the full-table and incremental-table configurations of the Business Table are used to generate the full and incremental Blink job configurations respectively, which describe how data is synchronized from the data source into the internal HTable. For a MySQL + DRC table, for example, the full phase pulls the whole table from MySQL, converts it into HFiles, and bulk-loads them into the HTable, while the incremental phase pulls change records from DRC, writes them directly into the HTable, and also writes them into a driver queue as needed.

② Association processing layer: multiple HTables are joined to build a large wide table, the UDTF is invoked for processing, and the final data delivered to the engine is produced. Full and incremental task configurations are generated separately here as well (a sketch of such a job is given after the summary below).

2) Maat JobGraph: a scheduling-task description DAG based on Maat. Its main purpose is to schedule the Blink jobs of each layer according to their dependencies and to run specific scripts that handle duties such as interacting with external systems. In general, one offline task generates several Maat JobGraphs, such as Build, Publish, Stop, and Release.

3. Job Graph -> Blink/Maat job: the Job Graph is traversed and a Translator is called to convert it into Blink/Maat job code. The Job Graph was introduced precisely to decouple the underlying computing engine from the description of computing tasks. For example, our underlying engine used to be MapReduce plus the Blink 1.4 Table API, and we recently completed an SQL-based upgrade to Blink 2.1; essentially all we had to do was write a new Translator, without changing the upper layers of the code.

Through the three steps above, a Business Graph is transformed into Blink/Maat jobs: a number of Blink jobs for data synchronization and processing are generated, and these are organized into Maat jobs with different functions for scheduling. For the search offline scenario in particular, a great deal of interaction logic with the downstream engine is added to the scheduling flow, including important business processes such as running increments 24 hours a day without interruption, triggering the engine to consume data, and switching the incremental queue the engine consumes.
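For intuition only, here is a minimal sketch of what an association-processing-layer job could look like, written against the open-source Flink Table API rather than Blink's internal interfaces. The table definitions, connectors, and the SplitTags UDTF are placeholders and this is not the SQL Bahamut actually generates; in particular, the real platform looks up HBase dimension tables rather than joining two generated streams.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class AssociationLayerSketch {

    // Placeholder UDTF: expands a comma-separated tag string into one row per tag.
    @FunctionHint(output = @DataTypeHint("ROW<tag STRING>"))
    public static class SplitTags extends TableFunction<Row> {
        public void eval(String tags) {
            if (tags == null) {
                return;
            }
            for (String t : tags.split(",")) {
                collect(Row.of(t));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.createTemporarySystemFunction("SplitTags", SplitTags.class);

        // 'datagen' and 'print' are placeholder connectors so the sketch runs standalone;
        // the real platform reads internal HTables and writes data for the engine.
        tEnv.executeSql("CREATE TABLE item (item_id BIGINT, seller_id BIGINT, tags STRING) "
                + "WITH ('connector' = 'datagen', 'rows-per-second' = '5')");
        tEnv.executeSql("CREATE TABLE seller (seller_id BIGINT, shop_name STRING) "
                + "WITH ('connector' = 'datagen', 'rows-per-second' = '5')");
        tEnv.executeSql("CREATE TABLE engine_sink (item_id BIGINT, shop_name STRING, tag STRING) "
                + "WITH ('connector' = 'print')");

        // Join two dimensions into a wide row, then fan out through the UDTF.
        tEnv.executeSql(
                "INSERT INTO engine_sink "
              + "SELECT i.item_id, s.shop_name, t.tag "
              + "FROM item AS i "
              + "JOIN seller AS s ON i.seller_id = s.seller_id "
              + "LEFT JOIN LATERAL TABLE(SplitTags(i.tags)) AS t(tag) ON TRUE").await();
    }
}
```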

Storage and computing

▲ HBase-based storage architecture

HBase was introduced as the data-storage engine of search offline in 2012, and it has strongly supported the entire development of the search service, from the Taobao main search to the offline platform; its stability and performance have been thoroughly verified through multiple Double 11 events. Functionally, the reasons for choosing HBase in the offline system are as follows:

1. Data can be read in batches or one record at a time via Scan/Get, and imported in batches or one record at a time via BulkLoad/Put. This matches the full/incremental model of search and is a natural fit for search offline services (see the sketch after this list).

2. The underlying storage is based on HDFS, and the LSM-tree architecture keeps data safe. The separation of storage and compute makes the cluster easy to scale and the overall throughput easy to raise. Through single-node performance optimizations (async access, BucketCache, handler layering, off-heap memory) and cluster expansion, we ensure that storage never becomes the system bottleneck as the business grows substantially.

3. The free-schema nature of HBase copes well with frequently changing business data and supports special data logic in some business scenarios.
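Continuing the illustrative example from the synchronization layer, here is a minimal sketch of point 1 with the standard HBase client API: batch reads map onto Scan and single-record reads onto Get (BulkLoad and Put cover the corresponding write paths). Table, row-key, and column names are again placeholders.

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class MirrorTableReader {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = conn.getTable(TableName.valueOf("item_mirror"))) {

            // Full-style access: scan a row-key range in batches (as a full build would).
            Scan scan = new Scan()
                    .withStartRow(Bytes.toBytes("item_0"))
                    .withStopRow(Bytes.toBytes("item_9"))
                    .setCaching(500);
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result r : scanner) {
                    byte[] title = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("title"));
                    System.out.println(Bytes.toString(r.getRow()) + " -> " + Bytes.toString(title));
                }
            }

            // Incremental-style access: fetch a single record by row key.
            Result one = table.get(new Get(Bytes.toBytes("item_12345")));
            System.out.println("daily_sales = "
                    + Bytes.toString(one.getValue(Bytes.toBytes("d"), Bytes.toBytes("daily_sales"))));
        }
    }
}
```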

By introducing HBase as the internal data store of the offline system, we solved the problem of the daily full run putting enormous pressure on the upstream MySQL databases, and greatly improved the overall throughput of the system. Storing data in HBase was also the basis for converting the full-run pipeline to stream processing (MR -> Stream), which in turn paved the way for the adoption and growth of the Blink stream engine in search offline.

Of course, HBase is not without shortcomings: chronic JVM memory-management issues, avalanches caused by single-node handler exhaustion, and the lack of containerized deployment have all caused us plenty of trouble. We will soon replace HBase with another storage engine developed internally at Alibaba, which we hope will solve some of these problems.

★ Computing architecture based on Flink

Flink was gradually introduced as the computing engine of search offline starting in 2016, initially to solve the many problems encountered in real-time computing scenarios for search. On top of the community version of Flink, the real-time computing team developed Blink, adding features such as a native YARN mode and incremental checkpoints to solve the problems of running Flink in large-scale distributed settings. On top of the DataStream/DataSet interfaces, the Table API and SQL were further strengthened, truly unifying the stream and batch calling interfaces and enabling business logic to be developed in SQL.
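As a rough illustration of what incremental checkpointing looks like from the application side, here is a minimal sketch using the open-source Flink 1.x API as a stand-in for Blink's internal interfaces; the checkpoint path is a placeholder.

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 s; the second constructor argument enables incremental
        // checkpoints, so only changed RocksDB files are uploaded each time.
        env.enableCheckpointing(60_000);
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // Trivial pipeline just so the sketch runs end to end.
        env.fromElements(1, 2, 3).print();
        env.execute("incremental-checkpoint-sketch");
    }
}
```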

The offline platform was an early user and co-developer of Blink. Starting from version 0.8, it has gone through several Blink version upgrades, using DataStream, the Table API, and SQL in turn as the task interface, and has developed a large number of connectors to interact with different data sources. The offline platform currently runs the latest Blink 2.1.1. Bahamut uses a SqlTranslator to generate SQL directly to describe the task logic, and submits tasks as a service to different YARN clusters through Bayes (the Blink SQL development platform). This brings the following benefits:

1. Using SQL to describe a Blink task makes the business logic very clear, and the operators Blink provides, such as dimension-table joins and GROUP BY, can be used directly for data processing, which also makes debugging easier, instead of having to hand-write operators such as an HBase join in the DataStream API.

2. Blink 2.1 supports batch mode. Using batch mode we can generate HFiles directly and retire the remaining offline MapReduce jobs, completely unifying the computing engine on Blink. In batch mode, tasks are scheduled in stages, which saves computing resources and improves cluster efficiency. The same Blink SQL can be turned into a streaming or a batch job just by changing the submission mode, which keeps the business logic stable while making debugging and validation convenient (a small sketch follows this list).

3. Submitting tasks to different clusters as a service through a development platform such as Bayes completely removes the complex operational burden of submitting tasks through a gateway; adding a new YARN cluster only requires simple configuration. In addition, Bahamut saves the automatically generated SQL on Bayes, where developers can debug and manage tasks directly, which is very convenient.
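To illustrate point 2, here is a minimal sketch, again using the open-source Flink Table API as a stand-in for Blink and Bayes, of how the same SQL can be planned as either a streaming or a batch job by switching only the environment mode; the datagen/print tables are placeholders.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ModeSwitchSketch {
    public static void main(String[] args) throws Exception {
        // Flip this flag to reuse identical business SQL as a batch (full) or
        // streaming (incremental) job; only the planning mode changes.
        boolean batch = args.length > 0 && "batch".equals(args[0]);

        EnvironmentSettings settings = batch
                ? EnvironmentSettings.newInstance().inBatchMode().build()
                : EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Placeholder tables so the sketch is self-contained.
        tEnv.executeSql("CREATE TABLE src (id BIGINT, title STRING) "
                + "WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
        tEnv.executeSql("CREATE TABLE dst (id BIGINT, title STRING) "
                + "WITH ('connector' = 'print')");

        // The business logic itself is identical in both modes.
        tEnv.executeSql("INSERT INTO dst SELECT id, title FROM src").await();
    }
}
```

On the offline platform the equivalent switch is made at submission time through Bayes, so the generated SQL itself does not need to change.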

Below is a sample of the Blink SQL automatically generated by Bahamut. It describes a synchronization-layer task consisting of a source, a select operator, and a sink, and implements the real-time synchronization of changes from MySQL into an HBase table.

```sql
-- Define the data source table
CREATE TABLE DRCSource_1 (
    `tag_id` VARCHAR,
    `act_info_id` VARCHAR
    -- other columns
) WITH (
    tableFactoryClass = 'com.alibaba.xxx.xxx.DRCTableFactory'
    -- other config
);

-- Define the HBase sink table
CREATE TABLE HbaseSink_1 (
    `tag_id` VARCHAR,
    `act_info_id` VARCHAR
    -- other columns
) WITH (
    class = 'com.alibaba.xxx.xxx.CombineSink',
    hbase_tableName = 'bahamut_search_tmall_act'
    -- other config
);

-- Synchronize source changes into HBase
INSERT INTO HbaseSink_1
SELECT
    `tag_id`,
    `act_info_id`
    -- other columns
FROM DRCSource_1;
```

Conclusion

Search offline data processing is a typical scenario that combines batch and real-time computation over massive amounts of data. Building on internal technology and open-source big-data storage and computing systems, our search team built a search offline platform tailored to its own business and technical characteristics, capable in complex business scenarios of batch-processing billions of records in a single run and of sustaining real-time throughput of millions of TPS with second-level latency. The offline platform currently supports the search business needs of more than 200 different business lines within the Group, greatly improving the efficiency of business iteration and becoming an important part of the search middle platform. The offline platform will soon be combined with OpenSearch/ES on Alibaba Cloud to provide customers outside the Group with highly available, high-performance search offline data processing. In the near future, the offline platform will also gradually expand to data processing for recommendation and advertising; the application scenarios are broad, and a SARO (Search, Advertisement and Recommendation Offline) platform covering search, recommendation, and advertising systems is gradually taking shape.

Finally, although it has been two years since the search offline platform went from zero to one, it is still far from the SARO platform vision we have in mind, and there will be plenty of challenges along the way. Students who are interested in big-data technologies such as Hadoop and Flink and who have Java development experience are welcome to join us; starting from Alibaba and going global, we want to make search easy everywhere. Click "Read the original article" at the end of this post to see the specific positions.

