When the data or the model is large, distributed training across multiple GPUs is generally used to improve the training efficiency of machine learning models.

In terms of parallelization mode, distributed training is generally divided into data parallelism and model parallelism. Model parallelism: different GPUs in the distributed system are responsible for different parts of the network model. For example, different layers of a neural network are assigned to different GPUs, or different parameters within the same layer are assigned to different GPUs.

Data parallelism: different GPUs hold copies of the same model, each GPU is fed different data, and the results from all GPUs are then combined in some way.

Note that the GPUs mentioned above can be multiple GPUs on the same machine or GPUs spread across different machines.

There are also hybrid schemes that combine data parallelism and model parallelism.

Because the parts of a model-parallel job depend on one another, its scalability is poor (meaning the number of GPUs cannot be increased at will), so it is seldom used in practical training. In data parallelism each part is independent, scalability is good, and the speed-up is better, so it is much more common in practice.
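As a rough illustration (a hedged sketch; the device names and the tiny two-layer model are invented for this example), model parallelism in TensorFlow boils down to pinning different parts of the graph to different GPUs, whereas data parallelism, which the rest of this article uses, runs the complete model on every GPU with different data:

import tensorflow as tf

# Model parallelism: different layers of the same model live on different GPUs.
x = tf.placeholder(tf.float32, [None, 1024])
with tf.device('/gpu:0'):
    hidden = tf.layers.dense(x, 4096, activation=tf.nn.relu)   # first layer on GPU 0
with tf.device('/gpu:1'):
    logits = tf.layers.dense(hidden, 10)                       # second layer on GPU 1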

Data parallelism requires synchronizing the model parameters between GPUs, which is generally done with either synchronous or asynchronous updates. Synchronous update: wait until every GPU has finished computing its gradients, compute the new weights once from all of them, and synchronize the new weights to all GPUs before the next round of computation. Asynchronous update: once a GPU has finished computing its gradients, it does not need to wait for the other GPUs (sometimes the number of gradients to wait for can be configured); the global weights are updated immediately and the new weights are pulled for the next round. Synchronous updates involve waiting, while asynchronous updates involve essentially no waiting, but they introduce harder problems such as gradient staleness.
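A toy illustration of the difference (plain Python/NumPy, not a real training loop; the gradient function is a stand-in):

import numpy as np

np.random.seed(0)
lr = 0.1

def fake_gradient(w):
    # stand-in for a worker's gradient computed on its own shard of data
    return w - np.random.randn(3)

# Synchronous update: wait for all 4 workers, average their gradients,
# apply a single update; everyone then continues from the same new weights.
w_sync = np.zeros(3)
grads = [fake_gradient(w_sync) for _ in range(4)]
w_sync -= lr * np.mean(grads, axis=0)

# Asynchronous update: each worker computes its gradient from a snapshot of the
# weights and applies it as soon as it is ready; by then the weights may already
# have moved, which is the gradient staleness problem.
w_async = np.zeros(3)
snapshots = [w_async.copy() for _ in range(4)]
for snap in snapshots:
    w_async -= lr * fake_gradient(snap)   # applied immediately, no waiting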

In practice, synchronous data parallelism on a single machine with multiple cards is the most common setup; the configuration used most often here is a single machine with eight cards. With more data, more machines and more cards are generally needed.

Both single-machine multi-card and multi-machine multi-card training are forms of distributed training. Before Horovod came along, TensorFlow's own cluster mode was the officially recommended way to train across a cluster.

However, TensorFlow's cluster training is not easy to use.

Disadvantages of TensorFlow clusters

  1. Lots of concepts and a steep learning curve

A TensorFlow cluster adopts the parameter server architecture, which introduces many complex concepts, listed below:

server
client
master
cluster
parameter server
worker
job
task
replica_device_setter
master service
worker service
clone

The functions involved:

tf.train.Server
tf.train.Supervisor
tf.train.SessionManager
tf.train.ClusterSpec
tf.train.replica_device_setter
tf.train.MonitoredTrainingSession
tf.train.MonitoredSession
tf.train.SingularMonitoredSession
tf.train.Scaffold
tf.train.SessionCreator
tf.train.ChiefSessionCreator
tf.train.WorkerSessionCreator

I have tried several times to work out the relationships between server, client, master, master service, worker service, clone and session, and I still have not fully untangled them. Roughly: the server instance is created in the client, and sessions correspond one-to-one to servers. A server contains two services, the master service and the worker service. The master service is responsible for communicating with the outside world; for example, sess.run usually tells the master service of a server to start working, the master service notifies the worker service of the same server to start working, and the worker service dispatches the computation to the GPUs. When it finishes, the result is returned to the master service for the weight update. In the multi-machine, multi-card distributed case, gradient transmission and weight synchronization take place between the parameter server and the master service. Stackoverflow.com/questions/3…
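To give a feel for how many of these pieces even a toy job touches, here is a minimal sketch (not a complete working job; the host names and the one-ps/two-worker layout are invented for illustration):

import tensorflow as tf

cluster = tf.train.ClusterSpec({
    "ps":     ["ps0.example.com:2222"],
    "worker": ["worker0.example.com:2222", "worker1.example.com:2222"],
})

# Every process must know its own job name and task index.
job_name, task_index = "worker", 0
server = tf.train.Server(cluster, job_name=job_name, task_index=task_index)

if job_name == "ps":
    server.join()                      # parameter servers just host variables
else:
    # Variables go to the ps job, computation stays on this worker.
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % task_index,
            cluster=cluster)):
        loss = ...                     # build the model here
        train_op = tf.train.AdagradOptimizer(0.01).minimize(loss)

    # The chief (task 0) coordinates initialization and checkpointing.
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(task_index == 0)) as sess:
        while not sess.should_stop():
            sess.run(train_op)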

  2. A large amount of code to modify

Moving from a single-machine, single-card script to a multi-machine, multi-card one involves an amount of code change measured in days of work, or even a week.

  3. Different scripts must be run on each machine

The TensorFlow cluster is based on the parameter server architecture. To run a multi-machine, multi-card cluster, each machine has to start a client, that is, run a script to start training. With 100 machines, this drives people crazy.
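For example (a hypothetical script and flags in the style of the old TF1 distributed tutorials; train.py is assumed to parse --job_name and --task_index itself), even a small one-ps/three-worker cluster already needs four separate launches on four machines:

# On the parameter-server machine:
python train.py --job_name=ps --task_index=0

# On each worker machine, a separate launch with its own task index:
python train.py --job_name=worker --task_index=0
python train.py --job_name=worker --task_index=1
python train.py --job_name=worker --task_index=2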

  4. The ratio of parameter servers to workers is hard to choose

A TensorFlow cluster divides the servers into two job types, ps and worker, and there is no definite formula for how many parameter servers give the best performance.

  5. Large performance loss

TensorFlow clusters do not perform well: beyond a certain scale, performance drops below half of the ideal performance.

Horovod

Because TensorFlow clusters are so unfriendly, the industry kept experimenting with new cluster solutions. In 2017 Facebook published "Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour", which demonstrated the efficiency of large-scale data parallelism. In the same year Baidu published "Bringing HPC Techniques to Deep Learning", which verified the feasibility of a new gradient synchronization and weight update algorithm. Inspired by these two papers, Uber developed the Horovod cluster solution.

The following conventions are used: the network bandwidth is denoted B (in Mb/s), the total size of the model parameters is denoted D (in Mb), the total number of servers is n, the number of parameter servers is n_p, the number of worker servers is n_w (so n = n_p + n_w), and the computation time of a single server is denoted T_0.

Gradient synchronization and weight update algorithm

1) Parameter Server architecture

TensorFlow's cluster mode uses the parameter server architecture; its data flow is shown in the following figure.

The total time of a parameter-server cluster can then be calculated as follows:

It can be seen that T is linear in the total number of nodes n, but the overall performance also depends on how the servers are divided between parameter servers and workers. Let e denote the proportion of worker servers, i.e. e = n_w / n; the optimal value of e can then be calculated as follows:

It can be seen that the optimal proportion of worker servers depends on the model size, the network bandwidth and the single-server computation time, so it is not a hyperparameter whose best value can be picked at a glance.

2) Horovod’s Ring-AllReduce algorithm

In "Bringing HPC Techniques to Deep Learning", released by Baidu in 2017, a new gradient synchronization and weight update algorithm called Ring-AllReduce was adopted. In this algorithm each node communicates only with its two neighboring nodes and no parameter server is needed, so every node participates in both computation and storage. A weight update consists of two phases:

1) Accumulate the gradients (scatter-reduce). The gradient on each node is split into n fragments, and at each step only one fragment is transmitted to the neighboring node. After n-1 steps, every fragment has been accumulated over all nodes, but the accumulated value of each fragment resides on a different node (steps 2 and 3 in the figure below).

2) Distribute the accumulated gradients to all nodes (allgather). The fragments accumulated in the first phase are exchanged for another n-1 steps, after which the gradients on all nodes are synchronized (steps 4 and 5 in the figure below). The gradients are then averaged and the weights updated, so all nodes hold the same new weights.
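The two phases can be simulated in a few lines of plain Python (a sketch for intuition only, not Horovod's actual implementation; each "fragment" here is a single number):

n = 4
grads = [[float(10 * i + j) for j in range(n)] for i in range(n)]  # grads[i][j]: node i, fragment j
chunks = [g[:] for g in grads]

# Phase 1 (scatter-reduce), n-1 steps: node i sends fragment (i - step) % n to its
# right neighbor, which adds it to its own copy. Afterwards the full sum of each
# fragment lives on exactly one node.
for step in range(n - 1):
    sends = [(i, (i - step) % n, chunks[i][(i - step) % n]) for i in range(n)]
    for src, j, val in sends:
        chunks[(src + 1) % n][j] += val

# Phase 2 (allgather), n-1 steps: node i forwards the fully reduced fragment
# (i + 1 - step) % n to its right neighbor, which overwrites its own copy.
for step in range(n - 1):
    sends = [(i, (i + 1 - step) % n, chunks[i][(i + 1 - step) % n]) for i in range(n)]
    for src, j, val in sends:
        chunks[(src + 1) % n][j] = val

# Every node now holds the same fully accumulated gradient.
expected = [sum(grads[i][j] for i in range(n)) for j in range(n)]
assert all(c == expected for c in chunks)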

The total time of the Ring-AllReduce algorithm can then be calculated as follows:
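(The formula from the original figure is not reproduced here; under the convention above, the standard communication-cost analysis of Ring-AllReduce, consistent with the 1/n remark below, gives roughly the following: each node takes part in 2(n-1) transfer steps of size D/n each.)

T = T_0 + 2(n-1)\,\frac{D/n}{B} = T_0 + \frac{2(n-1)}{n}\cdot\frac{D}{B}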

It can be seen that the total time is basically independent of the total number of nodes n (when n is large, the 1/n term is essentially 0), which is what lets the overall throughput scale almost linearly with the number of nodes.

Horovod adopts the Ring-AllReduce algorithm for gradient synchronization and weight synchronization.

Concepts

Horovod's communication is based on MPI, so the concepts involved are MPI concepts. Take four servers with four GPUs each as an example.

  • size: the total number of processes, i.e. the number of GPUs, here 16
  • rank: the unique global ID of a process, 0-15
  • local rank: the unique local ID of a process on its own server, here 0-3
  • allreduce: the operation that sums (reduces) the data from all nodes and synchronizes the result to every node, as shown in the following figure

  • allgather: the operation that gathers the data of all nodes and synchronizes it to every node, so that each node ends up holding every node's data as separate pieces, as shown in the figure below

  • broadcast: the operation that propagates data held by one node (the root node) to all other nodes

That’s about it, simple and clear.
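To make these concepts concrete, here is a minimal sketch against the Horovod TensorFlow API (assuming the 4-server, 4-GPU setup above, launched with horovodrun as shown later):

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

print(hvd.size())        # 16: total number of processes, one per GPU
print(hvd.rank())        # 0-15: unique global ID of this process
print(hvd.local_rank())  # 0-3: ID of this process on its own server

# allreduce: combine a tensor across all processes (averaged by default)
averaged = hvd.allreduce(tf.constant([1.0, 2.0]))

# allgather: every process receives the concatenation of all processes' tensors
gathered = hvd.allgather(tf.constant([[1.0, 2.0]]))

# broadcast: copy the tensor held by the root process (rank 0) to all others
synced = hvd.broadcast(tf.constant([3.0]), root_rank=0)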

Changing single-machine, single-card training to multi-machine, multi-card

To change a training script that supports only a single machine and a single card into one that supports multiple machines and multiple cards, taking TensorFlow as an example, the following changes are required:

import tensorflow as tf
import horovod.tensorflow as hvd


# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
  while not mon_sess.should_stop():
    # Perform synchronous training.
    mon_sess.run(train_op)

As you can see, the changes are minor, with only about 10 lines of code added in six main steps:

1) Initialize Horovod
hvd.init()
2) Bind each process to one GPU
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())
3) Scale up the learning rate by the total number of GPUs
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

Since the effective batch size is scaled up by the number of GPUs, the learning rate should be scaled up accordingly.

4) Wrap the original optimizer with hvd.DistributedOptimizer
opt = hvd.DistributedOptimizer(opt)

Distributed training involves gradient synchronization. Each GPU still computes its gradients with the original optimizer; hvd.DistributedOptimizer then takes care of synchronizing (averaging) the gradients across processes.
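Conceptually, the wrapper does something like the following (a simplified sketch, not Horovod's actual implementation), where opt is the original optimizer:

# Simplified idea: compute local gradients as usual, average them across all
# processes with allreduce, then apply the averaged gradients.
grads_and_vars = opt.compute_gradients(loss)
averaged = [(hvd.allreduce(grad), var) for grad, var in grads_and_vars]
train_op = opt.apply_gradients(averaged)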

5) Broadcast initial variable values to all processes
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

This ensures that all processes start from the same initial variable values.

6) Save checkpoint only on worker 0
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None

This prevents the other workers from corrupting the checkpoint by writing to it at the same time.

Horovod requires only these minimal changes; there is none of the tedious device placement and cluster configuration of the parameter server architecture.

The training

To train on a single 4-card machine, simply execute the following command:

horovodrun -np 4 -H localhost:4 python train.py

To start training on 4 machines with 4 cards each, just execute the following command on one of the machines:

horovodrun -np 16 -H server1:4,server2:4,server3:4,server4:4 python train.py

Note that for both single-machine and multi-machine training, the command only needs to be executed once, on one machine; Horovod uses MPI to start the processes on the other machines and to transfer data between them.

The performance comparison

Horovod's performance loss is much smaller than TensorFlow's: its throughput grows almost linearly with the number of GPUs.

Conclusion

Anyone who has used TensorFlow clusters will appreciate how convenient Horovod is. Thanks to Baidu, Facebook and Uber for making deep learning better.

Note, however, that Horovod appears to support only synchronous-update data parallelism. I have not tried model parallelism or asynchronous-update data parallelism, and judging from the Ring-AllReduce algorithm, they should not be supported.