0x00 Abstract

In the previous articles, we looked at the basic modules of PyTorch's distributed logic. The next few articles look at how to put those modules together in practice and give an overall picture of PyTorch's distributed logic. This article describes how to implement a parameter server using the distributed RPC framework.

This article is based on the official tutorial at pytorch.org/tutorials/i… with my own understanding added.

Other articles in this PyTorch distributed series are as follows:

  • PyTorch distributed (1) —— History and Overview
  • How PyTorch uses the GPU
  • PyTorch distributed (2) —— DataParallel
  • PyTorch distributed (3) —— DataParallel
  • PyTorch distributed (4) —— Distributed application concepts
  • PyTorch distributed (5) —— DistributedDataParallel: overview and how to use it
  • PyTorch distributed (6) —— DistributedDataParallel
  • PyTorch distributed (7) —— DistributedDataParallel: process groups
  • PyTorch distributed (8) —— DistributedDataParallel
  • PyTorch distributed (9) —— DistributedDataParallel: initialization
  • PyTorch distributed (10) —— DistributedDataParallel: Reducer static architecture
  • PyTorch distributed (11) —— DistributedDataParallel: constructing the Reducer and the Join operation
  • PyTorch distributed (12) —— DistributedDataParallel: forward propagation
  • PyTorch distributed (13) —— DistributedDataParallel: backward propagation
  • PyTorch Distributed Autograd (1) —— design
  • PyTorch Distributed Autograd (2) —— RPC foundation
  • PyTorch Distributed Autograd (3) —-
  • PyTorch Distributed Autograd (4) —-
  • PyTorch Distributed Autograd (5) —-
  • PyTorch Distributed Autograd (6) —-
  • PyTorch Distributed optimizer (1) —— cornerstone
  • PyTorch Distributed optimizer (2) —— data-parallel optimizer
  • PyTorch Distributed optimizer (3) —— model parallel
  • PyTorch distributed (14) —— Using Distributed Autograd and Distributed Optimizer

Note: this article does not follow the order of the original tutorial exactly; it is reorganized according to my own understanding.

0x01 Overview

This tutorial presents a simple example of implementing a parameter server using PyTorch's distributed RPC framework. The parameter server framework is a paradigm in which a set of servers stores parameters, such as large embedding tables, and multiple trainers query the parameter server to retrieve the latest values. Each trainer runs its training loop locally and occasionally synchronizes with the parameter server to obtain the latest parameters. For more information about the parameter server approach, see www.cs.cmu.edu/~muli/file/…

We will use the distributed RPC framework to build an example in which multiple trainers use RPC to communicate with the same parameter server and use RRefs to access state on the remote parameter server instance. Each trainer stitches together an autograd computation graph that spans multiple nodes by using distributed autograd, and launches its own backward pass in a distributed fashion.

Note: this tutorial covers the use of the distributed RPC framework, which is useful for splitting a model across multiple machines or for implementing a parameter-server training strategy, where trainers fetch parameters hosted on a different machine. If you are instead looking for data-parallel training that replicates the model across many GPUs, see the Distributed Data Parallel tutorial.

0x02 Basic Network

We'll start by introducing the basic network. First the familiar part: we import the modules we need and define a simple ConvNet that will be trained on the MNIST dataset. The network below is mainly taken from the one defined in the pytorch/examples repo.

# Imports needed for the full rpc_parameter_server.py script
import argparse
import os
from threading import Lock

import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.distributed.optim import DistributedOptimizer
from torchvision import datasets, transforms

# --------- MNIST Network to train, from pytorch/examples -----
class Net(nn.Module):
    def __init__(self, num_gpus=0):
        super(Net, self).__init__()
        print(f"Using {num_gpus} GPUs to train")
        self.num_gpus = num_gpus
        device = torch.device(
            "cuda:0" if torch.cuda.is_available() and self.num_gpus > 0 else "cpu")
        print(f"Putting first 2 convs on {str(device)}")
        # Put conv layers on the first cuda device
        self.conv1 = nn.Conv2d(1, 32, 3, 1).to(device)
        self.conv2 = nn.Conv2d(32, 64, 3, 1).to(device)
        # Put rest of the network on the 2nd cuda device, if there is one
        if "cuda" in str(device) and num_gpus > 1:
            device = torch.device("cuda:1")

        print(f"Putting rest of layers on {str(device)}")
        self.dropout1 = nn.Dropout2d(0.25).to(device)
        self.dropout2 = nn.Dropout2d(0.5).to(device)
        self.fc1 = nn.Linear(9216, 128).to(device)
        self.fc2 = nn.Linear(128, 10).to(device)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.max_pool2d(x, 2)

        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        # Move tensor to next device if necessary
        next_device = next(self.fc1.parameters()).device
        x = x.to(next_device)

        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output
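
As a quick sanity check (my own sketch, not part of the tutorial code), the network can be instantiated locally and run on a fake MNIST-shaped batch to confirm the output shape:

# Illustrative sketch: run Net locally on a dummy MNIST batch (CPU only, num_gpus=0).
net = Net(num_gpus=0)
dummy = torch.randn(4, 1, 28, 28)   # a batch of 4 single-channel 28x28 images
out = net(dummy)
print(out.shape)                    # torch.Size([4, 10]) -- log-probabilities over 10 classes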

0x03 Helper functions

Next, let's define some helper functions that will be useful throughout the rest of the script. We use rpc_sync and RRef to define a function that invokes a given method on an object living on a remote node. Our handle to the remote object is given by the rref argument, and we run the method on the node that owns it: rref.owner(). On the caller node, we issue this call synchronously by means of rpc_sync, which means we block until the response is received.

# --------- Helper Methods --------------------

# On the local node, call a method with first arg as the value held by the
# RRef. Other args are passed in as arguments to the function called.
# Useful for calling instance methods.
def call_method(method, rref, *args, **kwargs):
    return method(rref.local_value(), *args, **kwargs)

# Given an RRef, return the result of calling the passed in method on the value
# held by the RRef. This call is done on the remote node that owns
# the RRef. args and kwargs are passed into the method.
# Example: If the value held by the RRef is of type Foo, then
# remote_method(Foo.bar, rref, arg1, arg2) is equivalent to calling
# <foo_instance>.bar(arg1, arg2) on the remote node and getting the result
# back.
def remote_method(method, rref, *args, **kwargs):
    args = [method, rref] + list(args)
    return rpc.rpc_sync(rref.owner(), call_method, args=args, kwargs=kwargs)
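
rpc_sync blocks until the result comes back. As an aside that is not in the original tutorial, a non-blocking counterpart could be written with rpc.rpc_async, which returns a future the caller waits on explicitly; the helper name below is my own:

# Hypothetical async variant of remote_method (not in the tutorial): returns a Future instead of blocking.
def remote_method_async(method, rref, *args, **kwargs):
    args = [method, rref] + list(args)
    return rpc.rpc_async(rref.owner(), call_method, args=args, kwargs=kwargs)

# Usage sketch: fut = remote_method_async(SomeClass.some_method, some_rref); result = fut.wait()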

0x04 Startup

4.1 Startup Mode

To run this example locally, start the master and each worker in separate terminal windows with: python rpc_parameter_server.py --world_size=WORLD_SIZE --rank=RANK.

  • For the master node with a world size of 2, the command is python rpc_parameter_server.py --world_size=2 --rank=0.

  • For the trainer, the command is python rpc_parameter_server.py --world_size=2 --rank=1.

Note that this tutorial assumes training with zero to two GPUs, which can be configured by passing --num_gpus=N to the training script. When the trainer and master run on different machines, you can pass the command line arguments --master_addr=ADDRESS and --master_port=PORT to indicate the address and port on which the master worker is listening.

4.2 Startup Script

First, let’s look at the various parameters needed to start the parameter server and trainer.

  • world_size corresponds to the total number of nodes that will participate in training; it is the sum of all trainers and the parameter server.
  • We must also pass in a unique rank for each individual process, ranging from 0 (where the parameter server will run) to world_size - 1.
  • master_addr and master_port are used to identify the rank 0 process, which each node uses to discover the others.
  • To test this example locally, simply pass in localhost and the same master_port to all instances.

Note that, for demonstration purposes, this example only supports 0-2 GPUs, but the pattern can be extended to use additional GPUs.

# --------- Launcher --------------------
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Parameter-Server RPC based training")
    parser.add_argument(
        "--world_size", type=int, default=4,
        help="""Total number of participating processes. Should be the sum of master node and all training nodes.""")
    parser.add_argument(
        "--rank", type=int, default=None,
        help="Global rank of this process. Pass in 0 for master.")
    parser.add_argument(
        "--num_gpus", type=int, default=0,
        help="""Number of GPUs to use for training, currently supports between 0 and 2 GPUs. Note that this argument will be passed to the parameter servers.""")
    parser.add_argument(
        "--master_addr", type=str, default="localhost",
        help="""Address of master, will default to localhost if not provided. Master must be able to accept network traffic on the address + port.""")
    parser.add_argument(
        "--master_port", type=str, default="29500",
        help="""Port that master is listening on, will default to 29500 if not provided. Master must be able to accept network traffic on the host and port.""")

    args = parser.parse_args()
    assert args.rank is not None, "must provide rank argument."
    assert args.num_gpus <= 3, f"Only 0-2 GPUs currently supported (got {args.num_gpus})."
    os.environ['MASTER_ADDR'] = args.master_addr
    os.environ['MASTER_PORT'] = args.master_port

Now we will create either a parameter server process or a trainer process, depending on the command line arguments. If the rank passed in is 0 we create the parameter server; otherwise we start a trainer, which builds a TrainerNet.

Note that we use torch.multiprocessing to launch a child process for the function we want to execute, and we wait for that process to finish with p.join() on the main thread. We also use PyTorch DataLoaders to build the training and test loaders that read data from the MNIST dataset.

processes = []
world_size = args.world_size
if args.rank == 0:
    # This is the parameter server
    p = mp.Process(target=run_parameter_server, args=(0, world_size))
    p.start()
    processes.append(p)
else:
    # This is trainer
    # Get data to train on
    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST('../data', train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=32, shuffle=True)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST('../data', train=False,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=32, shuffle=True)
    # start training worker on this node
    p = mp.Process(
        target=run_worker, # start trainer
        args=(
            args.rank,
            world_size, args.num_gpus,
            train_loader,
            test_loader))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

The current logic is as follows. We assume that the master and the worker each run different code in their own process:

rpc_parameter_server.py      Master      +    Worker         rpc_parameter_server.py
           +                             |                          +
           |                             |                          |
           | rank == 0                   |                          |
           |                             |                          |
           v                             |                          v
                                         |
run_parameter_server                     |                    mp.Process(run_worker)
                                         |
                                         |
                                         |
                                         +


4.3 Starting the Parameter Server

First, we will initialize our parameter server. Note that there is only one parameter server instance across all processes, and all trainers talk to that same parameter server and update the same stored model. As you can see in run_parameter_server, the server itself does not take any independent action; it simply waits for requests from the trainers and responds by running the requested functions.

There are two main steps in the code: initializing RPC, and calling rpc.shutdown(). Note that the ParameterServer object itself is not actually constructed here.

Note that rpc.shutdown() does not immediately shut down the parameter server. Instead, it waits for all workers (trainers in this case) to also call rpc.shutdown(). This guarantees that the parameter server does not go offline before all trainers have completed the training process.

def run_parameter_server(rank, world_size):
    # The parameter server just acts as a host for the model and responds to
    # requests from trainers, hence it does not need to run a loop.
    # rpc.shutdown() will wait for all workers to complete by default, which
    # in this case means that the parameter server will wait for all trainers
    # to complete, and then exit.
    print("PS master initializing RPC")
    rpc.init_rpc(name="parameter_server", rank=rank, world_size=world_size)
    print("RPC initialized! Running parameter server...")
    rpc.shutdown()  # Does not return until all workers have also called rpc.shutdown()
    print("RPC shutdown on parameter server.")

The logic now expands as follows:

          rpc_parameter_server.py      Master      +    Worker    rpc_parameter_server.py
                     +                             |                     +
                     |                             |                     |
                     | rank == 0                   |                     |
                     |                             |                     |
                     v                             |                     v
                                                   |
          run_parameter_server                     |               mp.Process(run_worker)
                     +                             |
                     |                             |
                     |                             |
                     v                             |
                                                   |
rpc.init_rpc("parameter_server", rank, world_size) |
                                                   |
                                                   +


4.4 Starting the Worker

The internal logic of the run_worker function also starts RPC and then enters the main loop.

# Main loop for trainers.
def run_worker(rank, world_size, num_gpus, train_loader, test_loader):
    print(f"Worker rank {rank} initializing RPC")
    rpc.init_rpc(
        name=f"trainer_{rank}",
        rank=rank,
        world_size=world_size)

    print(f"Worker {rank} done initializing RPC")

    run_training_loop(rank, num_gpus, train_loader, test_loader)
    rpc.shutdown()

The logic now expands as follows:

 rpc_parameter_server.py      Master      +    Worker    rpc_parameter_server.py
            +                             |                     +
            |                             |                     |
            | rank == 0                   |                     |
            |                             |                     |
            v                             |                     v
                                          |
 run_parameter_server                     |               mp.Process(run_worker)
            +                             |                     +
            |                             |                     |
            |                             |                     |
            v                             |                     |
                                          |                     v
rpc.init_rpc("parameter_server"           |               run_worker
             ,rank,world_size)            |                     +
                                          |                     |
                                          |                     |
                                          |                     |
                                          |                     v
                                          |       rpc.init_rpc(f"trainer_{rank}"
                                          |                    ,rank,world_size)
                                          |                     +
                                          |                     |
                                          |                     |
                                          |                     v
                                          +             run_training_loop


4.5 Setting up the Parameter Server

get_parameter_server is where the parameter server actually gets built.

param_server = None
global_lock = Lock()

def get_parameter_server(num_gpus=0):
    global param_server
    # Ensure that we get only one handle to the ParameterServer.
    with global_lock:
        if not param_server:
            # construct it once
            param_server = ParameterServer(num_gpus=num_gpus)
        return param_server

So where exactly does the parameter server get set up? get_parameter_server is invoked when TrainerNet is initialized in the worker's main loop, which we will cover below.

0x05 TrainerNet

Next, we define our TrainerNet class. This is also a subclass of nn.Module, and its __init__ method uses the rpc.remote API to obtain an RRef, a remote reference, to our parameter server. Note that we are not copying the parameter server into our local process; instead, we can think of self.param_server_rref as a distributed shared pointer to the parameter server that lives in a separate process.

TrainerNet is training plumbing, not the business model itself; it must be distinguished from the Net(nn.Module) class defined earlier. TrainerNet is just a relay station, or adapter, introduced for the overall code logic. It holds a param_server_rref that points to the ParameterServer, so the ParameterServer can be reached through TrainerNet.

5.1 General Code

# --------- Trainers --------------------

# nn.Module corresponding to the network trained by this trainer. The
# forward() method simply invokes the network on the given parameter
# server.
class TrainerNet(nn.Module):
    def __init__(self, num_gpus=0):
        super().__init__()
        self.num_gpus = num_gpus
        self.param_server_rref = rpc.remote(
            "parameter_server", get_parameter_server, args=(num_gpus,))  # Build the parameter server

    def get_global_param_rrefs(self):  # This function will be used later
        remote_params = remote_method(
            ParameterServer.get_param_rrefs,
            self.param_server_rref)
        return remote_params

    def forward(self, x):
        model_output = remote_method(
            ParameterServer.forward, self.param_server_rref, x)
        return model_output

5.2 Generating a Parameter Server

The parameter server is generated in the initialization method with the following code.

def __init__(self, num_gpus=0):
    super().__init__()
    self.num_gpus = num_gpus
    self.param_server_rref = rpc.remote(
        "parameter_server", get_parameter_server, args=(num_gpus,))

get_parameter_server is called from the worker, but it executes on the master: rpc.remote runs the function on the node named "parameter_server", so the parameter server is set up on the master.
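
As a small illustrative sketch (assuming RPC has already been initialized on both processes, and not part of the tutorial code), rpc.remote returns immediately with an RRef whose owner is the callee, while the function itself executes on the "parameter_server" node:

# Illustrative sketch: the RRef is created locally, but the object it refers to lives remotely.
ps_rref = rpc.remote("parameter_server", get_parameter_server, args=(0,))
print(ps_rref.owner().name)   # "parameter_server" -- the remote node owns the ParameterServer
# The object is never copied here; methods are invoked on the owner via remote_method().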

The logic is expanded as follows:

 rpc_parameter_server.py      Master      +    Worker    rpc_parameter_server.py
            +                             |                     +
            |                             |                     |
            | rank == 0                   |                     |
            |                             |                     |
            v                             |                     v
                                          |
 run_parameter_server                     |               mp.Process(run_worker)
            +                             |                     +
            |                             |                     |
            |                             |                     |
            v                             |                     |
                                          |                     v
rpc.init_rpc("parameter_server"           |               run_worker
             ,rank,world_size)            |                     +
            +                             |                     |
            |                             |                     |
            |                             |                     |
            |                             |                     v
            |                             |       rpc.init_rpc(f"trainer_{rank}"
            |                             |                    ,rank,world_size)
            |                             |                     +
            |                             |                     |
            |                             |                     |
            |                             |                     v
            |                             |             run_training_loop
            |                             |                     +
            |                             |                     |
            |                             |                     |
            |                             |                     v
            |                             |     net = TrainerNet(num_gpus=num_gpus)
            |                             |                     +
            |                             |                     |
            |                             |                     |
            v                             |                     v
    get_parameter_server  <-------------------+ self.param_server_rref = rpc.remote(
            +                             |                 "parameter_server",
            |                             |                 get_parameter_server,
            |                             |                 args=(num_gpus,))
            v                             |
      ParameterServer                     |
                                          +


Now that we’ve completed the initialization phase, let’s look at the runtime phase.

5.3 Establishing RRefs

TrainerNet also has a get_global_param_rrefs method; how it is used will be described later, but let's analyze the method itself first.

Why provide this method? To answer that, it is worth reading the DistributedOptimizer documentation, especially the API signature. The optimizer has to optimize parameters that live remotely, so how do we pass them in when we build the optimizer locally? Those remote parameters are represented locally as RRefs, so when we construct the optimizer locally we pass in a list of RRefs that refer to the remote parameters.

Since the only remote node interacting with a given TrainerNet is ParameterServer, we only need to call a remote_method on ParameterServer. So we use the get_param_rrefs method defined in the ParameterServer class. This method returns a list of RRefs pointing to the parameters that need to be optimized.

Please note that in this case TrainerNet does not define parameters of its own; if it did define parameters that need to be optimized, we would also have to wrap each of them in an RRef and include it in the input to DistributedOptimizer (see the sketch after the code below).

class TrainerNet(nn.Module):
    ...
    def get_global_param_rrefs(self):
        remote_params = remote_method(
            ParameterServer.get_param_rrefs,
            self.param_server_rref)
        return remote_params
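
As a minimal sketch of that last point (my own illustration, not tutorial code): if TrainerNet also owned a local layer, say a hypothetical self.local_fc, its parameters would have to be wrapped in RRefs and appended to the list before building the optimizer:

# Hypothetical sketch: combine remote and local parameters for DistributedOptimizer.
# 'local_fc' is an assumed local layer that does not exist in the original TrainerNet.
param_rrefs = net.get_global_param_rrefs()                        # RRefs to remote parameters
param_rrefs += [rpc.RRef(p) for p in net.local_fc.parameters()]   # wrap local parameters too
opt = DistributedOptimizer(optim.SGD, param_rrefs, lr=0.03)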

5.4 Forward function

Let's now look at the forward method, which invokes a (synchronous) RPC to run the forward pass of the model network defined on the ParameterServer. We pass in self.param_server_rref, the remote handle to the ParameterServer, as part of the RPC call. Calling forward sends an RPC to the node where the ParameterServer is running, invokes the ParameterServer's forward function, and returns the result tensor corresponding to the model's output.

class TrainerNet(nn.Module):
    ...
    def forward(self, x):
        model_output = remote_method(
            ParameterServer.forward, self.param_server_rref, x)
        return model_output

0x06 Parameter server

Let’s look at the parameter server.

6.1 General Code

The overall code of the parameter server is as follows:

# --------- Parameter Server --------------------
class ParameterServer(nn.Module):
    def __init__(self, num_gpus=0):
        super().__init__()
        model = Net(num_gpus=num_gpus)
        self.model = model
        self.input_device = torch.device(
            "cuda:0" if torch.cuda.is_available() and num_gpus > 0 else "cpu")

    def forward(self, inp):
        inp = inp.to(self.input_device)
        out = self.model(inp)
        # This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
        # Tensors must be moved in and out of GPU memory due to this.
        out = out.to("cpu")
        return out

    # Use dist autograd to retrieve gradients accumulated for this model.
    # Primarily used for verification.
    def get_dist_gradients(self, cid):
        grads = dist_autograd.get_gradients(cid)
        # This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
        # Tensors must be moved in and out of GPU memory due to this.
        cpu_grads = {}
        for k, v in grads.items():
            k_cpu, v_cpu = k.to("cpu"), v.to("cpu")
            cpu_grads[k_cpu] = v_cpu
        return cpu_grads

    # Wrap local parameters in a RRef. Needed for building the
    # DistributedOptimizer which optimizes parameters remotely.
    def get_param_rrefs(self):
        param_rrefs = [rpc.RRef(param) for param in self.model.parameters()]
        return param_rrefs

6.2 Initialization

As mentioned earlier, the actual construction of the parameter server is triggered from TrainerNet, so let's look at how it is initialized.

The parameter server is a subclass of nn.Module that holds a handle to the model network defined above: it instantiates the business model class Net(nn.Module) introduced earlier and stores it in its internal self.model member, which it then uses for the actual parameter handling. It also saves an input device; inputs must be moved onto this device before the model is invoked.

# --------- Parameter Server --------------------
class ParameterServer(nn.Module):
    def __init__(self, num_gpus=0):
        super().__init__()
        model = Net(num_gpus=num_gpus)  # Our business network model
        self.model = model  # self.model is the handle to it
        self.input_device = torch.device(  # Input device
            "cuda:0" if torch.cuda.is_available() and num_gpus > 0 else "cpu")

6.3 Forward function

Next, we define the forward propagation function. Note that regardless of which device the model produces its output on, we move the output to the CPU, because the distributed RPC framework currently only supports sending CPU tensors over RPC. Sending CUDA tensors over RPC is deliberately disabled because the caller and callee may use different devices (CPU/GPU); this may be supported in future releases.

class ParameterServer(nn.Module):
    ...
    def forward(self, inp):
        inp = inp.to(self.input_device)
        out = self.model(inp)
        # This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
        # Tensors must be moved in and out of GPU memory due to this.
        out = out.to("cpu")
        return out

6.4 Miscellaneous Functions

Next, we’ll define some miscellaneous functions that are useful for training and validation.

  • get_dist_gradients takes a distributed autograd context ID and calls dist_autograd.get_gradients to retrieve the gradients computed by distributed autograd. More information can be found in the distributed autograd documentation. Note that we also iterate over the resulting dictionary and convert each tensor to a CPU tensor, since the framework currently only supports sending CPU tensors over RPC.
  • In get_param_rrefs we iterate over the model parameters and wrap each one in a (local) RRef. This method is invoked by trainer nodes over RPC and returns the list of parameters to be optimized in a distributed fashion. Because this list is required as input to the DistributedOptimizer, the parameter server must convert every parameter that needs to be optimized into an RRef. The code fetches Net's parameters and ultimately returns them to the DistributedOptimizer on the worker side.
# Use dist autograd to retrieve gradients accumulated for this model.
# Primarily used for verification.
def get_dist_gradients(self, cid):
    grads = dist_autograd.get_gradients(cid)
    # This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
    # Tensors must be moved in and out of GPU memory due to this.
    cpu_grads = {}
    for k, v in grads.items():
        k_cpu, v_cpu = k.to("cpu"), v.to("cpu")
        cpu_grads[k_cpu] = v_cpu
    return cpu_grads

# Wrap local parameters in a RRef. Needed for building the
# DistributedOptimizer which optimizes parameters remotely.
def get_param_rrefs(self):
    param_rrefs = [rpc.RRef(param) for param in self.model.parameters()]
    return param_rrefs

6.5 Logical Relationship

We need a logical diagram to sort it out:

  1. When the DistributedOptimizer is built, it calls TrainerNet's get_global_param_rrefs method to obtain the parameters that need distributed optimization.
  2. TrainerNet calls ParameterServer's get_param_rrefs to fetch the parameter RRefs from the ParameterServer.
  3. ParameterServer calls Net's parameters() method to obtain the parameters that are ultimately needed.
  4. These parameters are returned, as RRefs, to the DistributedOptimizer, which then optimizes them.
                        Master             +      Worker
                                           |
                                           |
+--------------------+                     |   +----------------------------------------+
| ParameterServer    |                     |   | run_training_loop                      |
|                    |           4         |   |     +-------------------------+        |
|                    | +-------------------------->  | TrainerNet              |        |
|                    |                     |   |     |                         |        |
|                    | <---------------------------+ |                         |        |
|        model       |  2 get_param_rrefs  |   |     |                         |        |
|         ^   +      |                     |   |     |                         |        |
|         |   |      |                     |   |     |                         |        |
|         |   |      |                     |   |      +---+----+----------------+       |
+---------+---+------+                      |   |         |    ^                        |
          |   |                             |   |         |    |                        |
          |   |                             |   |       4 |  1 |  get_global_param_rrefs|
          |   |                             |   |         |    |                        |
       4 | 3 | model.parameters()          |   |         |    |                         |
         |   |                             |   |         v    |                         |
         |   v                             |   |     +---+----+----------------+        |
+--------+---+-------+                     |   |     | DistributedOptimizer    |        |
| Net                |                     |   |     |                         |        |
|                    |                     |   |     |                         |        |
|                    |                     |   |     |                         |        |
|                    |                     |   |     +-------------------------+        |
+--------------------+                     |   +----------------------------------------+
                                           +


0x07 Worker Main loop

Now that initialization is complete and the parameter server has been analyzed, let's look at the worker's main loop, which creates the network and optimizer, runs some inputs through the network, and computes the loss. The training loop looks much like that of a local trainer, but because the model network is distributed, a few changes are needed.

7.1 General Code

In the main loop, we initialize “TrainerNet” and build “DistributedOptimizer”.

Note that, as mentioned above, we must pass in all of the global parameters (across every node participating in distributed training) that need to be optimized. We also pass in the local optimizer to be used, in this case SGD. The underlying optimizer algorithm can be configured the same way as when creating a local optimizer; for example, a custom learning rate passed here is used as the learning rate for every local optimizer (a small sketch follows below).
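
As a small hedged sketch (not part of the tutorial), swapping in a different underlying optimizer only changes the class and keyword arguments handed to DistributedOptimizer; Adam is chosen here purely as an illustration:

# Hypothetical variant: use Adam instead of SGD as the underlying local optimizer.
# DistributedOptimizer forwards these keyword arguments to each local optimizer instance.
opt = DistributedOptimizer(optim.Adam, param_rrefs, lr=1e-3, betas=(0.9, 0.999))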

The code of the worker main loop run_training_loop is as follows, in which model_output = net(data) invokes TrainerNet's forward method.

def run_training_loop(rank, num_gpus, train_loader, test_loader):
    # Runs the typical neural network forward + backward + optimizer step, but
    # in a distributed fashion.
    net = TrainerNet(num_gpus=num_gpus)
    # Build DistributedOptimizer.
    param_rrefs = net.get_global_param_rrefs()
    opt = DistributedOptimizer(optim.SGD, param_rrefs, lr=0.03)
    for i, (data, target) in enumerate(train_loader):
        with dist_autograd.context() as cid:
            # 1. Call TrainerNet's forward
            model_output = net(data)
            target = target.to(model_output.device)
            # 2. Compute the loss
            loss = F.nll_loss(model_output, target)
            if i % 5 == 0:
                print(f"Rank {rank} training batch {i} loss {loss.item()}")
            # 3. Backward propagation
            dist_autograd.backward(cid, [loss])
            # 4. Validate: ensure that dist autograd ran successfully and
            # gradients were returned.
            assert remote_method(
                ParameterServer.get_dist_gradients,
                net.param_server_rref,
                cid) != {}
            # 5. Update the parameters
            opt.step(cid)

    print("Training complete!")
    print("Getting accuracy....")
    get_accuracy(test_loader, net)

7.2 Training

We need to analyze the specific training code in run_training_loop, which is the main training loop.

We loop over the iterable returned by PyTorch's DataLoader. Before writing the typical forward/backward/optimizer loop, we first wrap the logic in a distributed autograd context. Note that the RPC calls made during the model's forward pass need to be recorded so that an appropriate computation graph can be constructed, one that includes all the distributed workers participating in the backward pass. The distributed autograd context returns a context_id, an identifier under which the gradient accumulation and optimization for this particular iteration take place.

Instead of calling the usual loss.backward(), which would only launch the backward pass on the local worker, we call dist_autograd.backward() inside the context and pass in the loss along with the context_id; the loss is the root at which we want the backward pass to begin. We also pass this context_id into the optimizer call, because the optimizer must be able to look up, on all nodes, the gradients computed by this particular backward pass.

for i, (data, target) in enumerate(train_loader):
    with dist_autograd.context() as cid:
        # 1. Call TrainerNet's forward
        model_output = net(data)
        target = target.to(model_output.device)
        # 2. Compute the loss
        loss = F.nll_loss(model_output, target)
        if i % 5 == 0:
            print(f"Rank {rank} training batch {i} loss {loss.item()}")
        # 3. Backward propagation
        dist_autograd.backward(cid, [loss])
        # 4. Validate: ensure that dist autograd ran successfully and
        # gradients were returned.
        assert remote_method(
            ParameterServer.get_dist_gradients,
            net.param_server_rref,
            cid) != {}
        # 5. Update the parameters
        opt.step(cid)

The current general idea is as follows:

  • The ParameterServer runs on the master and contains the Net business model.
  • The trainer runs on the worker and includes TrainerNet, which is only a relay station, or adapter, introduced for the overall code logic. TrainerNet holds a param_server_rref that points to the ParameterServer, so the ParameterServer can be reached through TrainerNet.
  • The forward pass flows as TrainerNet.forward -> ParameterServer.forward -> Net.forward().
  • The actual backward pass is carried out automatically by the distributed autograd engine when dist_autograd.backward is called.
  • Optimizer updates are done automatically by the DistributedOptimizer.

We extend the previous logic (ignoring the initialization part) as shown below, and the values in the figure below correspond to the values in the training code comments.

             Master      +    Worker
                         |
  ParameterServer        |        run_training_loop                  TrainerNet
        +                |                +                              +
        |                |                |                              |
        |                |                v                              |
        |                |          net = TrainerNet                     |
        |                |                +                              |
        |                |                |                              |
        +                |                |                              |
model = Net(num_gpus)    |                v                              |
        +                |     param_server_rref = rpc.remote(           |
        |                |          "parameter_server",                  |
        |                |          get_parameter_server,)               |
        |                |                +                              |
        |                |                |                              |
        |                |                v                              |
        |                |  opt = DistributedOptimizer(param_rrefs)      |
        |                |                +                              |
        |                |                |                              |
        |                |                v                              |
        |                |       model_output = net(data)                |
        |                |                +                              |
        |                |                |                              |
        |                |                |             1                |
        |                |                +----------------------------> |
        |                |                |                              +
        |                |                |                   ParameterServer.forward
        |                |                |                              +
        |                |                |             2                |
        | <--------------------------------------------------------------+
        +                |                |                              |
     forward             |                |                              |
        +                |                |                              |
        |                |                |                              |
        v                |                |                              |
 out = self.model(inp)   |                |                              |
        +                |                |                              |
        |                |                |                              |
    return out           |                |                              |
        +                |                |                              |
        |     3          |                |                              |
        +------------------------------>  |                              |
        |                |                |                              |
        |                |                +                              |
        |                |          F.nll_loss                           |
        |                |                +                              |
        |                |         dist_autograd.backward                |
        |         4      |                +                              |
        | <----------------------get_dist_gradients                      |
        |                |                +                              |
        |                |                |                              |
        |         5      |                +                              |
        | <-----------------------+ opt.step                             |
        |                |                |                              |
        |                |                |                              |
        v                +                v                              v

7.3 Accuracy

Finally, we compute the model's accuracy after training completes, much as we would for a traditional local model. Note, however, that the network we pass to this function is an instance of TrainerNet, so the forward pass invokes RPC transparently.

def get_accuracy(test_loader, model):
    model.eval()
    correct_sum = 0
    # Use GPU to evaluate if possible
    device = torch.device("cuda:0" if model.num_gpus > 0
        and torch.cuda.is_available() else "cpu")
    with torch.no_grad():
        for i, (data, target) in enumerate(test_loader):
            out = model(data)  # TrainerNet.forward(x) transparently issues the RPC
            pred = out.argmax(dim=1, keepdim=True)
            pred, target = pred.to(device), target.to(device)
            correct = pred.eq(target.view_as(pred)).sum().item()
            correct_sum += correct

    print(f"Accuracy {correct_sum / len(test_loader.dataset)}")

0x08 Summary

ps-lite is a classic implementation of a parameter server, but the parameter server in this article is not ps-lite:

  • ps-lite resembles a traditional server implementation: it has its own active service loop that responds to explicit user requests, its own explicit processing logic, and its own local KV storage.
  • In PyTorch's two official tutorials, the parameter server follows a different idea: there is no active loop, no KV storage, and no server-side logic. Instead, the business model is stored directly on the server, and all activity is driven by the trainers.

Which approach to take depends on your own business needs.

The ps-lite article is linked below for comparison, if you are interested.

Machine learning parameter server ps-lite (4) —— application node implementation

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF References

Pytorch.apachecn.org/docs/1.7/65…

Pytorch.org/tutorials/i…