0x00 Abstract

In the previous articles, we looked at the basic modules of PyTorch’s distributed logic. The next few articles show how to put these modules into practice and give an overall picture of PyTorch’s distributed training. This article describes how to combine DDP with the distributed RPC framework.

This article is based on the translation of COMBINING DISTRIBUTED DATAPARALLEL WITH DISTRIBUTED RPC FRAMEWORK, with my own understanding added.

The other articles in this PyTorch distributed series are as follows:

PyTorch distributed (1)—— History and Overview

PyTorch how to use GPU

PyTorch distributed (2) —– DataParallel – gradient

PyTorch distributed (3) —– DataParallel – gradient

PyTorch distributed (4)—— Distributed application concepts

PyTorch distributed (5) —— DistributedDataParallel – what it is and how to use it

PyTorch distributed (6) —— DistributedDataParallel

PyTorch distributed (7) —– DistributedDataParallel – process groups

PyTorch distributed (8) ——– DistributedDataParallel

PyTorch distributed (9) —– DistributedDataParallel – initialization

PyTorch distributed (10) —— DistributedDataParallel – Reducer static architecture

PyTorch distributed (11) —– DistributedDataParallel – constructing the Reducer and the Join operation

PyTorch distributed (12) —– DistributedDataParallel – forward propagation

PyTorch distributed (13) —– DistributedDataParallel – backward propagation

PyTorch distributed Autograd (1) —- design

PyTorch Distributed Autograd (2) —- RPC foundation

PyTorch Distributed Autograd (3) —-

PyTorch Distributed Autograd (4) —-

PyTorch Distributed Autograd (5) —-

PyTorch Distributed Autograd (6) —-

PyTorch Distributed optimizer (1)—- cornerstone

PyTorch Distributed optimizer (2)—- Data parallel optimizer

PyTorch Distributed optimizer (3)—- model parallel

PyTorch Distributed (14) — Use Distributed Autograd and Distributed Optimizer

PyTorch distributed (15) — using a distributed RPC framework to implement parameter server

PyTorch distributed (16) — Use asynchronous execution to implement batch RPC

Note: This article is not translated exactly in the order of the original, but reorganized according to my own understanding.

0x01 Overview

This tutorial uses a simple example to demonstrate how to combine DistributedDataParallel (DDP) with the distributed RPC framework to train a simple model by combining distributed data parallelism with distributed model parallelism. The source code for this example can be found here.

The previous tutorials, Introduction to Distributed Data Parallelism and Introduction to the Distributed RPC Framework, described how to perform distributed data-parallel training and distributed model-parallel training, respectively. However, you may want to combine the two techniques in several training paradigms. For example:

  1. If we have a model with a sparse part (a large embedding table) and a dense part (FC layers), we might want to put the embedding table on a parameter server and replicate the FC layers across multiple trainers using DistributedDataParallel. The distributed RPC framework can then be used to perform embedding lookups on the parameter server.
  2. Enable hybrid parallelism as described in the PipeDream paper. We can use the distributed RPC framework to pipeline stages of the model across multiple workers, and use DistributedDataParallel to data-parallelize each stage if necessary (see the sketch after this list).
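
To make case 2 a little more concrete, here is my own minimal sketch; it is not part of the original tutorial, and the worker name "worker1" and the class name TwoStagePipeline are made up for illustration. It assumes rpc.init_rpc and dist.init_process_group (gloo backend) have already been called on every process, in the same way as the startup code later in this article: stage 1 of the model lives on a remote worker behind a RemoteModule, stage 2 is a local layer wrapped in DDP, and the backward pass must go through distributed autograd, as noted below.

import torch
import torch.nn as nn
from torch.distributed.nn import RemoteModule
from torch.nn.parallel import DistributedDataParallel as DDP

class TwoStagePipeline(nn.Module):
    """A sketch of RPC-pipelined stages combined with DDP (hypothetical example)."""

    def __init__(self):
        super().__init__()
        # Stage 1 is created on the remote worker "worker1"; its forward runs there via RPC.
        self.stage1 = RemoteModule("worker1/cpu", nn.Linear, args=(32, 64))
        # Stage 2 runs locally and is replicated across the trainers in this process group.
        self.stage2 = DDP(nn.Linear(64, 8))

    def forward(self, x):
        hidden = self.stage1.forward(x)  # RPC call to worker1, returns the activation
        return self.stage2(hidden)       # local data-parallel stage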

In this tutorial, we will cover case 1 above. There are altogether 4 workers in our setup, as shown below:

  • One master, responsible for creating an embedding table (nn.EmbeddingBag) on the parameter server. The master is also responsible for driving the training loop on the two trainers.
  • One parameter server, which stores the embedding table in memory and responds to RPC requests from the master and the trainers.
  • Two trainers, each of which stores an FC layer (nn.Linear) that is data-parallelized using DistributedDataParallel. The trainers are also responsible for performing forward propagation, backward propagation, and the optimizer step.

The whole training process is as follows:

  1. The Master creates a RemoteModule that stores an embedded table on the parameter server.
  2. The Master starts the training cycle on the trainer and propagates the remote Module to the trainer.
  3. Each trainer creates a HybridModel, which first performs the embedding lookup using the remote module provided by the master and then runs the FC layer wrapped inside DDP.
  4. The Trainer performs forward propagation of the model and backward propagation of losses using Distributed Autograd.
  5. As part of the back propagation, the gradient of the FC layer is first calculated and synchronized to all trainers via AllReduce in the DDP.
  6. Next, distributed autograd propagates the gradients to the parameter server, where the gradients of the embedding table are updated.
  7. Finally, the distributed optimizer is used to update all parameters.

Note: If you use DDP and RPC together, you should always use Distributed Autograd for back propagation.

0x02 Startup

Let’s see how the system starts up. First of all, all workers need to be set up before training. Rank 0 and Rank 1 are our trainers, Rank 2 is our master, and Rank 3 is our parameter server.

The initialization logic is as follows:

  • We used TCP init_method to initialize the RPC framework on all four workers.
  • For Master, the code does the following:
    • After RPC initialization, the master creates a RemoteModule that points to an EmbeddingBag layer stored on the parameter server.
    • The master then iterates over the trainers and starts the training loop on each one by calling _run_trainer via rpc_async.
    • Finally, the master waits for all training to complete before exiting.
  • Trainers do the following:
    • The trainers first use init_process_group to initialize a ProcessGroup for DDP with world_size = 2 (i.e., the two trainers).
    • Next, the trainers use the TCP init_method to initialize the RPC framework. Note that the ports used for RPC initialization and ProcessGroup initialization are different, to avoid port conflicts between the two frameworks.
    • Once initialization is complete, the trainer simply waits for the _run_trainer RPC from the master.
  • The parameter server simply initializes the RPC framework and waits for RPCs from the trainers and the master.

The specific code is as follows:

# Imports needed by this snippet; NUM_EMBEDDINGS, EMBEDDING_DIM and _run_trainer
# are defined elsewhere in the full example file linked above.
import torch
import torch.distributed as dist
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
from torch.distributed.nn import RemoteModule
from torch.distributed.rpc import TensorPipeRpcBackendOptions


def run_worker(rank, world_size):
    r""" A wrapper function that initializes RPC, calls the function, and shuts down RPC. """

    # We need to use different port numbers in TCP init_method for init_rpc and
    # init_process_group to avoid port conflicts.
    rpc_backend_options = TensorPipeRpcBackendOptions()
    rpc_backend_options.init_method = "tcp://localhost:29501"

    # Rank 2 is master, 3 is ps and 0 and 1 are trainers.
    if rank == 2: # Master code
        rpc.init_rpc(
            "master",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        remote_emb_module = RemoteModule( # point to an EmbeddingBag layer stored on the parameter server
            "ps",
            torch.nn.EmbeddingBag,
            args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
            kwargs={"mode": "sum"},
        )

        # Run the training loop on trainers.
        futs = []
        for trainer_rank in [0, 1]:
            trainer_name = "trainer{}".format(trainer_rank)
            fut = rpc.rpc_async( # Start trainer loop
                trainer_name, _run_trainer, args=(remote_emb_module, trainer_rank)
            )
            futs.append(fut)

        # Wait for all training to finish.
        for fut in futs:
            fut.wait()
    elif rank <= 1:
        # Initialize process group for Distributed DataParallel on trainers.
        dist.init_process_group(
            backend="gloo", rank=rank, world_size=2, init_method="tcp://localhost:29500"
        )

        # Initialize RPC.
        trainer_name = "trainer{}".format(rank)
        rpc.init_rpc(
            trainer_name,
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        # Trainer just waits for the _run_trainer RPC from the master.
    else:
        rpc.init_rpc( # parameter server
            "ps",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )
        # The parameter server does nothing but wait for RPCs from the trainers and the master.
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__ == "__main__":
    # 2 trainers, 1 parameter server, 1 master.
    world_size = 4
    mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True)

The current logic is as follows; we will keep extending this diagram as the article proceeds:

                               torch.multiprocessing.spawn
                                          +
                                          |
                                          |
              +----------------------------------------------------------------+----------------------------------+
              |                           |                                    |                                  |
              |                           |                                    |                                  |
              v                           v                                    v                                  v
+-------------+-------------+  +----------+---------------+ +------------------+------------------+ +-------------+--------+
|trainer 0         rank = 0 |  |trainer 1     rank = 1    | | master                     rank = 2 | |ps          rank = 3  |
|                           |  |                          | |                                     | |                      |
|                           |  |                          | |   rpc.init_rpc                      | |     rpc.init_rpc     |
|                           |  |                          | |                                     | |                      |
|   dist.init_process_group |  |  dist.init_process_group | |   remote_emb_module =  RemoteModule | |                      |
|                           |  |                          | |                                     | |                      |
|                           |  |                          | |                                     | |                      |
|   rpc.init_rpc            |  |  rpc.init_rpc            | |   fut = rpc.rpc_async(_run_trainer) | |                      |
|                           |  |                          | |                                     | |                      |
|                           |  |                          | |                                     | |                      |
+---------------------------+  +--------------------------+ +-------------------------------------+ +----------------------+


0x03 Support System

The support system here mainly refers to _RemoteModule, whose role is to create a module on another worker. The code lives in torch/distributed/nn/api/remote_module.py.

3.1 Features

A RemoteModule instance can only be created after RPC initialization. It creates a user-specified module on a specified remote node and behaves like a regular nn.Module, except that its forward method runs on the remote node. RemoteModule takes care of autograd recording to ensure that backward propagation propagates gradients back to the corresponding remote module.

A RemoteModule can be shared between processes using the RPC framework (https://pytorch.org/docs/stable/rpc.html) without incurring any overhead of copying the actual module, which is equivalent to using a torch.distributed.rpc.RRef to point to the remote module.

3.2 Usage

3.2.1 Hybrid model

To create a hybrid model, local modules should generally be created outside of the remote module, rather than as submodules of any remote module. If the remote module is placed on a CUDA device, any input CPU tensors will be automatically moved to the same CUDA device. A hybrid model looks like this:

            >>> class HybridModel(nn.Module):
            >>>     def __init__(self):
            >>>         nn.Module.__init__(self)
            >>>         self.remote_embedding = RemoteModule(...)  # create the embedding layer on the remote side
            >>>         self.local_linear = nn.Linear(...)

3.2.2 Example

In this example, the RemoteModule is created with the parameter "worker1/cpu", meaning that the RemoteModule will run on worker1's CPU device. The general format is "<workername>/<device>", where <device> can be parsed as a torch.device.

    Example::
        >>> # On worker 0:
        >>> import torch
        >>> import torch.distributed.rpc as rpc
        >>> from torch import nn, Tensor
        >>> from torch.distributed.nn.api.remote_module import RemoteModule
        >>>
        >>> rpc.init_rpc("worker0", rank=0, world_size=2)
        >>> remote_linear_module = RemoteModule(
        >>>     "worker1/cpu", nn.Linear, args=(20, 30),
        >>> )
        >>> input = torch.randn(128, 20)
        >>> ret_fut = remote_linear_module.forward_async(input)
        >>> ret = ret_fut.wait()
        >>> rpc.shutdown()

        >>> # On worker 1:
        >>> import torch
        >>> import torch.distributed.rpc as rpc
        >>>
        >>> rpc.init_rpc("worker1", rank=1, world_size=2)
        >>> rpc.shutdown()

3.3 Definition

The _RemoteModule initialization logic is as follows:

  • (1) Prepare the arguments.
  • (2) Determine the remote worker and remote device to run on.
  • (3) If _module_interface_cls is set:
    • (3.1) Use _module_interface_cls to build the module on the remote side.
    • (3.2) Build the function proxy generator locally.
    • (3.3) Wait until the remote creation is complete.
    • (3.4) Build the handle (module RRef) locally.
  • (4) If _module_interface_cls is not set:
    • (4.1) Build the function proxy generator locally.
    • (4.2) Create the module on the remote side.
  • (5) Install the generated remote function proxies locally.
class _RemoteModule(nn.Module):
    def __init__(
        self,
        remote_device: str,
        module_cls: nn.Module,
        args: Tuple = None,
        kwargs: Dict[str.Any] = None,
        _module_interface_cls: Any = None,
    ):
        """
        Args:
            remote_device (str): Device on the destination worker where we'd like to place this module.
                The format should be "<workername>/<device>", where the device field can be parsed
                as torch.device type. E.g., "trainer0/cpu", "trainer0", "ps0/cuda:0".
                In addition, the device field can be optional and the default value is "cpu".
        Returns:
            A remote module instance which wraps the :class:`~nn.Module` created by the
            user-provided ``module_cls``, it has a blocking ``forward`` method and an
            asynchronous ``forward_async`` method that returns a future of the ``forward`` call
            on the user-provided module on the remote side.
        """
      
        super().__init__()

        # NOTE: if a new attribute is added to this class, also need to add it
        # to ``_REMOTE_MODULE_PICKLED_ATTRIBUTES`` for pickling/unpickling.

        # Default arguments preperation.
        # 1. Prepare parameters
        args = args if args is not None else ()
        kwargs = kwargs if kwargs is not None else {}

        # 2. Set the running remote worker and remote device
        self.on, self.device = _parse_remote_device(remote_device)
        agent = rpc._get_current_rpc_agent()
        # If the device map of the remote worker is set,
        # then enable moving any input CPU tensors to the same cuda device.
        self.is_device_map_set = bool(
            agent._get_device_map(agent.get_worker_info(self.on))
        )
        # ``enable_moving_cpu_tensors_to_cuda`` is less strict than ``is_device_map_set``:
        # If ``enable_moving_cpu_tensors_to_cuda`` is true, but the device map is not set,
        # then any CPU tensors can still be moved to a cuda device to run forward,
        # but the output must be moved back to CPU before being sent over the wire.
        enable_moving_cpu_tensors_to_cuda = torch.device(self.device).type == "cuda"

        # 3. If _module_interface_CLs is set
        if _module_interface_cls is not None:
            # Users reply on this field to know if this generated RemoteModule is TorchScript-able.
            self.is_scriptable = True

            # 3.1 Use _module_interface_CLs to build modules remotely
            # Instantiate template on remote side.
            fut = rpc.rpc_async(
                self.on,
                _instantiate_template,
                (_module_interface_cls, enable_moving_cpu_tensors_to_cuda),
            )

            # 3.2 Build function proxy generators locally
            # Instantiate template on local side.
            generated_module = (
                instantiator.instantiate_scriptable_remote_module_template(
                    _module_interface_cls, enable_moving_cpu_tensors_to_cuda
                )
            )
            self.generated_methods = generated_module._generated_methods

            # 3.3 Wait for the creation to complete
            # Create the module on the remote side.
            fut.wait()  # Ensure remote_module_cls is available on remote side.

            # 3.4 Build handles locally
            self.module_rref = rpc.rpc_sync(
                self.on,
                _create_module_with_interface,
                (module_cls, args, kwargs, self.device, _module_interface_cls),
            )
        else: # 4 _module_interface_CLs is not set
            self.is_scriptable = False
            # 4.1 Build the function proxy generator locally
            self.generated_methods = (
                _NON_SCRIPTABLE_REMOTE_MODULE_MODULE._generated_methods
            )
            # 4.2 Create modules remotely
            # Create the module on the remote side.
            self.module_rref = rpc.remote(
                self.on,
                _create_module,
                (module_cls, args, kwargs, self.device),
            )

        # Install generated methods.
        # 5. Create remote function proxies locally
        for method in self.generated_methods:
            method_name = method.__name__
            method = torch.jit.export(method)
            setattr(self, method_name, types.MethodType(method, self))

3.4 Main Functions

Its main functions are as follows:

  • remote_parameters uses rpc.rpc_sync to return a list of torch.distributed.rpc.RRef pointing to the parameters of the remote module. It is typically used together with torch.distributed.optim.DistributedOptimizer.

  • get_module_rref returns a torch.distributed.rpc.RRef (RRef[nn.Module]) pointing to the remote module.

def remote_parameters(self, recurse: bool = True) -> List[rpc.RRef[Parameter]]:
    """ Returns a list of :class:`~torch.distributed.rpc.RRef` pointing to the remote module's parameters. This can typically be used in conjuction with :class:`~torch.distributed.optim.DistributedOptimizer`. Args: recurse (bool): if True, then returns parameters of the remote module and all submodules of the remote module. Otherwise, returns only parameters that are direct members of the remote module. Returns: A list of :class:`~torch.distributed.rpc.RRef` (``List[RRef[nn.Parameter]]``) to remote module's parameters. """
    return rpc.rpc_sync(self.on, _param_rrefs, args=(self.module_rref, recurse))

def get_module_rref(self) -> rpc.RRef[nn.Module]:
    """ Returns an :class:`~torch.distributed.rpc.RRef` (``RRef[nn.Module]``) pointing to the remote module. """
    return self.module_rref

The logic diagram now evolves as follows: compared with the previous figure, the master gains a remote_emb_module, which creates a RemoteModule on the ps.

                                torch.multiprocessing.spawn
                                           +
                                           |
                                           |
               +----------------------------------------------------------------+----------------------------------+
               |                           |                                    |                                  |
               |                           |                                    |                                  |
               v                           v                                    v                                  v
+--------------+-------------+ +-----------+--------------+ +-------------------+-----------------+  +-------------+--------+
|trainer 0          rank = 0 | |trainer 1     rank = 1    | | master                     rank = 2 |  |ps          rank = 3  |
|                            | |                          | |                                     |  |                      |
|                            | |                          | |     rpc.init_rpc                    |  |     rpc.init_rpc     |
|                            | |                          | |                                     |  |                      |
|    dist.init_process_group | |  dist.init_process_group | |   remote_emb_module +----------------------> RemoteModule     |
|                            | |                          | |                                     |  |                      |
|                            | |                          | |                                     |  |                      |
|    rpc.init_rpc            | |  rpc.init_rpc            | |   fut = rpc.rpc_async(_run_trainer) |  |                      |
|                            | |                          | |                                     |  |                      |
|                            | |                          | |                                     |  |                      |
|                            | |                          | |                                     |  |                      |
+----------------------------+ +--------------------------+ +-------------------------------------+  +----------------------+

0x04 HybridModel

Before discussing the Trainer’s details, let’s first introduce the HybridModel that the Trainer uses. The model consists of sparse parts and dense parts.

  • The dense part is an nn.Linear, which is replicated across all trainers using DistributedDataParallel, that is, a nn.Linear layer is wrapped inside the DDP.

  • The sparse part is a remote module (remote_emb_module) that holds an nn.EmbeddingBag on the parameter server. This remote module gives the trainer a remote reference to the embedding table on the parameter server.

The model’s forward method is very simple. It uses the RemoteModule to perform the embedding lookup on the parameter server and then passes the output to the FC layer, which is wrapped in DDP.

class HybridModel(torch.nn.Module):
    r"""
    The model consists of a sparse part and a dense part.
    1) The dense part is an nn.Linear module that is replicated across all trainers using DistributedDataParallel.
    2) The sparse part is a Remote Module that holds an nn.EmbeddingBag on the parameter server.
    This remote model can get a Remote Reference to the embedding table on the parameter server.
    """

    def __init__(self, remote_emb_module, device):
        super(HybridModel, self).__init__()
        self.remote_emb_module = remote_emb_module
        self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device])
        self.device = device

    def forward(self, indices, offsets):
        emb_lookup = self.remote_emb_module.forward(indices, offsets)
        return self.fc(emb_lookup.cuda(self.device))

The logic extends as follows: remote_emb_module is also set up on both trainers, pointing to the RemoteModule on the ps.

                                torch.multiprocessing.spawn
                                           +
                                           |
                                           |
            +-----------------------------------------------------------------------------------+----------------------------------+
            |                                       |                                           |                                  |
            |                                       |                                           |                                  |
            v                                       v                                           v                                  v
+-----------+-------------+ +-----------------------+-------------------+ +---------------------+---------------+    +-------------+--------+
|trainer 0       rank = 0 | | trainer 1                        rank = 1 | | master                     rank = 2 |    |ps          rank = 3  |
|                         | |                                           | |                                     |    |                      |
|                         | |                                           | |   rpc.init_rpc                      |    |     rpc.init_rpc     |
| dist.init_process_group | | dist.init_process_group                   | |                                     |    |                      |
|                         | |                                           | |   remote_emb_module +------------------------> RemoteModule     |
| rpc.init_rpc            | | rpc.init_rpc                              | |                                     |    |         ^     ^      |
|                         | |                                           | |                                     |    |         |     |      |
|                         | |                                           | |   fut = rpc.rpc_async(_run_trainer) |    |         |     |      |
|                         | |                                           | |                                     |    |         |     |      |
| +---------------------+ | |            +---------------------------+  | |                                     |    |         |     |      |
| | HybridModel         | | |            |HybridModel                |  | |                                     |    |         |     |      |
| |                     | | |            |                           |  | +-------------------------------------+    +----------------------+
| |                     | | |            |                           |  |                                                      |     |
| |   fc = DDP(Linear)  | | |            |      fc = DDP(Linear())   |  |                                                      |     |
| |                     | | |            |                           |  |                                                      |     |
| |   remote_emb_module | | |            |      remote_emb_module+-------------------------------------------------------------+     |
| |             +       | | |            |                           |  |                                                            |
| +---------------------+ | |            +---------------------------+  |                                                            |
|               |         | |                                           |                                                            |
+-------------------------+ +-------------------------------------------+                                                            |
                |                                                                                                                    |
                +--------------------------------------------------------------------------------------------------------------------+

0x05 Training

5.1 Initialization

We skipped over the trainer’s initialization earlier, so let’s analyze it here.

Let’s look at the setup on the trainer.

  • First, the trainer uses the remote module and its own rank to create the HybridModel described above. The remote module holds the embedding table on the parameter server.
  • Second, we need to get a list of RRefs pointing to all the parameters that we want to optimize with DistributedOptimizer.
    • To retrieve the parameters of the embedding table from the parameter server, we can call remote_parameters of RemoteModule, which iterates over all the parameters of the embedding table and returns a list of RRefs. The trainer calls this method on the parameter server via RPC to obtain the RRefs of the required parameters.
    • Since DistributedOptimizer always takes a list of RRefs of parameters that need to be optimized, we also need to create RRefs for the local parameters of the FC layer. This is done by traversing model.fc.parameters(), creating an RRef for each parameter, and appending it to the list returned by remote_parameters().
    • Note that we cannot use model.parameters(), because it recursively calls model.remote_emb_module.parameters(), which RemoteModule does not support.
  • Finally, we create our DistributedOptimizer with all the RRefs and define a CrossEntropyLoss function.
def _run_trainer(remote_emb_module, rank):
    r""" Each trainer runs a forward pass which involves an embedding lookup on the parameter server and running nn.Linear locally. During the backward pass, DDP is responsible for aggregating the gradients for the dense part (nn.Linear) and distributed autograd ensures gradients updates are propagated to the parameter server. """

    # Setup the model.
    model = HybridModel(remote_emb_module, rank)

    # Retrieve all model parameters as rrefs for DistributedOptimizer.

    # Retrieve parameters for embedding table.
    model_parameter_rrefs = model.remote_emb_module.remote_parameters()

    # model.fc.parameters() only includes local parameters.
    # NOTE: Cannot call model.parameters() here,
    # because this will call remote_emb_module.parameters(),
    # which supports remote_parameters() but not parameters().
    for param in model.fc.parameters(): 
        model_parameter_rrefs.append(RRef(param))  # the DDP parameters that need distributed optimization are added here

    # Setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model_parameter_rrefs,  # the dense and sparse parameters are optimized together
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

We extend the logic as follows: the arrow from trainer 0 to the parameter server is omitted, and compared with the figure above, _run_trainer and a DistributedOptimizer are added on each trainer.

The resulting picture is: each trainer runs _run_trainer, which builds a HybridModel (fc = DDP(Linear), remote_emb_module) and a DistributedOptimizer; (1) the master’s remote_emb_module points to the RemoteModule on ps; (2) each trainer’s HybridModel.remote_emb_module points to that same RemoteModule; (3) the DistributedOptimizer on each trainer reaches the embedding parameters on ps through remote_parameters().


5.2 Training Loop

Now let’s introduce the main training loop that runs on each trainer. Here get_next_batch is just a helper function that generates random inputs and training targets. We run the training loop for multiple epochs, and for each batch we:

  1. Set up the distributed autograd context.
  2. Run the forward pass of the model and get its output.
  3. Compute the loss from the output and the target using the loss function.
  4. Use distributed autograd to run a distributed backward pass with the loss.
  5. Finally, run the distributed optimizer step to optimize all parameters.
    def get_next_batch(rank):
        for _ in range(10):
            num_indices = random.randint(20, 50)
            indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)

            # Generate offsets.
            offsets = []
            start = 0
            batch_size = 0
            while start < num_indices:
                offsets.append(start)
                start += random.randint(1, 10)
                batch_size += 1

            offsets_tensor = torch.LongTensor(offsets)
            target = torch.LongTensor(batch_size).random_(8).cuda(rank)
            yield indices, offsets_tensor, target

    # Train for 100 epochs
    for epoch in range(100):
        for indices, offsets, target in get_next_batch(rank):
            # create distributed autograd context
            with dist_autograd.context() as context_id:
                output = model(indices, offsets)
                loss = criterion(output, target)

                # Run distributed backward pass
                dist_autograd.backward(context_id, [loss])

                # Run distributed optimizer
                opt.step(context_id)

                # Not necessary to zero grads as each iteration creates a different
                # distributed autograd context which hosts different grads
        print("Training done for epoch {}".format(epoch))

Due to space constraints, we only refine the diagram for a single trainer, as follows:

  1. dist.init_process_group initializes DistributedDataParallel, and rpc.init_rpc initializes RPC.
  2. In HybridModel, fc is wrapped in DistributedDataParallel, and remote_emb_module is a RemoteModule on the parameter server.
  3. In the DistributedOptimizer, both HybridModel’s fc parameters and remote_emb_module parameters are optimized in a distributed fashion.
  4. In _run_trainer, model(indices, offsets) is used for forward propagation, which calls HybridModel.forward.
  5. HybridModel.forward operates on both the embedding and the FC layer:
    1. The embedding lookup uses RPC to reach the parameter server.
    2. The FC layer uses DistributedDataParallel.
    3. That is, the embedding table is placed on the parameter server, while the FC layer is replicated across the trainers using DistributedDataParallel.

These step numbers correspond to the numbers in the figure below.

+---------------------------------------------------------------------+
| trainer 1                                                 rank = 1  |
|                +-----------------------------------+                |
|                |    dist.init_process_group      1 |                |
|                |                                   |                |
|                |    rpc.init_rpc                   |                |
|                |                                   |                |
|                +-----------------------------------+                |
| +-----------------------------------------------------------------+ |
| | _run_trainer                                                    | |
| |                                                                 | |
| |     output = model(indices, offsets)                            | |
| |     dist_autograd.backward      +                               | |
| |     opt.step                    |                               | |
| |  +-----------------------------------------------------------+  | |
| |  | HybridModel                  |                          2 |  | |
| |  |                              |                            |  | |
| |  |    fc = DDP(Linear().cuda()  |                            |  | |
| |  |                              |4                           |  | |
| |  |    remote_emb_module         |                            |  | |
| |  |                              |                            |  | |
| |  |                              v                            |  | |
| |  |   +--------------------------+--------------------------+ |  | |
| |  |   |forward                                              | |  | |
| |  |   |  emb_lookup = remote_emb_module.forward()           | |  | |
| |  |   |                  +                                  | |  | |
| |  |   |                  |  5                               | |  | |
| |  |   |                  |                                  | |  | |
| |  |   |                  v                                  | |  | |
| |  |   |  fc(emb_lookup.cuda(device)                         | |  | |
| |  |   |                                                     | |  | |
| |  |   +-----------------------------------------------------+ |  | |
| |  +-----------------------------------------------------------+  | |
| |  +-----------------------------------------------------------+  | |
| |  | DistributedOptimizer                                    3 |  | |
| |  |                                                           |  | |
| |  |         HybridModel.remote_emb_module.remote_parameters() |  | |
| |  |                                                           |  | |
| |  |         HybridModel.fc.parameters()                       |  | |
| |  |                                                           |  | |
| |  +-----------------------------------------------------------+  | |
| +-----------------------------------------------------------------+ |
+---------------------------------------------------------------------+


Note that the source code for the entire example can be found here.

0x06 Comparison

We have now looked at three official PyTorch examples, each with a different implementation of the parameter server. In the example in this article, a master is added as a coordinator to unify all the workers.

In general, PyTorch’s parameter server implementation is more flexible than PS-Lite or Paracel because of the RPC mechanism:

  • First, the parameter server can now be placed on the GPU (a small sketch follows this list).
  • Second, the parameter server can hold more than just parameters: it can also run optimization code, and you can even run the logic that controls the trainers on top of the parameter server.
  • The optimizer can be an ordinary optimizer or a DistributedOptimizer.
  • From the perspective of the user-written code, training looks as if it runs entirely locally.
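
As a small illustration of the first point, here is my own sketch, not something the tutorial itself does: the embedding table could be placed on the parameter server’s GPU simply by changing the device part of the RemoteModule’s remote_device string. It assumes the same setup as the master code above (RPC already initialized), and the constant values below are placeholders standing in for the tutorial’s constants.

import torch
from torch.distributed.nn import RemoteModule

NUM_EMBEDDINGS, EMBEDDING_DIM = 100, 16   # placeholder values for the tutorial's constants

# Same master-side call as in the tutorial, but the EmbeddingBag now lives on ps's GPU 0.
remote_emb_module = RemoteModule(
    "ps/cuda:0",                          # "<workername>/<device>": run the module on ps's first GPU
    torch.nn.EmbeddingBag,
    args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
    kwargs={"mode": "sum"},
)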

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

COMBINING DISTRIBUTED DATAPARALLEL WITH DISTRIBUTED RPC FRAMEWORK