0x00 Abstract

This series introduces the distributed optimizer in three progressively deeper articles: the cornerstone, the optimizers in data parallelism (DP/DDP/Horovod), and the PyTorch distributed optimizer.

This article introduces the optimizer in data parallel DP/DDP/Horovod.

Other articles in the PyTorch distributed series are as follows:

PyTorch distributed (1)—— History and Overview

PyTorch how to use GPU

PyTorch distributed (2) —– DataParallel – gradient

PyTorch distributed (3) —– DataParallel – gradient

PyTorch distributed (4)—— Distributed application concepts

PyTorch distributed (5) —— DistributedDataParallel – how to use

PyTorch distributed (6) —— DistributedDataParallel

PyTorch distributed (7) —— DistributedDataParallel – process groups

PyTorch distributed (8) —— DistributedDataParallel

PyTorch distributed (9) —— DistributedDataParallel – initialization

PyTorch distributed (10) —— DistributedDataParallel – Reducer static architecture

PyTorch distributed (11) —— DistributedDataParallel – building Reducer and Join operations

PyTorch distributed (12) —— DistributedDataParallel – forward propagation

PyTorch distributed (13) —— DistributedDataParallel – backward propagation

PyTorch distributed Autograd (1) —- design

PyTorch Distributed Autograd (2) —- RPC foundation

PyTorch Distributed Autograd (3) —-

PyTorch Distributed Autograd (4) —-

PyTorch Distributed Autograd (5) —-

PyTorch Distributed Autograd (6) —-

PyTorch Distributed optimizer (1)—- cornerstone

For better illustration, the code in this article will be streamlined accordingly.

0x01 Previous review

The main job of a conventional optimizer is to use the gradients to update the current parameters, w.data -= w.grad * lr, and to do so in a disciplined way.
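As a quick refresher, here is a minimal sketch of that update on a single hypothetical parameter tensor (plain SGD, no momentum), not tied to any particular optimizer class:

import torch

w = torch.randn(3, requires_grad=True)   # a hypothetical parameter
loss = (w * w).sum()                     # a hypothetical loss
loss.backward()                          # populates w.grad

lr = 0.1
with torch.no_grad():
    w -= lr * w.grad                     # the update the optimizer performs: w.data -= w.grad * lr
w.grad.zero_()                           # reset the gradient for the next iteration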

The optimizer in data parallelism is a different story. Since each worker computes its own gradients, the main technical questions for the optimizer are as follows:

  • Does each worker have its own optimizer, or is there only one optimizer on a single worker that performs the optimization for everyone?
  • If there is only one optimizer, how are the gradients of all workers combined and passed to this single optimizer?
  • If each worker has its own optimizer and each local optimizer updates only its local model, how do we guarantee that the models on all workers stay consistent?

This varies from framework to framework.

0x02 Optimizer in DP

2.1 Process

Note that DP is multi-threaded, so there is only one optimizer in the whole application, and it is an ordinary optimizer. The flow is as follows:

  1. Each GPU, in its own thread, independently runs the forward pass on its share of the input data in parallel and computes its output.
  2. Gather the outputs on the master GPU.
  3. Compute the loss on the master GPU.
  4. Scatter the losses to the GPUs.
  5. Run backward propagation on each GPU to compute the parameter gradients.
  6. Merge the gradients onto GPU 0.
  7. Perform gradient descent on the master GPU and update the model parameters with the merged gradient.
  8. Copy the updated model parameters to the remaining slave GPUs for the next iteration.

DP modifies the model's forward and backward methods: the gradients of all threads are merged onto the master GPU and the update is computed there, so although the data is processed in parallel, the optimizer itself needs no modification.
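For intuition, here is a rough sketch of what one DP forward pass does, assembled from the torch.nn.parallel primitives rather than taken from DataParallel's actual implementation; it assumes model already lives on GPU 0 and that at least two GPUs are visible:

import torch
from torch.nn.parallel import replicate, scatter, parallel_apply, gather

def dp_forward(model, inputs, device_ids=(0, 1), output_device=0):
    # split the input mini-batch along dim 0 and send one chunk to each GPU
    scattered = scatter(inputs, device_ids)
    # copy the model onto every GPU (DP does this on every iteration)
    replicas = replicate(model, device_ids[:len(scattered)])
    # run the replicas in parallel threads, one per GPU
    outputs = parallel_apply(replicas, [(x,) for x in scattered])
    # gather the outputs back onto the master GPU, where the loss is computed
    return gather(outputs, output_device)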

2.2 Usage

Specific use is as follows:

model = torch.nn.DataParallel(model)
optimizer = torch.optim.SGD(model.parameters(), args.lr,
                            momentum=args.momentum,
                            weight_decay=args.weight_decay)

for batch_idx, (data, label) in pbar:
    if args.cuda:
        data, label = data.cuda(), label.cuda() # data is placed on the default GPU
    data_v = Variable(data)
    target_var = Variable(label)

    prediction = model(data_v, target_var, args) # multi-threaded parallel forward propagation
    criterion = nn.CrossEntropyLoss()
    loss = criterion(prediction, target_var) # compute the loss on the default GPU

    optimizer.zero_grad()
    loss.backward()  # multi-threaded parallel backward propagation
    optimizer.step() # update the parameters

A simplified diagram follows: each thread computes its own gradients, and the gradients are finally merged onto GPU 0, where the optimization is performed:

             Forward                                                    Backward
      +-------------------+                                       +------------------+
  +-->+ Thread 0 on  GPU0 +--+                                +-->+ Thread 1 on GPU0 +-+
  |   +-------------------+  |          GPU 0                 |   +------------------+ |
  |   +-------------------+  | output +---------------+ loss  |   +------------------+ |
+---->+ Thread 1 on  GPU1 +---------> |  Compute Loss +---------->+ Thread 2 on GPU1 +---+
| |   +-------------------+  |        +---------------+       |   +------------------+ | |
| |   +-------------------+  |                                |   +------------------+ | |
| +-->+ Thread 2 on  GPU2 +--+                                +-->+ Thread 3 on GPU2 +-+ |
|     +-------------------+                                       +------------------+   |
|                                                                                        |
|                                                                                        |
|                                    GPU 0                                               |
|   Model                  +-------------------------+    gradient                       |
+--------------------------+     optimizer.step      |  <--------------------------------+
                           +-------------------------+

0x03 Optimizer in DDP

The picture below, from Kuaishou's BaGua paper, compares the vanilla training flow with DDP/Horovod.

  • The upper part, "Vanilla", is the native training flow, in which the U step corresponds to the optimizer. The conventional optimizer's main job is to update the model's current parameters according to the gradient: w.data -= w.grad * lr.
  • The lower part is the DDP/Horovod optimization flow; as you can see, the backward computation and the gradient merging partially overlap.

3.1 Process

In DDP the optimizer is still an ordinary one, but the parallelism is multi-process: each process runs the complete training loop, and the gradients are merged with all-reduce during the backward computation. Each process has its own independent, generic optimizer.

There are two features:

  • Each process maintains its own optimizer and performs a complete optimization step in every iteration. This may look redundant, but because the gradients have already been all-reduced and averaged across processes, the gradients are identical in every process; therefore no parameter broadcast step is needed, which reduces the time spent transferring tensors between nodes.
  • The all-reduce operations are performed during backward propagation.
    • A Reducer is created during DDP initialization, and it registers autograd_hook internally.
    • autograd_hook synchronizes the gradients during the backward pass (a conceptual sketch of what this synchronization achieves follows this list).
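This is not DDP's internal Reducer (which buckets gradients and overlaps communication with the backward pass); it is a naive equivalent that averages the gradients only after backward() has finished, and it assumes the process group has already been initialized:

import torch.distributed as dist

def average_gradients(model):
    # After this call, every process holds the same averaged gradients,
    # so each local optimizer performs exactly the same update.
    world_size = float(dist.get_world_size())
    for p in model.parameters():
        if p.grad is not None:
            dist.all_reduce(p.grad, op=dist.ReduceOp.SUM)
            p.grad /= world_size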

DDP chooses to modify the PyTorch core itself, hooking in at the initialization and the forward pass of the DistributedDataParallel model.

The specific logic is as follows:

  1. DDP uses multiple processes to load data in parallel. On the host, each worker process loads its data from disk into page-locked memory. The distributed minibatch sampler ensures that the data loaded by different processes does not overlap (a minimal sketch of this setup follows this list).
  2. There is no data broadcast: the minibatch data is loaded in parallel from page-locked memory onto each GPU. Each GPU already holds a copy of the model, so the model does not need to be copied either.
  3. Forward propagation runs on each GPU and computes the output; every GPU performs the same training, so no master GPU is needed.
  4. The loss is computed on each GPU and backward propagation is run to compute the gradients; an all-reduce on the gradients is performed while they are being computed.
  5. Update the model parameters. Since every GPU starts from exactly the same model and the gradients are all-reduced, every GPU ends up with an identical copy of the averaged gradients at the end of the backward pass, so the weight update is the same on all GPUs. Models on all workers therefore stay consistent, and no extra model synchronization is required.
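A minimal sketch of steps 1 and 2; dataset, rank and world_size are placeholders, and the master address/port are assumed to be provided by the launcher environment:

import torch.distributed as dist
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

def build_dataloader(dataset, rank, world_size, batch_size=32):
    # one process per GPU; NCCL backend for GPU training
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    # DistributedSampler gives every rank a disjoint shard of the dataset
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    # pin_memory=True loads batches into page-locked host memory
    return DataLoader(dataset, batch_size=batch_size, sampler=sampler,
                      pin_memory=True)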

Because the modification again lives in the model's forward and backward operations, the optimizer needs no change; each worker runs its own optimizer separately in its local process.

3.2 Optimizer state

The important question to note here is: how do we ensure that the optimizer state is the same in every process?

DDP itself is not coupled to the optimizer and takes no responsibility for this, so the user has to make sure the optimizer state is identical across processes. This revolves around two points:

  • The optimizer parameters have the same initial values. Identical initial values are guaranteed by the rule "do not initialize the optimizer until the DDP model has been created".
  • The optimizer parameters receive the same value at every update. Since every gradient is all-reduced, every optimizer sees the same gradient delta.

3.3 Usage

An example is as follows:

model = ToyModel().to(rank)
# construct the DDP model
ddp_model = DDP(model, device_ids=[rank])

loss_fn = nn.MSELoss()
# the optimizer may only be constructed after the DDP model has been built
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()

The illustration is as follows:

+--------------------------------------------------------------------------------------+
| Process 1 on GPU 1                                                                   |
|                              +------------------------------+                        |
|                              | Backward                     |                        |
|                              |                              |                        |
| Forward +---->  Loss +-----> |  Compute  +---->  ALL+REDUCE | +---->  Optimizer.step |
|                              |                     ^        |                        |
|                              |                     |        |                        |
|                              +------------------------------+                        |
|                                                    |                                 |
|                                                    |                                 |
+--------------------------------------------------------------------------------------+
                                                     |
                                                     |
                                                     |
                                                     |
                                                     +
                                                 SYNC GRADS
                                                     +
                                                     |
                                                     |
                                                     |
+--------------------------------------------------------------------------------------+
| Process 2 on GPU 2                                 |                                 |
|                                                    |                                 |
|                              +------------------------------+                        |
|                              | Backward            |        |                        |
|                              |                     v        |                        |
| Forward +---->  Loss +-----> |  Compute  +---->  ALL+REDUCE | +---->  Optimizer.step |
|                              |                              |                        |
|                              |                              |                        |
|                              +------------------------------+                        |
|                                                                                      |
+--------------------------------------------------------------------------------------+

0x04 Optimizer for Horovod

Horovod does not modify the model's forward/backward computation (probably because it cannot reach into the PyTorch internals the way Facebook itself can); instead, it modifies the optimizer and implements a DistributedOptimizer.

We take horovod/torch/optimizer.py as the example.

An optimizer that wraps another torch.optim.Optimizer, using an allreduce to
combine gradient values before applying gradients to model weights.

Allreduce operations are executed after each gradient is computed by ``loss.backward()``
in parallel with each other. The ``step()`` method ensures that all allreduce operations are
finished before applying gradients to the model.

The DistributedOptimizer wraps another torch.optim.Optimizer, and it:

  • runs loss.backward() in parallel on the workers, and once each gradient has been computed, uses allreduce to merge gradients "before applying them to the model weights";
  • uses the step() method to make sure that all allreduce operations have finished before the gradients are applied to the model.

The concrete implementation is _DistributedOptimizer, which merges gradients in one of two ways: via hooks, or by explicitly calling synchronize().
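From the user's side, the wrapping looks roughly like the sketch below (following the pattern in Horovod's documentation; Net and the hyper-parameters are placeholders):

import torch
import horovod.torch as hvd

hvd.init()
torch.cuda.set_device(hvd.local_rank())

model = Net().cuda()   # Net is a hypothetical model class
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# wrap the local optimizer; hooks for the asynchronous allreduce are registered on the gradients
optimizer = hvd.DistributedOptimizer(
    optimizer, named_parameters=model.named_parameters())

# make sure every worker starts from the same model and optimizer state
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)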

4.1 Synchronizing gradients via hooks

The hook approach uses PyTorch's hook mechanism, which is very similar to DDP: a hook is registered on the gradient-accumulation function of each parameter, and it is invoked as soon as that gradient has been computed. In this way the all-reduce is completed automatically during gradient computation, without waiting for an explicit step() call (unlike DP). A standalone sketch of this hook pattern is given after the list below. Specifically:

  1. The loss is computed on each GPU, backward propagation is run to compute the gradients, and an all-reduce on the gradients is performed while they are being computed.
  2. Update the model parameters. Because every GPU starts from exactly the same model and the gradients are all-reduced, every GPU ends up with an identical copy of the averaged gradients at the end of the backward pass, the weight update is the same on all GPUs, and no model synchronization is needed.
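The trick of hooking the gradient-accumulator node is plain PyTorch and can be shown on its own. The sketch below involves no Horovod; the hook body is just a print, only to show where such a hook fires:

import torch

p = torch.randn(2, 2, requires_grad=True)     # a leaf parameter
p_tmp = p.expand_as(p)                        # a non-leaf view of p
grad_acc = p_tmp.grad_fn.next_functions[0][0] # the AccumulateGrad node of p

def hook(*ignore):
    # Horovod launches an asynchronous allreduce on p.grad at this point
    print("gradient of p has just been accumulated")

grad_acc.register_hook(hook)

(p * 3).sum().backward()                      # triggers the hook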

Note: The code is divided into two main parts, dealing with groups-specific and common cases.

Groups is a related configuration for PyTorch to put gradient AllReduce operations together, but since the code is complex and irrelevant to the main logic of this article, we’ll skip this section and just look at the normal non-grouped case.

groups: The parameter to group the gradient allreduce ops. Accept values is a
        non-negative integer or a list of list of tf.Variable.
        If groups is a non-negative integer, it is the number of groups to assign
        gradient allreduce ops to for explicit grouping.
        If groups is a list of list of tf.Variable. Variables in the same
        inner list will be assigned to the same group, while parameter that does
        not appear in any list will form a group itself.
        Defaults as None, which is no explicit groups.

4.1.1 Registering hooks

The hook mechanism works in two stages; the first is registering the hooks.

    def _register_hooks(self):

        if self._groups is not None: # groups = gradient groups
            p_list = []
            # Get list of parameters with grads
            for param_group in self.param_groups:
                for p in param_group['params']:
                    if p.requires_grad:
                        p_list.append(p)

            # To ensure parameter order and group formation is consistent, broadcast p_list order
            # from rank 0 and use for every worker
            p_list_names = [self._parameter_names.get(p) for p in p_list]
            p_list_names = broadcast_object(p_list_names, root_rank=0)
            p_list = sorted(p_list, key=lambda p: p_list_names.index(self._parameter_names.get(p)))

            # Form groups
            if isinstance(self._groups, list):
                p_groups = []
                grouped_id = set()
                p_list_ids = [id(p) for p in p_list]
                for group in self._groups:
                    p_groups.append([p for p in group if id(p) in p_list_ids])
                    for p in p_groups[-1]:
                        grouped_id.add(id(p))
                for p in p_list:
                    if id(p) not in grouped_id:
                        p_groups.append([p])
            else:
                p_groups = split_list(p_list, self._groups)

            p_groups = [tuple(p) for p in p_groups]
            for group in p_groups:
                for p in group:
                    self._p_to_group[p] = group
                self._group_counts[group] = 0

        # register hooks
        for param_group in self.param_groups: # traverse parameter groups
            for p in param_group['params']: # traverse the parameters in the group
                if p.requires_grad: # if the parameter needs gradients
                    p.grad = p.data.new(p.size()).zero_()
                    self._requires_update.add(p)
                    p_tmp = p.expand_as(p)
                    grad_acc = p_tmp.grad_fn.next_functions[0][0] # get the gradient accumulation function
                    grad_acc.register_hook(self._make_hook(p)) # register the hook on the gradient function
                    self._grad_accs.append(grad_acc)

_make_hook builds the hook: the returned hook function is called during back-propagation and launches the all-reduce internally.

def _make_hook(self, p):
    def hook(*ignore):
        # some code omitted
        handle, ctx = None, None
        self._allreduce_delay[p] -= 1
        if self._allreduce_delay[p] == 0:
            if self._groups is not None: # the groups-related branch, which we skip
                group = self._p_to_group[p]
                self._group_counts[group] += 1
                if self._group_counts[group] == len(group):
                    handle, ctxs = self._grouped_allreduce_grad_async(group) # all-reduce is launched here
                    self._handles[group] = (handle, ctxs)
                    # Remove any None entries from previous no-op hook calls
                    for gp in group:
                        self._handles.pop(gp, None)
                    self._group_counts[group] = 0
                    return
            else:
                handle, ctx = self._allreduce_grad_async(p) # all-reduce is launched here
        self._handles[p] = (handle, ctx) # record the handle locally for later use

    return hook

4.1.2 Merging gradients

The second stage is the merge: the hook is invoked during back-propagation and carries out the all-reduce.

def _allreduce_grad_async(self, p):
    name = self._parameter_names.get(p)
    tensor = p.grad
    tensor_compressed, ctx = self._compression.compress(tensor)

    if self.op == Average:
        # Split average operation across pre/postscale factors
        # C++ backend will apply additional 1 / size() factor to postscale_factor for op == Average.
        prescale_factor = 1.0 / self.gradient_predivide_factor
        postscale_factor = self.gradient_predivide_factor
    else:
        prescale_factor = 1.0
        postscale_factor = 1.0

    # call allreduce_async_ to launch the asynchronous all-reduce
    handle = allreduce_async_(tensor_compressed, name=name, op=self.op,
                              prescale_factor=prescale_factor,
                              postscale_factor=postscale_factor)
    return handle, ctx

def _grouped_allreduce_grad_async(self, ps):
    name = self._parameter_names.get(ps[0])
    tensors_compressed, ctxs = zip(*[self._compression.compress(p.grad) for p in ps])

    handle = grouped_allreduce_async_(tensors_compressed, name=name, op=self.op)
    return handle, ctxs
4.1.2.1 MPI function

The specific MPI function is located in horovod/torch/mpi_ops.py

The key point here is that allreduce_async_ returns a handle, which can later be used to control the operation, for example with poll() or synchronize().
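Before reading the definition, a minimal usage sketch of this handle-based API may help (a toy CPU tensor is used here; in real training the tensor would be a gradient):

import torch
import horovod.torch as hvd

hvd.init()

t = torch.ones(4) * hvd.rank()  # a toy tensor, different on every rank
handle = hvd.allreduce_async_(t, name="demo", op=hvd.Average)

# ... do other work, optionally checking completion with poll() ...
while not hvd.poll(handle):
    pass

# block until the result is ready; t now holds the average across ranks
result = hvd.synchronize(handle)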

def allreduce_async_(tensor, average=None, name=None, op=None,
                     prescale_factor=1.0, postscale_factor=1.0):
    """
    A function that performs asynchronous in-place averaging or summation of the input
    tensor over all the Horovod processes.

    The reduction operation is keyed by the name. If name is not provided, an incremented
    auto-generated name is used. The tensor type and shape must be the same on all Horovod
    processes for a given name. The reduction will not start until all processes are ready
    to send and receive the tensor.

    Arguments:
        tensor: A tensor to reduce.
        average: .. warning:: .. deprecated:: 0.19.0
                 Use `op` instead. Will be removed in v0.21.0.
        name: A name of the reduction operation.
        op: The reduction operation to combine tensors across different ranks.
            Defaults to Average if None is given.
        prescale_factor: Multiplicative factor to scale tensor before allreduce.
        postscale_factor: Multiplicative factor to scale tensor after allreduce.

    Returns:
        A handle to the allreduce operation that can be used with `poll()` or `synchronize()`.
    """
    op = handle_average_backwards_compatibility(op, average)
    return _allreduce_async(tensor, tensor, name, op, prescale_factor, postscale_factor)

_allreduce_async is also located in horovod/torch/mpi_ops.py; it looks up the corresponding function in the MPI library and calls it.

def _allreduce_async(tensor, output, name, op, prescale_factor, postscale_factor):
    # Set the divisor for reduced gradients to average when necessary
    if op == Average:
        if rocm_built():
            # For ROCm, perform averaging at framework level
            divisor = size()
            op = Sum
        else:
            divisor = 1

    elif op == Adasum:
        if tensor.device.type != 'cpu' and gpu_available('torch'):
            if nccl_built():
                if rocm_built():
                    # For ROCm, perform averaging at framework level
                    divisor = local_size()
                else:
                    divisor = 1
            else:
                divisor = 1
        else:
            divisor = 1
    else:
        divisor = 1

    function = _check_function(_allreduce_function_factory, tensor)
    try:
        handle = getattr(mpi_lib, function)(tensor, output, divisor,
                                            name.encode() if name is not None else _NULL, op,
                                            prescale_factor, postscale_factor)
    except RuntimeError as e:
        raise HorovodInternalError(e)
    _handle_map[handle] = (tensor, output)
    return handle
4.1.2.2 schematic diagram

This diagram is similar to DDP, so I’ll skip it.

4.2 Synchronizing gradients via step

step() is the other place where the all-reduce operations can be completed.

The step function is defined as follows: if synchronization is required, it first calls self.synchronize() (warning the user if synchronize() has already been called without the skip_synchronize() context), and then calls the base class's step() to update the parameters.

    def step(self, closure=None):
        if self._should_synchronize:
            if self._synchronized:
                warnings.warn("optimizer.step() called without "
                              "optimizer.skip_synchronize() context after "
                              "optimizer.synchronize(). This can cause training "
                              "slowdown. You may want to consider using "
                              "optimizer.skip_synchronize() context if you use "
                              "optimizer.synchronize() in your code.")
            self.synchronize()
        self._synchronized = False
        return super(self.__class__, self).step(closure)

4.2.1 synchronize

So let’s take a closer look at Synchronize.

Note that synchronize() is used to force an Allreduce operation to complete, which is particularly useful for gradient clipping or other in-place gradient modification operations that need to be performed before step().

synchronize() is used together with optimizer.skip_synchronize().

DistributedOptimizer exposes the ``synchronize()`` method, which forces allreduce operations
to finish before continuing the execution. It's useful in conjunction with gradient
clipping, or other operations that modify gradients in place before ``step()`` is executed.
Make sure to use ``optimizer.skip_synchronize()`` if you're calling ``synchronize()``
in your code.

4.2.2 Gradient clipping

First, what is gradient explosion? Gradient explosion means that during training the gradients become so large that the model becomes unstable and easily jumps right past the optimum. Gradient clipping is a technique for dealing with it: if the gradients get too large, they are scaled back down so that the update does not overshoot.
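For reference, here is a conceptual sketch of what torch.nn.utils.clip_grad_norm_ (used in the example below) does, assuming a list of parameters and a hypothetical max_norm:

import torch

def clip_grad_norm(parameters, max_norm):
    # compute the total L2 norm over all gradients; if it exceeds max_norm,
    # scale every gradient down by the same factor
    grads = [p.grad for p in parameters if p.grad is not None]
    total_norm = torch.norm(torch.stack([torch.norm(g) for g in grads]))
    clip_coef = max_norm / (total_norm + 1e-6)
    if clip_coef < 1:
        for g in grads:
            g.mul_(clip_coef)
    return total_norm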

To work together with gradient clipping, synchronize() has to be called before step() to force the all-reduce to complete. The example from the source code is as follows:

    output = model(data)
    loss = F.nll_loss(output, target)
    loss.backward()
    optimizer.synchronize()
    torch.nn.utils.clip_grad_norm_(model.parameters(), args.clip)
    with optimizer.skip_synchronize():
        optimizer.step()

4.2.3 Implementation

Next, let's look at the implementation of synchronize(). Inside it, outputs = synchronize(handle) calls horovod.torch.mpi_ops.synchronize.

from horovod.torch.mpi_ops import synchronize

def synchronize(self):
    completed = set()
    for x in self._handles.keys():
        completed.update(x) if isinstance(x, tuple) else completed.add(x)
    missing_p = self._requires_update - completed # find gradients that have not been handled yet

    for p in missing_p:
        handle, ctx = self._allreduce_grad_async(p) # launch an explicit all-reduce for them
        self._handles[p] = (handle, ctx) # record the handle of this operation

    for p, (handle, ctx) in self._handles.items():
        if handle is None: # if no all-reduce has been recorded for this parameter
            handle, ctx = self._allreduce_grad_async(p) # perform the all-reduce
            self._handles[p] = (handle, ctx)

    for p, (handle, ctx) in self._handles.items(): # finally, synchronize everything
        if isinstance(p, tuple): # this was a grouped result, need to unpack
            outputs = synchronize(handle) # call the MPI synchronize
            for gp, output, gctx in zip(p, outputs, ctx):
                self._allreduce_delay[gp] = self.backward_passes_per_step
                gp.grad.set_(self._compression.decompress(output, gctx))
        else:
            output = synchronize(handle) # call the MPI synchronize
            self._allreduce_delay[p] = self.backward_passes_per_step
            p.grad.set_(self._compression.decompress(output, ctx))

    self._handles.clear()

    self._synchronized = True

4.2.4 MPI Synchronization Operations

The code is located in horovod/torch/mpi_ops.py, which calls the MPI library function directly.

def synchronize(handle):
    """
    Synchronizes an asynchronous allreduce, allgather or broadcast operation until
    it's completed. Returns the result of the operation.

    Arguments:
        handle: A handle returned by an allreduce, allgather or broadcast asynchronous operation.

    Returns:
        An output tensor of the operation.
    """
    if handle not in _handle_map:
        return

    try:
        mpi_lib.horovod_torch_wait_and_clear(handle)
        output = _handle_map.pop(handle)[-1]
        return output
    except RuntimeError as e:
        raise HorovodInternalError(e)

4.2.5 Diagram

The current logic is as follows:

+---------------------------------------------------------------------------------+
| Process 1 on GPU 1                                                              |
|                                                 +----------------------------+  |
|                                                 | Optimizer                  |  |
|                                                 |                            |  |
| Forward +---->  Loss +----->  Backward  +---->  |     ALL-REDUCE +----> step |  |
|                                                 |                            |  |
|                                                 |            ^               |  |
|                                                 |            |               |  |
|                                                 +----------------------------+  |
|                                                              |                  |
+---------------------------------------------------------------------------------+
                                                               |
                                                               |
                                                               |
                                                               |
                                                               |
                                                           SYNC|GRADS
                                                               |
                                                               |
                                                               |
                                                               |
+----------------------------------------------------------------------------------+
| Process 2 on GPU 2                                           |                   |
|                                                              |                   |
|                                                 +-----------------------------+  |
|                                                 | Optimizer  |                |  |
|                                                 |            |                |  |
| Forward +---->  Loss +----->   Backward  +----> |            v                |  |
|                                                 |     ALL-REDUCE +----> step  |  |
|                                                 |                             |  |
|                                                 +-----------------------------+  |
|                                                                                  |
+----------------------------------------------------------------------------------+

Now that the data parallel optimizer has been analyzed, stay tuned for our next article on the PyTorch Distributed optimizer.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts
