0x00 Abstract

We analyzed the forward propagation of the Reducer in the previous article. In this article, we will look at how backward propagation is carried out.

Other articles in this series are as follows:

PyTorch distributed (1)—— History and Overview

PyTorch how to use GPU

PyTorch distributed (2) —— DataParallel (part 1)

PyTorch distributed (3) —— DataParallel (part 2)

PyTorch distributed (4) —— Distributed application concepts

PyTorch distributed (5) —— DistributedDataParallel – overview and how to use it

PyTorch distributed (6) —— DistributedDataParallel – initialization and store

PyTorch distributed (7) —— DistributedDataParallel – process groups

PyTorch distributed (8) —— DistributedDataParallel – the paper

PyTorch distributed (9) —— DistributedDataParallel – initialization

PyTorch distributed (10) —— DistributedDataParallel – Reducer static architecture

PyTorch distributed (11) —— DistributedDataParallel – constructing the Reducer and the Join operation

PyTorch distributed (12) —— DistributedDataParallel – forward propagation

0x01 Review

1.1 Review

We walked through the logic of forward propagation in the previous article. When forward propagation ends, we have the following:

  • The parameters that need gradient computation have been divided into buckets.
  • The buckets have been rebuilt.
  • Forward propagation is complete.
  • Starting from the specified outputs, the autograd computation graph has been traversed backward to find all unused parameters, which are marked as ready.

With this, DDP has the basis for gradient reduction: it knows which parameters can be reduced directly without any further autograd work (those already in the ready state) and which parameters can be communicated together (the buckets). While the PyTorch autograd engine performs the backward computation, cross-process gradient reduction proceeds at the same time.

1.2 Overall Logic

The overall strategy for backward propagation is as follows (a minimal usage sketch follows the list):

Backward Pass:

  • backward() is called directly on the loss, which is the work of autograd and beyond DDP's control, so DDP uses hooks to step in.

    • DDP registers autograd hooks when it is constructed, one per parameter.
    • The autograd engine computes the gradients.
    • When a gradient is ready, the corresponding DDP hook on that gradient accumulator fires.
  • All-reduce is performed in autograd_hook. Given a parameter index param_index, the hook uses param_index to obtain the parameter and marks it as ready. If all gradients in a bucket are ready, the bucket is ready.

  • When all gradients in a bucket are ready, the Reducer starts an asynchronous AllReduce on that bucket to compute the mean of the gradients across all processes.

  • When all buckets are ready, the Reducer blocks and waits for all AllReduce operations to complete. When this is done, the averaged gradients are written to the param.grad fields of all parameters.

  • After the gradients of all processes have been reduced, every process holds the same averaged gradients, so after the optimizer update everyone's model weights are the same. Therefore, once backward propagation completes, the grad fields of the same parameter on different DDP processes should be equal.

  • Unlike DP, DDP does not need to broadcast parameters after each iteration. However, buffers still need to be broadcast from rank 0 to the other processes in each iteration.
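
To make the flow concrete, here is a minimal sketch of a DDP training step at the Python level. It is only an illustration: the process-group setup is assumed to have been done elsewhere (for example, by torchrun plus dist.init_process_group), and the model, data, and local_rank are placeholders. The hooks described above fire inside loss.backward(), so by the time backward() returns, param.grad on every rank already holds the cross-process average.

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# Assumes dist.init_process_group("nccl") has already been called (e.g. under torchrun);
# the model and data below are placeholders.
local_rank = dist.get_rank() % torch.cuda.device_count()
model = torch.nn.Linear(10, 10).to(local_rank)
ddp_model = DDP(model, device_ids=[local_rank])  # the Reducer and its autograd hooks are set up here
optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)

inputs = torch.randn(20, 10, device=f"cuda:{local_rank}")
labels = torch.randn(20, 10, device=f"cuda:{local_rank}")

optimizer.zero_grad()
loss = torch.nn.functional.mse_loss(ddp_model(inputs), labels)
loss.backward()   # autograd hooks fire here; bucketed AllReduce overlaps with the backward pass
optimizer.step()  # every rank applies the same averaged gradients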

Now let’s look at backward propagation.

0x02 Starting from Hooks

The figure below comes from a paper by Kuaishou (see Reference 1 for further analysis of that work). The top half of the figure shows the processing of the native autograd engine, while the bottom half shows the processing of Horovod and Torch-DDP. As you can see, gradient reduction starts during backward propagation.

Specifically, in addition to splitting parameters into buckets, the Reducer also registers autograd hooks during construction, one hook per parameter. When a gradient is ready, these hooks are triggered during the backward pass and perform gradient reduction. A bucket is ready when all gradients inside it are ready, and at that point the Reducer kicks off an asynchronous AllReduce on that bucket to compute the mean of the gradients across all processes. Therefore, we start from the hooks, the entry point of backward propagation.

2.1 How hooks are registered

Let's first look at how hooks are registered, which involves AutogradMeta and Node.

2.1.1 AutogradMeta

AutogradMeta records the autograd history of a Variable. Its main member variables are:

  • grad_: stores the gradient of this Variable instance, and is itself a Variable.
  • grad_fn_: a Node instance, present only on non-leaf Variables. PyTorch uses grad_fn() to determine whether a Variable is a leaf Variable.
  • grad_accumulator_: also a Node instance, present only on leaf Variables.
    • It is accessed via grad_accumulator() of the Variable.
    • grad_accumulator_ is the gradient accumulation function.
    • The accumulated gradient is stored in grad_.
  • output_nr_: a number that indicates which output of the producing Node this Variable is; for example, 0 means it is the Node's first output.
  • To summarize:
    • For a non-leaf Variable, grad_fn_ is the Node that computes its gradient. The gradient is not accumulated on grad_ but is passed on to the next station of backward propagation. grad_fn_ is a Node.
    • For a leaf Variable, PyTorch virtualizes a special computation operation whose output is that leaf Variable; this virtual operation is the leaf's grad_accumulator_. The gradient accumulates on grad_, and since the leaf is the only output of this virtual operation, its output_nr_ must be 0. grad_accumulator_ is also a Node, specifically an AccumulateGrad.

Its definition is as follows:

struct TORCH_API AutogradMeta : public c10::AutogradMetaInterface {
  std::string name_;

  Variable grad_;
  std::shared_ptr<Node> grad_fn_;
  std::weak_ptr<Node> grad_accumulator_;

  // This field is used to store all the forward AD gradients
  // associated with this AutogradMeta (and the Tensor it corresponds to)
  std::shared_ptr<ForwardGrad> fw_grad_;

  std::vector<std::shared_ptr<FunctionPreHook>> hooks_;
  std::shared_ptr<hooks_list> cpp_hooks_list_;

  // Only meaningful on leaf variables (must be false otherwise)
  bool requires_grad_;
  // Only meaningful on non-leaf variables (must be false otherwise)
  bool retains_grad_;
  bool is_view_;

  // The "output number" of this variable; e.g., if this variable
  // was the second output of a function, then output_nr == 1.
  // We use this to make sure we can setup the backwards trace
  // correctly when this variable is passed to another function.
  uint32_t output_nr_;
  mutable std::mutex mutex_;
};

2.1.2 Node

In the computation graph, a computation operation is represented by a Node; different subclasses of Node implement different operations.

The grad_fn_ and grad_accumulator_ members of AutogradMeta are both Nodes.

The member variable we focus on here is post_hooks_, the hooks that are executed after the gradient computation has run.

add_post_hook adds a hook to post_hooks_.

struct TORCH_API Node : std::enable_shared_from_this<Node> {
  public:
  std::vector<std::unique_ptr<FunctionPreHook>> pre_hooks_;
  std::vector<std::unique_ptr<FunctionPostHook>> post_hooks_;  
  
  uintptr_t add_post_hook(std::unique_ptr<FunctionPostHook>&& post_hook) {
    post_hooks_.push_back(std::move(post_hook));
    // Use the raw pointer as the unique key to identify this hook. This key
    // can then be used in del_post_hook(key) to remove this hook.
    return reinterpret_cast<std::uintptr_t>(post_hooks_.back().get());
  }
};

2.1.3 AccumulateGrad

AccumulateGrad is a derived class of Node.
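
Although the Reducer registers its hook in C++ through add_post_hook, the same idea can be sketched from Python for intuition: grab the AccumulateGrad node of a leaf tensor and register a hook on it. This is only a hedged illustration; the expand_as trick and Node-level register_hook are conventions used by older DDP Python code, not the C++ code path analyzed here.

import torch

param = torch.randn(3, requires_grad=True)  # a leaf tensor, like a model parameter

# expand_as creates a trivial non-leaf tensor whose grad_fn's next_functions[0][0]
# is the AccumulateGrad node of `param` (a trick used by older DDP Python code).
grad_acc = param.expand_as(param).grad_fn.next_functions[0][0]

def post_hook(*unused):
    # Fires after the gradient has been accumulated into param.grad,
    # which is roughly when Reducer::autograd_hook runs on the C++ side.
    print("gradient ready:", param.grad.norm().item())

grad_acc.register_hook(post_hook)

(param * 2).sum().backward()  # prints "gradient ready: ..."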

2.2 Constructors

Let’s review the Reducer constructor, which will:

  • For each tensor, obtain the grad_accumulator_ of its Variable::AutogradMeta, i.e. the gradient accumulator that accumulates gradients for this leaf Variable.
  • For each gradient accumulator, configure an autograd_hook; it hangs on the autograd graph and is responsible for gradient synchronization during backward.
  • Store (grad_accumulator, index) pairs in gradAccToVariableMap_, which makes it convenient to find unused parameters when traversing the autograd graph later.
  • Store the gradient accumulators themselves in grad_accumulators_.

The specific code is as follows:

Reducer::Reducer(
    std::vector<std::vector<at::Tensor>> replicas, // tensors
    std::vector<std::vector<size_t>> bucket_indices, // bucket information
    ......) {

    for (size_t replica_index = 0; replica_index < replica_count; // iterate over replicas
         replica_index++) {
      
      for (size_t variable_index = 0; variable_index < variable_count; // iterate over the tensor
           variable_index++) { 
        auto& variable = replicas_[replica_index][variable_index]; // Get the specific tensor
        const auto index = VariableIndex(replica_index, variable_index); // Each tensor has an index
				// Get grad_accumulator_ for Variable::AutogradMeta
        auto grad_accumulator = torch::autograd::impl::grad_accumulator(variable); 

        hooks_.emplace_back(
            // The accumulator adds a hook, which hangs above the Autograd graph and is responsible for gradient synchronization when backward.
            // after grad_accumulator is executed, autograd_hook will run
            grad_accumulator->add_post_hook(
                torch::make_unique<torch::autograd::utils::LambdaPostHook>(
                    [=](const torch::autograd::variable_list& outputs,
                        const torch::autograd::variable_list& ) {
#ifndef _WIN32
                      this->rpc_context_.set(
                          ThreadLocalDistAutogradContext::getContextPtr());
#endif
                      this->autograd_hook(index); // Call the Reducer's autograd_hook function
                      return outputs;
                    })),
            grad_accumulator);
          
        // gradAccToVariableMap_ accumulates grad_accumulator & index. Thus, it will be convenient to find unused parameters in autograd graph traversal in the future
        if (find_unused_parameters_) {
          gradAccToVariableMap_[grad_accumulator.get()] = index;
        }

        grad_accumulators_[replica_index][variable_index] =
            std::move(grad_accumulator);
      }
    }
}

2.2.1 grad_accumulator

grad_accumulator() returns autograd_meta->grad_accumulator_. For a leaf node, grad_accumulator_ is an AccumulateGrad.

std::shared_ptr<Node> grad_accumulator(const Variable& self) {
  auto autograd_meta = get_autograd_meta(self); // get autograd_meta
  if (!autograd_meta) {
    return nullptr;
  }
  if (autograd_meta->grad_fn_) {
    throw std::logic_error(
        "grad_accumulator() should be only called on leaf Variables");
  }
  if (!autograd_meta->requires_grad_) {
    return nullptr;
  }

  std::lock_guard<std::mutex> lock(autograd_meta->mutex_);

  // Return autograd_meta->grad_accumulator_ if it already exists
  auto result = autograd_meta->grad_accumulator_.lock();
  if (result)
    return result;

  c10::raw::intrusive_ptr::incref(self.unsafeGetTensorImpl());
  auto intrusive_from_this = c10::intrusive_ptr<at::TensorImpl>::reclaim(self.unsafeGetTensorImpl());
  result = std::make_shared<AccumulateGrad>(Variable(std::move(intrusive_from_this)));
  autograd_meta->grad_accumulator_ = result; // Cache it in autograd_meta->grad_accumulator_
  return result;
}

2.2.2 Illustration

Suppose a tensor is variable1 and its corresponding VariableIndex is index1. After AccumulateGrad::apply has computed the gradient, its post_hooks are called.

+----------------------------------------------------------+
| Reducer                                                   |
|                                                           |
|  grad_accumulators_                                       |
|     [replica_index][variable_index]                       |
|                 +                                         |
|                 |                                         |
|                 v                                         |
|  +--------------+----+       +----------------------+     |
|  | variable1         |       | AccumulateGrad       |     |
|  |                   |       |                      |     |
|  |   autograd_meta_+-------> |   post_hooks         |     |
|  +-------------------+       |        +             |     |
|                              +--------|-------------+     |
|  +-------------------------+          |                   |
|  | gradAccToVariableMap_   |          v                   |
|  |                         |   +------+----------------+  |
|  |   [variable1 : index1]  |   | autograd_hook(index1) |  |
|  +-------------------------+   +-----------------------+  |
|                                                           |
+-----------------------------------------------------------+

index1 +--> +----------------------------------+
            | VariableIndex                    |
            |    replica_index of Variable1    |
            |    variable_index of Variable1   |
            +----------------------------------+

2.3 Hook function

When a gradient is ready, the engine calls the hook function, i.e. the autograd_hook method below, which decides according to the relevant conditions whether the variable is ready. The logic is as follows:

  • If this is a dynamic graph that needs to find unused tensors, or the first iteration of a static graph, set the variable's position in local_used_maps_ to 1.

    • local_used_maps_ records locally used parameters (CPU tensors).
    • A dynamic graph may differ from iteration to iteration (buckets and variables may be different each time), so local_used_maps_ needs to be updated every iteration.
    • A static graph is the same in every iteration, so it is enough to set the map during the first iteration.
  • If this is the first iteration of a static graph, increment the corresponding entry in numGradHooksTriggeredMap_.

  • If unused variables have not been marked yet, iterate over unused_parameters_ and call mark_variable_ready for each of them (an unused variable is, of course, marked as ready).

  • If this is a static graph and we are past the first iteration, decrement the corresponding entry of numGradHooksTriggeredMapPerIteration_; when it reaches 0, the variable is ready, so call mark_variable_ready.

  • Otherwise (the dynamic-graph case), call mark_variable_ready for the variable every time.

// The function `autograd_hook` is called after the gradient for a
// model parameter has been accumulated into its gradient tensor.
// This function is only to be called from the autograd thread.
void Reducer::autograd_hook(VariableIndex index) {
  std::lock_guard<std::mutex> lock(this->mutex_);

  // Carry over thread local state from main thread. This allows for
  // thread-local flags such as profiler enabled to be configured correctly.
  at::ThreadLocalStateGuard g(thread_local_state_);

  // Ignore if we don't expect to be called.
  // This may be the case if the user wants to accumulate gradients
  // for number of iterations before reducing them.
  if (!expect_autograd_hooks_) {
    return;
  }

  // Note [Skip allreducing local_used_maps_dev]
  // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  // If find_unused_parameters_ is set to false, there is no need to allreduce
  // local_used_maps_dev_, because all parameters will be reduced anyway.
  // Therefore, we can avoid allocating memory for local_used_maps and
  // local_used_maps_dev_ if find_unused_parameters_ is false.

  // See Note [Skip allreducing local_used_maps_dev]
  // Dynamic graph that needs to find unused tensors, or first iteration of a static graph
  if (dynamic_graph_find_unused() || static_graph_first_iteration()) {
    // Since it gets here, this param has been used for this iteration. We want
    // to mark it in local_used_maps_. During no_sync session, the same var can
    // be set multiple times, which is OK as does not affect correctness. As
    // long as it is used once during no_sync session, it is marked as used.
    // In a no_sync session, any parameter that has been used once is marked as used.
    // local_used_maps_ records locally used CPU tensors.
    // A dynamic graph may differ from iteration to iteration (buckets and variables
    // may be different each time), so local_used_maps_ needs to be updated every iteration.
    // A static graph is the same in every iteration, so setting it in the first iteration is enough.
    local_used_maps_[index.replica_index][index.variable_index] = 1;
  }

  if (static_graph_first_iteration()) { // First iteration of a static graph
    numGradHooksTriggeredMap_[index] += 1; // Only incremented during the first iteration
    return;
  }

  // If `find_unused_parameters_` is true there may be model parameters that
  // went unused when computing the model output, they won't be part of the
  // autograd graph, and won't receive gradients. These parameters are
  // discovered in the `prepare_for_backward` function and their indexes stored
  // in the `unused_parameters_` vector.
  if (!has_marked_unused_parameters_) {
    has_marked_unused_parameters_ = true;
    for (const auto& unused_index : unused_parameters_) { // Iterate over unused variables
      mark_variable_ready(unused_index); // An unused variable is, of course, marked as ready
    }
  }

  // If it is static graph, after 1st iteration, check if a variable
  // is ready for communication based on numGradHooksTriggeredMap_.
  if (static_graph_after_first_iteration()) { // From the second iteration onward
    // Why start from the second iteration? Because the first iteration only records
    // how many times each hook fires (numGradHooksTriggeredMap_); for static graphs,
    // numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
    if (--numGradHooksTriggeredMapPerIteration_[index] == 0) {
      // Finally mark variable for which this function was originally called.
      mark_variable_ready(index); // Mark the variable ready once the counter reaches 0
    }
  } else {
    // Finally mark variable for which this function was originally called.
    mark_variable_ready(index); // In the dynamic-graph case, mark the variable ready every time
  }
}

0x03 Ready

If, during backward propagation, a parameter's hook finds that its variable is ready, mark_variable_ready(index) is called. Let's see what that does.

3.1 variable ready

3.1.1 Setting the ready state

mark_variable_ready marks a variable as ready. Its logic is as follows.

  • If buckets need to be rebuilt, push the index into the rebuild list.

    • Buckets are rebuilt only when: 1) buckets are being rebuilt for the first time; 2) static_graph_ is true or find_unused_parameters_ is false; 3) this backward pass needs to run AllReduce.
    • Here we simply dump the tensors and their parameter indices into rebuilt_params_ and rebuilt_param_indices_ in gradient-arrival order; at the end of finalize_backward(), the buckets are rebuilt from them, then broadcast and initialized. We only need to dump the tensors and parameter indices of one replica.
  • Find the replica index of this variable and its position within the replica.

  • Record that this variable has been used by inserting it into perIterationReadyParams_.

  • Whenever a variable is marked as ready, a call to finalize is required, so require_finalize_ is set.

  • Check whether this was the final gradient of the bucket: if nothing is left pending, the bucket is ready.

  • The replica's pending count is decremented by 1, because one more tensor is ready.

  • If the replica's pending count reaches 0, the bucket's pending count is decremented by 1.

    • Because if this replica has nothing pending, the bucket has one fewer pending replica.
    • If the bucket's pending count reaches 0, mark_bucket_ready is called to mark the bucket as ready.
  • If all buckets are ready, then:

    • Call all_reduce_local_used_map.

    • Call Engine::get_default_engine().queue_callback to register a callback; it will be invoked after the engine has finished all backward work, and finalize_backward is called inside it.

void Reducer::mark_variable_ready(VariableIndex index) {
  // Rebuild bucket only if 1) it is the first time to rebuild bucket 2)
  // static_graph_ is true or find_unused_parameters_ is false,
  // 3) this backward pass needs to run allreduce.
  // Here, we just dump tensors and their parameter indices into
  // rebuilt_params_ and rebuilt_param_indices_ based on gradient arriving
  // order, and then at the end of finalize_backward(), buckets will be
  // rebuilt based on rebuilt_params_ and rebuilt_param_indices_, and then
  // will be broadcasted and initialized. Also we only need to dump tensors
  // and parameter indices of one replica.
 
  if (should_rebuild_buckets()) {
    push_rebuilt_params(index); // If you need to rebuild, insert index into the list to rebuild
  }

  const auto replica_index = index.replica_index; // Find the copy index
  const auto variable_index = index.variable_index; // Find where in the copy

  if (replica_index == 0) {
    checkAndRaiseMarkedTwiceError(variable_index);
    perIterationReadyParams_.insert(variable_index); // This variable is used, record it
  }
  backward_stats_[replica_index][variable_index] =
      current_time_in_nanos() - cpu_timer_.backward_compute_start_time;

  // Any time we mark a variable ready (be it in line due to unused parameters,
  // or via an autograd hook), we require a call to the finalize function. If
  // this doesn't happen before the next iteration (or call to
  // `prepare_for_backwards`), we know something is wrong.
  require_finalize_ = true;  // Call Finalize whenever a variable is marked as ready

  const auto& bucket_index = variable_locators_[variable_index]; // Find the index of variable
  auto& bucket = buckets_[bucket_index.bucket_index]; // Find the bucket where variable is located
  auto& replica = bucket.replicas[replica_index]; // Find the copy


  set_divide_factor();

  if (bucket.expect_sparse_gradient) {
    mark_variable_ready_sparse(index); // sparse variable
  } else {
    mark_variable_ready_dense(index); // dense variable
  }

  // TODO(@pietern): Make this work for both CPU/CUDA tensors.
  // When using CPU tensors we don't need to do this.
  // // Record event so that we can wait for all of them.
  // auto& event = replica.events[bucket_index.intra_bucket_index];
  // event.record();

  // Check if this was the final gradient for this bucket.
  // Check whether all gradients in the bucket are ready; if nothing is pending, the bucket is ready.
  if (--replica.pending == 0) { // One more tensor is ready, so decrement the replica's pending count
    // Kick off reduction if all replicas for this bucket are ready.
    if (--bucket.pending == 0) { // If this replica has nothing pending, the bucket has one fewer pending replica
      mark_bucket_ready(bucket_index.bucket_index); // If no replicas are pending, mark the bucket ready
    }
  }

  // Run finalizer function and kick off reduction for local_used_maps once the
  // final bucket was marked ready.
  if (next_bucket_ == buckets_.size()) { // If all buckets are ready

    if (dynamic_graph_find_unused()) {
      all_reduce_local_used_map(); // Reduce the map of used variables
    }

    // The autograd engine uses the default stream when running callbacks, so we
    // pass in the current CUDA stream in case it is not the default.
    const c10::Stream currentStream = get_current_stream();
    // Register finalize_backward with the engine
    torch::autograd::Engine::get_default_engine().queue_callback([=] {
      std::lock_guard<std::mutex> lock(this->mutex_);
      // Run callback with the current stream
      c10::OptionalStreamGuard currentStreamGuard{currentStream};
      if (should_collect_runtime_stats()) {
        record_backward_compute_end_time();
      }
      // Check that all buckets were completed and had their work kicked off.
      TORCH_INTERNAL_ASSERT(next_bucket_ == buckets_.size());
      this->finalize_backward();
    });
  }
}

The logic is as follows:

  1. The Reducer registers autograd_hook onto the post_hooks of AccumulateGrad.
  2. During backward propagation, when the autograd engine finds that a parameter's gradient is ready, it calls the autograd_hook.
  3. Processing continues in autograd_hook.
  4. finalize_backward is registered with the engine via queue_callback.
Engine        AccumulateGrad                Reducer

  +                  +                         +
  |                  |                         |
  |                  |           1             |
  |                  | <-----------------------v
  |                  |
  |                  |
  |                  |
  |                  v           2
  |             post_hooks  +-------->  autograd_hook
  |                                            +
  |                                            |
  |                                            | 3
  |                                            v
  |                         +------------------+---------------------------+
  |                         |    mark_variable_ready                       |
  |                         |                                              |
  |                         |                                              |
  |                         |     All variable in replica are ready?       |
  |                         |                   +                          |
  |                         |                   | YES                      |
  |                         |                   v                          |
  |                         |     All replica in bucket are ready?         |
  |                         |                   +                          |
  |                         |                   | YES                      |
  |                         |                   v                          |
  |                         |            mark_bucket_ready                 |
  |                         |                                              |
  |                         |                                              |
  |                         |                                              |
  |                         |                   +                          |
  |                         |                   |                          |
  |                         |                   |                          |
  |                         |                   v                          |
  |                         |          All buckets are ready?              |
  |                         |                   +                          |
  |                         |                   | YES                      |
  |                         |                   v                          |
  |   queue_back   4        |          all_reduce_local_used_map           |
  | <----------------------------+  queue_callback(finalize_backward)      |
  |                         |                                              |
  |                         |                                              |
  v                         +----------------------------------------------+

3.1.2 The registered callback

The code above uses torch::autograd::Engine::get_default_engine().queue_callback to register a callback function, so let's analyze it.

queue_callback is defined in the engine; it appends the callback to final_callbacks_:

void Engine::queue_callback(std::function<void()> callback) {
  std::lock_guard<std::mutex> lock(current_graph_task->final_callbacks_lock_);
  current_graph_task->final_callbacks_.emplace_back(std::move(callback));
}
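
At the Python level, the same mechanism is reachable through Variable._execution_engine.queue_callback, the binding of Engine::queue_callback. The sketch below is hedged: this is an internal, undocumented API that must be called while a backward pass is running (for example, from inside a hook), which is exactly how the Reducer uses it; treat the exact attribute path as an assumption that may change across versions.

import torch
from torch.autograd import Variable  # Variable._execution_engine wraps the C++ engine

x = torch.randn(4, requires_grad=True)

def on_grad(grad):
    # Queue a callback that the engine runs in GraphTask::exec_post_processing,
    # i.e. after all backward work of this graph task has finished.
    Variable._execution_engine.queue_callback(lambda: print("backward finished"))
    return grad

x.register_hook(on_grad)
(x * 3).sum().backward()  # prints "backward finished" once the whole backward pass is done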

final_callbacks_ is processed in exec_post_processing: the callbacks are invoked after the engine has completed all backward work.

void GraphTask::exec_post_processing() {
  if (!not_ready_.empty()) {
    throw std::runtime_error("could not compute gradients for some functions");
  }

  // set the thread_local current_graph_task_ as more callbacks can be installed
  // by existing final callbacks.
  GraphTaskGuard guard(shared_from_this());
  // Lock mutex during each iteration for accessing final_callbacks.size()
  // Unlocking is necessary, because the callback can register
  // more callbacks (or they can be registered from other threads
  // while it's waiting.
  std::unique_lock<std::mutex> cb_lock(final_callbacks_lock_);
  // WARNING: Don't use a range-for loop here because more callbacks may be
  // added in between callback calls, so iterators may become invalidated.
  // NOLINTNEXTLINE(modernize-loop-convert)
  for (size_t i = 0; i < final_callbacks_.size(); ++i) {
    cb_lock.unlock();
    final_callbacks_[i](); // The callback is invoked here
    cb_lock.lock();
  }

  // Syncs leaf streams with default streams (if necessary)
  // See note "Streaming backwards"
  for (const auto& leaf_stream : leaf_streams) {
    const auto guard = c10::impl::VirtualGuardImpl{c10::DeviceType::CUDA};
    const auto default_stream = guard.getDefaultStream(leaf_stream.device());
    if (leaf_stream != default_stream) {
      auto event = c10::Event{c10::DeviceType::CUDA};
      event.record(leaf_stream);
      default_stream.wait(event);
    }
  }
}

So the extended logic is as follows:

  1. The Reducer registers autograd_hook onto the post_hooks of AccumulateGrad.
  2. During backward propagation, when the autograd engine finds that a parameter's gradient is ready, it calls the autograd_hook.
  3. Processing continues in autograd_hook.
  4. finalize_backward is registered with the engine via queue_callback.
  5. finalize_backward is called within GraphTask::exec_post_processing.
          Engine        AccumulateGrad                Reducer

            +                  +                         +
            |                  |                         |
            |                  |           1             |
            |                  | <-----------------------+
            |                  |
            |                  |
            |                  |
            |                  v
            |                              2
            |             post_hooks  +-------->  autograd_hook
            |                                            +
            |                                            |
            |                                            |  3
            |                                            v
            |                         +------------------+---------------------------+
            |                         | mark_variable_ready                          |
            |                         |                                              |
            |                         |                                              |
            |                         |     All variable in replica are ready?       |
            |                         |                   +                          |
            |                         |                   | YES                      |
            |                         |                   v                          |
            |                         |     All replica in bucket are ready?         |
            |                         |                   +                          |
            |                         |                   | YES                      |
            |                         |                   v                          |
            |                         |            mark_bucket_ready                 |
            |                         |                                              |
            |                         |                                              |
            |                         |                                              |
            |                         |                   +                          |
            |                         |                   |                          |
            |                         |                   |                          |
            |                         |                   v                          |
            |                         |          All buckets are ready?              |
            |                         |                   +                          |
            |                         |                   | YES                      |
            |                         |                   v                          |
            |   queue_back    4       |          all_reduce_local_used_map           |
            | <----------------------------+  queue_callback(finalize_backward)      |
            |                         |                                              |
            |                         |                                              |
            |                         +-------------------+--------------------------+
            v                                             |
                                                          |
GraphTask::exec_post_processing                           |
            +                                             |
            |                                             |
            |                 5                           v
            +--------------------------------->   finalize_backward
            |                                             +
            |                                             |
            |                                             |
            v                                             v

3.1.3 mark_variable_ready_sparse

The mark_variable_ready_sparse function handles sparse variables; what it does, essentially, is copy the gradient into the Reducer.

void Reducer::mark_variable_ready_sparse(VariableIndex index) {
  const auto replica_index = index.replica_index;
  const auto variable_index = index.variable_index;
  const auto& bucket_index = variable_locators_[variable_index];
  auto& bucket = buckets_[bucket_index.bucket_index]; // Which bucket
  auto& replica = bucket.replicas[replica_index]; // Which replica of the bucket
  auto& variable = replica.variables[bucket_index.intra_bucket_index]; // Which variable in the replica

  runGradCallbackForVariable(variable, [&](auto& grad) {
    TORCH_CHECK(grad.defined(), "Expected sparse gradient to be defined.");
    TORCH_CHECK(
        grad.options().layout() == c10::kSparse,
        "Expected variable to have sparse gradient.");

    // Sparse tensors cannot be grouped together with other sparse tensors
    // in a single reduction operation like we can for dense tensors.
    // Therefore, the `offsets` and `lengths` vectors in the bucket replica
    // struct are empty, and there is no pre-existing accumulation tensor.
    // Directly assign the sparse tensor to the `contents` field.
    replica.contents = grad; // Direct copy
    // See Note [DDP Communication Hook]
    if (comm_hook_ == nullptr) {
      replica.contents.div_(divFactor_);
    }
    // The grad is modified in place and needs to be written back.
    return true;
  });
}

3.1.4 mark_variable_ready_dense

mark_variable_ready_dense handles dense tensors; what it does, essentially, is copy the gradient into the Reducer.

Let's start with a member variable, gradient_as_bucket_view_:

  • If it is false, the bucket contents need to be copied back into the grads after the bucket has been allreduced.

  • If it is true, gradients become views pointing at different offsets of the AllReduce buckets. This reduces peak memory usage by an amount equal to the total gradient size, and it avoids the copy overhead between the gradients and the AllReduce buckets. However, detach_() cannot be called on a gradient while it is such a view. A usage sketch follows this list.
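
At the Python level this corresponds to the gradient_as_bucket_view argument of the DistributedDataParallel constructor. A hedged sketch (model, local_rank, and the process group are assumed to already exist):

from torch.nn.parallel import DistributedDataParallel as DDP

# Assumes dist.init_process_group(...) has already run and `model` is on the right device.
ddp_model = DDP(
    model,
    device_ids=[local_rank],
    gradient_as_bucket_view=True,  # param.grad becomes a view into the communication bucket
    bucket_cap_mb=25,              # bucket size knob; 25 MB is the default
)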

The mark_variable_ready_dense logic is:

  • Use the index to find which bucket and which replica this variable belongs to, get the tensor variable from the replica, then get the variable's offset and size, and finally obtain the tensor's bucket_view.
  • Process the tensor with runGradCallbackForVariable. runGradCallbackForVariable actually hands the callback to the DistAutogradContext for processing, and finally writes back into the DistAutogradContext.
  • The internal logic of the callback is:
    • When gradient_as_bucket_view_ is false, or, in rare cases, even when gradient_as_bucket_view_ is true, the user may set grad to None after every iteration.
    • In these cases grad and bucket_view point to different storages, so grad needs to be copied into bucket_view.
    • If gradient_as_bucket_view_ is set to true, let grad point to bucket_view.
    • If grad was already set to a view of the bucket in a previous iteration, no copy is needed.
void Reducer::mark_variable_ready_dense(VariableIndex index) {
  const auto replica_index = index.replica_index;
  const auto variable_index = index.variable_index;
  const auto& bucket_index = variable_locators_[variable_index];
  auto& bucket = buckets_[bucket_index.bucket_index]; // Which bucket
  auto& replica = bucket.replicas[replica_index]; // Which replica of the bucket
  auto& variable = replica.variables[bucket_index.intra_bucket_index]; // Which variable in the replica
  const auto offset = replica.offsets[bucket_index.intra_bucket_index]; // Offset of the variable
  const auto length = replica.lengths[bucket_index.intra_bucket_index]; // Size of the variable
  auto& bucket_view = replica.bucket_views_in[bucket_index.intra_bucket_index]; // The bucket view

  // Copy contents of gradient tensor to bucket tensor.
  // If the gradient is not set, we assume it wasn't computed
  // as part of the current backwards pass, and zero the part
  // of the bucket it would otherwise hold.
  runGradCallbackForVariable(variable, [&](auto& grad) {
    // Get the gradient of the tensor
    if (grad.defined()) {
      this->check_grad_layout(grad, bucket_view);
      // When gradient_as_bucket_view_ is false, or even when
      // gradient_as_bucket_view_ is true, in rare cases users may set grad to
      // be None after every iteration. In these cases, grad and bucket_view are
      // pointing to different storages and thus need to copy grads to
      // bucket_view. If gradient_as_bucket_view_ is set as true, let grad point
      // to bucket_view. If grad has already been set as views of buckets in
      // previous iterations, no copy is needed.
      if (!grad.is_alias_of(bucket_view)) {
        this->copy_grad_to_bucket(grad, bucket_view); // Copy the gradient into the bucket contents
        if (gradient_as_bucket_view_) {
          // Let grad point to bucket_view buffer.
          grad = bucket_view; // To save memory, grad points to bucket_view
          // The grad is modified and need to be written back.
          return true;
        }
      } else {
        // If grad and bucket view point to the same storage, no need to copy
        if (comm_hook_ == nullptr) {
          bucket_view.div_(divFactor_);
        }
      }
    } else {
      bucket_view.zero_(); // Gradient not computed in this backward pass: zero this part of the bucket
    }
    // The grad is not modified and doesn't need to be written back.
    return false;
  });
}

All copy_grad_to_bucket does is copy the gradient into the bucket contents:

void Reducer::copy_grad_to_bucket(
    const at::Tensor& grad,
    at::Tensor& bucket_view) {
  // See Note [DDP Communication Hook]
  if (comm_hook_ == nullptr) {
    auto wrapped = at::native::wrapped_scalar_tensor(double(1.) / divFactor_);
    // Divides while copying into the bucket view.
    at::mul_out(bucket_view, grad, wrapped);
  } else {
    bucket_view.copy_(grad); // Copy gradients into the bucket contents via bucket_view
  }
}

3.2 Bucket ready

When the last gradient in a bucket becomes ready (nothing left pending), mark_bucket_ready is called.

mark_bucket_ready traverses the buckets and kicks off reduction for the ones that are ready.

// Called when the bucket at the specified index is ready to be reduced.
void Reducer::mark_bucket_ready(size_t bucket_index) {
  TORCH_INTERNAL_ASSERT(bucket_index >= next_bucket_);

  // Buckets are reduced in sequence. Ignore this bucket if
  // it's not its turn to be reduced.
  if (bucket_index > next_bucket_) {
    return;
  }

  // Keep going, until we either:
  // - have kicked off reduction for all buckets, or
  // - found a bucket that's not yet ready for reduction.
  // In other words, iterate through the buckets until either
  // reduction has been initiated for all of them, or a bucket
  // that is not yet ready is found.
  for (; next_bucket_ < buckets_.size() && buckets_[next_bucket_].pending == 0;
       next_bucket_++) {
    num_buckets_ready_++; // Increment the ready-bucket counter
    if (num_buckets_ready_ == 1 && should_collect_runtime_stats()) {
      record_backward_comm_start_time();
    }
    auto& bucket = buckets_[next_bucket_];
    all_reduce_bucket(bucket); // Kick off reduction for the ready bucket
  }
}

3.2.1 all_reduce_bucket

all_reduce_bucket synchronizes the bucket contents.

  • Traverse the bucket's replicas and push each replica's contents tensor into tensors.
  • If no comm_hook has been registered, allreduce these tensors directly.
  • If a comm_hook has been registered, use the hook to do the AllReduce. Note that the comm_hook is only the low-level communication hook; to perform, say, gradient clipping before the reduction, a hook has to be registered on autograd instead.
void Reducer::all_reduce_bucket(Bucket& bucket) {
  std::vector<at::Tensor> tensors;
  tensors.reserve(bucket.replicas.size());
  for (const auto& replica : bucket.replicas) {
    // TODO(@pietern): Ensure proper synchronization with the CUDA events
    // that recorded copies into this contents tensor. If these copies are
    // executed on non-default streams, the current stream for the device
    // that holds the contents tensor must wait on these events.
    //
    // As long as autograd uses the default stream for every device,
    // these operations are implicitly sequenced, and we don't need to
    // do any extra synchronization here.
    //
    // The CUDA default stream keeps these operations ordered
    tensors.push_back(replica.contents);
  }
  // See Note [DDP Communication Hook]
  if (comm_hook_ == nullptr) {
    // If comm_hook is not registered, direct allreduce
    bucket.work = process_group_->allreduce(tensors);
  } else {
    // A comm_hook was registered, so use the hook to do the allreduce.
    // Note that the comm_hook is only the low-level communication hook; to do gradient
    // clipping before the reduction, a hook has to be registered on autograd instead.
      
    GradBucket grad_bucket(
        next_bucket_,
        tensors[0], // There is only one replica in the bucket
        // Since currently we do not support single-process multiple-device
        // mode, we can assume only one replica in the bucket.
        bucket.replicas[0].offsets,
        bucket.replicas[0].lengths,
        bucket.replicas[0].sizes_vec);
    bucket.future_work = comm_hook_->runHook(grad_bucket);
  }
}

The extended logic is as follows:

  1. The Reducer registers autograd_hook onto the post_hooks of AccumulateGrad.
  2. During backward propagation, when the autograd engine finds that a parameter's gradient is ready, it calls the autograd_hook.
  3. Processing continues in autograd_hook.
  4. all_reduce_bucket is called to synchronize the gradients.
  5. finalize_backward is registered with the engine via queue_callback.
  6. finalize_backward is called within GraphTask::exec_post_processing.
                                                                        +
                                                             Worker 1   |   Worker 2
                                                                        |
  Engine    AccumulateGrad                Reducer                       |    Reducer
                                                                        |
    +              +                         +                          |        +
    |              |          1              |                          |        |
    |              | <-----------------------+                          |        |
    |              |                                                    |        |
    |              v          2                                         |        |
    |         post_hooks  +-------->  autograd_hook                     |        |
    |                                        +                          |        |
    |                                        |  3                       |        |
    |                                        v                          |        |
    |                  +---------------------+--------------------+     |        |
    |                  |  mark_variable_ready                     |     |        |
    |                  |                                          |     |        |
    |                  |  All variable in replica are ready?      |     |        |
    |                  |                 |  YES                   |     |        |
    |                  |                 v                        |     |        |
    |                  |  All replica in bucket are ready?        |     |        |
    |                  |                 |  YES                   |     |        |
    |                  |                 v      4 all_reduce_bucket     |        |
    |                  |          mark_bucket_ready  <------------+-----+------> |
    |                  |                 |                        |     |        |
    |                  |                 v                        |     |        |
    |                  |       All buckets are ready?             |     |        |
    |                  |                 |  YES                   |     |        |
    |  queue_back  5   |                 v                        |     |        |
    | <----------------+  all_reduce_local_used_map               |     |        |
    |                  |  queue_callback(finalize_backward)       |     |        |
    |                  +---------------------+--------------------+     |        |
    v                                        |                          |        |
GraphTask::exec_post_processing              |                          |        |
    |                 6                      v                          |        |
    +------------------------------> finalize_backward                  |        |
    |                                        |                          |        |
    v                                        v                          +        v

3.2.2 PythonCommHook

As mentioned earlier, PythonCommHook is used to implement users' specific requirements. Here are two more runHook examples.

An example of PythonCommHook::runHook:

c10::intrusive_ptr<c10::ivalue::Future> PythonCommHook::runHook( GradBucket& bucket) {
  py::gil_scoped_acquire acquire;

  py::object py_fut = hook_(state_, bucket);

  try {
    return py_fut.cast<std::shared_ptr<torch::jit::PythonFutureWrapper>>()->fut;
  } catch (const py::cast_error& e) {
    auto type = py_fut.get_type();
    auto errMsg = c10::str(
        e.what(),
        ". DDP communication hook's callback must return a "
        "torch.futures.Future or torch._C.Future object, but got ",
        type.attr("__module__").cast<std::string>(),
        ".",
        type.attr("__qualname__").cast<std::string>());
    throw std::runtime_error(errMsg);
  }
}

or

c10::intrusive_ptr<c10::ivalue::Future> AllReduceCommHook::runHook(GradBucket& bucket) {
  std::vector<at::Tensor> tensors = {bucket.getTensorRef()};
  auto allreduce_work = state_->allreduce(tensors);

  // FIXME Access the result through the Future passed as argument, instead of
  // capturing the Work.
  auto div_by_process_group_size = [allreduce_work,
                                    this](c10::ivalue::Future& /* unused */) {
    auto tensor = allreduce_work->result()[0] / state_->getSize();
    return c10::IValue(tensor);
  };

  auto fut = allreduce_work->getFuture();
  return fut->then(div_by_process_group_size, fut->elementType());
}
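
For comparison, here is a hedged sketch of registering an equivalent mean-allreduce communication hook from Python through the public register_comm_hook API. ddp_model is assumed to be an existing DistributedDataParallel instance, and the GradBucket accessor is buffer() in recent releases (older versions exposed getTensor-style methods instead).

import torch.distributed as dist

def allreduce_mean_hook(process_group, bucket):
    group = process_group if process_group is not None else dist.group.WORLD
    world_size = dist.get_world_size(group)
    tensor = bucket.buffer()  # the flattened gradients of this bucket
    fut = dist.all_reduce(tensor, group=group, async_op=True).get_future()

    def divide(fut):
        # The future's value is a list holding the reduced tensor; divide to get the mean.
        return fut.value()[0] / world_size

    return fut.then(divide)

ddp_model.register_comm_hook(state=None, hook=allreduce_mean_hook)

This mirrors what AllReduceCommHook::runHook above does in C++: kick off an asynchronous allreduce, then divide by the process-group size in a continuation attached to the returned future.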

3.2.3 GradBucket

GradBucket is the class that passes the bucket's contents tensor to the DDP communication hook.

// This class passes bucket contents tensor to DDP communication hook.
class GradBucket {
 public:
  explicit GradBucket(
      size_t index,
      const at::Tensor& tensor,
      const std::vector<size_t>& offsets,
      const std::vector<size_t>& lengths,
      const std::vector<c10::IntArrayRef>& sizes_vec)
      : index_(index),
        tensor_(tensor),
        offsets_(offsets),
        lengths_(lengths),
        sizes_vec_(sizes_vec) {}

  // Returns the index of the bucket, which is unique across all the buckets.
  size_t getIndex() const {
    return index_;
  }

  const at::Tensor& getTensor() const {
    return tensor_;
  }

  // Returns a mutable tensor compared with the above method.
  at::Tensor& getTensorRef() {
    return tensor_;
  }

  // Overwrites tensors at a specific index.
  void setTensor(at::Tensor& tensor) {
    tensor_ = tensor;
  }

  // Each tensor in the list returned by getPerParameterTensors corresponds to a
  // parameter.
  std::vector<at::Tensor> getPerParameterTensors() const;

  // Returns whether this bucket is the last bucket to allreduce in an iteration.
  bool isTheLastBucketToAllreduce() const {
    return index_ == 0;
  }

 private:
  size_t index_;
  at::Tensor tensor_;

  // Per-variable info in tensors_[0].
  std::vector<size_t> offsets_;
  std::vector<size_t> lengths_;
  std::vector<c10::IntArrayRef> sizes_vec_;
};

3.3 all_reduce_local_used_map

Note that what is reduced here is the local_used_maps_ variable, which records tensor usage, not the tensors' gradients.

3.3.1 Definition

Let’s recall the definition.

The following two variables record locally used parameters, i.e., whether a parameter was used locally during the current iteration, or during the current no_sync session if no_sync is on.

Each model replica corresponds to one tensor in the map, and each tensor is a one-dimensional int32 tensor whose length is the number of parameters.

These tensors are marked in autograd_hook to indicate that the corresponding parameter has been used; they are allreduced at the end of the backward pass of the current iteration (or of the no_sync session) to figure out the globally unused parameters.

// Locally used parameter maps indicating if parameters are used locally
// during the current iteration or no_sync session if no_sync is on. One
// tensor for each model replica and each tensor is one-dim int32 tensor of
// number of parameters. These tensors are marked in autograd_hook to indicate
// the corresponding param has been used, and get allreduced in the end of
// backward of current iteration or no_sync session for figuring out the
// globally unused parameters.
//
// local_used_maps_: CPU tensors for bookkeeping locally used params
// local_used_maps_dev_: dev tensors for reducing globally unused params
std::vector<at::Tensor> local_used_maps_; // Set in autograd_hook; corresponds to the paper
std::vector<at::Tensor> local_used_maps_dev_; // GPU
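
These maps only matter when unused-parameter detection is enabled. At the Python level that corresponds to the find_unused_parameters flag of the DDP constructor, and the no_sync session mentioned in the comment is the no_sync() context manager. A hedged sketch (model, local_rank, micro_batch_1/micro_batch_2, and the process group are placeholders):

from torch.nn.parallel import DistributedDataParallel as DDP

ddp_model = DDP(
    model,
    device_ids=[local_rank],
    find_unused_parameters=True,  # enables the unused-parameter tracking described above
)

# During a no_sync session, gradients are accumulated locally and nothing is allreduced;
# parameters used here are still recorded as used, and reduction happens on the first
# backward pass outside the context manager.
with ddp_model.no_sync():
    ddp_model(micro_batch_1).sum().backward()   # no communication
ddp_model(micro_batch_2).sum().backward()       # accumulated gradients are reduced here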

3.3.2 Synchronization

all_reduce_local_used_map uses asynchronous H2D copies to avoid blocking overhead: local_used_maps_ is copied to local_used_maps_dev_, and then local_used_maps_dev_ is allreduced.

void Reducer::all_reduce_local_used_map() {
  // See Note [Skip allreducing local_used_maps_dev]
    // H2D from local_used_maps_ to local_used_maps_dev_
    for (size_t i = 0; i < local_used_maps_.size(); i++) {
      if (local_used_maps_dev_[i].is_cuda()) {
        // Note [local_used_maps_ -> local_used_maps_dev copying]
        // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        // We do async H2D to avoid the blocking overhead. The async copy and
        // allreduce respect the current stream, so will be sequenced
        // correctly.
        //
        // Correct sequencing with respect to host operations is also
        // essential. The H2D copy_ is stream ordered, while the host's
        // changes to local_used_maps_ are host ordered. If a large backlog of
        // cuda-stream work pushes the copy_ far into the future, and if no
        // blocking calls occur between now and finalize_backward()** such
        // that finalize_backward() re-zeroes local_used_maps_ on the host
        // before the stream executes the copy_, copy_ will read those zeros
        // instead of the values we thought we told it to read here. Copying
        // local_used_maps_[i] to a pinned temporary (which the pinned caching
        // allocator should supply asynchronously) avoids this nasty, rare
        // race condition.
        //
        // ** In the hoped-for case where all params are used, DDP itself
        // won't do any blocking work between now and the re-zeroing, so the
        // danger is real.
        //
        // Defensively ensures local_used_maps_tmp is distinct from
        // local_used_maps_[i]
        auto local_used_maps_tmp = at::native::empty_like(
            local_used_maps_[i],
            optTypeMetaToScalarType(local_used_maps_[i].options().dtype_opt()),
            local_used_maps_[i].options().layout_opt(),
            local_used_maps_[i].options().device_opt(),
            true /* pinned_memory */);
        // Paranoid asserts here because in some workloads, the pinned
        // allocator behaves in a way we don't understand, and may be bugged.
        // See https://github.com/pytorch/pytorch/pull/54474
        TORCH_INTERNAL_ASSERT(local_used_maps_tmp.is_pinned());
        TORCH_INTERNAL_ASSERT(
            local_used_maps_tmp.data_ptr() != local_used_maps_[i].data_ptr());
        local_used_maps_tmp.copy_(local_used_maps_[i]);
        local_used_maps_dev_[i].copy_(local_used_maps_tmp, true);
      } else {
        local_used_maps_dev_[i].copy_(local_used_maps_[i], true);
      }
    }
    local_used_work_ = process_group_->allreduce(local_used_maps_dev_);
}

The expanded logic is as follows:

  1. The Reducer registers autograd_hook onto the post_hooks of AccumulateGrad.
  2. During backward propagation, when the autograd engine finds that a parameter's gradient is ready, it calls the autograd_hook.
  3. Processing continues in autograd_hook.
  4. all_reduce_bucket is called to synchronize the gradients.
  5. The local_used_maps_ variable is reduced by calling AllReduce.
  6. finalize_backward is registered with the engine via queue_callback.
  7. finalize_backward is called within GraphTask::exec_post_processing.
                                                                        +
                                                             Worker 1   |   Worker 2
                                                                        |
  Engine    AccumulateGrad                Reducer                       |    Reducer
                                                                        |
    +              +                         +                          |        +
    |              |          1              |                          |        |
    |              | <-----------------------+                          |        |
    |              |                                                    |        |
    |              v          2                                         |        |
    |         post_hooks  +-------->  autograd_hook                     |        |
    |                                        +                          |        |
    |                                        |  3                       |        |
    |                                        v                          |        |
    |                  +---------------------+--------------------+     |        |
    |                  |  mark_variable_ready                     |     |        |
    |                  |                                          |     |        |
    |                  |  All variable in replica are ready?      |     |        |
    |                  |                 |  YES                   |     |        |
    |                  |                 v                        |     |        |
    |                  |  All replica in bucket are ready?        |     |        |
    |                  |                 |  YES                   |     |        |
    |                  |                 v      4 all_reduce_bucket     |        |
    |                  |          mark_bucket_ready  <------------+-----+------> |
    |                  |                 |                        |     |        |
    |                  |                 v                        |     |        |
    |                  |       All buckets are ready?             |     |        |
    |                  |                 |  YES                   |     |        |
    |                  |                 v      5 allreduce             |        |
    |  queue_back  6   |  all_reduce_local_used_map  <------------+-----+------> |
    | <----------------+  queue_callback(finalize_backward)       |     |        |
    |                  +---------------------+--------------------+     |        |
    v                                        |                          |        |
GraphTask::exec_post_processing              |                          |        |
    |                 7                      v                          |        |
    +------------------------------> finalize_backward                  |        |
    |                                        |                          |        |
    v                                        v                          +        v

3.4 finalize_backward

finalize_backward does the finishing work. Its logic is:

  • Iterate over the buckets; for each bucket:
    • Wait for its asynchronous reduction to complete.
    • Copy the contents back from the future result.
  • Wait for the local_used_maps_dev_ synchronization to complete.
void Reducer::finalize_backward() {
  // No longer expect autograd hooks to fire after this function returns.
  expect_autograd_hooks_ = false;
  // No longer require call to finalize after this function returns.
  require_finalize_ = false;

  // Unset allreduce division factor, as it may change in next backwards pass
  // when running with DDP join mode.
  divFactor_ = kUnsetDivFactor;

  // Wait for asynchronous reduction to complete and unflatten contents.
  for (auto& bucket : buckets_) { // Iterate over the buckets
    // See Note [DDP Communication Hook]
    if (comm_hook_ == nullptr) {
      bucket.work->wait(); // Wait for synchronization to complete
    } else {
      bucket.future_work->wait(); // Wait for synchronization to complete

      auto future_result =
          comm_hook_->parseHookResult(bucket.future_work->value());

      for (size_t i = 0; i < future_result.size(); i++) {
        auto& replica = bucket.replicas[i];
        if (bucket.expect_sparse_gradient) {
          replica.contents.copy_(future_result[i]); // Copy back contents from the future result
        } else {
          // Reinitialize only `bucket_views_out` with the future_result by
          // following the same logic in `initialize_buckets`.
          // Parse future_result[i] into bucket_views_out
          populate_bucket_views_out(replica, future_result[i]);
        }
      }
    }
    if (!bucket.expect_sparse_gradient) {
      // We don't need to finalize the sparse bucket since the sparse grad and
      // the bucket essentially point to the same storage. As a result, once
      // the allreduce is done, the sparse grads are automatically updated.
      finalize_bucket_dense(bucket); // Copy dense gradients back to the parameters
    }
  }

  // See Note [Skip allreducing local_used_maps_dev]
  if (dynamic_graph_find_unused() || static_graph_first_iteration()) {
    // Due to the lazy wait, it is possible that reduction of the current
    // iteration is still going when the one for next iteration gets kicked off.
    // For such case, we want to wait explicitly to make sure the reduction does
    // complete before kicking off next one. Otherwise the previous one may
    // interfere, write to the device-side memory and clobber the content of
    // local_unused_maps_dev_.
    if (!local_used_maps_reduced_) {
      local_used_work_->wait(); // Wait for the local_used_maps_dev_ synchronization to complete
    }
  }

  if (dynamic_graph_find_unused()) {
    // Reset unused parameter accounting.
    // See Note [local_used_maps_ -> local_used_maps_dev copying]
    for (auto& local_used : local_used_maps_) {
      local_used.fill_(0);
    }
    local_used_maps_reduced_ = false;
  }

  if (should_collect_runtime_stats()) {
    record_backward_comm_end_time();
  }
}
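
The comm_hook_ branch above is what a user-registered DDP communication hook goes through: the hook returns a Future, finalize_backward waits on it, and the Future's value is parsed back into the bucket. Below is a hedged Python sketch of such a hook using the public register_comm_hook API; it assumes a reasonably recent PyTorch where GradBucket.buffer() exists, and ddp_model is a placeholder for an already constructed DistributedDataParallel module.

import torch
import torch.distributed as dist

def averaging_allreduce_hook(state, bucket: dist.GradBucket):
    # bucket.buffer() is the flattened gradient tensor of this bucket
    # (replica.contents on the C++ side).
    tensor = bucket.buffer().div_(dist.get_world_size())
    work = dist.all_reduce(tensor, async_op=True)
    # finalize_backward() waits on the Future returned here
    # (bucket.future_work->wait()) and copies its value back into the bucket.
    return work.get_future().then(lambda fut: fut.value()[0])

# ddp_model.register_comm_hook(state=None, hook=averaging_allreduce_hook)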

This process uses the following functions.

4.6.1 populate_bucket_views_out

populate_bucket_views_out rebuilds the output views (bucket_views_out) on top of the bucket's flattened contents.

// (see Note: "Gradient Layout Contract" in initialize_buckets).
void Reducer::populate_bucket_views_out(
    Reducer::BucketReplica& replica,
    at::Tensor& tensor) { // Parse tensor into bucket_views_out
  replica.bucket_views_out.clear(); // Clear the old views
  for (size_t i = 0; i < replica.variables.size(); i++) { // Re-initialize bucket_views_out
    const auto& v = replica.variables[i]; // Iterate over the replica's variables
    const auto offset = replica.offsets[i];
    const auto length = replica.lengths[i];
    if (v.is_non_overlapping_and_dense()) {
      // If the param's memory is dense, match its layout, anticipating
      // the autograd engine (AccumulateGrad) will also create gradients
      // matching its layout.
      replica.bucket_views_out.push_back( // A view of tensor goes into bucket_views_out
          tensor.as_strided(v.sizes(), v.strides(), offset));
    } else {
      // Fall back to a C-style contiguous view, again anticipating
      // AccumulateGrad will do the same when stashing grads for non-dense
      // params.
      replica.bucket_views_out.push_back( // A view of tensor goes into bucket_views_out
          tensor.narrow(0, offset, length).view(v.sizes()));
    }
  }
}
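
The two branches above are just two ways of building views into the same flat buffer. The toy Python sketch below (hypothetical layout; is_contiguous() is used as a simplified stand-in for the C++ is_non_overlapping_and_dense() check) shows the same as_strided versus narrow().view() construction, and that writing through a view writes into the bucket storage:

import torch

# A toy flat "bucket" holding the gradients of two parameters back to back.
p1 = torch.randn(2, 3)
p2 = torch.randn(4)
flat = torch.empty(p1.numel() + p2.numel())

offsets = [0, p1.numel()]
views = []
for p, offset in zip((p1, p2), offsets):
    if p.is_contiguous():  # simplified stand-in for is_non_overlapping_and_dense()
        # Dense case: match the parameter's own layout via as_strided.
        views.append(flat.as_strided(p.size(), p.stride(), offset))
    else:
        # Fallback: a C-style contiguous slice reshaped to the parameter's shape.
        views.append(flat.narrow(0, offset, p.numel()).view(p.size()))

views[0].copy_(torch.ones_like(p1))  # writing through the view writes into flat
print(flat[: p1.numel()])            # all ones: the view aliases the bucket storage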

4.6.2 finalize_bucket_dense

finalize_bucket_dense calls runGradCallbackForVariable or copy_bucket_to_grad to write the reduced gradients back into the autograd engine.

// A bucket with one or more dense tensors needs to be unflattened.
void Reducer::finalize_bucket_dense(Bucket& bucket) {
  for (size_t replica_index = 0; replica_index < bucket.replicas.size();
       replica_index++) {
    auto& replica = bucket.replicas[replica_index];
    for (size_t intra_bucket_index = 0;
         intra_bucket_index < replica.variables.size();
         intra_bucket_index++) {
      auto& variable = replica.variables[intra_bucket_index];
      const auto offset = replica.offsets[intra_bucket_index];
      const auto length = replica.lengths[intra_bucket_index];

      bool global_unused = false;
      // See Note [Skip allreducing local_used_maps_dev]
      if (static_graph_ || find_unused_parameters_) {
        // Determine if this param has been used globally or not.
        //
        // If the variable was used locally, it is also used globally and then
        // we don't need to wait for the reduction. Otherwise we lazily wait for
        // the reduction to complete, only when we see a variable that was
        // unused locally. Then we end up delaying the synchronization point
        // that local_used_work_->wait() implies. If we don't have any unused
        // parameters at all, we can skip waiting for the work to complete
        // altogether, and cause negligible performance overhead for models
        // where all parameters are used. Such lazily waiting means minimizing
        // performance impact for the big majority of models where all
        // parameters are always used. Then we only pay the overhead cost if
        // there is indeed a parameter that is locally unused, because we need
        // to check if it's also globally unused.
        size_t variable_index = bucket.variable_indices[intra_bucket_index];
        // Note: global_unused might not be global yet. As we lazily wait for
        // the reduction to complete, it becomes really global only if we get to
        // the point as below where we wait for the reduction work, make D2H
        // copy, and update global_unused with the real global consensus, i.e.
        // local_used_maps_reduced_ is true.
        global_unused =
            local_used_maps_[replica_index][variable_index].item<int>() == 0;
        if (global_unused && !local_used_maps_reduced_) {
          // Wait for local_used_maps reduction to complete.
          local_used_work_->wait();
          // D2H from local_used_maps_dev_ to local_used_maps_
          for (size_t i = 0; i < local_used_maps_.size(); i++) {
            // Blocking copy, if local_used_maps_dev_ is cuda
            local_used_maps_[i].copy_(local_used_maps_dev_[i]);
          }
          global_unused =
              local_used_maps_[replica_index][variable_index].item<int>() == 0;
          local_used_maps_reduced_ = true;
        }
      }

      if (!gradient_as_bucket_view_) {
        copy_bucket_to_grad( // Copy back into the autograd engine's grad
            variable, replica, intra_bucket_index, global_unused);
      } else {
        const auto& bucket_view_out =
            replica.bucket_views_out[intra_bucket_index];
        auto& bucket_view_in = replica.bucket_views_in[intra_bucket_index];
        // If communication_hook is registered, bucket_view_out stores
        // allreduced results in a newly allocated tensor, copy bucket_view_out
        // back to bucket_view_in that referring to replica.content tensor and
        // grad.
        if (!bucket_view_in.is_alias_of(bucket_view_out)) {
          bucket_view_in.copy_(bucket_view_out); // Copy from the out view to the in view
        }
        runGradCallbackForVariable(variable, [&](auto& grad) {
          // If a parameter is globally unused, we keep its grad untouched.
          if (!global_unused) {
            // If grad is globally used but locally unused, let grad point to
            // bucket_view_in
            if (!grad.defined()) {
              grad = bucket_view_in;
            } else {
              if (!grad.is_alias_of(bucket_view_in)) {
                TORCH_CHECK(
                    false,
                    "Detected at least one parameter gradient is not the "
                    "expected DDP bucket view with gradient_as_bucket_view=True. "
                    "This may happen (for example) if multiple allreduce hooks "
                    "were registered onto the same parameter. If you hit this error, "
                    "please file an issue with a minimal repro.");
              }
            }
            // The grad is modified and needs to be written back.
            return true;
          }
          // The grad is not modified.
          return false;
        });
      }
    }
  }
}
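
The gradient_as_bucket_view_ and find_unused_parameters_ flags checked above map directly to DDP constructor arguments. As a rough usage sketch (it assumes the process group has already been initialized on every rank, e.g. with the gloo backend):

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# Assumes dist.init_process_group(...) has already been called on every rank.
model = torch.nn.Linear(10, 10)
ddp_model = DDP(
    model,
    gradient_as_bucket_view=True,    # param.grad becomes a view into the bucket (bucket_views_in)
    find_unused_parameters=False,    # True enables the local_used_maps_ bookkeeping seen above
)

ddp_model(torch.randn(4, 10)).sum().backward()

# With gradient_as_bucket_view=True, finalize_bucket_dense only needs to make
# param.grad alias bucket_view_in instead of going through copy_bucket_to_grad.
for p in ddp_model.parameters():
    assert p.grad is not None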

4.6.3 copy_bucket_to_grad

copy_bucket_to_grad copies the gradient from the bucket back into the autograd engine's grad field.

void Reducer::copy_bucket_to_grad(
    at::Tensor& variable,
    Reducer::BucketReplica& replica,
    size_t intra_bucket_index,
    bool global_unused) {
  const auto& bucket_view = replica.bucket_views_out[intra_bucket_index]; // Get the output view
  runGradCallbackForVariable(variable, [&](auto& grad) {
    // If a parameter is globally unused, we keep its grad untouched.
    if (!global_unused) {
      if (!grad.defined()) {
        // Creates grad according to the "Gradient Layout Contract"
        // (see torch/csrc/grad/AccumulateGrad.h)
        grad =
            torch::autograd::utils::clone_obey_contract(bucket_view, variable);
      } else {
        grad.copy_(bucket_view); // Copy the gradient back from the bucket
      }
      // The grad is modified and needs to be written back.
      return true;
    }
    // The grad is not modified.
    return false;
  });
}
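
To make the copy direction concrete, here is a small self-contained Python analogue (copy_bucket_to_grads is a hypothetical helper, not DDP code): it slices an already reduced flat bucket and writes each slice back into the matching param.grad, cloning when no grad exists yet, much as clone_obey_contract does.

import torch

def copy_bucket_to_grads(flat_bucket, params, offsets):
    # Toy analogue of Reducer::copy_bucket_to_grad: slice the already
    # all-reduced flat bucket and write each slice into param.grad.
    for p, offset in zip(params, offsets):
        view = flat_bucket.narrow(0, offset, p.numel()).view_as(p)
        if p.grad is None:
            p.grad = view.clone()   # give the grad its own storage, like clone_obey_contract
        else:
            p.grad.copy_(view)

params = [torch.nn.Parameter(torch.randn(3)), torch.nn.Parameter(torch.randn(2, 2))]
offsets = [0, 3]
flat = torch.arange(7, dtype=torch.float32)   # pretend this is the averaged bucket
copy_bucket_to_grads(flat, params, offsets)
print(params[1].grad)   # tensor([[3., 4.], [5., 6.]])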

At this point, the expanded flow is as follows:

  1. The Reducer registers autograd_hook as a post hook on each AccumulateGrad node.
  2. During backpropagation, the autograd engine calls autograd_hook once a parameter's gradient is ready.
  3. Processing continues inside autograd_hook.
  4. all_reduce_bucket is called to synchronize the gradients of a ready bucket.
  5. AllReduce is called on the local_used_maps_ variable, which is used to determine globally unused parameters.
  6. DDP registers finalize_backward as a callback on the engine.
  7. finalize_backward is called from within GraphTask::exec_post_processing.
  8. DDP calls wait to synchronize with the other workers.
  9. DDP calls copy_bucket_to_grad to copy the gradients from the buckets back to the autograd engine.
With this, we have the complete picture of how the autograd engine interacts with DDP during backward propagation, and of how DDP reduces gradients while the backward computation is still running (a minimal runnable check of this behavior is sketched after the figure below).

                                                                       +
                                                            Worker 1   |   Worker 2
                                                                       |
  Engine    AccumulateGrad                Reducer                      |    Reducer
                                                                       |
    +              +                         +                         |        +
    |              |           1             |                         |        |
    |              | <-----------------------+                         |        |
    |              |                                                   |        |
    |              v                                                   |        |
    |                          2                                       |        |
    |         post_hooks  +-------->  autograd_hook                    |        |
    |                                        +                         |        |
    |                                        | 3                       |        |
    |                                        v                         |        |
    |                               mark_variable_ready                |        |
    |                                        +                         |        |
    |                     All variables in replica are ready?          |        |
    |                                        | YES                     |        |
    |                                        v                         |        |
    |                     All replicas in bucket are ready?            |        |
    |                                        | YES                     |        |
    |                                        v                         |        |
    |                               mark_bucket_ready                  |        |
    |                                        +                         |        |
    |                           All buckets are ready?                 |        |
    |                                        | YES                     |        |
    |                                        v          4  allreduce   |        |
    |                             all_reduce_bucket  <-----------------+------> |
    |                                        +                         |        |
    |                                        |           5  allreduce  |        |
    |                        all_reduce_local_used_map <---------------+------> |
    |      6 queue_callback(finalize_backward)                         |        |
    | <--------------------------------------+                         |        |
    |                                        |                         |        |
    |   GraphTask::exec_post_processing      |                         |        |
    |              +             7           |                         |        |
    |              +---------------------->  finalize_backward         |        |
    |                                        +            8  wait      |        |
    |                                        | <-----------------------+------> |
    |                 9 copy_bucket_to_grad  |                          |        |
    | <--------------------------------------+                         |        |
    v                                        v                         +        v
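
As a quick sanity check of the whole flow, the following hedged two-process sketch (gloo backend on CPU; the address and port are arbitrary placeholders) feeds different data to each rank, calls backward(), and verifies afterwards that the averaged gradients are identical on every rank:

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

def worker(rank, world_size):
    # Hypothetical rendezvous settings; any free port works.
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    torch.manual_seed(0)                    # same initial weights on every rank
    model = DDP(torch.nn.Linear(4, 1))

    data = torch.randn(8, 4) * (rank + 1)   # deliberately different inputs per rank
    model(data).sum().backward()            # autograd hooks + allreduce run inside backward()

    # After backward(), every rank should hold the same averaged gradient.
    grad = model.module.weight.grad.clone()
    gathered = [torch.zeros_like(grad) for _ in range(world_size)]
    dist.all_gather(gathered, grad)
    assert all(torch.allclose(g, gathered[0]) for g in gathered)

    dist.destroy_process_group()

if __name__ == "__main__":
    mp.spawn(worker, args=(2,), nprocs=2)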

This concludes the analysis of backward propagation, and with it the analysis of DDP. Next, we will analyze distributed autograd.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

BAGUA: Scaling up Distributed Learning with System Relaxations

Pytorch distributed series 3 — what does torch.utils.data.distributed.DistributedSampler do in distributed training?

Pytorch Distributed Series 1 — Clarify the environment variables associated with torch.distributed.launch

Pytorch distributed data parallel framework — how does it synchronize?

Pytorch (distributed) data parallel personal practice – DataParallel/DistributedDataParallel

Pytorch's nn.DataParallel

Discuss.pytorch.org/t/dataparal…

Pytorch.org/docs/stable…

PyTorch source code interpretation of distributed training to understand?

PyTorch AutoGrad C++ layer implementation

PYTORCH Automatic Differential (1)

How does PyTorch speed up data parallel training? Distributed secrets revealed

Pytorch Distributed training (two init_process_group)

Pytorch.org/tutorials/i…

Pytorch.org/docs/master…

Pytorch.org/tutorials/i…

DP & DDP for PyTorch source Code Interpretation: Model parallel and Distributed training parsing

Parameters and Buffers in the Pytorch model