0x00 Overview

This series introduces the distributed optimizer in three progressively deeper articles: the cornerstone, the data-parallel optimizers in DP/DDP/Horovod, and the PyTorch distributed optimizer. This article covers the PyTorch distributed optimizer and the optimizer in PipeDream, focusing on model parallelism (pipeline parallelism).

Other articles in the PyTorch distributed series are as follows:

PyTorch distributed (1) ----- History and overview

PyTorch: how to use GPUs

PyTorch distributed (2) ----- DataParallel (part 1)

PyTorch distributed (3) ----- DataParallel (part 2)

PyTorch distributed (4) ----- Distributed application concepts

PyTorch distributed (5) ----- DistributedDataParallel: what to use

PyTorch distributed (6) ----- DistributedDataParallel

PyTorch distributed (7) ----- DistributedDataParallel: process groups

PyTorch distributed (8) ----- DistributedDataParallel

PyTorch distributed (9) ----- DistributedDataParallel: initialization

PyTorch distributed (10) ----- DistributedDataParallel: Reducer static architecture

PyTorch distributed (11) ----- DistributedDataParallel: building the Reducer and Join operations

PyTorch distributed (12) ----- DistributedDataParallel: forward propagation

PyTorch distributed (13) ----- DistributedDataParallel: backward propagation

PyTorch Distributed Autograd (1) ---- Design

PyTorch Distributed Autograd (2) ---- RPC foundation

PyTorch Distributed Autograd (3) ----

PyTorch Distributed Autograd (4) ----

PyTorch Distributed Autograd (5) ----

PyTorch Distributed Autograd (6) ----

PyTorch Distributed optimizer (1) ---- Cornerstone

PyTorch Distributed optimizer (2) ---- Data-parallel optimizer

For clarity of presentation, the code in this article is simplified where appropriate.

0x01 Review of previous articles

Whether it was DP, DDP, or Horovod, everything we covered so far essentially dealt with data parallelism. For example, DDP copies the same model to every GPU, and each GPU consumes a different partition of the input data. While data parallelism can significantly speed up training, it does not help when the model is too large to fit on a single GPU. That is where model parallelism comes in.

Accordingly, the optimizer has to be adapted to model parallelism. To make the analysis easier, this article first introduces single-machine model parallelism and then the PyTorch distributed optimizer.

0x02 Single-machine Model Parallelism

The following text is from pytorch.org/tutorials/i…

Model parallelism is widely used in distributed training. In contrast to DataParallel, model parallelism splits a single model across different GPUs, instead of replicating the entire model on each GPU. (Concretely, if a model M contains 10 layers, each GPU owns all 10 layers when using DataParallel, whereas with model parallelism on two GPUs each GPU hosts 5 layers.)

The high-level idea of model parallelism is to place the different subnetworks of the model on different devices and implement the forward method accordingly to move the intermediate output across devices. Because only part of the model is running on a single device, a group of devices can collectively serve a larger model.

In this article, we will not try to build a huge model and squeeze it into a limited number of GPUs. Instead, the focus is on illustrating the idea of model parallelism; readers can apply these ideas to real applications.

2.1 Basic Usage

Let's start with a toy model containing two linear layers. To run this model on two GPUs, simply place each linear layer on a different GPU and move the inputs and intermediate outputs to match the layer devices.

import torch
import torch.nn as nn
import torch.optim as optim


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = torch.nn.Linear(10, 10).to('cuda:0')
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to('cuda:1')

    def forward(self, x):
        x = self.relu(self.net1(x.to('cuda:0')))
        return self.net2(x.to('cuda:1'))

The ToyModel code looks very similar to how it would be implemented on a single GPU. Only two parts are modified: the network construction part and the forward part.

  • The __init__ method uses two to(device) calls to place each linear layer on the appropriate device, splitting the network into two parts that can then run on different GPUs.
  • The forward method uses two to(device) calls to place the tensors on the appropriate device, so that the output of one layer can be copied to the GPU of the next layer via tensor.to semantics.

This is the only part of the model that needs to be changed. backward() and torch.optim handle the rest: they automatically work with gradients as if the model were on a single GPU. When calling the loss function, you only need to make sure the labels are on the same device as the network's output.

model = ToyModel()
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)

optimizer.zero_grad()
outputs = model(torch.randn(20, 10))
labels = torch.randn(20, 5).to('cuda:1')
loss_fn(outputs, labels).backward()
optimizer.step()

The most important line here is labels = torch.randn(20, 5).to('cuda:1'), which ensures that the labels are on cuda:1.

Recall the forward code above: self.net2(x.to('cuda:1')). Together, these two lines ensure that the labels and the output are on the same device, cuda:1.

After initialization:

+--------------------+                       +------------------------+
| cuda:0             |                       | cuda:1                 |
|                    |                       |                        |
|                    |                       |                        |
|                    |                       |                        |
|       net1(x)      |                       |        net2(x)         |
|                    |                       |                        |
|                    |                       |                        |
|                    |                       |                        |
+--------------------+                       +------------------------+


After the forward operation and setting the label, the output and label are now on GPU 1:

               +--------------------+                       +------------------------+
               | cuda:0             |                       | cuda:1                 |
               |                    |                       |                        |
               |                    |                       |                        |
               |                    |                       |                        |
x.to('cuda:0')-------> net1(x)  +-------> x.to('cuda:1') +-------->  net2(x)         |
               |                    |                       |                        |
               |                    |                       |   labels.to('cuda:1')  |
               |                    |                       |                        |
               +--------------------+                       +------------------------+

2.2 Applying model parallelism to existing modules

You can also convert an existing single-GPU module to run on multiple GPUs by changing only a few lines of code. The following code shows how to split torchvision.models.resnet50() across the two GPUs above. The basic idea is to inherit from the existing ResNet module and split its layers across the two GPUs during construction. The forward method is then overridden to stitch the two subnetworks together by moving the intermediate outputs accordingly.

from torchvision.models.resnet import ResNet, Bottleneck

num_classes = 1000

class ModelParallelResNet50(ResNet):
    def __init__(self, *args, **kwargs):
        super(ModelParallelResNet50, self).__init__(
            Bottleneck, [3, 4, 6, 3], num_classes=num_classes, *args, **kwargs)

        self.seq1 = nn.Sequential(
            self.conv1,
            self.bn1,
            self.relu,
            self.maxpool,

            self.layer1,
            self.layer2
        ).to('cuda:0')

        self.seq2 = nn.Sequential(
            self.layer3,
            self.layer4,
            self.avgpool,
        ).to('cuda:1')

        self.fc.to('cuda:1')

    def forward(self, x):
        x = self.seq2(self.seq1(x).to('cuda:1'))
        return self.fc(x.view(x.size(0), -1))

The above implementation solves the problem of a model being too large to fit on a single GPU. However, you may have noticed that even when the model does fit, it may run slower than on a single GPU. This is because at any point in time only one of the two GPUs is working while the other sits idle. The need to copy intermediate outputs from cuda:0 to cuda:1 between layer2 and layer3 degrades performance further.

Let's run an experiment to quantify the execution time. In this experiment, we train ModelParallelResNet50 and the existing torchvision.models.resnet50() on random inputs and labels. After this training the models will not produce any useful predictions, but we get a reasonable picture of the execution time.

import torchvision.models as models

num_batches = 3
batch_size = 120
image_w = 128
image_h = 128


def train(model):
    model.train(True)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001)

    one_hot_indices = torch.LongTensor(batch_size) \
                           .random_(0, num_classes) \
                           .view(batch_size, 1)

    for _ in range(num_batches):
        # generate random inputs and labels
        inputs = torch.randn(batch_size, 3, image_w, image_h)
        labels = torch.zeros(batch_size, num_classes) \
                      .scatter_(1, one_hot_indices, 1)

        # run forward pass
        optimizer.zero_grad()
        outputs = model(inputs.to('cuda:0'))

        # run backward pass
        labels = labels.to(outputs.device)
        loss_fn(outputs, labels).backward()
        optimizer.step()

The train(model) method above uses nn.MSELoss as the loss function and optim.SGD as the optimizer. It mimics training on 128 x 128 images, organized into 3 batches of 120 images each. We then run train(model) 10 times using timeit and plot the execution times with their standard deviations.

import matplotlib.pyplot as plt
plt.switch_backend('Agg')
import numpy as np
import timeit

num_repeat = 10

stmt = "train(model)"

setup = "model = ModelParallelResNet50()"
mp_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
mp_mean, mp_std = np.mean(mp_run_times), np.std(mp_run_times)

setup = "import torchvision.models as models;" + \
        "model = models.resnet50(num_classes=num_classes).to('cuda:0')"
rn_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
rn_mean, rn_std = np.mean(rn_run_times), np.std(rn_run_times)


def plot(means, stds, labels, fig_name):
    fig, ax = plt.subplots()
    ax.bar(np.arange(len(means)), means, yerr=stds,
           align='center', alpha=0.5, ecolor='red', capsize=10, width=0.6)
    ax.set_ylabel('ResNet50 Execution Time (Second)')
    ax.set_xticks(np.arange(len(means)))
    ax.set_xticklabels(labels)
    ax.yaxis.grid(True)
    plt.tight_layout()
    plt.savefig(fig_name)
    plt.close(fig)


plot([mp_mean, rn_mean],
     [mp_std, rn_std],
     ['Model Parallel', 'Single GPU'], 'mp_vs_rn.png')

The results show that model parallelism takes 4.02/3.75 - 1 = 7% longer than the single-GPU implementation. So we can conclude that copying tensors back and forth between GPUs costs roughly 7% overhead.

2.3 Problems and Solutions

2.3.1 Current status

To sum up where we stand:

  • There are multiple GPUs, but at any point during execution only one GPU is computing while the others sit idle.
  • Intermediate results must be copied between GPUs, which further degrades performance.

Therefore, we need to deal with these two problems in a targeted manner:

  • Get all GPUs working.
  • Reduce the time spent copying data between devices.

2.3.2 Solution

The solutions to the two problems are as follows:

One option for getting all GPUs working is to add a pipelining mechanism: each batch is further divided into splits, so that when one split reaches the second subnetwork, the next split can already be fed into the first. This way, two consecutive splits can run concurrently on the two GPUs.

Why is this possible? Because of CUDA's asynchronous, parallel execution model (see the short sketch after the list below).

  • Many CUDA operations are asynchronous, such as kernel launches, device-to-device copies, and small host-to-device copies.
  • Almost all CUDA devices with compute capability 1.1 or higher support concurrent copy and kernel execution, meaning that data copies and numerical computation can proceed in parallel.
  • Some devices with compute capability 2.x can execute multiple kernels concurrently.
  • On some devices with compute capability 2.x, copies in both directions (GPU to CPU and CPU to GPU) can proceed in parallel.
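As a minimal illustration of this asynchrony (an addition for this article, not from the PyTorch tutorial; it assumes a machine with at least two GPUs), kernels launched on different devices are queued on each device's own stream and return control to the host immediately, so they can execute concurrently:

import torch

# Kernel launches are asynchronous with respect to the host, and each device
# has its own default stream, so work queued on cuda:0 and cuda:1 can run
# at the same time.
x0 = torch.randn(4096, 4096, device='cuda:0')
x1 = torch.randn(4096, 4096, device='cuda:1')

y0 = x0 @ x0   # queued on cuda:0, returns immediately
y1 = x1 @ x1   # queued on cuda:1, overlaps with the matmul on cuda:0

torch.cuda.synchronize('cuda:0')
torch.cuda.synchronize('cuda:1')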

How can the copy time be reduced? Bandwidth can be increased and latency reduced with a combination of hardware and software, for example:

  • Hardware: PCIe, NVLink, and NVSwitch within a machine; RDMA networks (InfiniBand or RoCE) between machines.
  • Software stack: the GPUDirect family of technologies: P2P (peer-to-peer), RDMA, Async, Storage, and so on.

For GPU collective communication, PyTorch uses the NCCL library (which is built on CUDA).
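As a small illustrative sketch (not from the original text), this is how a training process would select the NCCL backend; it assumes the usual environment variables (MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE) are set by the launcher:

import torch.distributed as dist

# NCCL implements the GPU collectives and takes advantage of NVLink / RDMA
# interconnects when the hardware provides them.
dist.init_process_group(backend="nccl")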

2.4 Speeding up by pipelining inputs

In the following experiment, we further divide each batch of 120 images into splits of 20 images. Because PyTorch launches CUDA operations asynchronously, the implementation does not need to spawn multiple threads to achieve concurrency.

class PipelineParallelResNet50(ModelParallelResNet50):
    def __init__(self, split_size=20, *args, **kwargs):
        super(PipelineParallelResNet50, self).__init__(*args, **kwargs)
        self.split_size = split_size

    def forward(self, x):
        splits = iter(x.split(self.split_size, dim=0))
        s_next = next(splits)
        s_prev = self.seq1(s_next).to('cuda:1')
        ret = []

        for s_next in splits:
            # A. s_prev runs on cuda:1
            s_prev = self.seq2(s_prev)
            ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

            # B. s_next runs on cuda:0, which can run concurrently with A
            s_prev = self.seq1(s_next).to('cuda:1')

        s_prev = self.seq2(s_prev)
        ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

        return torch.cat(ret)


setup = "model = PipelineParallelResNet50()"
pp_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
pp_mean, pp_std = np.mean(pp_run_times), np.std(pp_run_times)

plot([mp_mean, rn_mean, pp_mean],
     [mp_std, rn_std, pp_std],
     ['Model Parallel', 'Single GPU', 'Pipelining Model Parallel'], 'mp_vs_rn_vs_pp.png')

Note that device-to-device tensor copies synchronize with the current streams on both the source and the destination device. If you create multiple streams, you must make sure the copy operations are properly synchronized: writing to the source tensor, or reading from or writing to the destination tensor, before the copy finishes can result in undefined behavior. The implementation above uses only the default streams on the source and destination devices, so no extra synchronization is needed.
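To make this caveat concrete, here is a minimal sketch (added for illustration, assuming two GPUs; it is not part of the tutorial's implementation) of synchronizing a copy issued on a non-default stream before the destination tensor is consumed:

import torch

copy_stream = torch.cuda.Stream(device='cuda:1')
x = torch.randn(1024, 1024, device='cuda:0')

with torch.cuda.stream(copy_stream):
    # The copy is enqueued on copy_stream on the cuda:1 side.
    y = x.to('cuda:1', non_blocking=True)

# Make cuda:1's default stream wait for the copy before y is used there.
torch.cuda.default_stream(torch.device('cuda:1')).wait_stream(copy_stream)
z = y * 2  # safe: runs only after the copy has completed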

The experimental results show that pipelining the inputs to the model-parallel ResNet50 speeds up training by roughly 3.75/2.51 - 1 = 49%, still far from the ideal 100% speedup. Since we introduced a new parameter, split_size, in the pipeline-parallel implementation, it is not obvious how it affects the overall training time. Intuitively, a small split_size leads to many tiny CUDA kernel launches, while a large split_size leads to relatively long idle times during the first and last splits. Neither extreme is optimal. There may be an optimal split_size for this particular experiment; let's try to find it by running the experiment with several different split_size values.

means = []
stds = []
split_sizes = [1, 3, 5, 8, 10, 12, 20, 40, 60]

for split_size in split_sizes:
    setup = "model = PipelineParallelResNet50(split_size=%d)" % split_size
    pp_run_times = timeit.repeat(
        stmt, setup, number=1, repeat=num_repeat, globals=globals())
    means.append(np.mean(pp_run_times))
    stds.append(np.std(pp_run_times))

fig, ax = plt.subplots()
ax.plot(split_sizes, means)
ax.errorbar(split_sizes, means, yerr=stds, ecolor='red', fmt='ro')
ax.set_ylabel('ResNet50 Execution Time (Second)')
ax.set_xlabel('Pipeline Split Size')
ax.set_xticks(split_sizes)
ax.yaxis.grid(True)
plt.tight_layout()
plt.savefig("split_size_tradeoff.png")
plt.close(fig)

The results show that setting split_size to 12 achieves the fastest training, yielding a 3.75/2.43 - 1 = 54% speedup. There is still room to accelerate training further. For example, all operations on cuda:0 are currently placed on its default stream, which means the computation for the next split cannot overlap with the copy operation of the previous split. However, since the previous and next splits are different tensors, there is no problem overlapping the computation on one with the copy of the other. This requires multiple streams on both GPUs, and different subnetwork structures require different stream management strategies. Since no general multi-stream solution works for all model-parallel use cases, we will not discuss it in this tutorial.

This article presents several performance measurements. You may see different numbers when running the same code on your own machine, depending on the underlying hardware and software. The right way to get the best performance in your environment is to generate the curve first, determine the optimal split size from it, and then apply that split size to the pipelined input.

0x03 Distributed Problems and Solutions

Now that we’ve looked at model parallelism on a single machine, we’ll look at distributed model parallelism training across multiple servers.

3.1 Our own approach

Let’s imagine what we would do if we implemented the distributed optimizer ourselves.

Suppose the model is divided into three parts, with three hosts to train them.

+----------------------------------------------------------------+
| Model                                                          |
|                                                                |
| +-----------------+  +------------------+  +-----------------+ |
| | Sub+model 1     |  | Sub+model 2      |  | Sub+model 3     | |
| |                 |  |                  |  |                 | |
| |                 |  |                  |  |                 | |
| +-----------------+  +------------------+  +-----------------+ |
|                                                                |
+----------------------------------------------------------------+

+-------------------+  +------------------+  +-----------------+
| Host 1            |  | Host 2           |  | Host 3          |
|                   |  |                  |  |                 |
|                   |  |                  |  |                 |
|                   |  |                  |  |                 |
|                   |  |                  |  |                 |
|                   |  |                  |  |                 |
+-------------------+  +------------------+  +-----------------+

We would explicitly deploy the three parts on the three hosts, each host running its own training code and its own local optimizer that optimizes the parameters of its local submodel.

+---------------------+         +---------------------+         +---------------------+
| Host 1              |         | Host 2              |         | Host 3              |
|                     |         |                     |         |                     |
| +-----------------+ |         | +-----------------+ |         | +-----------------+ |
| | Sub model 1     | |forward  | | Sub model 2     | |forward  | | Sub model 3     | |
| |                 | +-------> | |                 | +-------> | |                 | |
| |_parameters <--+ | |         | |_parameters <--+ | |         | |_parameters <--+ | |
| |               | | | <-------+ |               | | | <-------+ |               | | |
| |               | | | backward| |               | | | backward| |               | | |
| +-----------------+ |         | +-----------------+ |         | +-----------------+ |
|                 |   |         |                 |   |         |                 |   |
|                 |   |         |                 |   |         |                 |   |
| ------------------+ |         | +-----------------+ |         | +-----------------+ |
| |Optimizer 1    | | |         | | Optimizer 2   | | |         | | Optimizer 3   | | |
| |               | | |         | |               | | |         | |               | | |
| |    step() +---+ | |         | |    step() +---+ | |         | |     step()+---+ | |
| |                 | |         | |                 | |         | |                 | |
| +-----------------+ |         | +-----------------+ |         | +-----------------+ |
+---------------------+         +---------------------+         +---------------------+

But there are several problems we need to solve:

  • How do we partition the model across different machines? How do we split the code across machines?
  • How are forward and backward propagation linked together across machines?
  • Should the machines run synchronously or asynchronously?
  • If synchronously, how does the whole system stay in lockstep?
  • How are the optimizers tied together? Or do the optimizers work independently of each other?
  • How do we minimize the code changes required of users?
  • How can developers feel as if they were writing single-machine code?

If you think about it, this is complicated. If we implemented it ourselves on top of PyTorch, we might well end up with something like PipeDream. So let's see what PyTorch does.

3.2 PyTorch’s idea

PyTorch uses RPC to solve these problems.

3.2.1 The Four Heavenly Kings

As mentioned earlier, PyTorch's distributed framework rests on four pillars, its "Four Heavenly Kings":

  • **Remote procedure call (RPC)** runs a function on a specified worker with the given arguments, and either gets the return value back or creates a reference to the return value. There are three main APIs: rpc_sync() (synchronous), rpc_async() (asynchronous), and remote() (asynchronous, returning a reference to the remote return value). A short sketch of these three call styles follows this list.
    • If the user code cannot continue without the return value, use the synchronous API.
    • Otherwise, use the asynchronous API to get a Future and wait on it only when the caller actually needs the value.
    • The remote() API is suited to cases where something needs to be created remotely but never fetched to the caller.
  • A **remote reference (RRef)** is a distributed shared pointer to a local or remote object, i.e. a variable reference that works locally or across machines.
  • **Distributed autograd** stitches together the local autograd engines of all workers involved in the forward pass and automatically contacts them during the backward pass to compute gradients. This is especially useful when the forward pass spans multiple machines, for example in distributed model-parallel training or parameter-server training. With this feature, user code no longer needs to worry about how to send gradients across RPC boundaries or in what order the local autograd engines should be started, which can become very complicated when there are nested and interdependent RPC calls in the forward pass.
  • The **distributed optimizer** takes an Optimizer() constructor (e.g., SGD(), Adagrad(), etc.) and a list of parameter RRefs. It creates an Optimizer() instance on each distinct RRef owner and then updates the parameters by running step(). When the user performs distributed forward and backward propagation, parameters and gradients are scattered across multiple workers, so each of the involved workers needs its own optimizer. DistributedOptimizer combines all these local optimizers into one and provides a concise constructor and step() API.
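Below is a minimal sketch of the three RPC call styles mentioned above (added for illustration; it assumes rpc.init_rpc() has already been called and that a peer named "worker1" exists):

import torch
import torch.distributed.rpc as rpc

# Synchronous: blocks until the remote call returns its value.
ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 1))

# Asynchronous: returns a Future; wait() only when the value is needed.
fut = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 1))
val = fut.wait()

# remote(): returns an RRef to a value living on worker1; fetch a local copy
# with to_here() only if the caller ever needs it.
rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
local_copy = rref.to_here()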

3.2.2 Logical Relationship

Using the official diagram, we can see the internal architecture and logical relationships of the PyTorch distributed package. The distributed optimizer builds on the other three.

We’ll explain how to use it later with code.

0x04 PyTorch Distributed optimizer

To keep the analysis clear, the TorchScript (script) related parts are mostly omitted below.

4.1 Example

DistributedOptimizer can be used as follows:

  1. Get a list of remote parameters (RRefs) to optimize. These can also be local parameters wrapped inside local RRefs.
  2. Pass an Optimizer class to be used as the local optimizer on all of the RRef owners.
  3. The distributed optimizer creates an instance of that local optimizer on each worker node and holds an RRef to each of these local optimizers.
  4. When torch.distributed.optim.DistributedOptimizer.step() is invoked, the distributed optimizer uses RPC to run all the local optimizers remotely on the appropriate workers. step() must be given a distributed autograd context_id as input; the local optimizers look up the gradients saved in that context.
  5. If multiple concurrent distributed optimizers update the same parameters on a worker at the same time, those updates are serialized through a lock.

This may seem a little abstract, so let's walk through it step by step.

4.2 Simple end-to-end example

Putting it all together, the following is a simple end-to-end example that uses distributed autograd and the distributed optimizer. If you put the code in a file called "dist_autograd_simple.py", you can run it with: MASTER_ADDR="localhost" MASTER_PORT=29500 python dist_autograd_simple.py

import multiprocessing as mp
import torch
import torch.distributed.autograd as dist_autograd
from torch.distributed import rpc
from torch import optim
from torch.distributed.optim import DistributedOptimizer

def random_tensor():
    return torch.rand((3, 3), requires_grad=True)

def _run_process(rank, dst_rank, world_size):
    name = "worker{}".format(rank)
    dst_name = "worker{}".format(dst_rank)

    # Initialize RPC.
    rpc.init_rpc(
        name=name,
        rank=rank,
        world_size=world_size
    )

    # Use a distributed autograd context; the local optimizers will read the
    # gradients saved in this context.
    with dist_autograd.context() as context_id:
        # Forward pass (create references on remote nodes).
        rref1 = rpc.remote(dst_name, random_tensor)  # create a random_tensor on the remote worker
        rref2 = rpc.remote(dst_name, random_tensor)  # create a random_tensor on the remote worker
        loss = rref1.to_here() + rref2.to_here()     # rref1/rref2 are the remote parameters to optimize

        # Backward pass (run distributed autograd).
        dist_autograd.backward([loss.sum()])

        # Build DistributedOptimizer: it creates a local optimizer instance on
        # each worker node and holds RRefs to those local optimizers.
        dist_optim = DistributedOptimizer(
            optim.SGD,
            [rref1, rref2],
            lr=0.05,
        )

        # Run the distributed optimizer step.
        dist_optim.step()

def run_process(rank, dst_rank, world_size):
    _run_process(rank, dst_rank, world_size)
    rpc.shutdown()

processes = []

# Run world_size workers.
world_size = 2
for i in range(world_size):
    p = mp.Process(target=run_process, args=(i, (i + 1) % 2, world_size))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

4.3 Definition

DistributedOptimizer takes remote references (RRefs) to parameters scattered across workers and then runs the optimizer locally for each of those parameters.

For a single worker, if it receives concurrent calls to torch.distributed.optim.DistributedOptimizer.step, whether from the same client or from different clients, these calls are serialized on that worker, because each worker's optimizer can only process one set of gradients at a time.

The DistributedOptimizer class definition does not show much by itself; because of the nature of Python, we cannot see all class member variables in one place. But one member, functional_optim_map, deserves attention: each built-in optimizer is mapped to a functional counterpart, for example optim.Adagrad maps to _FunctionalAdagrad. Let's pick one of these functional optimizers and take a look.

class DistributedOptimizer:
    """
    DistributedOptimizer takes remote references to parameters scattered
    across workers and applies the given optimizer locally for each parameter.

    This class uses :meth:`~torch.distributed.autograd.get_gradients` in order
    to retrieve the gradients for specific parameters.

    Concurrent calls to
    :meth:`~torch.distributed.optim.DistributedOptimizer.step`,
    either from the same or different clients, will
    be serialized on each worker -- as each worker's optimizer can only work
    on one set of gradients at a time. However, there is no guarantee that
    the full forward-backward-optimizer sequence will execute for one client
    at a time. This means that the gradients being applied may not correspond
    to the latest forward pass executed on a given worker. Also, there is no
    guaranteed ordering across workers.

    `DistributedOptimizer` creates the local optimizer with TorchScript enabled
    by default, so that optimizer updates are not blocked by the Python Global
    Interpreter Lock (GIL) in the case of multithreaded training (e.g. Distributed
    Model Parallel). This feature is currently enabled for most optimizers. You
    can also follow `the recipe`__ in PyTorch tutorials to enable TorchScript support
    for your own custom optimizers.

    Args:
        optimizer_class (optim.Optimizer): the class of optimizer to
            instantiate on each worker.
        params_rref (list[RRef]): list of RRefs to local or remote parameters
            to optimize.
        args: arguments to pass to the optimizer constructor on each worker.
        kwargs: arguments to pass to the optimizer constructor on each worker.
        
    """
    
    # dict to map a user passed in optimizer_class to a functional
    # optimizer class if we have already defined inside the
    # distributed.optim package, this is so that we hide the
    # functional optimizer to user and still provide the same API.
    functional_optim_map = {
        optim.Adagrad: _FunctionalAdagrad,
        optim.Adam: _FunctionalAdam,
        optim.AdamW: _FunctionalAdamW,
        optim.SGD: _FunctionalSGD,
        optim.Adadelta: _FunctionalAdadelta,
        optim.RMSprop: _FunctionalRMSprop,
        optim.Rprop: _FunctionalRprop,
        optim.Adamax: _FunctionalAdamax,
    }        

4.3.1 _FunctionalSGD

optim.SGD maps to _FunctionalSGD, whose code is in torch/distributed/optim/functional_sgd.py. The goal is to define a TorchScript-compatible functional SGD optimizer that PyTorch can use in a functional way: instead of reading param.grad when updating parameters, the distributed optimizer explicitly passes the gradients to the step function. Note that this optimizer is meant to be used only internally by the distributed optimizer and is not exposed to users.

from typing import Dict, List, Optional

import torch
from torch import Tensor
import torch.optim._functional as F

# Define a TorchScript compatible Functional SGD Optimizer
# where we use these optimizer in a functional way.
# Instead of using the `param.grad` when updating parameters,
# we explicitly allow the distributed optimizer pass gradients to
# the `step` function. In this way, we could separate the gradients
# and parameters and allow multithreaded trainer to update the
# parameters without data traces on accumulating to the same .grad.
# NOTE: This should be only used by distributed optimizer internals
# and not meant to expose to the user.
@torch.jit.script
class _FunctionalSGD(object):
    def __init__(
        self,
        params: List[Tensor],
        lr: float = 1e-2,
        momentum: float = 0.0,
        dampening: float = 0.0,
        weight_decay: float = 0.0,
        nesterov: bool = False
    ):
        self.defaults = {
            "lr": lr,
            "momentum": momentum,
            "dampening": dampening,
            "weight_decay": weight_decay,
        }
        self.nesterov = nesterov
        self.state = torch.jit.annotate(Dict[torch.Tensor, Dict[str, torch.Tensor]], {})

        # NOTE: we only have one param_group and don't allow user to add additional
        # param group as it's not a common use case.
        self.param_group = {"params": params}

    def step(self, gradients: List[Optional[Tensor]]):
        params = self.param_group['params']
        grads = []
        momentum_buffer_list: List[Optional[Tensor]] = []
        lr = self.defaults['lr']
        weight_decay = self.defaults['weight_decay']
        momentum = self.defaults['momentum']
        dampening = self.defaults['dampening']

        for param, gradient in zip(params, gradients):
            if gradient is not None:
                grads.append(gradient)

                if param not in self.state:
                    self.state[param] = {}

                state = self.state[param]
                if 'momentum_buffer' not in state:
                    momentum_buffer_list.append(None)
                else:
                    momentum_buffer_list.append(state['momentum_buffer'])

        with torch.no_grad():
            F.sgd(params,
                  grads,
                  momentum_buffer_list,
                  weight_decay=weight_decay,
                  momentum=momentum,
                  lr=lr,
                  dampening=dampening,
                  nesterov=self.nesterov)

        # update momentum_buffers in state
        for i, p in enumerate(params):
            state = self.state[p]
            momentum_buffer = momentum_buffer_list[i]
            if momentum_buffer is not None:
                state['momentum_buffer'] = momentum_buffer

4.4 Initialization

4.4.1 The __init__ method

This code corresponds to: the distributed optimizer creates an instance of the local optimizer on each worker node and holds RRefs to these local optimizers. In our sample code, params_rref is the list of parameters to optimize, with one RRef per parameter. In other words, DistributedOptimizer generates an optimizer on each owner node and stores it in self.remote_optimizers as an rpc.RRef(_LocalOptimizer).

def __init__(self, optimizer_class, params_rref, *args, **kwargs):
    per_worker_params_rref = defaultdict(list)
    for param in params_rref:
        # Group the parameter RRefs by their owner worker.
        per_worker_params_rref[param.owner()].append(param)

    # Get the corresponding functional (local) optimizer class, if any.
    if optimizer_class in DistributedOptimizer.functional_optim_map and jit._state._enabled:
        optim_ctor = DistributedOptimizer.functional_optim_map.get(optimizer_class)
    else:
        optim_ctor = optimizer_class
    self.is_functional_optim = (optim_ctor != optimizer_class)

    if self.is_functional_optim:
        optimizer_new_func = _new_script_local_optimizer
    else:
        optimizer_new_func = _new_local_optimizer  # explained below

    remote_optim_futs = []
    for worker, param_rrefs in per_worker_params_rref.items():
        remote_optim_rref_fut = rpc.rpc_async(
            worker,               # create the local optimizer on this worker
            optimizer_new_func,   # invoked remotely via rpc_async
            args=(optim_ctor, param_rrefs) + args,
            kwargs=kwargs,
        )
        remote_optim_futs.append(remote_optim_rref_fut)

    # RRefs to the local optimizers that live on the remote nodes.
    self.remote_optimizers = _wait_for_all(remote_optim_futs)

4.4.2 Generating the local optimizer

_new_local_optimizer generates _LocalOptimizer.

def _new_local_optimizer(optim_cls, local_params_rref, *args, **kwargs):
    return rpc.RRef(
        _LocalOptimizer(optim_cls, local_params_rref, *args, **kwargs))

_LocalOptimizer is the local optimizer: it runs on the remote worker node, while the master holds proxies (RRefs) to these optimizers.

class _LocalOptimizer(object):
    # Ideally we would only need to share a lock for instances of
    # _LocalOptimizer that deal with the same parameters. We are
    # making a simplifying assumption here that if there is more
    # than one instance of _LocalOptimizer per worker, they will
    # be optimizing the same parameters (e.g. each data parallel
    # trainer will create its own instance of _LocalOptimizer but
    # they will all optimize the same parameters on each worker)
    global_lock = Lock()

    def __init__(self, optim_cls, local_params_rref, *args, **kwargs):
        self._local_params = [rref.local_value() for rref in local_params_rref]
        # The optimizer itself is an ordinary optimizer; the only difference is
        # that the parameters it optimizes are the local values of the RRefs.
        self.optim = optim_cls(
            self._local_params,
            *args,
            **kwargs)

    def step(self, autograd_ctx_id):
        # Get the gradients accumulated in the given distributed autograd context.
        all_local_grads = dist_autograd.get_gradients(autograd_ctx_id)

        with _LocalOptimizer.global_lock:
            for param, grad in all_local_grads.items():
                param.grad = grad
            self.optim.step()  # update the parameters

4.4.3 Waiting for completion

_wait_for_all waits for the asynchronous RPCs to complete.

def _wait_for_all(rpc_futs):
    # TODO: improve error propagation
    exception = None
    results = []
    for fut in rpc_futs:
        try:
            results.append(fut.wait())
        except Exception as e:
            results.append(e)
            exception = e
    if exception is not None:
        raise exception
    return results

The corresponding logic is as follows:

  • rref1 and rref2 are the remote parameters to be optimized, each a torch.rand((3, 3)).
  • optim_rref1 and optim_rref2 are the RRefs of the local optimizers on Node 2 and Node 3, respectively.
                                                      +----------------------------------+
+--------------------------------------------+        | Node 2                   worker 1|
| Node 1                              master |        |                                  |
|                                            |        |    +--------------------------+  |
|                                            |        |    | _LocalOptimizer          |  |
|  +---------------------------------+       |        |    |                          |  |
|  | DistributedOptimizer            |       |        |    |                          |  |
|  |                                 |       |        |    |   optim = _FunctionalSGD |  |
|  |                                 |       |        |    |                          |  |
|  |     remote_optimizers = [       |       |        |    |   _local_params = rref1  |  |
|  |                optim_rref1 +------------------------> |                     +    |  |
|  |                ,                |       |        |    |                     |    |  |
|  |                optim_rref2 +-------+    |        |    +--------------------------+  |
|  |                ]                |  |    |        |                          |       |
|  |                                 |  |    |        |                          v       |
|  |                                 |  |    |   +-------------->   torch.rand((3, 3))   |
|  |                                 |  |    |   |    |                                  |
|  +---------------------------------+  |    |   |    +----------------------------------+
|                                       |    |   |
|                                       |    |   |    +-----------------------------------+
|                                       |    |   |    | Node 3                   worker 2 |
|                                       |    |   |    |                                   |
|                                       |    |   |    |     +--------------------------+  |
|                                       |    |   |    |     | _LocalOptimizer          |  |
|                                       |    |   |    |     |                          |  |
|                                       +-----------------> |                          |  |
|                                            |   |    |     |   optim = _FunctionalSGD |  |
|                                            |   |    |     |                          |  |
|                             rref1 +------------+    |     |   _local_params = rref2  |  |
|                                            |        |     |                     +    |  |
|                                            |        |     |                     |    |  |
|                             rref2 +------------+    |     +--------------------------+  |
|                                            |   |    |                           |       |
|                                            |   |    |                           |       |
|                                            |   |    |                           v       |
|                                            |   +--------------->   torch.rand((3, 3))   |
|                                            |        |                                   |
+--------------------------------------------+        +-----------------------------------+

4.5 Updating Parameters

When it is time to optimize, DistributedOptimizer iterates over the saved remote optimizers and calls _local_optimizer_step on each of them.

Why can all these remote optimizers be invoked from Node 1 in one place? Because the next round of forward propagation can only start after all parameters have been updated, it is safe to call them all together and wait for everything to complete.

def step(self, context_id):
    """
    Performs a single optimization step.

    This will call :meth:`torch.optim.Optimizer.step` on each worker
    containing parameters to be optimized, and will block until all workers
    return. The provided ``context_id`` will be used to retrieve the
    corresponding :class:`~torch.distributed.autograd.context` that
    contains the gradients that should be applied to the parameters.

    Args:
        context_id: the autograd context id for which we should run the
            optimizer step.
    """
    dist_autograd._is_valid_context(context_id)

    if self.is_functional_optim:
        optimizer_step_func = _script_local_optimizer_step
    else:
        optimizer_step_func = _local_optimizer_step

    rpc_futs = []
    for optimizer in self.remote_optimizers:  # iterate over the _LocalOptimizer RRefs
        rpc_futs.append(rpc.rpc_async(        # asynchronous remote invocation
            optimizer.owner(),
            optimizer_step_func,              # run the local step on each owner
            args=(optimizer, context_id),
        ))
    _wait_for_all(rpc_futs)

4.5.1 Local Optimization

_local_optimizer_step simply obtains the _LocalOptimizer from its RRef and calls its step method.

def _local_optimizer_step(local_optim_rref, autograd_ctx_id):
    local_optim = local_optim_rref.local_value()
    local_optim.step(autograd_ctx_id)

The step method of _LocalOptimizer first retrieves the distributed gradients and then uses them to update the parameters.

class _LocalOptimizer(object):

    def step(self, autograd_ctx_id):
        # Get the gradients accumulated in the given distributed autograd context.
        all_local_grads = dist_autograd.get_gradients(autograd_ctx_id)

        with _LocalOptimizer.global_lock:
            for param, grad in all_local_grads.items():
                param.grad = grad
            self.optim.step()  # update the parameters

4.5.2 Obtaining distributed gradients

The Python side of get_gradients is only a stub; the real work happens in C++.

def get_gradients(context_id):  # real signature unknown; restored from __doc__
    """
    get_gradients(context_id: int) -> Dict[Tensor, Tensor]

    Retrieves a map from Tensor to the appropriate gradient for that Tensor
    accumulated in the provided context corresponding to the given
    ``context_id`` as part of the distributed autograd backward pass.

    Arguments:
        context_id(int): The autograd context id for which we should retrieve
            the gradients.

    Returns:
        A map where the key is the Tensor and the value is the associated
        gradient for that Tensor.

    Example::
        >>> import torch.distributed.autograd as dist_autograd
        >>> with dist_autograd.context() as context_id:
        >>>     t1 = torch.rand((3, 3), requires_grad=True)
        >>>     t2 = torch.rand((3, 3), requires_grad=True)
        >>>     loss = t1 + t2
        >>>     dist_autograd.backward(context_id, [loss.sum()])
        >>>     grads = dist_autograd.get_gradients(context_id)
        >>>     print(grads[t1])
        >>>     print(grads[t2])
    """
    return {}

The corresponding C++ implementation is in torch/csrc/jit/runtime/register_distributed_ops.cpp; it simply calls getGradients on the autograd context.

// Implementation located in
// torch/csrc/jit/runtime/register_distributed_ops.cpp
TORCH_LIBRARY_IMPL(aten, CatchAll, m) {
  m.impl("get_gradients", [](int64_t context_id) {
    const auto& autogradContext =
        dist_autograd::DistAutogradContainer::getInstance().retrieveContext(
            context_id);
    return autogradContext->getGradients();  // delegate to the context
  });
}

The getGradients code for the C++ world looks like this:

const c10::Dict<torch::Tensor, torch::Tensor> DistAutogradContext::
    getGradients() const {
  std::lock_guard<std::mutex> guard(lock_);
  // block current streams before accessing gradients to make sure that
  // gradient computations are finished before use.
  for (auto& entry : gradReadyEvents_) {
    auto& event = entry.second;
    event.block(impl_.getStream(event.device()));
  }
  return accumulatedGrads_;  // the distributed gradients accumulate here
}

accumulatedGrads_ is defined in torch/csrc/distributed/autograd/context/context.h:

// DistAutogradContext which stores information for a single distributed
// autograd pass on a worker.
class TORCH_API DistAutogradContext {
  // Gradients accumulated in this context so far. The key is the variable on
  // which the gradient needs to be accumulated and the value is the gradient
  // that needs to be accumulated on that variable..
  c10::Dict<torch::Tensor, torch::Tensor> accumulatedGrads_;

So the expanded logic is as follows:

  1. DistributedOptimizer calls the step methods of optim_rref1 and optim_rref2, which run on the remote workers to perform the optimization.
  2. On Worker 1 and Worker 2, each _LocalOptimizer optimizes its own _local_params.
  3. The gradients used for this optimization are those accumulated in accumulatedGrads_ of each node's DistAutogradContext.

In this way, the submodels of the overall model are trained/optimized in unified steps on their respective nodes.

+------------------------------------------------------------+
| Node 1                                               master |
|                                                             |
|   DistributedOptimizer                                      |
|     remote_optimizers = [ optim_rref1, optim_rref2 ]        |
|                                                             |
|     step() {                                                |
|  1     optim_rref1.step()  +-----> runs on Node 2           |
|  1     optim_rref2.step()  +-----> runs on Node 3           |
|     }                                                       |
|                                                             |
|   rref1 +-----> torch.rand((3, 3)) on Node 2                |
|   rref2 +-----> torch.rand((3, 3)) on Node 3                |
+------------------------------------------------------------+

+--------------------------------------+   +--------------------------------------+
| Node 2                      worker 1 |   | Node 3                      worker 2 |
|                                      |   |                                      |
|   _LocalOptimizer                    |   |   _LocalOptimizer                    |
|     optim = _FunctionalSGD           |   |     optim = _FunctionalSGD           |
|     _local_params = rref1            |   |     _local_params = rref2            |
|                                      |   |                                      |
|  2  step() +---> torch.rand((3, 3))  |   |  2  step() +---> torch.rand((3, 3))  |
|                        +             |   |                        +             |
|                      3 |             |   |                      3 |             |
|                        v             |   |                        v             |
|   DistAutogradContext                |   |   DistAutogradContext                |
|     accumulatedGrads_                |   |     accumulatedGrads_                |
+--------------------------------------+   +--------------------------------------+

0x05 PipeDream optimizer

Finally, let’s take a look at PipeDream and see how it implements the distributed optimizer.

  • Since PipeDream starts the full code on every worker, how does each local optimizer determine which parameters it should optimize?
  • How are parameters updated during optimization?

5.1 How the parameters to optimize are determined

The conclusions up front:

  • Each node hosts a different module, so the parameters each optimizer needs to optimize are the parameters of its local module.
  • Each node optimizes its own module.

We need to trace this from the beginning.

5.1.1 The main function

We start in runtime/translation/main_with_runtime.py. It first builds a StageRuntime and then uses the StageRuntime's parameters to build the optimizer.

def main():
    r = runtime.StageRuntime(
        model=model, distributed_backend=args.distributed_backend,
        fp16=args.fp16, loss_scale=args.loss_scale,
        training_tensor_shapes=training_tensor_shapes,
        eval_tensor_shapes=eval_tensor_shapes,
        training_tensor_dtypes=dtypes,
        inputs_module_destinations=inputs_module_destinations,
        target_tensor_names=target_tensor_names,
        configuration_maps=configuration_maps,
        master_addr=args.master_addr,
        rank=args.rank, local_rank=args.local_rank,
        num_ranks_in_server=args.num_ranks_in_server,
        verbose_freq=args.verbose_frequency,
        model_type=runtime.TRANSLATION,
        enable_recompute=args.recompute)
    
    if use_adam_optimizer:
        optimizer = adam.AdamWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, betas=(0.9.0.999),
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency,
            macrobatch=args.macrobatch)
    else:
        optimizer = sgd.SGDWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, momentum=args.momentum,
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency)    

5.1.2 Building the runtime

StageRuntime's initialize function builds the modules; each node uses its stage to select its own modules.

The following is excerpted from the previous article in this series.

stage_to_module_map establishes the stage-to-modules relationship, so that each stage can obtain its corresponding modules.

This stage (value 3) corresponds to two modules, with indexes 3 and 4, i.e. the two trailing 3's in the list below.

module_to_stage_map = {list: 5} [0, 1, 2, 3, 3]

The specific code is:

def initialize(self, model, inputs_module_destinations, configuration_maps,
               master_addr, rank, local_rank, num_ranks_in_server):

        if module_to_stage_map is None:
            self.modules_with_dependencies = ModulesWithDependencies(model)
        else:
            # Use this stage to find its own modules.
            modules = stage_to_module_map[self.stage]
            self.modules_with_dependencies = ModulesWithDependencies(
                [model[module] for module in modules])

        # Determine which model layers belong to this stage.
        modules = self.modules_with_dependencies.modules()

        # Get master_parameters and model_parameters.
        if self.fp16:
            self.master_parameters = []
            self.model_parameters = []
            for i in range(len(modules)):
                import apex.fp16_utils as fp16_utils
                module_parameters, module_master_parameters = \
                    fp16_utils.prep_param_lists(modules[i])
                self.master_parameters.extend(module_master_parameters)
                self.model_parameters.extend(module_parameters)
        else:
            self.master_parameters = list(self.parameters())
            self.model_parameters = None
            
            

For example, suppose the model is split across two nodes, each hosting two layers, and Node 2 additionally runs DDP data parallelism.

The model parameters on each node are different: the parameters Node 1 optimizes are those of Layer 1 and Layer 2, while Node 2 optimizes the parameters of Layer 3 and Layer 4.

                                              Node 2
                                              +----------------------------------------+
                                              | Stage 2                   StageRuntime |
                                              |                                        |
Node 1                                        |           CommunicationHandler         |
+---------------------------------------+     |                                        |
| Stage 1        StageRuntime           |     |      +----------------------------+    |
|                                       |     |      | +------------------------+ |    |
|                                       |     |      | |Rank 2                  | |    |
|         CommunicationHandler          |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|      +-----------------------+        |     |      | |  Layer 3 +---> Layer 4 | |    |
|      |Rank 1                 |        |     |      | |                        | |    |
|      |                       |        |     | DDP  | |                        | |    |
|      | Layer 1 +---> Layer 2 |        +----------->+ +------------------------+ |    |
|      |                       |        |     |      | +------------------------+ |    |
|      |                       |        |     |      | |Rank 3                  | |    |
|      +-----------------------+        |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|   master_parameters = Parameters(     |     |      | |  Layer 3 +---> Layer 4 | |    |
|                   Layer 1, Layer 2)   |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|   model_parameters                    |     |      | +------------------------+ |    |
|                                       |     |      +----------------------------+    |
+---------------------------------------+     |                                        |
                                              |  master_parameters = Parameters(       |
                                              |                      Layer 3, Layer 4) |
                                              |                                        |
                                              |                                        |
                                              |  model_parameters                      |
                                              |                                        |
                                              +----------------------------------------+
Copy the code
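
The figure can be condensed into a few lines of code. The following toy sketch (my own example, not the PipeDream source) shows that each stage only holds, and therefore only optimizes, the parameters of its own layers:

import torch.nn as nn

full_model = nn.Sequential(               # conceptually Layer 1 ... Layer 4
    nn.Linear(8, 8), nn.Linear(8, 8),     # Stage 1: Layer 1, Layer 2
    nn.Linear(8, 8), nn.Linear(8, 4),     # Stage 2: Layer 3, Layer 4
)

layers = list(full_model.children())
stage1 = nn.Sequential(*layers[:2])       # lives on Node 1
stage2 = nn.Sequential(*layers[2:])       # lives on Node 2 (and is wrapped by DDP there)

master_parameters_stage1 = list(stage1.parameters())   # only Layer 1 and Layer 2
master_parameters_stage2 = list(stage2.parameters())   # only Layer 3 and Layer 4
Copy the code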

5.1.3 SGDWithWeightStashing

SGDWithWeightStashing is then built with the runtime's master_parameters and model_parameters.

OptimizerWithWeightStashing is the base class of SGDWithWeightStashing.

class SGDWithWeightStashing(OptimizerWithWeightStashing):  # base class
    """ SGD optimizer with weight stashing. """
    def __init__(self, modules, master_parameters, model_parameters,
                 loss_scale, num_versions, lr=required, momentum=0,
                 dampening=0, weight_decay=0, nesterov=False, verbose_freq=0,
                 macrobatch=False):
        super(SGDWithWeightStashing, self).__init__(
            optim_name='SGD',
            modules=modules, master_parameters=master_parameters,
            model_parameters=model_parameters, loss_scale=loss_scale,
            num_versions=num_versions, lr=lr, momentum=momentum,
            dampening=dampening, weight_decay=weight_decay,
            nesterov=nesterov, verbose_freq=verbose_freq,
            macrobatch=macrobatch,
        )
Copy the code

The base class OptimizerWithWeightStashing creates a native PyTorch optimizer and assigns it to base_optimizer.

class OptimizerWithWeightStashing(torch.optim.Optimizer):
    """Wrapper class that adds weight stashing to a vanilla torch.optim.Optimizer.

    Arguments:
        - optim_name: the name of optimizer, required to create the corresponding
          base_optimizer (torch.optim.{optim_name}).
        - optimizer_args: the keyword arguments passed to base_optimizer.
    """

    def __init__(self, optim_name, modules, master_parameters, model_parameters,
                 loss_scale, num_versions, verbose_freq=0, macrobatch=False,
                 **optimizer_args):
        self.modules = modules
        self.master_parameters = master_parameters
        self.model_parameters = model_parameters  # model_parameters is None if not fp16.
        self.loss_scale = loss_scale

        # Only need at most 2 versions if using macrobatching.
        if macrobatch:
            num_versions = min(2, num_versions)
        self.num_versions = num_versions
        
        # Generate a native optimizer
        self.base_optimizer = getattr(torch.optim, optim_name)(
            master_parameters, **optimizer_args)
        self.latest_version = Version()
        self.current_version = Version()
        self.initialize_queue()
        self.verbose_freq = verbose_freq
        self.batch_counter = 0

        # If macrobatching, push and pop versions at the right rate.
        if macrobatch:
            self.update_interval = self.num_versions
        else:
            self.update_interval = 1
Copy the code
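
The key line is getattr(torch.optim, optim_name)(master_parameters, **optimizer_args): the wrapper looks up the optimizer class by name and instantiates it over the master parameters. A minimal sketch of the same pattern in isolation (toy code, outside of PipeDream):

import torch
import torch.nn as nn

model = nn.Linear(4, 2)
optim_name = 'SGD'
optimizer_args = {'lr': 0.01, 'momentum': 0.9}

# Same pattern as base_optimizer above: resolve torch.optim.SGD by name and construct it.
base_optimizer = getattr(torch.optim, optim_name)(model.parameters(), **optimizer_args)
Copy the code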

The logic now extends as follows: each optimizer uses the parameters of its own node for optimization.

                                              +----------------------------------------+
                                              | Stage 2                   StageRuntime |
                                              |                                        |
                                              |           CommunicationHandler         |
+---------------------------------------+     |                                        |
| Stage 1        StageRuntime           |     |      +----------------------------+    |
|                                       |     |      | +------------------------+ |    |
|                                       |     |      | |Rank 2                  | |    |
|         CommunicationHandler          |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|      +-----------------------+        |     |      | |  Layer 3 +---> Layer 4 | |    |
|      |Rank 1                 |        |     |      | |                        | |    |
|      |                       |        |     | DDP  | |                        | |    |
|      | Layer 1 +---> Layer 2 |        +----------->+ +------------------------+ |    |
|      |                       |        |     |      | +------------------------+ |    |
|      |                       |        |     |      | |Rank 3                  | |    |
|      +-----------------------+        |     |      | |                        | |    |
|                                       |     |      | |                        | |    |
|   master_parameters = Parameters(     |     |      | |  Layer 3 +---> Layer 4 | |    |
|                   Layer 1, Layer 2)   |     |      | |                        | |    |
|                             +         |     |      | |                        | |    |
|   model_parameters          |         |     |      | +------------------------+ |    |
|                             |         |     |      +----------------------------+    |
|  +---------------------------------+  |     |                                        |
|  |SGDWithWeightStashing     |      |  |     |                                        |
|  |                          |      |  |     |  master_parameters = Parameters(       |
|  |   base_optimizer = SGD(  v      |  |     |                      Layer 3, Layer 4) |
|  |              master_parameters) |  |     |                               +        |
|  |                                 |  |     |  model_parameters             |        |
|  +---------------------------------+  |     |                               |        |
|                                       |     |  +----------------------------------+  |
+---------------------------------------+     |  |SGDWithWeightStashing       |     |  |
                                              |  |                            |     |  |
                                              |  |      base_optimizer = SGD( v     |  |
                                              |  |               master_parameters) |  |
                                              |  +----------------------------------+  |
                                              |                                        |
                                              +----------------------------------------+
Copy the code

5.2 optimization

5.2.1 Overall optimization

The whole pipeline runs asynchronously, so the optimization is asynchronous as well.

def train(train_loader, r, optimizer, epoch):

    # omit others
    
    # start num_warmup_minibatches forward passes
    for i in range(num_warmup_minibatches):
        r.run_forward()

    for i in range(n - num_warmup_minibatches):
        # perform forward pass
        r.run_forward()

        # perform backward pass
        if args.fp16:
            r.zero_grad()
        else:
            optimizer.zero_grad()
        optimizer.load_old_params()

        r.run_backward()
        optimizer.load_new_params()
        optimizer.step()

    # finish remaining backward passes
    for i in range(num_warmup_minibatches):
        optimizer.zero_grad()
        optimizer.load_old_params()
        r.run_backward()
        optimizer.load_new_params()
        optimizer.step()

    # wait for all helper threads to complete
    r.wait()
Copy the code
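
The load_old_params()/load_new_params() pair in the loop is what weight stashing means: the backward pass of a micro-batch must run with the same weight version that its forward pass used, and the newest version is restored before the optimizer step. The following is a minimal sketch of such a weight-version queue (a toy illustration under my own assumptions, not the PipeDream implementation):

import collections
import copy
import torch.nn as nn

layer = nn.Linear(4, 4)
num_versions = 3
queue = collections.deque(maxlen=num_versions)   # stashed weight versions, oldest first

def stash_current_version():
    # Called around the forward pass: remember the weights this micro-batch saw.
    queue.append(copy.deepcopy(layer.state_dict()))

def load_old_params():
    # Before backward: restore the oldest stashed version so it matches the stashed forward.
    layer.load_state_dict(queue[0])

def load_new_params():
    # Before the optimizer step: switch back to the newest weight version.
    layer.load_state_dict(queue[-1])

stash_current_version()    # version used by the first forward pass
Copy the code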

5.2.2 Optimizer optimization

The optimization is done directly through SGDWithWeightStashing's step method, which in turn falls through to the step method of its base class OptimizerWithWeightStashing(torch.optim.Optimizer).

def step(self, closure=None):
    """Performs a single optimization step.

    Arguments:
        closure (callable, optional): A closure that reevaluates the model
            and returns the loss.
    """
    # Update the gradient every `update_interval` steps.
    if self.batch_counter % self.update_interval != self.update_interval - 1:
        self.batch_counter += 1
        return None

    if self.model_parameters is not None:
        import apex.fp16_utils as fp16_utils
        fp16_utils.model_grads_to_master_grads(self.model_parameters,
                                               self.master_parameters)
        if self.loss_scale != 1.0:
            # Processing gradient
            for parameter in self.master_parameters:
                parameter.grad.data = parameter.grad.data / self.loss_scale

    for p in self.param_groups[0]['params']:
        if p.grad is not None:  # Continue with the accumulated gradient
            p.grad.div_(self.update_interval)

    loss = self.base_optimizer.step() # optimize
    if self.model_parameters is not None:
        import apex.fp16_utils as fp16_utils
        fp16_utils.master_params_to_model_params(self.model_parameters,
                                                 self.master_parameters)
    self.latest_version = self.latest_version.incr()
    if self.num_versions > 1:
        self.buffered_state_dicts = self.queue[0][0]
        self.queue.append(self.get_params(clone=False))

    self.batch_counter += 1
    return loss
Copy the code
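
The update_interval logic above amounts to gradient accumulation: step() is a no-op until the last micro-batch of the interval, where the accumulated gradients are averaged and the base optimizer runs once. A standalone toy version of the same control flow (my own sketch, not the PipeDream code):

import torch
import torch.nn as nn

model = nn.Linear(4, 1)
base_optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
update_interval = 4
batch_counter = 0

for _ in range(8):
    x, y = torch.randn(2, 4), torch.randn(2, 1)
    loss = nn.functional.mse_loss(model(x), y)
    loss.backward()                                   # gradients accumulate across micro-batches
    if batch_counter % update_interval == update_interval - 1:
        for p in model.parameters():
            if p.grad is not None:
                p.grad.div_(update_interval)          # average the accumulated gradient
        base_optimizer.step()                         # one real parameter update per interval
        base_optimizer.zero_grad()
    batch_counter += 1
Copy the code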

Details are as follows:

                                               Node 2
                                               +-----------------------------------------+
                                               | Stage 2                    StageRuntime |
                                               |                                         |
Node 1                                         |           CommunicationHandler          |
+-----------------------------------------+    |                                         |
| Stage 1                    StageRuntime |    |      +----------------------------+     |
|                                         |    |      | +------------------------+ |     |
|                                         |    |      | |Rank 2                  | |     |
|          CommunicationHandler           |    |      | |                        | |     |
|                                         |    |      | |                        | |     |
|       +-----------------------+         |    |      | |  Layer 3 +---> Layer 4 | |     |
|       |Rank 1                 |         |    |      | |                        | |     |
|       |                       |         |    | DDP  | |                        | |     |
|       | Layer 1 +---> Layer 2 |         +---------->+ +------------------------+ |     |
|       |                       |         |    |      | +------------------------+ |     |
|       |                       |         |    |      | |Rank 3                  | |     |
|       +-----------------------+         |    |      | |                        | |     |
|                                         |    |      | |                        | |     |
|  master_parameters = Parameters(        |    |      | |  Layer 3 +---> Layer 4 | |     |
|                  Layer 1, Layer 2)      |    |      | |                        | |     |
|                                +        |    |      | |                        | |     |
|  model_parameters              |        |    |      | +------------------------+ |     |
|                                |        |    |      +----------------------------+     |
|  step()                        |        |    |                                         |
|   +                            |        |    |                                         |
|   |                            |        |    |  master_parameters = Parameters(        |
|   |  +-------------------------------+  |    |                  Layer 3, Layer 4)      |
|   |  |SGDWithWeightStashing    |     |  |    |                                +        |
|   |  |                         |     |  |    |  model_parameters              |        |
|   |  |  base_optimizer = SGD(  v     |  |    |                                |        |
|   |  |           master_parameters)  |  |    |  step()                        |        |
|   |  |                               |  |    |   +                            |        |
|   +----> base_optimizer.step()       |  |    |   |                            |        |
|      |                               |  |    |   |  +-------------------------------+  |
|      +-------------------------------+  |    |   |  |SGDWithWeightStashing    |     |  |
|                                         |    |   |  |                         |     |  |
+-----------------------------------------+    |   |  |  base_optimizer = SGD(  v     |  |
                                                |   |  |           master_parameters)  |  |
                                                |   |  |                               |  |
                                                |   +----> base_optimizer.step()       |  |
                                                |      |                               |  |
                                                |      +-------------------------------+  |
                                                |                                         |
                                                +-----------------------------------------+
Copy the code

This concludes the distributed optimizer series. We will cover PyTorch's ZeroRedundancyOptimizer in a follow-up analysis of ZeRO, probably in a few weeks. The next article will walk through some examples from the official PyTorch distributed documentation, to tie the whole logic of PyTorch distributed together and see how it is used in practice.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts
