1. Overview

1.1 Service Background

In vivo's short video recommendation, videos that a user has already watched must be filtered out (deduplicated) so that the same video is not recommended repeatedly, which would hurt the user experience. While processing a recommendation request, the system first recalls between 2,000 and 10,000 videos based on the user's interests and then deduplicates them: videos the user has already watched are filtered out, and only unwatched videos are kept for ranking. The highest-scoring videos are then delivered to the user.

1.2 Current Status

Deduplication is currently implemented on top of Redis ZSet. The server writes the videos reported by client-side playback tracking events (the "buried points") and the videos delivered to the client into Redis ZSets under separate keys. After recall, the recommendation algorithm reads the user's entire playback and delivery ZSets directly from Redis and deduplicates with an in-memory Set, that is, it checks whether each recalled video already exists in the Set of delivered or played videos. The general process is shown in Figure 1.

(Figure 1: Current short video deduplication)

Deduplication should, in principle, filter on the videos the user has actually watched. However, watched videos are reported through client-side tracking events, which arrive with some delay. The server therefore also keeps the user's last 100 delivery records for deduplication, so that videos the user has already seen are not recommended again even if the client's tracking events have not yet been reported. On the other hand, a video delivered to the user is not necessarily exposed, so only 100 delivery records are kept; a delivered but unwatched video can be recommended again once 100 further records have been delivered.

The main problem with the current scheme is that it consumes a large amount of Redis memory, because video IDs are stored in the Redis ZSet as raw strings. To control memory usage and preserve read and write performance, we cap the playback records kept per user; the cap is currently 10,000 records. This, however, degrades the product experience for heavy users.

2. Solution Research

2.1 Mainstream Solutions

First, the storage format. Video deduplication only needs to determine whether a video exists in the user's history, so the original video ID does not have to be stored. The most common solution today is a Bloom filter, which stores several hash values per video and can reduce the storage space several times over, or even tenfold.
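To make the storage saving concrete, here is a minimal Bloom filter sketch in Python. It is an illustration only, not the implementation used by the service: the class name, the SHA-256 based double hashing and the 1% false-positive target are all assumptions, and the sizing uses the standard textbook formulas.

```python
import hashlib
import math


class BloomFilter:
    """Minimal Bloom filter (illustrative; parameters are assumptions)."""

    def __init__(self, capacity, error_rate=0.01):
        # Standard sizing: m = -n * ln(p) / (ln 2)^2 bits, k = (m / n) * ln 2 hashes.
        self.num_bits = max(8, math.ceil(-capacity * math.log(error_rate) / math.log(2) ** 2))
        self.num_hashes = max(1, round(self.num_bits / capacity * math.log(2)))
        self.bits = bytearray((self.num_bits + 7) // 8)

    def _positions(self, item):
        # Double hashing: derive k bit positions from two 64-bit halves of one digest.
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        # May return a false positive, never a false negative.
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))


watched = BloomFilter(capacity=10_000, error_rate=0.01)
for i in range(10_000):
    watched.add(f"video-{i}")

print(len(watched.bits), "bytes for 10,000 videos")
```

Under these assumed parameters the filter for 10,000 videos takes about 12 KB, compared with roughly 250 KB for 10,000 raw IDs of 25 B each. The price is a small false-positive rate: occasionally an unwatched video is treated as watched and filtered out.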

Second, the storage medium. Supporting 90 days (three months) of playback records, rather than the current hard cap of 10,000 records, would require a very large amount of Redis storage. For example, with 50 million users, an average of 10,000 videos played per user in 90 days, and 25 B of memory per video ID, the total comes to 12.5 TB. Since the records used for deduplication are eventually read into local memory anyway, we can consider trading some read performance for more storage capacity. In addition, the current Redis is not persisted: if Redis fails, the data is lost and hard to recover (because of the data volume, recovery would take a long time).
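The 12.5 TB figure follows directly from the three numbers quoted above. A quick check in Python (the constant names are illustrative, and 1 TB is taken as 10^12 bytes):

```python
USERS = 50_000_000             # target user count from the text
VIDEOS_PER_USER_90D = 10_000   # average videos played per user in 90 days
BYTES_PER_VIDEO_ID = 25        # memory per raw video ID

raw_bytes = USERS * VIDEOS_PER_USER_90D * BYTES_PER_VIDEO_ID
raw_tb = raw_bytes / 10**12

print(f"raw video IDs: {raw_tb} TB")  # prints "raw video IDs: 12.5 TB"
```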

The most common solution in the industry today is a disk-based KV store (typically built on RocksDB for persistence, with SSDs as the storage medium). Its read and write performance is somewhat lower than that of Redis, but disk has a clear capacity advantage over memory.

2.2 Technology Selection

First, playback records. Because at least three months of playback history must be supported, we store the videos a user has watched in a Bloom filter, which takes far less space than storing the original video IDs. Designing for 50 million users, keeping the playback records in Redis would still amount to more than a terabyte of data even in Bloom filter form. Since the filtering itself is done in the host's local memory, we can accept slightly lower read performance, so we chose a disk-based KV store to persist the playback records as Bloom filters.

Second, delivery records. Only 100 delivery records are stored per user, so the overall data volume is small. In addition, records older than the latest 100 must be evicted, so we continue to use Redis to store the latest 100 delivery records.

3. Scheme Design

Based on the technology selection above, we plan to add a unified deduplication service that handles writes of delivery and playback records and deduplicates videos against them. The key design question is how to write the received playback tracking events into the Bloom filter. After a playback event is received, writing it to the disk KV store in Bloom filter form takes three steps, as shown in Figure 2. First, read and deserialize the user's Bloom filter, creating it if it does not exist. Second, add the played video ID to the Bloom filter. Third, serialize the updated Bloom filter and write it back to the disk KV store.

(Figure 2: Main steps of the unified deduplication service)
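The three steps can be sketched as a read-modify-write cycle. In the Python sketch below a plain dict stands in for the disk KV store, the filter is a fixed-size bit array, and the key prefix `bf:`, the filter size and the hash count are all hypothetical values chosen for illustration.

```python
import hashlib

NUM_BITS = 8 * 1024   # assumed filter size in bits, for illustration only
NUM_HASHES = 4        # assumed number of hash functions

disk_kv = {}          # in-memory stand-in for the disk KV store


def bit_positions(video_id):
    # Derive NUM_HASHES bit positions from slices of one SHA-256 digest.
    digest = hashlib.sha256(video_id.encode("utf-8")).digest()
    return [int.from_bytes(digest[4 * i:4 * i + 4], "big") % NUM_BITS
            for i in range(NUM_HASHES)]


def record_plays(user_id, video_ids):
    key = f"bf:{user_id}"  # hypothetical key layout
    # Step 1: read and deserialize the filter, or create an empty one.
    bits = bytearray(disk_kv.get(key, bytes(NUM_BITS // 8)))
    # Step 2: add the played video IDs to the filter.
    for video_id in video_ids:
        for pos in bit_positions(video_id):
            bits[pos // 8] |= 1 << (pos % 8)
    # Step 3: serialize the updated filter and write it back.
    disk_kv[key] = bytes(bits)


def has_played(user_id, video_id):
    bits = disk_kv.get(f"bf:{user_id}")
    if bits is None:
        return False
    return all(bits[pos // 8] & (1 << (pos % 8)) for pos in bit_positions(video_id))


record_plays("u1", ["v1", "v2"])
record_plays("u1", ["v3"])  # a later write must preserve the earlier bits
```

Because every write rewrites the whole serialized filter, the cost of a write grows with the size of the Value. Two writers that interleave between step 1 and step 3 can also overwrite each other's updates.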

The whole process is clear, but because we need to support tens of millions of users (we assume a target of 50 million), four further questions need to be considered:

  • First, videos are delivered per refresh (5 to 10 videos per refresh), whereas playback tracking events are reported per video. For recommendation deduplication, the write QPS is therefore higher than the read QPS. Compared with Redis, however, the disk KV store has lower performance, and its writes are slower than its reads. How to write the Bloom filters to the disk KV store while supporting 50 million users is therefore an important problem.

  • Second, because a Bloom filter does not support deletion, data older than a certain period must be expired; otherwise, data that is no longer used would occupy storage indefinitely. How to expire Bloom filter data is therefore also an important issue.

  • Third, the server and the algorithm currently interact directly through Redis. We want to build a unified deduplication service that the algorithm calls to filter out watched videos. The server is built on a Java technology stack and the algorithm on a C++ technology stack, so the service implemented in Java must be callable from C++. We ultimately use gRPC to expose the interface to the algorithm, with Consul as the service registry. This part is not the focus of this article and is not elaborated further.

  • Fourth, after switching to the new scheme, we want to migrate the playback records previously stored in Redis ZSet into Bloom filters, so that the upgrade is smooth and the user experience is preserved. The design of the migration scheme is therefore also an important issue.

3.1 Overall Process

The overall process of the unified deduplication service and its interaction with upstream and downstream systems are shown in Figure 3. When the server delivers videos, it saves the delivery records under the corresponding Redis key through the Dubbo interface of the unified deduplication service; using the Dubbo interface ensures that delivery records are written immediately. At the same time, the service listens to video playback tracking events and stores them in the disk KV store in Bloom filter form. For performance reasons, we adopt a batch write scheme, which is detailed below. The unified deduplication service also provides an RPC interface that the recommendation algorithm calls to filter out the recalled videos the user has already watched.

(Figure 3: Overall process of the unified deduplication service)

The write performance of the disk KV store is much worse than its read performance, and the write QPS degrades further when the Value is large. At a scale of tens of millions of daily active users, the disk KV store cannot sustain direct writes, so a traffic aggregation scheme is needed: a user's playback records within a period of time are aggregated and written in a single operation, which greatly reduces the write frequency and the write pressure on the disk KV store.

3.2 Traffic Aggregation

To aggregate write traffic, we stage the played videos temporarily in Redis and then periodically build Bloom filters from the staged videos and write them to the disk KV store. Specifically, we considered two approaches: writing only once every N minutes, and writing in batches from scheduled tasks. The following sections describe our design and considerations for traffic aggregation and Bloom filter writes.

3.2.1 Near-real-time write

Ideally, when a playback tracking event reported by the client is received, the Bloom filter would be updated and saved to the disk KV store directly. To reduce the write frequency, however, we first save the played video ID in Redis and write to the disk KV store only once every N minutes. We call this the near-real-time write scheme.

The simplest idea is to save a Value in Redis on each write, with an expiration of N minutes. Each time a playback event is received, we check whether the Value exists. If it exists, the disk KV store has already been written within the last N minutes and the played video stays staged in Redis; otherwise, the write to the disk KV store is performed. In other words, data is not written the moment it is generated; it waits up to N minutes so that a small batch accumulates before being written. The Value acts like a “lock” that ensures the disk KV store is written only once every N minutes, as shown in Figure 4: while the lock is held, further attempts to acquire it fail, which protects the disk KV store from being written during that time. From the perspective of the event stream, the originally continuous stream becomes a micro-batch every N minutes after passing through this “lock”, which aggregates the traffic and reduces the write pressure on the disk KV store.

(Figure 4: Near-real-time write scheme)
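The “lock” behaviour can be simulated without Redis. The Python sketch below is one possible reading of the scheme, not the production code: `TtlLock` is a hypothetical in-memory stand-in for a Redis value with an expiration, the clock is passed in explicitly so the example is deterministic, and N is assumed to be 5 minutes.

```python
class TtlLock:
    """In-memory stand-in for a Redis value that expires after ttl seconds."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.expires_at = {}

    def try_acquire(self, key, now):
        # Succeeds only if no unexpired value exists for this key.
        if self.expires_at.get(key, 0) > now:
            return False
        self.expires_at[key] = now + self.ttl
        return True


N_MINUTES = 5                    # assumed value of N
lock = TtlLock(N_MINUTES * 60)
pending = {}                     # user -> staged video IDs (stand-in for the Redis ZSet)
flushed = []                     # batches that would be written to the disk KV store


def on_play_event(user_id, video_id, now):
    # Always stage the video first, then flush only if the lock can be taken.
    pending.setdefault(user_id, []).append(video_id)
    if lock.try_acquire(f"lock:{user_id}", now):
        flushed.append((user_id, pending.pop(user_id)))


# Four events for one user at t = 0 s, 10 s, 20 s and 400 s.
for t, vid in [(0, "v1"), (10, "v2"), (20, "v3"), (400, "v4")]:
    on_play_event("u1", vid, now=t)

print(flushed)  # [('u1', ['v1']), ('u1', ['v2', 'v3', 'v4'])]
```

Four events produce only two writes. The sketch also shows the weakness of the approach: v2 and v3 stay staged until the user's next event arrives after the lock has expired.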

The idea behind near-real-time writing is simple and its advantages are clear: the video IDs in playback events reach the Bloom filter in near real time, and the staging period is short (N minutes), which keeps the data staged in the Redis ZSet from growing too long. On closer analysis, however, several special scenarios need to be considered, mainly the following:

First, the Value kept in Redis is effectively a distributed lock, and in practice it is hard to guarantee that such a “lock” is absolutely safe. Two received playback events may therefore both conclude that they can write to the disk KV store, while the staged data each of them reads is not necessarily the same. Because the disk KV store does not natively support the Bloom filter structure, a write must first read the current Bloom filter from the disk KV store, then add the video IDs to be written, and finally write the filter back. Under concurrent writes, this read-modify-write sequence can lose data.

Second, the data from a user's last N minutes can only be written to the disk KV store when a playback event arrives the next time the user uses the app. If there are many inactive users, a large amount of staged data remains in Redis and occupies space. If a scheduled task is then used to write this data to the disk KV store, the concurrent-write data loss described in the first scenario can easily occur as well.

So although the idea behind the near-real-time write scheme is straightforward, handling these cases makes it increasingly complex, and we had to look for another scheme.

3.2.2 Batch Write

Since the near-real-time write scheme becomes complex, we considered a simpler one: a scheduled task writes the staged data to the disk KV store in batches. The data to be written is tagged; assuming we write once an hour, the staged data can be tagged with its hour value. Scheduled tasks can occasionally fail to run, however, so compensation is needed. A common approach is for each run to also process the data from the previous one or two hours. That is not very elegant, so we took inspiration from the time wheel and designed the Bloom filter batch write scheme around it.

We join the hour values end to end to form a ring and store each piece of data under its hour value. Data with the same hour value (for example, 11 PM every day) is thus stored together. If today's task fails to run, or today's data is not synchronized to the disk KV store, the data gets a compensation run the next day.

Following this idea, we can take the hour value modulo some number to shorten the interval between two compensation runs. Figure 5 shows the hour value taken modulo 8: the data from 1:00 to 2:00 and from 9:00 to 10:00 both fall on the “data to be written” marked by point 1 on the time ring, so data gets another chance of compensation after 8 hours. In other words, the modulus is the compensation interval.

(Figure 5: Batch write scheme)
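The mapping from hour to ring position is a single modulo operation. A small Python sketch, using the modulus of 8 from the Figure 5 example (the function name is illustrative):

```python
RING_SIZE = 8  # modulus used in the Figure 5 example; also the compensation interval in hours


def ring_slot(hour):
    """Map an hour of the day (0-23) to its position on the time ring."""
    return hour % RING_SIZE


# Hours that share a slot are exactly RING_SIZE hours apart.
for hour in (1, 9, 17):
    print(hour, "->", ring_slot(hour))  # each line shows slot 1
```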

So what should the compensation interval be? The choice matters because it affects how the data to be written is distributed over the ring. Our business has busy and idle hours, and the data volume is larger in busy hours. Based on the busy-hour pattern of short video, we finally set the compensation interval to 6, so that the busy hours are spread evenly over the points on the ring.

Having settled on the compensation interval, we still felt that waiting 6 hours for compensation was too long: a user may watch many videos in six hours, and if the data is not synchronized to the disk KV store in time it occupies a lot of Redis memory. Moreover, we stage each user's playback records in a Redis ZSet, and an overly long ZSet seriously affects performance. We therefore added a scheduled task that runs every hour, so that the second task compensates for the first. If the second task also fails, the data is compensated again after one full circle of the ring, as a final fallback.

In Figure 5, the “data to be written” and the scheduled task are not placed at the same point on the ring. We designed it this way to keep the scheme simple: a scheduled task operates only on data that is no longer changing, which avoids concurrent operations. It is like garbage collection in the Java virtual machine: you cannot collect garbage in a room while garbage is still being thrown into it. The scheduled task at each node on the ring therefore processes only the data of the previous node, which guarantees that there are no concurrent conflicts and keeps the scheme simple.
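The rule that a task only touches the previous node can be sketched as follows. This Python example assumes the ring size of 6 chosen above; plain dicts and lists stand in for Redis and the disk KV store, and the function names are hypothetical.

```python
RING_SIZE = 6  # compensation interval chosen in the text

staged = {slot: [] for slot in range(RING_SIZE)}  # slot -> staged (user, video) pairs
persisted = []                                    # stand-in for data written to the disk KV store


def stage(hour, user_id, video_id):
    # New playback data always lands on the slot of the current hour.
    staged[hour % RING_SIZE].append((user_id, video_id))


def run_task(hour):
    # The task running in the current hour drains only the previous slot,
    # whose data is no longer being modified by incoming events.
    previous = (hour - 1) % RING_SIZE
    batch, staged[previous] = staged[previous], []
    persisted.extend(batch)
    return len(batch)


stage(13, "u1", "v1")     # lands on slot 13 % 6 = 1
stage(14, "u1", "v2")     # lands on slot 14 % 6 = 2, still "hot"
processed = run_task(14)  # drains slot 1 only
```

If `run_task` fails for a slot, the data simply stays in place and is picked up by a later run that targets the same slot, which is the compensation behaviour described above.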

The batch write scheme is simple and free of concurrency problems. Its drawback is that data must be kept in the Redis ZSet for one hour, during which it could exceed the maximum length; in practice, an average user does not play that many videos in an hour, so this is acceptable. We finally chose the batch write scheme for its simplicity, elegance and efficiency. On this basis, we still need to design how the played video IDs of a large number of users are staged.

3.3 Data Sharding

To support 50 million daily active users, we need a data sharding scheme for the scheduled batch writes. First, the played videos still need to be stored in a Redis ZSet, because until they are written to the Bloom filter this data is needed to filter out videos the user has already watched. As mentioned above, we stage one hour of data; a user normally does not play more than 10,000 videos within an hour, so this is generally not a problem. Besides the video IDs themselves, we also need to record which users generated playback data in that hour; otherwise the scheduled task would not know whose playback records to write into Bloom filters. Storing up to 50 million users requires data sharding.

Building on the time ring introduced in the batch write section, we designed the data sharding scheme shown in Figure 6. The 50 million users are hashed into 5,000 Sets, so that each Set stores at most about 10,000 user IDs, which does not affect Set performance. Each node on the time ring stores its data using this sharding, as expanded in the lower half of Figure 6: the key played:user:{time node ID}:{user hash value} holds all the user IDs that generated playback data in one shard of one time node.

(Figure 6: Data sharding scheme)
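A sketch of how such a key could be built in Python. The hash function (CRC32) and the ring size of 6 are assumptions made for this example; the article does not say which hash function the service uses.

```python
import zlib

NUM_DATA_SHARDS = 5000  # number of user Sets per time node, from the text
RING_SIZE = 6           # number of time nodes on the ring (assumed from the batch write section)


def user_shard(user_id):
    # CRC32 is an assumed hash function; any stable hash would work here.
    return zlib.crc32(user_id.encode("utf-8")) % NUM_DATA_SHARDS


def played_user_key(hour, user_id):
    """Key of the Set holding the users who played videos in this time node and shard."""
    return f"played:user:{hour % RING_SIZE}:{user_shard(user_id)}"


key = played_user_key(13, "user-42")
print(key)
```

With 50 million users spread over 5,000 shards, each Set holds about 10,000 user IDs on average if the hash distributes users evenly.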

Correspondingly, the scheduled task is also sharded, and each task shard processes a fixed range of data shards. If task shards and data shards were mapped one to one, the distributed scheduled task would be split into 5,000 shards; although this would make retries after failure more fine-grained, it would put pressure on task scheduling, and in fact the company's scheduled task platform does not support 5,000 shards. We split the scheduled task into 50 shards: task shard 0 handles data shards 0 to 99, task shard 1 handles data shards 100 to 199, and so on.
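The mapping from task shards to data shards is a contiguous range split. A short Python sketch (the function name is illustrative):

```python
NUM_DATA_SHARDS = 5000
NUM_TASK_SHARDS = 50
SHARDS_PER_TASK = NUM_DATA_SHARDS // NUM_TASK_SHARDS  # 100 data shards per task shard


def data_shards_for(task_shard):
    """Return the contiguous range of data shards handled by one task shard."""
    start = task_shard * SHARDS_PER_TASK
    return range(start, start + SHARDS_PER_TASK)


print(data_shards_for(0))  # range(0, 100), i.e. data shards 0 to 99
print(data_shards_for(1))  # range(100, 200), i.e. data shards 100 to 199
```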

3.4 Data Elimination

In the short video recommendation deduplication scenario, we generally guarantee that a video is not recommended to a user again within three months of the user watching it, which means expired data must be eliminated. A Bloom filter does not support deletion, so we store each user's playback history in one Bloom filter per month and set an expiration time on each, as shown in Figure 7. The expiration time is currently set to 6 months. When reading, the service loads the data of the last four months relative to the current time for deduplication. Four months are needed because the current month is not yet complete: to prevent repeated recommendations within three months, three complete months plus the current month must be read.

(Figure 7: Data elimination scheme)
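Selecting the four monthly filters to read can be sketched as below. The key layout `bf:{user}:{YYYYMM}` is a hypothetical format chosen for this example; the article does not specify the real one.

```python
from datetime import date


def month_keys(user_id, today, months=4):
    """Keys of the monthly Bloom filters to read: the current month plus the three before it."""
    keys = []
    year, month = today.year, today.month
    for _ in range(months):
        keys.append(f"bf:{user_id}:{year:04d}{month:02d}")  # hypothetical key layout
        month -= 1
        if month == 0:
            # Step back across the year boundary.
            year, month = year - 1, 12
    return keys


print(month_keys("u1", date(2021, 2, 15)))
# ['bf:u1:202102', 'bf:u1:202101', 'bf:u1:202012', 'bf:u1:202011']
```

A video counts as watched if any of the four filters reports it as present.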

Because data is stored by month, new data is created at the beginning of each month. If every record simply expired exactly 6 months later, a large amount of new data would be created and a large amount of old data evicted at the beginning of each month, putting pressure on the database. We therefore spread out the expiration times. First, each record expires on a randomly chosen day after the six-month mark, so that expirations are not concentrated on a single date. Second, the expiration time of day is placed in idle hours, such as 00:00 to 05:00, to reduce the pressure on the system while the database clears the data.
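One way to spread the expirations is sketched below in Python. The constants are assumptions made for illustration: "6 months" is approximated as 180 days, the random spread is taken to be 30 days, and the idle window is 00:00 to 05:00 as in the example above.

```python
import random
from datetime import datetime, timedelta

RETENTION_DAYS = 180             # "6 months", approximated as 180 days (assumption)
SPREAD_DAYS = 30                 # expirations are spread over this many days (assumption)
IDLE_WINDOW_SECONDS = 5 * 3600   # idle window from 00:00 to 05:00


def pick_expiry(created_at, rng):
    """Pick an expiry on a random day after the retention period, inside the idle window."""
    day = (created_at + timedelta(days=RETENTION_DAYS + rng.randrange(SPREAD_DAYS))).date()
    midnight = datetime(day.year, day.month, day.day)
    return midnight + timedelta(seconds=rng.randrange(IDLE_WINDOW_SECONDS))


rng = random.Random(42)  # seeded only to make the example repeatable
created = datetime(2021, 1, 1, 10, 30)
print(pick_expiry(created, rng))
```

The trade-off is that some filters live up to a month longer than strictly necessary, in exchange for a smoother eviction load.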

3.5 Solution Summary

Combining the three designs above (traffic aggregation, data sharding and data elimination), the overall scheme is shown in Figure 8. Playback tracking data flows from left to right: from Kafka, the data source, to Redis for staging, and finally to the disk KV store for persistence.

(Figure 8: Overall scheme flow)

First, after receiving a playback event from Kafka, we append the video to the user's playback history by user ID, determine the corresponding time ring node and shard from the current time and the hash of the user ID, and add the user ID to that node's user list. Then, each shard of the distributed scheduled task fetches the playback user shards of the previous time ring node, reads each user's playback records, and adds them to the Bloom filter read from the disk KV store. Finally, the Bloom filter is serialized and written back to the disk KV store.

4. Data migration

To migrate smoothly from the current Redis ZSet based deduplication to Bloom filter based deduplication, we need to migrate the playback records that users generated before the unified deduplication service went online, so that the user experience is not affected. We designed and tried two schemes, and arrived at the final scheme after comparing and improving them.

We had already implemented the batch job that generates Bloom filters from raw playback records and stores them in the disk KV store. The migration scheme therefore only needs to move the historical data stored in the old Redis (generated before the deduplication service went online) into the new Redis and leave the rest to the scheduled task, as shown in Figure 9. Incremental data generated after the unified deduplication service goes online is written by listening to playback tracking events. New data is written to both the old and the new Redis, so that we can fall back to the old scheme if necessary.

(Figure 9: Migration Scheme 1)

However, we had overlooked two problems. First, the new Redis is used only for staging, so its capacity is much smaller than that of the old Redis; the data cannot be migrated in one go and would have to be migrated in batches. Second, the storage format in the new Redis differs from that of the old Redis: besides the played video list, a playback user list is also needed. After consulting the DBA, we learned that such a migration would be difficult to carry out.

Since migrating the data up front is troublesome, we considered not migrating it in advance. Instead, during deduplication we check whether the user has been migrated; if not, we also read the user's old data for filtering and trigger migration of that user's old data to the new Redis (including writing the user list). After three months the old data has expired and the migration is complete, as shown in Figure 10. This scheme solves the difficulty caused by the inconsistent formats of the old and new Redis. Because migration is triggered by user requests, it also avoids the capacity the new Redis would need for a one-time migration, and it is precise: only users active within the three months, whose data actually needs migrating, are migrated.

(Figure 10: Migration Scheme 2)

We therefore carried out the migration according to the second scheme. During online testing, however, we found that migrating a user's old data on that user's first request made the latency of the deduplication interface unstable. Video deduplication is a critical step in video recommendation and is sensitive to latency, so we had to keep looking for a new migration scheme. We noticed that when Bloom filters are generated in the scheduled batch job, the job reads the user list of the corresponding time ring node, fetches each user's played video list by user ID, and then generates the Bloom filter and saves it to the disk KV store. At that point, we only need to additionally read the user's historical playback records from the old Redis to migrate the historical data. To trigger Bloom filter generation for a user's playback records, we need to add the user ID to the playback user list of the corresponding node on the time ring. The final scheme is shown in Figure 11.

(Figure 11: Final migration scheme)

First, the DBA helped us scan all the playback record keys (which contain the user IDs) in the old Redis and export them to files. We then imported the exported files into Kafka through the big data platform and started consumers to consume the data, parse it, and write the user IDs into the playback user list of the current time ring node. Next, when the distributed batch task reads a user from the playback user list and finds that the user's data has not been migrated, it reads the user's historical playback records from the old Redis, merges them with the new playback records into the Bloom filter, and stores it in the disk KV store.
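The merge step performed by the batch task can be sketched as follows. All names are hypothetical, and dicts and a set stand in for the new Redis, the old Redis and the record of migrated users.

```python
def collect_plays(user_id, staged_plays, old_history, migrated_users):
    """Gather the video IDs to add to a user's Bloom filter in one batch run."""
    plays = list(staged_plays.get(user_id, []))
    if user_id not in migrated_users:
        # First batch run for this user: also pull the history from the old Redis.
        plays.extend(old_history.get(user_id, []))
        migrated_users.add(user_id)
    return plays


staged = {"u1": ["v9"]}        # new playback records staged in the new Redis
old = {"u1": ["v1", "v2"]}     # historical records in the old Redis
migrated = set()

first_run = collect_plays("u1", staged, old, migrated)   # ['v9', 'v1', 'v2']
second_run = collect_plays("u1", staged, old, migrated)  # ['v9']
```

Because the old history is read inside the offline batch task rather than on the request path, the latency of the deduplication interface is unaffected.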

5. Summary

This article describes the design of, and thinking behind, a Bloom filter based recommendation deduplication service for short video. Starting from the problem, we designed and optimized the scheme step by step, striving for a solution that is simple, complete and elegant, and we hope it is a useful reference for readers. Because of limited space, some aspects are not covered and many technical details are not elaborated. If you have any questions, you are welcome to discuss them with us.

Author: Zhang Wei, Vivo Internet Server Team