0x00 Overview

The previous articles analyzed how backward propagation is initiated and received, and how execution enters the distributed autograd engine. Starting with this article we look at how the distributed engine actually works. After reading it, readers should understand the basic static architecture and the overall execution logic of the distributed autograd engine.

Other articles in this series are as follows:

PyTorch distributed (1) —— History and overview

PyTorch how to use GPU

PyTorch distributed (2) —— DataParallel

PyTorch distributed (3) —— DataParallel

PyTorch distributed (4) —— Distributed application concepts

PyTorch distributed (5) —— DistributedDataParallel – what to use

PyTorch distributed (6) —— DistributedDataParallel

PyTorch distributed (7) —— DistributedDataParallel – process groups

PyTorch distributed (8) —— DistributedDataParallel

PyTorch distributed (9) —— DistributedDataParallel – initialization

PyTorch distributed (10) —— DistributedDataParallel – Reducer static architecture

PyTorch distributed (11) —— DistributedDataParallel – building the Reducer and Join operations

PyTorch distributed (12) —— DistributedDataParallel – forward propagation

PyTorch distributed (13) —— DistributedDataParallel – backward propagation

PyTorch Distributed Autograd (1) —— design

PyTorch Distributed Autograd (2) —— RPC foundation

PyTorch Distributed Autograd (3)

PyTorch Distributed Autograd (4)

For better illustration, the code in this article will be streamlined accordingly.

0x01 Support System

Let’s start by looking at some of the engine’s internal support systems.

1.1 Engine Entry

The engine entry point is in the backward function: DistEngine::getInstance().execute leads into the engine. As we know from the previous article, this is the active way of entering the engine.

void backward(
    int64_t context_id,
    const variable_list& roots,
    bool retain_graph) {
  RECORD_FUNCTION(
      kDistAutogradBackwardProfilingKey, std::vector<c10::IValue>());
  try {
    DistEngine::getInstance().execute(context_id, roots, retain_graph);
  } catch (std::exception& e) {
    throw std::runtime_error(e.what());
  }
}

1.2 SendRpcBackward

Passive invocation of the engine starts with SendRpcBackward. SendRpcBackward is the backward-propagation operator that corresponds to a send operation in forward propagation. DistAutogradContext stores the information of each distributed autograd pass on a worker; it encapsulates forward and backward propagation in distributed autograd and accumulates gradients, which keeps the gradients of different workers from interfering with each other. DistAutogradContext has a member variable that records the backward operators corresponding to all of this worker's send operations.

std::unordered_map<int64_t, std::shared_ptr<SendRpcBackward>> sendAutogradFunctions_;

The contents in sendAutogradFunctions_ are all SendRpcBackward.

1.2.1 Analysis

As part of the distributed autograd implementation, every time we send an RPC from one node to another we add a SendRpcBackward autograd function to the autograd graph. This is a placeholder function used to kick off the current worker's autograd engine during backward propagation. The edges of this autograd function are the inputs to the RPC method.

During backward propagation, this function is queued for execution in the autograd engine, and it eventually runs the rest of the autograd graph.

SendRpcBackward is actually the root of the autograd graph on the local node. As the schematic in the previous article showed:

  • SendRpcBackward does not receive any “input”, but rather the RPC framework passes gradients to the function to start local Autograd calculations.
  • The input edge of SendRpcBackward is the input to the RPC method, which is the gradient.

1.2.2 Definition

SendRpcBackward is a derived class of Node, so it has next_edges. As you can see, its new member variable is grads_.

// As part of our distributed autograd implementation, whenever we send an RPC
// from one node to another, we add a 'SendRpcBackward' autograd function to the
// autograd graph. This is more or less a placeholder function that is used to
// kickoff the autograd engine on the current worker on the backward pass. The
// edges for this autograd function are the inputs to the RPC method.
//
// During the backward pass, this function is queued for execution in the
// autograd engine which eventually runs the rest of the autograd graph.
struct TORCH_API SendRpcBackward : public torch::autograd::Node {
 public:
  torch::autograd::variable_list apply( torch::autograd::variable_list&& inputs) override;

  // SendRpcBackward is actually the root of an autograd graph on the local
  // node. As a result, it doesn't receive any 'inputs', but rather the RPC
  // framework passes gradients over to this function to kickoff local autograd
  // computation.
  void setGrads(const torch::autograd::variable_list& grads);

  // Retrieve the grads for the function.
  const torch::autograd::variable_list& getGrads() const;

 private:
  torch::autograd::variable_list grads_;
};

1.2.3 Construction

During forward propagation, addSendRpcBackward builds a SendRpcBackward and sets the gradient edges of the forward inputs as the next edges of this SendRpcBackward, i.e. the outgoing edges for backward propagation.

void addSendRpcBackward(
    const ContextPtr& autogradContext,
    const AutogradMetadata& autogradMetadata,
    std::vector<torch::Tensor>& tensors) {
  // Attach autograd information only for tensors requiring grad.
  std::vector<torch::Tensor> tensors_with_grad;
  std::copy_if(
      tensors.begin(),
      tensors.end(),
      std::back_inserter(tensors_with_grad),
      [](const torch::Tensor& t) { return t.requires_grad(); });

  // Attach the appropriate autograd edges.
  auto grad_fn = std::make_shared<SendRpcBackward>(); // Build SendRpcBackward
  grad_fn->set_next_edges(
      torch::autograd::collect_next_edges(tensors_with_grad));

  // Add the appropriate input metadata for the grad_fn.
  for (const auto& tensor : tensors_with_grad) {
    grad_fn->add_input_metadata(tensor); // Add edge SendRpcBackward
  }

  // Record the send autograd function in our current context.
  // Insert into context
  autogradContext->addSendFunction(grad_fn, autogradMetadata.autogradMessageId);
}

1.2.4 grads_

The new member variable added with SendRpcBackward is grads_. How is grads_ set and used?

SendRpcBackward provides set and get operations.

void SendRpcBackward::setGrads(const torch::autograd::variable_list& grads) {
  grads_ = grads;
}

const torch::autograd::variable_list& SendRpcBackward::getGrads() const {
  return grads_;
}

When is it used? In processBackwardAutogradReq in torch/csrc/distributed/rpc/request_callback_no_python.cpp. processBackwardAutogradReq does the following:

  1. Uses sendFunction->setGrads(gradientsCall.getGrads()) to set the gradients transmitted from the remote end.
  2. Calls DistEngine::getInstance().executeSendFunctionAsync to start the engine and perform the local computation.

This corresponds to the following text in the design, which is the starting point of passive entry into the engine:

SendRpcBackward is actually the root of the Autograd graph on the local node. Therefore, it does not receive any “input”, but rather the RPC framework passes gradients to the function to start local autograd calculations.

The specific code is as follows:

void RequestCallbackNoPython::processBackwardAutogradReq(
    RpcCommandBase& rpc,
    const int64_t messageId,
    const c10::intrusive_ptr<JitFuture>& responseFuture) const {
  auto& gradientsCall = static_cast<PropagateGradientsReq&>(rpc);
  const auto& autogradMetadata = gradientsCall.getAutogradMetadata();

  // Retrieve the appropriate autograd context.
  auto autogradContext = DistAutogradContainer::getInstance().retrieveContext(
      autogradMetadata.autogradContextId);

  // Lookup the appropriate 'send' function to enqueue.
  std::shared_ptr<SendRpcBackward> sendFunction =
      autogradContext->retrieveSendFunction(autogradMetadata.autogradMessageId);

  // Attach the gradients to the send function.
  sendFunction->setGrads(gradientsCall.getGrads()); // The gradient from RPC is assigned

  // Now execute the autograd graph using the "distributed engine."
  auto execFuture = DistEngine::getInstance().executeSendFunctionAsync( // Grads_ is used
      autogradContext, sendFunction, gradientsCall.retainGraph());

  // Our response is satisfied when the rpcs come back.
  execFuture->addCallback([responseFuture, messageId](JitFuture& execFuture) {
    if (!execFuture.hasError()) {
      Message m = std::move(PropagateGradientsResp()).toMessage();
      m.setId(messageId);
      responseFuture->markCompleted(
          IValue(c10::make_intrusive<Message>(std::move(m))));
    } else {
      responseFuture->setError(execFuture.exception_ptr());
    }
  });
}

executeSendFunctionAsync then uses sendFunction->getGrads() to extract the gradients and start execution.

c10::intrusive_ptr<c10::ivalue::Future> DistEngine::executeSendFunctionAsync(
    const ContextPtr& autogradContext,
    const std::shared_ptr<SendRpcBackward>& sendFunction,
    bool retainGraph) {
  
  // Typically the local autograd engine ensures stream synchronizations between
  // nodes in the graph. However, for distributed autograd the sendFunction
  // inputs might have been retrieved over the wire on a separate stream and the
  // sendFunction itself runs on a different stream. As a result, we need to
  // manually synchronize those two streams here.
  const auto& send_backward_stream = sendFunction->stream(c10::DeviceType::CUDA);
  if (send_backward_stream) {
    for (const auto& grad : sendFunction->getGrads()) { // There is a fetch
        const auto guard = c10::impl::VirtualGuardImpl{c10::DeviceType::CUDA};
        const auto default_stream = guard.getStream(grad.device());
        if (send_backward_stream != default_stream) {
          auto event = c10::Event{c10::DeviceType::CUDA};
          event.record(default_stream);
          send_backward_stream->wait(event);
        }
    }
  }
  // omit subsequent code


0x02 Definition

2.1 Definition

DistEngine is defined as follows, but some of the code has been removed for better explanation:

class TORCH_API DistEngine {
 public:
  // Retrieve the singleton instance.
  static DistEngine& getInstance();

  // Given a list of root variables, start the distributed backwards pass from
  // these variables and accumulate all the gradients in the current autograd
  // context on each node. This method is used to kickoff distributed autograd
  // on a single node.
  void execute(
      int64_t context_id,
      const torch::autograd::variable_list& roots,
      bool retainGraph);

  // Given a send function to execute in the autograd engine, ensures we compute
  // dependencies once for this node and enqueues the send function for execute
  // in the engine.
  // This method is used to kick off the autograd computation on a node when it
  // receives gradients from the corresponding 'recv' method on another node.
  // The gradients are accumulated in the provided autograd context.
  c10::intrusive_ptr<c10::ivalue::Future> executeSendFunctionAsync(
      const ContextPtr& autogradContext,
      const std::shared_ptr<SendRpcBackward>& sendFunction,
      bool retainGraph);

  // Number of backward passes currently running for the Distributed Engine.
  size_t numBackwardPasses() const;

  // Returns key-value pairs consisting of useful debugging information related
  // to distributed autograd.
  std::unordered_map<std::string, int> getDebugInfo() const;

  // Validates the input roots for the backward computations and retrieves the
  // appropriate root edges and corresponding gradients. Populates root_edges
  // with the appropriate gradient edges and grads with the gradients for each
  // edge.
  void validateRootsAndRetrieveEdges(
      const torch::autograd::variable_list& roots,
      torch::autograd::edge_list& rootEdges,
      torch::autograd::variable_list& grads);

  // Given the autograd context, root edges and grads, we compute dependencies
  // for the local node and fill out the provided GraphTask and GraphRoot with
  // appropriate information for the local autograd engine.
  // We also determine all leaf nodes(functions) in the graph and accumulate
  // them in outputEdges.
  void computeDependencies(
      const ContextPtr& context,
      const torch::autograd::edge_list& rootEdges,
      const torch::autograd::variable_list& grads,
      const std::shared_ptr<torch::autograd::Node>& graphRoot,
      torch::autograd::edge_list& outputEdges,
      bool retainGraph);

  // Given a pre-populated GraphTask and a root node, compute the backward pass
  // for the autograd graph until the graph task ready queue is empty.
  //
  // This method assumes that the appropriate GraphTask has already been
  // initialized appropriately. It will construct a local ready queue to
  // traverse the GraphTask instead of using the GraphTask embedded
  // cpu_ready_queue, this is because dist engine might run the same GraphTask
  // from different SendFunctions concurrently in different threads. The method
  // will only mark the GraphTask as completed when it needes to, which means it
  // might not mark as completed for every call as dist engine would like to
  // keep the GraphTask alive when it not receives all gradients.
  //
  // When `incrementOutstandingTasks=false`, the function does not increment
  // 'outstanding_tasks_' in the appropriate GraphTask. It is assumed we've
  // already done this before hand for this task (to ensure we don't pre-mark
  // this graph_task as completed). This is useful in the distributed autograd
  // case where we need to increment 'outstanding_tasks_' first to indicate the
  // local autograd engine the graph task is not completed until it receives the
  // signals from other workers over the network.
  //
  // XXX: calling this function assumes that we will have NO GPU nodetasks be
  // executed for the graph_task, the caller of this function need to ensure
  // this otherwise there will be undefined behaviors. A correct way to fix this
  // is to re-design the autograd engine so that GPU worker thread to behave the
  // same as CPU caller thread, record the operation/thread for the device, and
  // reuse it in backward.
  // TODO: 1. Add assert in the dist engine to ensure no GPU NodeTasks during
  // backward
  // 2. properly setup the thread local ready queue to enable reentrant
  // backwards
  void execute_graph_task_until_ready_queue_empty(
      torch::autograd::NodeTask&& node_task,
      bool incrementOutstandingTasks = true);

  // Run the local autograd engine using the provided graphTask and graphRoot
  // and accumulate the gradients part 'outputEdges' in the provided autograd
  // context.
  c10::intrusive_ptr<c10::ivalue::Future> runEngineAndAccumulateGradients(
      const ContextPtr& autogradContext,
      const std::shared_ptr<torch::autograd::Node>& graphRoot,
      const torch::autograd::edge_list& outputEdges,
      bool incrementOutStandingTasks = true);

  // Run after the backward pass is done to appropriately cleanup structures.
  void cleanupBackwardPass(const ContextPtr& autogradContext);

  // Global thread to execute CPU continuations.
  void globalCpuThread(
      const std::shared_ptr<torch::autograd::ReadyQueue>& ready_queue);

  // Set of autograd context_ids, which we have already initialized for
  // distributed autograd on this node (e.g.: already computed dependencies)
  std::unordered_set<int64_t> initializedContextIds_;

  mutable std::mutex initializedContextIdsLock_;

  // Reference to local autograd engine.
  torch::autograd::Engine& engine_;

  // Ready queue used by the CPU thread in distributed engine.
  // See Note [GPU to CPU continuations]
  // Each GraphTask sets global_cpu_ready_queue_ to its own CPU_ready_queue_
  std::shared_ptr<torch::autograd::ReadyQueue> global_cpu_ready_queue_;

  // See Note [GPU to CPU continuations]
  std::thread global_cpu_thread_;

  friend class BackwardPassCleanupGuard;
};

2.2 Singleton

The engine uses the singleton pattern, so only one engine instance runs in each worker.

DistEngine& DistEngine::getInstance() {
  // Leaky singleton to avoid module destructor race.
  static DistEngine* engine = new DistEngine();
  return *engine;
}

2.3 Important Notes

There are plenty of detailed comments in the PyTorch source, so let’s take a look at some of them.

2.3.1 Member Variables

The code defines two CPU-related member variables, shown below; the comments tell us to look at Note [GPU to CPU continuations].

  // Ready queue used by the CPU thread in distributed engine.
  // See Note [GPU to CPU continuations]
  std::shared_ptr<torch::autograd::ReadyQueue> global_cpu_ready_queue_;

  // See Note [GPU to CPU continuations]
  std::thread global_cpu_thread_;

2.3.2 Constructor

These two member variables are initialized in the constructor.

DistEngine::DistEngine()
    : initializedContextIds_(),
      engine_(Engine::get_default_engine()),
      global_cpu_ready_queue_(std::make_shared<ReadyQueue>()), // This is built
      global_cpu_thread_( // This is built
          &DistEngine::globalCpuThread,
          this,
          global_cpu_ready_queue_) {
  // Note [GPU to CPU continuations]
  // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  // Initialize a single CPU thread to execute continuations from GPU
  // tasks. The multithreaded structure for the distributed engine works
  // well only for CPU tasks. If we have an order of tasks like
  // CPU->GPU->CPU, distributed autograd has no thread to execute the last
  // CPU task on. To fix this, we introduce a global CPU thread to handle
  // such situations and it will be responsible for executing these CPU
  // tasks. The CPU thread has its own ready_queue which is used as the
  // cpu_ready_queue for all GraphTasks for DistEngine. This ensures all GPU
  // to CPU continuations are enqueued on this thread. The global CPU thread
  // simply dequeues tasks from the global queue and calls
  // "execute_graph_task_until_ready_queue_empty" on a JIT thread to execute the
  // appropriate task.
  global_cpu_thread_.detach(); // After detach, the thread runs independently
}

2.3.3 GPU to CPU continuations

The Note [GPU to CPU continuations] comment is translated and explained below.

The concept of continuations first appeared in the Scheme language and has since been used in many languages; because there is no well-established translation of the term, we simply keep the word "continuations" here.

To execute the continuations of GPU tasks, a separate CPU thread needs to be initialized. The multithreaded structure of the distributed engine works well only for CPU tasks. If we have a task order like CPU->GPU->CPU, distributed autograd has no thread on which to execute the last CPU task. To fix this, a global CPU thread is introduced to handle such situations; it is responsible for executing these CPU tasks.

The CPU thread has its own ready queue (ready_queue), which is used as the cpu_ready_queue for all GraphTasks of DistEngine. This ensures that all GPU-to-CPU continuations are enqueued on this thread. The global CPU thread simply dequeues tasks from the global queue and calls execute_graph_task_until_ready_queue_empty on a JIT thread to execute the appropriate task.

If we have an order of tasks like CPU->GPU->CPU, distributed autograd has no thread to execute the last CPU task on. To fix this, we introduce a global CPU thread to handle such situations and it will be responsible for executing these CPU tasks. The CPU thread has its own ready_queue which is used as the cpu_ready_queue for all GraphTasks for DistEngine. This ensures all GPU to CPU continuations are enqueued on this thread. The global CPU thread simply dequeues tasks from the global queue and calls "execute_graph_task_until_ready_queue_empty" on a JIT thread to execute the appropriate task.
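To make the pattern concrete, here is a minimal, self-contained sketch of the idea (plain standard C++ only; MiniReadyQueue and the lambda tasks are illustrative stand-ins, not the real ReadyQueue or NodeTask types): a dedicated thread owns its own queue, other threads enqueue CPU continuations onto it, and a special shutdown task ends the loop.

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

// A minimal ready queue: tasks are std::function; an empty function acts as
// the shutdown task (similar in spirit to ReadyQueue::pushShutdownTask).
class MiniReadyQueue {
 public:
  void push(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
  }
  std::function<void()> pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !tasks_.empty(); });
    auto task = std::move(tasks_.front());
    tasks_.pop();
    return task;
  }
 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
};

int main() {
  MiniReadyQueue queue;
  // The "global CPU thread": it simply dequeues tasks and runs them.
  std::thread cpu_thread([&queue] {
    while (true) {
      auto task = queue.pop();
      if (!task) break;  // shutdown task
      task();            // execute the CPU continuation
    }
  });
  // Other threads (e.g. ones finishing GPU work) enqueue CPU continuations.
  queue.push([] { std::cout << "CPU continuation 1\n"; });
  queue.push([] { std::cout << "CPU continuation 2\n"; });
  queue.push(nullptr);  // shutdown
  cpu_thread.join();
  return 0;
}

The real globalCpuThread shown in section 2.3.6 follows the same loop, except that it pops NodeTasks and hands each one to execute_graph_task_until_ready_queue_empty on a JIT thread.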

2.3.4 Destructor

The destructor does the following; it operates on these two member variables to shut down the engine.

DistEngine::~DistEngine() {
  // Ensure we shutdown the CPU thread.
  TORCH_ASSERT_NO_GIL_WITHOUT_PYTHON_DEP();
  global_cpu_ready_queue_->pushShutdownTask();
  global_cpu_thread_.join();
}

2.3.5 Inserting a Queue

Where is global_cpu_ready_queue_ passed in? Inside DistEngine::computeDependencies. Each GraphTask uses global_cpu_ready_queue_ as its cpu_ready_queue: the GraphTask constructor is called with global_cpu_ready_queue_ as an argument.

void DistEngine::computeDependencies(
    const ContextPtr& autogradContext,
    const edge_list& rootEdges,
    const variable_list& grads,
    const std::shared_ptr<Node>& graphRoot,
    edge_list& outputEdges,
    bool retainGraph) {

  // Build the graph task and graph root.
  // NOTE: we don't need to build and pass a cpu_ready_queue to GraphTask
  // as we use execute_graph_task_until_ready_queue_empty, which will build
  // a separate ReadyQueue for each call.
  auto graphTask = std::make_shared<GraphTask>(
      /* keep_graph */ retainGraph,
      /* create_graph */ false, /* depth */ 0, /* cpu_ready_queue */ global_cpu_ready_queue_,
      /* exit_on_error */ true);

  // Omit other graphTask initializations
  
  // Let autograd context take ownership of the GraphTask.
  // Set GraphTask in context
  autogradContext->setGraphTask(std::move(graphTask));
}

So, when a GraphTask later needs to continue running on the CPU, this queue is what gets used.

2.3.6 Worker Threads

globalCpuThread is the worker thread function: it pops a NodeTask from the ready queue and executes it.

void DistEngine::globalCpuThread(
    const std::shared_ptr<ReadyQueue>& ready_queue) {
  
  while (true) {
    NodeTask task = ready_queue->pop();
    if (task.isShutdownTask_) {
      // Need to shutdown this thread.
      C10_LOG_API_USAGE_ONCE("torch.autograd.thread_shutdown");
      break;
    }

    auto graphTask = task.base_.lock();
    if (graphTask == nullptr) {
      // GraphTask has expired, ignore and continue processing.
      continue;
    }

    // Launch the execution on a JIT thread.
    at::launch([this,
                graphTask,
                graphRoot = task.fn_,
                variables =
                    InputBuffer::variables(std::move(task.inputs_))]() mutable {
      InputBuffer inputs(variables.size());
      for (size_t i = 0; i < variables.size(); i++) {
        inputs.add(i, std::move(variables[i]), c10::nullopt, c10::nullopt);
      }
      execute_graph_task_until_ready_queue_empty(
          /*node_task*/ NodeTask(graphTask, graphRoot, std::move(inputs)),
          /*incrementOutstandingTasks*/ false);
    });
  }
}

0x03 General Execution

The overall execution is done in DistEngine::execute as follows:

  • Use the context id to retrieve the forward-pass context.
  • Use validateRootsAndRetrieveEdges for validation.
  • Construct a GraphRoot to drive backward propagation; it can be thought of as a virtual root.
  • Use computeDependencies to compute dependencies.
  • Use runEngineAndAccumulateGradients to run the backward computation.
  • Use clearAndWaitForOutstandingRpcsAsync to wait for outstanding RPCs to complete.

Compared with the vanilla engine, the distributed engine has an extra step that computes the root edges and generates the gradients for those edges. In the vanilla engine these are already set up when backward is invoked, but in the distributed case they are not prepared during forward propagation, so they have to be computed before backward propagation starts.

void DistEngine::execute(
    int64_t contextId,
    const variable_list& roots,
    bool retainGraph) {
  // Retrieve the context for the given context_id. This will throw if the
  // context_id is invalid.
  auto autogradContext =
      DistAutogradContainer::getInstance().retrieveContext(contextId);

  // Perform initial pre-processing.
  edge_list rootEdges;
  variable_list grads;
  validateRootsAndRetrieveEdges(roots, rootEdges, grads); 

  // Construct a GraphRoot to drive backward propagation, thought of as a virtual root
  std::shared_ptr<Node> graphRoot =
      std::make_shared<GraphRoot>(rootEdges, grads);
  edge_list outputEdges;
  // Compute dependencies locally, starting from all roots and all 'send'
  // functions.
  {
    std::lock_guard<std::mutex> guard(initializedContextIdsLock_);
    // Context should not have been initialized already.
    TORCH_INTERNAL_ASSERT(
        initializedContextIds_.find(autogradContext->contextId()) ==
        initializedContextIds_.end());

    // Calculate dependencies
    computeDependencies(
        autogradContext, rootEdges, grads, graphRoot, outputEdges, retainGraph);

    // Mark the autograd context id as initialized.
    initializedContextIds_.insert(autogradContext->contextId());
  }

  BackwardPassCleanupGuard guard(autogradContext);

  // This needs to be blocking and as a result we wait for the future to
  // complete.
  runEngineAndAccumulateGradients(autogradContext, graphRoot, outputEdges)
      ->waitAndThrow(); // Backward propagation computation

  // Wait for all of the outstanding rpcs to complete.
  autogradContext->clearAndWaitForOutstandingRpcsAsync()->waitAndThrow();
}

0x04 Validating Nodes and Edges

Let’s look at how to do the validation.

validateRootsAndRetrieveEdges verifies that the nodes and edges are valid. The specific logic is:

  • Verify that the roots are valid and obtain their edges.
  • Check that the roots are not empty.
  • Check that each root requires a gradient.
  • Check that each root has a gradient function.
  • Compute the gradient edges and generate the corresponding gradients.
  • Call validate_outputs to validate the output.
void DistEngine::validateRootsAndRetrieveEdges(
    const variable_list& roots,
    edge_list& rootEdges,
    variable_list& grads) {
  TORCH_CHECK(!roots.empty(), "No tensors provided for gradient computation.");
  TORCH_INTERNAL_ASSERT(rootEdges.empty());
  TORCH_INTERNAL_ASSERT(grads.empty());

  // Verify roots are all scalar and require gradients.
  for (const auto& root : roots) {
    TORCH_CHECK(root.requires_grad(), "requires_grad not set on root");
    TORCH_CHECK(
        root.numel() == 1, // numel() returns the number of elements in the tensor
        root.name(),
        " is not a scalar, all roots need to be scalar");
    TORCH_CHECK(
        root.grad_fn(),
        root.name(),
        " does not have a valid gradient function.");

    // Compute the root edges and generate the appropriate gradients.
    rootEdges.push_back(torch::autograd::impl::gradient_edge(root));
    grads.push_back(at::ones_like(root, LEGACY_CONTIGUOUS_MEMORY_FORMAT));
  }

  // Validate rootEdges and grads.
  validate_outputs(
      rootEdges, grads, [](const std::string& msg) { return msg; });
}

4.1 gradient_edge

gradient_edge, which also appears later in this article, uses a Variable's gradient function and its output index from forward propagation to build a gradient Edge.

Edge gradient_edge(const Variable& self) {
  // If grad_fn is null (as is the case for a leaf node), we instead
  // interpret the gradient function to be a gradient accumulator, which will
  // accumulate its inputs into the grad property of the variable. These
  // nodes get suppressed in some situations, see "suppress gradient
  // accumulation" below. Note that only variables which have `requires_grad =
  // True` can have gradient accumulators.
    
  // self.grad_fn() triggers a call to get a Node instance
  if (const auto& gradient = self.grad_fn()) {
    return Edge(gradient, self.output_nr()); // self.output_nr() says this Edge is the n-th input of the function: the n-th output in forward propagation is the n-th input in backward propagation.
  } else {
    return Edge(grad_accumulator(self), 0); // 0 indicates this Edge is the first input of the function
  }
}

4.2 validate_outputs

validate_outputs is defined in torch/csrc/autograd/engine.cpp and is called by both the vanilla engine and the distributed engine. It contains a lot of validation code:

  • If the number of gradients differs from the number of edges, raise an error.
  • Traverse the gradients; for each gradient:
    • Get the corresponding edge; if the edge is invalid, move on to the next gradient.
    • Use input_metadata to get the input information.
    • If the gradient is not defined, also move on to the next gradient.
    • If the gradient size is incompatible with the input shape, raise an error.
    • Perform a series of checks on the gradient's device and dtype against the metadata.

The specific code is as follows:

void validate_outputs(
    const edge_list& edges,
    variable_list& grads,
    const std::function<std::string(const std::string&)>& format_error) {
  if (grads.size() != edges.size()) {
    std::stringstream ss;
    ss << "invalid number of gradients - expected ";
    ss << edges.size() << ", but got " << grads.size();
    AT_ERROR(format_error(ss.str()));
  }
  for (size_t i = 0; i < grads.size(); i++) {
    const auto& edge = edges[i];
    if (!edge.is_valid()) continue;

    const auto& metadata = edge.function->input_metadata(edge.input_nr);
    auto& grad = grads[i];
    if (!grad.defined()) {
      // FIXME: TestJit.test_ge_optimized fails this assertion.
      // std::stringstream ss;
      // ss << "undefined gradient at index " << i;
      // AT_ERROR(format_error(ss.str()));
      continue;
    }
    // If the gradient size differs from the input shape, try to expand; error out if not expandable
    if (!grad.sizes().equals(metadata.shape())) {
      if (!at::is_expandable_to(metadata.shape(), grad.sizes())) {
        std::stringstream ss;
        ss << "invalid gradient at index " << i << " - got ";
        ss << grad.sizes() << " but expected shape compatible with ";
        ss << metadata.shape();
        AT_ERROR(format_error(ss.str()));
      }
      grad = at::sum_to(std::move(grad), metadata.shape());
    }

    bool input_is_complex = isComplexType(c10::typeMetaToScalarType(metadata.options().dtype()));
    bool grad_is_complex = isComplexType(grad.scalar_type());
    TORCH_CHECK(isFloatingType(grad.scalar_type()) || (input_is_complex == grad_is_complex));

    if (c10::typeMetaToScalarType(metadata.options().dtype()) != grad.scalar_type()) {
      grad = grad.to(c10::typeMetaToScalarType(metadata.options().dtype()));
    }
    if (grad.device() != metadata.device() &&
        grad.dim() == 0) {
      grad = grad.to(metadata.device());
    }
    if (!is_compatible_type(metadata.options(), grad.options())) {
      std::stringstream ss;
      ss << "invalid gradient at index " << i << " - expected type ";
      ss << metadata.options() << " but got " << grad.options();
      AT_ERROR(format_error(ss.str()));
    }
    auto grad_device = grad.device();
    if (grad_device != metadata.device()) {
      std::stringstream ss;
      ss << "invalid gradient at index " << i << " - expected device ";
      ss << metadata.device() << " but got " << grad_device;
      AT_ERROR(format_error(ss.str()));
    }
    // We should not build graph for Tensors that are not differentiable
    TORCH_INTERNAL_ASSERT(isDifferentiableType(grad.scalar_type()));
  }
}

4.3 Comparison with the Vanilla Engine

Let's compare this validation with the vanilla engine's.

The vanilla Engine only calls validate_outputs.

auto Engine::execute(const edge_list& roots,
                     const variable_list& inputs,
                     bool keep_graph,
                     bool create_graph,
                     bool accumulate_grad,
                     const edge_list& outputs) -> variable_list {
    
  validate_outputs(roots, const_cast<variable_list&>(inputs), [](const std::string& msg) {
    return msg;
  });
  
  // Omit other subsequent code

Therefore, for the validation part, DistEngine can be summarized as:

  • Do the validity checks.
  • Compute the edge corresponding to each root and generate the corresponding gradient (for a scalar root this initial gradient is simply ones_like(root), i.e. 1).
  • Then validate the outputs with validate_outputs.

0x05 Compute dependencies

Let’s recall the FAST mode algorithm from the design document. The key assumption of the algorithm is that when we run back propagation, each send function has a dependency of 1. In other words, we assume that we will receive gradients through RPC from another node. The algorithm is as follows:

  1. We start from the worker that has the roots of the backward pass (all roots must be local).
  2. Find all the send functions of the current Distributed Autograd Context.
  3. Starting from the provided roots and all the send functions we retrieved, we compute dependencies locally.
  4. After the dependencies are computed, start the local autograd engine with the provided roots.
  5. When the autograd engine executes a recv function, that recv function sends the input gradients to the appropriate worker via RPC. Each recv function knows its target worker id, because it was recorded during forward propagation. The recv function sends the autograd_context_id and autograd_message_id to the remote host.
  6. When the remote host receives this request, we use autograd_context_id and autograd_message_id to look up the appropriate send function.
  7. If this is the first time the worker has received a request for the given autograd_context_id, it computes dependencies locally as described in steps 1-3 above.
  8. The send function found in step 6 is then enqueued for execution on that worker's local autograd engine.
  9. Finally, instead of accumulating gradients on Tensor.grad, we accumulate them separately on each Distributed Autograd Context. The gradients are stored in a Dict[Tensor, Tensor], which is basically a map from a Tensor to its associated gradient; this map can be retrieved with the get_gradients() API.

This chapter corresponds to the first three steps of this algorithm, which is one of the biggest differences from the vanilla engine.

5.1 Overall Process

Computing dependencies is divided into three parts: the first part does the preparatory work, the second part computes the dependencies, and the third part determines, based on the dependencies, which functions need to be executed.

We’ll start with the overall code and comments, which we’ll examine in detail later.

void DistEngine::computeDependencies(
    const ContextPtr& autogradContext,
    const edge_list& rootEdges,
    const variable_list& grads,
    const std::shared_ptr<Node>& graphRoot,
    edge_list& outputEdges,
    bool retainGraph) {
  TORCH_INTERNAL_ASSERT(graphRoot, "graphRoot is null!");

  // The first part, preparation
  // 1. Generate a GraphTask
  // Build the graph task and graph root.
  // NOTE: we don't need to build and pass a cpu_ready_queue to GraphTask
  // as we use execute_graph_task_until_ready_queue_empty, which will build
  // a separate ReadyQueue for each call.
  // There is no need to pass a CPU_ready_queue to GraphTask because we will use execute_graph_task_until_readY_queue_EMPTY later, where a separate ReadyQueue will be created for each call
  auto graphTask = std::make_shared<GraphTask>(
      /* keep_graph */ retainGraph,
      /* create_graph */ false, /* depth */ 0, /* cpu_ready_queue */ global_cpu_ready_queue_,
      /* exit_on_error */ true);

  // Run BFS to traverse the graph locally. The roots of the graph are
  // GraphRoot and all send functions for this autograd context.
  std::unordered_set<Node*> seen; // Record the nodes that have been accessed
  std::queue<Node*> queue; // A queue of type Node
  queue.push(static_cast<Node*>(graphRoot.get())); // Insert the Node corresponding to the root

  auto sendFunctions = autogradContext->sendFunctions(); // Get the send functions (the outgoing edges)

  // 2. Get the list of outgoing edges
  // Add all the send functions to the queue as roots.
  // In normal mode, the root node already has the next edges when propagating back, but in distributed mode, the outgoing edges are in sendFunctions
  for (const auto& mapEntry : sendFunctions) { // sendFunctions are outgoing sides that were added to addSendFunction earlier
    // Increment 'outstanding_tasks_' for GraphTask for each send_function
    // since we want the local autograd engine to wait for all of them.
    graphTask->outstanding_tasks_++; // The output edge increases
    queue.push(mapEntry.second.get()); // Then queue is used and SendRpcBackward is inserted
  }

  // In the second part, the graph is traversed to calculate the dependencies. In this case, the queue contains root and several SendRpcBackward
  edge_list recvBackwardEdges;
  // Traverse the graph.
  auto& dependencies = graphTask->dependencies_; // Get dependencies
  while (!queue.empty()) { // Iterate over all nodes in the queue
    auto fn = queue.front(); // Get the front node
    queue.pop();
    for (const auto& edge : fn->next_edges()) { // Traverse the next_edges of this Node (root or SendRpcBackward)
      if (auto nextFn = edge.function.get()) { // Get an edge
        dependencies[nextFn] += 1; // The dependency of the corresponding node is increased by one
        const bool wasInserted = seen.insert(nextFn).second; // Whether it has been accessed
        if (wasInserted) { // If it is true, it has not been accessed before. Otherwise, it cannot be inserted
          // Seeing this function for the first time.
          queue.push(nextFn); // Insert to queue since it has not been accessed before

          if (nextFn->next_edges().empty()) { // If the edge has no output edge, it is a leaf node
            TORCH_INTERNAL_ASSERT(
                dynamic_cast<AccumulateGrad*>(nextFn) ||
                dynamic_cast<RecvRpcBackward*>(nextFn)); // There are two types of leaf nodes
            // We have found a leaf node which should be either AccumulateGrad
            // or RecvRpcBackward. Record the function
            // to ensure we don't execute it and instead accumulate the grads on
            // the autograd context. These functions would be passed in as the
            // 'outputs' parameter of the vanilla autograd engine.

            // We don't accumulate any grads in the context for RecvRpcBackward.
            // RecvRpcBackward is added as an output edge to indicate it is a
            // leaf node and this helps in properly computing dependencies for
            // the local autograd graph. Putting RecvRpcBackward in
            // 'outputEdges' means that this function needs to be executed
            // (inline with our assumption for FAST mode that all send/recv
            // functions are valid in the backward pass), and as a result all of
            // its ancestors need to be executed as well.
            if (dynamic_cast<RecvRpcBackward*>(nextFn)) {
              recvBackwardEdges.emplace_back(edge); // Special processing
            }
            outputEdges.emplace_back(edge); // Final output edge
          }
        }
      }
    }
  }
  
  // At this point, inside recvBackwardEdges is RecvRpcBackward and inside outputEdges is AccumulateGrad

  // Here is the third part, to find which functions need to be computed based on dependencies
  // Now lets compute which functions need to be executed. The algorithm is as
  // follows:
  // 1. Create a dummy GraphRoot which points to all 'send' functions for this
  // context and the original graphRoot. Run 'init_to_execute' with the
  // outputEdges and the dummy GraphRoot. This ensures we mark
  // appropriate functions as needed if they are reachable only from a
  // specific 'send' function locally and not necessarily from the provided
  // roots.
  // 2. For all edges in 'outputEdges' which point to 'RecvRpcBackward', mark
  // those functions as needed for execution. The reason for this is that
  // 'init_to_execute', will mark these as not needed. But 'RecvRpcBackward'
  // is unique in the sense that we use it as a leaf node in graph to compute
  // needed execution accurately, but unlike AccumulateGrad, we do need to
  // execute this function.
  if (!outputEdges.empty()) {
    // Compute 'needed execution' starting from all 'send' functions and the
    // original graphRoot.
    edge_list edges;
    // Create some dummy edges (input_nr not important for init_to_execute).
    for (const auto& mapEntry : sendFunctions) { // Traverse the send functions
      edges.emplace_back(mapEntry.second, 0); // Get the list of outgoing edges
    }

    // Add the original graphRoot as an edge.
    edges.emplace_back(graphRoot, 0); // root is added to the outbound edge list

    // Create a dummy GraphRoot and run init_to_execute with it.
    GraphRoot dummyRoot(edges, {}); // Create a virtual Root
    // If the outgoing edge is not empty, init_to_execute is called to initialize GraphTask
    graphTask->init_to_execute(dummyRoot, outputEdges, /*accumulate_grad=*/false, /*min_topo_nr=*/0);

    // The data structure of exec_info_ is std::unordered_map<Node*, ExecInfo>
    for (auto& mapEntry : graphTask->exec_info_) {
      auto& execInfo = mapEntry.second;
      if (!execInfo.captures_) { // Check whether this node is on the gradient path
        continue; // If not on the path, skip to the next node
      }
      auto fn = mapEntry.first; // Get the Node
      // There may be nodes other than 'AccumulateGrad', e.g. RecvRPCBackward,
      // to be captured.
      if (auto accumulateGradFn = dynamic_cast<AccumulateGrad*>(fn)) {
        // If it is a leaf node
        for (auto& capture : *execInfo.captures_) { // Traverses the nodes on the tensor path
          capture.hooks_.push_back(
              std::make_unique<DistAccumulateGradCaptureHook>( // Insert Hook into tensors
                  std::dynamic_pointer_cast<AccumulateGrad>(
                      accumulateGradFn->shared_from_this()),
                  autogradContext));
        }
      }
    }

    // Mark all 'RecvRPCBackward' as needing execution.
    // RecvRPCBackward needs to be executed
    for (const auto& recvBackwardEdge : recvBackwardEdges) {
      graphTask->exec_info_[recvBackwardEdge.function.get()].needed_ = true;
    }
  }

  // Let autograd context take ownership of the GraphTask.
  // Set in context
  autogradContext->setGraphTask(std::move(graphTask));
}

5.2 Part 1: Preparation

5.2.1 Implementation

Since local dependencies are computed here, the traversal needs to start from the roots and the local SendRpcBackward nodes. Some preparation is needed first:

  • First, we generate a GraphTask; we do not need to pass it a cpu_ready_queue, because we will later use execute_graph_task_until_ready_queue_empty, which creates a separate ReadyQueue for each call.
  • Second, seen records the nodes that have been visited.
  • Construct a queue of Node* and insert the root node into it.
  • Then take the send functions out of the context and put them in sendFunctions.
    • sendFunctions holds the outgoing edges that were added earlier via addSendFunction.
    • In the vanilla engine, the root node already has its next edges when backward propagation starts, but in distributed mode the outgoing edges live in sendFunctions.
  • Iterate over sendFunctions to build the edge list; for each item in sendFunctions:
    • graphTask->outstanding_tasks_++.
    • Insert the SendRpcBackward from sendFunctions into the queue.
    • Finally, the queue contains the root and several SendRpcBackward nodes.

5.2.2 Related code

The implementation uses several functions and member variables; we pick the important ones to introduce.

5.2.2.1 sendFunctions

sendFunctions returns the context's sendAutogradFunctions_, which is a std::unordered_map<int64_t, std::shared_ptr<SendRpcBackward>>.

std::unordered_map<int64_t, std::shared_ptr<SendRpcBackward>>
DistAutogradContext::sendFunctions() const {
  std::lock_guard<std::mutex> guard(lock_);
  return sendAutogradFunctions_;
}

These send functions are the outgoing edges that were added earlier via addSendFunction; addSendRpcBackward calls addSendFunction.

5.2.2.2 outstanding_tasks_

graphTask->outstanding_tasks_++ increases the GraphTask's count of outstanding tasks.

GraphTask

outstanding_tasks_ is a member variable of GraphTask.

  • outstanding_tasks_: records the number of tasks that still need to run. If the number is 0, the GraphTask is finished; if it is non-zero, the GraphTask still needs to run.
Vanilla engine

outstanding_tasks_ comes from the vanilla Engine.

It is the number of NodeTasks still to be processed and is used to decide whether the GraphTask is finished: if the number drops to 0, the task is done.

  • The value is 0 when the GraphTask is created.
  • Each time a NodeTask is pushed to a ReadyQueue, outstanding_tasks_ increases by 1.
  • After a worker thread executes evaluate_function(task) once, outstanding_tasks_ decreases by 1.
  • If the number is non-zero, the GraphTask still needs to run.
bool GraphTask::completed() {
  return outstanding_tasks_.load() == 0 ||
      (exit_on_error_ && has_error_.load());
}

outstanding_tasks_ increases by one whenever a NodeTask is added.
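As a minimal sketch of this bookkeeping (standard C++ only; MiniGraphTask is an illustrative stand-in, not the real GraphTask): the counter is incremented when work is enqueued, decremented when work finishes, and the task only counts as completed when it reaches zero.

#include <atomic>
#include <cassert>

// Minimal stand-in for the GraphTask bookkeeping described above.
struct MiniGraphTask {
  std::atomic<int> outstanding_tasks_{0};
  bool completed() const { return outstanding_tasks_.load() == 0; }
};

int main() {
  MiniGraphTask task;
  task.outstanding_tasks_++;  // a NodeTask is pushed to a ready queue
  task.outstanding_tasks_++;  // another unit of work is registered (e.g. a send function)
  assert(!task.completed());
  task.outstanding_tasks_--;  // one task finished evaluate_function
  task.outstanding_tasks_--;  // the other finished
  assert(task.completed());
  return 0;
}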

dist engine

When computing dependencies, we traverse sendFunctions: outstanding_tasks_ is incremented once for every SendRpcBackward in the context. Each extra outgoing edge means one more computation the engine has to wait for.

  for (const auto& mapEntry : sendFunctions) {
    // Increment 'outstanding_tasks_' for GraphTask for each send_function
    // since we want the local autograd engine to wait for all of them.
    graphTask->outstanding_tasks_++;
    queue.push(mapEntry.second.get());
  }
Copy the code

During execution, DistEngine::execute_graph_task_until_ready_queue_empty and Engine::thread_main decrement outstanding_tasks_.

5.3 Part 2: Computing Dependencies

The second part is to traverse the graph and calculate the dependencies.

5.3.1 Implementation

At this point the queue contains the root and a number of SendRpcBackward nodes, so the next step is to keep popping nodes off the queue and processing them. The specific logic is:

  • Traverse the queue (constantly popping nodes off it); for each Node (root or SendRpcBackward), traverse its next_edges:
    • If an edge can be obtained, then:
      • The dependency count of the corresponding node is increased by one.
      • If that node has not been visited before, it is inserted into the queue.
      • If the node has no outgoing edges, it is a leaf node. There are two kinds of leaf nodes: AccumulateGrad and RecvRpcBackward.
        • RecvRpcBackward edges get special treatment: recvBackwardEdges.emplace_back(edge).
        • The edge is inserted into the final output edges outputEdges; note that RecvRpcBackward is also inserted here.

After this, the local variable recvBackwardEdges holds RecvRpcBackward edges, and outputEdges holds AccumulateGrad and RecvRpcBackward edges.
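The traversal above is essentially a breadth-first search that counts, for every node, how many edges point to it from the traversed part of the graph, and collects the leaves. Here is a minimal self-contained sketch of that counting (plain C++; ToyNode is an illustrative stand-in for torch::autograd::Node):

#include <iostream>
#include <queue>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Toy stand-in for torch::autograd::Node: just a name and outgoing edges.
struct ToyNode {
  std::string name;
  std::vector<ToyNode*> next_edges;
};

int main() {
  // A tiny graph: root -> a -> leaf1, root -> b -> leaf1, b -> leaf2.
  ToyNode leaf1{"leaf1", {}}, leaf2{"leaf2", {}};
  ToyNode a{"a", {&leaf1}}, b{"b", {&leaf1, &leaf2}};
  ToyNode root{"root", {&a, &b}};

  std::unordered_map<ToyNode*, int> dependencies;
  std::vector<ToyNode*> outputEdges;  // leaves found during the traversal
  std::unordered_set<ToyNode*> seen;
  std::queue<ToyNode*> queue;
  queue.push(&root);  // in the dist engine, all SendRpcBackward are pushed too

  while (!queue.empty()) {
    ToyNode* fn = queue.front();
    queue.pop();
    for (ToyNode* next : fn->next_edges) {
      dependencies[next] += 1;           // one more producer feeding this node
      if (seen.insert(next).second) {    // first time we meet this node
        queue.push(next);
        if (next->next_edges.empty()) {  // leaf: AccumulateGrad/RecvRpcBackward in the real engine
          outputEdges.push_back(next);
        }
      }
    }
  }

  for (const auto& kv : dependencies) {
    std::cout << kv.first->name << " has " << kv.second << " dependencies\n";
  }
  // Expected counts: a:1, b:1, leaf1:2, leaf2:1
  return 0;
}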

5.3.2 Types of leaf nodes

There are two types of leaf nodes, so they need to be handled separately.

  • AccumulateGrad: an ordinary, local leaf node.
  • RecvRpcBackward: corresponds to an RPC receive node in the forward graph.

This corresponds to the following passage in the design document:

We found a leaf node, which should be either an AccumulateGrad or a RecvRpcBackward. We record the function to ensure that we do not execute it, but instead accumulate the gradients in the autograd context. These functions are passed as the "outputs" parameter to the vanilla autograd engine.

We do not accumulate any gradients in the context for RecvRpcBackward. RecvRpcBackward is added as an output edge to indicate that it is a leaf node, which helps compute the dependencies of the local autograd graph correctly. Putting RecvRpcBackward in "outputEdges" means that this function needs to be executed (consistent with our FAST-mode assumption that all send/recv functions are valid in backward propagation), and therefore all of its ancestors need to be executed as well.

For example, for worker 1, recv is the leaf node: a RecvRpcBackward that needs to pass gradients to worker 0. For worker 0, t1 and t2 are both leaf nodes and are AccumulateGrad.

5.4 Part 3: Determining the Functions to Execute

This section finds which functions need to be evaluated based on dependencies.

5.4.1 Algorithm

Now let’s calculate which functions need to be executed. The algorithm is as follows:

    1. Create a dummy GraphRoot that points to all the "send" functions of this context and to the original graphRoot. Run init_to_execute with outputEdges and this dummy GraphRoot. This ensures that we mark the appropriate functions as needed even when they are reachable only from a specific local "send" function and not from the provided roots.
    2. For all edges in "outputEdges" that point to "RecvRpcBackward", mark those functions as needed for execution. The reason is that init_to_execute marks them as not needed. But RecvRpcBackward is unique: we use it as a leaf node in the graph to compute the needed execution accurately, yet unlike AccumulateGrad we do need to execute this function.

Specifically:

  • RecvRpcBackward needs to be executed.
  • AccumulateGrad needs to accumulate gradients.

5.4.2 Implementation

At this point recvBackwardEdges holds RecvRpcBackward edges and outputEdges holds AccumulateGrad and RecvRpcBackward edges. We use this information to decide what to execute. The concrete implementation is:

  • If outputEdges is not empty, insert the outputEdges information into graphTask.exec_info_:

    • Construct an edge_list named edges.
    • Iterate over sendFunctions and add each send function to edges.
    • Add the original graphRoot to the edge list as well.
    • Create a dummy root (dummyRoot) from these edges.
    • Call init_to_execute with the dummy root and outputEdges to initialize the GraphTask.
    • Traverse the GraphTask's exec_info_, whose data structure is std::unordered_map<Node*, ExecInfo>:
      • Check whether this entry captures anything, i.e. whether it lies on the gradient path.
      • If it is not on the path, skip to the next entry.
      • Get the Node of this entry.
      • If the Node is an AccumulateGrad leaf node:
        • Traverse the captures of this entry.
        • Insert a hook into each capture. The key point is that a DistAccumulateGradCaptureHook is attached to the AccumulateGrad, which is later used to accumulate the gradient.
  • Walk through recvBackwardEdges and, for each RecvRpcBackward, mark the corresponding entry in graphTask.exec_info_ as needing execution.

At this point the dependencies have been processed, and all the information about which functions need to be computed is stored in graphTask.exec_info_. How this information is used will be covered in the next article.
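The effect of the capture hook can be pictured with a minimal sketch (toy types only; ToyAutogradContext and CaptureHook are illustrative stand-ins, not the real DistAccumulateGradCaptureHook or DistAutogradContext): instead of writing a captured gradient into a tensor's .grad field, the hook accumulates it into a per-context map, which is what get_gradients() later reads.

#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Toy context: gradients are accumulated here, keyed by variable name,
// instead of being written into the variable's .grad field.
struct ToyAutogradContext {
  std::unordered_map<std::string, double> accumulated_grads;
};

// Toy capture hook: called with the captured gradient, accumulates it in the
// context, mirroring the role played by DistAccumulateGradCaptureHook.
using CaptureHook = std::function<double(double)>;

int main() {
  ToyAutogradContext context;
  std::vector<CaptureHook> hooks;

  std::string variable = "t1";
  hooks.push_back([&context, variable](double grad) {
    context.accumulated_grads[variable] += grad;  // accumulate in the context
    return grad;                                  // pass the gradient along
  });

  // The engine captures a gradient for this leaf twice (e.g. two backward paths).
  std::vector<double> incoming = {0.5, 1.5};
  for (double grad : incoming) {
    for (const auto& hook : hooks) {
      grad = hook(grad);
    }
  }

  std::cout << "grad of t1 in context: " << context.accumulated_grads["t1"] << "\n";
  // Prints 2, the sum of both captured gradients.
  return 0;
}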

5.5 Summary

Let’s summarize the logic for calculating dependencies:

  1. computeDependencies starts computing the dependencies.
  2. Obtain sendAutogradFunctions_ from DistAutogradContext and put the SendRpcBackward functions into sendFunctions. In the vanilla engine the root node already has its next edges when backward propagation starts, but in distributed mode the outgoing edges live in sendFunctions, so they need to be extracted and added to the queue below.
  3. Iterate over sendFunctions and add the nodes to the queue; at this point the queue contains the root and some SendRpcBackward nodes.
  4. Traverse the queue, producing two local edge_list variables: recvBackwardEdges holds RecvRpcBackward edges, and outputEdges holds AccumulateGrad and RecvRpcBackward edges. We use this information to decide how the subsequent execution is performed.
  5. Iterate over recvBackwardEdges and outputEdges and add the relevant information to GraphTask.exec_info_. At this point the dependencies are processed and all the information about which functions need to be computed is stored in graphTask.exec_info_.
    1. AccumulateGrad gets a hook attached, which is later used to accumulate the gradient.
    2. RecvRpcBackward is marked as needing execution.
                                        computeDependencies
                                                +
+---------------------------+                   | 1
| DistAutogradContext       |                   |
|                           |                   v
|                           |  2
|  sendAutogradFunctions_ +-------> map<int, SendRpcBackward> sendFunctions
|                           |
+---------------------------+                   +
                                                |
                                                | 3
                                                v

                                        queue<Node*> queue

                                                +
                                                | 4
                                                |
                                                |
                                                v

             recvBackwardEdges = [RecvRpcBackward 1, RecvRpcBackward 2, ...]

             outputEdges = [RecvRpcBackward 1, RecvRpcBackward 2,
                            AccumulateGrad 1, AccumulateGrad 2, ...]
                                                +
                                                |
                                                | 5
                                                v

                                       GraphTask.exec_info_



0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

Distributed Autograd Design

Remote Reference Protocol

PyTorch source code interpretation of distributed training to understand?

Pytorch.org/docs/stable…

Pytorch.apachecn.org/docs/1.7/59…

Pytorch.org/docs/stable…

Pytorch.org/docs/master…

Pytorch.org/docs/master… Pytorch.org/docs/master…

www.w3cschool.cn/pytorch/pyt…

PyTorch distributed Autograd design

Getting started with Distributed RPC Framework

Implementing a Parameter Server using Distributed RPC Framework

Combining Distributed DataParallel with Distributed RPC Framework

Profiling RPC-based Workloads

Implementing batch RPC processing

Distributed Pipeline Parallel