Selected from TensorFlow

Compiled by Heart of the Machine

Contributors: Huang Yusheng, Huang Xiaotian


This document and the accompanying scripts detail how to build high-performance, scalable models for a variety of system types and network topologies. The techniques in this document use some low-level TensorFlow Python primitives; in the future, they will be incorporated into high-level APIs.


The input pipeline



The performance guide describes how to diagnose potential problems with the input pipeline and how best to resolve them. We found that tf.FIFOQueue and tf.train.queue_runner could not saturate multiple current-generation GPUs when training ImageNet with AlexNet, because the underlying implementation uses Python threads and the overhead of Python threads is too high.


Instead, the scripts build the input pipeline using TensorFlow's native parallelism. Our approach consists mainly of the following three stages:


  • I/O reads: select and read image files from disk.

  • Image processing: decode image records into images, preprocess them, and organize them into mini-batches.

  • CPU-to-GPU data transfer: transfer the images from the CPU to the GPU.


Using data_flow_ops.StagingArea, the dominant part of each stage is executed in parallel with the other stages. StagingArea is a queue-like operator similar to tf.FIFOQueue; the difference is that StagingArea offers simpler functionality and can be executed on both the CPU and the GPU in parallel with other stages. Splitting the input pipeline into three stages that operate independently in parallel is scalable and takes full advantage of large multi-core environments. The rest of this section details each stage and the use of data_flow_ops.StagingArea.
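
As a concrete illustration, here is a minimal sketch (TensorFlow 1.x) of how a StagingArea might sit between a producing stage and a consuming stage. The shapes and dtypes are assumptions, not an excerpt from the benchmark script.

import tensorflow as tf
from tensorflow.python.ops import data_flow_ops

# Stand-ins for one set of preprocessed data (shapes are assumptions).
images = tf.zeros([32, 224, 224, 3], dtype=tf.float32)
labels = tf.zeros([32], dtype=tf.int32)

# The staging area buffers one set of tensors so that the stage producing them
# and the stage consuming them can run in parallel.
stage = data_flow_ops.StagingArea(dtypes=[tf.float32, tf.int32],
                                  shapes=[[32, 224, 224, 3], [32]])
stage_put_op = stage.put([images, labels])   # run by the producing stage
staged_images, staged_labels = stage.get()   # read by the consuming stage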


Parallel I/O reading



data_flow_ops.RecordInput is used to read from disk in parallel. Given a list of input files containing TFRecords, RecordInput continuously reads records using background threads. The records are placed in its own large internal pool, and once the pool has been loaded to half its capacity it produces output tensors. This op has its own internal threads that are dominated by I/O time and consume minimal CPU, which allows it to run in parallel with the rest of the model.
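
A rough sketch of this parallel read, modeled on the benchmark scripts, is shown below; the file pattern, parallelism, buffer size, and batch size are illustrative assumptions.

import tensorflow as tf
from tensorflow.python.ops import data_flow_ops

batch_size = 256
record_input = data_flow_ops.RecordInput(
    file_pattern='/data/imagenet/train-*-of-*',  # hypothetical TFRecord shards
    parallelism=64,        # background reader threads
    buffer_size=10000,     # size of the internal record pool
    batch_size=batch_size)
records = record_input.get_yield_op()    # one tensor holding a batch of serialized records
records = tf.split(records, num_or_size_splits=batch_size, axis=0)
records = [tf.reshape(record, []) for record in records]  # one scalar string per image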


Parallel image processing



After the images are read from RecordInput, they are passed as tensors to the image processing pipeline. To make the image processing pipeline easier to explain, assume the input pipeline targets eight GPUs with a total batch size of 256 (32 per GPU). The 256 image records are read and processed independently and in parallel: starting from 256 independent RecordInput reads, each read is followed by a matching image preprocessing operation that also runs independently and in parallel. These preprocessing operations include image decoding, distortion, and resizing.
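
As an illustration, a per-image preprocessing function along these lines might look like the sketch below. It assumes the raw JPEG bytes have already been extracted from each record, and the real script applies more elaborate distortions.

import tensorflow as tf

def preprocess(jpeg_bytes):
    # Decode the JPEG bytes into an image tensor.
    image = tf.image.decode_jpeg(jpeg_bytes, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    # Resize, then apply simple distortions: a random crop and a random flip.
    image = tf.image.resize_images(image, [256, 256])
    image = tf.random_crop(image, [224, 224, 3])
    image = tf.image.random_flip_left_right(image)
    return image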


As the images exit preprocessing, they are joined into eight tensors, each with a batch size of 32. For this purpose, tf.parallel_stack is used instead of tf.concat: tf.concat is implemented as a single op that requires all of its inputs to be ready before they can be joined together, whereas tf.parallel_stack allocates an uninitialized tensor as its output and writes each input tensor into its designated portion of the output as soon as that input is available.


When all the input tensors have been written, the output tensor is passed along in the graph. This effectively hides the memory latency of the long tail of producing all the input tensors.
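
A minimal sketch of joining the preprocessed images into per-GPU batches with tf.parallel_stack, using the sizes assumed in the example above:

import tensorflow as tf

num_gpus = 8
batch_per_gpu = 32
# Stand-in for the 256 preprocessed [224, 224, 3] image tensors.
processed = [tf.zeros([224, 224, 3]) for _ in range(num_gpus * batch_per_gpu)]

per_gpu_batches = []
for i in range(num_gpus):
    shard = processed[i * batch_per_gpu:(i + 1) * batch_per_gpu]
    # parallel_stack writes each input into its slice of the output as soon as
    # that input is ready, rather than waiting for all inputs like tf.concat.
    per_gpu_batches.append(tf.parallel_stack(shard))   # shape [32, 224, 224, 3]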


Parallel data transfer from CPU to GPU


Continuing with the assumption of eight GPUs and a total batch size of 256 (32 per GPU): once the input images have been processed and joined on the CPU, we have eight tensors, each with a batch size of 32. TensorFlow allows tensors on one device to be used directly on any other device; to make a tensor available on another device, TensorFlow inserts implicit copies. The copies are scheduled to run between the devices before the tensors are actually used, but if a copy does not finish in time, the computation that needs the tensor stalls and performance degrades.


In this implementation, data_flow_ops.StagingArea is used to explicitly schedule the copies in parallel. The end result is that all tensors are already available when computation on the GPU begins.
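
A minimal sketch of explicitly staging each per-GPU batch onto its GPU so that the host-to-device copy overlaps with computation; the device strings, shapes, and placeholder input tensors are assumptions.

import tensorflow as tf
from tensorflow.python.ops import data_flow_ops

num_gpus = 8
# Stand-ins for the per-GPU batches produced on the CPU by the preprocessing stage.
cpu_images = [tf.zeros([32, 224, 224, 3]) for _ in range(num_gpus)]
cpu_labels = [tf.zeros([32], dtype=tf.int32) for _ in range(num_gpus)]

copy_ops, gpu_batches = [], []
for i in range(num_gpus):
    with tf.device('/gpu:%d' % i):
        gpu_stage = data_flow_ops.StagingArea(dtypes=[tf.float32, tf.int32],
                                              shapes=[[32, 224, 224, 3], [32]])
        # put() schedules the host-to-device copy; running it one step ahead of
        # the compute keeps the copy overlapped with the previous step's work.
        copy_ops.append(gpu_stage.put([cpu_images[i], cpu_labels[i]]))
        gpu_batches.append(gpu_stage.get())   # [images, labels] already on this GPU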


Software pipelining



Since all stages can be driven by different processors, placing data_flow_ops.StagingArea between them allows them to run in parallel. StagingArea is a queue-like operator similar to tf.FIFOQueue that offers simpler functionality and can be executed on both CPUs and GPUs.


Before the model starts running all of the stages, the input pipeline stages are warmed up so that each staging buffer in between holds one set of data. During each run step that follows, one set of data is read from the staging buffer at the beginning of each stage and one set is pushed in at the end.


For example, suppose there are three stages A, B, and C, with two staging areas S1 and S2 in between. During warm-up, we run:


Warm up:

Step 1: A0

Step 2: A1  B0



Actual execution:

Step 3: A2  B1  C0

Step 4: A3  B2  C1

Step 5: A4  B3  C2


After warm-up, S1 and S2 each hold one set of data. For every step that is actually executed, one set of data is consumed from each staging area and one new set is added to it.
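
A minimal sketch of this warm-up and run loop is shown below, with no-op stand-ins for the real stage ops (in the actual script these would be the StagingArea put ops and the training op).

import tensorflow as tf

# Hypothetical stand-ins: put_stage1_op refills S1 (stage A), put_stage2_op
# refills S2 (stage B), and train_op runs stage C.
put_stage1_op = tf.no_op(name='stage_A_put')
put_stage2_op = tf.no_op(name='stage_B_put')
train_op = tf.no_op(name='stage_C_step')
num_steps = 100  # assumed number of training steps

with tf.Session() as sess:
    # Warm-up: prime the staging areas so S1 and S2 each hold one set of data.
    sess.run(put_stage1_op)                    # step 1: A0
    sess.run([put_stage1_op, put_stage2_op])   # step 2: A1, B0
    # Actual execution: each run consumes one set from each staging area and
    # pushes a new set in, keeping all three stages busy in parallel.
    for _ in range(num_steps):
        sess.run([train_op, put_stage1_op, put_stage2_op])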


The benefits of this scheme are:


  • All stages are non-blocking, because after warm-up there is always one set of data in each staging area.

  • All stages can run in parallel, since each of them can start immediately.

  • The staging buffers have a fixed memory overhead, holding at most one extra set of data.

  • Running all stages of a step requires only a single session.run() call, which makes profiling and debugging much easier.


Best practices for building high-performance models



Gathered below are some additional best practices that can improve model performance and increase model flexibility.


Building models with NHWC and NCHW


The vast majority of TensorFlow operations used by CNNs support both the NHWC and NCHW data formats. On GPUs, NCHW is faster; on CPUs, NHWC is sometimes faster.


Building a model that supports both data formats keeps it flexible and able to run well on any platform. The benchmark scripts were written to support both NCHW and NHWC. NCHW should be used when training on GPUs, while NHWC is sometimes faster on CPUs. A flexible model can be trained on GPUs with NCHW and then used for inference on CPUs with NHWC, with the weights obtained from training used in either format.
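
A minimal sketch of one way to support both formats: accept NHWC input and transpose it to NCHW when the convolution runs with channels_first. The layer and shapes are illustrative only.

import tensorflow as tf

def conv_block(images_nhwc, data_format='NCHW'):
    # Convert the input when running in NCHW; leave it unchanged for NHWC.
    if data_format == 'NCHW':
        images = tf.transpose(images_nhwc, [0, 3, 1, 2])  # NHWC -> NCHW
        layers_format = 'channels_first'
    else:
        images = images_nhwc
        layers_format = 'channels_last'
    return tf.layers.conv2d(images, filters=64, kernel_size=7, strides=2,
                            padding='same', data_format=layers_format)

# For example: NCHW when training on GPUs, NHWC when running inference on CPUs.
gpu_output = conv_block(tf.zeros([32, 224, 224, 3]), data_format='NCHW')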


Fused batch normalization



The default batch normalization in TensorFlow is implemented as composite operations. This is very general, but it does not always deliver the best performance. Fused batch normalization is an alternative that achieves better performance on GPUs. An example of creating fused batch normalization with tf.contrib.layers.batch_norm is shown below:


bn = tf.contrib.layers.batch_norm(
          input_layer, fused=True, data_format='NCHW',
          scope=scope)


Variable distribution and gradient aggregation



During training, variable values are updated using aggregated gradients and deltas. In the benchmark scripts, we show that with the flexible and general-purpose TensorFlow primitives, a wide variety of high-performance distribution and aggregation schemes can be built.


Three examples of variable distribution and aggregation are included in the benchmark script:


  • Parameter server, where each replica of the training model reads variables from a parameter server and updates them independently. When a model needs a variable, it is copied over through the standard implicit copies added by the TensorFlow runtime. The example script shows how to use this method for local training, distributed synchronous training, and distributed asynchronous training.

  • Replicated, where each GPU holds an identical copy of every training variable, and forward and backward computation can begin as soon as the variable data is available. The gradients from all GPUs are accumulated, and the accumulated sum is applied to every GPU's copy of the variable to keep the copies in sync.

  • Distributed replicated, which places a copy of the training parameters on each GPU along with a master copy on the parameter servers; forward and backward computation can begin as soon as the variable data is available. The gradients across the GPUs of each server are accumulated, and the per-server aggregated gradients are then applied to the master copy. Once all workers have done this, each worker updates its local copies of the variables from the master copy.


Following are additional details about each method.


Parameter server variables


The most common way to manage variables in TensorFlow models is the parameter server pattern.


In a distributed system, each worker process runs the same model while the parameter servers hold the master copies of the variables. When a worker needs a variable from a parameter server, it simply refers to it directly; the TensorFlow runtime adds implicit copies to the graph to make the variable value available on the compute device that needs it. When a gradient is computed on a worker, it is sent to the parameter server that owns that particular variable, and the corresponding optimizer is used to update the variable.


Here are some techniques to improve throughput:


  • To balance the load, variables are spread among the parameter servers based on their size.

  • When each worker has multiple GPUs, the gradients from the GPUs are summed and a single aggregated gradient is sent to the parameter server. This reduces network bandwidth and lowers the load on the parameter servers.


To coordinate the workers, a very common mode is asynchronous updates, where each worker updates the master copy of the variables without synchronizing with the other workers. In our model, we show that it is fairly easy to introduce synchronization across workers, so that all workers finish their updates in one step before the next step can begin.
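
A minimal single-machine sketch of the parameter server pattern, with the shared variable kept on the CPU and a toy per-GPU loss standing in for the real model:

import tensorflow as tf

num_gpus = 8
opt = tf.train.GradientDescentOptimizer(0.1)

# The master copy of the variable lives on the parameter-server device (here the CPU).
with tf.device('/cpu:0'):
    w = tf.get_variable('w', shape=[1000, 10],
                        initializer=tf.truncated_normal_initializer())

tower_grads = []
for i in range(num_gpus):
    with tf.device('/gpu:%d' % i):
        x = tf.random_normal([32, 1000])              # stand-in for this GPU's input batch
        loss = tf.reduce_mean(tf.matmul(x, w) ** 2)   # toy loss in place of a real model
        tower_grads.append(tf.gradients(loss, [w])[0])

# Sum the per-GPU gradients so only a single aggregated gradient updates the variable.
with tf.device('/cpu:0'):
    summed_grad = tf.add_n(tower_grads)
    train_op = opt.apply_gradients([(summed_grad, w)])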


The parameter server approach can also be used for local training; in that case, instead of being spread across parameter servers, the master copies of the variables are kept on the CPU or spread across the available GPUs.


This architecture has gained widespread adoption in the community due to the simplicity of the setup.


This mode can be used in the script by passing --variable_update=parameter_server.


Parameter server mode with three variables: a single worker reads and updates the variables.


Variable replication



In this design, each GPU on the server has its own copy of every variable. The copies are kept in sync across GPUs by applying the fully aggregated gradient to each GPU's copy of the variable.


Because the variables and data are available right at the start of training, the forward pass can begin immediately. The gradients from the individual devices are aggregated into a fully aggregated gradient, which is then applied to every local copy, as sketched below.
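
A minimal sketch of this replicated scheme, with a toy per-GPU loss standing in for the real model and the aggregation done by summing on one device (one of the options listed next):

import tensorflow as tf

num_gpus = 8
opt = tf.train.GradientDescentOptimizer(0.1)

tower_vars, tower_grads = [], []
for i in range(num_gpus):
    with tf.device('/gpu:%d' % i), tf.variable_scope('tower_%d' % i):
        # Each GPU keeps its own copy of the variable (the real scripts make
        # sure all copies start from identical values).
        w = tf.get_variable('w', shape=[1000, 10],
                            initializer=tf.truncated_normal_initializer())
        x = tf.random_normal([32, 1000])              # stand-in for this GPU's input batch
        loss = tf.reduce_mean(tf.matmul(x, w) ** 2)   # toy loss in place of a real model
        tower_vars.append(w)
        tower_grads.append(tf.gradients(loss, [w])[0])

# Fully aggregate on one device, then apply the same gradient to every copy
# so the copies stay in sync.
with tf.device('/gpu:0'):
    agg_grad = tf.add_n(tower_grads)
train_op = tf.group(*[opt.apply_gradients([(agg_grad, v)]) for v in tower_vars])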

Gradient aggregation across the devices of a server can be done in different ways:


  • Summing on a single device (CPU or GPU) using standard TensorFlow operations, then copying the result back to all GPUs.

  • Using NVIDIA NCCL, described in the NCCL section below.


Variable replication in distributed training


The replicated-variable method above can be extended to distributed training. One option is to do it the same way: fully aggregate the gradients across the cluster and apply them to every local copy of the variables. That may appear in a future version of the scripts; the current scripts take a different approach, described below.


In this mode, in addition to each GPU having its own copy of the variables, a master copy is stored on the parameter servers. As with the replicated mode, training can start immediately using the local copies of the variables.


As the gradients of the weights become available, they are sent back to the parameter servers, and all local copies are updated:


  1. The gradients from all GPUs within a worker are aggregated together.

  2. The aggregated gradient from each worker is sent to the parameter server that owns the variable, where a special optimizer updates the master copy of the variable.

  3. Each worker updates its local copies of the variables from the master copy. In the example model, this is done with a cross-replica barrier that waits until all workers have finished updating the variables, and the new values are fetched only after the barrier has been released by all replicas. Once all variables have been copied, this marks the end of one training step and the start of the next.


Although this sounds similar to the standard use of parameter servers, performance is often better in practice. This is largely because computation can proceed without delay, and much of the copy latency of the early gradients can be hidden by the later computation layers.


This mode can be used in the script by passing --variable_update=distributed_replicated.



Distributed replicated mode with three variables: a single worker reads and updates the variables. Each step is numbered, and all steps are applied to every variable.


NCCL


To broadcast variables and aggregate gradients across different GPUs on the same host, we can use TensorFlow's default implicit copy mechanism.


Alternatively, we can use NCCL (tf.contrib.nccl). NCCL is an NVIDIA library that can broadcast and aggregate data efficiently across different GPUs. It schedules a cooperating kernel on each GPU that knows how best to exploit the underlying hardware topology; this kernel uses a single SM of the GPU.


In our experiments, we found that although NCCL usually speeds up data aggregation, it does not necessarily speed up training. Our hypothesis is that the implicit copies are essentially free, because they run on the GPU's copy engine, as long as their latency can be hidden by the main computation itself. NCCL can transfer the data faster, but it takes up one SM and puts more pressure on the underlying L2 cache. Our results show that with 8 GPUs, NCCL often performs better; with fewer GPUs, the implicit copies usually perform better.
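
A rough sketch of aggregating per-GPU gradients with NCCL through the TensorFlow 1.x tf.contrib.nccl module; the gradient tensors here are random stand-ins.

import tensorflow as tf
from tensorflow.contrib import nccl   # tf.contrib.nccl

num_gpus = 8
tower_grads = []
for i in range(num_gpus):
    with tf.device('/gpu:%d' % i):
        # Stand-in for the gradient computed on this GPU.
        tower_grads.append(tf.random_normal([1000, 10]))

# all_sum performs an all-reduce across the GPUs and returns one summed tensor
# per GPU, each placed on its own device.
summed_grads = nccl.all_sum(tower_grads)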


Staged variables



We go one step further and introduce a staged-variable mode, in which staging areas are used both for reading and for updating variables. Similar to the software pipelining of the input pipeline, this can hide the data-copy latency. If the computation takes longer than the copying and aggregation, the copying itself can be considered essentially free.


The downside is that all the weights read come from the previous training step, so it is a different algorithm from SGD; nevertheless, it is often possible to recover convergence by adjusting the learning rate and other hyperparameters.


Script execution


This section lists the core command-line arguments and some basic examples for executing the main script (tf_cnn_benchmarks.py).


Note: tf_cnn_benchmarks.py uses the force_gpu_compatible config option, which was introduced after TensorFlow 1.1; until TensorFlow 1.2 is released, building from source is advised.


Main command line arguments


  • model: the model to use, such as resnet50, inception3, vgg16, or alexnet.

  • num_gpus: the number of GPUs to use.

  • data_dir: path to the data to process; if not set, synthetic data is used. To use ImageNet data, use these instructions (https://github.com/tensorflow/tensorflow/blob/master/tensorflow_models/inception#getting-started) as a starting point.

  • batch_size: the batch size for each GPU.

  • variable_update: the method for managing variables: parameter_server, replicated, distributed_replicated, or independent.

  • local_parameter_device: the device to use as the parameter server: cpu or gpu.


Single-instance examples


# VGG16 training ImageNet with 8 GPUs using arguments that optimize for

# Google Compute Engine.

python tf_cnn_benchmarks.py --local_parameter_device=cpu --num_gpus=8 \

--batch_size=32 --model=vgg16 --data_dir=/home/ubuntu/imagenet/train \

--variable_update=parameter_server --nodistortions



# VGG16 training synthetic ImageNet data with 8 GPUs using arguments that

# optimize for the NVIDIA DGX-1.

python tf_cnn_benchmarks.py --local_parameter_device=gpu --num_gpus=8 \

--batch_size=64 --model=vgg16 --variable_update=replicated --use_nccl=True



# VGG16 training ImageNet data with 8 GPUs using arguments that optimize for

# Amazon EC2.

python tf_cnn_benchmarks.py --local_parameter_device=gpu --num_gpus=8 \

--batch_size=64 --model=vgg16 --variable_update=parameter_server



# ResNet-50 training ImageNet data with 8 GPUs using arguments that optimize for

# Amazon EC2.

python tf_cnn_benchmarks.py --local_parameter_device=gpu --num_gpus=8 \

--batch_size=64 --model=resnet50 --variable_update=replicated --use_nccl=False



Distributed command line arguments


1) ps_hosts: a comma-separated list of hosts, in <host>:port format (e.g., 10.0.0.2:50000), to use as parameter servers.

2) worker_hosts: a comma-separated list of hosts, in <host>:port format (e.g., 10.0.0.2:50001), to use as workers.

3) task_index: the index of the host being started within the ps_hosts or worker_hosts list.

4) job_name: the type of job, either ps or worker.


Distributed examples


Below is an example of training ResNet-50 on two hosts: host_0 (10.0.0.1) and host_1 (10.0.0.2). The example uses synthetic data; pass the data_dir argument to use real data.


# Run the following commands on host_0 (10.0.0.1):

python tf_cnn_benchmarks.py --local_parameter_device=gpu --num_gpus=8 \

--batch_size=64 --model=resnet50 --variable_update=distributed_replicated \

--job_name=worker --ps_hosts=10.0.0.1:50000,10.0.0.2:50000 \

--worker_hosts=10.0.0.1:50001,10.0.0.2:50001 --task_index=0



python tf_cnn_benchmarks.py --local_parameter_device=gpu --num_gpus=8 \

--batch_size=64 --model=resnet50 --variable_update=distributed_replicated \

--job_name=ps --ps_hosts=10.0.0.1:50000,10.0.0.2:50000 \

--worker_hosts=10.0.0.1:50001,10.0.0.2:50001 --task_index=0



# Run the following commands on host_1 (10.0.0.2):

python tf_cnn_benchmarks.py --local_parameter_device=gpu --num_gpus=8 \

--batch_size=64 --model=resnet50 --variable_update=distributed_replicated \

--job_name=worker --ps_hosts=10.0.0.1:50000,10.0.0.2:50000 \

--worker_hosts=10.0.0.1:50001,10.0.0.2:50001 --task_index=1



python tf_cnn_benchmarks.py --local_parameter_device=gpu --num_gpus=8 \

--batch_size=64 --model=resnet50 --variable_update=distributed_replicated \

--job_name=ps --ps_hosts=10.0.0.1:50000,10.0.0.2:50000 \

--worker_hosts=10.0.0.1:50001,10.0.0.2:50001 --task_index=1



Original link: https://www.tensorflow.org/performance/performance_models?from=singlemessage&isappinstalled=0


