background

The author is engaged in the development of content understanding server, mainly processing text, video and audio, producing corresponding features through NLP and CV technologies, providing them to recommendation, search and advertising businesses, and training models for recall and ordering in corresponding scenarios.

The term

  • Real-time features: that is, features that change constantly with time, such as the exposure sequence of the last five minutes of short videos browsed by users and the current image features of live broadcast. Such features are generally updated in real time by streaming computing framework such as Flink. Real-time features are directly stored in KV storage and dynamically updated in real time through Flink.
  • Offline features: features that do not change over time in real time, such as the user’s age and gender, and the cover image of short videos. These features are generally generated once or updated every day. Demographic characteristics such as age and gender are recalculated periodically. The calculated results are stored in HDFS and then written to KV through Spark or MapReduce. Once the features of short video cover image are generated, they are written to KV directly and a copy is stored in the offline data warehouse.
  • Real-time sample: In order to meet the performance requirements during online serving, offline features and real-time features are stored in KV (such as Redis and hbase). Each time the user requests the prediction service, all the features obtained by the request at that time are dropped as the RawSample for subsequent model training. This sample is then joined with the exposure, click and transformation logs of the front end to generate the corresponding label, and finally the real-time samples required by model training are obtained.

demand

  • Content understanding side the new characteristics of delivery to the following recommendations, search, advertising, as a result of real-time sample need to be online on the characteristics of the accumulation, generally more than seven days, after the completion of accumulation model training, offline evaluation standard, to online ABTest throughout the cycle is long, affects the content to understand business iteration speed, Therefore, historical feature data need to be backtracked, and then join into real-time samples to enter into offline model training faster, verify the offline effect of feature, and improve the overall experimental speed.

Basic logic

The data structure

  • OfflineFeature /OfflineFeature
int64 featureId=1;
Feature feature=2;
Copy the code
  • Real-time feature /RealtimeFeature
int64 featureId=1;
Feature feature=2;
int64 timestamp=3;
Copy the code
  • Note:

FeatureId can be a string instead of int64. In this case, MurmurHash64 is used to Hash the string to int64 to improve join efficiency. Of course, the same operation needs to be performed when processing real-time samples.

  • Real-time sample /RawSample
int64 timestamp=1;
repeated features=2;
Copy the code

Real-time samples can obtain characteristic values corresponding to featureId corresponding to real-time features and offline features through convenience features, which is the basis for real-time samples and features to realize join.

Stitching logic

Real-time feature processing can be obtained

int64 timestamp=1;
repeated features=2;
int64 joinFeatureId=3;
Copy the code
  • To implement offline features of real-time sample join, featureId can be directly used to join.
  • When real-time samples need to join real-time features, real-time features need to be aggregated into a sequence in a certain time interval according to the featureId, and then the time is aligned to the beginning or end of the time interval (such as the whole minute or hour), and then the time is combined with featureId as the key. The real-time sample time is also aligned, and the joint of time and joinFeatureId is used as the key. During the join process, the feature with the largest timestamp without crossing is selected and written into the real-time sample according to the comparison between the timestamp of the real-time sample and the timestamp in the real-time feature sequence.

Challenges and solutions

The sample size is huge

  • In a recommended scenario, the real-time sample occupies 450 gb of offline HDFS storage per hour, which is close to 350w, and a single sample size exceeds 100k. If there is shuffle or slight data skew during the whole join process, the spark task is likely to crash. Protobuf arrays are encoded into strings using Base64 and then stored in SNappy compression format. The cost of serialization and deserialization is also quite high.

The first pit

  • When converting RDD to PairRdd at the same time of decoding, shuffle and serialization costs a lot. The executor appears oom and spark crashes
Val rawRdd = sc.textFile(rawSamplePath) val pairRdd = rawrdd. map(row=>{//base64 decodes binary arrays // deserializes binary arrays into PB objects FeatureId (featureId,pb)}Copy the code

The biggest problem in this process is that after reading real-time sample files on HDFS, the number of partitions of RDD is equal to the number of HDFS files, which is 80. When decoding, deserialization, and pairRdd generation are carried out simultaneously, data skew is easy to occur with 80 partitions, resulting in spark task crash. Executors that do not take full advantage of Spark applications will also be slow overall. Solution:

Val rawRdd = sc.textFile(rawSamplePath).repartition(1000) val pairRdd = rawrdd. map(row=>{// Base64 decodes into binary array FeatureId (featureId,pb)}Copy the code

After reading HDFS files, perform repartition. The parallelism is greater, the data skewness ratio is much smaller, and the shuffling, decoding, and deserialization pressure of a single executor is greatly reduced. Executor OOM will almost not appear.

The problem of the join

After real-time features and PairRdd corresponding to feature generation, if join is directly carried out, the large scale of real-time features and uneven distribution of corresponding Featureids will inevitably lead to data skew, resulting in the crash of spark task.

The first solution

  • The real-time samples are aggregated by featureId, sorted by the number of real-time samples corresponding to different Featureids, and broadcast join the real-time features corresponding to the first N Featureids in the head. Spark’s conventional PairRdd join is used for real-time features and real-time samples corresponding to featureId in the middle and tail, and the data skew phenomenon in the middle and tail is much less.

Problem with the first solution

  • The distribution of different Featureids in the real-time sample varies widely, with some less than 100 at most, but the Spark task still crashed, making it difficult to determine how many featureids were used for broadCast join in the header.

Second solution

  • Since the object of real-time sample is very large, featureId is used to join. If there is a slight data skewness, spark task crash will occur. Then we will assign a unique ID to each real-time sample object. Generate a uniqId, use PairRdd(featureId,uniqId) to join with real-time features, After join completion, PairRdd(uniqId,Feature> is obtained because PairRdd(uniqId,rawSample) join is carried out on real-time samples, so there is absolutely no data skew.

Potholes encountered during implementation

  • In the process of generating uniqId, the uUID random method was selected at the beginning. When joining, it was found that the join result of pairRdd with the last two keys as uniqId was empty. This was because RDD generated for real-time samples was too large to cache and would be recalculated, so the natural uniqId was different.
  • MEMORY_AND_DISK_SER, but spark still crashed. The RDD was too large. The I/O, serialization, and deserialization costs a lot during disk writing and reading. That led to oom.
  • Finally, obtain a reqId from RawSample, and use MurmurHash64 to generate a 64-bit uiqId for the reqId, which represents the unique Id of RawSample.

conclusion

Due to the author’s spark development level, the current solution is still very rough, especially for storage and serialization, there are still many areas to be explored, and corresponding optimization needs to be carried out after further learning spark storage and serialization mechanism and Protobuf serialization.