This article is adapted from the talk “Flink’s Practice in iQiyi’s Advertising Business,” shared by Han Honggen, technical manager at iQiyi, at the Flink Meetup in Beijing on May 22. The content includes:

  1. Business scenarios
  2. Business practices
  3. Problems and solutions in using Flink
  4. Future planning

I. Business Scenarios

The application scenarios of real-time data in advertising business can be mainly divided into four aspects:

  • **Data dashboards:** display of core metrics such as exposures, clicks, and revenue, as well as monitoring metrics such as failure rate;

  • **Anomaly monitoring:** Because the advertising pipeline is long, any fluctuation along it affects overall delivery results. Anomaly monitoring also shows whether each team's releases affect overall delivery, and whether business metric trends are reasonable, for example whether exposure fluctuates abnormally while inventory is normal, so that problems can be found in real time;

  • **Data analysis:** mainly used to empower business development with data. We can analyze abnormal problems in the delivery process in real time, or study how to optimize delivery based on current performance to achieve better results;

  • **Feature engineering:** The advertising algorithm team mainly trains models to support online delivery. Its features were initially produced mostly offline, but as real-time requirements grew, some of them have been moved to real-time production.

II. Business Practices

There are two main areas of business practice: the first is the real-time data warehouse, and the second is feature engineering.

1. Real-time data warehouse

1.1 Real-time data warehouse – goals

The goals of the real-time data warehouse include data completeness, service stability, and query capability.

  • **Data completeness:** In the advertising business, real-time data is primarily used to guide decisions; for example, advertisers need real-time delivery data to guide subsequent bidding or budget adjustments. Failure-rate monitoring also requires the data itself to be stable: if the data fluctuates, its guidance is poor or worthless. Completeness is therefore a tradeoff against timeliness;

  • **Service stability:** The production link includes data ingestion, multi-layer computation, data writing, the progress service, and the query service. Data quality also matters, including accuracy and whether trends match expectations;

  • **Query capability:** The advertising business has many application scenarios, and different scenarios may use different OLAP engines, so query patterns and performance requirements vary. Moreover, data analysis needs not only the latest, stable real-time data, but also combined real-time + offline queries, cross-source queries, and good query performance.

1.2 Real-time data warehouse – challenges

  • **Data progress service:** there is a tradeoff between timeliness and completeness.

  • **Data stability:** the production link is long and may use multiple components along the way, so end-to-end service stability is critical to overall data accuracy.

  • **Query performance:** mainly OLAP analysis capability. In real scenarios, a table contains both offline and real-time data; a single table can reach hundreds of columns, and the number of rows is also very large.

1.3 Advertising data platform architecture

The figure above is the advertising data platform architecture, viewed from the bottom up:

  • At the bottom is the data collection layer, which is similar to most companies'. The business database mainly contains advertisers' order data and delivery strategies; tracking logs and billing logs are generated along the ad delivery link;

  • In the middle is the data production layer. Underneath it is the big data infrastructure, provided by the company's cloud platform team, including the Spark/Flink compute engines and the Babel unified task management platform. Talos is a real-time data warehouse service, and RAP and OLAP correspond to different real-time analysis and OLAP storage and query services.

    The middle tier of data production covers the services owned by the advertising team, such as the offline and real-time computation typical of production.

    • Offline computation follows a common layered model, and the scheduling system manages and schedules the offline tasks.

    • Real-time computation also uses several engines. We started real-time computation in 2016, choosing Spark Streaming at the time. Later, as big data technology and the company's business requirements evolved and new scenarios appeared, the Flink compute engine was introduced.

    • The underlying scheduling of real-time computation relies on the cloud platform's Babel system. Besides computation there is data governance, including progress management: in real-time computation, progress means the point in time up to which a data report is already stable; offline, it corresponds to a table's partitions.

    • Lineage management covers two aspects: offline lineage includes table-level and field-level lineage, while real-time lineage is mainly tracked at the task level.

    • As for lifecycle management, in the offline warehouse the computation iterates continuously; if data is retained for a very long time, the pressure on the underlying storage becomes considerable.

    • Data lifecycle management is therefore mainly a tradeoff between business requirements and storage cost.

    • Quality management covers two aspects. One is at the data ingestion layer, judging whether the incoming data itself is reasonable; the other is at the data exit, the result indicator layer. Since our data is used by many other teams, it is essential that the data at the exit is computed correctly.

  • The layer above is the unified query service, which encapsulates many query interfaces.

    • Because the data includes both offline and real-time data and spans clusters, core functions such as cluster selection, table selection, and splitting complex queries are handled by intelligent routing.
    • The query service also manages the popularity of historical queries in a unified way. This supports further data lifecycle management on the one hand, and on the other hand shows which data is most valuable to the business.
    • Besides lifecycle management, it also guides our scheduling system; for example, knowing which reports are critical allows those tasks to be prioritized when resources are scarce.
  • Above that are the data applications, including the reporting system, ad-hoc queries, data visualization, anomaly monitoring, and downstream teams.

1.4 Real-time data warehouse – production link

The data production link is organized by time granularity. We started with the offline warehouse link; as real-time requirements advanced, a real-time link was added alongside it, forming a typical Lambda architecture.

In addition, for some core metrics such as billing metrics, whose stability is critical to downstream consumers, we run redundant pipelines over different routes. After the source logs are generated, the compute layer and the downstream storage layer are fully duplicated, and the results are unified at query time.

1.5 Real-time data warehouse – progress service

As described above, the real-time metrics we provide are required to be stable and unchanging once published. The core of the progress service combines the trend of a metric within its time window with the state of the real-time computing tasks themselves, because in the real-time warehouse many metrics are aggregated over time windows.

For example, consider a real-time metric produced at 3-minute granularity: the data point at 4:00 covers 4:00 to 4:03, and the point at 4:03 covers 4:03 to 4:06. The question is when the data of a time window becomes visible externally. In real-time computation the data keeps arriving: the 4:00 window starts producing values at 4:00, the metric keeps rising as data accumulates, and finally it levels off. We judge whether it is stable based on the rate of change of the window's metric value.

But relying on this alone has drawbacks.

The calculation chain of the result table depends on many computing tasks; if any task on the link has a problem, the current metric may look stable yet ultimately be incomplete. Therefore, we also take the state of the real-time computing tasks into account: when a metric has stabilized, we additionally check whether the computing tasks on its production chain are healthy. If they are, the metric for that time point is considered stable and can be served externally.

If computation is stuck, backlogged, or restarting after an exception, we keep waiting and re-check in the next iteration.
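
The readiness decision described above can be summarized as a simple predicate. Below is a minimal sketch, assuming the platform can poll a window's current and previous metric values and the health of the upstream tasks; the class name and the threshold are illustrative and not the actual progress service implementation.

```java
public class WindowProgressCheck {

    private static final double STABLE_CHANGE_RATE = 0.001; // e.g. < 0.1% change between two polls

    /**
     * A time window's metric is published only when its value has stopped growing
     * noticeably AND every computing task on its production chain is healthy.
     */
    public static boolean isWindowReady(double previousValue,
                                        double currentValue,
                                        boolean allUpstreamTasksHealthy) {
        double changeRate =
            Math.abs(currentValue - previousValue) / Math.max(Math.abs(previousValue), 1.0);
        return changeRate < STABLE_CHANGE_RATE && allUpstreamTasksHealthy;
    }
}
```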

1.6 Real-time data warehouse – query service

Above is the query service architecture diagram.

At the bottom is the data. Real-time data is stored in engines such as Druid; offline data is stored in Hive, but for querying it is synchronized to OLAP engines. To support union queries it is synchronized to Kudu and queried through Impala; in addition, for relatively fixed query patterns, data can be loaded into Kylin for analysis on top of it.

On top of the data are multiple query nodes, and above them an intelligent routing layer, fronted by a query gateway. When a query request arrives, the gateway first determines whether the scenario is complex; for example, a query whose time range spans both offline and real-time data must use both the offline and the real-time tables.

There is also more complex table-selection logic among offline tables, such as hour-level versus day-level tables. After such complex scenarios are analyzed, the final table selection is made. Intelligent routing also consults the basic services on the left, such as metadata management, to learn how far each table's progress has advanced.

For query performance optimization, the amount of data scanned at the bottom has a significant impact on final performance, so we build dimension-reduced reports based on analysis of historical queries, for example deciding which dimensions a reduced table should contain and what percentage of queries it can cover.

1.7 Data production – planning

As mentioned earlier, real-time data report production is implemented primarily through APIs. One problem with the Lambda architecture itself is that there are two computation paths, real-time and offline, so for the same requirement two teams must develop in parallel, which brings several problems.

  • On the one hand, their logic may differ, producing inconsistent results.
  • On the other hand, there is the labor cost of two teams developing in parallel.

Our goal is therefore stream-batch unification: can a single piece of logic express the same business requirement at the compute layer, so that either the stream or the batch engine can run it and produce the same result?

In this link, raw data arrives via Kafka, passes through unified ETL logic, and is then stored in a data lake. Because the data lake supports both stream and batch reads and writes, and can itself be consumed in real time, both real-time and offline computation can run on it and write their results back into the data lake.

As mentioned earlier, queries combine offline and real-time data, so writing both into the same table in the data lake saves a lot of work at the storage level and reduces storage space.
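
The following is a minimal sketch of such a unified ETL step in Flink SQL (wrapped in the Table API): one piece of cleaning logic reads the raw Kafka stream and writes into a data lake table that both streaming and batch jobs can consume. The talk does not name the lake technology or the schema, so the Iceberg catalog, table names, fields, and connector options here are illustrative assumptions.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class UnifiedEtlJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Raw tracking log from Kafka (schema and topic are assumed).
        tEnv.executeSql(
            "CREATE TABLE raw_tracking (" +
            "  request_id STRING, event_type STRING, ad_id BIGINT, event_time TIMESTAMP(3)," +
            "  WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND" +
            ") WITH (" +
            "  'connector' = 'kafka', 'topic' = 'ad_tracking'," +
            "  'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json')");

        // A data lake catalog; here assumed to be Iceberg on HDFS.
        tEnv.executeSql(
            "CREATE CATALOG ad_lake WITH (" +
            "  'type' = 'iceberg', 'catalog-type' = 'hadoop'," +
            "  'warehouse' = 'hdfs://namenode/warehouse/ad_lake')");
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS ad_lake.dwd");
        tEnv.executeSql(
            "CREATE TABLE IF NOT EXISTS ad_lake.dwd.tracking_cleaned (" +
            "  request_id STRING, event_type STRING, ad_id BIGINT, event_time TIMESTAMP(3))");

        // The single piece of ETL logic: both real-time and offline consumers read the result.
        tEnv.executeSql(
            "INSERT INTO ad_lake.dwd.tracking_cleaned " +
            "SELECT request_id, event_type, ad_id, event_time " +
            "FROM raw_tracking WHERE event_type IN ('exposure', 'click')");
    }
}
```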

1.8 Data production – SQL

SQL capability is provided by the Talos real-time data warehouse platform.

On the page it provides several features, with project management on the left and Source, Transform, and Sink operators on the right.

  • Some business teams are very familiar with the compute engine's operators and can develop code directly;
  • But many business teams do not know the engine well, or have no strong desire to learn it, and they can assemble a job through this visual interface.

For example, you can drag in a Kafka data source, then drag in a Filter operator to express filtering logic, then apply some Project or Union computation, and finally output the result to some storage.

Users with more expertise can also do higher-level computation here and achieve the goals of a real-time data warehouse: create data sources, express the logic in SQL, and output the results to some storage, along the lines of the sketch below.
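
As a hedged example of the kind of logic a business team might submit on such a platform, the snippet below aggregates exposures and clicks per ad per minute and writes them to a report sink. On Talos this would be entered as SQL directly; here it is wrapped in the Table API for consistency with the other sketches, reusing the raw_tracking source from the earlier ETL sketch, and the ads_report_minutely sink is assumed to be registered on the platform.

```java
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MinutelyAdReport {
    /** Submits a 1-minute tumbling-window aggregation of exposures and clicks per ad. */
    public static void submit(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
            "INSERT INTO ads_report_minutely " +
            "SELECT ad_id, TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start, " +
            "       SUM(CASE WHEN event_type = 'exposure' THEN 1 ELSE 0 END) AS exposures, " +
            "       SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) AS clicks " +
            "FROM raw_tracking " +
            "GROUP BY ad_id, TUMBLE(event_time, INTERVAL '1' MINUTE)");
    }
}
```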

The above covers the development level. At the system level, the platform also provides other functions, such as rule validation and unified management of development/test/production environments. There is also monitoring: an online real-time task exposes many real-time metrics, and by checking these metrics you can judge whether the task is currently healthy.

2. Feature engineering

There are two requirements for feature engineering:

  • The first requirement is real-time processing, because the value of data decays over time. For example, if a user shows a preference for children's content, the platform will recommend children-related advertisements. Users also produce positive/negative feedback behaviors while viewing ads; if these signals are folded back into features in real time, subsequent conversion can be improved noticeably.

    Another key point of going real-time is accuracy. Previously, much feature engineering was done offline; there were deviations between the data computed in production and the features used during delivery, so the basic feature data was not very accurate.

  • The second requirement of feature engineering is service stability.

    • The first is job fault tolerance, such as whether a job can recover properly when an exception occurs.

    • The other is data quality, pursuing end-to-end exactly-once semantics for real-time data.

2.1 Click-through rate estimation

Below are some practices in making features real-time, starting with the requirements of click-through rate estimation.

The background of the click-through rate estimation case is shown above. In the delivery link, when a user watches a video, the front end requests the advertising engine, which fetches user features and ad features while doing coarse/fine ad recall. After the ad is returned to the front end, the user's subsequent behavior may produce exposure, click, and other behavioral events. For click-through rate estimation, we need to join the features from the request stage with the exposures and clicks in the subsequent user behavior stream to form session data; that is our data requirement.

In practice, there are two aspects:

  • One aspect is correlating exposure and click events within the tracking stream.

  • The other is joining the feature stream with the user behavior stream.

What are the challenges in practice?

  • The first challenge is data volume;
  • The second is out-of-order and delayed real-time data;
  • The third is accuracy.

The feature always precedes the tracking event in time, but how long must features be retained for the join rate between the two streams to exceed 99%? In the advertising business, a user can download content for offline viewing; the ad request and response are completed at download time, but if the user then watches without network connectivity, the exposure and click events are not sent back until connectivity is restored.

So the gap between the feature stream and the tracking stream can be quite long. Offline analysis showed that to achieve a join rate above 99%, feature data must be retained for a fairly long time; currently it is retained for 7 days, which is a relatively large volume.

The figure above shows the overall architecture of click-through rate estimation. As mentioned earlier, the correlation consists of two parts:

  • The first part is associating exposure and click events within the user behavior stream, which is implemented with CEP.
  • The second part is joining the two streams. As described earlier, features must be retained for 7 days, so the state is large, already at the level of hundreds of terabytes. Managing state of this magnitude in memory would severely affect stability, so we put the feature data in external storage (HBase) and join the real-time stream against the features in HBase.

However, because the two streams may be misaligned in time, the feature may not yet be present when the exposure or click arrives, so the lookup can fail. We therefore built a multi-level retry queue to ensure the completeness of the final two-stream join.

2.2 Click-through rate estimation – in-stream event correlation

On the right of the figure above is a more detailed explanation of why CEP was chosen for in-stream event correlation. The business requirement is to associate, within the user behavior stream, the exposure and click of the same ad request and the same ad: a click within 5 minutes of the exposure counts as a positive sample, and a click more than 5 minutes after the exposure is discarded.

Consider what kind of scheme could achieve this. Multiple events within one stream can be handled with windows, but windows have a problem:

  • If the event sequence falls within the same window, the result is fine.
  • But when the sequence of events spans windows, the association is not produced correctly.

Therefore, after technical research at the time, we found that CEP in Flink could achieve this effect. In a way similar to pattern matching, it describes the conditions the event sequence must satisfy, and it can also specify a time window, for example 15 minutes between exposure and click.

On the left of the figure above is a description of the matching rule: begin defines the exposure, followedBy describes a click that can occur one or more times after the exposure, and within specifies the length of the association window, here 5 minutes.
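
A minimal Flink CEP sketch of this rule is shown below: an exposure followed by one or more clicks within 5 minutes on the same ad request and ad. The TrackingEvent POJO, its field names, and the string output are assumptions for illustration; the actual event schema is not given in the talk.

```java
import java.util.List;
import java.util.Map;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ExposureClickCep {

    /** Associates exposures with the clicks that follow them within 5 minutes. */
    public static DataStream<String> associate(DataStream<TrackingEvent> tracking) {
        Pattern<TrackingEvent, ?> pattern =
            Pattern.<TrackingEvent>begin("exposure")
                .where(new SimpleCondition<TrackingEvent>() {
                    @Override
                    public boolean filter(TrackingEvent e) {
                        return "exposure".equals(e.eventType);
                    }
                })
                .followedBy("click")
                .where(new SimpleCondition<TrackingEvent>() {
                    @Override
                    public boolean filter(TrackingEvent e) {
                        return "click".equals(e.eventType);
                    }
                })
                .oneOrMore()
                .within(Time.minutes(5));   // association window after the exposure

        // Key by (request id, ad id) so only events of the same request and ad are matched.
        PatternStream<TrackingEvent> matches =
            CEP.pattern(tracking.keyBy(e -> e.requestId + "_" + e.adId), pattern);

        return matches.select(new PatternSelectFunction<TrackingEvent, String>() {
            @Override
            public String select(Map<String, List<TrackingEvent>> match) {
                TrackingEvent exposure = match.get("exposure").get(0);
                int clicks = match.get("click").size();
                return exposure.requestId + "," + exposure.adId + "," + clicks;
            }
        });
    }

    /** Minimal assumed event schema. */
    public static class TrackingEvent {
        public String requestId;
        public long adId;
        public String eventType;   // "exposure" or "click"
        public long eventTimeMs;
    }
}
```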

In production, this scheme handled most cases correctly, but during data comparison we found that some exposures and clicks were not being associated.

Data analysis showed that for these records, the timestamps of the exposure and the click are both at millisecond granularity, and when they carry the same millisecond timestamp the events cannot be matched. So we adopted a workaround: artificially add one millisecond to the click event to create a slight offset, so that the exposure and the click can be associated successfully.
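
Below is a hedged illustration of that workaround, applied before the events enter CEP: click timestamps are shifted forward by one millisecond so that an exposure and a click sharing the same millisecond can still be ordered and matched. It reuses the assumed TrackingEvent type from the CEP sketch above.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

public class ClickTimestampShift {
    /** Shifts click timestamps by 1 ms so same-millisecond exposure/click pairs stay ordered. */
    public static DataStream<ExposureClickCep.TrackingEvent> shiftClicks(
            DataStream<ExposureClickCep.TrackingEvent> tracking) {
        return tracking.map(new MapFunction<ExposureClickCep.TrackingEvent,
                                            ExposureClickCep.TrackingEvent>() {
            @Override
            public ExposureClickCep.TrackingEvent map(ExposureClickCep.TrackingEvent e) {
                if ("click".equals(e.eventType)) {
                    e.eventTimeMs += 1;  // artificial 1 ms offset applied to click events
                }
                return e;
            }
        });
    }
}
```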

2.3 Click-through rate estimation – dual-stream correlation

As mentioned earlier, feature data is retained for 7 days, so the state reaches hundreds of terabytes and must be placed in external storage, which imposes several requirements on the store when selecting the technology:

  • First, it must support high read/write concurrency;
  • Second, its latency must be very low;
  • Also, since data is retained for 7 days, lifecycle management capability is desirable.

Based on the above points, HBase was chosen to form the solution shown in the figure above.

The top line associates the exposure and click sequence through CEP. The bottom line writes the feature stream into HBase via Flink, using HBase as external state storage. The core module in the middle joins the two streams: after an exposure/click association is obtained, the features are looked up in HBase, and if the lookup succeeds the result is emitted to the normal result stream. Records that fail to join go into a multi-level retry queue and are downgraded after several retries. In addition, to reduce the scan pressure on HBase during retries, the retry gap is gradually increased.
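
Below is a hedged sketch of that join logic as a single KeyedProcessFunction: for each exposure/click session (represented here simply as a string, keyed by request id), the features are looked up in HBase; on a miss, the lookup is retried with a growing gap, a simplified stand-in for the multi-level retry queue, and after a bounded number of attempts the record is routed to an expired side output. The class name, the lookup helper, and the backoff schedule are illustrative assumptions, not the production implementation.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class FeatureJoinFunction extends KeyedProcessFunction<String, String, String> {

    // Sessions whose features never arrive (e.g. older than 7 days, or fraudulent traffic).
    public static final OutputTag<String> EXPIRED = new OutputTag<String>("expired") {};

    // Growing gaps between retries reduce the scan pressure on HBase.
    private static final long[] RETRY_GAPS_MS = {5_000, 30_000, 300_000, 1_800_000};

    private transient ValueState<Integer> attempts;        // how many lookups we have tried
    private transient ValueState<String> pendingSession;   // session waiting for its features

    @Override
    public void open(Configuration parameters) {
        attempts = getRuntimeContext().getState(
            new ValueStateDescriptor<>("attempts", Integer.class));
        pendingSession = getRuntimeContext().getState(
            new ValueStateDescriptor<>("pendingSession", String.class));
    }

    @Override
    public void processElement(String session, Context ctx, Collector<String> out) throws Exception {
        tryJoin(session, 0, ctx, out);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        String session = pendingSession.value();
        Integer n = attempts.value();
        if (session != null && n != null) {
            tryJoin(session, n, ctx, out);
        }
    }

    private void tryJoin(String session, int attempt, Context ctx, Collector<String> out) throws Exception {
        String feature = lookupFeatureFromHBase(session);
        if (feature != null) {
            out.collect(session + "|" + feature);   // joined result
            attempts.clear();
            pendingSession.clear();
        } else if (attempt >= RETRY_GAPS_MS.length) {
            ctx.output(EXPIRED, session);           // exit mechanism after the maximum retries
            attempts.clear();
            pendingSession.clear();
        } else {
            pendingSession.update(session);
            attempts.update(attempt + 1);
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + RETRY_GAPS_MS[attempt]);
        }
    }

    /** Placeholder for the HBase point lookup; the session string is assumed to carry the row key. */
    private String lookupFeatureFromHBase(String session) {
        return null;  // in production this would issue an HBase Get for the request's features
    }
}
```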

There is also an exit mechanism, because retries cannot go on forever. It exists for two main reasons:

  • First, feature data is retained for 7 days; if the corresponding feature is older than 7 days, the record cannot be joined at all.
  • Second, the advertising business sees fraudulent traffic, such as faked exposures or clicks, which have no real corresponding ad request, so no features exist for them.

The exit mechanism therefore expires a record after multiple retries and routes it into the expired-after-retry data.

2.4 Effective Click

The effective click scenario also involves joining two streams, but the technology selection is completely different from the previous case.

First, the background of the project. In this scenario the video itself is an advertisement; after the user clicks on it, they are taken to a playback page where they can watch 6 minutes for free. After 6 minutes, they must be a member or purchase the content to continue watching. What needs to be counted here are effective clicks, defined as watching for more than 6 minutes after the click.

Technically, this scenario is the join of two streams: the click stream and the playback heartbeat stream.

  • The click stream is easy to understand: it contains user exposure and click behavior, and click events can be filtered out of it.
  • The playback heartbeat stream is sent back periodically while the user is watching; for example, a heartbeat every three seconds indicates continuous viewing. Determining whether viewing exceeds 6 minutes requires some state handling to accumulate the duration.

In this scenario, the gap between the two streams is relatively small: a movie generally runs about two hours, so the join can be completed within three hours of the click. The state is therefore relatively small, and Flink's own state management can handle it.

Now let’s look at a specific scenario.

In terms of streams, the green part is the click stream and the blue part is the play heartbeat stream.

  • For the click stream on the left, when a click event arrives, a state record is created for that click and a timer is registered for periodic cleanup. The timer is set to three hours. Since most movies are shorter than three hours, if no matching playback state has appeared by then, the click event is simply expired.

  • For the playback heartbeat stream on the right, the state accumulates the watch duration; it is a heartbeat stream, say one heartbeat every three seconds. We need to compute whether the accumulated duration has reached 6 minutes, that is, whether the current record crosses the 6-minute threshold. The corresponding Flink implementation connects the two streams with the Connect operator and then applies a CoProcessFunction, which has two core methods (see the sketch after this list).

    • The first method defines what processing is done when an event from the first stream arrives.
    • The second method defines the custom logic applied when an event from the second stream arrives.

    These methods give the user a lot of flexibility and allow extensive logical control; compared with the built-in multi-input joins, the user has much more room to customize.
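
Here is a hedged sketch of that approach, using the keyed variant (KeyedCoProcessFunction) so the key and timers are available: clicks are held in state with a three-hour cleanup timer, heartbeats accumulate watch time, and an effective click is emitted once accumulated viewing crosses 6 minutes. The event classes, field names, and keying are illustrative assumptions.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class EffectiveClickJob {

    public static DataStream<String> effectiveClicks(
            DataStream<ClickEvent> clicks, DataStream<HeartbeatEvent> heartbeats) {
        return clicks.keyBy(c -> c.sessionKey)
            .connect(heartbeats.keyBy(h -> h.sessionKey))
            .process(new EffectiveClickFunction());
    }

    public static class EffectiveClickFunction
            extends KeyedCoProcessFunction<String, ClickEvent, HeartbeatEvent, String> {

        private static final long THREE_HOURS_MS = 3 * 60 * 60 * 1000L;
        private static final long SIX_MINUTES_MS = 6 * 60 * 1000L;

        private transient ValueState<Long> clickTime;     // when the click happened
        private transient ValueState<Long> watchedMs;     // accumulated viewing time
        private transient ValueState<Boolean> emitted;    // whether we already counted it

        @Override
        public void open(Configuration parameters) {
            clickTime = getRuntimeContext().getState(new ValueStateDescriptor<>("clickTime", Long.class));
            watchedMs = getRuntimeContext().getState(new ValueStateDescriptor<>("watchedMs", Long.class));
            emitted = getRuntimeContext().getState(new ValueStateDescriptor<>("emitted", Boolean.class));
        }

        @Override  // first stream: click events
        public void processElement1(ClickEvent click, Context ctx, Collector<String> out) throws Exception {
            clickTime.update(click.eventTimeMs);
            // Clean everything up three hours after the click; most movies are shorter than that.
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + THREE_HOURS_MS);
        }

        @Override  // second stream: playback heartbeats (e.g. one every three seconds)
        public void processElement2(HeartbeatEvent hb, Context ctx, Collector<String> out) throws Exception {
            if (clickTime.value() == null || Boolean.TRUE.equals(emitted.value())) {
                return;  // no matching click yet, or already counted as effective
            }
            long total = (watchedMs.value() == null ? 0L : watchedMs.value()) + hb.intervalMs;
            watchedMs.update(total);
            if (total >= SIX_MINUTES_MS) {
                out.collect(ctx.getCurrentKey());  // effective click for this session key
                emitted.update(true);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            clickTime.clear();
            watchedMs.clear();
            emitted.clear();
        }
    }

    public static class ClickEvent {
        public String sessionKey;
        public long eventTimeMs;
    }

    public static class HeartbeatEvent {
        public String sessionKey;
        public long intervalMs;  // time covered by this heartbeat, e.g. 3000 ms
    }
}
```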

2.5 Feature engineering – Summary

To summarize the above cases: dual-stream joins are now very common, and there are many alternatives, such as Window Join, Interval Join, and the Connect + CoProcessFunction approach we used, plus various user-defined schemes.

When choosing, it is recommended to start from the business and then make the corresponding technical selection: first think through the timing relationship of the events across the streams, then estimate the size of the state; this rules out the infeasible options among the schemes above.

III. Problems and Solutions in Using Flink

1. Fault tolerance

In Flink, fault tolerance is mainly achieved through checkpoints. Checkpoints support Task-level failure recovery within a job; however, when a job itself is restarted, whether deliberately or after an abnormal exit, its state is not recovered from the historical state by default.

A small improvement we made is that when a job starts, it looks up the last successful checkpoint of its previous run and initializes from it, so that state can be restored across restarts.
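
A minimal sketch of the building blocks such an improvement can rely on: retain checkpoint files after the job stops, so the next run can be resumed from the latest successful checkpoint (for example via `flink run -s <checkpoint path>`). How the platform locates the last successful checkpoint is not shown in the talk; the interval and mode here are placeholders.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static StreamExecutionEnvironment create() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);  // checkpoint every 60 s
        // Keep the checkpoint files when the job is cancelled or fails, so a later
        // submission can be started from them (e.g. `flink run -s hdfs://.../chk-1234 ...`).
        env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        return env;
    }
}
```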

2. Data quality

To achieve end-to-end exactly-once, Flink needs checkpointing enabled with exactly-once semantics specified in the checkpoint configuration. In addition, if the downstream sink itself supports transactions, a two-phase commit can combine checkpoints with the downstream transactions to achieve end-to-end exactly-once.

This process is depicted on the right of the figure above. In the pre-commit phase, the checkpoint coordinator injects barriers into the sources during checkpointing; each source snapshots its state when it receives the barrier and reports completion back to the coordinator, and every subsequent operator does the same as the barrier reaches it.

When the barrier reaches the sink, the sink pre-commits the pending transaction in Kafka, which is guaranteed by Kafka's own transaction mechanism. Once all operators have completed the checkpoint, the coordinator sends an acknowledgement to all operators confirming their state, and at that point the sink commits the transaction.
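
A hedged sketch of wiring this up with the Kafka producer connector: with checkpointing enabled in EXACTLY_ONCE mode (as in the earlier checkpoint sketch), FlinkKafkaProducer's EXACTLY_ONCE semantic ties the Kafka transaction commit to checkpoint completion. The topic, brokers, and transaction timeout value are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceKafkaSink {
    public static FlinkKafkaProducer<String> create(String topic, String brokers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        // Must cover the checkpoint duration, but not exceed the broker's transaction.max.timeout.ms.
        props.setProperty("transaction.timeout.ms", "900000");

        KafkaSerializationSchema<String> schema = new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
            }
        };

        // EXACTLY_ONCE ties the Kafka transaction commit to Flink's checkpoint completion
        // (two-phase commit, as described above).
        return new FlinkKafkaProducer<>(topic, schema, props,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}
```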

3. Sink Kafka

In earlier practice, we found that when the partition count of a downstream Kafka topic was increased, no data was written to the new partitions.

FlinkKafkaProducer uses FlinkFixedPartitioner by default, so each sink subtask writes only to one fixed downstream partition. The problem appears when the downstream Kafka topic has more partitions than the parallelism of the sink task.

There are two solutions:

  • The first option is to customize a FlinkKafkaPartitioner.

  • Alternatively, do not set a partitioner at all, so that data is written to every partition in a round-robin manner by default (see the sketch below).
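
A hedged sketch of the second option: construct the producer without a fixed partitioner, so keyless records are spread across all partitions, including ones added later, by Kafka's own partitioning; a custom FlinkKafkaPartitioner could be passed in the same position to implement the first option. Topic and broker values are placeholders.

```java
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class RoundRobinKafkaSink {
    public static FlinkKafkaProducer<String> create() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        // Passing Optional.empty() disables FlinkFixedPartitioner; supply a custom
        // FlinkKafkaPartitioner here instead to implement the first option above.
        return new FlinkKafkaProducer<>(
            "ad_report_topic",
            new SimpleStringSchema(),
            props,
            Optional.<FlinkKafkaPartitioner<String>>empty());
    }
}
```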

4. Enhanced monitoring

For a running Flink job, we need to observe its state. In the Flink UI, for example, many metrics are at Task granularity, with no job-level overview.

On the platform side, these indicators are further aggregated and displayed in a unified page.

As shown in the figure above, the displayed information includes the backpressure status, latency, and CPU/memory utilization of the JobManager and TaskManagers at runtime. There are also checkpoint checks, such as whether a checkpoint has timed out or failed recently, and we generate alerts based on these checks.

5. Monitoring and alerting

When a real-time task runs abnormally, users need to know its state promptly. As shown above, alerting can be configured, including alert subscriptions, alert levels, and other settings. Thresholds are set in advance, and when an alert policy rule is triggered, alerts are pushed to subscribers via email, phone, and internal communication tools, so that abnormal task states are reported.

In this way, when a task becomes abnormal, users are informed in time and can intervene.

6. Real-time data production

Finally, a summary of the key milestones in the real-time production link of iQiyi's advertising business.

  • We started real-time processing in 2016; the main goal then was to make some metrics real-time, using Spark Streaming;

  • Real-time click-through feature was launched in 2018;

  • Flink went online in 2019, bringing end-to-end exactly-once and enhanced monitoring;

  • In 2020, the effective click real-time feature was launched;

  • In October of the same year, we gradually improved the real-time data warehouse and gradually converted the API-based production mode to SQL;

  • In April 2021, we began exploring stream-batch unification; at present it is being implemented first in ETL.

Previously, real-time ETL and offline ETL were done separately: offline data went through batch processing into Hive tables and then into the offline warehouse, while real-time data went through real-time ETL into Kafka and then into the downstream real-time warehouse.

One of the first benefits of doing stream-batch unification in ETL first is the timeliness of the offline warehouse. The data must go through anti-cheating processing, and the features we provide to the advertising algorithm are based on the post-anti-cheating data, so improving its timeliness has a large effect on overall delivery results. Unifying ETL for real time is therefore very significant for everything downstream.

After ETL achieves stream-batch unification, we will put the data in the data lake, and both the offline and the real-time warehouse can then be built on the data lake. In the first phase, ETL is unified; after that, the report layer can also be placed in the data lake, which would be a substantial upgrade for our query service, because previously we had to union the offline table and the real-time table, whereas in the data lake the offline and real-time pipelines can write the same table.

IV. Future Planning

About future planning:

  • The first is stream-batch unification, which includes two aspects:

    • The first is ETL unification, which has basically reached launch readiness.
    • The second is combining real-time report SQL with the data lake.
  • In addition, anti-cheating is currently done mainly offline; in the future, some online anti-cheating models may be converted to real-time to minimize risk.