Preface

The One-point machine learning platform mainly serves the information-feed recommendation system, and model training for part of the business is handled by the deep learning framework TensorFlow. In practice we found that native TensorFlow has some shortcomings and can be optimized for large-scale sparse model scenarios, for example:

  1. Parameter synchronization and on-line deployment for large-scale sparse models is slow, so training results cannot be applied to the inference service in time

  2. Sparse feature Embeddings occupy a large amount of memory in single-machine prediction, making it difficult to support large-scale model parameters

To solve the above problems, we made customized optimizations based on our understanding of the TensorFlow framework.

Problem analysis

Saving model training parameters

TensorFlow's original parameter-saving scheme writes the parameters to files on disk during training, for later use in prediction. A recommendation system contains a large number of high-dimensional, discrete sparse features, and during model training the parameters have the following characteristics.

Updates to the recommendation model are sparse: within a given period, the parameters updated by training account for only a small fraction of the total. For each Embedding parameter, gradients change in only part of the matrix. The following optimizations can therefore be applied to avoid repeated IO on Embedding data.

When saving model parameters, only the parameters whose gradients changed need to be saved, since only some parameters have changed; parameters unchanged since the previous training iteration do not need to be saved again when an incremental update is written. When an Embedding parameter is saved, the sparsity of the parameter matrix means the gradient changes occur only in a local part of the matrix; similarly, with incremental overwriting, the unchanged part does not need to be saved again, and only the part with gradient changes needs to be updated. By saving data incrementally and processing only the changed data, the amount of data that must be synchronized for massive, sparse parameters can be greatly reduced, and this reduction in data volume directly improves synchronization efficiency.

In addition, from the perspective of engineering implementation, synchronization efficiency can be further improved by batching the synchronized data to reduce IO interactions.

Model synchronization

TensorFlow's original process for bringing model parameters online saves the parameters to a file during training and then transfers the file to the prediction server, which loads and uses it. The process is long, file transfer is slow, and updates are not timely.

If the parameters saved during model training are shared directly with the prediction service, the time spent on parameter synchronization can be almost completely eliminated. Especially for the transmission of large-scale parameters, saving synchronization time brings an even greater efficiency improvement.

Of course, for the sake of data isolation, this approach also requires support such as versioning of model parameters.

Model usage

Memory bottlenecks are a big challenge in large-scale model inference scenarios. In distributed training mode, parameters are spread across the PS cluster, which relieves the pressure of storing parameter data in memory. However, in the face of high QPS, the inference service lacks a comparable scheme: even if the same distributed approach as training is adopted, transmitting parameters between clusters brings high network bandwidth pressure and response latency.

If we can streamline data loading and reduce the memory requirements of the prediction service, we can better solve the memory bottleneck. Because some parameters are sparse, the idea of data sharding applies: multiple partial pieces of data can represent the complete effective data.

At the same time, the part of a sparse parameter that is actually used is usually only a fraction of the whole parameter, so we can consider loading only part of the parameter's data, thereby reducing the memory required.

Customized enhancements

After the above analysis, the problems and solutions are clear. We want to enhance TensorFlow with the following capabilities:

  1. In the training stage, parameters can be saved to a specified storage system in time. Here we use the company-level KV database Morpheus and save only the changed part of the parameters; in addition, each Embedding parameter saves only the updated dimensions.

  2. In the prediction stage, only the valid data that the parameters actually use is loaded.

To achieve the above goals, TensorFlow needs to be customized. So how do we go about customizing it? Let's first look at TensorFlow's design.

Understanding the TensorFlow computational graph and OPs

TensorFlow uses directed graphs of "nodes" and "edges" to describe mathematical computation. "Nodes" usually represent mathematical operations, but they can also represent the endpoints where data is fed in or pushed out, or where persistent variables are read or written.

Edges indicate the input/output relationships between nodes. These edges carry dynamically sized multidimensional arrays, called tensors. The fact that tensors flow through the graph is the reason the tool is called "TensorFlow". Once all the tensors on the input side are ready, the nodes are assigned to various computing devices to perform their operations asynchronously and in parallel.

In short, the structure of the computational graph is determined by the algorithmic structure of the model, and the operations on the data are operations (OPs). Since the model structure is already defined, our enhancements require customizing the OPs.

So which OPs need to be enhanced? Let's first use a simple code example to understand the key process of computation over a data flow graph. For example,

    import tensorflow as tf
    from tensorflow.core.protobuf import saver_pb2
    from tensorflow.python.ops import variables
    from numpy import random

    a_matrix = random.random(size=(2, 4))
    b_matrix = random.random(size=(2, 4))
    print("a_matrix=", a_matrix)
    print("b_matrix=", b_matrix)

    a = tf.Variable(a_matrix, dtype=tf.float32, name="a")
    b = tf.Variable(b_matrix, dtype=tf.float32, name="b")

    res_a = tf.nn.embedding_lookup(a, [0, 0], name="lookup_a")
    res_b = tf.nn.embedding_lookup(b, [1, 1], name="lookup_b")
    y = tf.add(res_a, res_b)

    saver = tf.train.Saver(variables._all_saveable_objects(),
                           sharded=True,
                           write_version=saver_pb2.SaverDef.V2,
                           allow_empty=True)
    meta_graph_def = saver.export_meta_graph(as_text=True,
                                             clear_devices=True,
                                             strip_default_attrs=True)
    with open("./meta_graph_def.pbtxt", "w") as f:
        f.write(str(meta_graph_def))

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        ## tensorboard --logdir=tensorboard/test0/
        writer = tf.summary.FileWriter("./tensorboard/test0/", sess.graph)
        print("res_a=", sess.run(res_a))
        print("res_b=", sess.run(res_b))
        print("y=", sess.run(y))
        writer.close()

    if __name__ == '__main__':
        print("hello world")

This computational graph implements a simple calculation: it defines two variables, performs an embedding_lookup on each, and then sums the results of the two lookups. The code saves the graph, as well as the data required by TensorBoard, for further analysis.

Opening the save path with TensorBoard, we can see a visual representation of this computational graph. By viewing the compute nodes individually, we can observe the input and output relationships of each OP.

Custom OPs for custom computation

Since the official library of OPs cannot cover every user need, TensorFlow provides extension points for customizing OP operations. Once you understand how parameters are defined, saved, and collected, you can make customized changes. By writing custom OPs, data can be saved to our specified database in the designed format when parameters are saved; when data is loaded, it is deserialized from the database and assembled into the required parameters, bypassing the TensorFlow framework's method of loading parameters from files. The following lists some of the designed functions of the custom OPs for reference.
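As a rough illustration of how a custom OP enters the Python side once it has been compiled into a shared library, a minimal sketch follows; the library file name and op name below are assumptions for illustration, not the actual implementation.

    import tensorflow as tf

    # Load the compiled custom-op shared library (hypothetical file name)
    inference_ops = tf.load_op_library("./inference_ops.so")

    # The generated wrapper can then be used in the graph like any built-in op,
    # e.g. an identity-like op that pulls its value from the KV store:
    # value = inference_ops.inference_identity(some_tensor)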

Once the OPs are customized, how do we replace the native OPs in the model's computational graph? When TensorFlow saves the model, it generates a meta_graph_def file, which describes the structure of the computational graph in a readable text format. When this file is loaded, TensorFlow rebuilds the computational graph from the structural information described in the file.

We can modify the meta_graph_def file saved by the model, replace the OPs in it with our custom OPs, and modify the input/output relationships of each node to change the dependencies between OPs. The modified meta_graph_def file is then used to load the model's computational graph back, which effectively modifies the structure of the original computational graph. The following is a partial example of a modified meta_graph_def file.

    node {
      name: "a/read"
      op: "InferenceIdentity"   #### The original op is Identity; it is replaced by the custom op
      input: "a"
      attr {
        key: "T"
        value { type: DT_FLOAT }
      }
      ...
    }
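The same replacement can also be done programmatically by editing the MetaGraphDef protobuf before importing it. A minimal sketch, assuming the file names used in the earlier example:

    from google.protobuf import text_format
    from tensorflow.core.protobuf import meta_graph_pb2

    # Read the exported text-format meta graph
    meta_graph_def = meta_graph_pb2.MetaGraphDef()
    with open("./meta_graph_def.pbtxt") as f:
        text_format.Merge(f.read(), meta_graph_def)

    # Replace the selected native op with the custom one, keeping its inputs
    for node in meta_graph_def.graph_def.node:
        if node.name == "a/read" and node.op == "Identity":
            node.op = "InferenceIdentity"

    # Write the modified meta graph back out for later loading
    with open("./modified_meta_graph_def.pbtxt", "w") as f:
        f.write(str(meta_graph_def))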

After reviewing the scheme of writing custom OPs and modifying meta_graph_def, we are basically ready to make the customized enhancements.

Incremental update of parameters

Generally, the training process uses the back-propagation in the optimizer's minimize function to update parameters and converge the loss. Inside the minimize function this is a two-step process (a short sketch follows the two steps below):

In the first step, the compute_gradients function returns the gradients and the parameters to be updated, as grads_and_vars

In the second step, the apply_gradients function applies the gradients to the existing parameters to update them
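For illustration, here is a minimal sketch of splitting the two steps apart with the TF1-style optimizer API; the variable and loss are toy placeholders, not the recommendation model:

    import tensorflow as tf

    x = tf.Variable([[1.0, 2.0], [3.0, 4.0]], name="x")
    loss = tf.reduce_sum(tf.square(x))

    opt = tf.train.GradientDescentOptimizer(learning_rate=0.01)
    # Step 1: compute_gradients returns (gradient, variable) pairs
    grads_and_vars = opt.compute_gradients(loss)
    # Custom bookkeeping (e.g. recording which indices were updated) can be
    # inserted between the two steps
    # Step 2: apply_gradients applies the gradients to the variables
    train_op = opt.apply_gradients(grads_and_vars)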

The first step yields the gradients and the parameters to be updated in this iteration; for an Embedding parameter, the gradient is an IndexedSlices describing which slices of the tensor changed. IndexedSlices is described in the official documentation:

“An IndexedSlices is typically used to represent a subset of a larger tensor dense of shape [LARGE0, D1, ..., DN] where LARGE0 >> D0. The values in indices are the indices in the first dimension of the slices that have been extracted from the larger tensor.”

You can think of it as similar in spirit to a SparseTensor: a large tensor is represented by element data plus element locations. The tensor is sliced along its first dimension, so a large tensor of shape [LARGE0, D1, ..., DN] is translated into a collection of smaller tensors of shape [D1, ..., DN].

With IndexedSlices, the tensor slices corresponding to the gradient changes can be obtained. When parameters are synchronized, the slices described by IndexedSlices can be saved to the database as needed. For example, a tensor of shape [M, N, L, K] can be saved as M KV entries of shape [N, L, K], one per slice; the key is a unique name combining tensor_name with the slice's index along the first dimension.
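A minimal sketch of this slice-to-KV mapping, using a plain dict in place of the real KV client; the helper name is illustrative, but the key format is the one described above:

    import numpy as np

    def slices_to_kv(tensor_name, values, indices):
        """Turn an IndexedSlices-style (values, indices) pair into KV entries."""
        kv = {}
        for value, idx in zip(values, indices):
            # The key combines the tensor name with the first-dimension index
            kv["%s:%d" % (tensor_name, idx)] = value
        return kv

    # Example: two changed rows of a [4, 4] parameter named "a"
    values = np.array([[1., 0., 0., 0.], [0., 1., 0., 0.]])
    indices = np.array([0, 2])
    print(slices_to_kv("a", values, indices))
    # {'a:0': array([1., 0., 0., 0.]), 'a:2': array([0., 1., 0., 0.])}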

With the optimizer split into these two steps, RecordIndicesOp records, during each training iteration, the indices of the parameters updated by gradients together with their gradient IndexedSlices. This keeps track of which parameters are updated and which parts of those parameters changed.

After several training iterations, MorpheusWriteOp writes the tracked mapping data to KV storage according to the recorded indices, synchronizing the parameters changed during this period to the database. After the parameters are written, ClearIndicesOp clears the recorded index data to start the next incremental update cycle.

As shown below, starting from training iteration m, sparse parameter a is locally updated, and the recorder records the first-dimension coordinates that were updated. After n more iterations, the changed part of the parameter is extracted by combining the current value of a with the changed first-dimension coordinates held by the recorder, and this data is written to the KV database. The recorder's records are then cleared and the next cycle of iterations begins.
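A rough Python mock of that record / write / clear cycle; it only illustrates the bookkeeping performed by the custom ops, not their actual implementation:

    class IncrementalRecorder:
        """Bookkeeping mock for RecordIndicesOp / MorpheusWriteOp / ClearIndicesOp."""

        def __init__(self):
            self.updated = {}                       # tensor_name -> set of changed row ids

        def record(self, name, indices):            # role of RecordIndicesOp
            self.updated.setdefault(name, set()).update(indices)

        def write(self, kv_store, params):          # role of MorpheusWriteOp
            for name, rows in self.updated.items():
                for idx in rows:
                    kv_store["%s:%d" % (name, idx)] = params[name][idx]

        def clear(self):                            # role of ClearIndicesOp
            self.updated = {}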

Model loading

Following the process above, we now have the customized model graph and the parameter data saved in the KV database. In the inference service, the computational graph meta_graph is loaded natively; loading of the parameters is handled by the custom OPs mentioned earlier, and is described here.

After loading the graph structure, a fake file path is specified to load the parameter data. Note that we have already modified the RestoreV2 op used to load the data, so no parameters are actually loaded here; the fake file is loaded only for initialization, to satisfy the framework's flow.

So how is the parameter data stored in the KV database loaded? TensorFlow's graph automatically runs the required dependencies during execution, so when the session is run for the first time the data flows through the custom OPs, triggering the customized operations. Therefore, after loading the model graph structure and the fake parameter file, one more inference pass is needed to complete the replacement of the fake parameters. The substitution process uses InferenceIdentityOp to fetch the data from the database, assemble the necessary tensors according to shape, and replace the fake tensors in the model. There are some special optimizations for loading Embedding data, described in the next section.
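Under these customizations, the serving-side load flow would look roughly like the sketch below. The paths and tensor name are assumptions, and with stock TensorFlow the restore call would of course require a real checkpoint; it is shown only to illustrate the order of the steps:

    import tensorflow as tf

    with tf.Session() as sess:
        # Import the modified graph, whose native ops were replaced by custom ones
        saver = tf.train.import_meta_graph("./modified_meta_graph_def.pbtxt")
        # Restore from the fake path; the customized RestoreV2 only satisfies the
        # framework flow and does not actually read parameters from this file
        saver.restore(sess, "./fake_checkpoint")
        # The first run flows data through the custom ops, which pull the real
        # parameters from the KV store and replace the fake tensors
        sess.run("output:0")  # hypothetical output tensor name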

Data usage for inference services

For the prediction service, the model and its data generally need to be fully loaded before inference. Non-Embedding data is generally small and poses no challenge in engineering practice. However, the size of Embedding data is related to the vocabulary and the dimension, and [vocabulary_size, embedding_dimension] is usually large. For recommendation services the feature-layer parameters are sparse, so loading a full Embedding matrix wastes memory; moreover, when the vocabulary is large, the large-scale parameters exceed what single-machine memory can bear and cannot be loaded at all, which presents a sizeable engineering challenge.

Customized processing for large-scale Embedding parameters

In practice, embedding_lookup (and likewise embedding_lookup_sparse) retrieves only some rows of the full parameter. Instead of loading the entire Embedding parameter, it is therefore possible to load only the parts of it that are actually needed.

The section "Incremental update of parameters" introduced the saving scheme for parameters with gradient changes. As a subset of the parameters iterated in training, Embedding data is saved with the same data slicing scheme. As mentioned in the custom OP section, the embedding_lookup operation in the prediction service actually uses the custom InferenceGatherV2Op. The following example illustrates the complete flow of Embedding data from training to inference.

For example, the following is defined in training

    a = tf.Variable(a_matrix, dtype=tf.float32, name="a")

Used in inference

    b = tf.nn.embedding_lookup(a, [0, 1, 3], name="b")

The flow of parameter a and parameter b through this process is shown in the figure below.

During the training process, parameter a (shape [4, 4]) underwent gradient changes, which were handled by InferenceRecordOp: the IndexedSlices with gradient changes were taken and the data was stored in the KV database. Rows 0 and 2 of parameter a are saved as two KV entries, "a:0" = [1, 0, 0, 0] and "a:2" = [0, 1, 0, 0], while the remaining rows 1 and 3 are not saved to the database.

During model initialization at serving time, parameter a is not loaded; instead fake data a' (shape [1]) is loaded to ensure that the model graph can be loaded normally.

When embedding_lookup(a, [0, 1, 3]) is executed for parameter a during inference, InferenceGatherV2Op tries to load three entries with keys "a:0", "a:1", and "a:3" from the KV database. In fact only "a:0" can be obtained, and the retrieved data is assembled according to the assembly rules into data b (shape [3, 4]), completing the embedding_lookup operation.
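The assembly step can be sketched as follows, again with a plain dict standing in for the real KV client. The rule used here for rows whose keys are missing (filling with zeros) is an assumption; the actual assembly rule is implementation specific:

    import numpy as np

    def kv_embedding_lookup(kv_store, tensor_name, ids, dim):
        """Assemble an embedding_lookup result from per-slice KV entries."""
        rows = []
        for idx in ids:
            key = "%s:%d" % (tensor_name, idx)
            # Missing keys are filled with zeros (assumption)
            rows.append(kv_store.get(key, np.zeros(dim, dtype=np.float32)))
        return np.stack(rows)

    # Only rows 0 and 2 of "a" were saved during training
    kv_store = {
        "a:0": np.array([1., 0., 0., 0.], dtype=np.float32),
        "a:2": np.array([0., 1., 0., 0.], dtype=np.float32),
    }
    b = kv_embedding_lookup(kv_store, "a", [0, 1, 3], dim=4)
    print(b.shape)  # (3, 4)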

As can be seen, parameter a is never fully loaded into the prediction service during the whole process, yet this does not affect its use during inference.

Optimization of data synchronization

In the above design, the KV database serves as the parameter server for the prediction process, with Morpheus, the company's KV system, as the underlying support; this shortens the data synchronization process and improves the efficiency of getting training data online. To reduce read and write pressure on the KV database, parameter data other than Embeddings is cached locally in the prediction service. However, because of the local cache, the client's data pull requests are served by the cache and cannot obtain the latest parameters in time. For this case, the Morpheus client provides the ability to listen for data updates by subscribing to a message queue. When data on the Morpheus server is updated, the client senses the changed key and pulls the data into the local cache, so parameter updates become near real time.
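A minimal sketch of that cache-refresh idea; the real Morpheus client and message-queue APIs are internal, so the class and method names here are purely illustrative:

    class CachingKVClient:
        """Local cache over a remote KV store, refreshed by update messages."""

        def __init__(self, remote_kv):
            self.remote_kv = remote_kv              # stands in for the KV database client
            self.cache = {}

        def get(self, key):
            if key not in self.cache:
                self.cache[key] = self.remote_kv.get(key)
            return self.cache[key]

        def on_update_message(self, key):
            # Called when the message queue reports that `key` changed on the server;
            # pulling it immediately keeps the local cache near real time
            self.cache[key] = self.remote_kv.get(key)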

Summary and outlook

Based on the ideas of data sharding and lazy data loading, the work above enhances and customizes the TensorFlow framework in terms of parameter synchronization and data usage, achieving quasi-real-time updates of model training parameters. Compared with the original hour-level model parameter updates, the recommendation system metrics improved significantly.

However, as recommendation algorithms place ever higher requirements on data dimensionality, vocabulary size, and application efficiency, we still face many challenges. We will continue to explore and experiment with engineering applications of TensorFlow. If you are also interested in TensorFlow, please leave your comments or suggestions in the comments section. Thank you very much.

This article is from the One-point machine learning platform team