0x00 Summary

This article is based on several official PyTorch documents and aims to understand the design and internal structure of distributed autograd. It is not a word-for-word translation; I have added some of my own understanding. Subsequent articles on distributed autograd will build on this one.

Other articles in the PyTorch distributed series are as follows:

PyTorch distributed (1) —— History and Overview

PyTorch how to use GPU

PyTorch distributed (2) —— DataParallel (part 1)

PyTorch distributed (3) —— DataParallel (part 2)

PyTorch distributed (4) —— Distributed application concepts

PyTorch distributed (5) —— DistributedDataParallel – overview and how to use

PyTorch distributed (6) —— DistributedDataParallel

PyTorch distributed (7) —— DistributedDataParallel – process groups

PyTorch distributed (8) —— DistributedDataParallel

PyTorch distributed (9) —— DistributedDataParallel – initialization

PyTorch distributed (10) —— DistributedDataParallel – Reducer static architecture

PyTorch distributed (11) —— DistributedDataParallel – building the Reducer and Join operations

PyTorch distributed (12) —— DistributedDataParallel – forward propagation

PyTorch distributed (13) —— DistributedDataParallel – backward propagation

0x01 Distributed RPC Framework

This article mainly uses pytorch.org/docs/master… as its baseline, but the original document assumes the reader is familiar with the autograd mechanism and the distributed RPC framework. Since we have already analyzed the autograd mechanism, we first study the distributed RPC framework here.

1.1 RPC framework

Remote Procedure Call (RPC) is a design or technical idea, not a protocol or specification.

The simplest way to understand RPC is this: one node requests a service provided by another node, while user code keeps the feel of a "local call". That is, remote functions are invoked as if they were local, so that the remote service or code appears to be running locally.

RPC needs to address several issues (a toy sketch follows this list):

  • How to communicate: how to establish a connection between the caller and the service provider.
  • How to address: how the caller finds the service provider and knows which services it offers.
  • How to send parameters: when the caller initiates a remote call, the arguments of the method need to be transmitted to the server over TCP or another protocol, so how are the arguments serialized?
  • How to receive parameters: how the service provider deserializes the arguments and invokes the function once they arrive.
  • How to return: how the service provider sends the return value back to the caller after running the locally provided service.
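To make these issues concrete, here is a minimal, purely illustrative sketch of an RPC round trip over a raw socket. None of this is PyTorch code; the names run_server and call_remote are invented for this example. The caller serializes the function name and arguments, the server deserializes them, dispatches to a local function, and serializes the result back.

import pickle
import socket

def run_server(host="127.0.0.1", port=50007, services=None):
    # The "service provider": receive (func_name, args), run it, return the result.
    services = services or {"add": lambda a, b: a + b}
    with socket.create_server((host, port)) as srv:
        conn, _ = srv.accept()
        with conn:
            # Toy framing: assumes the whole message fits in a single recv.
            func_name, args = pickle.loads(conn.recv(4096))  # deserialize the request
            result = services[func_name](*args)              # address/dispatch to a local function
            conn.sendall(pickle.dumps(result))               # serialize and send the return value

def call_remote(func_name, args, host="127.0.0.1", port=50007):
    # The "caller": looks like a local call, but actually runs on the server.
    with socket.create_connection((host, port)) as conn:
        conn.sendall(pickle.dumps((func_name, args)))        # serialize the arguments
        return pickle.loads(conn.recv(4096))                 # deserialize the return value

# With run_server() running in another process, call_remote("add", (1, 2)) returns 3.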

1.2 Four pillars of PyTorch RPC

The following is from the official document pytorch.org/docs/master…

The distributed RPC framework provides mechanisms for multi-machine model training through a set of primitives that allow remote communication, as well as a higher-level API to automatically differentiate models split across several machines. The distributed RPC framework makes it easy to run functions remotely, supports referencing remote objects without copying the real data, and provides autograd and optimizer APIs to transparently run backward passes and update parameters across RPC boundaries. These features can be divided into four sets of APIs.

  1. Remote Procedure Call (RPC) supports running a function on a specified worker with given arguments and getting the return value back, or creating a reference to the return value. There are three main RPC APIs: rpc_sync() (synchronous), rpc_async() (asynchronous), and remote() (asynchronous, returning a reference to the remote return value). Use the synchronous API if the user code cannot continue without the return value. Otherwise, use the asynchronous API to get a Future and wait on it only when the caller actually needs the return value. The remote() API is useful when something needs to be created remotely but never needs to be fetched to the caller. Imagine a driver process setting up a parameter server and trainers: the driver can create an embedding table on the parameter server and then share a reference to that table with the trainers, but it never uses the embedding table locally itself. In this case rpc_sync() and rpc_async() are not appropriate, because they always imply that the return value is sent to the caller, either immediately or in the future. (The three call styles are sketched right after this list.)
  2. Remote Reference (RRef) serves as a distributed shared pointer to a local or remote object. It can be shared with other workers, and reference counting is handled transparently. Each RRef has exactly one owner, and the object lives only on that owner. A non-owner worker holding an RRef can obtain a copy of the object from the owner by an explicit request. This is useful when a worker needs to access a data object but is neither the object's creator (the caller of remote()) nor its owner. The distributed optimizer is an example of such a use case.
  3. Distributed Autograd stitches together the local autograd engines of all workers involved in the forward pass and automatically reaches out to them during the backward pass to compute gradients. This is especially useful when the forward pass needs to span multiple machines, e.g., distributed model parallel training, parameter server training, and so on. With this feature, user code no longer needs to worry about how to send gradients across RPC boundaries or in what order the local autograd engines should be launched, which can become very complicated if there are nested and interdependent RPC calls in the forward pass.
  4. Distributed Optimizer takes an Optimizer() constructor (e.g., SGD(), Adagrad(), etc.) and a list of parameter RRefs. That is, it creates an Optimizer() instance on each distinct RRef owner and then runs step() to update the parameters accordingly. When the user runs distributed forward and backward passes, parameters and gradients are scattered across multiple workers, so each involved worker needs its own optimizer. The Distributed Optimizer combines all of these local optimizers into one, providing a concise constructor and step() API.
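As a quick illustration of the three call styles in point 1, here is a minimal sketch. It assumes the same two-worker setup used in the examples later in this article (a peer named "worker1", with init_rpc already called on this worker), and torch.add is just a stand-in for any function.

import torch
import torch.distributed.rpc as rpc

# Assumes rpc.init_rpc("worker0", rank=0, world_size=2) has already been called here,
# and a peer named "worker1" is up (see the rpc_sync example below for the full setup).

# 1) rpc_sync: block until the return value arrives.
ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))

# 2) rpc_async: returns a Future immediately; wait only when the value is needed.
fut = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
ret = fut.wait()

# 3) remote: the result stays on worker1; we only hold an RRef to it.
rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
value = rref.to_here()  # fetch a copy only if we really need it locally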

1.3 RRef

Next, taking pytorch.org/docs/master… as our baseline, we learn the basic concepts and some design details of the remote reference protocol.

RRef stands for Remote REFerence. It is a reference to an object that is located on a local or remote worker, and reference counting is handled transparently under the hood. Conceptually, it can be thought of as a distributed shared pointer. An application can create an RRef by calling remote(). Each RRef is owned by the callee worker of the remote() call (the owner) and can be used by multiple users. The owner stores the actual data and keeps track of the global reference count. Each RRef can be uniquely identified by a global RRefId, which is assigned by the caller of remote() at creation time.

On the owner worker there is only one OwnerRRef instance, which contains the real data, while on user workers any number of UserRRefs can exist as needed, and a UserRRef does not hold the data. When RRef is used, the owner uses the globally unique RRefId to find the unique OwnerRRef instance. In rpc_sync(), rpc_async(), or remote() calls, a UserRRef can be used as an argument or return value; the owner is notified and updates the reference count accordingly. If no UserRRef instance exists anywhere and there is no reference to the OwnerRRef on the owner either, the OwnerRRef and its data are deleted.
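A small sketch of the owner/user roles described above, again assuming the two-worker setup from the examples below (worker0 acts as the user, worker1 as the owner of the RRef's data):

import torch
import torch.distributed.rpc as rpc

# On worker0, after rpc.init_rpc("worker0", rank=0, world_size=2):
rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))

print(rref.owner())          # WorkerInfo of worker1: the OwnerRRef and the real data live there
print(rref.is_owner())       # False on worker0: this is a UserRRef and holds no data
local_copy = rref.to_here()  # explicit request that copies the value from the owner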

1.3.1 Assumptions

The design of the RRef protocol is based on the following assumptions.

  • Transient network failures: RRef is designed to handle transient network failures by retrying messages. It cannot handle node crashes or permanent network partitions. When those events occur, the application should shut down all workers, restore from the previous checkpoint, and resume training.
  • Non-idempotent UDFs: we assume that the user functions (UDFs) provided to rpc_sync(), rpc_async(), and remote() are not idempotent and therefore cannot be retried. However, internal RRef control messages are idempotent and are retried if a message fails.
  • Out-of-order message delivery: we make no assumption about the order of message delivery between a pair of nodes, because both sender and receiver use multiple threads, so there is no guarantee which message is processed first.

Next, we’ll give you an overview of how to use it. For details, you can see pytorch.org/docs/master…

1.3.2 Synchronous Invocation

Here is the synchronous call API, which makes a blocking RPC call to run func on the worker to. RPC messages are sent and received in parallel with the execution of Python code. This method is thread-safe.

torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)

Specific parameters are as follows:

  • to – the name/rank/WorkerInfo of the target worker.
  • func (callable) – a callable function, such as Python callables, built-in operators (e.g., add()), and annotated TorchScript functions.
  • args – the argument tuple to pass to the func invocation.
  • kwargs – a dictionary of keyword arguments for the func invocation.
  • timeout – the timeout (in seconds) to use for this RPC.

The return value is the result of running func with args and kwargs.

Sample:

Make sure MASTER_ADDR and MASTER_PORT are set on both workers.

export MASTER_ADDR=localhost
export MASTER_PORT=5678

Then run the following code in two different processes

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

1.3.3 Asynchronous Invocation

The following is the asynchronous call API, which makes a non-blocking RPC call to run func on the worker to. RPC messages are sent and received in parallel with the execution of Python code. This method is thread-safe, and it immediately returns a Future that can be awaited.

torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)

Specific parameters are as follows:

  • to – the name/rank/WorkerInfo of the target worker.
  • func (callable) – a callable function, such as Python callables, built-in operators (e.g., add()), and annotated TorchScript functions.
  • args – the argument tuple to pass to the func invocation.
  • kwargs – a dictionary of keyword arguments for the func invocation.
  • timeout – the timeout (in seconds) to use for this RPC.

Returns a Future object that can be waited for. Once done, the return value of func can be retrieved from the object.

Sample:

Make sure MASTER_ADDR and MASTER_PORT are set on both workers.

export MASTER_ADDR=localhost
export MASTER_PORT=5678

Then run the following code in two different processes

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

0x02 Example

We continue learning, again using pytorch.org/docs/master… as our baseline.

Suppose you have two nodes and a very simple model that is partitioned across the two nodes. This can be implemented using torch.distributed.rpc as follows.

The main motivation behind distributed autograd is to run a backward pass on such a distributed model with a loss that we have computed, and to record the gradients of all tensors that require gradients.

import torch
import torch.distributed.rpc as rpc

def my_add(t1, t2):
  return torch.add(t1, t2)

# On worker 0:
t1 = torch.rand((3, 3), requires_grad=True)
t2 = torch.rand((3, 3), requires_grad=True)

# Perform some computation remotely.
t3 = rpc.rpc_sync("worker1", my_add, args=(t1, t2))

# Perform some computation locally based on remote result.
t4 = torch.rand((3, 3), requires_grad=True)
t5 = torch.mul(t3, t4)

# Compute some loss.
loss = t5.sum()

0x03 Autograd record during forward propagation

PyTorch builds an Autograd diagram during forward propagation, which is used to perform backward propagation. For more details, see how Autograd encodes history.

For distributed autograd, we need to track all RPCs during the forward pass to ensure the backward pass is executed correctly. To this end, when an RPC is performed we attach send and recv functions to the autograd graph (a toy sketch of this bookkeeping follows the list).

  • The send function is attached to the originating node of the RPC, and its output edges point to the autograd functions of the RPC's input tensors. During the backward pass, the input to the send function is received from the destination node as the output of the corresponding recv function.
  • The recv function is attached to the destination node of the RPC, and its inputs are taken from the operators executed on the destination node using the input tensors. During the backward pass, the output gradients of the recv function are sent to the source node, as the input of the corresponding send function.
  • Each send-recv pair is assigned a globally unique autograd_message_id that uniquely identifies the pair. This is useful for looking up the corresponding function on a remote node during the backward pass.
  • For RRef, whenever we call torch.distributed.rpc.RRef.to_here(), we attach an appropriate send-recv pair for the tensors involved.
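As a purely hypothetical sketch of this bookkeeping (not PyTorch's actual implementation; all names here are invented), one could picture each forward-pass RPC registering a send-recv pair under a fresh autograd_message_id, so the backward pass can route gradients back to the right peer:

from itertools import count
from typing import Dict, Tuple

_next_message_id = count()  # globally unique ids for send-recv pairs (sketch only)

class ToySendRecvRegistry:
    def __init__(self):
        # autograd_message_id -> (worker that holds send, worker that holds recv)
        self.pairs: Dict[int, Tuple[str, str]] = {}

    def record_rpc(self, src_worker: str, dst_worker: str) -> int:
        # Called during the forward pass: the send lives on the RPC's source node,
        # the matching recv lives on the destination node.
        msg_id = next(_next_message_id)
        self.pairs[msg_id] = (src_worker, dst_worker)
        return msg_id

    def lookup(self, msg_id: int) -> Tuple[str, str]:
        # Called during the backward pass to find the matching pair for a message id.
        return self.pairs[msg_id]

registry = ToySendRecvRegistry()
msg_id = registry.record_rpc("worker0", "worker1")  # e.g. the rpc_sync call in the example above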

For example, here is what the autograd graph for our example above looks like (t5.sum() is excluded for simplicity).

As we can see, the send function acts as the sender during the forward pass but as a receiver during the backward pass.

0x04 Distributed Autograd Context

Each forward and backward pass that uses distributed autograd is assigned a unique torch.distributed.autograd.context, and this context has a globally unique autograd_context_id. The context is created on each node as needed.

Context is used as follows:

  1. Multiple nodes running a distributed backward pass may accumulate gradients on the same tensor and store them in the tensor's .grad field. Before we run the optimizer, the .grad field may have accumulated gradients from several different distributed backward passes. This is similar to calling torch.autograd.backward() multiple times locally. In order to provide a way to separate the gradients of each backward pass, the gradients are accumulated in the torch.distributed.autograd.context of each backward pass.
  2. During the forward pass, we store the send and recv functions of each autograd pass in this context. This ensures we hold references to the appropriate nodes in the autograd graph and keeps them alive. In addition, it makes it easy to look up the corresponding send and recv functions during the backward pass.
  3. In general, we also use this context to store some metadata for each distributed autograd pass.

From the user’s point of view, the Autograd context is set as follows:

import torch.distributed.autograd as dist_autograd
with dist_autograd.context() as context_id:
  loss = model.forward()
  dist_autograd.backward(context_id, loss)

Note that the model's forward pass must be invoked inside the distributed autograd context manager, because a valid context is needed to make sure all send and recv functions are stored properly, so that the backward pass runs on all participating nodes.

0x05 Distributed Backward Propagation

In this section, we outline the challenges of accurately calculating dependencies during distributed backpropagation and also describe several algorithms that perform distributed backpropagation (with tradeoffs within the algorithms).

5.1 Calculating dependencies

First, consider running the following code on a single machine

import torch
a = torch.rand((3, 3), requires_grad=True)
b = torch.rand((3, 3), requires_grad=True)
c = torch.rand((3, 3), requires_grad=True)
d = a + b
e = b * c
d.sum().backward()

Here is the autograd graph for the code above.

As part of the backward pass, the first thing the autograd engine does is compute the number of dependencies for each node in the autograd graph. This helps the autograd engine know when a node in the graph is ready to execute. The numbers in parentheses, add(1) and mul(0), indicate the dependency counts. As you can see, this means that during the backward pass the add node needs one input, while the mul node needs no input (in other words, it does not need to be executed). The local autograd engine computes these dependencies by traversing the graph from the root node (d in this case).
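As a rough sketch of this dependency counting (a simplified Python analogue of what the engine does, with a toy graph; node names like "accum_a" are invented stand-ins for the AccumulateGrad nodes of the leaves), one can traverse the graph from the root and count, for each node, how many incoming edges come from nodes reachable from the root:

from collections import deque

def compute_dependencies(root, edges):
    # edges maps a backward node to the nodes it feeds into next during the backward pass.
    deps = {}
    queue = deque([root])
    seen = {root}
    while queue:
        node = queue.popleft()
        for nxt in edges.get(node, []):
            deps[nxt] = deps.get(nxt, 0) + 1  # one more producer must finish before nxt runs
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return deps

# Toy graph for d = a + b; e = b * c, seen from the root d:
edges = {"d": ["add"], "add": ["accum_a", "accum_b"], "mul": ["accum_b", "accum_c"]}
print(compute_dependencies("d", edges))
# {'add': 1, 'accum_a': 1, 'accum_b': 1} -- mul is never reached from d, so its count stays 0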

In fact, some nodes in the autograd graph might never be executed during the backward pass. This fact poses a challenge for distributed autograd. Consider the following code that uses RPC.

import torch
import torch.distributed.rpc as rpc

a = torch.rand((3, 3), requires_grad=True)
b = torch.rand((3, 3), requires_grad=True)
c = torch.rand((3, 3), requires_grad=True)

d = rpc.rpc_sync("worker1", torch.add, args=(a, b))
e = rpc.rpc_sync("worker1", torch.mul, args=(b, c))
loss = d.sum()

The associated autograd graph for the code above would be:

Computing the dependencies of this distributed autograd graph is much more challenging and requires some overhead (in terms of computation or network communication).

For performance-sensitive applications, we can avoid a lot of this overhead by assuming that every send and recv function is a valid part of the backward pass (most applications do not perform RPCs that go unused). This simplifies the distributed autograd algorithm and makes it more efficient, at the cost of the application needing to be aware of this limitation. This algorithm is called the FAST mode algorithm and is described in detail below.

In general, it may not be necessary for every send and recv function to be a valid part of the backward pass. To address this, a SMART mode algorithm has been proposed, which is described in a later section. Please note that only the FAST mode algorithm is implemented at present.

5.2 FAST Mode algorithm

The key assumption of the algorithm is that when we run back propagation, each send function has a dependency of 1. In other words, we assume that we will receive gradients through RPC from another node.

The algorithm is as follows:

  1. We start from the worker that has the roots of the backward pass (all roots must be local).
  2. Look up all the send functions of the current Distributed Autograd Context.
  3. Starting from the provided roots and all the send functions we retrieved, we compute the dependencies locally.
  4. After the dependencies are computed, kick off the local autograd engine with the provided roots.
  5. When the autograd engine executes a recv function, that recv function sends the input gradients via RPC to the appropriate worker. Each recv function knows the target worker id, since it was recorded as part of the forward pass. The recv function also sends the autograd_context_id and autograd_message_id to the remote host.
  6. When this request is received on the remote host, we use the autograd_context_id and autograd_message_id to look up the appropriate send function.
  7. If this is the first time the worker has received a request for the given autograd_context_id, it computes the dependencies locally as described in points 1-3 above.
  8. The send function found in point 6 is then enqueued for execution on that worker's local autograd engine.
  9. Finally, instead of accumulating gradients on the Tensor's .grad field, we accumulate them separately in each Distributed Autograd Context. The gradients are stored in a Dict[Tensor, Tensor], which is basically a map from a Tensor to its associated gradient, and this map can be retrieved with the get_gradients() API.

For example, the complete code for distributed Autograd is as follows:

import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc

def my_add(t1, t2):
  return torch.add(t1, t2)

# On worker 0:

# Setup the autograd context. Computations that take
# part in the distributed backward pass must be within
# the distributed autograd context manager.
with dist_autograd.context() as context_id:
  t1 = torch.rand((3, 3), requires_grad=True)
  t2 = torch.rand((3, 3), requires_grad=True)

  # Perform some computation remotely.
  t3 = rpc.rpc_sync("worker1", my_add, args=(t1, t2))

  # Perform some computation locally based on remote result.
  t4 = torch.rand((3, 3), requires_grad=True)
  t5 = torch.mul(t3, t4)

  # Compute some loss.
  loss = t5.sum()

  # Run the backward pass.
  dist_autograd.backward(context_id, [loss])

  # Retrieve the gradients from the context.
  dist_autograd.get_gradients(context_id)

The distributed autograd graph with dependencies for this example looks like this (t5.sum() is excluded for simplicity):

The FAST mode algorithm applied to the above example is as follows:

  1. On Worker 0, we start computing dependencies from the roots loss and send1. As a result, on Worker 0 the dependency count of send1 is 1 and the dependency count of mul is 1.
  2. Now we kick off the local autograd engine on Worker 0. We first execute the mul function and accumulate its output in the autograd context as the gradient for t4. Then we execute recv2, which sends those gradients to Worker 1.
  3. Since this is the first time Worker 1 has heard about this backward pass, it performs the dependency computation and marks the dependencies for send2, add, and recv1 accordingly.
  4. Next, on Worker 1, send2 is enqueued on the local autograd engine, which then executes add and recv1 in turn.
  5. When recv1 is executed, it sends the gradients to Worker 0.
  6. Since Worker 0 has already computed the dependencies for this backward pass, it just enqueues and executes send1 locally.
  7. Finally, the gradients for t1, t2, and t4 are accumulated in the Distributed Autograd Context (a quick check of this is sketched below).
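Continuing the code from the previous section, still inside the dist_autograd.context() block on Worker 0, a small sanity check might look like the sketch below. It is illustrative, not part of the original example; the allclose assertion follows from t5 = t3 * t4, so the gradient of t4 is t3.

# Still on worker 0, inside the same dist_autograd.context() block as above:
grads = dist_autograd.get_gradients(context_id)  # Dict[Tensor, Tensor]

# Gradients are accumulated in the context rather than in the .grad fields,
# and the local leaves t1, t2 and t4 all appear in it.
assert t1 in grads and t2 in grads and t4 in grads

# Since loss = (t3 * t4).sum(), the gradient with respect to t4 is t3.
assert torch.allclose(grads[t4], t3)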

5.3 SMART Mode Algorithm

The full details of this algorithm are still being worked out, but for the general idea you can refer to the Smart mode section of the distributed autograd RFC.

0x06 Distributed Optimizer

The DistributedOptimizer operates as follows:

  1. It takes a list of remote parameters (RRefs) to optimize. These can also be local parameters wrapped inside a local RRef.
  2. It takes an Optimizer class as the local optimizer, which will run on all the distinct RRef owners.
  3. The distributed optimizer creates an instance of the local Optimizer on each worker node and holds an RRef to each of them.
  4. When torch.distributed.optim.DistributedOptimizer.step() is invoked, the distributed optimizer uses RPC to remotely execute all the local optimizers on the appropriate remote workers. A distributed autograd context_id must be provided to torch.distributed.optim.DistributedOptimizer.step(); the local optimizers use this context_id to look up the gradients stored in the corresponding context.
  5. If multiple concurrent distributed optimizers are updating the same set of parameters on a worker, these updates are serialized via a lock.

0x07 Simple end-to-end example

To wrap up, here is a simple end-to-end example using distributed autograd and the distributed optimizer. If you put the code in a file named "dist_autograd_simple.py", you can run it with: MASTER_ADDR="localhost" MASTER_PORT=29500 python dist_autograd_simple.py

import torch
import torch.multiprocessing as mp
import torch.distributed.autograd as dist_autograd
from torch.distributed import rpc
from torch import optim
from torch.distributed.optim import DistributedOptimizer

def random_tensor():
    return torch.rand((3, 3), requires_grad=True)

def _run_process(rank, dst_rank, world_size):
    name = "worker{}".format(rank)
    dst_name = "worker{}".format(dst_rank)

    # Initialize RPC.
    rpc.init_rpc(
        name=name,
        rank=rank,
        world_size=world_size
    )

    # Use a distributed autograd context.
    with dist_autograd.context() as context_id:
        # Forward pass (create references on remote nodes).
        rref1 = rpc.remote(dst_name, random_tensor)
        rref2 = rpc.remote(dst_name, random_tensor)
        loss = rref1.to_here() + rref2.to_here()

        # Backward pass (run distributed autograd).
        dist_autograd.backward(context_id, [loss.sum()])

        # Build DistributedOptimizer.
        dist_optim = DistributedOptimizer(
            optim.SGD,
            [rref1, rref2],
            lr=0.05,
        )

        # Run the distributed optimizer step.
        dist_optim.step(context_id)

def run_process(rank, world_size) :
    dst_rank = (rank + 1) % world_size
    _run_process(rank, dst_rank, world_size)
    rpc.shutdown()

if __name__ == '__main__':
  # Run world_size workers
  world_size = 2
  mp.spawn(run_process, args=(world_size,), nprocs=world_size)

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

Pytorch.org/docs/master…

Pytorch.org/docs/master…

Pytorch.org/docs/master…

Pytorch.org/docs/master…