0x00 Summary

In previous articles we saw the basic classes AutogradMetadata, DistAutogradContainer, and DistAutogradContext. We have seen how distributed autograd information is passed over RPC, how nodes interact with each other, and how each node maintains these sessions separately. The main purpose of this article is to see how backward propagation enters the distributed autograd engine.

The other articles in this PyTorch distributed series are as follows:

PyTorch distributed (1) —— history and overview

How PyTorch uses the GPU

PyTorch distributed (2) —— DataParallel (part 1)

PyTorch distributed (3) —— DataParallel (part 2)

PyTorch distributed (4) —— basic concepts of distributed applications

PyTorch distributed (5) —— DistributedDataParallel: overview & how to use

PyTorch distributed (6) —— DistributedDataParallel

PyTorch distributed (7) —— DistributedDataParallel: process groups

PyTorch distributed (8) —— DistributedDataParallel: the paper

PyTorch distributed (9) —— DistributedDataParallel: initialization

PyTorch distributed (10) —— DistributedDataParallel: Reducer static architecture

PyTorch distributed (11) —— DistributedDataParallel: building the Reducer and Join operations

PyTorch distributed (12) —— DistributedDataParallel: forward propagation

PyTorch distributed (13) —— DistributedDataParallel: backward propagation

PyTorch distributed Autograd (1) —— design

PyTorch distributed Autograd (2) —— RPC foundation

PyTorch distributed Autograd (3) —— context related

For better illustration, the code in this article will be streamlined accordingly.

0x01 Recap

Let’s recall the content of the previous articles.

First, for distributed autograd, we need to track all RPCs during forward propagation to ensure that backward propagation is performed correctly. To this end, when an RPC is executed, we attach send and recv functions to the autograd graph.

  • The send function is attached to the originating node of the RPC, and its output edges point to the autograd functions of the RPC input tensors. During backward propagation, the input of the send function is received from the target node and corresponds to the output of the recv function.
  • The recv function is attached to the RPC's receiving target node, and its inputs are taken from the operators that execute on the receiving target using the input tensors. During backward propagation, the output gradients of the recv function are sent back to the source node as inputs of the send function.
  • Each send-recv pair is assigned a globally unique autograd_message_id that uniquely identifies the pair. This is useful for finding the corresponding function on the remote node during backward propagation.
  • For RRef, whenever we call torch.distributed.rpc.RRef.to_here(), we add an appropriate send-recv pair for the tensors involved.
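
To make the pairing concrete, here is a minimal Python sketch of the idea behind autograd_message_id. The dataclass and the bit layout are illustrative assumptions, not PyTorch's actual implementation (the real metadata is the C++ AutogradMetadata class we saw earlier):

from dataclasses import dataclass

@dataclass
class AutogradMetadata:
    autograd_context_id: int  # identifies one distributed backward pass
    autograd_message_id: int  # identifies one send-recv pair

def make_global_id(worker_id: int, local_counter: int) -> int:
    # Illustrative scheme: combine the worker id with a per-worker
    # auto-incrementing counter so that ids never collide across workers.
    return (worker_id << 48) | local_counter

meta = AutogradMetadata(
    autograd_context_id=make_global_id(worker_id=0, local_counter=1),
    autograd_message_id=make_global_id(worker_id=0, local_counter=2),
)
print(meta)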

Second, in the concrete code for forward propagation, we store the send and recv functions of each autograd propagation in the context. This ensures that we hold references to the appropriate nodes in the autograd graph to keep them alive. In addition, this makes it easy to find the corresponding send and recv functions during backward propagation.
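
As a quick illustration, the stored functions can be inspected from Python through the same private helpers that section 2.3 below uses. This is a sketch only: these underscore-prefixed helpers are internal APIs, and the code assumes it runs on the worker that owns the active context.

import torch.distributed.autograd as dist_autograd

# context_id comes from "with dist_autograd.context() as context_id:"
ctx = dist_autograd._retrieve_context(context_id)
print(ctx._send_functions())  # {autograd_message_id: SendRpcBackward, ...}
print(ctx._recv_functions())  # {autograd_message_id: RecvRpcBackward, ...}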

Next, here is the relevant section of the message definitions in torch/csrc/distributed/rpc/message.h:

// Messages with autograd info
FORWARD_AUTOGRAD_REQ = 0x0f | MessageTypeFlags::REQUEST_TYPE,
FORWARD_AUTOGRAD_RESP = 0x10 | MessageTypeFlags::RESPONSE_TYPE,

// Messages to propagate gradients on the backward pass.
BACKWARD_AUTOGRAD_REQ = 0x11 | MessageTypeFlags::REQUEST_TYPE,
BACKWARD_AUTOGRAD_RESP = 0x12 | MessageTypeFlags::RESPONSE_TYPE,
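
As these definitions show, each message type is a small id OR'd with a request/response flag. Below is a minimal Python sketch of that composition; the flag values 0x100 and 0x200 are assumptions for illustration, not taken from the header above:

REQUEST_TYPE = 0x100   # assumed value of MessageTypeFlags::REQUEST_TYPE
RESPONSE_TYPE = 0x200  # assumed value of MessageTypeFlags::RESPONSE_TYPE

FORWARD_AUTOGRAD_REQ = 0x0F | REQUEST_TYPE
FORWARD_AUTOGRAD_RESP = 0x10 | RESPONSE_TYPE
BACKWARD_AUTOGRAD_REQ = 0x11 | REQUEST_TYPE
BACKWARD_AUTOGRAD_RESP = 0x12 | RESPONSE_TYPE

print(hex(BACKWARD_AUTOGRAD_REQ))  # 0x111 under the assumed flag values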

In the previous article, we saw how FORWARD_AUTOGRAD_REQ is sent during forward propagation, assuming the following call: rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).

  • rpc_sync calls _invoke_rpc.
  • _invoke_rpc calls _invoke_rpc_builtin.
  • That in turn calls pyRpcBuiltin, which calls sendMessageWithAutograd.
  • sendMessageWithAutograd internally builds the FORWARD_AUTOGRAD_REQ message and sends it using RPC.

At this point, we have a few questions about the overall process:

  • How is backward propagation initiated at the starting point of the backward graph, and how is it passed to the next stage of backward propagation?
  • When is BACKWARD_AUTOGRAD_REQ called in the inner loop of backward propagation? When is the recv operation invoked? Where is recvAutogradFunctions_ set in the context?
  • How do we enter the distributed autograd engine?

We will analyze these questions next; the core of all of them is how we enter the dist.autograd engine.

0x02 Computation Graph

Let's start with a couple of computation-graph examples.

2.1 Local Example

Let's look at ordinary local computation first. This is the local version of the example from the official dist.autograd design diagram. As you can see, AddBackward0, AccumulateGrad, and MulBackward0 make up the computation graph.

import torch

t1 = torch.rand((3, 3), requires_grad=True)
t2 = torch.rand((3, 3), requires_grad=True)
t3 = t1 + t2
t4 = torch.rand((3, 3), requires_grad=True)
t5 = torch.mul(t3, t4)
next_functions = t5.grad_fn.next_functions

The specific corresponding figure is as follows:
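
Since the original figure cannot be reproduced here, the following sketch prints the same structure by walking grad_fn.next_functions of the code above; the expected output is shown in the trailing comments:

def dump(fn, depth=0):
    # Recursively print the autograd graph rooted at fn.
    if fn is None:
        return
    print("  " * depth + type(fn).__name__)
    for next_fn, _ in fn.next_functions:
        dump(next_fn, depth + 1)

dump(t5.grad_fn)
# MulBackward0
#   AddBackward0
#     AccumulateGrad
#     AccumulateGrad
#   AccumulateGrad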

2.2 Distributed Example

Now let's look at the distributed example. This is code that roughly corresponds to the diagram in the official design. We call torch.mul(t3, t4) to get t5, and then compute a loss on top of it.

import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc

def worker0():
    # On worker 0:

    # Setup the autograd context. Computations that take
    # part in the distributed backward pass must be within
    # the distributed autograd context manager.
    with dist_autograd.context() as context_id:
      t1 = torch.rand((3, 3), requires_grad=True)
      t2 = torch.rand((3, 3), requires_grad=True)

      # Perform some computation remotely.
      t3 = rpc.rpc_sync("worker1", torch.add, args=(t1, t2))

      # Perform some computation locally based on remote result.
      t4 = torch.rand((3, 3), requires_grad=True)
      t5 = torch.mul(t3, t4)

      # Compute some loss.
      loss = t5.sum()

      # Run the backward pass.
      dist_autograd.backward(context_id, [loss])

      # Retrieve the gradients from the context.
      dist_autograd.get_gradients(context_id)

      print(loss)

Under the distributed setup, t3 is computed remotely.

  • t5 corresponds to mul, and t5.grad_fn is MulBackward0.
  • t3.grad_fn is a CppFunction, i.e., the recv node corresponds to CppFunction.
  • loss is tensor(5.5680, grad_fn=<SumBackward0>).
  • The grad_fn of the remaining tensors is None.
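
To check this quickly inside the worker0 function above, we can print the grad_fn attributes directly. A minimal sketch (object addresses and the loss value will of course differ between runs):

print(t5.grad_fn)    # <MulBackward0 object at 0x...>
print(t3.grad_fn)    # <CppFunction object at 0x...>, i.e. the recv node
print(loss.grad_fn)  # <SumBackward0 object at 0x...>
print(t1.grad_fn, t2.grad_fn, t4.grad_fn)  # None None None (leaf tensors)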

Let's show the design diagram again. The example code above corresponds to worker 0 on the left below; t3 actually runs on worker 1.

2.3 Annotated Distributed Version

For better illustration, we add some print statements as annotations.

def _verify_send(send_function):
    print(send_function.name())
    next_funcs = send_function.next_functions
    print(next_funcs[0][0].name())
    print(next_funcs[1][0].name())

def _verify_recv(recv_function):
    print(recv_function.name())
    next_funcs = recv_function.next_functions
    print(len(next_funcs))

def worker0():
    # On worker 0:

    # Setup the autograd context. Computations that take
    # part in the distributed backward pass must be within
    # the distributed autograd context manager.
    with dist_autograd.context() as context_id:
      t1 = torch.rand((3, 3), requires_grad=True)
      t2 = torch.rand((3, 3), requires_grad=True)

      # Perform some computation remotely.
      #t3 = rpc.rpc_sync("worker1", my_add, args=(t1, t2))
      t3 = rpc.rpc_sync("worker1", torch.add, args=(t1, t2))

      # Perform some computation locally based on remote result.
      t4 = torch.rand((3, 3), requires_grad=True)
      t5 = torch.mul(t3, t4)

      # Compute some loss.
      loss = t5.sum()

      print("--- send ---")
      ctx = dist_autograd._retrieve_context(context_id)
      send_functions = ctx._send_functions()
      _verify_send(list(send_functions.values())[0])

      print("--- loss ---")
      print(loss)
      mul_func = loss.grad_fn.next_functions[0][0]
      print(mul_func.name())
      next_funcs = mul_func.next_functions
      print(next_funcs[0][0].name())
      print(next_funcs[1][0].name())

      print("---- recv ----")
      recv_functions = ctx._recv_functions()
      _verify_recv(list(recv_functions.values())[0])

      # Run the backward pass.
      dist_autograd.backward(context_id, [loss])

      # Retrieve the gradients from the context.
      dist_autograd.get_gradients(context_id)


The printed result is:

--- send ---
torch::distributed::autograd::SendRpcBackward
torch::autograd::AccumulateGrad
torch::autograd::AccumulateGrad
        
--- loss ---
tensor(3.5197, grad_fn=<SumBackward0>)
MulBackward0
torch::distributed::autograd::RecvRpcBackward
torch::autograd::AccumulateGrad

---- recv ----
torch::distributed::autograd::RecvRpcBackward

After adding the distributed operators, the diagram is as follows:

0x03 Backward Propagation

Now we look at how to enter the dist.autograd engine. Combined with our diagram, that means:

  • How does worker 0 actively initiate backward propagation and then enter the distributed engine?
  • How does worker 0 internally initiate a backward propagation request to worker 1?
  • How does worker 1 passively receive a backward propagation message and then enter the distributed engine?

3.1 Initiating Backward Propagation

Let's figure out how backward propagation is initiated, working from the bottom up. There are two types:

  • One is explicit initiation. For example, in the figure above, the loss on worker 0 actively calls the backward method.
  • The other is implicit internal initiation. For example, on worker 0, t3 tells worker 1 through its recv node that it should start backward propagation.

3.1.1 Explicit Initiation

3.1.1.1 Example

Let's look at how distributed autograd's backward is invoked explicitly, as shown in the example below.

def worker0():
    # On worker 0:

    with dist_autograd.context() as context_id:
      t1 = torch.rand((3, 3), requires_grad=True)
      t2 = torch.rand((3, 3), requires_grad=True)

      # Perform some computation remotely.
      t3 = rpc.rpc_sync("worker1", torch.add, args=(t1, t2))

      # Perform some computation locally based on remote result.
      t4 = torch.rand((3, 3), requires_grad=True)
      t5 = torch.mul(t3, t4)

      # Compute some loss.
      loss = t5.sum()

      # Run the backward pass.
      dist_autograd.backward(context_id, [loss])  # <--- the call we analyze here
3.1.1.2 The C++ World

In torch/_C/_distributed_autograd.pyi we can see the following comment:

# This module is defined in torch/csrc/distributed/autograd/init.cpp

So let's open torch/csrc/distributed/autograd/init.cpp and take a look.

Part of the code is omitted. Here you can see how a new context is generated and how backward, get_gradients, and other functions are defined.

PyObject* dist_autograd_init(PyObject* _unused, PyObject* noargs) {
  auto autograd_module =
      THPObjectPtr(PyImport_ImportModule("torch.distributed.autograd"));
  auto torch_C_module = THPObjectPtr(PyImport_ImportModule("torch._C"));
  auto torch_C_m = py::handle(torch_C_module).cast<py::module>();
  auto m = torch_C_m.def_submodule(
      "_distributed_autograd", "distributed autograd bindings");
  auto module = py::handle(m).cast<py::module>();

  auto distAutogradContext =
      shared_ptr_class_<DistAutogradContext>(module, "DistAutogradContext")
          .def(
              "_context_id",
              &DistAutogradContext::contextId,
              py::call_guard<py::gil_scoped_release>())
          .def(
              "_recv_functions",
              [](const DistAutogradContext& ctx) {
                std::map<int64_t, py::object> funcs;
                for (const auto& map_entry : ctx.recvFunctions()) {
                  funcs.emplace(
                      map_entry.first,
                      py::reinterpret_steal<py::object>(
                          torch::autograd::functionToPyObject(
                              map_entry.second)));
                }
                return funcs;
              })
          .def(
              "_send_functions",
              [](const ContextPtr& ctx) {
                std::map<int64_t, py::object> funcs;
                for (const auto& map_entry : ctx->sendFunctions()) {
                  funcs.emplace(
                      map_entry.first,
                      py::reinterpret_steal<py::object>(
                          torch::autograd::functionToPyObject(
                              map_entry.second)));
                }
                return funcs;
              })
          .def("_known_worker_ids", &DistAutogradContext::getKnownWorkerIds);

  module.def(
      "_new_context",
      []() -> const ContextPtr {
        return DistAutogradContainer::getInstance().newContext();
      },
      py::return_value_policy::reference);

  py::options options;
  options.disable_function_signatures();

  module.def(
      "backward",
      backward,
      py::arg("contextId"),
      py::arg("roots"),
      py::arg("retain_graph") = false,
      py::call_guard<py::gil_scoped_release>());

  module.def(
      "get_gradients",
      [](int64_t contextId) -> py::dict {
        const auto& autogradContext =
            DistAutogradContainer::getInstance().retrieveContext(contextId);
        return torch::jit::toPyObject(IValue(autogradContext->getGradients()));
      },
      py::arg("context_id"));

  Py_RETURN_TRUE;
}
} // namespace

The backward function itself is defined in torch/csrc/distributed/autograd/autograd.cpp.

void backward(
    int64_t context_id,
    const variable_list& roots,
    bool retain_graph) {
  RECORD_FUNCTION(
      kDistAutogradBackwardProfilingKey, std::vector<c10::IValue>());
  try {
    DistEngine::getInstance().execute(context_id, roots, retain_graph);
  } catch (std::exception& e) {
    // FIXME: crashes if exception type is not RuntimeError
    throw std::runtime_error(e.what());
  }
}

As you can see, DistEngine::getInstance().execute(context_id, roots, retain_graph) is eventually called to perform backward propagation. This is where we enter the engine.

3.1.2 Implicit Internal Initiation

Because this path is implicit, the code is more hidden, so we trace it from the bottom up. We know that BACKWARD_AUTOGRAD_REQ is sent whenever backward propagation needs to cross node boundaries, so we start the search from BACKWARD_AUTOGRAD_REQ.

3.1.2.1 BACKWARD_AUTOGRAD_REQ

In torch/csrc/distributed/autograd/rpc_messages/propagate_gradients_req.cpp, PropagateGradientsReq::toMessageImpl uses BACKWARD_AUTOGRAD_REQ as the message type.

Message PropagateGradientsReq::toMessageImpl() && {
  std::vector<at::IValue> ivalues;
  // Add all the grad tensors.
  for (const auto& grad : grads_) {
    ivalues.emplace_back(grad);
  }

  // Now add autograd metadata.
  ivalues.emplace_back(autogradMetadata_.autogradContextId);
  ivalues.emplace_back(autogradMetadata_.autogradMessageId);

  // Add retain graph.
  ivalues.emplace_back(retainGraph_);

  // Now pickle using JIT pickler.
  std::vector<torch::Tensor> tensorTable;
  std::vector<char> payload =
      jit::pickle(c10::ivalue::Tuple::create(std::move(ivalues)), &tensorTable);

  return Message(
      std::move(payload),
      std::move(tensorTable),
      MessageType::BACKWARD_AUTOGRAD_REQ); // this will be used here
}
3.1.2.2 PropagateGradientsReq

Next, who sends BACKWARD_AUTOGRAD_REQ, i.e., who calls toMessageImpl? It turns out that PropagateGradientsReq is constructed in torch/csrc/distributed/autograd/functions/recvrpc_backward.cpp, where toMessage is used to build the message. In other words, executing RecvRpcBackward sends BACKWARD_AUTOGRAD_REQ.

variable_list RecvRpcBackward::apply(variable_list&& grads) { // called as an autograd Node
  std::vector<Variable> outputGrads;
  for (size_t i = 0; i < grads.size(); i++) {
    const auto& grad = grads[i];
    if (grad.defined()) {
      outputGrads.emplace_back(grad);
    } else {
      // Put in zeros for a tensor with no grad.
      outputGrads.emplace_back(input_metadata(i).zeros_like());
    }
  }

  auto sharedContext = autogradContext_.lock();

  // Send the gradients over the wire and record the future in the autograd
  // context.
  PropagateGradientsReq gradCall( // PropagateGradientsReq is constructed here
      autogradMetadata_,
      outputGrads,
      sharedContext->retrieveGraphTask()->keep_graph_);

  // Send the gradients over to the appropriate node.
  auto rpcAgent = rpc::RpcAgent::getCurrentRpcAgent();
  auto jitFuture = rpcAgent->send( // send to the next node of the backward pass
      rpcAgent->getWorkerInfo(fromWorkerId_),
      std::move(gradCall).toMessage(), // calls PropagateGradientsReq::toMessageImpl here
      rpc::kUnsetRpcTimeout,
      deviceMap_);

  // Record the future in the context.
  sharedContext->addOutstandingRpc(jitFuture);

  // 'recv' function sends the gradients over the wire using RPC, it doesn't
  // need to return anything for any downstream autograd function.
  return variable_list();
}

So we know that when RecvRpcBackward is executed, BACKWARD_AUTOGRAD_REQ is sent to the next node. Where exactly is RecvRpcBackward called? We’ll cover this in the next DistEngine article.

At this point the flow is as follows: on worker 0, the recv node corresponding to t3 sends a BACKWARD_AUTOGRAD_REQ message to worker 1.

                                                                +
                                                       worker 0 | worker 1
                                                                |
                                                                |
 RecvRpcBackward                         PropagateGradientsReq  |
       +                                          +             |
       |                                          |             |
       |                                          |             |
       |                                          |             |
       v                                          |             |
                                                  |             |
     apply()                                      |             |
       +                                          |             |
       |                                          v             |
       |                                                        |
       | +------------------------------>  toMessageImpl        |
       |                                          +             |
       |                                          |             |
       |   Message(BACKWARD_AUTOGRAD_REQ)         |             |
       | <----------------------------------------+             |
       |                                                        |
       |                                                        |
       v                                                        |
                                                                |
rpcAgent->send(Message)  +-------------------------------------------->
       +                             BACKWARD_AUTOGRAD_REQ      |
       |                                                        |
       |                                                        |
       v                                                        |
                                                                +

The corresponding example diagram is:

3.2 Receiving Backward Propagation

Let's take a look at how the receiving side handles backward propagation. We switch back to worker 1 and see how the send node in the figure receives the backward propagation message.

3.2.1 Receiving Messages

When the TensorPipeAgent is created, RequestCallbackImpl is configured as its callback function. This is the agent's unified response function. As mentioned earlier when we discussed how the agent receives messages, processing eventually enters the RequestCallbackNoPython::processRpc function, where we can see the handling logic for BACKWARD_AUTOGRAD_REQ.

This is the normal flow of RPC.

void RequestCallbackNoPython::processRpc(
    RpcCommandBase& rpc,
    const MessageType& messageType,
    const int64_t messageId,
    const c10::intrusive_ptr<JitFuture>& responseFuture,
    std::shared_ptr<LazyStreamContext> ctx) const {

  switch (messageType) {

    case MessageType::BACKWARD_AUTOGRAD_REQ: { 
      processBackwardAutogradReq(rpc, messageId, responseFuture); // call here
      return;
    };

3.2.2 Processing messages

processBackwardAutogradReq does the following:

  • Obtain the DistAutogradContainer.
  • Retrieve the context, which was established earlier during forward propagation. As we know, in this example every autograd propagation between worker 0 and worker 1 shares the same context, identified by the same context ID.
  • Retrieve the corresponding SendRpcBackward from the context using the sender's autograd_message_id. Here we see how the context is used.
  • Call executeSendFunctionAsync, passing sendFunction as an argument, so that the engine takes over.
void RequestCallbackNoPython::processBackwardAutogradReq(
    RpcCommandBase& rpc,
    const int64_t messageId,
    const c10::intrusive_ptr<JitFuture>& responseFuture) const {
  auto& gradientsCall = static_cast<PropagateGradientsReq&>(rpc);
  const auto& autogradMetadata = gradientsCall.getAutogradMetadata();

  // Retrieve the appropriate autograd context.
  auto autogradContext = DistAutogradContainer::getInstance().retrieveContext(
      autogradMetadata.autogradContextId); // look up the context via the sender's context id

  // Lookup the appropriate 'send' function to enqueue.
  std::shared_ptr<SendRpcBackward> sendFunction = // look up sendFunction via the message id
      autogradContext->retrieveSendFunction(autogradMetadata.autogradMessageId);

  // Attach the gradients to the send function.
  sendFunction->setGrads(gradientsCall.getGrads()); // set the gradients

  // Now execute the autograd graph using the "distributed engine."
  auto execFuture = DistEngine::getInstance().executeSendFunctionAsync( // call the engine
      autogradContext, sendFunction, gradientsCall.retainGraph());

  // Our response is satisfied when the rpcs come back.
  execFuture->addCallback([responseFuture, messageId](JitFuture& execFuture) {
    if (!execFuture.hasError()) {
      Message m = std::move(PropagateGradientsResp()).toMessage();
      m.setId(messageId);
      responseFuture->markCompleted(
          IValue(c10::make_intrusive<Message>(std::move(m))));
    } else {
      responseFuture->setError(execFuture.exception_ptr());
    }
  });
}

Inside DistEngine::executeSendFunctionAsync on worker 1, after processing, a BACKWARD_AUTOGRAD_REQ may in turn be sent to worker 1's own downstream nodes in the backward pass. So we keep extending the figure above by adding another BACKWARD_AUTOGRAD_REQ.

3.3 Summary

We can see that there are two ways to enter the Dist Autograd engine and start back propagation:

  • One is that the sample code explicitly and actively calls backward, which then calls DistEngine::getInstance().execute; this is worker 0.
  • The other is a passive call to DistEngine::getInstance().executeSendFunctionAsync; this is worker 1 (of course, the send node on worker 0 also corresponds to such a passive call).

Both the top-down and the bottom-up paths for backward propagation therefore converge on DistEngine, so we will cover DistEngine in the next article.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

How PyTorch implements backward propagation (1) —— calling the engine

How PyTorch implements backward propagation (2) —— engine static structure

How PyTorch implements backward propagation (3) —— engine dynamic logic

How PyTorch implements backward propagation (4) —— the specific algorithm