This article, shared by the search algorithm architecture team at JD.com, introduces the practice of applying Apache Flink to online learning for JD.com product search ranking. The outline is as follows:

1. Background

2. JD search online learning architecture

3. Real-time sample generation

4. Flink online learning

5. Monitoring system

6. Planning and summary

1. Background

In JD.com product search ranking, we often find that the system is suboptimal because the search results lack diversity. To counter the shortage of ranking diversity caused by the Matthew effect in the data, we model with Bernoulli Thompson sampling; however, that algorithm still applies a uniform strategy to all users and does not effectively exploit personalized information about users and products. Given this situation, we adopted online learning, fusing deep learning with Thompson sampling to realize a personalized diversity ranking scheme whose model parameters are updated in real time.

In this scheme, Flink is mainly used for real-time sample generation and for implementing online learning. Samples are the cornerstone of model training, and for processing very large sample data we compared Flink, Storm, and Spark Streaming, finally choosing Flink as the framework both for producing the real-time sample stream and for iterating the online learning parameters. The overall online learning pipeline is particularly long, covering online feature logging, streaming feature processing, association of streaming features with user behavior labels, abnormal sample handling, and real-time training and updating of the model's dynamic parameters. Online learning demands high accuracy and stability in both sample processing and parameter state handling; problems can occur at any stage, so we integrated with JD's Observer system and built a complete full-link monitoring system to ensure the stability and integrity of the data at every stage. Let's first introduce the architecture of JD search online learning.

2. JD search online learning architecture

The ranking model system architecture of JD search mainly includes the following parts:

1. Predictor is the model prediction service; the loaded model can be divided into a static part and a dynamic part. The static part is trained from offline data and mainly learns dense representations of users and docs. The dynamic part mainly contains doc-granularity weight vectors, which are updated in real time by the online learning task.

2. Rank mainly contains the ranking strategies. After the final ranking result is determined, the feature log is persisted in real time, and the features of each doc are written in order into the feature data stream as the data source (features) for the subsequent real-time samples.

3. Feature Collector receives the feature data emitted by the online prediction system, shields downstream consumers from online-system-specific logic such as caching, deduplication, and filtering, and produces a feature stream at Query+Doc granularity.

4. Sample Join takes the feature data and the user behavior label data, such as exposures, clicks, add-to-carts, and orders, as data sources and associates them with Flink's Union + Timer data model into sample data that meets business requirements. The algorithm can select different labels as positive and negative sample labels according to its target.

5. Online Learning consumes the real-time samples generated upstream to train and update the dynamic part of the model.

3. Real-time sample generation

Online learning places high demands on the timeliness and accuracy of online sample generation, as well as on job stability. With massive user log data flooding in in real time, we must not only keep data latency low, the sample association rate high, and the task stable, but also ensure that job throughput is not affected and resource utilization is maximized.

The main flow of the JD search ranking online sample is as follows:

1. The data sources generally include the exposure stream, feature stream, and user behavior stream as the real-time sample data sources; all are unified as JDQ pipeline streams, supported by JD's real-time computing platform.

2. After the feature stream, exposure stream, and label stream are received, data cleaning is performed to obtain the data format required by the task.

3. Once each standardized stream is obtained, a union operation is performed on the streams, followed by a keyBy.

4. A Flink timer is registered in the process function to serve as the real-time window for sample generation.

5. The generated samples are written in real time to JDQ and HDFS. JDQ serves as the input for the downstream online learning, while HDFS persists the sample data for offline training, incremental learning, and data analysis.
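The union + keyed-timer join in the steps above can be sketched as a small Flink-free simulation; `SampleJoiner`, the key layout, and the event kinds below are illustrative, not the production code:

```python
class SampleJoiner:
    """Simulates the Union + Timer model: feature and label events are
    unioned into one per-key buffer, and a per-key timer (checked against
    a watermark) flushes the joined sample when its window closes."""
    def __init__(self, window_ms):
        self.window_ms = window_ms
        self.pending = {}  # key -> {"feature": ..., "labels": [...], "timer": ts}

    def on_event(self, key, ts, kind, payload):
        slot = self.pending.setdefault(
            key, {"feature": None, "labels": [], "timer": ts + self.window_ms})
        if kind == "feature":
            slot["feature"] = payload
        else:  # exposure / click / add-to-cart / order label events
            slot["labels"].append(payload)

    def on_watermark(self, wm):
        # "Timers" whose window has closed fire and emit one sample per key
        out = []
        for key in [k for k, s in self.pending.items() if s["timer"] <= wm]:
            slot = self.pending.pop(key)
            if slot["feature"] is not None:  # drop keys with no feature to join
                out.append((key, slot["feature"], slot["labels"]))
        return out
```

In a real Flink job the same roles are played by `union`, `keyBy`, and timers registered in a `KeyedProcessFunction`; here the watermark call stands in for the timer service.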

Online sample task optimization practice:

The throughput of JD's search sample data reaches gigabytes per second, which poses demanding optimization requirements for distributed shard processing, oversized state, and exception handling.

1. Data skew

When keyBy is used, data skew is hard to avoid. Here, assume the key is well designed, the shuffle mode is correctly chosen, tasks are not overloaded, and resources are sufficient, so the skew is caused by the task's parallelism settings. Let's look at how keys are distributed to subtasks in Flink:

```
keyGroupId = assignToKeyGroup(key, maxParallelism)
subtaskIndex = computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupId)
```

If our parallelism is set to 300 and maxParallelism to 512, some subtasks are assigned 1 key group while others get 2, and the data is naturally skewed. There are two solutions to this problem:

● Set the parallelism to a power of 2;
● Set the maximum parallelism to an integer multiple of the parallelism.

For example, with parallelism 300, maxParallelism can be set to 1200; alternatively, the parallelism itself can be adjusted to a power of 2. Increasing maxParallelism also reduces skew on its own, since each key group then holds fewer keys.
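The skew can be reproduced with a few lines of arithmetic. `operator_index_for_key_group` below mirrors the integer math of Flink's `KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup`, and the parallelism values are the ones from the text:

```python
from collections import Counter

def operator_index_for_key_group(max_parallelism, parallelism, key_group_id):
    # Same integer arithmetic as Flink's
    # KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup
    return key_group_id * parallelism // max_parallelism

def key_groups_per_subtask(parallelism, max_parallelism):
    # Count how many key groups land on each subtask index
    return Counter(operator_index_for_key_group(max_parallelism, parallelism, kg)
                   for kg in range(max_parallelism))

skewed   = key_groups_per_subtask(300, 512)    # subtasks get 1 or 2 key groups
multiple = key_groups_per_subtask(300, 1200)   # maxParallelism = 4 x parallelism
power2   = key_groups_per_subtask(256, 512)    # parallelism is a power of two
```

With 300/512, some subtasks carry twice the key groups of others; with either fix, every subtask carries exactly the same number.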

2. Large checkpoints

Flink state is used in the online sample job. At first we kept the state in memory by default, but as volume grew the amount of state data surged and we observed extremely long GC pauses. We then changed strategy and moved the state into RocksDB, which solved the GC problem. We configured checkpoints as follows:

● Enable incremental checkpoints;
● Set the checkpoint timeout, interval, and minimum pause time;
● Let Flink manage the memory used by RocksDB and tune the block cache, write buffer, etc.;
● Optimize state access by splitting state across multiple state objects, reducing serialization/deserialization cost.

While tuning the task, we found that accessing RocksDB took a very long time. Inspecting jstack output showed many threads waiting on data serialization and deserialization. As algorithm features gradually grew, the number of features per sample exceeded 500, inflating the size of each record. However, the sample association itself does not need the features; joining on the corresponding primary key is enough. Therefore a ValueState stores the primary key, while the feature values are stored in MapState/ListState. These feature values could also be kept in external storage, which is a trade-off between network IO and local IO.

● Enable local recovery for Failure Recovery.

Since our checkpoint data has reached the TB level, any failover puts great pressure on both HDFS and the task itself, so local recovery is preferred: it reduces HDFS pressure and speeds up recovery.
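Collected together, the checkpoint-related settings above roughly correspond to a `flink-conf.yaml` fragment like the following (the values are illustrative, not our production settings):

```yaml
state.backend: rocksdb
state.backend.incremental: true              # incremental checkpoints
state.backend.rocksdb.memory.managed: true   # Flink manages RocksDB memory
state.backend.local-recovery: true           # prefer local state on failover
execution.checkpointing.interval: 10min
execution.checkpointing.timeout: 30min
execution.checkpointing.min-pause: 5min
```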

4. Flink online learning

For online learning, we first introduce the Bernoulli Thompson sampling algorithm. Assuming the reward probability of each product follows a Beta distribution, we maintain two parameters for each product: the success count Si and the failure count Fi, together with prior parameters alpha (success) and beta (failure) shared by all products.

At each step, an expected reward is sampled from each product's corresponding Beta distribution, Q(at) = θi, and the product with the maximum sampled expected reward is selected and shown to the user. The environment then returns the real reward, and the corresponding model parameters are updated, achieving the effect of online learning. The parameters here represent a product feature as an N-dimensional vector, predicted from the raw features through an MLP network: a DNN maps the raw features to an N-dimensional vector that serves as the personalized representation of the product. A logistic regression function models the likelihood, and Flink builds real-time samples composed of this representation and the real-time feedback, which are used to continuously iterate and approximately update the parameter distribution.
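The Beta-Bernoulli sampling loop can be sketched in a few lines. The item set, priors, and reward probabilities below are toy values, and the real system works over the N-dimensional personalized representations rather than bare per-item counters:

```python
import random

def thompson_select(stats, alpha=1.0, beta=1.0):
    """Sample Q(at) = theta_i ~ Beta(alpha + Si, beta + Fi) for each item
    and pick the item with the largest sampled expected reward."""
    sampled = {item: random.betavariate(alpha + s, beta + f)
               for item, (s, f) in stats.items()}
    return max(sampled, key=sampled.get)

def update(stats, item, reward):
    # The real reward from the environment updates Si (success) or Fi (failure)
    s, f = stats[item]
    stats[item] = (s + 1, f) if reward else (s, f + 1)

# Toy environment: item "b" has the higher true reward probability,
# so Thompson sampling should route most traffic to it over time.
random.seed(7)
true_p = {"a": 0.2, "b": 0.7}
stats = {"a": (0, 0), "b": (0, 0)}
for _ in range(2000):
    chosen = thompson_select(stats)
    update(stats, chosen, random.random() < true_p[chosen])
```

The exploration/exploitation balance emerges automatically: uncertain items get wide Beta distributions and are still occasionally sampled on top.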

1. Ensure the order of data

Because the real-time samples consumed from JDQ come with no ordering guarantee, the watermark mechanism is used to ensure data order after they are received.
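The idea behind the watermark here is bounded out-of-orderness: the watermark trails the largest event timestamp seen so far by a fixed delay, so samples arriving up to that delay late are still processed in order. A minimal sketch follows (the class name and exact offset are illustrative; Flink's built-in strategy differs by a millisecond of bookkeeping):

```python
class BoundedOutOfOrderness:
    """Watermark generator sketch: watermark = max event time - allowed delay."""
    def __init__(self, delay_ms):
        self.delay_ms = delay_ms
        self.max_ts = float("-inf")

    def on_event(self, ts):
        # Track the largest event timestamp seen so far
        self.max_ts = max(self.max_ts, ts)

    def watermark(self):
        # Everything with timestamp <= this value is considered complete
        return self.max_ts - self.delay_ms
```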

2. Sample data processing

Products with only an exposure and no further behavior are treated as negative samples, while products with a click and subsequent behavior are treated as positive samples. When the window reaches a certain positive/negative ratio or data volume, a batch training step iterates the new parameter vector, which is stored per product in Flink state. The parameters are then pushed as updates to the dynamic part of the model.
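The batching condition can be sketched as a small trigger; `MiniBatchTrigger` and its thresholds are illustrative, not the production logic:

```python
class MiniBatchTrigger:
    """Buffer labeled samples and emit a training batch once the buffer is
    large enough and contains at least a minimum number of positives."""
    def __init__(self, min_size=4, min_positives=1):
        self.min_size = min_size
        self.min_positives = min_positives
        self.buffer = []

    def add(self, sample, is_positive):
        self.buffer.append((sample, is_positive))
        positives = sum(1 for _, pos in self.buffer if pos)
        if len(self.buffer) >= self.min_size and positives >= self.min_positives:
            batch, self.buffer = self.buffer, []
            return batch  # caller runs one training iteration on this batch
        return None
```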

3. Synchronous iteration and asynchronous iteration

When personalized EE parameter online learning used asynchronous updates, parameter updates arrived out of order, which slowed the convergence of the online learning model and wasted traffic. We therefore switched from asynchronous updates to synchronous updates to avoid out-of-order parameter reads and writes. In synchronous update mode, the parameter vector stored in state is needed for the next training iteration; if a parameter is lost, the iteration process for that product breaks. To guard against parameter loss caused by system failures, a double parameter guarantee was designed: when a task fails or restarts, parameters are recovered from a checkpoint or savepoint; if they cannot be recovered, the previous version of the parameters is fetched from the remote online service and written back to state.
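The double guarantee on recovery can be sketched as a two-level lookup; `state`, `remote_fetch`, and the key are placeholders standing in for Flink keyed state and the online parameter service:

```python
def recover_params(key, state, remote_fetch):
    """Prefer the parameter vector restored into state from a
    checkpoint/savepoint; if it is missing, fall back to the last served
    version from the remote online service and write it back to state so
    the product's iteration can continue."""
    params = state.get(key)
    if params is None:
        params = remote_fetch(key)
        state[key] = params
    return params
```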

4. Multi-version test support

The online learning task uses a single Flink job to support multiple model versions running AB experiments in different experiment buckets. Different AB traffic buckets are distinguished by version number, and the corresponding real-time samples are processed keyed by docId + version, so the iteration of one version does not affect another.

5. Custom serialization

To improve bandwidth utilization and meet performance requirements, we use the PB (protobuf) format internally to transmit data. After investigation, PB transmission outperforms Flink's Kryo serialization of generic classes, so we adopted Flink's custom serialization mechanism to transfer data between operators directly in PB format.

5. Monitoring system

Here we distinguish business full-link monitoring from task-stability monitoring, described in detail below.

1. Full-link monitoring

The whole system uses JD's internal Observer platform for full-link business monitoring, which mainly includes predictor service monitoring, feature dump QPS monitoring, feature and label quality monitoring, association monitoring, training monitoring, and AB metric monitoring, as follows:

2. Task stability monitoring

Task stability monitoring here mainly refers to the stability of the Flink tasks. Link throughput reaches the GB/s scale, the QPS of feature messages reaches the 100K scale, and online learning cannot tolerate interruption, so the related monitoring and alerting are essential.

■ Monitor the container’s memory, CPU, number of threads, and GC

■ Sample-related business monitoring

6. Planning and summary

Flink performs well in real-time data processing: it offers good fault tolerance and throughput, its operators are rich and easy to use, it naturally supports unified batch and stream processing, and open-source online learning frameworks are available on top of it, making it a natural choice for online learning. As the scale of machine learning data grows and the required timeliness of data and models keeps rising, online learning is not only a supplement to offline model training but also a trend in improving the efficiency of model systems. Our plan is as follows:

Acknowledgements: Thanks to the real-time computing R&D department and the search ranking algorithm team for their support.