This article is based on the talk "Flink's Implementation in ByteDance's Recommendation Feature System", given at the Apache Flink Meetup on May 22 by Guo Wenfei, who is in charge of basic services for ByteDance's recommendation system. The main contents are:

  1. Business background
  2. Next generation system architecture
  3. Subsequent planning

By 2021, ByteDance's products had more than 1.9 billion monthly active users. For products such as Douyin, Toutiao and Xigua Video (Watermelon Video), a strong recommendation system is particularly important. Flink provides very powerful SQL and stateful computing modules. At present, in ByteDance's recommendation scenarios, real-time simple counting features, window counting features and sequence features have been completely migrated to a Flink SQL based solution. Combining Flink SQL with Flink's stateful computing capabilities, we are building the next general-purpose, unified architecture for basic feature computing, which is expected to efficiently support the production of commonly used stateful and stateless basic features.

I. Business background

For ByteDance's products, such as Toutiao, Douyin and Xigua Video, feed-based recommendation of short-lived content is the core business scenario. Features are the basic fuel of a recommendation system, and producing basic features efficiently is crucial to iterating on the business recommendation system.

1. Main service scenarios

  • Short-video apps such as Douyin and Huoshan: recommendation scenarios such as the feed, follow, social and same-city feeds, with an overall domestic DAU of more than 600 million.

  • Feed-based information apps such as Toutiao and Xigua Video: recommendation scenarios such as the feed, follow and sub-channel feeds, with a domestic DAU in the hundreds of millions.

2. Business pain points and challenges

The production of basic features for ByteDance's recommendation scenarios is currently in a state of "a hundred flowers blooming". The basic pattern of offline feature computation is to consume data sources such as Kafka, BMQ, Hive, HDFS, Abase and RPC services, compute features on the Spark or Flink engine, and write the feature results to online and offline storage. Different types of basic feature computation are scattered across different services, and the lack of a business abstraction leads to high operations and maintenance costs and stability problems.

More importantly, the lack of a unified production platform for basic features makes business features inconvenient to iterate on and maintain. For example, business teams have to maintain a large number of offline jobs themselves, the feature production pipeline lacks monitoring, and the setup cannot keep up with evolving business requirements.

At ByteDance's business scale, building a unified real-time feature production system faces major challenges in four areas:

Huge business scale: The data volume of Douyin, Toutiao, Xigua Video, Huoshan and other products reaches the PB level per day. For example, in the Douyin scenario, video plays reach millions of QPS during the evening peak, and client-reported events reach tens of millions of IOPS. Business teams expect feature jobs to run without interruption and to consume without lag at all times, which demands very high stability from feature production.

High real-time requirements: In recommendation scenarios such as live streaming, e-commerce and short video, the latency of real-time feature production in the offline layer must be held stably at the minute level to ensure recommendation quality.

Better scalability and flexibility: Feature requirements become more flexible as business scenarios become more complex. Beyond producing statistical, sequence and attribute features, the platform now has to flexibly support window features and multidimensional features, and business teams need the feature platform to support the new feature types and requirements that gradually emerge.

Business iteration speed: The feature platform needs to provide a DSL that is sufficiently business-oriented, so that business teams write as little code as possible for the feature production pipeline. The underlying computing and storage engines should be completely transparent to the business, freeing it from the burden of selecting and tuning compute and storage, so that real-time basic features can be produced at scale and feature productivity keeps improving.

3. Iterative evolution

During the explosive growth of ByteDance's business, many feature services emerged in recommendation scenarios to meet varied feature requirements. These services supported rapid business development under specific business scenarios and historical conditions. The general process is as follows:

At the beginning of 2020, we began to introduce the Flink SQL and Flink State technology stack into feature production, and gradually rolled it out in scenarios such as the counting feature system, sample splicing for model training, and window features, exploring ideas for a new generation of feature production solutions.

II. Next generation system architecture

Given the business background above, we redesigned a new generation of real-time feature computing solution based on Flink SQL and Flink's stateful computing capabilities. The new solution is positioned to handle the computation and online serving of basic features, and to provide a more abstract business-layer DSL for basic features.

In the computing layer, we support a variety of complex window computations by combining the flexible data processing and expression capabilities of Flink SQL with the state storage and computing capabilities of Flink State. This greatly shortens the production cycle of basic business features and improves the stability of the feature output pipeline. In the new architecture, we divide the feature production pipeline into three stages: data source extraction/splicing, state storage, and computation. Flink SQL extracts and stream-joins the feature data, and Flink State stores the intermediate state of feature computation.

Stateful features are a very important class of features, and the most common among them are windowed features, such as the video view count (VV) over the last 5 minutes. For windowed features, ByteDance already has several solutions built on storage engines. Their overall idea is "light offline, heavy online": window state storage and feature aggregation are both placed in the storage layer and completed online. The offline data flow is responsible for basic data filtering and writing, and the detailed data is segmented and aggregated by time before being stored (similar to micro-batching). The underlying storage is mostly KV storage or a specially optimized storage engine, and the online layer carries out the complex window aggregation logic: for each incoming request, it pulls the detailed data from the storage layer and aggregates it.

Our new solution is "light online, heavy offline": the heavy work of storing time-sliced detail state and computing window aggregations is moved entirely to the offline layer. Window results are aggregated by an offline window trigger mechanism, and the feature results are pushed to online KV storage. The online module is very lightweight and is only responsible for simple online serving, which greatly reduces the architectural complexity of the online layer. For the offline state storage layer, we mainly rely on RocksDB, the native state storage engine provided by Flink, which makes full use of the local SSDs of the offline computing cluster and greatly reduces the resource pressure on online KV storage.

For features with long windows (more than 7 days), backfilling detailed state data is involved, and Flink's embedded state storage engine does not provide a particularly good mechanism for loading external data (or is not well suited to it). For this "state cold start" scenario, we therefore introduce centralized storage as a storage medium of the underlying state storage layer, giving an overall hybrid architecture. For example, state less than 7 days old is stored on the local SSD, and state between 7 and 30 days old is stored in the centralized storage engine. Offline backfill data can then be written into the centralized storage easily.
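The age-based split described above can be sketched as a small routing function. This is only an illustration under stated assumptions: the class, enum and method names are hypothetical, the 7-day and 30-day thresholds are taken from the example in the text, and treating anything older than 30 days as outside the window is an assumption of this sketch rather than something the talk specifies.

```java
import java.time.Duration;

/** Hypothetical router: picks a storage tier for a state slice based on its age. */
class HybridStateRouter {
    enum Tier { LOCAL_SSD, CENTRALIZED_STORE, OUT_OF_WINDOW }

    // Thresholds follow the example in the text; real values would be configurable.
    static final long LOCAL_MAX_AGE_MS = Duration.ofDays(7).toMillis();
    static final long WINDOW_MAX_AGE_MS = Duration.ofDays(30).toMillis();

    static Tier tierFor(long sliceStartMs, long nowMs) {
        long ageMs = nowMs - sliceStartMs;
        if (ageMs < LOCAL_MAX_AGE_MS) {
            return Tier.LOCAL_SSD;          // recent state stays in local RocksDB on SSD
        }
        if (ageMs <= WINDOW_MAX_AGE_MS) {
            return Tier.CENTRALIZED_STORE;  // older state lives in centralized storage
        }
        return Tier.OUT_OF_WINDOW;          // assumption: state beyond 30 days is not kept
    }

    public static void main(String[] args) {
        long now = Duration.ofDays(365).toMillis();
        for (int days : new int[] {1, 10, 45}) {
            long sliceStart = now - Duration.ofDays(days).toMillis();
            System.out.println(days + " days old -> " + tierFor(sliceStart, now));
        }
    }
}
```

Keeping the routing decision in one pure function makes it easy to direct an offline backfill job at the centralized tier only, which is the cold-start case the hybrid design targets.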

In addition to window features, this mechanism also applies to other types of stateful features (such as sequence type features).

1. Real-time feature classification system

2. Overall architecture

Window features, such as the number of likes a Douyin video received in the last hour (sliding window), and the viewing duration of a live-stream user in the most recent session (session window);

2.1 Data Source Layer

In the new unified feature architecture, we abstract all types of data sources as Schema Tables, because the underlying Flink SQL computing engine provides a very friendly Table Format abstraction for data sources. In recommendation scenarios the data sources are very diverse, and each feature depends on one or more upstream data sources. A data source can be Kafka, RMQ, KV storage or an RPC service. Multiple data sources can be spliced in both streaming and batch fashion. The splicing types include key-based Window Join and Window Union Join, and dimension table joins support Abase, RPC, Hive and so on. The specific splicing logic for each type is as follows:

The three types of Join and Union can be combined to stitch complex multi-stream data, for example (A Union B) Window Join (C Lookup Join D).
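To make the combination (A Union B) Window Join (C Lookup Join D) concrete, here is a minimal in-memory sketch in plain Java. It does not use Flink; `Event`, `union`, `lookupJoin` and `windowJoin` are hypothetical names, and the inner-join semantics over tumbling windows are assumptions chosen to keep the example short. In the real system these operations would be expressed in Flink SQL.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StreamStitchSketch {
    /** Minimal event: join key, event time in milliseconds, and named fields. */
    static final class Event {
        final String key;
        final long ts;
        final Map<String, Object> fields;

        Event(String key, long ts, Map<String, Object> fields) {
            this.key = key;
            this.ts = ts;
            this.fields = fields;
        }
    }

    /** Union: concatenates two streams that share a schema. */
    static List<Event> union(List<Event> a, List<Event> b) {
        List<Event> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    /** Lookup join: enriches each event from a dimension table keyed by the join key. */
    static List<Event> lookupJoin(List<Event> stream, Map<String, Map<String, Object>> dim) {
        List<Event> out = new ArrayList<>();
        for (Event e : stream) {
            Map<String, Object> merged = new HashMap<>(e.fields);
            merged.putAll(dim.getOrDefault(e.key, Collections.<String, Object>emptyMap()));
            out.add(new Event(e.key, e.ts, merged));
        }
        return out;
    }

    /** Window join: pairs events with the same key that fall into the same tumbling window. */
    static List<Event> windowJoin(List<Event> left, List<Event> right, long windowMs) {
        Map<String, List<Event>> rightByKeyAndWindow = new HashMap<>();
        for (Event r : right) {
            rightByKeyAndWindow
                .computeIfAbsent(r.key + "@" + (r.ts / windowMs), k -> new ArrayList<>())
                .add(r);
        }
        List<Event> out = new ArrayList<>();
        for (Event l : left) {
            List<Event> matches = rightByKeyAndWindow.getOrDefault(
                l.key + "@" + (l.ts / windowMs), Collections.<Event>emptyList());
            for (Event r : matches) {
                Map<String, Object> merged = new HashMap<>(l.fields);
                merged.putAll(r.fields);
                out.add(new Event(l.key, Math.max(l.ts, r.ts), merged));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> a = Collections.singletonList(
            new Event("u1", 1_000L, Collections.<String, Object>singletonMap("action", "play")));
        List<Event> b = Collections.singletonList(
            new Event("u2", 2_000L, Collections.<String, Object>singletonMap("action", "like")));
        List<Event> c = Collections.singletonList(
            new Event("u1", 1_500L, Collections.<String, Object>singletonMap("item", "v9")));
        Map<String, Map<String, Object>> d = new HashMap<>();
        d.put("u1", Collections.<String, Object>singletonMap("city", "BJ"));

        List<Event> joined = windowJoin(union(a, b), lookupJoin(c, d), 60_000L);
        System.out.println(joined.size() + " joined row(s): " + joined.get(0).fields);
    }
}
```

Only the `u1` events share a key and a window, so the `u2` event from stream B is dropped by the inner join while the `u1` row picks up fields from both C and the dimension table D.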

In addition, Flink SQL supports computing complex fields: business teams can compute extended fields from the basic fields of the TableSchema defined by the data source. The business computing logic is essentially a UDF. We provide a UDF API to business teams, who then upload the JAR to the feature platform backend for loading. For relatively simple logic, the backend also supports submitting simple Python code, enabling multi-language computation.

2.2 Business DSL

The business DSL is a highly abstract feature production language designed from the business perspective. It hides the details of the underlying computing and storage engines so that business teams can focus on defining business features. The DSL layer covers: data sources, data formats, data extraction logic, the type of feature to generate, data output methods, and so on.

2.3 State Storage Layer

As mentioned above, the main pain point that the new unified feature solution addresses is the computation of various types of stateful features (typically over sliding windows). For such features, the offline computing layer includes a state storage layer, in which the RawFeature extracted by the extraction layer is stored in slices (time slices, session slices, and so on). The slice type is an interface internally and can be extended as business needs require. What the state actually stores is not the original RawFeature (storing raw behavior data would take too much space), but a POJO converted into a FeaturePayload, which supports various common data structure types:

  • Int: stores a simple count value (multidimensional counter);

  • HashMap<int, int>: stores two-dimensional count values, such as an Action Counter, where the key is target_id and the value is the count;

  • SortedMap<int, int>: stores top-K two-dimensional counts;

  • LinkedList: stores id_list data;

  • HashMap<int, List>: stores two-dimensional id_list data;

  • User-defined data types: business teams can define custom data types based on FeaturePayload as needed.

Business interface for updating the state layer: the input is the RawFeature extracted by the SQL extraction/splicing layer. Business teams can implement the updateFeatureInfo interface to update the state layer according to their needs. For common feature types the update interface is built in; for custom feature types, business teams can implement the update interface themselves.

    public interface FeatureStateApi extends Serializable {
        /**
         * Feature state update interface.
         *
         * @param context    saves the feature name, primary key, and some configuration parameters
         * @param feature    the state of the feature before this update (oldFeature)
         * @param rawFeature the fields extracted by the SQL extraction layer (fields)
         * @return the updated feature state
         */
        FeaturePayLoad assign(Context context, FeaturePayLoad feature, Map<String, Object> rawFeature);
    }
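As an illustration of how a business team might implement this update interface, the sketch below maintains a two-dimensional action counter keyed by target_id. It is a minimal example under stated assumptions: `Context` and `FeaturePayLoad` are hypothetical stand-ins defined locally so the code compiles on its own, the `target_id` field name follows the counter example in the list above, and `ActionCounterUpdater` is an invented class, not part of the platform.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

class ActionCounterSketch {
    /** Hypothetical stand-in for the platform's Context type. */
    static final class Context implements Serializable {
        final String featureName;
        Context(String featureName) { this.featureName = featureName; }
    }

    /** Hypothetical stand-in for FeaturePayLoad, holding a two-dimensional count. */
    static final class FeaturePayLoad implements Serializable {
        final Map<Long, Long> counts = new HashMap<>();
    }

    /** Same shape as the update interface shown in the text. */
    interface FeatureStateApi extends Serializable {
        FeaturePayLoad assign(Context context, FeaturePayLoad feature, Map<String, Object> rawFeature);
    }

    /** Increments the counter of the target_id carried by each raw feature. */
    static final class ActionCounterUpdater implements FeatureStateApi {
        @Override
        public FeaturePayLoad assign(Context context, FeaturePayLoad feature, Map<String, Object> rawFeature) {
            // First event for this slice: start from an empty payload.
            FeaturePayLoad state = (feature != null) ? feature : new FeaturePayLoad();
            Object targetId = rawFeature.get("target_id");
            if (targetId instanceof Number) {
                state.counts.merge(((Number) targetId).longValue(), 1L, Long::sum);
            }
            return state;
        }
    }

    public static void main(String[] args) {
        FeatureStateApi updater = new ActionCounterUpdater();
        Context ctx = new Context("action_counter");
        FeaturePayLoad state = null;
        for (long id : new long[] {42L, 42L, 7L}) {
            Map<String, Object> raw = new HashMap<>();
            raw.put("target_id", id);
            state = updater.assign(ctx, state, raw);
        }
        System.out.println(state.counts);
    }
}
```

Raw events without a numeric target_id leave the state unchanged in this sketch; a real implementation would decide whether to drop, log or count such records.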

Of course, a state storage layer is not required for stateless ETL features.

2.4 Computing Layer

The feature computing layer performs the feature aggregation logic. For stateful features, its input is the sliced FeaturePayload objects stored in the state storage layer. Simple ETL features have no state storage layer, so the input is the RawFeature object from the SQL extraction layer directly. The interfaces are as follows:

Stateful feature aggregation interface:

    public interface FeatureStateApi extends Serializable {
        /**
         * Stateful feature aggregation interface.
         *
         * @param context    the feature context
         * @param slotStates the per-slice feature states (featureInfos)
         * @return the aggregated feature value
         */
        FeaturePayLoad aggregate(Context context, List<Tuple2<Slot, FeaturePayLoad>> slotStates);
    }
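The aggregation step can be pictured as folding the per-slice payloads that still fall inside the window into one result. The sketch below does this for a one-dimensional count. All types here (`Slot`, `Tuple2`, `Context`, `FeaturePayLoad`) are simplified, hypothetical stand-ins so the example compiles without Flink, and selecting slices by comparing their start time with a window start carried in the context is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

class WindowAggregateSketch {
    /** Hypothetical time slice, identified by its start timestamp. */
    static final class Slot {
        final long startMs;
        Slot(long startMs) { this.startMs = startMs; }
    }

    /** Minimal pair type standing in for Flink's Tuple2. */
    static final class Tuple2<A, B> {
        final A f0;
        final B f1;
        Tuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    /** Hypothetical context carrying the start of the current window. */
    static final class Context {
        final long windowStartMs;
        Context(long windowStartMs) { this.windowStartMs = windowStartMs; }
    }

    /** Hypothetical payload holding a simple count. */
    static final class FeaturePayLoad {
        final long count;
        FeaturePayLoad(long count) { this.count = count; }
    }

    /** Sums the counts of all slices that start inside the window. */
    static FeaturePayLoad aggregate(Context context, List<Tuple2<Slot, FeaturePayLoad>> slotStates) {
        long total = 0L;
        for (Tuple2<Slot, FeaturePayLoad> slotState : slotStates) {
            if (slotState.f0.startMs >= context.windowStartMs) {
                total += slotState.f1.count;
            }
        }
        return new FeaturePayLoad(total);
    }

    public static void main(String[] args) {
        List<Tuple2<Slot, FeaturePayLoad>> slots = new ArrayList<>();
        slots.add(new Tuple2<>(new Slot(0L), new FeaturePayLoad(10L)));        // already outside the window
        slots.add(new Tuple2<>(new Slot(60_000L), new FeaturePayLoad(3L)));
        slots.add(new Tuple2<>(new Slot(120_000L), new FeaturePayLoad(2L)));
        System.out.println(aggregate(new Context(60_000L), slots).count);
    }
}
```

Because each slice is pre-aggregated, the cost of producing a window result grows with the number of slices rather than the number of raw events.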

Stateless feature computing interface:

    public interface FeatureConvertApi extends Serializable {
        /**
         * Stateless feature conversion interface. In stateless calculation,
         * the raw feature is converted to the internal feature type.
         *
         * @param context         the feature context
         * @param featureSnapshot the current feature snapshot
         * @param rawFeatures     the fields extracted by the SQL extraction layer (fields)
         * @return the converted feature
         */
        FeaturePayLoad convert(Context context, FeaturePayLoad featureSnapshot, Map<String, Object> rawFeatures);
    }

In addition, trigger mechanisms are used to trigger the execution of the feature computing layer. Currently, the supported trigger mechanisms mainly include:

3. Business rollout

At present, in ByteDance's recommendation scenarios, the new-generation feature architecture has launched a number of real-time features in Douyin live streaming, e-commerce, push, Douyin recommendation and other scenarios. They are mainly stateful features: windowed one-dimensional statistical features, two-dimensional inverted-list features, two-dimensional top-K features, real-time CTR/CVR rate features, sequence features, and so on.

The new architecture has delivered notable gains on core business metrics. In the live streaming scenario, after a batch of features built on the expressive power of the new architecture went live, core viewing metrics and interaction metrics improved significantly. In the e-commerce scenario, 400+ real-time features have been launched on the new architecture, and for live-stream e-commerce the core GMV and order-rate metrics improved significantly. In the Douyin push scenario, user behavior data is aggregated using the offline state storage of the new architecture before being written to downstream storage, which greatly relieves the pressure on downstream databases; in some scenarios QPS drops to about 10% of the previous level. In addition, Douyin's recommendation feed, comments and other businesses are rebuilding their original feature systems on the new architecture.

It is worth mentioning that in the e-commerce and Douyin live streaming scenarios, the largest state of a single Flink streaming job has reached 60 TB and is still growing. In the near future the state of a single job may exceed 100 TB, which is no small challenge for the stability of the architecture.

4. Performance optimization

4.1 Flink State Cache

Currently, Flink provides two types of StateBackend: the heap-based FileSystemStateBackend (FsStateBackend in Flink's API) and the RocksDB-based RocksDBStateBackend. With FileSystemStateBackend, data is held in memory, so access is fast and incurs no extra overhead. RocksDBStateBackend, in contrast, adds overhead for disk lookups, serialization and deserialization, and CPU usage rises noticeably. A large number of jobs within ByteDance use State, and jobs with large state usually use RocksDBStateBackend to manage local state data. RocksDB is a KV database that organizes data as an LSM tree. In practice it has the following characteristics:

  1. The application layer exchanges data with RocksDB as byte arrays, so it has to serialize or deserialize on every access.
  2. Data is continuously appended to RocksDB, and RocksDB continuously runs compaction in the background to remove stale data.

Business jobs mostly use State in a get-update pattern. The following problems arise when RocksDB is used as the local state store:

  1. Crawler traffic produces hot keys whose state is constantly read and updated, with a single KV entry reaching 5 MB. Because RocksDB applies updates by appending, flushes and compactions run continuously in the background, which creates slow nodes within a job (Douyin live streaming scenario).
  2. Most jobs in this scenario have large state (about 60 TB for online jobs at present), and the business logic accesses State frequently. When integrating Flink State, we found that CPU overhead increased by 40%~80% compared with the original in-memory or Abase-based implementations. After optimization, the CPU overhead is concentrated mainly in serialization and deserialization.

To solve these problems, an object cache can be maintained in memory to speed up access to hot data and reduce CPU overhead. Against this background, we wanted to provide a general-purpose cache for StateBackend. The design of the Flink StateBackend Cache aims to achieve the following objectives:

  1. Reduce CPU overhead: caching hot data reduces the number of interactions with the underlying StateBackend and therefore the serialization and deserialization overhead.
  2. Improve State throughput: with the cache in place, State throughput should be higher than what the underlying StateBackend provides. In theory, if the cache is large enough, throughput should approach that of a heap-based StateBackend.
  3. Generalize the cache: different StateBackends should be able to adopt the cache directly. Currently we mainly support RocksDB; in the future we hope to offer it to other StateBackends, such as RemoteStateBackend.
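The effect of an object cache on serialization cost can be seen in a small stand-alone model. The sketch below is not Flink's or ByteDance's implementation: `StateCacheSketch` is a hypothetical class, a `HashMap` of byte arrays stands in for RocksDB, and the write-back policy (serialize only on eviction or on an explicit `flush()`, for example before a checkpoint) is an assumption chosen to make the saving visible.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical write-back LRU object cache in front of a byte-oriented state store. */
class StateCacheSketch {
    private final Map<String, byte[]> backend = new HashMap<>(); // stand-in for RocksDB
    private final LinkedHashMap<String, Long> cache;
    int serializations = 0;
    int deserializations = 0;

    StateCacheSketch(final int capacity) {
        // Access-ordered map: the eldest entry is the least recently used one.
        this.cache = new LinkedHashMap<String, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                if (size() > capacity) {
                    writeBack(eldest.getKey(), eldest.getValue()); // persist before evicting
                    return true;
                }
                return false;
            }
        };
    }

    private void writeBack(String key, long value) {
        serializations++;
        backend.put(key, ByteBuffer.allocate(Long.BYTES).putLong(value).array());
    }

    long get(String key) {
        Long cached = cache.get(key);
        if (cached != null) {
            return cached; // cache hit: no backend access, no deserialization
        }
        byte[] raw = backend.get(key);
        long value = 0L;
        if (raw != null) {
            deserializations++;
            value = ByteBuffer.wrap(raw).getLong();
        }
        cache.put(key, value);
        return value;
    }

    void put(String key, long value) {
        cache.put(key, value); // only the in-memory object is updated
    }

    /** Writes every cached entry to the backend, e.g. before a checkpoint. */
    void flush() {
        for (Map.Entry<String, Long> entry : cache.entrySet()) {
            writeBack(entry.getKey(), entry.getValue());
        }
    }

    public static void main(String[] args) {
        StateCacheSketch state = new StateCacheSketch(2);
        for (int i = 0; i < 1000; i++) {
            state.put("hot", state.get("hot") + 1); // get-update on a hot key
        }
        state.flush();
        System.out.println("value=" + state.get("hot")
            + " serializations=" + state.serializations
            + " deserializations=" + state.deserializations);
    }
}
```

In this model a hot key that is read and updated many times is serialized once per flush instead of once per update, which mirrors the first objective above; a production cache would also have to bound memory by bytes rather than entry count and cooperate with checkpointing.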

Working with ByteDance's infrastructure Flink team, we brought the cache online as part of the real-time feature production upgrade; in most scenarios the CPU usage gain is as high as about 50%.

4.2 PB IDL Clipping

Within ByteDance, the data streams that the real-time feature production pipeline depends on are mainly Kafka topics. These topics carry data defined in Protobuf (PB) with a large number of fields. Large company-level topics typically have 100+ fields, but most feature production jobs use only a few of them. For Protobuf data sources, we can clip the data stream by masking unnecessary fields to save deserialization overhead. For PB logs, we can clip the IDL directly while keeping the field numbers of the required fields unchanged; during deserialization, parsing of the unknown fields is then skipped. This saves CPU, although network bandwidth does not benefit. After PB IDL clipping went live, the CPU gain for most jobs was around 30%.
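Why clipping the IDL saves CPU follows from the Protobuf wire format: every field is prefixed by a tag holding its field number and wire type, so a reader can step over a field it does not know without decoding its contents. The hand-rolled decoder below illustrates this for top-level varint fields. It is a sketch, not the Protobuf library: `PbClipSketch` and `extractVarints` are hypothetical names, only wire types 0, 1, 2 and 5 are handled, and the input is assumed to be a well-formed message.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class PbClipSketch {
    /** Reads a base-128 varint starting at pos[0] and advances pos[0] past it. */
    private static long readVarint(byte[] buf, int[] pos) {
        long result = 0L;
        int shift = 0;
        while (true) {
            byte b = buf[pos[0]++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }

    /** Returns varint values of the wanted field numbers; all other fields are skipped undecoded. */
    static Map<Integer, Long> extractVarints(byte[] msg, Set<Integer> wanted) {
        Map<Integer, Long> out = new HashMap<>();
        int[] pos = {0};
        while (pos[0] < msg.length) {
            long tag = readVarint(msg, pos);
            int fieldNumber = (int) (tag >>> 3);
            int wireType = (int) (tag & 7);
            switch (wireType) {
                case 0: { // varint
                    long value = readVarint(msg, pos);
                    if (wanted.contains(fieldNumber)) {
                        out.put(fieldNumber, value);
                    }
                    break;
                }
                case 1: // fixed 64-bit
                    pos[0] += 8;
                    break;
                case 2: { // length-delimited: strings, bytes, nested messages
                    int length = (int) readVarint(msg, pos);
                    pos[0] += length; // jump over the payload without decoding it
                    break;
                }
                case 5: // fixed 32-bit
                    pos[0] += 4;
                    break;
                default:
                    throw new IllegalArgumentException("unsupported wire type " + wireType);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // field 1 = 150 (varint), field 2 = "testing" (string), field 3 = 7 (varint)
        byte[] msg = {0x08, (byte) 0x96, 0x01,
                      0x12, 0x07, 't', 'e', 's', 't', 'i', 'n', 'g',
                      0x18, 0x07};
        Set<Integer> wanted = new HashSet<>(Arrays.asList(1, 3));
        System.out.println(extractVarints(msg, wanted));
    }
}
```

The string in field 2 is never turned into a Java object; the reader only advances past its length prefix and payload. The bytes still travel over the network, which matches the observation above that bandwidth does not benefit.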

5. Problems encountered

A feature production job in the new architecture is essentially a stateful Flink job, and the underlying StateBackend is mainly local RocksDB. Two problems are hard to solve: checkpoints become unusable when the job's DAG changes, and local storage does not support backfilling historical feature state well.

  • Real-time feature jobs cannot add new features dynamically: for an online Flink real-time feature production job, we cannot add new features at will. Introducing a new feature changes the DAG of the Flink job, so the job can no longer recover from its checkpoint, which is unacceptable for a real-time stateful feature production job. Our current solution is to prohibit changes to the configuration of feature jobs that are deployed online, but this also means that features already in production cannot be taken offline. We have not yet found a better solution to this problem and will continue to explore.

  • Feature state cold start: RocksDB, currently the primary state storage engine, does not support backfilling state data well.

III. Subsequent planning

The new-generation architecture is still evolving rapidly in ByteDance's recommendation scenarios, and it has already solved the production of real-time window features well.

To unify feature production across recommendation scenarios, we will continue to invest in batch feature production based on Flink SQL's unified stream and batch capabilities. In addition, we will land real-time features in the data lake using Hudi, to efficiently address the pain point of offline feature backfilling in model training scenarios. For the rule engine, we plan to keep exploring CEP and to drive more adoption in e-commerce scenarios. For real-time window computing, we will investigate Flink's native window mechanism in more depth, in order to solve the problem of retiring window feature data that the current solution faces.

  • Support for batch features: the current feature production solution mainly addresses real-time stateful features. In ByteDance's offline scenarios, a large number of batch features are still produced by Spark SQL jobs. In the future, we will also provide unified support for batch features based on Flink SQL's unified stream and batch computing capability. Several scenarios have already been rolled out on a preliminary basis.
  • Landing features offline in the data lake: based on Hudi on Flink, real-time features are written into the offline data warehouse, mainly to support offline feature backfilling for sample splicing in model training.
  • Flink CEP rule engine support: Flink SQL is essentially a rule engine, and we currently use it online as the execution engine underlying the filtering semantics of the business DSL. However, Flink SQL is good at expressing ETL-style filtering rules and cannot express rules with temporal semantics. For temporal rules in live streaming and e-commerce scenarios, we need to try the more sophisticated rule engine of Flink CEP.
  • Introduction of Flink's native windowing mechanism: for windowed stateful features, we currently use the abstract SlotState time-slice scheme described above to support them uniformly. Flink itself provides a very complete window mechanism, in which components such as Window Assigner and Window Trigger can flexibly support a variety of window semantics. We will therefore introduce Flink's native windowing mechanism into window feature computation to support window feature iteration more flexibly.
  • Flink HybridState Backend architecture: currently, in ByteDance's online scenarios, the StateBackend underlying Flink uses the RocksDB storage engine by default. This embedded storage engine cannot load state data from outside or share it across jobs through external mechanisms, so we need to support a centralized KV storage scheme to enable flexible feature state backfilling.
  • Unified management of static attribute features: the feature platform will provide unified DSL semantics to manage other external services for static attribute features, for example user classification and tagging services maintained by other business teams.
