0x00 Summary

In the previous articles we looked at the basic modules of PyTorch's distributed logic. The next few articles look at how to put these modules together in practice and give an overall picture of PyTorch's distributed logic. This article shows how to implement batch RPC using asynchronous execution, and along the way you can learn about a new PyTorch implementation of a parameter server.

This article is based on a translation of IMPLEMENTING BATCH RPC PROCESSING USING ASYNCHRONOUS EXECUTIONS, with my own understanding added.

Other articles in this PyTorch distributed series are as follows:

PyTorch distributed (1) —— History and Overview

PyTorch: how to use GPUs

PyTorch distributed (2) —— DataParallel (part 1)

PyTorch distributed (3) —— DataParallel (part 2)

PyTorch distributed (4) —— Distributed application concepts

PyTorch distributed (5) —— DistributedDataParallel — overview & how to use

PyTorch distributed (6) —— DistributedDataParallel — initialization & store

PyTorch distributed (7) —— DistributedDataParallel — process groups

PyTorch distributed (8) —— DistributedDataParallel — the paper

PyTorch distributed (9) —— DistributedDataParallel — initialization

PyTorch distributed (10) —— DistributedDataParallel — Reducer static architecture

PyTorch distributed (11) —— DistributedDataParallel — building the Reducer and the Join operation

PyTorch distributed (12) —— DistributedDataParallel — forward propagation

PyTorch distributed (13) —— DistributedDataParallel — backward propagation

PyTorch distributed Autograd (1) —— design

PyTorch distributed Autograd (2) —— RPC foundation

PyTorch distributed Autograd (3) ——

PyTorch distributed Autograd (4) ——

PyTorch distributed Autograd (5) ——

PyTorch distributed Autograd (6) ——

PyTorch distributed optimizer (1) —— cornerstone

PyTorch distributed optimizer (2) —— data-parallel optimizer

PyTorch distributed optimizer (3) —— model parallelism

PyTorch distributed (14) —— Using Distributed Autograd and Distributed Optimizer

PyTorch distributed (15) —— Implementing a parameter server with the distributed RPC framework

Note: This article is not translated exactly in the order of the original, but reorganized according to my own understanding.

0x01 Preface

1.1 Prerequisites

Prerequisites for this article are as follows:

  • PyTorch Distributed overview
  • Introduction to the distributed RPC framework
  • Implementing a parameter server using the distributed RPC framework
  • RPC asynchronous execution decorator

This tutorial demonstrates how to build batch-processing RPC applications with the @rpc.functions.async_execution decorator, which helps speed up training by reducing the number of blocked RPC threads and by consolidating CUDA operations on the callee. This shares the same idea as batched inference with TorchServe: batch RPC helps consolidate actions into fewer CUDA operations, thus amortizing overhead.

Note: This tutorial requires PyTorch V1.6.0 or higher.

1.2 Basic Knowledge

Previous tutorials showed the steps for building distributed training applications with torch.distributed.rpc, but they did not detail what happens on the callee side when an RPC request is processed. As of PyTorch v1.5, for each RPC request the callee starts one thread to execute the function in that request, and that thread blocks until the function returns. This works for many use cases, but there is one problem: if the user function blocks on IO, for example with a nested RPC call, or on signaling (such as waiting for a different RPC request to unblock it), the RPC thread on the callee has to sit idle until the IO completes or the signaling event occurs. As a result, the callee may use more RPC threads than necessary. The cause of this problem is that RPC treats the user function as a black box and knows very little about what happens inside it. For a user function to yield and free up RPC threads, more hints need to be provided to the RPC system.
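
To make the problem concrete, here is a small sketch of my own (not from the original tutorial) of a user function whose nested RPC blocks the callee's RPC thread. It assumes rpc.init_rpc has already been called on the participating workers; blocking_add is a hypothetical name:

import torch
import torch.distributed.rpc as rpc

def blocking_add(to, x, y, z):
    # rpc_async returns a Future immediately, but wait() parks the RPC thread
    # that is running this function until the nested call comes back, so that
    # thread cannot serve any other request in the meantime.
    return rpc.rpc_async(to, torch.add, args=(x, y)).wait() + z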

Starting with V1.6.0, PyTorch addresses this problem by introducing two new concepts:

  • torch.futures.Future, which encapsulates an asynchronous execution and also supports installing callback functions (see the small sketch after this list).
  • The @rpc.functions.async_execution decorator, which allows the application to tell the callee that the target function will return a Future and can pause and yield multiple times during execution.
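
As a quick illustration of the Future API on its own, here is a minimal sketch of mine (no RPC involved, only the public torch.futures.Future API):

import torch

fut = torch.futures.Future()

# then() installs a callback and returns a new Future that completes with the
# callback's return value once `fut` itself is completed.
chained = fut.then(lambda f: f.wait() + 1)

fut.set_result(41)     # complete the original future
print(chained.wait())  # prints 42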

Using these two tools, application code can decompose a user function into multiple smaller functions, chain them together as callbacks on Future objects, and return the Future that will contain the final result. On the callee side, the Future object also gets the subsequent RPC response handling installed as callbacks, which are triggered when the final result is ready. This way, the callee no longer needs to block one thread just to wait until the final return value is ready. Refer to the @rpc.functions.async_execution API documentation for a simple example.
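
The pattern looks roughly like the following sketch, which mirrors the chained example from the @rpc.functions.async_execution API documentation (it assumes RPC has already been initialized across the workers):

import torch
import torch.distributed.rpc as rpc

@rpc.functions.async_execution
def async_add_chained(to, x, y, z):
    # Returning a Future releases the RPC thread right away; the response is
    # sent to the caller only when the chained callback produces the final value.
    return rpc.rpc_async(to, torch.add, args=(x, y)).then(
        lambda fut: fut.wait() + z
    )

The caller can invoke such a function with either rpc_sync or rpc_async, just like any ordinary RPC target.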

In addition to reducing the number of idle RPC threads on the callee, these tools also make batch RPC processing easier and faster. This tutorial demonstrates how to build a distributed batch-updating parameter server and a batch-processing reinforcement learning application using the @rpc.functions.async_execution decorator.
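
Before going through the full ResNet example, here is a stripped-down sketch of that batching idea; the class and method names (BatchAggregator, submit) are my own, but the structure is the same as the BatchUpdateParameterServer analyzed below:

import threading

import torch
import torch.distributed.rpc as rpc

class BatchAggregator(object):

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.count = 0
        self.values = []
        self.lock = threading.Lock()
        self.future = torch.futures.Future()

    @staticmethod
    @rpc.functions.async_execution
    def submit(agg_rref, value):
        self = agg_rref.local_value()
        with self.lock:
            self.values.append(value)
            self.count += 1
            fut = self.future
            if self.count >= self.batch_size:
                # The last arrival completes the shared Future for everyone,
                # then installs a fresh Future for the next batch.
                fut.set_result(sum(self.values))
                self.values, self.count = [], 0
                self.future = torch.futures.Future()
        return fut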

Note: we do not cover the reinforcement learning part, so as not to scatter our attention and energy.

1.3 Code

Because the original tutorial mainly walks through the reinforcement learning code, while we only care about the ordinary distributed batch-update parameter server, we first need to look at the complete example code.

The code is at github.com/pytorch/exa…

import os
import threading
from datetime import datetime

import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
from torch import optim

import torchvision


batch_size = 20
image_w = 64
image_h = 64
num_classes = 30
batch_update_size = 5
num_batches = 6

def timed_log(text):
    print(f"{datetime.now().strftime('%H:%M:%S')} {text}")

class BatchUpdateParameterServer(object):

    def __init__(self, batch_update_size=batch_update_size):
        self.model = torchvision.models.resnet50(num_classes=num_classes)
        self.lock = threading.Lock()
        self.future_model = torch.futures.Future()
        self.batch_update_size = batch_update_size
        self.curr_update_size = 0
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        for p in self.model.parameters():
            p.grad = torch.zeros_like(p)

    def get_model(self):
        return self.model

    @staticmethod
    @rpc.functions.async_execution
    def update_and_fetch_model(ps_rref, grads):
        self = ps_rref.local_value()
        timed_log(f"PS got {self.curr_update_size}/{batch_update_size} updates")
        for p, g in zip(self.model.parameters(), grads):
            p.grad += g
        with self.lock:
            self.curr_update_size += 1
            fut = self.future_model

            if self.curr_update_size >= self.batch_update_size:
                for p in self.model.parameters():
                    p.grad /= self.batch_update_size
                self.curr_update_size = 0
                self.optimizer.step()
                self.optimizer.zero_grad()
                fut.set_result(self.model)
                timed_log("PS updated model")
                self.future_model = torch.futures.Future()

        return fut


class Trainer(object):

    def __init__(self, ps_rref):
        self.ps_rref = ps_rref
        self.loss_fn = nn.MSELoss()
        self.one_hot_indices = torch.LongTensor(batch_size) \
                                    .random_(0, num_classes) \
                                    .view(batch_size, 1)

    def get_next_batch(self):
        for _ in range(num_batches):
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                        .scatter_(1, self.one_hot_indices, 1)
            yield inputs.cuda(), labels.cuda()

    def train(self):
        name = rpc.get_worker_info().name
        m = self.ps_rref.rpc_sync().get_model().cuda()
        for inputs, labels in self.get_next_batch():
            timed_log(f"{name} processing one batch")
            self.loss_fn(m(inputs), labels).backward()
            timed_log(f"{name} reporting grads")
            m = rpc.rpc_sync(
                self.ps_rref.owner(),
                BatchUpdateParameterServer.update_and_fetch_model,
                args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]),
            ).cuda()
            timed_log(f"{name} got updated model")


def run_trainer(ps_rref):
    trainer = Trainer(ps_rref)
    trainer.train()


def run_ps(trainers):
    timed_log("Start training")
    ps_rref = rpc.RRef(BatchUpdateParameterServer())
    futs = []
    for trainer in trainers:
        futs.append(
            rpc.rpc_async(trainer, run_trainer, args=(ps_rref,))
        )

    torch.futures.wait_all(futs)
    timed_log("Finish training")


def run(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    options=rpc.TensorPipeRpcBackendOptions(
        num_worker_threads=16,
        rpc_timeout=0  # infinite timeout
     )
    if rank != 0:
        rpc.init_rpc(
            f"trainer{rank}",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        # trainer passively waiting for ps to kick off training iterations
    else:
        rpc.init_rpc(
            "ps",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        run_ps([f"trainer{r}" for r in range(1, world_size)])

    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    world_size = batch_update_size + 1
    mp.spawn(run, args=(world_size, ), nprocs=world_size, join=True)

0x02 Startup

Let's first look at how the processes are started.

2.1 Overall Startup

Let's assume there is one master (rank 0) and one worker. The parameter server runs on the master, and the training code runs on the worker (for simplicity, the diagrams below show only one trainer).

def run(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    options=rpc.TensorPipeRpcBackendOptions(
        num_worker_threads=16,
        rpc_timeout=0  # infinite timeout
     )
    if rank != 0:
        rpc.init_rpc( # Training code
            f"trainer{rank}",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        # trainer passively waiting for ps to kick off training iterations
    else:
        rpc.init_rpc( # parameter server
            "ps", 
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        run_ps([f"trainer{r}" for r in range(1, world_size)])

    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    world_size = batch_update_size + 1
    mp.spawn(run, args=(world_size, ), nprocs=world_size, join=True)

The logic is shown below:

             torch.multiprocessing.spawn
                        +
                        |
                        |
           +------------+-------------------------------------------------
           |                                                             |
           |                                                             |
           v                                                             v
+----------+----------------------------------------------+ +------------+----------------+
| "ps"                                           rank = 0 | | f"trainer{rank}"   rank = 1 |
|                                                         | |                             |
|                                                         | |                             |
|                     rpc.init_rpc                        | |         rpc.init_rpc        |
|                                                         | |                             |
|                                                         | |                             |
|  run_ps([f"trainer{r}" for r in range(1, world_size)])  | |                             |
|                                                         | |                             |
|                                                         | |                             |
+---------------------------------------------------------+ +-----------------------------+

2.2 Starting the Parameter Server

run_ps starts the parameter server and the trainers. Note that the trainers are started from within the parameter server process: the master not only hosts the parameter server, it is also responsible for driving the training loops on the trainers via RPC.

def run_ps(trainers):
    timed_log("Start training")
    ps_rref = rpc.RRef(BatchUpdateParameterServer())
    futs = []
    for trainer in trainers: # trainer is a string, such as "trainer1"
        futs.append(
            rpc.rpc_async(trainer, run_trainer, args=(ps_rref,)) # run run_trainer
        )

    torch.futures.wait_all(futs)
    timed_log("Finish training")
    
def run_trainer(ps_rref):
    trainer = Trainer(ps_rref)
    trainer.train() # Call the Trainer's method

Expanded, the logic is as follows:

The internal logic of the parameter server and of the trainer is not drawn here yet; it will be filled in by the analysis below. Only one trainer is drawn.

             torch.multiprocessing.spawn
                        +
                        |
                        |
           +------------+------------------------------------------------+
           |                                                             |
           |                                                             |
           v                                                             v
+----------+----------------------------------------------+ +------------+----------------+
| "ps"                                           rank = 0 | | f"trainer{rank}"   rank = 1 |
|                                                         | |                             |
|                                                         | |                             |
|                     rpc.init_rpc                        | |         rpc.init_rpc        |
|                                                         | |                             |
|                                                         | |        +-----------------+  |
|  run_ps([f"trainer{r}" for r in range(1, world_size)])  | |        | Trainer         |  |
|                        +                                | |        |                 |  |
|                        |                                | |  +---------> train()        |
|                        |                                | |  |     |                 |  |
|                        v                                | |  |     +-----------------+  |
|  +---------------------+---------------------------+    | |  |                          |
|  | run_ps                                          |    | |  |                          |
|  |                                                 |    | |  |                          |
|  | ps_rref = rpc.RRef(BatchUpdateParameterServer())|    | |  |                          |
|  | for trainer in trainers:                        |    | |  |                          |
|  |     futs.append(                                |    | |  |                          |
|  |         rpc.rpc_async(trainer, run_trainer,+--------------+                          |
|  |                       args=(ps_rref,))          |    | |                             |
|  |     )                                           |    | |                             |
|  +-------------------------------------------------+    | |                             |
|                                                         | +-----------------------------+
+---------------------------------------------------------+


0x03 Parameter Server

The specific parameter server code is not shown in the figure above, so let’s examine it next.

Consider a synchronous training application with a parameter server (PS) and multiple trainers. In this application, PS holds the parameters and waits for all trainers to report gradients. In each iteration, it waits until gradients are received from all trainers, and then updates all parameters at once.

The following code shows the implementation of the PS class.

  • The PS constructor creates a regular SGD optimizer (not a distributed optimizer), and the optimizer lives on the PS.
  • The update_and_fetch_model method is decorated with @rpc.functions.async_execution and will be called by the trainers.
  • Each call returns a Future object, which will eventually be populated with the updated model.
  • Most of the calls initiated by trainers simply accumulate gradients into the .grad member variable, return immediately, and yield the RPC thread on the PS.
  • The last arriving trainer triggers the optimizer step and consumes all previously reported gradients. It then sets future_model with the updated model, which in turn, via the Future object, notifies the pending requests from the other trainers and sends the updated model to all of them.

The specific code is as follows:

batch_size = 20
image_w = 64
image_h = 64
num_classes = 30
batch_update_size = 5
num_batches = 6

def timed_log(text):
    print(f"{datetime.now().strftime('%H:%M:%S')} {text}")

class BatchUpdateParameterServer(object):

    def __init__(self, batch_update_size=batch_update_size):
        self.model = torchvision.models.resnet50(num_classes=num_classes)
        self.lock = threading.Lock()
        self.future_model = torch.futures.Future()
        self.batch_update_size = batch_update_size
        self.curr_update_size = 0
        # Important: this is a regular SGD optimizer, not a distributed optimizer
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        for p in self.model.parameters():
            p.grad = torch.zeros_like(p)

    def get_model(self):
        return self.model

    @staticmethod
    @rpc.functions.async_execution  # called directly by the trainers
    def update_and_fetch_model(ps_rref, grads):
        self = ps_rref.local_value()
        timed_log(f"PS got {self.curr_update_size}/{batch_update_size} updates")
        for p, g in zip(self.model.parameters(), grads):  # pair each parameter with its reported gradient
            p.grad += g # Cumulative gradient
        with self.lock:
            self.curr_update_size += 1
            fut = self.future_model

            if self.curr_update_size >= self.batch_update_size:
                # The last trainer to arrive triggers the optimizer step and consumes all previously reported gradients.
                for p in self.model.parameters():
                    p.grad /= self.batch_update_size
                self.curr_update_size = 0
                self.optimizer.step() # Update model
                self.optimizer.zero_grad()
                fut.set_result(self.model)  # send the updated model to all trainers
                timed_log("PS updated model")
                self.future_model = torch.futures.Future()  # create a fresh Future for the next round of updates

        return fut # This object will be used to process the updated model

The logic now expands as follows (the step in which the parameter server generates the trainers is omitted):

                            torch.multiprocessing.spawn
                                       +
                                       |
                                       |
                          +------------+--------------------------------------------------------------------------------+
                          |                                                                                             |
                          |                                                                                             |
                          v                                                                                             v
+-------------------------+-----------------------------------------------------------------------------+  +------------+----------------+
|  "ps"                                                                                        rank = 0 |  | f"trainer{rank}"   rank = 1 |
|                                                        +-------------------------------------------+  |  |                             |
|                                                        | BatchUpdateParameterServer                |  |  |                             |
|  rpc.init_rpc                                          |                                           |  |  |         rpc.init_rpc        |
|                                                        |                                           |  |  |                             |
|  run_ps([f"trainer{r}" for r in range(1, world_size)]) |                                           |  |  |       +-------------------+ |
|                        +                               | model = resnet50(num_classes)             |  |  |       | Trainer           | |
|                        |                               |                                           |  |  |       |                   | |
|                        |                               | future_model = Future()                   |  |  |     +--> train()          | |
|                        v                               | optimizer = optim.SGD(model.parameters()) |  |  |     | |                   | |
|  +---------------------+---------------------------+   |                                           |  |  |     | +-------------------+ |
|  | run_ps                                          |   |                                           |  |  |     |                       |
|  |                                                 |   +-------------------------------------------+  |  |     |                       |
|  | ps_rref = rpc.RRef(BatchUpdateParameterServer())|                                                  |  |     |                       |
|  | for trainer in trainers:                        |                                                  |  +-----------------------------+
|  |     futs.append(                                |                                                  |        |
|  |         rpc.rpc_async(trainer, run_trainer,+----------------------------------------------------------------+
|  |                       args=(ps_rref,))          |                                                  |
|  |     )                                           |                                                  |
|  +-------------------------------------------------+                                                  |
|                                                                                                       |
+-------------------------------------------------------------------------------------------------------+



0x04 Trainer

All trainers are initialized with the same set of parameters from the PS. In each iteration, every trainer does the following:

  • Each trainer first runs forward and backward propagation to generate gradients locally.
  • Each trainer then reports its gradient to PS using RPC and retrieves updated parameters via the return value of the same RPC request.

From the trainer's implementation point of view, it makes no difference whether the target function is decorated with @rpc.functions.async_execution. The trainer simply calls update_and_fetch_model with rpc_sync, which blocks on the trainer until the updated model is returned.

As you can see, the parameter server stores the model, which can be returned to the Trainer.

class Trainer(object):

    def __init__(self, ps_rref):
        self.ps_rref = ps_rref
        self.loss_fn = nn.MSELoss()
        self.one_hot_indices = torch.LongTensor(batch_size) \
                                    .random_(0, num_classes) \
                                    .view(batch_size, 1)

    def get_next_batch(self):
        for _ in range(num_batches):
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                        .scatter_(1, self.one_hot_indices, 1)
            yield inputs.cuda(), labels.cuda()

    def train(self):
        name = rpc.get_worker_info().name
        # Get the model from the parameter server
        m = self.ps_rref.rpc_sync().get_model().cuda()
        for inputs, labels in self.get_next_batch():
            timed_log(f"{name} processing one batch")
            # Use the model to propagate forward/back
            self.loss_fn(m(inputs), labels).backward()
            timed_log(f"{name} reporting grads")
            # Call the parameter server's function to submit the gradients
            m = rpc.rpc_sync(  # after rpc_sync completes, m holds the latest model
                self.ps_rref.owner(),
                BatchUpdateParameterServer.update_and_fetch_model,
                args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]),
            ).cuda()
            timed_log(f"{name} got updated model")

The expansion logic is as follows:

  1. run_trainer, launched from the parameter server, directly calls the trainer.train() method to run the iterations.
  2. The train method calls self.ps_rref.rpc_sync().get_model().cuda() to fetch the model from the parameter server and move it onto the local device (the model needs a local copy on the worker).
  3. It calls self.loss_fn(m(inputs), labels).backward() to run forward and backward propagation.
  4. It then calls the parameter server's update_and_fetch_model function to submit the gradients; the call is made with rpc_sync, while the function itself is executed asynchronously on the PS thanks to @rpc.functions.async_execution.
  5. In the parameter server's update_and_fetch_model, the gradients are accumulated, the model is updated by the regular SGD optimizer on the PS, and the new model is published to the trainers by calling fut.set_result(self.model). On the trainer, after the assignment m = rpc.rpc_sync(...).cuda() returns, m is the latest model.

                 torch.multiprocessing.spawn
                          +
                          |
          +---------------+---------------------------------+
          |                                                 |
          v                                                 v
+---------+------------------------+      +-----------------+------------------+
| "ps"                    rank = 0 |      | f"trainer{rank}"         rank != 0 |
|                                  |      |                                    |
| rpc.init_rpc                     |      | rpc.init_rpc                       |
|                                  |      |                                    |
| run_ps:                          |      |                                    |
|   ps_rref = rpc.RRef(            |      |                                    |
|     BatchUpdateParameterServer())|      |                                    |
|   rpc.rpc_async(trainer,         +--1-->| run_trainer --> Trainer.train()    |
|     run_trainer, args=(ps_rref,))|      |                                    |
|                                  |      |                                    |
| BatchUpdateParameterServer:      |      |                                    |
|   model = resnet50(num_classes)  |      |                                    |
|   future_model = Future()        |      |                                    |
|   optimizer = optim.SGD(...)     |      |                                    |
|                                  |      |                                    |
|   get_model()                    |<--2--+ m = ps_rref.rpc_sync()             |
|     return self.model            |      |       .get_model().cuda()          |
|                                  |      |                                    |
|                                  |      | 3   loss_fn(m(inputs), labels)     |
|                                  |      |         .backward()                |
|                                  |      |                                    |
|   update_and_fetch_model()       |<--4--+ m = rpc.rpc_sync(                  |
|     accumulate grads; the last   |      |         update_and_fetch_model,    |
|     trainer runs optimizer.step()|      |         args=(ps_rref, grads)      |
|     and fut.set_result(model)    +--5-->|     ).cuda()                       |
|                                  |      |                                    |
+----------------------------------+      +------------------------------------+


0x05 Comparison

To close, let's compare the classic parameter server implementation ps-lite with the parameter server implementations in the previous two articles and in this one.

  • ps-lite is close to a traditional server implementation: it has its own active business loop that responds to explicit user requests, its own explicit logic, and its own local KV storage.
  • In the two previous official tutorials (the previous two articles in this series), the parameter server takes a different approach:
    • There is no active loop, no KV storage and no server logic on the parameter server. Instead, the business model is stored there directly, and the PS returns the parameters that need to be optimized to the DistributedOptimizer on the trainer.
    • The business is driven by the trainer: the training loop code lives in the trainer, the DistributedOptimizer lives in the trainer, and the DistributedOptimizer is responsible for the distributed optimization.
  • The implementation in this article again looks a bit more like ps-lite, but it is stitched together with RPC:
    • The PS process kicks off the trainers' training loops.
    • In each iteration, the trainer fetches the latest model from the parameter server, and forward/backward propagation runs on the trainer.
    • The trainer submits its gradients to the parameter server via RPC; the target function is executed asynchronously on the PS thanks to @rpc.functions.async_execution.
    • Model updates are done by the regular SGD optimizer on the PS.
    • Once the model is updated, it is sent back to the trainers as the result of the same RPC.

It has to be said that these few official tutorials demonstrate quite a variety of implementation approaches in quick succession; you can use them as references and adapt whichever best fits the characteristics of your own business.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

IMPLEMENTING BATCH RPC PROCESSING USING ASYNCHRONOUS EXECUTIONS