Brief introduction: The "Real-time Data Warehouse Bootcamp" is jointly taught by front-line technology and product experts such as Alibaba Cloud researcher Wang Feng, Alibaba Cloud senior technical expert Jin Xiaojun, and Alibaba Cloud senior product expert Liu Yiming, from the Realtime Compute for Apache Flink and Hologres teams. Together they built the training camp's curriculum and carefully refined the course content, targeting the pain points of current students and analyzing the architecture, scenarios, and practical applications of the real-time data warehouse from the simple to the deep. Seven excellent courses will help you grow from a beginner into an expert in five days!

This article is compiled from the live session "Architecture analysis of a real-time recommendation system based on Apache Flink + Hologres" by Qin Jiangjie. Video link: https://c.tb.cn/F3.0d98Xr

Abstract: This article is compiled from the lecture given by Qin Jiangjie in the real-time data warehouse online course.

Main contents:

  • Principles of real-time recommendation systems
  • Real-time recommendation system architecture
  • Key technologies of a real-time recommendation system based on Apache Flink + Hologres

Principles of Real-time Recommendation System

(I) Static recommendation system

Before we look at a real-time recommendation system, let’s take a look at what a static recommendation system looks like.

Above is the architecture diagram of a very classic static recommendation system. At the front end there are many client applications whose users generate large volumes of behavior logs. These logs are sent to a message queue and then into ETL. An offline system then performs feature generation and model training. Finally, the model and features are pushed to the online system, where the online service calls the inference service to obtain recommendation results. This is the classic workflow of a static recommendation system. Let's take a concrete example to see how it works.

As shown in the figure above, the online user behavior logs might record page views and ad clicks. The purpose of the recommendation system is to recommend ads to users, so the following user behaviors can be seen in the log:

Both User 1 and User 2 viewed PageID 100 and some other pages, and User 1 clicked Ad 2002 after viewing PageID 100. Such a series of behaviors is summarized from the user logs through ETL and then sent to model training. During training we use some features; in this case we find that both User 1 and User 2 are Chinese male users, which may serve as a user-dimension feature.

In this case, what we learn from the log is that users clicked Ad 2002 after viewing PageID 100, and both users are Chinese male users. The model may therefore learn that when a Chinese male user views PageID 100, he should be shown Ad 2002, and this pattern is trained into the model. At this point the users' offline features are pushed to the feature store, and the model is pushed online as well.

Let's say there is a User 4 who happens to be a Chinese male user; that feature has been pushed into the feature store and the model has been pushed online. If User 4 visits PageID 100, the inference service first looks up the features of User 4, and based on the fact that he is a Chinese male user, the trained model recommends Ad 2002 to him. This is the basic working principle of a static recommendation system.
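To make that flow concrete, here is a minimal, purely illustrative Python sketch of static inference: the service looks up precomputed user features and applies a statically trained rule-style model. The user IDs, page IDs, and ad IDs follow the example above; the hard-coded "model" and feature store are stand-ins, not the actual production setup.

```python
# Minimal sketch of static recommendation inference (illustrative only).
# The feature store and model below are hard-coded stand-ins for a real
# offline-trained model and an offline-populated feature store.

feature_store = {
    "user_1": {"country": "CN", "gender": "male"},
    "user_2": {"country": "CN", "gender": "male"},
    "user_4": {"country": "CN", "gender": "male"},  # pushed offline, T+1
}

# A "model" learned offline: (country, gender, page_id) -> ad_id
static_model = {
    ("CN", "male", "page_100"): "ad_2002",
}

def recommend(user_id, page_id):
    """Look up the user's offline features, then apply the static model."""
    features = feature_store.get(user_id, {})
    key = (features.get("country"), features.get("gender"), page_id)
    return static_model.get(key)

print(recommend("user_4", "page_100"))  # -> ad_2002, until the model is retrained
```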

Now, if something changes, let's see whether the static recommendation system still works well.

Suppose the features and model are trained today on the behavior of User 1 and User 2, and the next day User 4 shows up. The model will assume that User 4, being a Chinese male user, behaves like User 1 and User 2, so it will recommend to him what Chinese male users have been shown. But at this point we find that User 4 actually behaves more like User 3 than like User 1 and User 2.

Since the model and features are static, the model must be retrained before User 4 can be treated like User 3. This delays the prediction effect: recommendations closer to User 3's behavior can only be made after retraining.

So in this practical case, you can see that there are some problems with the static recommendation model:

  • Models and features are generated statically

1) Taking a classification model as an example, users are grouped by similarity, assuming that similar users have similar interests and behaviors;

2) For example, Chinese male users are assumed to behave similarly.

  • Once a user is classified into a category, he stays in that category until a newly trained model reclassifies him.

In this case, it is difficult to make good recommendations for the following reasons:

  • Users’ behavior is too diverse to fall into a fixed category

1) Purchase health care products for parents in the morning, book a hotel for business trip at noon, and buy clothes for family members in the evening…

2) Static systems cannot accurately place users into the right category at any given moment.

  • The behavior of a certain category of users is similar, but the behavior itself may change

1) Assume that users “follow the crowd”, but “the crowd” may change;

2) "The crowd" seen in historical data may not accurately reflect what is happening online.

(II) Recommendation system with real-time feature engineering

To solve these problems, you can add dynamic features. So what do dynamic features look like? Let me give you an example.

As shown in the figure above, let's take the change of "the crowd" as an example of a dynamic feature. The previous model learned a fixed rule: if a Chinese male user visits PageID 100, recommend Ad 2002 to him.

On this basis we make a change: we add a real-time feature, the top 10 ads clicked by Chinese male users who visited PageID 100 in the recent period. This feature cannot be calculated offline because it reflects online, real-time user behavior.

So what can be done once this behavior data is available? When a Chinese male user visits PageID 100, instead of always showing him Ad 2002, we can also recommend the ads most clicked recently by Chinese male users visiting PageID 100.

In this case, when Chinese male users visit PageID 100, the most clicked ads recently are 2001 and 2002. When a user comes in and we see that he is a Chinese male user, we may recommend Ad 2001 instead of Ad 2002.
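A minimal sketch of this kind of "crowd" feature, in plain Python: count ad clicks per (user segment, page) over a sliding time window and keep the top ads. In the real architecture this would be a Flink job writing the result into the feature store; the window length and all names here are illustrative assumptions.

```python
import time
from collections import Counter, deque

WINDOW_SECONDS = 600  # assumed 10-minute sliding window

# (segment, page_id) -> deque of (timestamp, ad_id) click events
clicks_by_key = {}

def record_click(segment, page_id, ad_id, ts=None):
    """Append a click event and drop events that fell out of the window."""
    ts = ts if ts is not None else time.time()
    q = clicks_by_key.setdefault((segment, page_id), deque())
    q.append((ts, ad_id))
    while q and q[0][0] < ts - WINDOW_SECONDS:
        q.popleft()

def top_ads(segment, page_id, n=10):
    """Real-time feature: top-N ads clicked by this segment on this page recently."""
    q = clicks_by_key.get((segment, page_id), deque())
    return [ad for ad, _ in Counter(ad for _, ad in q).most_common(n)]

record_click("cn_male", "page_100", "ad_2001")
record_click("cn_male", "page_100", "ad_2001")
record_click("cn_male", "page_100", "ad_2002")
print(top_ads("cn_male", "page_100"))  # -> ['ad_2001', 'ad_2002']
```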

This is an example of adapting to changes in "the crowd".

In the same way, because the system can capture the user's real-time features, it can better judge the user's intent at that moment. For example, it can see which pages and products a user viewed in the last minute, determine in real time what the user is looking for right now, and recommend an ad better suited to his immediate purpose.

Does such a recommendation system still have problems? Let's look at another example.

For example, as mentioned above, User 1 and User 2 are both Chinese male users. We assumed their behaviors were similar, and the historical data confirmed this. But what happens when we actually observe their behavior online?

The behaviors of User 1 and User 2 may diverge, and there can be many reasons for the divergence that we do not know. The recommendations for User 1 and User 2 may then need to be completely different. What could cause such divergence?

For example, suppose User 1 is in Shanghai and User 2 is in Beijing. One day the temperature in Beijing drops sharply. User 2 in Beijing may start searching for long johns, while it is still very hot in Shanghai, so when User 1 in Shanghai searches for clothes, he may still be searching for summer clothes. At this moment, among Chinese male users, the search behavior of Shanghai User 1 and Beijing User 2 diverges. They should be shown different ads, but a static model does not handle this well.

Because the model is trained statically, if it is a classification model, the categories it can produce are fixed. To produce a new category, the model must be retrained. Since training is done offline, the retrained model may only go live the next day, which hurts the recommendation effect in the meantime.

  • By adding dynamic features

1) Track the behavior of a class of users in real time to fit "the crowd";

2) Track individual user behavior in real time to understand the user's intent at the moment and place the user into a more appropriate category.

  • However, when the way the model itself classifies users needs to change, the most appropriate category may not exist yet, and the model must be retrained to add it.

Example: new products are launched frequently, the business grows rapidly, and the distribution of user behavior changes rapidly.

When you encounter the above problems, you need to add dynamic model updates. How are dynamic model updates done? The idea is similar.

As shown in the figure above, in addition to ETL-ing the real-time user behavior logs to offline storage for feature generation, the behavior logs also need to be exported online, where online feature generation, sample stitching, and model training are performed.

Model training here is usually streaming training, that is, incremental training on top of a base model, so that the model can better fit the current changes in user behavior. In this case the model can be trained with these real-time samples and generate new categories; it will learn that users in Shanghai and Beijing may behave differently. So when a user visits PageID 100, it may recommend Ad 2002 to a Shanghai user and Ad 2011 to a Beijing user.
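The following is a minimal, framework-free sketch of what "incremental training on a base model" means: a tiny logistic-regression-style model whose weights start from an offline base model and are updated one real-time sample at a time. The feature names, learning rate, and model form are illustrative assumptions, not the actual production setup.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

class OnlineModel:
    """Logistic regression updated incrementally, one sample at a time (SGD)."""

    def __init__(self, base_weights, lr=0.05):
        self.w = dict(base_weights)  # start from the offline-trained base model
        self.lr = lr

    def predict(self, features):
        z = sum(self.w.get(f, 0.0) * v for f, v in features.items())
        return sigmoid(z)

    def update(self, features, label):
        """One incremental step on a single real-time sample (label: 1=click, 0=no click)."""
        err = label - self.predict(features)
        for f, v in features.items():
            self.w[f] = self.w.get(f, 0.0) + self.lr * err * v

# Base model trained offline; real-time samples then shift the weights.
model = OnlineModel(base_weights={"cn_male": 0.3, "page_100": 0.2, "city_beijing": 0.0})

# A stream of (features, clicked) samples, e.g. Beijing users start clicking a new ad.
stream = [({"cn_male": 1, "page_100": 1, "city_beijing": 1}, 1)] * 50
for feats, label in stream:
    model.update(feats, label)

print(round(model.predict({"cn_male": 1, "page_100": 1, "city_beijing": 1}), 3))
```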

In this case, if User 4 comes back, the system checks whether he is a Shanghai user or a Beijing user. If he is a Shanghai user, it will still recommend Ad 2002 to him.

Features of the recommendation system with real-time model training:

  • On the basis of dynamic features, the model is trained in real time to make the model as close as possible to the distribution of user behavior at the moment.
  • Mitigate model degradation.

Real-time recommendation system architecture

The above examples explain how a real-time recommendation system works and why it works better than an ordinary offline recommendation system. So how can Flink, together with Hologres and a few other systems/projects, build such a workable real-time recommendation system?

(I) Classic offline recommendation system architecture

First, let’s take a look at the architecture of the classic offline recommendation system mentioned above, as shown below.

This architecture is the same architecture as the previous one, but with some additional details.

First, real-time user behavior is collected through a message queue. The behavior data in this message queue is imported into offline storage to keep historical user behavior, on which static feature calculation runs every day. The results are finally written into the feature store for use by the online inference service.

At the same time, the system also performs offline sample stitching; the stitched samples are stored in the sample store for offline model training. Offline model training generates a new model every day, which is validated and then delivered to the inference service. The model is updated T+1.

The above is the architecture of a classic offline recommendation system. If you want to push it into a real-time recommendation system, you need to do three things:

  • Feature calculation

From static T+1 feature calculation to real-time feature calculation.

  • Sample generation

From offline T+1 sample generation to real-time sample generation.

  • Model training

From offline training with T+1 updates to incremental training with real-time updates.

(II) Online machine learning process of Alibaba's search and promotion business

Alibaba's search and promotion business has put such a real-time recommendation system online. Its overall process is similar to that of the offline recommendation system; the main difference is that the whole process is real-time.

As shown above, this system has three main characteristics:

Timeliness: During the promotion period, the whole process can be updated in real time.

Flexibility: Adapt features and models as needed.

Reliability: system stability, high availability, on-line effect guaranteed.

With this timeliness, models and features can be updated promptly; during big promotions, features and models can be adjusted at any time, and the results have been very good.

(III) Real-time recommender system architecture

What should a real-time recommendation system architecture look like?

As shown in the figure above, compared with the classic offline recommendation system, the real-time recommendation architecture has some changes. First, the data in the message queue is read out twice: one copy is used for real-time feature calculation and written into the feature store, and the other is used for real-time sample stitching. By performing a dual-stream join with the features that the online inference service actually used, real-time samples can be obtained.

The samples stored by the real-time system can then be used for both offline model training and real-time model training.

Whether trained offline or in real time, the resulting models are put into the model store, validated, and finally deployed online.

Offline model training is at day level, while real-time model training can be at minute, hour, or even second level. Offline training generates a base model at a daily level, on top of which real-time training performs incremental model updates.

One thing worth mentioning about this architecture is that the inference service not only reads features from the feature store to do inference, but also sends the features it used, together with the Request ID, to the message queue. Then, during real-time sample stitching, when a positive sample is generated (for example, an advertisement was shown to the user and the user clicked it), we can know exactly which features were used when that ad was recommended. The feature information therefore has to be retained by the inference service and joined in during real-time sample stitching, so that a proper training sample can be produced.
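A minimal sketch of this point, with a plain Python list standing in for the message queue (Kafka in the reference architecture): at inference time the service emits the exact feature snapshot it used, keyed by Request ID, so the sample-stitching job can later join impressions and clicks back to those features. The record fields and helper names are illustrative assumptions.

```python
import json, time, uuid

# Stand-in for the message queue (Kafka in the reference architecture).
feature_snapshot_queue = []

def serve_recommendation(user_id, page_id, feature_store, model):
    """Inference: look up features, score, and emit the used features + Request ID."""
    request_id = str(uuid.uuid4())
    features = feature_store.get(user_id, {})
    ad_id = model(features, page_id)

    # Emit the feature snapshot actually used for this request, keyed by Request ID,
    # so real-time sample stitching can later attach it to the impression/click events.
    feature_snapshot_queue.append(json.dumps({
        "request_id": request_id,
        "user_id": user_id,
        "page_id": page_id,
        "features": features,
        "ad_id": ad_id,
        "ts": time.time(),
    }))
    return request_id, ad_id

fs = {"user_4": {"country": "CN", "gender": "male"}}
rid, ad = serve_recommendation("user_4", "page_100", fs, lambda f, p: "ad_2002")
print(rid, ad, len(feature_snapshot_queue))
```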

It can be seen from this architecture that, compared with the classic offline recommendation system, everything in the green box is real-time: some parts are newly added, and some are existing offline parts that have been made real-time. For example, real-time feature calculation is new; real-time sample stitching turns the original offline sample stitching into a real-time process; real-time model training is new; and model validation likewise changes from offline to real-time.

(IV) Real-time recommendation scheme based on Flink + Hologres

If you want to implement the real-time recommendation system architecture just now, what kind of systems will be used?

As shown in the figure above, Kafka is used as the message queue and HDFS is assumed to be the offline storage. Both real-time and offline feature calculation can be done with Flink. Flink's unified stream-batch capability ensures that the results of real-time and offline feature calculation are consistent.

Hologres serves here as the feature store. One advantage of using Hologres for feature storage is that it provides very efficient point lookups. Another advantage is that real-time feature calculation often produces features that are not yet accurate and need to be corrected later; the mechanisms of Flink and Hologres work well together for such feature correction.

Similarly, on the inference service side, Kafka is again used as the message queue, retaining the features used for inference so they can be fed into the later sample stitching. Sample stitching is done by Flink and is a very classic dual-stream join scenario. After the samples are stitched together and the features are attached, the resulting samples are written into Hologres for sample storage.

With the samples stored in Hologres, real-time model training can consume the Hologres binlog, while offline model training can perform a batch scan of Hologres.

Both online and offline model training can be done with Flink or FlinkML (also known as Alink) for traditional machine learning, or with TensorFlow for deep learning. The resulting models may still be stored in HDFS, validated by Flink or TensorFlow, and finally used by the online inference service.

Many users have their own inference engines, which can be used here; otherwise Flink or TensorFlow can also serve the model.

(V) Real-time feature calculation and inference (Flink + Hologres)

First, let’s look at the process of real-time feature calculation and inference, as shown in the figure above.

As just mentioned, we collect real-time user behavior, send it to Flink for real-time feature calculation, and then store the results in Hologres for the online inference service.

Real-time features here might include:

  • The user's last 5 minutes of browsing history (see the sketch after this list)

1) Products, articles, and videos viewed 2) Dwell time 3) Favorites, add-to-cart, inquiries, comments

  • The 50 most viewed items in each category in the last 10 minutes
  • The most viewed articles, videos, and products in the last 30 minutes
  • The 100 most searched words in the last 30 minutes
  • For search and promotion businesses, such real-time features can be used to achieve better recommendation results.
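As a concrete illustration of the first item, here is a minimal plain-Python sketch of a "last 5 minutes of browsing" feature: per-user events are kept in a time-pruned buffer and summarized on demand. In production this would be a Flink windowed aggregation writing into Hologres; the window size and event fields are assumptions.

```python
import time
from collections import deque

WINDOW_SECONDS = 300  # assumed 5-minute window

user_events = {}  # user_id -> deque of (timestamp, event dict)

def record_event(user_id, event, ts=None):
    """event: e.g. {'type': 'view', 'item': 'item_42', 'dwell_s': 12}."""
    ts = ts if ts is not None else time.time()
    q = user_events.setdefault(user_id, deque())
    q.append((ts, event))
    while q and q[0][0] < ts - WINDOW_SECONDS:
        q.popleft()

def recent_browsing_feature(user_id):
    """Real-time feature: items viewed and actions taken in the last 5 minutes."""
    q = user_events.get(user_id, deque())
    return {
        "viewed_items": [e["item"] for _, e in q if e["type"] == "view"],
        "actions": [e["type"] for _, e in q if e["type"] != "view"],
        "total_dwell_s": sum(e.get("dwell_s", 0) for _, e in q),
    }

record_event("user_4", {"type": "view", "item": "item_42", "dwell_s": 12})
record_event("user_4", {"type": "add_to_cart", "item": "item_42"})
print(recent_browsing_feature("user_4"))
```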

(VI) Real-time sample stitching (Flink + Hologres)

Next, let's look at the real-time sample stitching part, as shown in the figure below.

Real-time user behavior is captured and fed into Flink for sample stitching. Sample stitching here has two parts. The first part is determining whether a sample is positive or negative, which is done by analyzing the real-time user behavior logs. There is an impression stream and a click stream; if, after joining the impression stream with the click stream, we find that an item that was shown was clicked by the user, it is a positive sample. If an item was shown but the user did not click it, it is a negative sample. That is how positive and negative samples are determined.

Positive and negative labels alone are obviously not enough, because features are also needed for training. These features come from the inference service, which used certain features to decide whether the user would be interested in an item when it was shown. These features are retained in Kafka, and then in Flink, during sample stitching, they are joined in by Request ID, producing a complete sample that is written into Hologres.

Here Flink's multi-stream join capability is used for sample stitching, along with multi-stream synchronization, positive/negative sample generation, and sample correction.
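A minimal sketch of the stitching logic: impressions, clicks, and inference-time feature snapshots are joined on Request ID; an impression with a matching click becomes a positive sample, otherwise a negative one. In the real system this is a Flink multi-stream join with watermarks and join windows; the record layouts here are assumptions.

```python
def stitch_samples(impressions, clicks, feature_snapshots):
    """
    impressions: list of {'request_id', 'item'}
    clicks:      list of {'request_id', 'item'}
    feature_snapshots: list of {'request_id', 'features'}  (emitted by inference)
    Returns complete training samples: features + positive/negative label.
    """
    clicked = {(c["request_id"], c["item"]) for c in clicks}
    features_by_rid = {f["request_id"]: f["features"] for f in feature_snapshots}

    samples = []
    for imp in impressions:
        rid, item = imp["request_id"], imp["item"]
        samples.append({
            "request_id": rid,
            "item": item,
            "features": features_by_rid.get(rid, {}),
            "label": 1 if (rid, item) in clicked else 0,  # shown & clicked -> positive
        })
    return samples  # in production, written into Hologres for training

imps = [{"request_id": "r1", "item": "ad_2002"}, {"request_id": "r2", "item": "ad_2001"}]
clks = [{"request_id": "r1", "item": "ad_2002"}]
feats = [{"request_id": "r1", "features": {"cn_male": 1}},
         {"request_id": "r2", "features": {"cn_male": 1}}]
print(stitch_samples(imps, clks, feats))
```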

(VII) Real-time model training / deep learning (PAI-Alink / TensorFlow)

After the sample is generated, the next step is real-time model training or deep learning.

As shown in the figure above, the samples mentioned above are stored in Hologres, and they can be used for two purposes: online model training and offline model training.

Online model training and offline model training can use the binlog and batch scan capabilities of Hologres respectively. In terms of performance, these are not much different from consuming a typical message queue or scanning a file system.

Here, if it is a deep model, TensorFlow can be used for training. If it is a traditional machine learning model, Alink or FlinkML can be used. The trained model is then stored in HDFS, and model validation is done with Flink or TensorFlow.

The above are some of the techniques that can be used when actually building real-time model training and deep model training.

(VIII) Alink — Flink ML (machine learning algorithms based on Flink)

Here is a brief introduction to Alink. Alink is a machine learning algorithm library based on Flink. It is currently open source and is being contributed to the Apache Flink community.



As shown above, Alink (Flink ML) has two features compared to Spark ML:

  • Spark ML provides batch-only algorithms, while Alink provides batch-stream integrated algorithms.
  • Alink is comparable to Spark ML in batching algorithms.

(IX) Offline feature backfill

After the training part, let's look at offline feature backfill. This addresses the situation where, after the real-time features are already online, a new feature needs to be added. What should be done?

As shown in the figure above, there are usually two steps. The first step is to add the new feature to the real-time system, so that from a certain moment on, all features stored in Hologres include the new feature. But what about the historical data? At this point we need to do a feature backfill: run a batch job over the historical behavior data stored in HDFS to fill in the new feature for the history.

So the offline feature backfill in the architecture diagram is also done by Flink's offline feature calculation, which reads the historical behavior data from HDFS, calculates the offline features, and fills in the feature values for past history.
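A minimal sketch of the backfill idea: read historical behavior records (here a plain list standing in for the HDFS data), compute the new feature for each user, and upsert only that column into the feature store (standing in for Hologres). The record layout and the feature itself are illustrative assumptions.

```python
from collections import defaultdict

def backfill_new_feature(historical_events, feature_store):
    """Batch job: compute a newly added feature over history and upsert it.

    historical_events: iterable of {'user_id', 'type', ...} read from offline storage
    feature_store: dict standing in for the Hologres feature table
    """
    click_counts = defaultdict(int)
    for ev in historical_events:
        if ev["type"] == "ad_click":
            click_counts[ev["user_id"]] += 1

    # Upsert only the new column; existing features stay untouched.
    for user_id, cnt in click_counts.items():
        feature_store.setdefault(user_id, {})["historical_ad_clicks"] = cnt
    return feature_store

history = [{"user_id": "user_1", "type": "ad_click"},
           {"user_id": "user_1", "type": "page_view"},
           {"user_id": "user_2", "type": "ad_click"}]
store = {"user_1": {"country": "CN"}}
print(backfill_new_feature(history, store))
```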

Key technologies of real-time recommendation system based on Apache Flink + Hologres

The architecture just described uses many key technologies; here I will mainly talk about two of them.

(1) Features and samples with retractable corrections

The first key technology is features and samples that support retraction and correction, as shown in the figure above.

In the shaded area at the lower part of the figure, some samples and features are retracted and corrected by Flink and Hologres. Why do features and samples need correction?

  • The real-time logs are out of order

For example, a user click event arrives late due to system delay and produces a False Negative sample.

  • Traditionally, samples are corrected by recomputing them with an offline job

The entire offline sample calculation has to be re-run.

  • Point updates via the retraction mechanism of Apache Flink + Hologres

Only the features and samples that need correction are updated.

The real-time logs may be out of order, with some streams arriving early and others late. In that case, some False Negative samples may be generated during the multi-stream join because of system delays and late-arriving events.

For example, when joining the impression stream with the click stream, we may at first conclude that the user did not click an advertisement, and only later find out that the user did click, but the click event arrived late. In this case the downstream is initially told a False Negative (the user did not click), and when it turns out the user did click, that False Negative must be corrected. When this happens, the previous sample needs to be retracted or updated, telling the downstream that the earlier sample is not a negative sample but a positive one.

Based on the above, we need a retraction capability across the whole pipeline: it must be able to inform the downstream, step by step, of earlier errors and correct them. Such a mechanism can be achieved through the cooperation of Apache Flink + Hologres.

Why do such a thing?

In the past, when such False Negative samples were generated, the correction was usually done by recomputing the offline samples with an offline job. The cost is that the entire offline sample computation may need to be re-run, even though the ultimate goal is only to correct a small fraction of all samples, so the cost is relatively high.

Apache Flink + Hologres makes it possible to update the False Negative samples point by point, rather than re-running the whole sample computation. In this case, the cost of correcting features and samples is much lower.
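A minimal sketch of such a point correction, with a Python dict standing in for the Hologres sample table: when a late click arrives, only the one affected sample row is flipped from negative to positive, and a correction record is emitted downstream, instead of recomputing the whole sample set. The keys and the "correction" record are illustrative assumptions.

```python
# Sample table keyed by (request_id, item); a dict stands in for Hologres.
sample_table = {
    ("r1", "ad_2002"): {"features": {"cn_male": 1}, "label": 0},  # provisionally negative
}

corrections = []  # retraction/correction messages sent downstream to training

def on_late_click(request_id, item):
    """A click event arrived late: flip that single sample from negative to positive."""
    key = (request_id, item)
    sample = sample_table.get(key)
    if sample and sample["label"] == 0:
        sample["label"] = 1  # point update of just this row
        # Tell downstream (e.g. incremental training) to retract the old negative
        # sample and replace it with the corrected positive one.
        corrections.append({"key": key, "old_label": 0, "new_label": 1})

on_late_click("r1", "ad_2002")
print(sample_table, corrections)
```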

(2) Event-based stream-batch mixed workflow

Another key technology in this architecture is the event-based stream-batch mixed workflow. What does that mean?

Looking at this diagram, beyond the systems just shown there is a very complex workflow, because the different systems have dependencies and scheduling relationships between them, sometimes data dependencies and sometimes control dependencies.

For example, we might regularly run some offline static feature calculations, either to backfill features or to correct problems in real-time features; these run either periodically by default or are triggered manually. In other cases, after offline model training produces a model, online model validation needs to be triggered; and after online model training produces a model, validation likewise needs to be triggered.

For example, after sample stitching catches up to 10 a.m., we want to tell model training that all samples before 10 a.m. have been stitched, so that a batch offline training task can run over the data from 10 a.m. yesterday to 10 a.m. today. That is a stream task triggering a batch task. After the batch model training produces a model, the model must go through online model validation, which is a batch task triggering a stream task. And models produced by online model training must also be validated by online model validation, which is a stream task triggering a stream task.

Therefore, this process involves many interactions between different tasks, which is what we call a relatively complex workflow. It contains both batch tasks and stream tasks, so it is a stream-batch mixed workflow.

(3) Flink AI Flow

How is such a stream-batch mixed workflow implemented?

Flink AI Flow is used. It is a top-level workflow abstraction for big data plus AI.

As shown above, a workflow is typically divided into two parts: workflow definition and workflow execution.

The workflow definition defines Nodes and Relations, that is, the nodes and the relationships between them. In Flink AI Flow, a node is defined as a Logical Processing Unit, and the relationship between nodes is defined as an Event Driven Condition. Under this abstraction, the workflow execution layer performs event-based scheduling.

In abstract terms, a system produces many events; combinations of these events may satisfy certain conditions, and when a condition is satisfied, some action is triggered.

For example, a workflow might have a task A, and the scheduler listens for various events within the system. When event 1 occurs, then event 2, then event 3, and only when the events occur in this order does an action need to be taken: starting task A. Events 1, 2, and 3 occurring in order constitute a condition.
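Below is a minimal sketch of this "events in order satisfy a condition, which triggers an action" idea. This is not the Flink AI Flow API; it is just an illustration of event-based scheduling with a hypothetical in-process scheduler and condition class.

```python
class OrderedEventCondition:
    """Condition: events 1, 2, 3 must be observed in this exact order."""

    def __init__(self, expected_events, action):
        self.expected = list(expected_events)
        self.action = action
        self.pos = 0

    def on_event(self, event):
        if self.pos < len(self.expected) and event == self.expected[self.pos]:
            self.pos += 1
            if self.pos == len(self.expected):
                self.action()   # condition met -> start task A
                self.pos = 0    # re-arm for the next occurrence

class Scheduler:
    """Listens to all events in the system and evaluates registered conditions."""

    def __init__(self):
        self.conditions = []

    def register(self, condition):
        self.conditions.append(condition)

    def publish(self, event):
        for cond in self.conditions:
            cond.on_event(event)

scheduler = Scheduler()
scheduler.register(OrderedEventCondition(
    ["event_1", "event_2", "event_3"],
    action=lambda: print("condition met -> submit task A"),
))
for e in ["event_1", "event_3", "event_2", "event_1", "event_2", "event_3"]:
    scheduler.publish(e)  # task A starts only after 1, 2, 3 have occurred in order
```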

With this abstraction, traditional workflows can be integrated with workflows that contain streaming jobs. Traditional workflow scheduling is based on job state changes: typically a job finishes, and then the scheduler decides which job to run next. The problem with this approach is that if a job is a streaming job, it never finishes, and the workflow cannot proceed.

Event-based scheduling solves this problem well. Workflow scheduling no longer depends on job state changes but is driven by events, so even a streaming job can emit events that tell the scheduler to do something else.

To complete the entire scheduling semantics, a number of supporting services are required, including:

  • Metadata Service
  • Notification Service
  • Model Center

Let’s take a look at each of these support services.

(4) Metadata Service

The metadata service manages data sets. In the workflow, the goal is to spare users the tedium of locating data sets themselves: the service manages the data sets, and users can simply refer to them by name when they want to use them.

The metadata service also manages Projects, where a Project refers to a Project in Flink AI Flow. A Project can contain multiple workflows, and the main purpose of managing Projects is to make workflows reproducible.

The metadata service also manages workflows and jobs; each workflow may contain many jobs. In addition, it manages model lineage, recording which job in which workflow produced which version of a model, and it allows users to define custom entities.

(5) Notification Service

The second service is the notification service, which provides events with primary keys and event listeners.



An example is shown in the figure above. A client wants to listen for events whose key is "model". When that key is updated, the listening client receives a callback telling it that an event has occurred: the event's primary key is "model", its value is the URI of the model, and its version number is 1.

One use here is model validation: a validation job can listen on the Notification Service, and when a new model is generated it gets notified and then validates that model. This can be done with the Notification Service.
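A minimal in-process sketch of the notification service idea: listeners subscribe by event key, and when an event with that key is sent (for example, a newly registered model version), each listener gets a callback. The class names, fields, and the example model URI are illustrative, not the actual Flink AI Flow Notification Service API.

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class Event:
    key: str      # primary key of the event, e.g. "model"
    value: str    # e.g. the model URI
    version: int  # e.g. model version number

class NotificationService:
    """Keyed events with callback-style listeners."""

    def __init__(self):
        self.listeners = defaultdict(list)

    def listen(self, key, callback):
        self.listeners[key].append(callback)

    def send_event(self, event):
        for cb in self.listeners[event.key]:
            cb(event)

ns = NotificationService()

# A model-validation job listens for new model versions and validates them.
ns.listen("model", lambda e: print(f"validate model {e.value} (version {e.version})"))

# The model center registers a new model version and notifies listeners.
ns.send_event(Event(key="model", value="hdfs:///models/ctr/v1", version=1))
```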

(6) Model Center

The model center is responsible for multi-version management of models, recording of parameters, tracking of model metrics, management of the model life cycle, and some model visualization work.

Here is an example of how Flink AI Flow describes the complex workflow of the real-time recommendation system as a complete workflow.

As shown above, suppose there is a DAG containing three tasks: model training, model validation, and online inference.

First, the Scheduler submits the model training job and updates its state in the Metadata Service to "to be submitted". Assuming the environment is a Kubernetes cluster, the training job is then submitted to Kubernetes to run.

Once the training job is running, its status is updated through the job status listener. If the job is a streaming training job, after running for some time it will produce a model, and that model is registered in the Model Center. After registration, the Model Center sends an event indicating that a new model version has been registered, and this event goes to the Scheduler, which listens for such events.

When the Scheduler receives the event, it checks whether any condition is now satisfied and what action should be taken. When a model is generated, it needs to be validated, so once this condition is met the Scheduler pulls up a job: the model validation job.

After the model validation job starts, it goes to the Model Center to find the most recently generated model version and validates it. Suppose validation passes and the validation job is a batch job: it tells the Model Center that the model has been validated, and the Model Center then sends a "Model Version Validated" event to the Scheduler. On receiving this event, the Scheduler sees that a validated model is available and triggers the launch of the online inference service. Once the inference service is up, it pulls the validated model from the Model Center and serves inference with it.

Assume the inference service is also a streaming job that keeps running. After some time, the online streaming training job produces a new model, and the same path is followed: a new model version is registered, the event is heard by the Scheduler again, the Scheduler pulls up the validation job (Job 2) again, and the validation job validates the new model. If validation passes again, a "New Model Version Validated" event is sent to the Model Center, which passes the event back to the Scheduler. At this point the Scheduler sees that the inference job is already running and may not need to do anything.

The inference job itself also listens for "Model Version Validated" events, and when it receives one, it reloads the latest validated model from the Model Center.

Through this example, we have explained why a stream-batch mixed scheduler and workflow are needed to chain together all the jobs and workflows in an end-to-end real-time recommendation system architecture.

This article is original content from Alibaba Cloud and may not be reproduced without permission.