Wang Hui joined Qunar in 2017 and currently leads the anti-crawler risk control business, work that spans a wide range of technical fields, with an ongoing focus on applying intelligent techniques to risk control.

1. Preface

In Qunar's intelligent risk control scenarios, the risk control R&D team often applies algorithmic models, typically neural networks and decision trees, to solve problems in complex scenarios. Completing the whole model lifecycle, from training to deployment and prediction, requires not only the model algorithm but also the support of a technical framework. In this article, I share my practical experience of building a distributed machine learning computing framework based on TensorFlow for Java and Spark-Scala for the prediction service deployment phase. The article covers the following points:

  • TensorFlow for Java and Spark-Scala
  • Framework selection and the application scenarios it fits
  • How to integrate Spark-Scala with TensorFlow for Java to build a prediction service
  • Optimizations made while putting the framework into practice, and pitfalls encountered

2. Framework selection

2.1 Project Scenario

The project builds a neural network model on risk control data collected by the mobile client and uses it to analyze and predict user risk offline.

The client collects a large amount of data. At present, about 3 million feature records arrive per hour, and this is expected to grow to tens of millions. We also want to run the prediction task every hour, so each run must finish within the hour or tasks will back up. Therefore, even though offline computing has loose real-time requirements, it still needs high-performance processing. Against this background, we want a distributed machine learning computing framework built on big data technology to make model prediction more efficient.
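
As a rough check on that requirement, the minimum sustained throughput follows directly from the record count and the one-hour window. The sketch below is illustrative only: the function name is hypothetical, and 10 million is an assumed stand-in for "tens of millions".

```python
def required_throughput(records_per_run: int, window_seconds: int = 3600) -> float:
    """Minimum sustained records/second needed to finish one run inside the window."""
    if window_seconds <= 0:
        raise ValueError("window_seconds must be positive")
    return records_per_run / window_seconds


# Current volume from the text: about 3 million records per hourly run.
current = required_throughput(3_000_000)   # about 833 records/s

# Assumed future volume (hypothetical figure for "tens of millions").
future = required_throughput(10_000_000)   # about 2778 records/s
```

Any real deployment needs headroom above these floors, since a run that takes the full hour leaves no margin for retries or data spikes.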

2.2 Framework selection

There are two ways to approach distributed machine learning computing. One is to rely on the distributed capability of the machine learning framework itself; the two mainstream frameworks, TensorFlow and PyTorch, both support distributed computing. The other is to combine a big data distributed framework with a machine learning framework, calling the machine learning framework's API from Spark, Flink, or a similar big data framework.

Although machine learning frameworks are gradually gaining distributed capabilities, their main purpose is to speed up model training when the data volume is large and the model has many parameters. Compared with traditional big data frameworks, their advantage is support for distributed training and distributed schemes designed for machine learning. Their disadvantages are that they offer no good solution for data merging and multi-level stratification, and that they require building and maintaining a separate cluster, which raises operation and maintenance costs. For the distributed prediction scenario, we therefore adopted the scheme that combines a big data framework with a machine learning framework.

Big data frameworks usually fall into two categories: stream processing and batch processing. Stream processing suits real-time and near-real-time computing; Flink, Storm, and Spark Streaming are stream processing frameworks. Batch processing suits offline computing, and our scenario is a typical offline batch job. The mainstream batch frameworks are currently Spark and Hadoop MapReduce. Comparing their implementations, Spark uses more resources because it computes in memory, but it delivers much better performance, so we chose Spark.

Among machine learning frameworks, TensorFlow and PyTorch are the most widely used in industry and academia respectively. TensorFlow is an open source deep learning framework created by developers at Google and released in 2015. PyTorch is a newer deep learning framework, developed by a team at Facebook and open-sourced on GitHub in 2017.

Comparing TensorFlow and PyTorch along several dimensions shows that PyTorch has a friendlier API, is quicker to get started with, uses dynamic graphs, and is easy to debug, which lets researchers build models and iterate quickly. TensorFlow has advantages in multi-language support, cross-platform capability, performance, and production deployment. This is why PyTorch is more popular in academia while TensorFlow remains dominant in industry. Each framework is moving to adopt the other's strengths, but this picture is unlikely to change anytime soon.

Based on the above analysis and our actual scenario, the algorithm structure of the risk control model is not very complex and can be built quickly in either TensorFlow or PyTorch. Considering how difficult models are to deploy to production and our requirement for cross-platform deployment, we chose TensorFlow to build the model.

2.3 Why choose TensorFlow for Java and Spark-Scala?

There are two ways to integrate Spark with TensorFlow. The first is PySpark with TensorFlow for Python in a Python environment, which is currently the most widely used option. The second is Spark-Scala with TensorFlow for Java in a Java/Scala environment. We chose the second option, mainly for performance reasons. The performance differences between the two options are compared below.

2.3.1 Introduction to TensorFlow for Java and Spark-Scala

TensorFlow can be used from a variety of client languages, although Python remains the only well-supported language. TensorFlow for Java is the API that TensorFlow provides for Java programs; it is well suited to loading models created in Python and running them in Java applications. Spark-Scala refers to developing Spark jobs in the Scala language. Because Scala runs on the JVM and is compatible with Java, it integrates easily with Java applications. Spark itself is written in Scala, whose functional programming style is well suited to parallel and concurrent computation.

2.3.2 Performance Comparison

As mentioned above, we chose TensorFlow for Java and Spark-Scala to meet the needs of high-performance scenarios. The performance differences between the two options are analyzed below. In Spark's runtime architecture, the application runs on the Driver, Spark schedules and packages the work into tasks, and the tasks are then sent to Executors for execution. The Scala version of Spark runs on this native architecture.

PySpark's runtime architecture builds on top of this. To avoid breaking Spark's existing runtime architecture, Spark wraps a layer of Python API around it and uses Py4J for the interaction between Python and Java. This is what allows Spark applications to be written in Python.

PySpark performs worse than Spark-Scala for two reasons:

  • First, the Driver side has an extra layer of Python-to-Java conversion.
  • Second, on the Executor side, a separate Python process is started for each task to run user-defined Python functions or lambda expressions, which are sent to that Python process over a socket for execution.

Now compare TensorFlow for Python with TensorFlow for Java. Both sit on top of TensorFlow's C++ libraries, so there is little difference in performance at that level. At the upper layer, Java is faster than Python, which matters in scenarios that require a lot of preprocessing before the TensorFlow API is called.

In conclusion, the second option outperforms the first.

Of course, the first option has advantages in other areas, such as development efficiency, ease of integration (no cross-platform step), and API support.

2.4 Summary

Summary of this section:

  • Combining the Spark big data framework with the TensorFlow machine learning framework is a feasible and effective way to implement distributed machine learning prediction.
  • Integrating Spark and TensorFlow through TensorFlow for Java and Spark-Scala delivers higher performance.
  • The scheme suits high-performance, offline, distributed model prediction in big data scenarios.

3. Application practice

The process in practice is as follows. First, the model is trained on sample data and a model file is generated. Then Spark reads the feature data and calls the TensorFlow Java API to load the model, run the prediction, and obtain the result set.

3.1 Model training

3.1.1 Training the model file

Model training in our project is implemented with Python, TensorFlow, and Keras. Here, CNN classification on the MNIST data set serves as an example to demonstrate the training code. After training, the model is saved in protobuf format, which can be loaded across platforms.

    import tensorflow as tf
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, Dropout, Convolution2D, MaxPooling2D, Flatten
    from tensorflow.keras.optimizers import Adam


    def train_model():
        # Load the training and test sets
        mnist = tf.keras.datasets.mnist
        (x_train, y_train), (x_test, y_test) = mnist.load_data()
        # Add the channel dimension expected by input_shape=(28, 28, 1) and scale to [0, 1]
        x_train = x_train.reshape(-1, 28, 28, 1) / 255.0
        x_test = x_test.reshape(-1, 28, 28, 1) / 255.0
        y_train = tf.keras.utils.to_categorical(y_train, num_classes=10)
        y_test = tf.keras.utils.to_categorical(y_test, num_classes=10)

        # Define a Sequential model: convolution, pooling, flatten, fully connected
        model = Sequential()
        model.add(Convolution2D(input_shape=(28, 28, 1), filters=32, kernel_size=5,
                                strides=1, padding='same', activation='relu'))
        model.add(MaxPooling2D(pool_size=2, strides=2, padding='same'))
        model.add(Convolution2D(64, 5, strides=1, padding='same', activation='relu'))
        model.add(MaxPooling2D(2, 2, 'same'))
        model.add(Flatten())
        model.add(Dense(1024, activation='relu'))
        model.add(Dense(10, activation='softmax'))

        # learning_rate replaces the deprecated lr argument
        adam = Adam(learning_rate=1e-4)
        model.compile(optimizer=adam, loss='categorical_crossentropy', metrics=['accuracy'])

        # Train the model
        model.fit(x_train, y_train, batch_size=64, epochs=10,
                  validation_data=(x_test, y_test))

        # Save the model in SavedModel (protobuf) format
        model.save('./model/model_v1', save_format="tf")

3.1.2 Viewing the model files

Go to the model file directory and run the following command to display the model file information. From top to bottom, the output shows the model's tag, the signature, the input tensor, the output tensor, and the name of the prediction method. This information is used later to load the model and run predictions.

    saved_model_cli show --dir ./model_v1/ --all

3.2 Model Prediction

3.2.1 Project setup and framework dependencies

Create a Scala project and add the Spark and TensorFlow dependencies.

    <!-- scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- spark hadoop -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${spark.scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${spark.scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${spark.scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- tensorflow -->
    <dependency>
        <groupId>org.tensorflow</groupId>
        <artifactId>tensorflow</artifactId>
        <version>1.15.0</version>
    </dependency>

3.2.2 Model file loading

Call the TensorFlow API to load the pre-trained protobuf model file and obtain a SavedModelBundle object. The model file can be stored in the project's resource directory and loaded from there. (TensorFlow does not support loading a model directly from HDFS; how to work around this is covered later.)

    package com.tfspark

    import org.apache.spark.sql.SparkSession
    import org.tensorflow.SavedModelBundle
    import org.{tensorflow => tf}

    object ModelLoader {
      // modelPath is the path of the model in the resource directory;
      // modelTag is obtained from the model file information
      def loadModelFromLocal(spark: SparkSession, modelPath: String, modelTag: String): SavedModelBundle = {
        val bundle = tf.SavedModelBundle.load(modelPath, modelTag)
        // Return the bundle so that the method matches its declared return type
        bundle
      }
    }

3.2.3 Calling the TensorFlow API for prediction

The Java version of TensorFlow follows a pattern similar to the static graph of TensorFlow 1.x: you create a session, specify the feature data to feed and the prediction result to fetch, and then call the run method. The information obtained from viewing the model file is passed in here as parameters.

    package com.tfspark.tensorflow

    import com.qunar.rdc.util.TfUtil
    import org.tensorflow.SavedModelBundle
    import scala.collection.mutable.WrappedArray
    import org.{tensorflow => tf}

    object TensorFlowCnnProcessor {
      def predict(bundle: SavedModelBundle, features: WrappedArray[WrappedArray[WrappedArray[Float]]]): Int = {
        val sess = bundle.session()
        // Convert the nested WrappedArray into nested arrays and wrap them as a batch of one
        val x = tf.Tensor.create(Array(features.map(a => a.map(b => b.toArray).toArray).toArray))
        // To run the prediction, pass in the names of the input tensor and the output tensor
        // taken from the model information
        val y = sess.runner().feed("serving_default_hmc_input:0", x).fetch("StatefulPartitionedCall:0").run().get(0)
        // The result is a 1x2 two-dimensional array
        val result = Array.ofDim[Float](y.shape()(0).toInt, y.shape()(1).toInt)
        y.copyTo(result)
        // TfUtil is the project's own helper; it picks the index of the largest score
        TfUtil.argmaxOneDim(result(0))
      }
    }
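
The helper TfUtil used in the prediction code is a project class whose source is not shown in this article. Presumably it returns the index of the largest score in the output row. A minimal Python sketch of that idea follows; the function name and the tie-breaking rule (the first index wins) are assumptions, not the project's actual implementation.

```python
from typing import Sequence


def argmax_one_dim(scores: Sequence[float]) -> int:
    """Return the index of the largest score; on ties the first index wins."""
    if len(scores) == 0:
        raise ValueError("scores must not be empty")
    best = 0
    for i in range(1, len(scores)):
        # Strict comparison keeps the earliest index when values are equal
        if scores[i] > scores[best]:
            best = i
    return best
```

For a 1x2 output row such as [0.2, 0.8], this picks class 1, which is how a softmax output is usually turned into a class label.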

3.2.4 Spark combined with TensorFlow prediction

Spark reads the data to be predicted from Hive, preprocesses it into feature data, and invokes the TensorFlow API for prediction. Using the TensorFlow API together with Spark's distributed data sets is what integrates the distributed batch processing framework with machine learning.

    import org.apache.spark.sql.functions.{col, udf}

    // Wrap the prediction method in a UDF; bundle is the SavedModelBundle loaded in 3.2.2
    val sensorPredict = udf((features: WrappedArray[WrappedArray[WrappedArray[Float]]]) => {
      predict(bundle, features)
    })
    val resultDf = featureDf.withColumn("predict_result", sensorPredict(col("feature")))

3.3 Service Deployment

3.3.1 Environment Dependencies

For a project that integrates Spark-Scala with TensorFlow for Java, use Maven to build a JAR that bundles its dependencies: tfspark-1.0.0-jar-with-dependencies.jar.

Run the JAR on the Hadoop cluster where Spark is deployed. The cluster environment must already have big data components such as Spark, Hadoop, and Hive installed.

3.3.2 Executing scripts

Use spark-submit to run the JAR: specify the main class to execute (com.tfspark.predictmain) and the path of the JAR, set the number of executors, the cores per executor, and the memory parameters, and pass in the model file version as an argument.

    sudo -u root /usr/local/apache-spark/2.4.3/bin/spark-submit \
      --class com.tfspark.predictmain \
      --master yarn \
      --deploy-mode client \
      --driver-memory 6g \
      --executor-memory 6g \
      --num-executors 5 \
      --executor-cores 4 \
      /tmp/tfspark-1.0.0-jar-with-dependencies.jar model_v1

3.4 Practical Results

We completed the integration of TensorFlow for Java and Spark-Scala, combining a big data distributed batch processing framework with machine learning. Model files generated in the Python environment are loaded and used on the Java platform, so the machine learning model works across platforms. The solution has been applied successfully to online projects: prediction over 3 million records completes every hour, with a task time of 9 minutes and a throughput of more than 5,500 records per second. This achieves high-performance offline model prediction in big data scenarios and connects the whole application process end to end.

4. Optimization and pitfalls

Runtime performance optimization

Section 3.2.4 demonstrated the general way for Spark to call the TensorFlow API on a DataFrame. When our project launched with that implementation, processing 3 million records took about 20 minutes. Analysis showed that there was room for performance optimization.

  • Problem: The model prediction method is called once per record, so reusable objects are created repeatedly and the same method flow runs many times.
  • Optimization idea: Call the prediction method on batches of data to reduce repeated object creation and method execution.
  • Solution: In RDD mode, use the mapPartitions operator instead of the map operator, collect the features into an array, and call the prediction in batches.

Compare the implementations of the mapPartitions operator and the map operator:

Both operators work on the iterator of a partition. The map operator takes each element from the iterator and calls the operation function on it, so the function's parameter is the element type. mapPartitions passes the iterator itself to the operation function, so the function's parameter is an iterator over the elements. The difference is that mapPartitions handles all of a partition's elements in one function call, whereas map handles one element per call and therefore calls the function many times.

Therefore, compared with map, mapPartitions is better suited to scenarios that would otherwise create the same objects or run the same process repeatedly, and it improves performance. Its prominent disadvantage is that it may cause an OOM error: loading many elements at once takes more memory than map's one element at a time, and the memory cannot be garbage collected promptly. The TensorFlow API accepts an array for batch calls, so converting the iterator into an array inside mapPartitions enables batch prediction and improves efficiency.

    // matrix holds a batch of features from one partition: m rows of n values each
    val matrix = Array.ofDim[Float](m, n)
    val ft = tf.Tensor.create(matrix, classOf[java.lang.Float])
    val y = sess.runner().feed("serving_default_hmc_input:0", ft).fetch("StatefulPartitionedCall:0").run().get(0)
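
The batching idea can be illustrated without Spark or TensorFlow. The sketch below, in plain Python, contrasts one model call per record (map style) with consuming a partition iterator in chunks (mapPartitions style). All names are hypothetical, CountingModel merely stands in for the TensorFlow session, and the fixed batch_size is an assumption added here to bound memory use; it is not taken from the project.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List, Sequence

Row = Sequence[float]
BatchPredictor = Callable[[List[Row]], List[int]]


def predict_per_element(rows: Iterable[Row], predict_batch: BatchPredictor) -> List[int]:
    """map style: one model invocation for every single row."""
    return [predict_batch([row])[0] for row in rows]


def predict_per_partition(rows: Iterable[Row], predict_batch: BatchPredictor,
                          batch_size: int = 1024) -> Iterator[int]:
    """mapPartitions style: walk the partition iterator in fixed-size chunks."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        # One model invocation covers the whole chunk
        yield from predict_batch(chunk)


class CountingModel:
    """Hypothetical stand-in for the model session; counts how often it is invoked."""

    def __init__(self) -> None:
        self.calls = 0

    def __call__(self, batch: List[Row]) -> List[int]:
        self.calls += 1
        # The index of the largest value plays the role of the predicted class
        return [max(range(len(row)), key=row.__getitem__) for row in batch]
```

With five rows and batch_size=2, the per-element version invokes the model five times and the chunked version three times, while both return the same predictions. Chunking also limits how much of a partition is held in memory at once, which is one way to reduce the OOM risk mentioned above.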

Result: After batch prediction was implemented with the mapPartitions operator in RDD mode, the task duration dropped significantly, from 20 minutes to 9 minutes.
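
To put those durations in terms of throughput, the short calculation below uses only the figures quoted in this article (3 million records, 20 minutes before, 9 minutes after). The helper name is hypothetical.

```python
def throughput(records: int, minutes: float) -> float:
    """Records processed per second for a run of the given duration."""
    return records / (minutes * 60)


before = throughput(3_000_000, 20)   # 2500 records/s with per-record calls
after = throughput(3_000_000, 9)     # about 5556 records/s with batch calls
speedup = 20 / 9                     # about 2.2 times faster
```

The figure after optimization is consistent with the throughput of more than 5,500 records per second reported in Section 3.4.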

Model file hot update

  • Problem: As mentioned above, the model files are stored in the project's resource directory. Even when the model structure is unchanged, updating the model files this way is inconvenient because the service has to be redeployed.
  • Solution: Change the storage mode and keep the model files in HDFS, fetching the model from HDFS on every run. TensorFlow itself provides no API for loading a model directly from HDFS, but Spark can copy the model from HDFS to the local machine, and the model can then be loaded from the local path. With this approach, as long as the model structure is unchanged, you only need to upload a new model file and either overwrite the original HDFS file or bump the version number.

    import org.apache.hadoop.fs.Path
    import org.apache.spark.SparkFiles

    // modelPath is the path of the model in HDFS; modelTag is obtained from the model file information
    spark.sparkContext.addFile(modelPath, true)
    // SparkFiles.get expects the file name rather than the full HDFS path
    val localPath = SparkFiles.get(new Path(modelPath).getName)
    tf.SavedModelBundle.load(localPath, modelTag)
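
One way to handle the version number is to derive the HDFS path from a base directory plus the version argument passed on the command line (model_v1 in Section 3.3.2). The Python sketch below shows only the path handling; the base directory and both function names are hypothetical and not taken from the project.

```python
import posixpath


def model_path(base_dir: str, version: str) -> str:
    """Join a base directory and a version label such as 'model_v1'."""
    return posixpath.join(base_dir.rstrip("/"), version)


def local_file_name(path: str) -> str:
    """Last path segment: the name under which a distributed file is looked up locally."""
    return posixpath.basename(path.rstrip("/"))


# Hypothetical base directory, used only for illustration
path_v1 = model_path("hdfs:///models/risk", "model_v1")
name_v1 = local_file_name(path_v1)
```

Publishing a new model then means uploading it under a new version label and passing that label to the job, with no change to the packaged JAR.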

TensorFlow library file not found

Error: The libtensorflow_jni.so file cannot be loaded while executing a prediction task on the Hadoop cluster.

    Exception in thread "main" java.lang.UnsatisfiedLinkError: /tmp/tensorflow_native_libraries-1613705012956-0/libtensorflow_jni.so: libtensorflow_framework.so.1: cannot open shared object file: No such file or directory
        at java.lang.ClassLoader$NativeLibrary.load(Native Method)
        at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
        at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
        at java.lang.Runtime.load0(Runtime.java:809)
        at java.lang.System.load(System.java:1086)

Cause analysis: The error log indicates that TensorFlow depends on the C runtime environment and has requirements on the versions of the basic C libraries. These library versions are inconsistent across the cluster nodes, with some too old and some too new. As a result, TensorFlow is incompatible with some machines, and tasks on those machines report errors.

  • Solution: There are two options.

    The first option is to fix the cluster environment, but changing the underlying libraries can have a significant impact if the cluster is shared.

    The second option is to use a single physical machine instead of the Hadoop cluster and run the tasks in parallel in Spark local mode, which guarantees a consistent environment.

In practice we use the second option.
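
Whichever option is chosen, it can help to check in advance whether a machine is able to load the native library named in the error. The sketch below is a hypothetical preflight check written with Python's standard ctypes module; it is not part of the project and it only tests whether the dynamic loader can resolve the library on the machine where it runs.

```python
import ctypes


def can_load_shared_library(name: str) -> bool:
    """Return True if the dynamic loader can resolve and load the named library."""
    try:
        ctypes.CDLL(name)
        return True
    except OSError:
        # Raised when the file is missing or one of its dependencies cannot be resolved
        return False


# Library name taken from the error message above
tensorflow_framework_ok = can_load_shared_library("libtensorflow_framework.so.1")
```

Running such a check on every node before submitting a job shows which machines would fail, instead of discovering the problem through task errors.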

5. Conclusion

The purpose of this article is to share my thinking and experience in applying a distributed machine learning computing framework in practice, for your reference and discussion.

By comparing the strengths and weaknesses of different frameworks and analyzing how their underlying implementations affect performance, the article lays out the reasoning behind the selection and explains why TensorFlow for Java and Spark-Scala suit high-performance distributed model prediction in big data scenarios. Drawing on practical experience, it then demonstrates the overall process of applying the framework in a project and summarizes the lessons from optimizing performance and the deployment process.

My knowledge is limited, so the article surely has flaws; corrections are welcome.