0x00 Abstract

In the previous articles we looked at the basic modules of PyTorch's distributed logic. The next few articles show how to put these modules into practice and give an overall picture of PyTorch's distributed machinery. This article describes how to use RPC to implement distributed pipeline parallelism.

This article is based on a translation of DISTRIBUTED PIPELINE PARALLELISM USING RPC, with my own understanding added.

Other articles in this PyTorch distributed series are as follows:

PyTorch distributed (1) ----- history and overview

PyTorch: how to use the GPU

PyTorch distributed (2) ----- DataParallel (part 1)

PyTorch distributed (3) ----- DataParallel (part 2)

PyTorch distributed (4) ----- concepts of distributed applications

PyTorch distributed (5) ----- DistributedDataParallel: overview and how to use it

PyTorch distributed (6) ----- DistributedDataParallel

PyTorch distributed (7) ----- DistributedDataParallel: process groups

PyTorch distributed (8) ----- DistributedDataParallel

PyTorch distributed (9) ----- DistributedDataParallel: initialization

PyTorch distributed (10) ----- DistributedDataParallel: static architecture of the Reducer

PyTorch distributed (11) ----- DistributedDataParallel: building the Reducer and the Join operation

PyTorch distributed (12) ----- DistributedDataParallel: forward propagation

PyTorch distributed (13) ----- DistributedDataParallel: backward propagation

PyTorch Distributed Autograd (1) ----- design

PyTorch Distributed Autograd (2) ----- RPC foundation

PyTorch Distributed Autograd (3)

PyTorch Distributed Autograd (4)

PyTorch Distributed Autograd (5)

PyTorch Distributed Autograd (6)

PyTorch Distributed optimizer (1) ----- cornerstone

PyTorch Distributed optimizer (2) ----- data-parallel optimizer

PyTorch Distributed optimizer (3) ----- model parallelism

PyTorch distributed (14) ----- using Distributed Autograd and the Distributed Optimizer

PyTorch distributed (15) ----- implementing a parameter server with the distributed RPC framework

PyTorch distributed (16) ----- implementing batch RPC with asynchronous execution

PyTorch distributed (17) ----- combining DDP with the distributed RPC framework

Note: this article does not follow the order of the original tutorial exactly; it is reorganized according to my own understanding. The original analyzes from the bottom up, from details to the whole, which I always found awkward to follow because it lacks an overall picture, so here we analyze from the top down and add diagrams.

0x01 Review

1.1 Prerequisites

This tutorial uses a ResNet50 model to demonstrate distributed pipeline parallelism with the torch.distributed.rpc API. It can be seen as the distributed counterpart of the multi-GPU pipeline parallelism discussed in Single-Machine Model Parallel Best Practices.

Prerequisites for this article are as follows:

  • PyTorch Distributed overview
  • Single machine model parallel best practices
  • Introduction to distributed RPC framework
  • RRef helper functions: RRef.rpc_sync(), RRef.rpc_async(), and RRef.remote()

Note:

  • This tutorial requires PyTorch v1.6.0 or higher.

  • The full source code of this tutorial can be found at pytorch/examples.

1.2 Basic Knowledge

The previous tutorial, Getting Started with Distributed RPC Framework, showed how to use torch.distributed.rpc to implement distributed model parallelism for an RNN model. That tutorial uses one GPU to host the EmbeddingTable, and the provided code works fine. However, if a model lives on multiple GPUs, some extra steps are needed to increase the amortized utilization of all GPUs. Pipeline parallelism is one paradigm that can help in this case.

In this tutorial, we use ResNet50 as the example model, which is also used by the Single-Machine Model Parallel Best Practices tutorial. Similarly, the ResNet50 model is divided into two shards, and each input batch is partitioned into multiple micro-batches that are pipelined through the two model shards. The difference is that, instead of parallelizing the execution with CUDA streams, this tutorial invokes asynchronous RPCs, so the solution presented here also works across machine boundaries. The rest of the tutorial presents the implementation in four steps.
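
As a small illustration, the micro-batch splitting mentioned above is simply torch.Tensor.split along the batch dimension. The snippet below is only a toy sketch with made-up tensor sizes, mirroring the [1, 2, 4, 8] split sizes used later in the example.

import torch

xs = torch.randn(8, 3, 4, 4)               # a toy batch of 8 tiny "images"
for split_size in [1, 2, 4, 8]:
    chunks = xs.split(split_size, dim=0)   # the same call DistResNet50.forward will use
    print(split_size, len(chunks), chunks[0].shape)
# split_size=1 gives 8 micro-batches of 1 sample each; split_size=8 gives a single chunk of 8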

0x02 Startup

The following code shows the target function for all processes. run_worker runs on every node, but the code it executes depends on the rank.

  • The main logic is defined in run_master; the master is the brain of the system and the one that actually drives the work.
  • The workers passively wait for commands from the master, so they only run init_rpc and shutdown.
    • init_rpc merely sets up the distributed environment.
    • shutdown blocks by default until all RPC participants have finished.
    • The actual business logic is dispatched by the master onto the worker nodes through RPC.

import os
import time

import torch.distributed.rpc as rpc
import torch.multiprocessing as mp


def run_worker(rank, world_size, num_split):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'

    # Higher timeout is added to accommodate for kernel compilation time in case of ROCm.
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, rpc_timeout=300)

    if rank == 0:
        rpc.init_rpc(
            "master",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        run_master(num_split)
    else:
        rpc.init_rpc(
            f"worker{rank}",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    world_size = 3
    for num_split in [1, 2, 4, 8]:
        tik = time.time()
        mp.spawn(run_worker, args=(world_size, num_split), nprocs=world_size, join=True)
        tok = time.time()
        print(f"number of splits = {num_split}, execution time = {tok - tik}")

The logic is as follows:

           torch.multiprocessing.spawn
                      +
                      |
                      |
         +------------+----------------------------+
         |                                         |
         |                                         |
         v                                         v
+--------+---------------------------+      +------+----------+
| "ps"                               |      | f"worker{rank}" |
|                                    |      |                 |
|     rank = 0                       |      |      rank = 1,2 |
|                                    |      |                 |
|     run_worker +----> run_master   |      |      run_worker |
|                                    |      |                 |
+------------------------------------+      +-----------------+
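
Since the RPC-based solution also works across machine boundaries, it may help to see what the launch could look like without mp.spawn. The sketch below is my own illustration and not part of the official example; the host name node0, the port, and the thread count are assumptions, and each machine runs exactly one of the three processes.

import os
import torch.distributed.rpc as rpc

def start(role_name, rank, world_size):
    os.environ['MASTER_ADDR'] = 'node0'    # hypothetical rank-0 host, must be reachable from every node
    os.environ['MASTER_PORT'] = '29500'
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=128, rpc_timeout=300)
    rpc.init_rpc(role_name, rank=rank, world_size=world_size, rpc_backend_options=options)
    if rank == 0:
        run_master(split_size=4)           # run_master is defined in the next section
    rpc.shutdown()                         # blocks until every participant has finished

# node0 runs start("master", 0, 3); node1 runs start("worker1", 1, 3); node2 runs start("worker2", 2, 3)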

0x03 Define the training loop

Now let's look at the training loop. We use a dedicated "master" worker to prepare random inputs and labels and to drive the distributed backward pass and the distributed optimizer step.

  • It starts by creating an instance of the DistResNet50 module, specifying the number of micro-batches per batch and the names of the two RPC workers (that is, "worker1" and "worker2").

  • It then defines the loss function and uses parameter_rrefs() to obtain a list of parameter RRefs, which is used to create the DistributedOptimizer.

  • Finally, the main training loop is very similar to regular local training, except that it uses dist_autograd to launch the backward pass and passes the context_id to both the backward pass and the optimizer step().

#########################################################
#                   Run RPC Processes                   #
#########################################################

num_batches = 3
batch_size = 120
image_w = 128
image_h = 128


def run_master(split_size):

    # put the two model parts on worker1 and worker2 respectively
    model = DistResNet50(split_size, ["worker1", "worker2"])
    loss_fn = nn.MSELoss()
    opt = DistributedOptimizer( # Distributed optimizer
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )

    one_hot_indices = torch.LongTensor(batch_size) \
                           .random_(0, num_classes) \
                           .view(batch_size, 1)

    for i in range(num_batches):
        print(f"Processing batch {i}")
        # generate random inputs and labels
        inputs = torch.randn(batch_size, 3, image_w, image_h)
        labels = torch.zeros(batch_size, num_classes) \
                      .scatter_(1, one_hot_indices, 1)

        # The distributed autograd context is the dedicated scope for the
        # distributed backward pass to store gradients, which can later be
        # retrieved using the context_id by the distributed optimizer.
        with dist_autograd.context() as context_id:
            outputs = model(inputs)
            dist_autograd.backward(context_id, [loss_fn(outputs, labels)])  # Distributed gradient
            opt.step(context_id)

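
A detail of run_master that is easy to gloss over is how the one-hot labels for nn.MSELoss are built with scatter_. The snippet below is just a standalone illustration with made-up sizes, not part of the original example.

import torch

batch_size, num_classes = 4, 10            # toy sizes, for illustration only
one_hot_indices = torch.LongTensor(batch_size) \
                       .random_(0, num_classes) \
                       .view(batch_size, 1)
labels = torch.zeros(batch_size, num_classes) \
              .scatter_(1, one_hot_indices, 1)
print(one_hot_indices.view(-1))            # e.g. tensor([3, 7, 0, 7])
print(labels)                              # each row has a single 1 at the sampled class index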

We first draw the diagram from a single-machine point of view and will expand it in the next section. Seen as a single-machine program, nothing looks unusual.

           torch.multiprocessing.spawn
                      +
                      |
                      |
         +------------+-------------------------------------------+
         |                                                        |
         |                                                        |
         v                                                        v
+--------+----------------------------------------------+  +------+----------+
| "ps"                                                  |  | f"worker{rank}" |
|                                                       |  |                 |
|     rank = 0                                          |  |      rank = 1,2 |
|                                                       |  |                 |
|     run_worker +----> run_master                      |  |      run_worker |
|                           +                           |  |                 |
|                           |                           |  |                 |
|                           |                           |  +-----------------+
|                           v                           |
| +-------------------------+-------------------------+ |
| |                                                   | |
| |                                                   | |
| |  model = DistResNet50(split_size,                 | |
| |                      ["worker1","worker2"])       | |
| |  loss_fn = nn.MSELoss()                           | |
| |  opt = DistributedOptimizer(                      | |
| |      optim.SGD,                                   | |
| |      model.parameter_rrefs(),                     | |
| |      lr=0.05,                                     | |
| |  )                                                | |
| |  for i in range(num_batches):                     | |
| |      with dist_autograd.context() as context_id:  | |
| |          outputs = model(inputs)                  | |
| |          dist_autograd.backward(context_id,       | |
| |              [loss_fn(outputs, labels)])          | |
| |          opt.step(context_id)                     | |
| |                                                   | |
| +---------------------------------------------------+ |
|                                                       |
+-------------------------------------------------------+

0x04 Stitch the ResNet50 model shards into one module

For now, let's treat each shard as a black box.

  • First, we create a DistResNet50 module to assemble the two shards and implement the pipeline-parallel logic. In the constructor, we use two rpc.remote calls to place the two shards on two different RPC workers, and we keep the two RRefs to the model parts so they can be referenced in the forward pass.

  • The forward function splits the input batch into micro-batches and pipelines them through the two model parts.

    • It first applies the first shard to a micro-batch with a p1_rref.remote() call, and then forwards the intermediate output RRef to the second model shard.
    • It then collects the Future of each micro-batch's final output and waits for all of them after the loop.
    • Note that both remote() and rpc_async() return immediately and run asynchronously, so the whole loop is non-blocking and launches multiple RPCs concurrently.
  • The execution order of one micro-batch across the two model parts is preserved by the intermediate output y_rref; the execution order across micro-batches does not matter.

  • Finally, the forward function concatenates the outputs of all micro-batches into a single output tensor and returns it. The parameter_rrefs function is a helper that simplifies building the distributed optimizer: it gathers RRefs to the parameters of both shards (on worker1 and worker2), which are later handed to the DistributedOptimizer.


import torch
import torch.nn as nn
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef


class DistResNet50(nn.Module):
    """ Assemble two parts as an nn.Module and define pipelining logic """
    def __init__(self, split_size, workers, *args, **kwargs):
        super(DistResNet50, self).__init__()

        self.split_size = split_size

        # Put the first part of the ResNet50 on workers[0]
        self.p1_rref = rpc.remote(
            workers[0],  # place it on the first worker
            ResNetShard1,
            args = ("cuda:0",) + args,
            kwargs = kwargs
        )

        # Put the second part of the ResNet50 on workers[1]
        self.p2_rref = rpc.remote(
            workers[1],  # place it on the second worker
            ResNetShard2,
            args = ("cuda:1",) + args,
            kwargs = kwargs
        )

    def forward(self, xs):
        # Split the input batch xs into micro-batches, and collect async RPC
        # futures into a list
        out_futures = []
        for x in iter(xs.split(self.split_size, dim=0)):  # split the input batch into micro-batches
            x_rref = RRef(x)  # wrap the micro-batch in an RRef
            y_rref = self.p1_rref.remote().forward(x_rref) # The first worker handles microbatches
            z_fut = self.p2_rref.rpc_async().forward(y_rref) # The second worker continues processing
            out_futures.append(z_fut)

        # collect and cat all output tensors into one tensor.
        return torch.cat(torch.futures.wait_all(out_futures))

    def parameter_rrefs(self):
        remote_params = []
        remote_params.extend(self.p1_rref.remote().parameter_rrefs().to_here())
        remote_params.extend(self.p2_rref.remote().parameter_rrefs().to_here())
        return remote_params

To keep the figure readable, we only draw the internal details of worker 1; worker 2 is identical. run_master has also been simplified. The pipelining is driven from the forward method of DistResNet50 on the master, as indicated by arrows 1 and 2 in the figure.

         torch.multiprocessing.spawn
                    +
                    |
                    |
       +------------+---------------------------------------------------+
       |                                                                |
       |                                                                |
       v                                                                v
+------+----------------------------------------------+         +-------+--------------+
|  "ps"                                     rank = 0  |         |"worker 1"   rank = 1 |
|                                                     |         |                      |
|  run_worker   DistributedOptimizer(p1_rref,p2_rref) |         | run_worker           |
|      +                                              |         |                      |
|      |                                              |         |                      |
|      |        DistResNet50                          |         |      +-------------+ |
|      |                                              |    +-------->  |ResNetShard1 | |
|      v                                              |    |    |      |             | |
|  run_master   p1_rref  +------------------------------------------>  |             | |
|      +                                              |    |    |      +-------+-----+ |
|      |                                              |    |    |              |       |
|      |        p2_rref +-------------------------------+  |    +----------------------+
|      |                                              | |  |                   |
|      |                                              | |  |                   |
|      v                                              | |  |                   |
| +----+--------------------------------------------+ | |  |                   |
| | model = DistResNet50(split_size,                | | |  |                   |
| |                     ["worker1","worker2"])      | | |  |1                  |2
| | loss_fn = nn.MSELoss()                          | | |  |                   |
| | opt = DistributedOptimizer(                     | | |  |                   |
| |     optim.SGD,                                  | | |  |                   |
| |     model.parameter_rrefs(),                    | | |  |                   |
| | )                                               | | |  |                   v
| | for i in range(num_batches):                    | | |  |    +--------------+--------+
| |     with dist_autograd.context() as context_id: | | |  |    | "worker 2"  rank = 2  |
| |         outputs = model(inputs) +----------------------+    |                       |
| |         dist_autograd.backward(context_id,      | | |       |      +--------------+ |
| |                   [loss_fn(outputs, labels)])   | | +------------> |ResNetShard2  | |
| |         opt.step(context_id)                    | |         |      |              | |
| +-------------------------------------------------+ |         |      +--------------+ |
+-----------------------------------------------------+         +-----------------------+

0x05 Partition the ResNet50 model

This is the preparation step: implementing ResNet50 as two model shards. The code below is borrowed from the ResNet implementation in torchvision. The ResNetBase module contains the building blocks and attributes that are common to both ResNet shards.

Now we are ready to define the two model shards. In the constructors, we simply split all ResNet50 layers into two parts and move each part to its assigned device. The forward functions of the two shards do the following:

  • Take an RRef of the input data, so that the data can be fetched locally with to_here() and then moved to the intended device.
  • After all layers have been applied to the input, move the output to the CPU and return it. This is because the RPC API requires tensors to reside on the CPU, which avoids invalid-device errors when the caller and the callee have different numbers of GPUs.

import threading
import torch
import torch.nn as nn
from torch.distributed.rpc import RRef
from torchvision.models.resnet import Bottleneck

num_classes = 1000

def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)

class ResNetBase(nn.Module):
    def __init__(self, block, inplanes, num_classes=1000,
                 groups=1, width_per_group=64, norm_layer=None):
        super(ResNetBase, self).__init__()

        self._lock = threading.Lock()
        self._block = block
        self._norm_layer = nn.BatchNorm2d
        self.inplanes = inplanes
        self.dilation = 1
        self.groups = groups
        self.base_width = width_per_group

    # helper function used to build a Sequential group of layers
    def _make_layer(self, planes, blocks, stride=1):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if stride != 1 or self.inplanes != planes * self._block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * self._block.expansion, stride),
                norm_layer(planes * self._block.expansion),
            )

        layers = []
        layers.append(self._block(self.inplanes, planes, stride, downsample,
                                  self.groups, self.base_width, previous_dilation,
                                  norm_layer))
        self.inplanes = planes * self._block.expansion
        for _ in range(1, blocks):
            layers.append(self._block(self.inplanes, planes, groups=self.groups,
                                      base_width=self.base_width, dilation=self.dilation,
                                      norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def parameter_rrefs(self):
        r""" Create one RRef for each parameter in the given local module, and return a list of RRefs. """
        return [RRef(p) for p in self.parameters()]


class ResNetShard1(ResNetBase):
    """ The first part of ResNet. """
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard1, self).__init__(
            Bottleneck, 64, num_classes=num_classes, *args, **kwargs)

        self.device = device  # the device this shard runs on
        self.seq = nn.Sequential( # Build Sequential modules
            nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False),
            self._norm_layer(self.inplanes),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            self._make_layer(64, 3),
            self._make_layer(128, 4, stride=2)
        ).to(self.device) # Put it on the device

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)  # fetch the input and move it to the device
        with self._lock:
            out = self.seq(x)  # apply all layers of this shard to the input
        return out.cpu()  # the output must be moved back to the CPU for RPC


class ResNetShard2(ResNetBase):
    """ The second part of ResNet. """
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard2, self).__init__(
            Bottleneck, 512, num_classes=num_classes, *args, **kwargs)

        self.device = device  # the device this shard runs on
        self.seq = nn.Sequential( # Build Sequential modules
            self._make_layer(256, 6, stride=2),
            self._make_layer(512, 3, stride=2),
            nn.AdaptiveAvgPool2d((1, 1)),
        ).to(self.device) # Put it on the device

        self.fc =  nn.Linear(512 * self._block.expansion, num_classes).to(self.device)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)  # fetch the input and move it to the device
        with self._lock:
            # apply the remaining layers, then flatten and run the final fully connected layer
            out = self.fc(torch.flatten(self.seq(x), 1))
        return out.cpu()  # the output must be moved back to the CPU for RPC
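
Before wiring the two shards together over RPC, it can be reassuring to check locally that they compose into a full ResNet50. The snippet below is my own sanity check rather than part of the official example; it bypasses the RRef/_lock plumbing and calls the sub-modules directly on the CPU.

shard1 = ResNetShard1("cpu")                        # both halves on the CPU, no RPC involved
shard2 = ResNetShard2("cpu")

x = torch.randn(2, 3, 128, 128)                     # a toy batch of two images
h = shard1.seq(x)                                   # first half of ResNet50
y = shard2.fc(torch.flatten(shard2.seq(h), 1))      # second half, as in ResNetShard2.forward
print(h.shape, y.shape)                             # expect (2, 512, 16, 16) and (2, 1000) for this input size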

We now extend the logic as follows:

  • DistResNet50 is split into two parts, one hosted on worker 1 and one on worker 2.
  • The master holds RRefs to the parameters of both parts.
  • ps is the master; it drives all the work.
  • Backward propagation is accomplished through distributed autograd and the distributed optimizer.
  • The two workers are kept simple:
    • They only set up the distributed environment and wait for completion.
    • The actual work is scheduled onto them by the master through RPC.
  • The pipeline is driven explicitly from the master's forward method and connected by the intermediate output RRef, as shown by arrows 1 and 2 in the previous figure; the figure below adds the parameter RRefs held by the master.

              torch.multiprocessing.spawn
                           +
                           |
          +----------------+---------------------------------+
          |                                                  |
          v                                                  v
+---------+--------------------------------------+     +-----+---------------------+
| "ps"                                  rank = 0 |     | "worker1"        rank = 1 |
|                                                |     |                           |
|  run_worker +-----> run_master                 |     |  run_worker               |
|                                                |     |  +--------------------+   |
|  model = DistResNet50(split_size,              |     |  | ResNetShard1       |   |
|              ["worker1","worker2"])            |     |  |   ResNetBase       |   |
|     p1_rref +-----------------------------------------> |   parameters()     |   |
|  loss_fn = nn.MSELoss()                        |     |  +--------------------+   |
|  opt = DistributedOptimizer(optim.SGD,         |     |                           |
|            model.parameter_rrefs(), lr=0.05)   |     +---------------------------+
|                                                |
|  for i in range(num_batches):                  |     +---------------------------+
|    with dist_autograd.context() as context_id: |     | "worker2"        rank = 2 |
|      outputs = model(inputs)                   |     |                           |
|      dist_autograd.backward(context_id,        |     |  run_worker               |
|          [loss_fn(outputs, labels)])           |     |  +--------------------+   |
|      opt.step(context_id)                      |     |  | ResNetShard2       |   |
|                                                |     |  |   ResNetBase       |   |
|     p2_rref +-----------------------------------------> |   parameters()     |   |
|                                                |     |  +--------------------+   |
|                                                |     |                           |
+------------------------------------------------+     +---------------------------+


This completes our dissection of the official PyTorch example tutorials. The next articles will look at elastic training, so stay tuned.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

DISTRIBUTED PIPELINE PARALLELISM USING RPC