0x00 Preface

This article mainly adds my own understanding on top of a translation of PyTorch's official documentation, hoping to give you the historical context and basic concepts of PyTorch's distributed world. If you are interested, you can take a closer look at the history to see how a machine learning system steps into the distributed world and improves its functionality.

Other articles in this series are as follows:

Automatic Differentiation of Deep Learning Tools (1)

Automatic Differentiation of Deep Learning Tools (2)

Automatic Differentiation of Deep Learning Tools (3) — Interpretation of Examples

How PyTorch Implements Forward Propagation (1) — Base Class (1)

How PyTorch Implements Forward Propagation (2) — Base Class (2)

How PyTorch Implements Forward Propagation (3) — Implementation

How PyTorch Implements Backward Propagation (1) — Calling the Engine

How PyTorch Implements Backward Propagation (2) — Engine Static Structure

How PyTorch Implements Backward Propagation (3) — Engine Dynamic Logic

How PyTorch Implements Backward Propagation (4) — Specific Algorithms

0x01 PyTorch Distributed History

Inspired by PyTorch's landmark distributed work, let's take a look at PyTorch's distributed history up to version 1.9.

If you want to explore PyTorch source code, Gemfield and PyTorch source code interpretation columns are recommended.

PyTorch's distributed history can be traced through the release notes at github.com/pytorch/pyt… The author roughly divides this history into seven stages.

They are:

  1. torch.multiprocessing wraps Python's native multiprocessing module so that multiple CPU cores can be leveraged.
  2. THD (distributed PyTorch) was merged, providing an underlying library for distributed computing.
  3. The torch.distributed package was introduced, which allows tensors to be exchanged between multiple machines, so that training can use larger batches across multiple machines.
  4. The c10d library was released and became the backend of the torch.distributed package and of torch.nn.parallel.DistributedDataParallel; at the same time the THD-based implementation was deprecated.
  5. A distributed RPC framework was provided to support distributed model parallel training. It allows functions to be run remotely and remote objects to be referenced without copying the real data around, and provides autograd and optimizer APIs that transparently run the backward pass and update parameters across RPC boundaries.
  6. Elastic training was introduced: TorchElastic offers a strict superset of the torch.distributed.launch CLI and adds fault-tolerance and elasticity features.
  7. Pipeline parallelism was introduced, based on torchgpipe.

The timeline of this evolution is as follows:

  • Multiprocessing: v0.1.2, v0.1.6
  • THD: v0.1.8
  • torch.distributed: v0.2, v0.4
  • C10D: v1.0, v1.1, v1.2, v1.3
  • RPC: v1.4, v1.5, v1.6
  • TorchElastic: v1.7
  • Pipeline: v1.8, v1.9

If you are interested, you can take a look at how a machine learning system can step into the distributed world. If you are not interested, you can skip to the following overview.

1.1 Multiprocessing

PyTorch 0.1.2

torch.multiprocessing wraps Python's native multiprocessing module so that multiple CPU cores can be leveraged.

The reason is that using threads in Python runs into technical problems, chiefly the Global Interpreter Lock (GIL), so multiple processes should be used instead.

With Python, one cannot use threads because of a few technical issues. Python has what is called Global Interpreter Lock, which does not allow threads to concurrently execute python code.

Hence, the most pythonic way to use multiple CPU cores is multiprocessing

We made PyTorch to seamlessly integrate with python multiprocessing. This involved solving some complex technical problems to make this an air-tight solution, and more can be read in this in-depth technical discussion.
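As a rough illustration (a minimal sketch added here, not taken from the release notes; the worker function and tensor shape are arbitrary), sharing a CPU tensor across processes with torch.multiprocessing can look like this:

    import torch
    import torch.multiprocessing as mp

    def worker(shared):
        # Every process sees the same underlying storage because it lives in shared memory.
        shared += 1

    if __name__ == "__main__":
        t = torch.zeros(4)
        t.share_memory_()   # move the tensor's storage into shared memory
        procs = [mp.Process(target=worker, args=(t,)) for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(t)            # each worker incremented the shared tensor in place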

PyTorch 0.1.6

Multiprocessing supports CUDA.

Until now, Tensor sharing using multiprocessing only worked for CPU Tensors. We’ve now enabled Tensor sharing for CUDA tensors when using python-3. You can read more notes here: pytorch.org/docs/notes/…

1.2 THD base library

PyTorch 0.1.8

THD (distributed PyTorch) was merged, providing the underlying library for distributed computing.

Merged an initial version of THD (distributed pytorch)

1.3 The torch.distributed package

PyTorch 0.2

We introduce the torch.distributed package that allows you to exchange Tensors among multiple machines. Using this package, you can scale your network training over multiple machines and larger mini-batches. For example, you are given the primitives to implement Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour.

The distributed package follows an MPI-style programming model. This means that there are functions provided to you such as send, recv, all_reduce that will exchange Tensors among nodes (machines).

For each of the machines to first identify each other and assign unique numbers to each other (ranks), we provide simple initialization methods:

  • shared file system (requires that all processes can access a single file system)
  • IP multicast (requires that all processes are in the same network)
  • environment variable (requires you to manually assign ranks and know an address of a node reachable from all processes)

This version introduces the torch.distributed package, which allows tensors to be exchanged between multiple machines. With this package, training can scale to larger batches over multiple machines.

torch.distributed follows an MPI-style programming model, providing methods such as send, recv, and all_reduce to exchange tensors between different nodes (machines).

Because the machines need to identify each other, a mechanism is required to assign a unique number to each participant; this is the rank. The distributed package provides several simple initialization methods:

  • Shared file system (all processes on all machines can access this file system)
  • IP multicast (requires all processes to be on the same network)
  • Environment variables (requires the user to manually specify the rank and provide a node address accessible to all processes)

World size is the number of processes that will participate in the training. Each process is assigned a rank, a number between 0 and world_size - 1 that is unique within the job. It serves as the process identifier and is used instead of an address, for example to specify which rank (process) a tensor should be sent to.

The point-to-point primitives include send and recv in synchronous mode and isend and irecv in asynchronous mode. Because certain communication patterns occur very frequently, PyTorch also provides higher-level functions such as all_reduce; these are collective communication primitives over the whole process group and are more efficient.
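To make the primitives concrete, here is a minimal sketch added by way of illustration (two processes on one machine, gloo backend, environment-variable initialization; the function name run and the port number are arbitrary):

    import os
    import torch
    import torch.distributed as dist
    import torch.multiprocessing as mp

    def run(rank, world_size):
        # Environment-variable initialization: every process agrees on the rendezvous address.
        os.environ["MASTER_ADDR"] = "127.0.0.1"
        os.environ["MASTER_PORT"] = "29500"
        dist.init_process_group("gloo", rank=rank, world_size=world_size)

        t = torch.ones(1) * rank
        dist.all_reduce(t, op=dist.ReduceOp.SUM)    # collective: every rank ends up with the sum
        print(f"rank {rank}: all_reduce result {t.item()}")

        if rank == 0:
            dist.send(torch.tensor([3.14]), dst=1)  # synchronous point-to-point send
        elif rank == 1:
            buf = torch.zeros(1)
            dist.recv(buf, src=0)                   # blocks until the tensor from rank 0 arrives

        dist.destroy_process_group()

    if __name__ == "__main__":
        mp.spawn(run, args=(2,), nprocs=2)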

However, the distributed package is still fairly low level, so it is generally used to implement higher-level algorithms or to customize special algorithms. Because data parallel training is so common, PyTorch created the high-level helper DistributedDataParallel, which is almost a drop-in replacement for nn.DataParallel.

PyTorch 0.4

This version has several related improvements.

  • Added DistributedDataParallelCPU. This class is similar to DistributedDataParallel, but mainly supports training on the CPU (DistributedDataParallel targets the GPU). It supports the mpi, gloo and tcp backends (the TCP backend was later removed).

Add DistributedDataParallelCPU. This is similar to DistributedDataParallel, but with specific support for models running on the CPU (contrary to DistributedDataParallel, which targets GPU), and supports mpi, gloo and tcp backends #5919.

  • Added a new launcher utility script. This script can be used to launch DistributedDataParallel jobs on a single machine or on multiple machines.

Helper utility for launching Distributed Training jobs

We have added an utility function to help launch jobs on a distributed setup. In order to launch a script that leverages DistributedDataParallel on either single-node or multiple-node setups, we can make use of torch.distributed.launch as follows

python -m torch.distributed.launch my_script.py --arg1 --arg2 --arg3
  • A new distributed backend based on NCCL 2.0 has been added for speed improvements, along with new APIs for collective communication across multiple GPUs.

A new distributed backend based on NCCL 2.0

PyTorch now has a new distributed backend, which leverages NCCL 2.0 for maximum speed. It also provides new APIs for collective operations on multiple GPUs. You can enable the new backend via

torch.distributed.init_process_group("nccl")
  • Other improvements include coalescing many small broadcast operations, mixed-precision support, InfiniBand support, and more.
  • Coalesce many small broadcasts to improve performance #4978
  • Add mixed-precision support for distributed training #4891
  • Release NCCL distributed backend. Previously it was marked as experimental. # 4921
  • Enable Infiniband support for Gloo data channel with automatic IB device detection #4795

1.4 c10d library

PyTorch 1.0

torch.distributed new “C10D” library

The torch.distributed package and torch.nn.parallel.DistributedDataParallel module are backed by the new “C10D” library. The main highlights of the new library are:

  • C10D is performance driven and operates entirely asynchronously for all backends: Gloo, NCCL, and MPI.
  • Significant Distributed Data Parallel performance improvements especially for slower networks like ethernet-based hosts
  • Adds async support for all distributed collective operations in the torch.distributed package.
  • Adds send and recv support in the Gloo backend

This version released the c10d library, which became the backend foundation of the torch.distributed package and of the torch.nn.parallel.DistributedDataParallel module. The library's main highlights are:

  • Because C10D operates entirely asynchronously, there is a significant performance improvement for all backends (Gloo, NCCL, and MPI).
  • For slower networks such as Ethernet, distributed data parallel performance has been greatly improved.
  • Asynchronous support has been added for all distributed collective operations in the torch.distributed package (see the sketch after this list).
  • send and recv support was added to the Gloo backend.
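The asynchronous collective API returns a work handle instead of blocking; a minimal sketch added here assumes the process group has already been initialized (e.g. with dist.init_process_group):

    import torch
    import torch.distributed as dist

    t = torch.ones(4)
    work = dist.all_reduce(t, async_op=True)  # returns a work handle immediately
    # ... other computation can overlap with the communication here ...
    work.wait()                               # block until the all_reduce has completed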

There are several other changes.

  • The TCP backend was removed; the Gloo and MPI backends are recommended for CPU collective communication and NCCL for GPU collective communication.
  • The old (THD-based) torch.distributed package was deprecated.
  • The old (THD-based) torch.nn.parallel.DistributedDataParallel module was deprecated.
  • torch.distributed: the TCP backend is removed, we recommend to use Gloo and MPI backends for CPU collectives and NCCL backend for GPU collectives.
  • the old (THD-backed) torch.distributed package is deprecated but still available at torch.distributed.deprecated.
  • The old (THD-backed) torch.nn.parallel.DistributedDataParallel is deprecated but still available at torch.nn.parallel.deprecated.DistributedDataParallel.

PyTorch 1.1

nn.parallel.DistributedDataParallel can now wrap multi-GPU models, which enables combining model parallelism within one server with data parallelism across servers.

DistributedDataParallel new functionality and tutorials

nn.parallel.DistributedDataParallel: can now wrap multi-GPU modules, which enables use cases such as model parallel (tutorial) on one server and data parallel (tutorial) across servers.

c10d ProcessGroup::getGroupRank has been removed.

PyTorch 1.2

This release has made the following improvements:

Distributed Package supports CPU modules, sparse tensors, and local gradient accumulation.

Distributed Package

  • DistributedDataParallel: support CPU modules. (20236)
  • DistributedDataParallel: support sparse tensors. (19146)
  • DistributedDataParallel: support local gradient accumulation. (21736)

There are also some other minor improvements, such as adding a device guard for MPI operations.

PyTorch 1.3

macOS support for torch.distributed was added, but only with the Gloo backend; this lets users reuse code written for other platforms with only a single line changed. Some other improvements have also been made.

This release adds macOS support for torch.distributed with the Gloo backend. You can more easily switch from development (e.g. on macOS) to deployment (e.g. on Linux) without having to change a single line of code. The prebuilt binaries for macOS (stable and nightly) include support out of the box.

  • torch.distributed.all_reduce_coalesced Support allreduce of a list of same-device tensors (24949, 25470, 24876)
  • torch.distributed.all_reduce Add bitwise reduction ops (BAND, BOR, BXOR) (26824)

1.5 RPC framework

PyTorch 1.4.0

This release began experimenting with distributed model parallel training.

As models like RoBERTa scale up to billions of parameters, model parallel training becomes increasingly important because it helps researchers push the limits. Version 1.4.0 provides a distributed RPC framework to support distributed model parallel training. It allows functions to be run remotely and remote objects to be referenced without copying the real data around, and provides autograd and optimizer APIs that transparently propagate backward and update parameters across RPC boundaries.

Distributed Model Parallel Training [Experimental]

With the scale of models, such as RoBERTa, continuing to increase into the billions of parameters, model parallel training has become ever more important to help researchers push the limits. This release provides a distributed RPC framework to support distributed model parallel training. It allows for running functions remotely and referencing remote objects without copying the real data around, and provides autograd and optimizer APIs to transparently run backwards and update parameters across RPC boundaries.

torch.distributed.rpc is a newly introduced package. Its basic building blocks allow functions to be run remotely in model training and inference, which is useful for scenarios such as distributed model parallelism or implementing parameter server frameworks. More specifically, it contains four pillars: RPC, remote references, distributed autograd, and the distributed optimizer. See the documentation and tutorial for more details.

RPC [Experimental]

torch.distributed.rpc is a newly introduced package. It contains basic building blocks to run functions remotely in model training and inference, which will be useful for scenarios like distributed model parallel or implementing parameter server frameworks. More specifically, it contains four pillars: RPC, Remote Reference, Distributed Autograd, and Distributed Optimizer. Please refer to the documentation and the tutorial for more details.

PyTorch 1.5

torch.distributed.rpc has been officially released as stable.

The torch.distributed.rpc package is designed to support distributed training patterns that do not fit DistributedDataParallel. Examples include parameter server training, distributed model parallelism, and distributed pipeline parallelism. The functionality of the torch.distributed.rpc package can be divided into four main groups of APIs (a minimal usage sketch follows the quoted list below).

  • The RPC API allows you to run a function with given arguments on a specified target worker process, and to get the return value or create a distributed reference to the return value.
  • RRef (remote reference) is a reference to an object on another worker. A worker holding an RRef can explicitly request a copy of the object, and can also share a lightweight RRef with other workers without worrying about reference counting. This is especially useful when multiple workers need to repeatedly access different versions of the same remote object.
  • With distributed autograd, the application can compute gradients automatically even if the model has been split across multiple workers using RPC. PyTorch stitches together the local autograd graphs at RPC boundaries during the forward pass, and reaches across those boundaries to launch local autograd on the participants during the backward pass.
  • The distributed optimizer updates model parameters using gradients calculated by distributed autograd. Its constructor accepts a local optimizer (e.g. SGD, Adagrad, etc.) and a list of parameter RRefs; its step function automatically uses the local optimizer to update parameters on all the different RRef owners (workers).

Distributed RPC framework APIs [Now Stable]

The torch.distributed.rpc package aims at supporting a wide range of distributed training paradigms that do not fit into DistributedDataParallel. Examples include parameter server training, distributed model parallelism, and distributed pipeline parallelism. Features in the torch.distributed.rpc package can be categorized into four main sets of APIs.

  • The RPC API allows running a function on a specified destination worker with given arguments and fetches the return value or creates a distributed reference to the return value.
  • The RRef (Remote REFerence) serves as a reference to an object on another worker. A worker holding an RRef can explicitly request copies of the object, and it can also share the light-weight RRef with other workers without worrying about reference counting. This is especially useful when multiple workers need to repeatedly access different versions of the same remote object.
  • With Distributed Autograd, applications can automatically compute gradients even if a model is split on multiple workers using RPC. This is achieved by stitching together local autograd graphs at RPC boundaries in the forward pass and reaching out to participants to transparently launch local autograd in the backward pass.
  • The Distributed Optimizer uses gradients computed by Distributed Autograd to update model parameters. Its constructor takes a local optimizer (e.g., SGD, Adagrad, etc.) and a list of parameter RRefs, and its step() function automatically uses the local optimizer to update parameters on all distinct RRef owner workers.
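A minimal sketch of the RPC API added here for illustration (two workers on one machine; the helper add, the worker names, and the port are arbitrary):

    import os
    import torch
    import torch.distributed.rpc as rpc
    import torch.multiprocessing as mp

    def add(a, b):
        return a + b

    def run(rank, world_size):
        os.environ["MASTER_ADDR"] = "127.0.0.1"
        os.environ["MASTER_PORT"] = "29501"
        rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size)
        if rank == 0:
            # Synchronous RPC: run `add` on worker1 and fetch the result.
            result = rpc.rpc_sync("worker1", add, args=(torch.ones(2), torch.ones(2)))
            # `remote` returns an RRef; the value stays on worker1 until to_here() is called.
            rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
            print(result, rref.to_here())
        rpc.shutdown()

    if __name__ == "__main__":
        mp.spawn(run, args=(2,), nprocs=2)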



PyTorch 1.6

This release includes a number of improvements to DDP and RPC, as well as new features, including:

Numerous improvements and new features for both distributed data parallel (DDP) training and the remote procedural call (RPC) packages.

  • TensorPipe backend for RPC

PyTorch 1.6 introduces a new backend for the RPC module that uses the TensorPipe library, a tensor-aware point-to-point communication primitive oriented toward machine learning, designed to complement PyTorch's current distributed training primitives (Gloo, MPI, etc.), which are collective and blocking. The pairwise and asynchronous nature of TensorPipe makes it useful for building new networking patterns that go beyond data parallelism: client-server approaches (e.g., a parameter server for embeddings, actor-learner separation in IMPALA-style RL, etc.), model and pipeline parallel training (e.g., GPipe), gossip SGD, and so on.

TensorPipe backend for RPC

Introduces a new backend for the RPC Module which Leverages the TensorPipe Library, a tensor-aware point-to-point communication primitive targeted at machine learning, intended to complement the current primitives for distributed training in PyTorch (Gloo, MPI, …) which are collective and blocking. The pairwise and asynchronous nature of TensorPipe lends itself to new networking paradigms that go beyond data parallel: client-server approaches (e.g., parameter server for embeddings, actor-learner separation in Impala-style RL, …) and model and pipeline parallel training (think GPipe), gossip SGD, etc.

  • [Beta] DDP+RPC

PyTorch distributed supports two powerful paradigms: DDP for fully synchronized data parallel training, and RPC framework for distributed model parallelism.

Currently the two features work independently, and users cannot mix and match them to try out hybrid parallelism patterns. Starting with PyTorch 1.6, DDP and RPC can work seamlessly together, so users can combine the two techniques to achieve both data parallelism and model parallelism. For example, a user may want to place large embedding tables on parameter servers and use the RPC framework for embedding lookups, while storing smaller dense parameters on the trainers and synchronizing those dense parameters with DDP.

[Beta] DDP+RPC

PyTorch Distributed supports two powerful paradigms: DDP for full sync data parallel training of models and the RPC framework which allows for distributed model parallelism. Currently, These two features work independently and users can’t mix and match these to try out hybrid parallelism paradigms.

Starting PyTorch 1.6, we’ve enabled DDP and RPC to work together seamlessly so that users can combine these two techniques to achieve both data parallelism and model parallelism. An example is where users would like to place large embedding tables on parameter servers and use the RPC framework for embedding lookups, but store smaller dense parameters on trainers and use DDP to synchronize the dense parameters. Below is a simple code snippet.

  • [Beta] RPC – Asynchronous User Functions

RPC asynchronous user functions support yielding and resuming on the server side while executing a user-defined function. Before this feature, an RPC thread on the callee would wait until the user function returned while processing a request. If the user function contains I/O (for example, a nested RPC) or signaling (for example, waiting for another request to unblock), the corresponding RPC thread sits idle waiting for these events. As a result, some applications had to use a very large number of threads and send additional RPC requests, which can lead to performance degradation. To make a user function yield on such events, the application needs to: 1) decorate the function with the @rpc.functions.async_execution decorator; and 2) let the function return a torch.futures.Future and install the resume logic as callbacks on the Future object (a minimal sketch follows the quoted text below).

[Beta] RPC – Asynchronous User Functions

RPC Asynchronous User Functions supports the ability to yield and resume on the server side when executing a user-defined function. Prior to this feature, when an callee processes a request, one RPC thread waits until the user function returns. If the user function contains IO (e.g., nested RPC) or signaling (e.g., waiting for another request to unblock), the corresponding RPC thread would sit idle waiting for these events. As a result, some applications have to use a very large number of threads and send additional RPC requests, which can potentially lead to performance degradation. To make a user function yield on such events, applications need to: 1) Decorate the function with the @rpc.functions.async_execution decorator; and 2) Let the function return a torch.futures.Future and install the resume logic as callbacks on the Future object.
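A minimal sketch of this pattern added here (the function name and the peer name "worker1" are placeholders): because the decorated function returns a Future, the callee's RPC thread is released while the nested RPC is in flight, and the reply is sent when the callback fires.

    import torch
    import torch.distributed.rpc as rpc

    @rpc.functions.async_execution
    def async_add_then_double(x):
        fut = rpc.rpc_async("worker1", torch.add, args=(x, 1))
        # Install the resume logic as a callback instead of blocking on fut.wait().
        return fut.then(lambda f: f.wait() * 2)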

  • [Beta] Fork/Join Parallelism

This release adds a language-level construct and runtime support for coarse-grained parallelism in TorchScript code. This is useful for situations such as running the models of an ensemble in parallel, or running the bidirectional components of a recurrent network in parallel, and it unlocks the computing power of parallel architectures (such as many-core CPUs) for task-level parallelism.

Parallel execution of TorchScript programs is supported by two primitives: torch.jit.fork and torch.jit.wait (a minimal sketch follows the quoted text below).

[Beta] Fork/Join Parallelism

This release adds support for a language-level construct as well as runtime support for coarse-grained parallelism in TorchScript code. This support is useful for situations such as running models in an ensemble in parallel, or running bidirectional components of recurrent nets in parallel, and allows the ability to unlock the computational power of parallel architectures (e.g. many-core CPUs) for task level parallelism.

Parallel execution of TorchScript programs is enabled through two primitives: torch.jit.fork and torch.jit.wait.
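A minimal sketch added here, closely following the fork/wait pattern (the function foo is a placeholder):

    import torch

    def foo(x):
        return torch.neg(x)

    @torch.jit.script
    def example(x):
        # fork launches foo asynchronously; wait joins the task and returns its result.
        fut = torch.jit.fork(foo, x)
        other = foo(x)
        return torch.jit.wait(fut) + other

    print(example(torch.ones(2)))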

1.6 Elastic training

PyTorch 1.7

There are some improvements to DDP and RPC, as well as new features, including:

  • [Stable] TorchElastic now bundled into PyTorch docker image

TorchElastic offers a strict superset of the torch.distributed.launch CLI and adds fault tolerance and elasticity. If users are not interested in fault tolerance, they can get exactly the same functionality/behavior by setting max_restarts=0, with the added convenience of automatically assigned RANK and MASTER_ADDR|PORT (instead of specifying them manually as with torch.distributed.launch).

By bundling TorchElastic with PyTorch in the same Docker image, users can immediately start experimenting with TorchElastic without having to install it separately. Beyond convenience, this is also a nice addition when adding support for elastic parameters to Kubeflow's existing distributed PyTorch operators.

[Stable] TorchElastic now bundled into PyTorch docker image

Torchelastic offers a strict superset of the current torch.distributed.launch CLI with the added features for fault-tolerance and elasticity. If the user is not be interested in fault-tolerance, they can get the exact functionality/behavior parity by setting max_restarts=0 with the added convenience of auto-assigned RANK and MASTER_ADDR|PORT (versus manually specified in torch.distributed.launch).

By bundling torchelastic in the same docker image as PyTorch, users can start experimenting with torchelastic right-away without having to separately install torchelastic. In addition to convenience, this work is a nice-to-have when adding support for elastic parameters in the existing Kubeflow’s distributed PyTorch operators.

  • [Beta] Support for uneven dataset inputs in DDP

PyTorch 1.7 introduces a new context manager that is used together with models wrapped in torch.nn.parallel.DistributedDataParallel, to support training with datasets of uneven size across different processes. This feature provides greater flexibility when using DDP and saves users from having to manually ensure that dataset sizes are the same in every process. With this context manager, DDP handles uneven dataset sizes automatically, which prevents errors or hangs at the end of training (a minimal sketch follows the quoted text below).

[Beta] Support for uneven dataset inputs in DDP

PyTorch 1.7 introduces a new Context manager to be used in conjunction with models using torch.nn.parallel.DistributedDataParallel to enable training with uneven dataset size across different processes. This feature enables greater flexibility when using DDP and prevents the user from having to manually ensure dataset sizes are the same across different process. With this context manager, DDP will handle uneven dataset sizes automatically, which can prevent errors or hangs at the end of training.
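A minimal sketch of the context manager added here (it assumes ddp_model is a model already wrapped in torch.nn.parallel.DistributedDataParallel and loader is this rank's DataLoader, whose length may differ from the other ranks'):

    import torch

    with ddp_model.join():
        for batch, target in loader:
            loss = torch.nn.functional.mse_loss(ddp_model(batch), target)
            loss.backward()   # ranks that run out of data shadow the collectives, so nobody hangs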

Other features include:

  • [Beta] NCCL Reliability – Async Error/Timeout Handling
  • [Beta] TorchScript remote and rpc_sync
  • [Beta] Distributed optimizer with TorchScript support
  • [Beta] Enhancements to RPC-based Profiling
  • [Prototype] Windows support for Distributed Training

1.7 Pipeline parallel training

PyTorch 1.8

This release adds some significant improvements, such as improved NCCL reliability, pipeline parallelism support, RPC profiling, and support for communication hooks that add gradient compression.

Pipeline parallelism introduces fairscale.nn.Pipe, which is essentially torchgpipe (a minimal sketch follows the quoted notes below).

Significant updates and improvements to distributed training including: Improved NCCL reliability; Pipeline parallelism support; RPC profiling; and support for communication hooks adding gradient compression.

Upstream fairscale.nn.Pipe into PyTorch as torch.distributed.pipeline (#44090)
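A minimal sketch added here of the upstreamed Pipe API (it assumes PyTorch >= 1.8 and two GPUs; the layer sizes, port, and chunk count are arbitrary). Pipe requires the RPC framework to be initialized even in a single process:

    import os
    import torch
    import torch.nn as nn
    import torch.distributed.rpc as rpc
    from torch.distributed.pipeline.sync import Pipe

    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29502"
    rpc.init_rpc("worker", rank=0, world_size=1)

    fc1 = nn.Linear(16, 8).cuda(0)                    # first pipeline stage on GPU 0
    fc2 = nn.Linear(8, 4).cuda(1)                     # second pipeline stage on GPU 1
    model = Pipe(nn.Sequential(fc1, fc2), chunks=4)   # split each mini-batch into 4 micro-batches

    out_rref = model(torch.randn(32, 16).cuda(0))     # forward returns an RRef to the output
    print(out_rref.to_here().shape)

    rpc.shutdown()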

PyTorch 1.9

The main changes are:

  • Some bug fixes for Distributed/TorchElastic.
  • Significant improvements to RPC to support large-scale GPU distributed training.
  • Distributed training, GPU utilization, and SM efficiency are supported in PyTorch Profiler.

After studying the history, let’s look at the distributed overview.

0x02 Distributed Overview

The following is mainly based on the official documentation at pytorch.org/tutorials/b…, plus my own understanding.

2.1 Introduction

2.1.1 The torch.distributed package

The torch.distributed package in PyTorch provides communication primitives for multi-process parallelism, allowing processes to communicate across several compute nodes running on one or more machines. The torch.distributed package differs from multiprocessing (torch.multiprocessing) in that it supports multiple machines connected over a network, and the user must explicitly launch a separate copy of the main training script for each process.

Even in the single-machine, synchronous case, torch.distributed or the torch.nn.parallel.DistributedDataParallel() wrapper may still have advantages over other data parallel approaches (such as torch.nn.DataParallel):

  • Each process maintains its own optimizer and performs a complete optimization step in each iteration. While this may seem redundant, because the gradients have already been gathered and averaged across processes they are identical in every process, no parameter broadcast step is required, which reduces the time spent transferring tensors between nodes.
  • Each process contains a separate Python interpreter, eliminating the extra interpreter overhead and "GIL thrashing" that comes from a single Python process driving multiple execution threads, multiple model copies, or multiple GPUs. This is especially important for models that rely heavily on the Python runtime, for example models with recurrent layers or many small components.

Starting with PyTorch v1.6.0, the functionality of torch.distributed can be divided into three main components:

  • Distributed data parallel training (DDP) is a widely used single-program multiple-data training paradigm. With DDP, the model is replicated in every process, and each model replica is fed a different set of input data samples. DDP takes care of the gradient communication that keeps the model replicas in sync, and overlaps it with the gradient computation to speed up training.

  • RPC-based distributed training (RPC) is designed to support general training structures that do not fit data parallel training, such as distributed pipeline parallelism, the parameter server paradigm, and combinations of DDP with other training paradigms. It helps manage the lifecycle of remote objects and extends the autograd engine beyond machine boundaries.

  • The collective communication (c10d) library supports sending tensors across the processes within a group. It provides both collective communication APIs (such as all_reduce and all_gather) and P2P communication APIs (such as send and isend). DDP and RPC (the process group backend) are built on top of c10d as of v1.6.0; the former uses collective communication and the latter uses P2P communication. In general, developers do not need to use this raw communication API directly, because the DDP and RPC features above cover many distributed training scenarios. However, there are use cases where this API is still useful. One example is distributed parameter averaging, where the application wants to compute the average of all model parameters after the backward pass instead of using DDP to communicate gradients (a minimal sketch follows this list). This can decouple communication from computation and allows finer-grained control over what is communicated, but on the other hand it also gives up the performance optimizations offered by DDP. An example using the c10d communication API is shown in Writing Distributed Applications with PyTorch.

    • Communication: the underlying communication of torch.distributed uses the collective communication (c10d) library to send tensors across the processes in a group, and it supports two types of communication APIs:

      • collective communication APIs: Distributed Data-Parallel Training (DDP)
      • P2P communication APIs: RPC-Based Distributed Training (RPC)

    The two communication apis correspond to two Distributed Training modes in PyTorch: Distributed Data-Parallel Training (DDP) and RPC-based Distributed Training (RPC).
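The parameter-averaging idea mentioned above can be sketched directly with the c10d API (added here as an illustration; it assumes an initialized process group and a local model):

    import torch.distributed as dist

    def average_parameters(model):
        # Average every parameter across ranks after backward(), instead of
        # relying on DDP's gradient all-reduce.
        world_size = dist.get_world_size()
        for p in model.parameters():
            dist.all_reduce(p.data, op=dist.ReduceOp.SUM)
            p.data /= world_size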

Most of the existing documentation is written for DDP or RPC, and the remainder of this article details the material for these two components.

2.1.2 Knowledge links

PyTorch's multiprocessing module wraps Python's native multiprocessing module, is 100 percent API compatible, and registers custom reducers that allow different processes to read and write the same data through an IPC mechanism (shared memory). However, it has many weaknesses when working with CUDA, such as having to carefully manage the lifecycle of the various processes, so multiprocessing on CUDA often behaves in unexpected ways.

2.2 Data parallel training

Based on this basic knowledge of torch.distributed, we can choose different distributed or parallel training methods according to the specifics of our machines and tasks. PyTorch provides a variety of options for data parallel training. Typically, applications grow from simple to complex and from prototype to production. The common trajectory is:

  1. If the data and model fit on one GPU and training speed is not a concern, use single-device training.
  2. If there are multiple GPUs on the server and you want to speed up training with minimal code changes, use single-machine multi-GPU DataParallel.
  3. If you want to speed up training further and are willing to write a little more setup code, use single-machine multi-GPU DistributedDataParallel.
  4. If the application needs to scale across machine boundaries, use multi-machine DistributedDataParallel and the launch script.
  5. If errors are expected (for example, OOM) or resources may be dynamically added and removed during training, use TorchElastic to launch distributed training.

2.3 torch.nn.DataParallel

The DataParallel package allows you to utilize multiple GPUs for parallelism with minimal code changes. It requires only a one-line change to the application code. The tutorial Optional: Data Parallelism shows an example. Note that while DataParallel is very easy to use, it usually does not provide the best performance, because it replicates the model in every forward pass, and its single-process multi-threaded parallelism suffers from GIL contention. For better performance, consider using DistributedDataParallel.
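A minimal sketch of the one-line change added here (the layer sizes are arbitrary):

    import torch
    import torch.nn as nn

    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = nn.Linear(10, 5)
    if torch.cuda.device_count() > 1:
        model = nn.DataParallel(model)   # single-process, multi-threaded replication across GPUs
    model = model.to(device)
    out = model(torch.randn(8, 10).to(device))  # the batch is scattered across GPUs and gathered back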

2.4 torch.nn.parallel.DistributedDataParallel

DistributedDataParallel requires one more setup step than DataParallel: a call to init_process_group. DDP uses multi-process parallelism, so there is no GIL contention between model replicas. In addition, the model is broadcast at DDP construction time rather than on every forward pass, which also helps speed up training. DDP ships with several performance optimization technologies. For a more in-depth explanation, see the DDP paper (VLDB'20).
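A minimal setup sketch added here (two CPU processes with the gloo backend; the model, port, and function name demo are arbitrary):

    import os
    import torch
    import torch.distributed as dist
    import torch.multiprocessing as mp
    from torch.nn.parallel import DistributedDataParallel as DDP

    def demo(rank, world_size):
        os.environ["MASTER_ADDR"] = "127.0.0.1"
        os.environ["MASTER_PORT"] = "29503"
        dist.init_process_group("gloo", rank=rank, world_size=world_size)  # the extra setup step

        model = torch.nn.Linear(10, 5)
        ddp_model = DDP(model)              # parameters are broadcast from rank 0 at construction
        opt = torch.optim.SGD(ddp_model.parameters(), lr=0.1)

        out = ddp_model(torch.randn(8, 10))
        out.sum().backward()                # gradients are all-reduced across ranks here
        opt.step()

        dist.destroy_process_group()

    if __name__ == "__main__":
        mp.spawn(demo, args=(2,), nprocs=2)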

DDP materials are as follows:

  1. The DDP notes provide an introductory example and some brief explanations of its design and implementation. If this is your first time using DDP, start with this document.
  2. Getting Started with Distributed Data Parallel explains some common problems with DDP training, including unbalanced workloads, checkpoints, and multi-device models. Note that DDP can easily be combined with the parallelism of single-machine multi-device models described in the single-machine model parallelism best Practices tutorial.
  3. The Launching and Configuring Distributed Data Parallel Applications document shows how to use the DDP launch script.
  4. The Shard Optimizer States With ZeroRedundancyOptimizer recipe demonstrates how ZeroRedundancyOptimizer can help reduce optimizer memory usage in distributed data parallel training.

2.5 TorchElastic

As applications grow in complexity and size, failover becomes an urgent requirement.

Sometimes, errors like OOM are inevitably encountered while using DDP, but DDP itself cannot recover from them, and a basic try-except block does not help. This is because DDP requires all processes to run in a tightly synchronized manner, and all AllReduce communication launched in different processes must match.

If a process in the group throws an OOM exception, it is likely to cause desynchronization (mismatched AllReduce operations), which then leads to a crash or hang. If you expect failures during training, or if resources may leave and join dynamically, use TorchElastic to launch distributed data parallel training.

2.6 General distributed training

Many training paradigms are not suitable for data parallelism, for example the parameter server paradigm, distributed pipeline parallelism, and reinforcement learning applications with multiple observers or agents. torch.distributed.rpc aims to support such general distributed training scenarios.

The torch.distributed.rpc package has four pillars:

  • RPC supports running a given function on a remote worker.
  • RRef helps manage the lifecycle of remote objects. The reference counting protocol is described in the RRef note.
  • Distributed autograd extends the autograd engine beyond machine boundaries (a minimal sketch follows this list). For more details, see Distributed Autograd Design.
  • The distributed optimizer automatically contacts all participating workers to update parameters, using gradients computed by the distributed autograd engine.
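A minimal sketch of distributed autograd added here (it assumes rpc.init_rpc has already been called on at least two workers and that this code runs on "worker0"; square is a placeholder function):

    import torch
    import torch.distributed.autograd as dist_autograd
    import torch.distributed.rpc as rpc

    def square(x):
        return x * x

    def train_step():
        with dist_autograd.context() as context_id:
            x = torch.ones(2, requires_grad=True)
            y = rpc.rpc_sync("worker1", square, args=(x,))  # forward crosses the RPC boundary
            loss = y.sum()
            dist_autograd.backward(context_id, [loss])      # backward also crosses the boundary
            # Gradients are stored per context rather than in .grad:
            return dist_autograd.get_gradients(context_id)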

The RPC tutorials are as follows (selected articles will be analyzed later):

  1. The Getting Started with Distributed RPC Framework tutorial begins with a simple reinforcement learning (RL) example to illustrate RPC and RRef. It then applies basic distributed model parallelism to an RNN example to show how to use distributed autograd and the distributed optimizer.
  2. The Implementing a Parameter Server Using Distributed RPC Framework tutorial borrows the spirit of HogWild! training and applies it to an asynchronous parameter server (PS) training application.
  3. The Distributed Pipeline Parallelism Using RPC tutorial extends the single-machine pipeline parallel example (introduced in Single-Machine Model Parallel Best Practices) to a distributed environment and shows how to implement it with RPC.
  4. This tutorial demonstrates how to implement RPC batching using the @rpc.functions.async_execution decorator, which can help speed up inference and training. It uses RL and PS examples similar to those in tutorials 1 and 2 above.
  5. This tutorial demonstrates how to combine DDP with RPC so that models can be trained with distributed data parallelism and distributed model parallelism together.

0x03 Summary

We conclude with an official diagram that shows the internal architecture and logical relationships of the PyTorch distributed package.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF References

PyTorch Distributed

pytorch.org/docs/stable…

NVIDIA NCCL official documentation

pytorch.org/tutorials/i…

m.w3cschool.cn/pytorch/pyt…

pytorch.org/tutorials/b…

pytorch.org/tutorials/i…

pytorch.org/tutorials/i…

pytorch.org/tutorials/i…

pytorch.org/tutorials/i…

pytorch.org/tutorials/i…

pytorch.org/tutorials/i…

pytorch.org/tutorials/a…

pytorch.org/tutorials/i…

pytorch.org/tutorials/a…

pytorch.org/docs/master…

pytorch.org/docs/master…

pytorch.org/tutorials/i…