0x00 Summary

In the previous articles, we looked at the basic modules behind PyTorch's distributed functionality. The next few articles show how to put those modules into practice and give an overall picture of PyTorch's distributed logic. This article introduces how to train a model by combining distributed autograd with the distributed optimizer.

This article is a partial translation of pytorch.org/tutorials/i…, reorganized according to my own understanding.

Other articles in the PyTorch distributed series are as follows:

PyTorch distributed (1)—— History and Overview

PyTorch how to use GPU

PyTorch distributed (2) —– DataParallel – gradient

PyTorch distributed (3) —– DataParallel – gradient

PyTorch distributed (4)—— Distributed application concepts

PyTorch distributed (5) —— DistributedDataParallel — overview and how to use

PyTorch distributed (6) —— DistributedDataParallel — gradient

PyTorch distributed (7) —— DistributedDataParallel — process groups

PyTorch distributed (8) —— DistributedDataParallel

PyTorch distributed (9) —— DistributedDataParallel — initialization

PyTorch distributed (10) —— DistributedDataParallel — Reducer static architecture

PyTorch distributed (11) —— DistributedDataParallel — constructing the Reducer and Join operations

PyTorch distributed (12) —— DistributedDataParallel — forward propagation

PyTorch distributed (13) —— DistributedDataParallel — back-propagation

PyTorch distributed Autograd (1) —- design

PyTorch Distributed Autograd (2) —- RPC foundation

PyTorch Distributed Autograd (3) —-

PyTorch Distributed Autograd (4) —-

PyTorch Distributed Autograd (5) —-

PyTorch Distributed Autograd (6) —-

PyTorch Distributed optimizer (1)—- cornerstone

PyTorch Distributed optimizer (2)—- Data parallel optimizer

PyTorch Distributed optimizer (3)—- model parallel

0x01 Notes

First of all, the original tutorial has two parts: reinforcement learning and RNN. This article only translates the RNN part. Moreover, the translation does not follow the order of the original text; instead, the material is reorganized according to my own understanding, looking at the system from a top-down perspective.

This article uses an RNN model to show how to build distributed model-parallel training with the RPC APIs. The sample RNN model is very small and could easily fit on a single GPU, but we still split its layers across two different workers to demonstrate the idea. Developers can apply similar techniques to distribute much larger models across multiple devices and machines.

Note: in the official distributed articles, worker sometimes refers to any process in the distributed system, while the actual training process is usually called a trainer. In this article, the workers are one trainer and one parameter server.

0x02 Startup

At startup, the run_worker method launches a trainer and a parameter server; in this example the parameter server has no business logic of its own.

import os

import torch.distributed.rpc as rpc
import torch.multiprocessing as mp


def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 1:
        # start the trainer
        rpc.init_rpc("trainer", rank=rank, world_size=world_size)
        # run the trainer's business logic (defined later as _run_trainer)
        _run_trainer()
    else:
        # start the parameter server
        rpc.init_rpc("ps", rank=rank, world_size=world_size)
        # the parameter server does nothing here
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__ == "__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)

The details are as follows:

           torch.multiprocessing.spawn
                      +
                      |
                      |
    +-----------------+--------------------+
    |                                      |
    |                                      |
    v                                      v
+---+---------------------+   +------------+-------------+
| "ps"          rank = 0  |   | "trainer"      rank = 1  |
|                         |   |                          |
|                         |   |                          |
|                         |   |                          |
|                         |   |                          |
|                         |   |                          |
|                         |   |                          |
|                         |   |                          |
|                         |   |                          |
|                         |   |                          |
|                         |   |                          |
|                         |   |                          |
|                         |   |                          |
|                         |   |                          |
+-------------------------+   +--------------------------+

0x03 Trainer

Let's look at the training loop. After initializing the model, we create an RNNModel and a DistributedOptimizer. The distributed optimizer takes the list of parameter RRefs, finds all of the distinct owner workers of those parameters, and creates the given local optimizer (SGD in this case; other local optimizers can also be used) on each of those owner workers with the given arguments (here lr=0.05).

In the training loop, it does the following:

  • First, it creates the distributed autograd context, which helps the distributed autograd engine find the gradients and the RPC send/recv functions involved.
  • Then, it runs the forward pass just like a local model and runs a distributed backward pass. For the distributed backward pass, you only need to specify a list of roots, which in this case is the loss tensor. The distributed autograd engine automatically traverses the distributed computation graph and writes the gradients to the right places.
  • Next, it runs the step function of the distributed optimizer, which contacts all the relevant local optimizers to update the model parameters. One difference from local training is that the user does not need to call zero_grad(): each autograd context has dedicated space for storing gradients, and since every iteration creates a new context, gradients from different iterations do not accumulate into the same set of tensors (a small sketch of inspecting these per-context gradients follows this list).
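On that last point, the gradients computed by a distributed backward pass live inside the per-iteration autograd context rather than in each parameter's .grad field. Below is a minimal debugging sketch, assuming it is placed inside the with dist_autograd.context() block of the trainer code that follows, right after the backward call; it is not part of the tutorial itself:

# Hypothetical snippet: get_gradients returns a dict that maps each local
# parameter tensor to the gradient stored for it in this context; the dict
# is discarded together with the context at the end of the iteration.
grads = dist_autograd.get_gradients(context_id)
for param, grad in grads.items():
    print(param.shape, grad.norm())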

The specific code is as follows:

import torch
import torch.distributed.autograd as dist_autograd
import torch.optim as optim
from torch.distributed.optim import DistributedOptimizer

import rnn  # model definitions shown in section 0x04


def _run_trainer():
    batch = 5
    ntoken = 10
    ninp = 2
    nhid = 3
    nindices = 3
    nlayers = 4
    hidden = (
        torch.randn(nlayers, nindices, nhid),
        torch.randn(nlayers, nindices, nhid)
    )

    model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)

    # setup the distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

    def get_next_batch():
        for _ in range(5):
            data = torch.LongTensor(batch, nindices) % ntoken
            target = torch.LongTensor(batch, ntoken) % nindices
            yield data, target

    # train for 10 iterations
    for epoch in range(10):
        for data, target in get_next_batch():
            # create the distributed autograd context
            with dist_autograd.context() as context_id:
                hidden[0].detach_()
                hidden[1].detach_()
                output, hidden = model(data, hidden)
                loss = criterion(output, target)
                # run the distributed backward pass
                dist_autograd.backward(context_id, [loss])
                # run the distributed optimizer to update parameters
                opt.step(context_id)
                # not necessary to zero grads since they are
                # accumulated into the distributed autograd context
                # which is reset every iteration.
        print("Training epoch {}".format(epoch))

The logic is extended as follows:

           torch.multiprocessing.spawn
                      +
                      |
                      |
    +-----------------+--------------------+
    |                                      |
    |                                      |
    v                                      v
+---+---------------------+   +------------+-----------------------------------+
| "ps"          rank = 0  |   | "trainer"      rank = 1                        |
|                         |   |                                                |
|                         |   |                                                |
|                         |   |                                                |
|                         |   |    model = rnn.RNNModel('ps')                  |
|                         |   |                                                |
|                         |   |                                                |
|                         |   |    dist_autograd.backward(context_id, [loss])  |
|                         |   |                                                |
|                         |   |                                                |
|                         |   |    DistributedOptimizer.step(context_id)       |
|                         |   |                                                |
|                         |   |                                                |
|                         |   |                                                |
+-------------------------+   +------------------------------------------------+


0x04 Model

Let’s look at the concrete model.

4.1 Components

The design of the RNN model borrows from the word language model in PyTorch's example repository, which contains three main components: an embedding table, an LSTM layer, and a decoder.

4.1.1 Reference Code

It is worth posting the original reference code for comparison. Note that both the Embedding and the Linear layer are member variables of RNNModel, so the whole RNNModel is tightly coupled.

class RNNModel(nn.Module):
    """Container module with an encoder, a recurrent module, and a decoder."""

    def __init__(self, rnn_type, ntoken, ninp, nhid, nlayers, dropout=0.5, tie_weights=False):
        super(RNNModel, self).__init__()
        self.ntoken = ntoken
        self.drop = nn.Dropout(dropout)
        self.encoder = nn.Embedding(ntoken, ninp)  # embedding table member variable
        if rnn_type in ['LSTM', 'GRU']:
            self.rnn = getattr(nn, rnn_type)(ninp, nhid, nlayers, dropout=dropout)
        else:
            nonlinearity = {'RNN_TANH': 'tanh', 'RNN_RELU': 'relu'}[rnn_type]
            self.rnn = nn.RNN(ninp, nhid, nlayers, nonlinearity=nonlinearity, dropout=dropout)
        self.decoder = nn.Linear(nhid, ntoken)  # decoder member variable

        # the rest of the code is omitted

4.1.2 Distributed Changes

Let's see how the model above can be modified for the distributed setting.

The following code wraps the embedding table and the decoder into submodules so that their constructors can be passed to the RPC API. In the EmbeddingTable submodule, we intentionally put the embedding layer on a GPU for demonstration purposes. In v1.4, RPC always creates CPU tensor arguments and return values on the target worker process, so if a function takes a GPU tensor, the tensor needs to be moved to the proper device explicitly.

class EmbeddingTable(nn.Module):
    r""" Encoding layers of the RNNModel """
    def __init__(self, ntoken, ninp, dropout):
        super(EmbeddingTable, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.encoder = nn.Embedding(ntoken, ninp).cuda()
        self.encoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, input):
        return self.drop(self.encoder(input.cuda())).cpu()


class Decoder(nn.Module):
    def __init__(self, ntoken, nhid, dropout):
        super(Decoder, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.decoder = nn.Linear(nhid, ntoken)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, output):
        return self.decoder(self.drop(output))

4.2 The RNN model

As mentioned earlier, to achieve distributed model-parallel training, developers can divide the model into submodules. With the submodules above, we can now combine them with RPC to create the RNN model. We call rpc.remote to create the submodule instances remotely and use RRefs to find them when necessary. As you can see in the code below, it looks very similar to single-machine model-parallel training. The main difference is that Tensor.to(device) calls are replaced with RPC.

Here, ps is the parameter server, which hosts the parameters of the embedding table and the decoder. The constructor uses the remote API to create an EmbeddingTable object and a Decoder object on the parameter server, and creates the LSTM submodule locally.

During forward propagation, the trainer uses the EmbeddingTable RRef to find the remote submodule, passes the input data to the EmbeddingTable via RPC, and fetches the lookup result back. It then runs the embedding through the local LSTM layer and finally sends the output to the Decoder submodule with another RPC.

class RNNModel(nn.Module):
    def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
        super(RNNModel, self).__init__()

        # setup embedding table remotely
        self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
        # setup LSTM locally
        self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
        # setup decoder remotely
        self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))

    def forward(self, input, hidden):
        # pass input to the remote embedding table and fetch emb tensor back
        emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
        output, hidden = self.rnn(emb, hidden)
        # pass output to the remote decoder and get the decoded output back
        decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
        return decoded, hidden
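The _remote_method helper used above is not shown in this excerpt. It runs a method of the object referenced by an RRef on the RRef's owner and blocks until the result comes back. A sketch consistent with the helpers in the official example, paired with a _call_method function, looks like this:

def _call_method(method, rref, *args, **kwargs):
    # executed on the owner of rref: fetch the local object and invoke the method on it
    return method(rref.local_value(), *args, **kwargs)


def _remote_method(method, rref, *args, **kwargs):
    # run _call_method on the owner of rref and wait synchronously for the result
    args = [method, rref] + list(args)
    return rpc.rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs)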

Therefore, the logical diagram is extended as follows:

                 torch.multiprocessing.spawn
                            +
                            |
                            |
          +-----------------+--------------------+
          |                                      |
          |                                      |
          v                                      v
+---------+------------+   +---------------------+-------------------------------------+
|"ps"        rank = 0  |   | "trainer"      rank = 1                                   |
|                      |   |                                                           |
|                      |   |   model = rnn.RNNModel('ps')                              |
|                      |   |                                                           |
|  +---------------+   |   |   +---------------------------------------+               |
|  |EmbeddingTable |   |   |   | RNNModel                              |               |
|  |               +<------------+ self.emb_table_rref                 |               |
|  +---------------+   |   |   |                                       |               |
|                      |   |   |                                       |               |
|  +---------------+   |   |   |                                       |               |
|  |Decoder        +<------------+ self.decoder_rref                   |               |
|  |               |   |   |   |                                       |               |
|  |               |   |   |   |     self.rnn = LSTM                   |               |
|  |               |   |   |   |                                       |               |
|  +---------------+   |   |   +---------------------------------------+               |
|                      |   |                                                           |
|                      |   |   forward() {                                             |
|                      |   |       emb = _remote_method(EmbeddingTable.forward, input) |
|                      |   |       output, hidden = self.rnn(emb, hidden)              |
+----------------------+   |       decoded = _remote_method(Decoder.forward, output)   |
                           |   }                                                       |
                           |                                                           |
                           |                                                           |
                           |   dist_autograd.backward(context_id, [loss])              |
                           |                                                           |
                           |                                                           |
                           |   DistributedOptimizer.step(context_id)                   |
                           |                                                           |
                           +-----------------------------------------------------------+



4.3 Distributed optimizer

Before introducing the distributed optimizer, let's add a helper function that generates the list of parameter RRefs the distributed optimizer will use. In local training, an application can call Module.parameters() to get references to all parameter tensors and pass them to the local optimizer for subsequent updates. However, because some parameters live on remote machines, this API does not work in the distributed training scenario. Therefore, instead of a list of parameter Tensors, the distributed optimizer takes a list of RRefs, with one RRef per model parameter, covering both local and remote parameters. The helper function is simple: it calls Module.parameters() and creates a local RRef for each parameter.

from torch.distributed.rpc import RRef


def _parameter_rrefs(module):
    param_rrefs = []
    for param in module.parameters():
        param_rrefs.append(RRef(param))
    return param_rrefs

Then, since the RNNModel contains three submodules, we need to call _parameter_rrefs three times and wrap the three calls in another helper function.

class RNNModel(nn.Module):
    ...

    def parameter_rrefs(self):
        remote_params = []
        # get RRefs of embedding table
        remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
        # create RRefs for local parameters
        remote_params.extend(_parameter_rrefs(self.rnn))
        # get RRefs of decoder
        remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
        return remote_params

In the trainer, we create the distributed optimizer as shown below; it takes these (partly remote) parameter RRefs as the parameters to optimize.

# setup distributed optimizer
opt = DistributedOptimizer(
    optim.SGD,
    model.parameter_rrefs(),
    lr=0.05,
)

Our final expansion is as follows:

  • (1) RNNModel's emb_table_rref member variable points to the EmbeddingTable on the parameter server.
  • (2) RNNModel's decoder_rref member variable points to the Decoder on the parameter server.
  • (3) RNNModel's rnn member variable points to the local LSTM.
  • The three optimizer RRefs inside the DistributedOptimizer point to: (4) the parameters of the EmbeddingTable on the parameter server, (5) the parameters of the Decoder on the parameter server, and (6) the parameters of the local LSTM.

They correspond to the numbers on the graph below.

                 torch.multiprocessing.spawn
                            +
                            |
                            |
            +---------------+--------------------+
            |                                    |
            |                                    |
            v                                    v
  +---------+------------+ +---------------------+----------------------------------------+
  |"ps"        rank = 0  | | "trainer"                                         rank = 1   |
  |                      | |                                                              |
  |                      | |   model = rnn.RNNModel('ps')                                 |
  |                      | |                                                              |
  |  +---------------+   | |   +---------------------------------------+                  |
  |  |EmbeddingTable |   | |   | RNNModel                              |                  |
+--->+               |   | | 1 |                                       |                  |
| |  |               +<------------+ self.emb_table_rref               |    +------+      |
| |  +---------------+   | |   |                            3          |    |LSTM  |  6   |
| |                      | |   |     self.rnn +---------------------------->+      +<---+ |
| |  +---------------+   | | 2 |                                       |    |      |    | |
| |  |Decoder        +<------------+ self.decoder_rref                 |    +------+    | |
| |  |               |   | |   |                                       |                | |
| |  |               |   | |   +---------------------------------------+                | |
| |  |               |   | |                                                            | |
| |  +------+--------+   | |   forward() {                                              | |
| |         ^            | |       emb = _remote_method(EmbeddingTable.forward, input)  | |
| |         |            | |       output, hidden = self.rnn(emb, hidden)               | |
| |         |            | |       decoded = _remote_method(Decoder.forward, output)    | |
| |         |            | |   }                                                        | |
| +----------------------+ |                                                            | |
|           |              |   dist_autograd.backward(context_id, [loss])               | |
|           |              |                                                            | |
| 5         | 4            |  +------------------------------------------------------+  | |
|           |              |  | DistributedOptimizer                                 |  | |
|           |              |  |                                                      |  | |
|           |              |  |     remote_optimizers = [                            |  | |
+-------------------------------------------------------+ optim_rref1,               |  | |
            |              |  |                           optim_rref2+------------------+ |
            +-------------------------------------------+ optim_rref3                |    |
                           |  |                                                      |    |
                           |  |                          ]                           |    |
                           |  |     step(context_id)                                 |    |
                           |  +------------------------------------------------------+    |
                           +--------------------------------------------------------------+


4.4 Comparison

As mentioned earlier, distributed model-parallel training looks very similar to single-machine model-parallel training; the main difference is that Tensor.to(device) calls are replaced with RPC. If we replace the parameter server with a GPU, we can make a rough comparison. It is not exact, but it highlights the key points of distributed training.
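To make the contrast concrete, here is a rough single-process counterpart (a hedged sketch of my own, not code from the tutorial, and LocalRNNModel is a hypothetical name): the embedding table and decoder live on a GPU, the LSTM stays on the CPU, and Tensor.to(device) plays the role that RPC plays in the distributed version.

import torch
import torch.nn as nn


class LocalRNNModel(nn.Module):
    # hypothetical single-process counterpart of the distributed RNNModel
    def __init__(self, device, ntoken, ninp, nhid, nlayers, dropout=0.5):
        super(LocalRNNModel, self).__init__()
        self.device = device
        self.encoder = nn.Embedding(ntoken, ninp).to(device)      # on the GPU
        self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)  # on the CPU
        self.decoder = nn.Linear(nhid, ntoken).to(device)         # on the GPU

    def forward(self, input, hidden):
        # Tensor.to(device) replaces the RPC calls of the distributed version
        emb = self.encoder(input.to(self.device)).cpu()
        output, hidden = self.rnn(emb, hidden)
        decoded = self.decoder(output.to(self.device)).cpu()
        return decoded, hidden

The diagram below shows the same rough comparison side by side.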

  +----------------------+ +-------------------------------------------------------------+
  | GPU                  | | CPU                                                rank = 0 |
  |                      | |                                                             |
  |                      | |   model = rnn.RNNModel()                                    |
  |                      | |                                                             |
  |  +---------------+   | |   +---------------------------------------+                 |
  |  |EmbeddingTable |   | |   | RNNModel                              |                 |
+--->+               |   | | 1 |                                       |                 |
| |  |               +<------------+ self.emb_table_rref               |   +------+      |
| |  +---------------+   | |   |                            3          |   |LSTM  |  6   |
| |                      | |   |     self.rnn +--------------------------->+      +<---+ |
| |  +---------------+   | | 2 |                                       |   |      |    | |
| |  |Decoder        +<------------+ self.decoder_rref                 |   +------+    | |
| |  |               |   | |   |                                       |               | |
| |  |               |   | |   +---------------------------------------+               | |
| |  |               |   | |                                                           | |
| |  +------+--------+   | |   forward() {                                             | |
| |         ^            | |       emb = EmbeddingTable.forward(input)                 | |
| |         |            | |       output, hidden = self.rnn(emb, hidden)              | |
| |         |            | |       decoded = Decoder.forward(output)                   | |
| |         |            | |   }                                                       | |
| +----------------------+ |                                                           | |
|           |              |   loss.backward()                                         | |
|           |              |                                                           | |
| 5         | 4            |  +----------------------------------------+               | |
|           |              |  | Optimizer                              |               | |
|           |              |  |                                        |               | |
|           |              |  |          param_groups = [              |               | |
+-------------------------------------------------------+ optim_rref1, |               | |
            |              |  |                                        |               | |
            |              |  |                           optim_rref2+-----------------+ |
            |              |  |                                        |                 |
            +-------------------------------------------+ optim_rref3  |                 |
                           |  |                          ]             |                 |
                           |  |          step()                        |                 |
                           |  |                                        |                 |
                           |  +----------------------------------------+                 |
                           +-------------------------------------------------------------+


0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

GETTING STARTED WITH DISTRIBUTED RPC FRAMEWORK