0x00 Summary

In this article, we look at how the engine propagates backward along these dependencies. After reading it, you will:

  • Understand how RecvRpcBackward sends RPC messages to the corresponding downstream nodes, so that we can once more walk through the backward-propagation interaction between workers.
  • Understand how AccumulateGrad accumulates gradients into the context.

Other articles in this PyTorch distributed series are as follows:

PyTorch distributed (1) —— history and overview

How PyTorch uses the GPU

PyTorch distributed (2) —— DataParallel (part 1)

PyTorch distributed (3) —— DataParallel (part 2)

PyTorch distributed (4) —— basic concepts of distributed applications

PyTorch distributed (5) —— DistributedDataParallel – overview and how to use it

PyTorch distributed (6) —— DistributedDataParallel

PyTorch distributed (7) —— DistributedDataParallel – process groups

PyTorch distributed (8) —— DistributedDataParallel

PyTorch distributed (9) —— DistributedDataParallel – initialization

PyTorch distributed (10) —— DistributedDataParallel – Reducer static architecture

PyTorch distributed (11) —— DistributedDataParallel – constructing the Reducer and the Join operation

PyTorch distributed (12) —— DistributedDataParallel – forward propagation

PyTorch distributed (13) —— DistributedDataParallel – backward propagation

PyTorch Distributed Autograd (1) —— design

PyTorch Distributed Autograd (2) —— RPC foundation

PyTorch Distributed Autograd (3)

PyTorch Distributed Autograd (4)

PyTorch Distributed Autograd (5)

For better illustration, the code in this article will be streamlined accordingly.

0x01 Review

Let us first review the FAST-mode algorithm, as follows; this article discusses several parts of it.

  1. We start from the worker which has the roots for the backward pass (all roots must be local).
  2. Look up all the send functions of the current Distributed Autograd Context.
  3. Starting from the provided roots and all the send functions we retrieved, compute dependencies locally.
  4. After the dependencies are computed, kick off the local autograd engine with the provided roots.
  5. When the autograd engine executes a recv function, that recv function sends the input gradients via RPC to the appropriate worker. Each recv function knows its target worker id because it was recorded as part of forward propagation. The recv function also sends the autograd_context_id and autograd_message_id to the remote host.
  6. When the remote host receives this request, we use the autograd_context_id and autograd_message_id to look up the appropriate send function.
  7. If this is the first time the worker has received a request for the given autograd_context_id, it computes dependencies locally as described in points 1–3 above.
  8. The send function retrieved in point 6 is then enqueued for execution on that worker's local autograd engine.
  9. Finally, instead of accumulating gradients on Tensor.grad, we accumulate them separately on each Distributed Autograd Context. The gradients are stored in a Dict[Tensor, Tensor], which is basically a map from a Tensor to its associated gradient, and this map can be retrieved via the get_gradients() API. A user-level sketch of this flow is shown below.
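For reference, here is a minimal user-level sketch of where these steps surface in the Python API. It is only an illustration: it assumes two RPC workers named "worker0" and "worker1" have already been initialized via rpc.init_rpc, and the tensor shapes and worker names are arbitrary.

import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc

# Runs on worker0; assumes rpc.init_rpc("worker0", rank=0, world_size=2)
# has already been called and "worker1" is a live peer.
with dist_autograd.context() as context_id:
    t1 = torch.rand((3, 3), requires_grad=True)
    t2 = torch.rand((3, 3), requires_grad=True)
    # The forward pass over RPC records send/recv functions in the context.
    loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum()
    # Kick off the distributed backward pass (steps 1-8 above).
    dist_autograd.backward(context_id, [loss])
    # Step 9: gradients live in the context, not in t1.grad / t2.grad.
    grads = dist_autograd.get_gradients(context_id)
    print(grads[t1], grads[t2])

In this sketch, dist_autograd.backward on worker 0 corresponds to steps 1–5, the BACKWARD_AUTOGRAD_REQ that reaches worker 1 triggers steps 6–8, and get_gradients(context_id) reads the per-context gradient map from step 9.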

Second, let us look at the overall execution, which is done in DistEngine::execute as follows:

  • Use contextId to retrieve the forward-pass context.
  • Use validateRootsAndRetrieveEdges for validation.
  • Construct a GraphRoot to drive backward propagation; it can be thought of as a virtual root.
  • Use computeDependencies to compute dependencies.
  • Use runEngineAndAccumulateGradients to perform the backward computation.
  • Use clearAndWaitForOutstandingRpcsAsync to wait for outstanding RPCs.
void DistEngine::execute(
    int64_t contextId,
    const variable_list& roots,
    bool retainGraph) {
  // Retrieve the context for the given context_id. This will throw if the
  // context_id is invalid.
  auto autogradContext =
      DistAutogradContainer::getInstance().retrieveContext(contextId);

  // Perform initial pre-processing.
  edge_list rootEdges;
  variable_list grads;
  validateRootsAndRetrieveEdges(roots, rootEdges, grads); 

  // Construct a GraphRoot to drive backward propagation, thought of as a virtual root
  std::shared_ptr<Node> graphRoot =
      std::make_shared<GraphRoot>(rootEdges, grads);
  edge_list outputEdges;
  // Compute dependencies locally, starting from all roots and all 'send'
  // functions.
  {
    std::lock_guard<std::mutex> guard(initializedContextIdsLock_);
    // Context should not have been initialized already.
    TORCH_INTERNAL_ASSERT(
        initializedContextIds_.find(autogradContext->contextId()) ==
        initializedContextIds_.end());

    // Calculate dependencies
    computeDependencies(
        autogradContext, rootEdges, grads, graphRoot, outputEdges, retainGraph);

    // Mark the autograd context id as initialized.
    initializedContextIds_.insert(autogradContext->contextId());
  }

  BackwardPassCleanupGuard guard(autogradContext);

  // This needs to be blocking and as a result we wait for the future to
  // complete.
  runEngineAndAccumulateGradients(autogradContext, graphRoot, outputEdges)
      ->waitAndThrow(); // Backward computation

  // Wait for all of the outstanding rpcs to complete.
  autogradContext->clearAndWaitForOutstandingRpcsAsync()->waitAndThrow();
}

As we saw earlier, dependencies are handled in computeDependencies, and the information about all functions that need to be computed is stored in GraphTask.exec_info_. We now look at how the computation is carried out, namely the two methods runEngineAndAccumulateGradients and clearAndWaitForOutstandingRpcsAsync.

0x02 GraphTask execution

We first look at how runEngineAndAccumulateGradients performs the backward computation and accumulates gradients.

2.1 runEngineAndAccumulateGradients

The engine first calls runEngineAndAccumulateGradients. It basically wraps a NodeTask and then calls execute_graph_task_until_ready_queue_empty with it; at::launch is used to start the thread.

c10::intrusive_ptr<c10::ivalue::Future> DistEngine::
    runEngineAndAccumulateGradients(
        const ContextPtr& autogradContext,
        const std::shared_ptr<Node>& graphRoot,
        const edge_list& outputEdges,
        bool incrementOutstandingTasks) {
  // Cleanup previous state for outstanding RPCs. Outstanding RPCs could be
  // lingering if we're running backward multiple times and some of the
  // passes ran into errors.
  autogradContext->clearOutstandingRpcs();

  // Get the GraphTask
  auto graphTask = autogradContext->retrieveGraphTask();

  // Start a thread to run execute_graph_task_until_ready_queue_empty
  at::launch([this, graphTask, graphRoot, incrementOutstandingTasks]() {
    execute_graph_task_until_ready_queue_empty(
        /*node_task*/ NodeTask(graphTask, graphRoot, InputBuffer(0)),
        /*incrementOutstandingTasks*/ incrementOutstandingTasks);
  });
    
  // Use a reference here to avoid refcount bump on futureGrads.
  // Process the result
  auto& futureGrads = graphTask->future_result_;

  // Build a future that waits for the callbacks to execute (since callbacks
  // execute after the original future is completed). This ensures we return a
  // future that waits for all gradient accumulation to finish.
  auto accumulateGradFuture =
      c10::make_intrusive<c10::ivalue::Future>(c10::NoneType::get());

  futureGrads->addCallback(
      [autogradContext, outputEdges, accumulateGradFuture](c10::ivalue::Future& futureGrads) {
        if (futureGrads.hasError()) {
		  // Omit the error handling part
          return;
        }

        try {
          const variable_list& grads =
              futureGrads.constValue().toTensorVector();
          // Mark accumulateGradFuture as completed
          accumulateGradFuture->markCompleted(c10::IValue());
        } catch (std::exception& e) {
          accumulateGradFuture->setErrorIfNeeded(std::current_exception());
        }
      });

  return accumulateGradFuture;
}

at::launch is located in aten/src/ATen/ParallelThreadPoolNative.cpp; it calls the incoming func on a thread-pool thread.

void launch(std::function<void()> func) {
  internal::launch_no_thread_state(std::bind([](
    std::function<void()> f, ThreadLocalState thread_locals) {
      ThreadLocalStateGuard guard(std::move(thread_locals));
      f();
    },
    std::move(func),
    ThreadLocalState()));
}

namespace internal {

void launch_no_thread_state(std::function<void()> fn) {
#if AT_EXPERIMENTAL_SINGLE_THREAD_POOL
  intraop_launch(std::move(fn));
#else
  get_pool().run(std::move(fn));
#endif
}

} // namespace internal

Let’s take a look at how each of these internal methods is executed.

2.2 execute_graph_task_until_ready_queue_empty

This function is similar to Engine::thread_main: it executes the GraphTask via NodeTasks, and within it evaluate_function keeps inserting new NodeTasks into cpu_ready_queue. The function will:

  • First, initialize the device threads of the local engine.
  • Second, create a cpu_ready_queue for each call; it is used to traverse the graph_task starting from root_to_execute, which allows different threads to execute the same GraphTask in parallel. This is a CPU-specific queue.
  • Insert the incoming node_task into cpu_ready_queue.
  • Traverse the backward computation graph from the root down to the leaf nodes.
    • Here, the leaf nodes are all AccumulateGrad or RecvRpcBackward.

    • If it is an intermediate node, it is computed as usual.

    • If it is a RecvRpcBackward, an RPC message is sent to the corresponding downstream node.

    • If it is an AccumulateGrad, the gradient is accumulated in the context.

The specific code is as follows:

void DistEngine::execute_graph_task_until_ready_queue_empty(
    NodeTask&& node_task,
    bool incrementOutstandingTasks) {
  
  // Initialize the native engine thread
  engine_.initialize_device_threads_pool();

  // Create a ready queue per call to traverse the graph_task from
  // root_to_execute This allow concurrent execution of the same GraphTask from
  // different threads
  // Each call creates a ready queue for traversing graph_Task from root_to_execute, which allows parallel execution of GraphTask by different threads, a CPU-specific queue
  std::shared_ptr<ReadyQueue> cpu_ready_queue = std::make_shared<ReadyQueue>();
  auto graph_task = node_task.base_.lock();
  if (graph_task == nullptr) {
    LOG(ERROR) << "GraphTask has expired for NodeTask: "
               << node_task.fn_->name() << ", skipping execution.";
    return;
  }

  cpu_ready_queue->push(std::move(node_task), incrementOutstandingTasks);

  torch::autograd::set_device(torch::autograd::CPU_DEVICE);
  graph_task->owner_ = torch::autograd::CPU_DEVICE;
  while (!cpu_ready_queue->empty()) {
    std::shared_ptr<GraphTask> local_graph_task;
    {
      // Scope this block of execution since NodeTask is not needed after this
      // block and can be deallocated (release any references to grad tensors
      // as part of inputs_)
      NodeTask task = cpu_ready_queue->pop(); // Retrieve a NodeTask
      if (!(local_graph_task = task.base_.lock())) {
        continue;
      }
      if (task.fn_ && !local_graph_task->has_error_.load()) {
        AutoGradMode grad_mode(local_graph_task->grad_mode_);
        try {
          GraphTaskGuard guard(local_graph_task);
          engine_.evaluate_function( // The Node function is called
              local_graph_task, task.fn_.get(), task.inputs_, cpu_ready_queue);
        } catch (std::exception& e) {
          engine_.thread_on_exception(local_graph_task, task.fn_, e);
          // break the loop in error so that we immediately stop the execution
          // of this GraphTask, mark it completed if necessary and return the
          // future with proper ErrorMessage
          break;
        }
      }
    }
    // Decrement the outstanding task.
    --local_graph_task->outstanding_tasks_; // One NodeTask has been processed
  }
  // Check if we've completed execution.
  if (graph_task->completed()) {
    // We don't need to explicitly notify the owner thread, since
    // 'mark_as_completed_and_run_post_processing' would mark the Future as
    // completed and this would notify the owner thread that the task has been
    // completed.
    graph_task->mark_as_completed_and_run_post_processing();
  }
}

In addition, execute_graph_task_until_ready_queue_empty is called from three places, and evaluate_function then handles the two kinds of leaf nodes:

  1. runEngineAndAccumulateGradients calls it; this is the case where the user actively calls backward, and it is what this section introduces.
  2. executeSendFunctionAsync calls it; this corresponds to what a node does after receiving a gradient from the previous stage of the backward pass, which we describe in the next section.
  3. globalCpuThread calls it; globalCpuThread is a CPU worker thread, which we cover shortly.
  4. Inside engine_.evaluate_function, gradients are accumulated for AccumulateGrad.
  5. Inside engine_.evaluate_function, RecvRpcBackward is called to send a message to the backward-propagation downstream.

Let us summarize the gradient-computation flows; each corresponds to a number in the figure below.

 User Training Script             RPC BACKWARD_AUTOGRAD_REQ
     +                                         +
     |                                         |
     | 1                                       | 2
     v                                         v
 backward                         RequestCallbackNoPython.processRpc
     +                                         +
     |                                         |
     |                                         |
     v                                         v
 DistEngine.execute               RequestCallbackNoPython.processBackwardAutogradReq
     +                                         +
     |                                         |
     |                                         |
     |                                         v
     |              +----------+  DistEngine.executeSendFunctionAsync
     |              |                               +
     |              |                               |
     v              v                               |
DistEngine.computeDependencies                      |
     |                                              |
     |                                              |
     v                                              |
 DistEngine.runEngineAndAccumulateGradients         |     DistEngine.globalCpuThread
     +                                              |                   +
     |                           +------------------+                   |
     |                           |                                      | 3
     |                           |             +------------------------+
     |                           |             |
     |                           |             |
     v                           v             v
 DistEngine.execute_graph_task_until_ready_queue_empty
     +
     |
     |
     v
 DistEngine.evaluate_function
     +
     |
     +--------------------------------------------------------------+
     |                                                              |
     |  4 AccumulateGrad                                            | 5  RecvRpcBackward
     v                                                              v

(*hook)(captured_grad)                            call_function(graph_task, func, inputs)

2.3 evaluate_function

In the code above, it is evaluate_function of the vanilla engine that actually does the work.

Let us first look at how exec_info_ is used: if an entry is not marked as needing execution, the corresponding node is not processed. Here we can also see how the recvBackwardEdges mentioned earlier interact with exec_info_.

During dependency computation, recvBackwardEdges is traversed and, for each recvBackward, the corresponding entry in GraphTask.exec_info_ is marked as needing execution (see the computeDependencies code in section 4.2).

The specific code is as follows:

  • For AccumulateGrad, the gradient is accumulated.
  • For RecvRpcBackward, it is called to send a message to the backward-propagation downstream.
void Engine::evaluate_function(
    std::shared_ptr<GraphTask>& graph_task,
    Node* func,
    InputBuffer& inputs,
    const std::shared_ptr<ReadyQueue>& cpu_ready_queue) {
  // If exec_info_ is not empty, we have to instrument the execution
  auto& exec_info_ = graph_task->exec_info_;
  if (!exec_info_.empty()) {
    auto& fn_info = exec_info_.at(func);
    if (auto* capture_vec = fn_info.captures_.get()) {
      // Lock mutex for writing to graph_task->captured_vars_.
      std::lock_guard<std::mutex> lock(graph_task->mutex_);
      for (const auto& capture : *capture_vec) {
        auto& captured_grad = graph_task->captured_vars_[capture.output_idx_];
        captured_grad = inputs[capture.input_idx_];
        for (auto& hook : capture.hooks_) {
          // Call the hook; here it is DistAccumulateGradCaptureHook's operator(),
          // and captured_grad is the accumulated gradient
          captured_grad = (*hook)(captured_grad);
        }
      }
    }
    if (!fn_info.needed_) {
      // Skip execution if we don't need to execute the function.
      return; // If not marked as needing execution, return directly; recvBackward is marked as needing execution
    }
  }

  // recvBackward will be called here
  auto outputs = call_function(graph_task, func, inputs);
    
  // The following code is omitted

2.4 globalCpuThread

globalCpuThread was introduced in the earlier section on GPU-to-CPU continuations. globalCpuThread is a worker thread: it pops NodeTasks from its ready queue and executes them.

For globalCpuThread, the ready_queue argument is global_cpu_ready_queue_.

void DistEngine::globalCpuThread(
    const std::shared_ptr<ReadyQueue>& ready_queue) {
  while (true) {
    NodeTask task = ready_queue->pop();
    if (task.isShutdownTask_) {
      // Need to shutdown this thread.
      break;
    }

    auto graphTask = task.base_.lock();
    if (graphTask == nullptr) {
      // GraphTask has expired, ignore and continue processing.
      continue;
    }

    // Launch the execution on a JIT thread.
    at::launch([this,
                graphTask,
                graphRoot = task.fn_,
                variables =
                    InputBuffer::variables(std::move(task.inputs_))]() mutable {
      InputBuffer inputs(variables.size());
      for (size_t i = 0; i < variables.size(); i++) {
        inputs.add(i, std::move(variables[i]), c10::nullopt, c10::nullopt);
      }
      execute_graph_task_until_ready_queue_empty( // this is called
          /*node_task*/ NodeTask(graphTask, graphRoot, std::move(inputs)),
          /*incrementOutstandingTasks*/ false);
    });
  }
}

A CPU-specific queue is also set up for normal engines.

auto graph_task = std::make_shared<GraphTask>(
    /* keep_graph */ keep_graph,
    /* create_graph */ create_graph,
    /* depth */ not_reentrant_backward_call ? 0 : total_depth + 1,
    /* cpu_ready_queue */ local_ready_queue);

2.5 summary

For a distributed engine, the main differences from a normal engine in computing are:

  • If it is a RecvRpcBackward, an RPC message is sent to the corresponding downstream node.

  • If it is AccumulateGrad, the gradient is accumulated in context.

So let’s look at how to deal with these two parts.

0x03 RPC calls

In the previous article, we saw how the receiver handles a backward-propagation RPC call. Now let's look at how the engine initiates such an RPC call, that is, how the recv function is invoked.

This corresponds to the case where worker 0 calls recv and execution passes to worker 1, as described in the design document excerpt below.

When the autograd engine executes the recv function, the recv function sends the input gradients via RPC to the appropriate worker. Each recv function knows the target worker id because it was recorded as part of forward propagation. The recv function also sends the autograd_context_id and autograd_message_id to the remote host.

Let's see how recv is executed.

Concretely, in the distributed engine, when the engine finds that a Node is a RecvRpcBackward, it calls that node's apply function.

void Engine::evaluate_function(
    std::shared_ptr<GraphTask>& graph_task,
    Node* func,
    InputBuffer& inputs,
    const std::shared_ptr<ReadyQueue>& cpu_ready_queue) {
  // If exec_info_ is not empty, we have to instrument the execution
  auto& exec_info_ = graph_task->exec_info_;
  if (!exec_info_.empty()) {
    // Omit the gradient accumulation part of the code, see the above section
    if (!fn_info.needed_) {
      // Skip execution if we don't need to execute the function.
      return; // If not marked as needing execution, return directly; recvBackward is marked as needing execution
    }
  }

  // call_function will call RecvRpcBackward::apply here
  auto outputs = call_function(graph_task, func, inputs);
    
  // The following code is omitted

3.1 RecvRpcBackward

3.1.1 Definition

RecvRpcBackward is defined as follows:

class TORCH_API RecvRpcBackward : public torch::autograd::Node {
 public:
  explicit RecvRpcBackward(
      const AutogradMetadata& autogradMetadata,
      std::shared_ptr<DistAutogradContext> autogradContext,
      rpc::worker_id_t fromWorkerId,
      std::unordered_map<c10::Device, c10::Device> deviceMap);

  torch::autograd::variable_list apply( torch::autograd::variable_list&& grads) override;

 private:
  const AutogradMetadata autogradMetadata_;

  // Hold a weak reference to the autograd context to avoid circular
  // dependencies with the context (since it holds a reference to
  // RecvRpcBackward).
  std::weak_ptr<DistAutogradContext> autogradContext_;

  // The worker id from which the RPC was received. During the backward pass,
  // we need to propagate the gradients to this workerId.
  rpc::worker_id_t fromWorkerId_;

  // Device mapping for tensors sent over RPC.
  const std::unordered_map<c10::Device, c10::Device> deviceMap_;
};

3.1.2 Construction

The constructor is as follows.

RecvRpcBackward::RecvRpcBackward(
    const AutogradMetadata& autogradMetadata,
    ContextPtr autogradContext,
    rpc::worker_id_t fromWorkerId,
    std::unordered_map<c10::Device, c10::Device> deviceMap)
    : autogradMetadata_(autogradMetadata),
      autogradContext_(std::move(autogradContext)),
      fromWorkerId_(fromWorkerId),
      deviceMap_(std::move(deviceMap)) {}

3.1.3 apply

Its apply function is defined in torch/csrc/distributed/autograd/functions/recvrpc_backward.cpp, and it does the following:

  • Put the incoming gradients grads into outputGrads, to be passed on to the next stage.
  • Build a PropagateGradientsReq, which carries BACKWARD_AUTOGRAD_REQ.
  • Send the RPC to the next stage.
variable_list RecvRpcBackward::apply(variable_list&& grads) {
  std::vector<Variable> outputGrads;
  // Put the incoming gradients into outputGrads
  for (size_t i = 0; i < grads.size(); i++) {
    const auto& grad = grads[i];
    if (grad.defined()) {
      outputGrads.emplace_back(grad);
    } else {
      // Put in zeros for a tensor with no grad.
      outputGrads.emplace_back(input_metadata(i).zeros_like());
    }
  }

  auto sharedContext = autogradContext_.lock();

  // Send the gradients over the wire and record the future in the autograd
  // context.
  PropagateGradientsReq gradCall( // Build the PropagateGradientsReq
      autogradMetadata_,
      outputGrads,
      sharedContext->retrieveGraphTask()->keep_graph_);

  // Send the gradients over to the appropriate node.
  auto rpcAgent = rpc::RpcAgent::getCurrentRpcAgent();
  auto jitFuture = rpcAgent->send( // Send the RPC
      rpcAgent->getWorkerInfo(fromWorkerId_),
      std::move(gradCall).toMessage(), // toMessageImpl was called
      rpc::kUnsetRpcTimeout,
      deviceMap_);

  // Record the future in the context.
  sharedContext->addOutstandingRpc(jitFuture);

  // 'recv' function sends the gradients over the wire using RPC, it doesn't
  // need to return anything for any downstream autograd function.
  return variable_list();
}

What is sent is a PropagateGradientsReq, so let's look at it next.

3.2 PropagateGradientsReq

3.2.1 Definition

PropagateGradientsReq extends RpcCommandBase.

// Used to propagate gradients from one node to another during a distributed
// backwards pass. This RPC call is invoked when we hit a `recv` autograd
// function during backward pass execution.
class TORCH_API PropagateGradientsReq : public rpc::RpcCommandBase {
 public:
  PropagateGradientsReq(
      const AutogradMetadata& autogradMetadata,
      std::vector<torch::autograd::Variable> grads,
      bool retainGraph = false);

  const AutogradMetadata& getAutogradMetadata();

  const std::vector<torch::autograd::Variable>& getGrads();

  // Serialization and deserialization methods.
  rpc::Message toMessageImpl() && override;
  static std::unique_ptr<PropagateGradientsReq> fromMessage(
      const rpc::Message& message);

  // Whether or not to retain the autograd graph.
  bool retainGraph();

 private:
  AutogradMetadata autogradMetadata_;
  std::vector<torch::autograd::Variable> grads_;
  bool retainGraph_;
};

Its toMessageImpl indicates that the message is BACKWARD_AUTOGRAD_REQ.

Message PropagateGradientsReq::toMessageImpl() && {
  std::vector<at::IValue> ivalues;
  // Add all the grad tensors.
  for (const auto& grad : grads_) {
    ivalues.emplace_back(grad);
  }

  // Now add autograd metadata.
  ivalues.emplace_back(autogradMetadata_.autogradContextId);
  ivalues.emplace_back(autogradMetadata_.autogradMessageId);

  // Add retain graph.
  ivalues.emplace_back(retainGraph_);

  // Now pickle using JIT pickler.
  std::vector<torch::Tensor> tensorTable;
  std::vector<char> payload =
      jit::pickle(c10::ivalue::Tuple::create(std::move(ivalues)), &tensorTable);

  return Message(
      std::move(payload),
      std::move(tensorTable),
      MessageType::BACKWARD_AUTOGRAD_REQ); // The message type is specified here.
}

3.3 The receiver

To complete the argument, let’s look at how the receiver handles back propagation.

3.3.1 Receiving messages

When the TensorPipeAgent is created, RequestCallbackImpl is configured as its callback function. This is the agent's unified response function. In the earlier discussion of the agent's receive logic we also mentioned that execution eventually enters the RequestCallbackNoPython::processRpc function, where we can see the handling logic for BACKWARD_AUTOGRAD_REQ.

This is the normal flow of RPC.

void RequestCallbackNoPython::processRpc(
    RpcCommandBase& rpc,
    const MessageType& messageType,
    const int64_t messageId,
    const c10::intrusive_ptr<JitFuture>& responseFuture,
    std::shared_ptr<LazyStreamContext> ctx) const {

  switch (messageType) {

    case MessageType::BACKWARD_AUTOGRAD_REQ: { 
      processBackwardAutogradReq(rpc, messageId, responseFuture); // call here
      return;
    };

3.3.2 processBackwardAutogradReq

processBackwardAutogradReq will:

  • Obtain DistAutogradContainer.
  • Get the context.
  • Call executeSendFunctionAsync for engine processing.

From this, we can see that there are two ways to enter the engine:

  • One is the example code explicitly and actively calling backward, which eventually calls DistEngine::getInstance().execute; this is worker 0.
  • The other is a passive call to DistEngine::getInstance().executeSendFunctionAsync; this is worker 1.
void RequestCallbackNoPython::processBackwardAutogradReq(
    RpcCommandBase& rpc,
    const int64_t messageId,
    const c10::intrusive_ptr<JitFuture>& responseFuture) const {
  auto& gradientsCall = static_cast<PropagateGradientsReq&>(rpc);
  const auto& autogradMetadata = gradientsCall.getAutogradMetadata();

  // Retrieve the appropriate autograd context.
  auto autogradContext = DistAutogradContainer::getInstance().retrieveContext(
      autogradMetadata.autogradContextId); // Get the context ID of the sender

  // Lookup the appropriate 'send' function to enqueue.
  std::shared_ptr<SendRpcBackward> sendFunction = // Get sendFunction based on sender context ID and message ID
      autogradContext->retrieveSendFunction(autogradMetadata.autogradMessageId);

  // Attach the gradients to the send function.
  sendFunction->setGrads(gradientsCall.getGrads()); // Set the gradient

  // Now execute the autograd graph using the "distributed engine."
  auto execFuture = DistEngine::getInstance().executeSendFunctionAsync( // Call engine
      autogradContext, sendFunction, gradientsCall.retainGraph());

  // Our response is satisfied when the rpcs come back.
  execFuture->addCallback([responseFuture, messageId](JitFuture& execFuture) {
    if (!execFuture.hasError()) {
      Message m = std::move(PropagateGradientsResp()).toMessage();
      m.setId(messageId);
      responseFuture->markCompleted(
          IValue(c10::make_intrusive<Message>(std::move(m))));
    } else {
      responseFuture->setError(execFuture.exception_ptr());
    }
  });
}

3.3.3 executeSendFunctionAsync

executeSendFunctionAsync now enters the engine. Note that here the receiver also enters the engine and performs the computation on the receiver side. executeSendFunctionAsync may call execute_graph_task_until_ready_queue_empty directly, or it may first compute dependencies and then continue execution. Here we can refer to points 6–8 of the design:

  • 6) When the remote host receives this request, we use the autograd_context_id and autograd_message_id to look up the appropriate send function.
  • 7) If this is the first time the worker has received a request for the given autograd_context_id, it computes dependencies locally as described in points 1–3 above.
  • 8) The send function retrieved in point 6 is then enqueued for execution on that worker's local autograd engine.

The specific code is as follows:

c10::intrusive_ptr<c10::ivalue::Future> DistEngine::executeSendFunctionAsync(
    const ContextPtr& autogradContext,
    const std::shared_ptr<SendRpcBackward>& sendFunction,
    bool retainGraph) {

  // Typically the local autograd engine ensures stream synchronizations between
  // nodes in the graph. However, for distributed autograd the sendFunction
  // inputs might have been retrieved over the wire on a separate stream and the
  // sendFunction itself runs on a different stream. As a result, we need to
  // manually synchronize those two streams here.
  const auto& send_backward_stream = sendFunction->stream(c10::DeviceType::CUDA);
  if (send_backward_stream) { // Get the Stream for this execution
    for (const auto& grad : sendFunction->getGrads()) {
        const auto guard = c10::impl::VirtualGuardImpl{c10::DeviceType::CUDA};
        const auto default_stream = guard.getStream(grad.device());
        if (send_backward_stream != default_stream) {
          auto event = c10::Event{c10::DeviceType::CUDA};
          event.record(default_stream);
          send_backward_stream->wait(event); // Synchronize to ensure the pending operations are complete
        }
    }
  }

  std::unique_lock<std::mutex> lock(initializedContextIdsLock_);
  if (initializedContextIds_.find(autogradContext->contextId()) ==
      initializedContextIds_.end()) { // iterate to see if the context corresponding to sendFunction has been recorded in this node
    // No context found, dependency needed to be calculated
    edge_list outputEdges;
    // Pass in a dummy graphRoot since all send functions are the roots.
    auto dummyRoot = std::make_shared<GraphRoot>(edge_list(), variable_list());
    computeDependencies( // Calculate dependencies
        autogradContext, {}, {}, dummyRoot, outputEdges, retainGraph);

    // Mark the autograd context id as initialized and unlock.
    initializedContextIds_.insert(autogradContext->contextId());
    lock.unlock();

    // Enqueue the current send function.
    auto graphTask = autogradContext->retrieveGraphTask();

    // Run the autograd engine.
    auto accumulateGradFuture = runEngineAndAccumulateGradients( // Compute the gradient
        autogradContext,
        sendFunction,
        outputEdges,
        /*incrementOutstandingTasks=*/false);

    // Build the 'uber' future that waits for everything.
    auto callbackFuture =
        c10::make_intrusive<c10::ivalue::Future>(c10::NoneType::get());
    // Register a callback
    accumulateGradFuture->addCallback([autogradContext,
                                       callbackFuture](c10::ivalue::Future& accumulateGradFuture) {
      try {
        if (accumulateGradFuture.hasError()) {
          // Perform cleanup at the end of the backward pass (before we mark
          // the future as completed).
          DistEngine::getInstance().cleanupBackwardPass(autogradContext);

          // Skip any further processing on errors.
          callbackFuture->setError(accumulateGradFuture.exception_ptr());
          return;
        }

        // Wait for all RPCs after the autograd engine is done.
        auto rpcFuture = autogradContext->clearAndWaitForOutstandingRpcsAsync();
        rpcFuture->addCallback([callbackFuture, autogradContext](c10::ivalue::Future& rpcFuture) {
          try {
            // Perform cleanup at the end of the backward pass (before
            // we mark the future as completed).
            DistEngine::getInstance().cleanupBackwardPass(autogradContext);
          } catch (std::exception& e) {
            callbackFuture->setErrorIfNeeded(std::current_exception());
            return;
          }

          // Finally mark the 'uber' future as completed.
          if (!rpcFuture.hasError()) {
            callbackFuture->markCompleted(c10::IValue());
          } else {
            callbackFuture->setError(rpcFuture.exception_ptr());
          }
        });
      } catch (std::exception& e) {
        callbackFuture->setErrorIfNeeded(std::current_exception());
      }
    });

    // Return the future which waits for all async processing to be done.
    return callbackFuture;
  } else { // Context can be found in the current Node
    lock.unlock();
    auto graphTask = autogradContext->retrieveGraphTask();
    at::launch([this, graphTask, sendFunction]() {
      execute_graph_task_until_ready_queue_empty(
          /*node_task*/ NodeTask(graphTask, sendFunction, InputBuffer(0)),
          /*incrementOutstandingTasks*/ false);
    });
    auto fut = c10::make_intrusive<c10::ivalue::Future>(c10::NoneType::get());
    fut->markCompleted(c10::IValue());
    return fut;
  }
}

The details are as follows:

                                                                  +
                                                         worker 0 | worker 1
                                                                  |
  Engine            RecvRpcBackward              RpcAgent         |     RequestCallbackNoPython             DistEngine
    +                    +                          +             |              +                              +
    |                    |                          |             |              |                              |
    |                    |                          |             |              |                              |
evaluate_function        |                          |             |              |                              |
    +                    |                          |             |              |                              |
    |                    |                          |             |              |                              |
    +                    |                          |             |              |                              |
  call_function          |                          |             |              |                              |
    +                    |                          |             |              |                              |
    |      grads         v                          |             |              |                              |
    +----------------> apply                        |             |              |                              |
    |                    +                          |             |              |                              |
    |                    |                          |             |              |                              |
    |                    +                          |             |              |                              |
    |                 gradCall                      |             |              |                              |
    |                    +                          |             |              |                              |
    |                    |  PropagateGradientsReq   |             |              |                              |
    |                    +------------------------> |             |              |                              |
    |                    |                          |             +              |                              |
    |                    |                          +   BACKWARD_AUTOGRAD_REQ    |                              |
    |                    |                        send  +---------+--------->    |                              |
    |                    |                          +             |              |                              |
    |                    |                          |             |              +                              |
    |                    |                          |             |     processBackwardAutogradReq              |
    |                    |                          |             |              +                              |
    |                    |                          |             |              |                              +
    |                    |                          |             |              +------------> executeSendFunctionAsync
    |                    |                          |             |              |                              +
    |                    |                          |             |              |                              |
    |                    |                          |             |              |                              |
    v                    v                          v             +              v                              v




0x04 DistAccumulateGradCaptureHook

So far it looks like the overall logic is complete, but there is actually a piece missing that corresponds to the design document:

Finally, instead of accumulating gradients on Tensor.grad, we accumulate gradients separately on each Distributed Autograd Context. The gradients are stored in a Dict[Tensor, Tensor], which is basically a map from a Tensor to its associated gradient, and this map can be retrieved using the get_gradients() API.

That is, remote/local gradients are accumulated into the local context. The sketch below illustrates this from the user's perspective; after that, we analyze DistAccumulateGradCaptureHook.
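As a minimal user-level illustration of this point (a sketch under the same assumptions as the example in section 0x01: an RPC peer named "worker1" has already been initialized, and the names are only illustrative), the gradient ends up in the context rather than in Tensor.grad:

import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc

# Assumes rpc.init_rpc("worker0", rank=0, world_size=2) has been called
# and "worker1" is a live peer.
with dist_autograd.context() as context_id:
    t = torch.rand((3, 3), requires_grad=True)
    loss = rpc.rpc_sync("worker1", torch.add, args=(t, t)).sum()
    dist_autograd.backward(context_id, [loss])
    assert t.grad is None  # nothing was accumulated into Tensor.grad
    grad = dist_autograd.get_gradients(context_id)[t]  # accumulated in the context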

4.1 Definition

DistAccumulateGradCaptureHook does three things:

  1. Call the pre hooks of original AccumulateGrad to modify the input gradient.

  2. Accumulate grad into the RPC context.

  3. Call post hooks for original AccumulateGrad.

Its definition is as follows:

// This hook does 3 things:
// 1. Call pre hooks of the original AccumulateGrad to modify the input grad.
// 2. Accumuate the gard to RPC context.
// 3. Call post hooks of the original AccumulateGrad.
class DistAccumulateGradCaptureHook
    : public GraphTask::ExecInfo::Capture::GradCaptureHook {
 public:
  DistAccumulateGradCaptureHook(
      std::shared_ptr<AccumulateGrad> accumulateGrad,
      ContextPtr autogradContext)
      : accumulateGrad_(std::move(accumulateGrad)),
        autogradContext_(std::move(autogradContext)) {}

  at::Tensor operator()(const at::Tensor& grad) override {
    ThreadLocalDistAutogradContext contextGuard{ContextPtr(autogradContext_)};
    variable_list inputGrads = {grad};
    // It's intended that pre/post hooks are still called even if the grad is
    // undenfined here.
    for (const auto& hook : accumulateGrad_->pre_hooks()) {
      inputGrads = (*hook)(inputGrads); // Call the pre-hooks
    }

    // It is possible that the grad is not defined since a separate
    // invocation of the autograd engine on the same node might actually
    // compute this gradient.
    if (inputGrads[0].defined()) {
      // There are 3 internal references to 'inputGrads[0]' at this moment:
      // 1. 'inputGrads[0]' in this function.
      // 2. 'graph_task->captured_vars_' on the callsite in the local engine.
      // 3. 'InputBuffer& inputs' on the callsite as the inputs of the
      // function node.
      autogradContext_->accumulateGrad( // Cumulative gradient
          accumulateGrad_->variable, inputGrads[0], 3 /* num_expected_refs */);
    }
    const variable_list kEmptyOuput;
    for (const auto& hook : accumulateGrad_->post_hooks()) {
      (*hook)(kEmptyOuput, inputGrads); // Call the post-hooks
    }
    return inputGrads[0];
  }

 private:
  std::shared_ptr<AccumulateGrad> accumulateGrad_; // The target variable into which gradients are accumulated; subsequent operations are based on it
  ContextPtr autogradContext_;
};

4.2 Generation

How is DistAccumulateGradCaptureHook generated? It is created while computing dependencies and recorded via capture.hooks_.push_back.

This is done to handle AccumulateGrad:

  • AccumulateGrad must be a leaf node; it does not need to be executed, but gradients need to be accumulated on it. RecvRpcBackward, in contrast, does need to be executed.

  • AccumulateGrad is saved inside DistAccumulateGradCaptureHook.

void DistEngine::computeDependencies(
    const ContextPtr& autogradContext,
    const edge_list& rootEdges,
    const variable_list& grads,
    const std::shared_ptr<Node>& graphRoot,
    edge_list& outputEdges,
    bool retainGraph) {
  
  if (!outputEdges.empty()) {
    // Compute 'needed execution' starting from all 'send' functions and the
    // original graphRoot.
    edge_list edges;
    // Create some dummy edges (input_nr not important for init_to_execute).
    for (const auto& mapEntry : sendFunctions) {
      edges.emplace_back(mapEntry.second, 0);
    }

    // Add the original graphRoot as an edge.
    edges.emplace_back(graphRoot, 0);

    // Create a dummy GraphRoot and run init_to_execute with it.
    GraphRoot dummyRoot(edges, {});
    graphTask->init_to_execute(
        dummyRoot, outputEdges, /*accumulate_grad=*/false, /*min_topo_nr=*/0);
    for (auto& mapEntry : graphTask->exec_info_) {
      auto& execInfo = mapEntry.second;
      if (!execInfo.captures_) {
        continue;
      }
      auto fn = mapEntry.first;
      // There may be nodes other than 'AccumulateGrad', e.g. RecvRPCBackward,
      // to be captured.
      if (auto accumulateGradFn = dynamic_cast<AccumulateGrad*>(fn)) {
        for (auto& capture : *execInfo.captures_) {
          capture.hooks_.push_back( // this will generate
              std::make_unique<DistAccumulateGradCaptureHook>(
                  std::dynamic_pointer_cast<AccumulateGrad>( // AccumulateGrad is stored
                      accumulateGradFn->shared_from_this()),
                  autogradContext));
        }
      }
    }

    // Mark all 'RecvRPCBackward' as needing execution.
    for (const auto& recvBackwardEdge : recvBackwardEdges) {
      graphTask->exec_info_[recvBackwardEdge.function.get()].needed_ = true;
    }
  }
}

4.3 Usage

The code is abridged.

First, execute_graph_task_until_ready_queue_empty calls the vanilla engine's engine_.evaluate_function.

void DistEngine::execute_graph_task_until_ready_queue_empty(
    NodeTask&& node_task,
    bool incrementOutstandingTasks) {

  while (!cpu_ready_queue->empty()) {
    std::shared_ptr<GraphTask> local_graph_task;
    {
      NodeTask task = cpu_ready_queue->pop();
      if (task.fn_ && !local_graph_task->has_error_.load()) {
        AutoGradMode grad_mode(local_graph_task->grad_mode_);
        GraphTaskGuard guard(local_graph_task);
        engine_.evaluate_function( // Call the original engine
              local_graph_task, task.fn_.get(), task.inputs_, cpu_ready_queue);
      }
    }
    // Decrement the outstanding task.
    --local_graph_task->outstanding_tasks_;
  }
}

Second, in the original engine code, hooks are called.

void Engine::evaluate_function(
    std::shared_ptr<GraphTask>& graph_task,
    Node* func,
    InputBuffer& inputs,
    const std::shared_ptr<ReadyQueue>& cpu_ready_queue) {
  // If exec_info_ is not empty, we have to instrument the execution
  auto& exec_info_ = graph_task->exec_info_;
  if (!exec_info_.empty()) {
    auto& fn_info = exec_info_.at(func);
    if (auto* capture_vec = fn_info.captures_.get()) {
      // Lock mutex for writing to graph_task->captured_vars_.
      std::lock_guard<std::mutex> lock(graph_task->mutex_);
      for (const auto& capture : *capture_vec) {
        auto& captured_grad = graph_task->captured_vars_[capture.output_idx_];
        captured_grad = inputs[capture.input_idx_];
        for (auto& hook : capture.hooks_) {
          // Call the hook; here it is DistAccumulateGradCaptureHook's operator(),
          // and captured_grad is the accumulated gradient
          captured_grad = (*hook)(captured_grad);
        }
      }
    }
  }
  // Subsequent code omitted

The operator() method of DistAccumulateGradCaptureHook then calls gradient accumulation:

  autogradContext_->accumulateGrad(
      accumulateGrad_->variable, inputGrads[0], 3 /* num_expected_refs */);

4.4 Cumulative Gradient

4.4.1 Context Accumulation

void DistAutogradContext::accumulateGrad(
    const torch::autograd::Variable& variable, // variable is the target variable
    const torch::Tensor& grad, // grad is a gradient that needs to be accumulated over variable
    size_t num_expected_refs) {

  std::lock_guard<std::mutex> guard(lock_);
  auto it = accumulatedGrads_.find(variable);
  at::Tensor old_grad;
  if (it != accumulatedGrads_.end()) {
    // Accumulate multiple grads on the same variable.
    old_grad = it->value();
  }

  // Gradients are computed using the forward streams. Local autograd
  // engine uses AccumulateGrad function to retrieve and apply forward
  // stream during the backward computation. In distributed autograd,
  // we directly call AccumulateGrad::accumulateGrad, and skip the
  // CUDA stream restoration from autograd function. Hence, we manually
  // call it here to get the streams correct.
  auto forward_stream =
      torch::autograd::impl::grad_accumulator(variable)->stream(
          grad.device().type());
  c10::OptionalStreamGuard stream_guard(forward_stream);

  // No higher order gradients supported in distributed autograd.
  AutoGradMode grad_mode(false);
  at::Tensor new_grad = AccumulateGrad::callHooks(variable, grad); // Compute the new gradient

  AccumulateGrad::accumulateGrad( // Call operator function to accumulate gradient
      variable,
      old_grad,
      new_grad,
      // Add +1 here since we can't std::move(grad) when call
      // AccumulateGrad::callHooks, since it is a const ref, and that incurs a
      // refcount bump for the new_grad.
      num_expected_refs + 1,
      [this, &variable](at::Tensor&& grad_update) {
        auto device = grad_update.device();
        accumulatedGrads_.insert(variable, std::move(grad_update));
        recordGradEvent(device);
      });
}

4.4.2 Operator accumulation

The code is located in torch/csrc/autograd/functions/accumulate_grad.h. AccumulateGrad is defined as follows:

struct TORCH_API AccumulateGrad : public Node {
  explicit AccumulateGrad(Variable variable_);

  variable_list apply(variable_list&& grads) override;

  static at::Tensor callHooks(
      const Variable& variable,
      at::Tensor new_grad) {
    for (auto& hook : impl::hooks(variable)) {
      new_grad = (*hook)({new_grad})[0];
    }
    return new_grad;
  }

  // Given a variable with its current grad as variable_grad, accumulates
  // new_grad into variable_grad if in place accumulation is possible.
  // Otherwise, uses 'update_grad' to update the grad for the variable.

  // "Gradient Layout Contract"
  //
  // AccumulateGrad tries to stash strided (non-sparse) grads with memory layout
  // (strides) such that variables and grads interact efficiently in later
  // optimizer kernels, and grads interact efficiently with c10d::Reducer.cpp.
  //
  // Specifically, AccumulateGrad tries to ensure the following
  // (cf torch/csrc/autograd/utils/grad_layout_contract.h):
  // (1) if variable.is_non_overlapping_and_dense(), the stashed grad's
  // strides match variable.
  // (2) else, stashed grad is rowmajor contiguous.
  // If variable's grad does not exist (!variable_grad.defined())
  // AccumulateGrad steals new_grad if it's stealable and obeys the contract
  // already, otherwise it deep copies new_grad into an obedient clone.
  //
  // If variable's grad already exists (variable_grad.defined()), new_grad must
  // be added to variable_grad. If we aren't setting up for double backward
  // (!GradMode::is_enabled()), AccumulateGrad performs "variable_grad += new_grad"
  // in-place, which keeps variable_grad's layout. We assume (hope) variable_grad
  // was created obeying (1) or (2) at some point in the past.
  //
  // If we are setting up for double backward, AccumulateGrad updates the grad
  // out-of-place via "variable_grad + new_grad." TensorIterator operator+ decides
  // result's layout. Typically TensorIterator matches strides of the first arg,
  // so we once again assume (hope) variable_grad was originally created obeying
  // (1) or (2).
  //
  // AccumulateGrad does not enforce the contract with 100% certainty. Examples:
  // - If a user manually permutes a param or its grad, then runs a fwd+bwd,
  // variable_grad += new_grad keeps variable_grad's layout without rechecking
  // the contract.
  // - If TensorIterator changes its corner cases about operator+'s result
  // (for example, giving more or less priority to channels_last inputs, see
  // https://github.com/pytorch/pytorch/pull/37968) the result may not obey.
  //
  // Fortunately, if a given grad doesn't satisfy (1) or (2), the penalty is
  // degraded performance in Reducer.cpp or optimizer kernels, not death by
  // assert or silently bad numerics.

  // variable: the variable whose grad we're accumulating.
  // variable_grad: the current grad for the variable.
  // new_grad: new grad we want to acummulate for the variable.
  // num_expected_refs: the number of refs we expect to hold internally
  // such that it is safe to avoid cloning the grad
  // if use_count() of the grad is less than or equal
  // to this value (in addition to post_hooks).
  // update_grad: Function that is used to update grad for the variable.
  // The argument to the function is a Tensor which
  // is used to set a new value for the grad.
  template <typename T>
  static void accumulateGrad( // There is a specific cumulative gradient
      const Variable& variable,
      at::Tensor& variable_grad,
      const at::Tensor& new_grad,
      size_t num_expected_refs,
      const T& update_grad) {
    if (!variable_grad.defined()) {
      if (!GradMode::is_enabled() &&
          !new_grad.is_sparse() &&
          new_grad.use_count() <= num_expected_refs &&
          (new_grad.is_mkldnn() || utils::obeys_layout_contract(new_grad, variable))) {
        // we aren't setting up for double-backward
        // not sparse
        // no other user-visible tensor references new_grad
        // new_grad obeys the "Gradient Layout Contract", there has a special case,
        // For MKLDNN tensor, which is a opaque tensor, assuming it obeys layout_contract.
        // Under these conditions, we can steal new_grad without a deep copy.
        update_grad(new_grad.detach());
      } else if (
          !GradMode::is_enabled() && new_grad.is_sparse() &&
          new_grad._indices().is_contiguous() &&
          new_grad._values().is_contiguous() &&
          // Use count for indices and values should always be <=1 since the
          // SparseTensor should be the only one holding a reference to these.
          new_grad._indices().use_count() <= 1 &&
          new_grad._values().use_count() <= 1 &&
          new_grad.use_count() <= num_expected_refs) {
        // Can't detach sparse tensor (since metadata changes are not allowed
        // after detach), so just create a new one for the grad which is a
        // shallow copy. We need a shallow copy so that modifying the original
        // grad tensor doesn't modify the grad we accumulate.
        // We only skip clone if indices and values themselves are contiguous
        // for backward compatiblity reasons. Since without this optimization,
        // earlier we would clone the entire SparseTensor which cloned indices
        // and values.
        // For details see https://github.com/pytorch/pytorch/issues/34375.
        update_grad(at::_sparse_coo_tensor_unsafe(
            new_grad._indices(),
            new_grad._values(),
            new_grad.sizes(),
            new_grad.options()));
      } else {
        if (new_grad.is_sparse()) {
          update_grad(new_grad.clone());
        } else {
          if (new_grad.is_mkldnn()) {
            update_grad(new_grad.clone());
          } else {
            // Deep copies new_grad according to the "Gradient Layout Contract."
            update_grad(utils::clone_obey_contract(new_grad, variable));
          }
        }
      }
    } else if (!GradMode::is_enabled()) {
      // This case is not strictly necessary, but it makes the first-order only
      // case slightly more efficient.
      if (variable_grad.is_sparse() && !new_grad.is_sparse()) {
        // If `variable_grad` is sparse and `new_grad` is not sparse, their
        // sum is not sparse, and we must change the TensorImpl type of
        // `variable_grad` for it to store the result. However, changing the
        // TensorImpl type of a tensor requires changing the tensor itself, and
        // thus in this case we have to change the grad tensor.
        auto result = new_grad + variable_grad;
        CHECK_RESULT(result, variable);
        update_grad(std::move(result));
      } else if (!at::inplaceIsVmapCompatible(variable_grad, new_grad)) {
        // Ideally we'd perform an in-place operation to avoid changing
        // the grad tensor. However, if that's impossible because the grads
        // are vmap-incompatible (See NOTE: [vmap-incompatible in-place operations]),
        // then we just add them out-of-place.
        auto result = variable_grad + new_grad;
        CHECK_RESULT(result, variable);
        update_grad(std::move(result));
      } else {
        // In this case we can avoid changing the grad tensor. There are three
        // scenarios when we'll hit this case:
        //
        // 1. `variable_grad` is sparse, and `new_grad` is sparse.
        // 2. `variable_grad` is dense, and `new_grad` is sparse.
        // 3. `variable_grad` is dense, and `new_grad` is dense.
        // 4. `variable_grad` is mkldnn, and `new_grad` is mkldnn.
        //
        // In all of these four cases, `variable_grad += new_grad` is a
        // valid operation which adds `new_grad` to `variable_grad` in
        // place. `variable_grad` is thus still referring to the same tensor
        // after the operation.
        // Also DistributedDataParallel(DDP) package relies on grad being
        // mutated in place for saving peak memory usage. DDP will still
        // work correctly if it is mutated out of place here, but DDP will
        // maintain one extra copy of grad tensors in buffer and thus
        // increase peak memory usage.
        variable_grad += new_grad;
        CHECK_RESULT(variable_grad, variable);
        // ^ We could enforce the contract more aggressively here by writing:
        // if (variable_grad.is_sparse() || new_grad.is_sparse()) {
        // variable_grad += new_grad;
        // } else if (obeys_layout_contract(variable_grad, variable)) {
        // variable_grad += new_grad;
        // } else {
        // result = at::empty_strided(variable.sizes(), variable.strides(),
        // variable.options().memory_format(c10::nullopt));
        // update_grad(at::native::add_out(result, variable_grad, new_grad, 1.0);
        // }
        // However, that accumulation is sometimes in place and sometimes not,
        // which may break user code.
      }
    } else {
      at::Tensor result;
      if (variable_grad.is_sparse() && !new_grad.is_sparse()) {
        // CPU backend throws an error on sparse + dense, so prefer dense + sparse here.
        result = new_grad + variable_grad;
      } else {
        // Assumes operator+ result typically matches strides of first arg,
        // and hopes variable_grad was originally created obeying layout contract.
        result = variable_grad + new_grad;
      }
      CHECK_RESULT(result, variable);
      update_grad(std::move(result));
      // ^ We could enforce the contract more aggressively here by saying
      // if (obeys_layout_contract(new_grad, variable)) {
      // update_grad(new_grad + variable_grad);
      // } else {
      // update_grad(variable_grad + new_grad);
      // }
      // such that the stashed grad is likely to have the right strides if
      // either variable_grad or new_grad already has the right strides.
      // We could enforce the contract with certainty by saying
      // auto result = variable_grad + new_grad (or vice versa), checking result's
      // layout, and copying to an obedient clone if necessary before update_grad.
      // The copy would require another gmem pass. We can't create empty result with
      // the right layout then add_out into it with a single kernel, because GradMode
      // is enabled in this branch, and add_out isn't differentiable.
      // Maybe more trouble than it's worth.
    }
  }

  Variable variable;
};
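As a small, hedged illustration of these branches from Python (plain local autograd, no RPC involved; it assumes an Embedding created with sparse=True so the lookup produces sparse gradients), mixing dense and sparse gradients on the same parameter exercises both paths above: a sparse new_grad accumulated onto a dense variable_grad stays in place, while a dense new_grad accumulated onto a sparse variable_grad replaces the grad tensor with a dense one.

import torch

# Sketch only: exercises the dense/sparse branches of accumulateGrad shown above.
emb = torch.nn.Embedding(5, 3, sparse=True)   # sparse=True -> the lookup produces sparse grads
idx = torch.tensor([0, 2])

# Dense variable_grad first, then a sparse new_grad:
# "variable_grad is dense, new_grad is sparse" -> variable_grad += new_grad (in place).
emb.weight.sum().backward()                   # dense gradient for emb.weight
emb(idx).sum().backward()                     # sparse gradient accumulated in place
print(emb.weight.grad.layout)                 # torch.strided

# Sparse variable_grad first, then a dense new_grad:
# "variable_grad is sparse, new_grad is not sparse" -> result = new_grad + variable_grad,
# and the grad tensor itself is replaced by a dense tensor.
emb.weight.grad = None
emb(idx).sum().backward()
print(emb.weight.grad.layout)                 # torch.sparse_coo
emb.weight.sum().backward()
print(emb.weight.grad.layout)                 # torch.strided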

In the figure below, the left side shows the data structures and the right side shows the algorithm flow. The numbers on the right indicate the order of execution from top to bottom; the data structures on the left are used during that execution, and the horizontal arrows show where the algorithm calls into the data structures.

  1. The distributed engine calls execute_graph_task_until_ready_queue_empty to run the specific GraphTask.
  2. Engine::evaluate_function reads the ExecInfo stored in the GraphTask.
  3. evaluate_function then invokes the captured hook, i.e., captured_grad = (*hook)(captured_grad).
  4. The hook is a DistAccumulateGradCaptureHook; its operator() calls autogradContext_->accumulateGrad(accumulateGrad_->variable, inputGrads[0], 3) using the AccumulateGrad node it has saved.
  5. DistAutogradContext::accumulateGrad calls AccumulateGrad::callHooks to obtain new_grad and then AccumulateGrad::accumulateGrad to perform the final gradient update (a loose Python analogue of this capture-into-context mechanism is given after the figure below).
                                     DATA STRUCTURE   +  ALGORITHM
                                                      |
+-----------------------------------------------+     |
| GraphTask                                     |     |  DistEngine::execute_graph_task_until_ready_queue_empty
|                                               |     |      +                |
|   unordered_map<Node*, ExecInfo> exec_info_   |     |      |                |
|                            +                  | <----------+                |
|                            |                  |     |                       |
+-----------------------------------------------+     |                       | 1
                             |                        |                       |
                             |                        |                       |
                             v                        |                       |
       +---------------------+------------------+     |                       v
       | ExecInfo                               | <-------------+  Engine::evaluate_function
       |                                        |     |                       +
       |       < vector<Capture> > captures_    |     |                       |
       |                   +                    |     |                       |
       |                   |                    |     |                       | 2
       +----------------------------------------+     |                       |
                           |                          |                       v
                           |                          |
                           v                          |      +--+ captured_grad = (*hook)(captured_grad)
       +-------------------+--------------------+     |      |                +
       | Capture                                |     |      |                |
       |                                        |     |      |                |
       |   vector< <GradCaptureHook> > hooks_ <--------------+                | 3
       |                   +                    |     |                       |
       +----------------------------------------+     |                       v
                           |                          |
                           |                          |   +--+ autogradContext_->accumulateGrad(
                           v                          |   |         accumulateGrad_-> variable, inputGrads[0].3)
       +-------------------+--------------------+     |   |                   +
       | DistAccumulateGradCaptureHook          |     |   |                   |
       |                                        |     |   |                   |
       |      ContextPtr autogradContext_    <------------+                   | 4
       |                                        |     |   |                   |
       |      AccumulateGrad accumulateGrad_ <------------+                   v
       |                          +             |     |
       +----------------------------------------+     |   +-+ new_grad = AccumulateGrad::callHooks(variable, grad)
                                  |                   |   |                   +
                                  |                   |   |                   |
                                  v                   |   |                   | 5
              +-------------------+------+            |   |                   v
              | AccumulateGrad           |            |   |
              |                          |            |   |      AccumulateGrad::accumulateGrad(
              |      Variable variable <------------------+------+   variable, old_grad, new_grad,)
              |                          |            |
              +--------------------------+            +
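The key idea in the figure is that, for distributed autograd, the captured gradient is not accumulated into the parameter's .grad; the hook routes it into the per-context gradients map instead. Below is a loose Python analogue of that capture-into-context idea, written with an ordinary tensor hook. It is only a conceptual sketch: the context_grads dictionary and the capture function are made up for illustration, and unlike the real engine the parameter's .grad still gets filled here.

import torch

# Conceptual analogue only: capture gradients with a hook and accumulate them
# into a per-"context" dictionary, which is roughly what DistAccumulateGradCaptureHook
# does with the gradients map inside the distributed autograd context.
context_grads = {}                      # plays the role of the context's Dict[Tensor, Tensor]

w = torch.rand(3, requires_grad=True)

def capture(grad, var=w):
    # Accumulate into the dictionary instead of relying on var.grad.
    old = context_grads.get(var)
    context_grads[var] = grad if old is None else old + grad
    return grad

w.register_hook(capture)
(w * 2).sum().backward()
print(context_grads[w])                 # tensor([2., 2., 2.])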


0x05 Waiting for completion

Finally, the distributed engine calls clearAndWaitForOutstandingRpcsAsync to wait for all outstanding RPCs to complete.

c10::intrusive_ptr<c10::ivalue::Future> DistAutogradContext::
    clearAndWaitForOutstandingRpcsAsync() {
  std::unique_lock<std::mutex> lock(lock_);
  auto outStandingRpcs = std::move(outStandingRpcs_);
  lock.unlock();

  struct State {
    explicit State(int32_t count)
        : future(
              c10::make_intrusive<c10::ivalue::Future>(c10::NoneType::get())),
          remaining(count) {}
    c10::intrusive_ptr<c10::ivalue::Future> future;
    std::atomic<int32_t> remaining;
    std::atomic<bool> alreadySentError{false};
  };
  auto state = std::make_shared<State>(outStandingRpcs.size());
  if (outStandingRpcs.empty()) {
    state->future->markCompleted(c10::IValue());
  } else {
    for (auto& rpc : outStandingRpcs) {
      rpc->addCallback([state](rpc::JitFuture& future) {
        if (future.hasError()) {
          // If there's an error, we want to setError() on the future,
          // unless another error has already been sent - use a CAS to
          // guard.
          //
          // Don't decrement num remaining here! (We don't need to, since
          // memory handling is separate). If we simply don't decrement on
          // errors, reaching 0 means that there were no errors - and hence,
          // we can just markCompleted() without any other checking there.
          bool expectedAlreadySent = false;
          if (state->alreadySentError.compare_exchange_strong(
                  expectedAlreadySent, true)) {
            state->future->setError(future.exception_ptr());
          }
          return;
        }

        if (--state->remaining == 0) {
          state->future->markCompleted(c10::IValue());
        }
      });
    }
  }
  return state->future;
}
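From the user's point of view this wait is implicit: torch.distributed.autograd.backward() returns only after the backward pass has finished, including the wait on outstanding RPCs performed above. The following is a minimal Python sketch of the calling pattern, assuming rpc.init_rpc() has already been run on the participating workers and that the peer's name is "worker1" (a placeholder).

import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc

# Assumes rpc.init_rpc(...) has already been called on this worker and that a
# peer named "worker1" exists (the worker name is a placeholder).
def train_step():
    t1 = torch.rand(3, 3, requires_grad=True)
    t2 = torch.rand(3, 3, requires_grad=True)
    with dist_autograd.context() as context_id:
        loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum()
        # Returns only after the distributed backward pass is done, including
        # the wait implemented by clearAndWaitForOutstandingRpcsAsync.
        dist_autograd.backward(context_id, [loss])
        # Gradients are not stored in t1.grad / t2.grad; they are accumulated
        # per distributed autograd context and fetched with get_gradients().
        grads = dist_autograd.get_gradients(context_id)
        return grads[t1], grads[t2]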

As mentioned earlier, PyTorch distributed rests on four pillars: RPC, RRef, the distributed autograd engine, and the distributed optimizer. We have introduced RPC and RRef and analyzed the distributed engine; starting with the next article, we will analyze the remaining piece, the distributed optimizer.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

Distributed Autograd Design

Remote Reference Protocol

PyTorch source code interpretation of distributed training to understand?

Pytorch.org/docs/stable…

Pytorch.apachecn.org/docs/1.7/59…

Pytorch.org/docs/stable…

Pytorch.org/docs/master…

Pytorch.org/docs/master…

www.w3cschool.cn/pytorch/pyt…

PyTorch distributed Autograd design

Getting started with Distributed RPC Framework

Implementing a Parameter Server using Distributed RPC Framework

Combining Distributed DataParallel with Distributed RPC Framework

Profiling RPC-based Workloads

Implementing batch RPC processing

Distributed Pipeline Parallel