0x00 Abstract

This is the fifth part of the PyTorch distributed series. It is based on translations of several official documents, to which I have added my own thoughts, and it also explores some related areas. The next five or six parts will provide an in-depth analysis.

Other articles in this series are as follows:

PyTorch distributed (1)—— History and Overview

PyTorch how to use GPU

PyTorch distributed (2) —– DataParallel – gradient

PyTorch distributed (3) —– DataParallel – gradient

PyTorch distributed (4)—— Distributed application concepts

0x01 Data Parallel

DistributedDataParallel is data parallelism, so let’s first review what data parallelism is through two diagrams.

The first image is from www.cnblogs.com/yh-blog/p/1… Its original provenance is unknown.

We can see the difference between model parallelism and data parallelism.

The second figure comes from the FairScale GitHub source code; it clearly shows a data parallel workflow, which includes:

Model sharding, local forward calculation, local back propagation, AllReduce to synchronize gradients, and local gradient update.

0x02 DDP Running Logic

The torch.distributed package provides multi-process parallel communication primitives for PyTorch across multiple compute nodes, and can parallelize computation across processes and clusters. torch.nn.parallel.DistributedDataParallel builds on the capabilities of torch.distributed to provide a synchronous distributed training wrapper that wraps a PyTorch model for training. Its core functionality is based on multi-process communication, which distinguishes it from the parallelism provided by torch.multiprocessing and from DataParallel.

The following is the overall architecture of DDP. You can see where DDP sits in the stack, what it depends on, and so on. The image comes from the source code.

We use a diagram to illustrate the operation logic of DDP.

The figure comes from www.telesens.co/2019/04/04/…

The specific logic is as follows:

  1. Model loading phase. Each GPU holds its own copy of the model, so there is no need to copy the model from a master GPU. The process with rank 0 broadcasts the network's initial parameters to every other process, ensuring that the model in each process starts from the same initial values.
  2. Data loading phase. DDP does not broadcast data; instead, it loads data in parallel using multiple processes. On each host, every worker process loads its own data from disk into page-locked memory. The DistributedSampler ensures that the data loaded by different processes does not overlap.
  3. Forward propagation phase. Each GPU runs forward propagation to compute the output. Every GPU performs the same training work, so there is no need for a primary GPU.
  4. Loss computation phase. The loss is computed on each GPU.
  5. Backward propagation phase. Backward propagation is run to compute gradients, and all-reduce operations are performed on the gradients while they are still being computed (a minimal sketch of this gradient synchronization follows this list).
  6. Parameter update phase. Because every GPU starts from exactly the same model and the gradients are all-reduced, every GPU ends up with the same averaged gradients at the end of backpropagation, so the weight updates are identical on all GPUs and no model synchronization is needed. Note that in each iteration, the buffers in the model still need to be broadcast from the rank 0 process to the other processes in the process group.
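To make the gradient synchronization in step 5 concrete, here is a minimal sketch of what it amounts to conceptually: every process sums each parameter's gradient across all processes with all-reduce and divides by the world size, so every replica applies the same averaged gradient. DDP's real implementation registers autograd hooks and buckets gradients so that these all-reduce calls overlap with the backward computation; this sketch ignores that overlap.

import torch.distributed as dist

def average_gradients(model):
    # Conceptual equivalent of DDP's gradient synchronization:
    # sum each gradient across processes, then divide by the world size.
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
            param.grad.data /= world_size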

0x03 VS DataParallel

3.1 Essential Differences

Since DataParallel already provides data parallel training, why do we need DistributedDataParallel? To answer that, we need to understand the implementation principles of the two and the differences between them:

  • Large model training.

    • If the model is too large to fit on a single GPU, it must be split across multiple GPUs using model parallelism.

      • DataParallel makes large-scale model training difficult because the whole model must fit on a single GPU; that is, it cannot be combined with model parallelism (splitting a model across multiple GPUs).
      • DistributedDataParallel can wrap just part of a large model, so it can be combined with model parallelism.
    • If the data is too large to fit on a single machine, data parallelism is required.

      • In this case, each DistributedDataParallel process uses the model in parallel, and all processes consume the data in parallel. At this point it is not very different from DP.
    • If your model needs to span multiple machines, or if your use case does not fit the data parallelism paradigm, see the RPC API for more general distributed training support.

  • Multi-process or multi-thread:

    • DataParallel is a single-process, multi-threaded, parallel training scheme that can only be run on a single machine.

    • DistributedDataParallel is multi-process and is suitable for both single-machine and multi-machine training. DistributedDataParallel also replicates the model up front, rather than at every iteration, and avoids the global interpreter lock.

      • Each process maintains its own optimizer and performs a complete optimization step in every iteration. Since the gradients have already been gathered and averaged across processes, the gradients are identical in every process, which eliminates the need to broadcast parameters after each step and thus reduces the time spent transferring tensors between nodes.
      • Each process has its own Python interpreter, which eliminates the extra interpreter overhead and "GIL thrashing" that comes from a single Python process driving multiple execution threads, model replicas, or GPUs. This is especially important for models that rely heavily on the Python runtime, for example models containing RNN layers or many small components.
    • Even on a single machine, DataParallel is usually slower than DistributedDataParallel because of cross-thread GIL contention, the per-iteration model replication, and the extra overhead of scattering inputs and gathering outputs (see the construction sketch after this list).
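For reference, here is a minimal sketch contrasting how the two wrappers are constructed. It only illustrates construction, not training; the single-process "gloo" group and port 29500 are assumptions made so that the snippet can run on its own.

import os
import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

# DataParallel: one process, multiple threads; a single optimizer runs on the
# main thread, and inputs are scattered / outputs gathered every iteration.
if torch.cuda.device_count() >= 2:
    dp_model = nn.DataParallel(nn.Linear(10, 5).cuda(), device_ids=[0, 1])

# DistributedDataParallel: one process per device, each process keeps its own
# optimizer. A process group must exist before wrapping; here a trivial
# single-process "gloo" group is created so the snippet runs standalone
# (a real job uses a launcher with world_size > 1 and, on GPUs,
#  DDP(model.cuda(rank), device_ids=[rank])).
os.environ.setdefault("MASTER_ADDR", "localhost")
os.environ.setdefault("MASTER_PORT", "29500")
dist.init_process_group("gloo", rank=0, world_size=1)
ddp_model = DDP(nn.Linear(10, 5))
dist.destroy_process_group()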

3.2 Implementation Differences

The specific implementation differences between DDP and DP are as follows:

  • About the optimizer:

    • DDP: in each iteration, every DDP process has its own optimizer, and each process completes all the optimization steps independently, just like non-distributed training.
    • DP: there is only one optimizer in DP, and it runs on the main thread. It sums the gradients from all GPUs, updates the parameters on the primary GPU, and then broadcasts the model parameters to the other GPUs.
  • About gradients:

    • DDP: each process computes the loss on its own GPU, runs backward propagation to compute gradients, and performs all-reduce operations on the gradients while they are being computed.
    • DP: after each thread finishes computing gradients on its GPU, the gradients are summed onto the main GPU, the main thread uses them to update the model weights, and the updated model is then broadcast to the other GPUs for the next round of training.
  • About data transfer:

    • DDP: only a small amount of data such as gradients is exchanged. Because the model in every process starts from the same initial parameters (a broadcast is done once at initialization) and the gradients used to update the parameters are identical in every iteration, the model parameters of all processes stay consistent. Compared with DataParallel, torch.distributed transfers less data, so it is faster and more efficient.

    • DP: every iteration involves many interactions, such as model replicas, forward outputs, losses, gradients, and so on.

0x04 How to Use

The basic flow of using distributed training in PyTorch is as follows (a minimal skeleton combining these steps is sketched after the list):

  1. First, use init_process_group to initialize the process group and the distributed package; only after this can you use the other functions of the distributed package.
  2. If communication within a subset of processes is needed, use new_group to create subgroups.
  3. Use DDP(model, device_ids=device_ids) to create a DistributedDataParallel model.
  4. Create a DistributedSampler for the dataset.
  5. Use the launch tool torch.distributed.launch to start training by executing the script on each host.
  6. Use destroy_process_group() to destroy the process group.
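A minimal skeleton that strings these steps together might look like the following. It assumes a single-node GPU setup with the NCCL backend (so the local rank doubles as the global rank); dataset and model_cls are placeholders for your own dataset and model class, and local_rank / world_size are assumed to be provided by the launcher or mp.spawn. Step 2 (new_group) is only needed for subgroup communication, and step 5 (the launcher) happens outside this function.

import torch
import torch.distributed as dist
import torch.nn.functional as F
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def train(local_rank, world_size, dataset, model_cls):
    # 1. initialize the process group
    dist.init_process_group(backend="nccl", rank=local_rank, world_size=world_size)
    torch.cuda.set_device(local_rank)

    # 3. wrap the model with DDP (rank 0's parameters are broadcast here)
    model = model_cls().cuda(local_rank)
    ddp_model = DDP(model, device_ids=[local_rank])

    # 4. DistributedSampler gives each process a non-overlapping shard of the data
    #    (assumes the dataset yields (input, target) tensor pairs)
    sampler = DistributedSampler(dataset)
    loader = DataLoader(dataset, batch_size=16, sampler=sampler)

    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.001)
    for epoch in range(2):
        sampler.set_epoch(epoch)            # reshuffle differently every epoch
        for x, y in loader:
            x, y = x.cuda(local_rank), y.cuda(local_rank)
            optimizer.zero_grad()
            loss = F.mse_loss(ddp_model(x), y)
            loss.backward()                 # gradients are all-reduced during backward
            optimizer.step()

    # 6. destroy the process group
    dist.destroy_process_group()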

4.1 Basic Examples

First, let's take a look at the example from pytorch.org/tutorials/i… .

4.1.1 Setting Up the Process Group

At the beginning of the example, we first set up the process group correctly.

The parameters of init_process_group are explained as follows:

  • "gloo" indicates that the Gloo backend is used.
  • rank is the rank of this process. The process with rank 0 is the master process and is responsible for broadcasting the model state.
  • world_size is the total number of parallel processes. If the number of processes that have joined is smaller than world_size, init_process_group blocks; once world_size processes have joined, the program continues. Note that if the per-process batch_size = 16, the effective total batch size is 16 * world_size.
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP

# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# For TcpStore, same way as on Linux.

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

4.1.2 Simple model

Now let's create a simple module, wrap it with DDP, and feed it some dummy input data. Note that because DDP broadcasts the model state from the rank 0 process to all other processes in the DDP constructor, the starting model parameters are the same for every DDP process, and users do not need to worry about different DDP processes starting from different initial parameter values.

              +-----------+
              |           |
              |  Rank 0   |
              |           |
              +-----+-----+
                    |
                    |  Model Parameters
                    |
      +-------------+--------------+
      |             |              |
      v             v              v
+-----+----+  +-----+----+   +-----+-----+
|          |  |          |   |           |
|  Rank 1  |  |  Rank 2  |...|  Rank n   |
|          |  |          |   |           |
+----------+  +----------+   +-----------+

DDP wraps the lower-level details of distributed communication and provides a clean API, as if it were a local model. Gradient synchronization communication happens during backpropagation and overlaps with the backward computation. When backward() returns, param.grad already contains the synchronized gradient tensor. Because DDP encapsulates the distributed communication primitives, the gradients of the model parameters can be all-reduced.

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

The details are shown below.

+--------------------------+                   +------------------------+
| torch.optim.SGD          |                   | DDP                    |
|                          |    parameters()   |                        |
|                          |                   |      +------------+    |
|                          | <-----------------+      |            |    |
|                          |                   |      |  ToyModel  |    |
|                          |                   |      |            |    |
|                          |                   |      +------------+    |
|                          |                   |                        |
+--------------------------+                   +--------+---------------+
                                                        |
                                                        |
                                                        |  forward outputs
                                                        |
                                                        |
                                                        v

                                               +-------------------------+
                                               | nn.MSELoss()            |
                                               |                         |
                                               |                         |
                                               |                         |
                                               |                         |
                                               +-------------------------+



4.1.3 Handling Differences in Processing Speed

In DDP, the constructor, forward pass, and backward pass are distributed synchronization points. Different processes are expected to launch the same number of synchronization operations and to reach these synchronization points in the same order and at roughly the same time. Otherwise, a faster process will reach a synchronization point early, and if it waits too long for the stragglers, it will time out.

Therefore, users are responsible for balancing the workload across processes. Sometimes skew in processing speed is unavoidable due to network latency, resource contention, unpredictable workload spikes, and so on. To avoid timeouts in these cases, make sure to pass a large enough value for the timeout argument when calling init_process_group, as sketched below.
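For reference, a minimal sketch of passing that timeout: the function name setup_with_timeout and the one-hour value are illustrative assumptions, not recommendations. The timeout argument of init_process_group takes a datetime.timedelta.

import os
from datetime import timedelta
import torch.distributed as dist

def setup_with_timeout(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    # Raise the collective timeout if some workers may lag far behind;
    # the default is roughly 30 minutes.
    dist.init_process_group(
        "gloo", rank=rank, world_size=world_size,
        timeout=timedelta(hours=1),
    )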

4.1.4 Saving and loading checkpoints

In general, you can use torch.save and torch.load to checkpoint modules and resume training from them.

When using DDP, one optimization is to save the model in only one process and then load it in all processes, which reduces the write overhead (much like read/write separation in a database). Because all processes start from the same parameters and gradients are synchronized in the backward pass, the optimizers keep setting the parameters to the same values. If you use this optimization, make sure no process starts loading before the save has finished.

In addition, when loading the module, you need to provide an appropriate map_location argument to prevent a process from stepping into another process's device. If map_location is missing, torch.load first loads the module onto the CPU and then copies each parameter to where it was previously saved, which would cause all processes on the same machine to use the same set of devices.

For more advanced failover and resilient support, see TorchElastic. There will also be a series on elasticity.

As you can see from the following figure, Rank 0 is responsible for saving the model to storage, while other ranks load the model locally.

                 +-----------+
                 |           |
                 |  Rank 0   |
                 |           |
                 +-----+-----+
                       |
                       | save  Model Parameters
                       |
                       v
                +------+-------+
                |              |
   +------------+  Model file  +------------------+
   |            |              |                  |
   |            +------+-------+                  |
   |                   |                          |
   | load              | load                     | load
   |                   |                          |
   v                   v                          v
+--+-------+     +-----+----+               +-----+-----+
|          |     |          |               |           |
|  Rank 1  |     |  Rank 2  |    ......     |  Rank n   |
|          |     |          |               |           |
+----------+     +----------+               +-----------+

Details are as follows:

def demo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn = nn.MSELoss()
    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Not necessary to use a dist.barrier() to guard the file deletion below
    # as the AllReduce ops in the backward pass of DDP already served as
    # a synchronization.

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()

4.2 Combining DDP with Model Parallelism

The second half of pytorch.org/tutorials/i… combines DDP with model parallelism, so let's take a look at it.

DDP is also suitable for multi-GPU models. DDP is especially useful when training large models with big data.

class ToyMpModel(nn.Module):
    def __init__(self, dev0, dev1):
        super(ToyMpModel, self).__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)
​
    def forward(self, x):
        x = x.to(self.dev0)
        x = self.relu(self.net1(x))
        x = x.to(self.dev1)
        return self.net2(x)

Note that device_ids and output_device must not be set when passing a multi-GPU model to DDP.

Input and output data are placed on the appropriate devices by the application or by the model's forward() method.

def demo_model_parallel(rank, world_size):
    print(f"Running DDP with model parallel example on rank {rank}.")
    setup(rank, world_size)

    # setup mp_model and devices for this process
    dev0 = (rank * 2) % world_size
    dev1 = (rank * 2 + 1) % world_size
    mp_model = ToyMpModel(dev0, dev1)
    ddp_mp_model = DDP(mp_model)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    # outputs will be on dev1
    outputs = ddp_mp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(dev1)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
    world_size = n_gpus
    run_demo(demo_basic, world_size)
    run_demo(demo_checkpoint, world_size)
    run_demo(demo_model_parallel, world_size)

Notice that a sampler is not used here. Normally, DistributedSampler should be used together with DDP. DistributedSampler splits the dataset samples among the processes so that each process reads only the samples it should use. In addition, for DDP, DistributedSampler uses set_epoch to shuffle the dataset differently at each epoch.

0x05 How Do I Start Multiple Processes

As mentioned earlier, if your application needs to scale across machine boundaries, you need to use multi-machine DistributedDataParallel together with a launch script. torch.nn.parallel.DistributedDataParallel() supports multiple machines connected over a network, and the user must explicitly launch a main training script for each process on every machine.

Let's take a look at the launch script github.com/pytorch/exa… . What follows is a translation of its README file.

In this tutorial, we will demonstrate how to build a distributed model training application so that it can be conveniently launched on multiple nodes, each with multiple GPUs, using PyTorch's distributed launcher script github.com/pytorch/pyt… (the torch.distributed.launch utility), which starts multiple distributed training processes on each training node.

This utility can be used for CPU training or GPU training; if used for GPU training, each GPU runs one process. The tool can be used for both single-node multi-GPU training and multi-node distributed training.

  • For single-node multi-GPU training, it runs one distributed process per GPU, which is reported to improve single-node training performance significantly.
  • For multi-node distributed training, spawning multiple processes on each node also yields better multi-node training performance. The speedup is higher if an InfiniBand interface is available.

In both the single-node and multi-node distributed training cases, the tool starts a given number of processes per node (--nproc_per_node). If used for GPU training, this number must be less than or equal to the number of GPUs on the current system (nproc_per_node), and each process runs on a single GPU, from GPU 0 to GPU (nproc_per_node - 1).

5.1 Prerequisites

Multiple workers train the same global model by processing different parts of a large dataset; each worker independently computes local gradients (also called sub-gradients) and then synchronizes them using the AllReduce primitive. Because the same program runs in every application process while each process works on a different part of the training dataset, in HPC terminology this execution model is called Single Program, Multiple Data, or SPMD.

5.2 Application Process Topology

A distributed data parallel (DDP) application can be executed on multiple nodes, where each node can consist of multiple GPU devices. Each node, in turn, can run multiple copies of the DDP application, each of which processes its model on multiple GPUs.

Let N be the number of nodes running the application and G be the number of GPUs per node. The total number of application processes running across all nodes at the same time is called the World Size, or W, and the number of processes running on each node is called the Local World Size, or L.

Each application process is assigned two ids: local rank in [0, L-1] and global rank in [0, W-1].

To illustrate the terms defined above, consider launching a DDP application on two nodes, each with four GPUs, where we want each process to span two GPUs. The mapping of processes to nodes is shown below:

The following image is also from github.com/pytorch/exa… .

While there are many ways to map processes to nodes, a good rule of thumb is to have each process span a single GPU. This allows the DDP application to have as many parallel reader streams as GPUs, and in practice it provides a good balance between I/O and computation cost.
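Working through the numbers of the two-node example above (a small illustration of the definitions; it assumes the launcher assigns global ranks node by node, i.e. node_rank * nproc_per_node + local_rank, as launch.py does):

N = 2                      # nodes
G = 4                      # GPUs per node
gpus_per_process = 2       # each process spans two GPUs in this example
L = G // gpus_per_process  # local world size: 2 processes per node
W = N * L                  # world size: 4 processes in total

for node_rank in range(N):
    for local_rank in range(L):
        global_rank = node_rank * L + local_rank
        print(f"node {node_rank}: local rank {local_rank} -> global rank {global_rank}")
# node 0: local rank 0 -> global rank 0
# node 0: local rank 1 -> global rank 1
# node 1: local rank 0 -> global rank 2
# node 1: local rank 1 -> global rank 3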

5.3 Preparing and Starting DDP Applications

Regardless of how a DDP application is launched, each process needs a mechanism to know its global and local ranks. Therefore, all processes create a ProcessGroup, based on which they can participate in collective communication operations such as AllReduce.

A convenient way to start multiple DDP processes and initialize all the parameters (values needed to create a ProcessGroup) is to use the distributed script launch.py provided by PyTorch.

The Launcher can be found in the distributed subdirectory of the local Torch installation directory. Here’s a quick way to get the launch.py path on any operating system:

python -c " from os import path; import torch; print(path.join(path.dirname(torch.__file__), 'distributed', 'launch.py')) "

This will print the following:

/home/username/miniconda3/envs/pytorch/lib/python3.8/site-packages/torch/distributed/launch.py

When a DDP application is launched via launch.py, it passes the world size, global rank, master address, and master port to each instance via environment variables, and the local rank as a command-line argument. To use the launcher, the application needs to adhere to the following conventions:

  • It must provide an entry-point function for a single worker. For example, it should not use torch.multiprocessing.spawn to start subprocesses.
  • Environment variables must be used to initialize the process group.

For simplicity, applications can assume that each process maps to a single GPU, but in the next section, we’ll also show how to perform process-to-GPU mapping in a more general way.

5.4 Example Applications

This sample DDP application is based on the “Hello, World” application from the DDP tutorial.

5.4.1 Parameter Transfer Conventions

A DDP application takes two command-line arguments:

  1. --local_rank: this argument is passed in by launch.py.
  2. --local_world_size: this is passed in explicitly and is typically either 1 or the number of GPUs per node.

The application parses these arguments and calls the spmd_main entry point:

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int, default=0)
    parser.add_argument("--local_world_size", type=int, default=1)
    args = parser.parse_args()
    spmd_main(args.local_world_size, args.local_rank)

In spmd_main, the process group is initialized with a backend (NCCL or Gloo). The rest of the information needed for rendezvous comes from the environment variables set by launch.py:

def spmd_main(local_world_size, local_rank):
    # These are the parameters used to initialize the process group
    env_dict = {
        key: os.environ[key]
        for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE")
    }
    print(f"[{os.getpid()}] Initializing process group with: {env_dict}")
    dist.init_process_group(backend="nccl")
    print(
        f"[{os.getpid()}] world_size = {dist.get_world_size()}, "
        + f"rank = {dist.get_rank()}, backend={dist.get_backend()}"
    )

    demo_basic(local_world_size, local_rank)

    # Tear down the process group
    dist.destroy_process_group()


Given the local rank and world size, the training function demo_basic initializes the DistributedDataParallel model on a set of GPUs of the local node via device_ids:

def demo_basic(local_world_size, local_rank):

    # setup devices for this process. For local_world_size = 2, num_gpus = 8,
    # rank 0 uses GPUs [0, 1, 2, 3] and
    # rank 1 uses GPUs [4, 5, 6, 7].
    n = torch.cuda.device_count() // local_world_size
    device_ids = list(range(local_rank * n, (local_rank + 1) * n))

    print(
        f"[{os.getpid()}] rank = {dist.get_rank()}, "
        + f"world_size = {dist.get_world_size()}, n = {n}, device_ids = {device_ids}"
    )

    model = ToyModel().cuda(device_ids[0])
    ddp_model = DDP(model, device_ids)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_ids[0])
    loss_fn(outputs, labels).backward()
    optimizer.step()

The application can be launched on an 8 GPU node, one process per GPU, using launch.py:

python /path/to/launch.py --nnode=1 --node_rank=0 --nproc_per_node=8 example.py --local_world_size=8

And produces output similar to the one shown below:

*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
[238627] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '0', 'WORLD_SIZE': '8'}
[238630] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '3', 'WORLD_SIZE': '8'}
[238628] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '1', 'WORLD_SIZE': '8'}
[238634] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '7', 'WORLD_SIZE': '8'}
[238631] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '4', 'WORLD_SIZE': '8'}
[238632] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '5', 'WORLD_SIZE': '8'}
[238629] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '2', 'WORLD_SIZE': '8'}
[238633] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '6', 'WORLD_SIZE': '8'}
[238633] world_size = 8, rank = 6, backend=nccl
[238628] world_size = 8, rank = 1, backend=nccl
[238629] world_size = 8, rank = 2, backend=nccl
[238631] world_size = 8, rank = 4, backend=nccl
[238630] world_size = 8, rank = 3, backend=nccl
[238632] world_size = 8, rank = 5, backend=nccl
[238634] world_size = 8, rank = 7, backend=nccl
[238627] world_size = 8, rank = 0, backend=nccl
[238633] rank = 6, world_size = 8, n = 1, device_ids = [6]
[238628] rank = 1, world_size = 8, n = 1, device_ids = [1]
[238632] rank = 5, world_size = 8, n = 1, device_ids = [5]
[238634] rank = 7, world_size = 8, n = 1, device_ids = [7]
[238629] rank = 2, world_size = 8, n = 1, device_ids = [2]
[238630] rank = 3, world_size = 8, n = 1, device_ids = [3]
[238631] rank = 4, world_size = 8, n = 1, device_ids = [4]
[238627] rank = 0, world_size = 8, n = 1, device_ids = [0]

Alternatively, it can be started as a single process that spans all 8 GPUs:

python /path/to/launch.py --nnode=1 --node_rank=0 --nproc_per_node=1 example.py --local_world_size=1

This creates nproc_per_node processes on the current host, and each process runs the training script independently. In addition, each process is given a local_rank argument that indicates the index of the process on the current host.

For example, node_rank = 2 and local_rank = 0 identify the first process on the third node.

This in turn produces the following output:

[262816] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '0', 'WORLD_SIZE': '1'}
[262816]: world_size = 1, rank = 0, backend=nccl
[262816] rank = 0, world_size = 1, n = 8, device_ids = [0, 1, 2, 3, 4, 5, 6, 7]

5.5 Conclusion

As the author of a distributed data parallel application, your code needs to be aware of two types of resources: the compute nodes and the GPUs within each node. Keeping track of how sets of GPUs map to application processes can be tedious and error-prone.

We therefore hope that by structuring your application around the launcher, as shown in this example, the setup of distributed training can be greatly simplified.

5.6 Behind the Startup Script

It’s not enough to know what the startup script does; we also need to know what’s going on inside it.

5.6.1 launch.py

launch.py is located at torch/distributed/launch.py, but in fact most of its functionality has been moved to torch/distributed/run.py.

def main(args=None):
    logger.warn(
        "The module torch.distributed.launch is deprecated "
        "and going to be removed in future."
        "Migrate to torch.distributed.run"
    )
    args = parse_args(args)
    run(args)

So we’re going to look at run.py.

5.6.2 run.py

As you can see, the basic idea of run.py is to use config_from_args to extract the configuration, the command to execute, and its arguments from the command line, and then call elastic_launch to run it. Elastic training is clearly the direction things are heading; we will cover it in a follow-up series.

def run(args):
    if args.standalone:
        args.rdzv_backend = "c10d"
        args.rdzv_endpoint = "localhost:29400"
        args.rdzv_id = str(uuid.uuid4())
        log.info(
            f"\n**************************************\n"
            f"Rendezvous info:\n"
            f"--rdzv_backend={args.rdzv_backend} "
            f"--rdzv_endpoint={args.rdzv_endpoint} "
            f"--rdzv_id={args.rdzv_id}\n"
            f"**************************************\n"
        )

    config, cmd, cmd_args = config_from_args(args)
    elastic_launch(
        config=config,
        entrypoint=cmd,
    )(*cmd_args)

run.py can also be run on its own, for example:

>>> python -m torch.distributed.run
    --nnodes=$NUM_NODES
    --nproc_per_node=$NUM_TRAINERS
    --rdzv_id=$JOB_ID
    --rdzv_backend=c10d
    --rdzv_endpoint=$HOST_NODE_ADDR
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

5.6.3 Definitions

Since run.py involves many configuration concepts, let's go through them (a short sketch of how a worker reads these values follows the list).

  1. Node – a physical instance or container; it maps to the unit that the job manager works with.

  2. Worker – a worker in the distributed training environment.

  3. WorkerGroup – a group of workers (such as trainers) that perform the same function.

  4. LocalWorkerGroup – the subset of workers in a worker group that run on the same node.

  5. RANK – the rank of a worker within the worker group. It is a global rank, and can be thought of as an index into a global list of GPU resources.

  6. LOCAL_RANK – the rank of a worker within the local worker group, which can be thought of as an index into the list of GPU resources on the current node.

  7. GROUP_RANK – the rank of the worker group, a number between 0 and the maximum number of nodes. If each node runs a single worker group, this is the rank of the node.

  8. ROLE_RANK – the rank of a worker among the workers that share the same role; the role is specified in the WorkerSpec.

  9. WORLD_SIZE – the total number of workers in the worker group. Because nodes can join and leave, WORLD_SIZE can change, so code must not rely on WORLD_SIZE being stable.

  10. LOCAL_WORLD_SIZE – the size of the local worker group, i.e. the number of workers running locally; it equals the --nproc_per_node specified when running torch.distributed.run. Currently, torch/distributed/run.py only supports a homogeneous LOCAL_WORLD_SIZE, that is, it assumes that all nodes run the same number of local workers (per role).

  11. ROLE_WORLD_SIZE – the total number of workers with the same role, as specified in the WorkerSpec.

  12. rdzv_id – a user-defined ID that uniquely identifies the worker group of a job. Each node uses this ID to join a specific worker group.

  13. rdzv_backend – the rendezvous backend (for example, "c10d"). This is usually a strongly consistent key-value store.

  14. rdzv_endpoint – the rendezvous backend endpoint, usually in the form <host>:<port>.

  15. run_id – a user-defined ID that uniquely identifies an instance of a distributed application. It usually maps to the job ID and is used to allow nodes to join the correct distributed application.

  16. TORCHELASTIC_RESTART_COUNT – the number of times the worker group has been restarted so far.

  17. TORCHELASTIC_MAX_RESTARTS – the configured maximum number of restarts.

  18. TORCHELASTIC_RUN_ID – equal to the rendezvous run_id, i.e. the unique job ID.
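As a small illustration of how several of these terms surface inside a worker process: when a script is launched with torch.distributed.run, the elastic agent exposes them as environment variables with the same names, and the training script can read them directly. This is a sketch under the assumption that the script is started by the launcher; run standalone it would raise KeyError.

import os

# Values injected by the launcher (torch.distributed.run / torchelastic).
rank             = int(os.environ["RANK"])
local_rank       = int(os.environ["LOCAL_RANK"])
group_rank       = int(os.environ["GROUP_RANK"])
world_size       = int(os.environ["WORLD_SIZE"])
local_world_size = int(os.environ["LOCAL_WORLD_SIZE"])

print(f"global rank {rank} / {world_size}, "
      f"local rank {local_rank} / {local_world_size}, "
      f"group rank {group_rank}")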

We will cover elastic training in a dedicated series later, so we skip the details here. Stay tuned for the next article, in which we start introducing the store concept needed for communication.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

Github.com/pytorch/exa…

Pytorch.org/tutorials/i…