0x00 Summary

The previous article analyzed the Reducer's member variables; this article turns to its dynamic logic. The goal is to tie the previous articles together and lay the groundwork for the upcoming analysis of forward propagation and backward propagation.

Other articles in this series are as follows:

PyTorch distributed (1)—— History and Overview

PyTorch how to use GPU

PyTorch distributed (2) —— DataParallel (part 1)

PyTorch distributed (3) —— DataParallel (part 2)

PyTorch distributed (4) —— Distributed application concepts

PyTorch distributed (5) —— DistributedDataParallel: overview & how to use

PyTorch distributed (6) —— DistributedDataParallel: initialization & store

PyTorch distributed (7) —— DistributedDataParallel: process groups

PyTorch distributed (8) —— DistributedDataParallel: the paper

PyTorch distributed (9) —— DistributedDataParallel: initialization

PyTorch distributed (10) —— DistributedDataParallel: Reducer static architecture

0x01 Introduction

For a better analysis, we first look at how the Reducer is called.

1.1 Invocation

The Reducer creation code is as follows, which is in _DDP_init_helper.

        # Note: reverse list of buckets because we want to approximate the
        # order in which their gradients are produced, and assume they
        # are used in the forward pass in the order they are defined.
        self.reducer = dist.Reducer(
            parameters, # parameters[0] is a list of tensors
            list(reversed(bucket_indices)), # bucket information
            self.process_group,
            expect_sparse_gradient,
            self.bucket_bytes_cap,
            self.find_unused_parameters,
            self.gradient_as_bucket_view,
            param_to_name_mapping,
        )

1.2 Parameter Description

parameters[0] is the only element that matters; it itself contains 20 elements:

parameters = {list: 1}
 0 = {list: 20}
  0 = {Parameter: 10} Parameter containing:\ntensor([[4.0381e-02, 3.8828e-02, 1...
  1 = {Parameter: 10} Parameter containing:\ntensor([0.0438, 0.2033, 0.2771, 0.0721, ...
  2 = {Parameter: 5} Parameter containing:\ntensor([[0.0094, 0.1319, 0.0713, 0.3155, ...
  3 = {Parameter: 5} Parameter containing:\ntensor([0.0008, 0.0582, 0.1245, 0.2538, ...
  ...
  20 = {Parameter: 5} Parameter containing:\ntensor([0.0008, 0.0582, 0.1245, 0.2538, ...
  __len__ = {int} 20
 __len__ = {int} 1

Here is an example of bucket_indices:

Regarding tensor indices: every tensor gets an index, starting from 0 up to tensors.size() - 1. If the model has 20 parameter tensors, the tensor indices run from 0 to 19 and are divided among 6 buckets; each index appears in exactly one of the 6 buckets, and no index is repeated.

+-----------------------------------------------------------------------+
|                                                                       |
|  <tensor index 0, tensor index 1, tensor index 2, tensor index 3>     |
|                                                                       |
|  <tensor index 4, tensor index 5, tensor index 6>                     |
|                                                                       |
|  ...                                                                  |
|                                                                       |
|  <tensor index 16, tensor index 17, tensor index 18, tensor index 19> |
|                                                                       |
+-----------------------------------------------------------------------+

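As a rough illustration of how such an assignment might be produced, here is a simplified Python sketch. It is not PyTorch's actual compute_bucket_assignment_by_size implementation; the function name and the byte cap are just illustrative assumptions.

import torch

def toy_bucket_assignment(params, bucket_bytes_cap=1024):
    """Greedy sketch: walk the parameters in order and start a new
    bucket whenever the running byte count exceeds the cap."""
    buckets, current, current_bytes = [], [], 0
    for index, p in enumerate(params):
        current.append(index)
        current_bytes += p.numel() * p.element_size()
        if current_bytes >= bucket_bytes_cap:
            buckets.append(current)
            current, current_bytes = [], 0
    if current:
        buckets.append(current)
    return buckets

params = [torch.nn.Parameter(torch.randn(10, 10)) for _ in range(20)]
bucket_indices = toy_bucket_assignment(params)
# DDP passes the buckets reversed, because gradients tend to become ready
# roughly in reverse order of the forward pass.
print(list(reversed(bucket_indices)))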

Next, we will look at how to initialize the Reducer.

0x02 Reducer initialization

The code is located in torch/lib/c10d/reducer.h and torch/lib/c10d/reducer.cpp.

2.1 Constructor

The specific logic is as follows:

  • See if this module is a multi-device module; specifically: traverse the tensors, get each tensor's device, and insert that device into a set. If the set ends up holding more than one device, this is a multi-device module.
  • If expect_sparse_gradients is not set, initialize expect_sparse_gradients_ to false.
  • Call initialize_buckets to initialize the buckets and, as far as possible, assign parameters to the buckets in reverse order, so that bucket communication can be more efficient. The buckets may be re-initialized again later at run time.
  • For every parameter, add its grad_accumulator, which is responsible for gradient synchronization during the backward pass.
    • Since these variables are leaf tensors of the autograd graph, their grad_fn is the gradient accumulation function.
    • The Reducer saves pointers to these functions so that it can check whether they are used in an autograd pass. If they are not, the Reducer marks their grad tensors as ready for reduction.
    • Iterate over the tensors, generating a VariableIndex for each tensor.
    • grad_accumulator_ in Variable::AutogradMeta is the gradient accumulator used to accumulate gradients of a leaf Variable.
    • Register the Reducer's autograd_hook as a post hook on each grad_accumulator_, with index as the hook's argument. This hook hangs on the autograd graph and is responsible for gradient synchronization during the backward pass: after the grad accumulator has executed, autograd_hook runs (a Python sketch of this hook-registration idea follows this list).
  • gradAccToVariableMap_ maps grad_accumulator to index, so that unused parameters can easily be found when traversing the autograd graph.
  • Initialize backward_stats_.
  • Call initialize_local_used_map to initialize the locally used parameter maps.
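The hook-registration trick can be illustrated in pure Python. The sketch below is purely illustrative (it is not the Reducer's C++ implementation): it attaches a post hook to a parameter's AccumulateGrad node, which fires once that parameter's gradient is ready.

import torch

param = torch.nn.Parameter(torch.randn(3))

# expand_as creates a non-leaf alias whose grad_fn leads back to the
# parameter's AccumulateGrad node via next_functions.
grad_acc = param.expand_as(param).grad_fn.next_functions[0][0]

def autograd_hook(*unused):
    # The real Reducer would mark this variable as ready here and possibly
    # kick off an asynchronous all-reduce for its bucket.
    print("gradient for param is ready")

grad_acc.register_hook(autograd_hook)

loss = (param * 2).sum()
loss.backward()   # prints "gradient for param is ready"

The actual C++ constructor is shown below.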
// The constructor takes a list of variables for every model replica.
// The bucket assignment for this reducer is specified as a list of
// buckets, each of which is specified as a list of indices into the
// variables list for **a single replica** (i.e. `variables[0]`).
Reducer::Reducer(
    std::vector<std::vector<at::Tensor>> replicas, // tensors
    std::vector<std::vector<size_t>> bucket_indices, // bucket information
    c10::intrusive_ptr<c10d::ProcessGroup> process_group,
    std::vector<std::vector<bool>> expect_sparse_gradients,
    int64_t bucket_bytes_cap,
    bool find_unused_parameters,
    bool gradient_as_bucket_view,
    std::unordered_map<size_t, std::string> paramNames)
    : replicas_(std::move(replicas)),
      process_group_(std::move(process_group)),
      expect_sparse_gradients_(std::move(expect_sparse_gradients)),
      expect_autograd_hooks_(false),
      require_finalize_(false),
      next_bucket_(0),
      has_marked_unused_parameters_(false),
      find_unused_parameters_(find_unused_parameters),
      gradient_as_bucket_view_(gradient_as_bucket_view),
      local_used_maps_reduced_(false),
      num_iterations_(0),
      num_buckets_ready_(0),
      has_rebuilt_bucket_(false),
      bucket_bytes_cap_(bucket_bytes_cap),
      divFactor_(kUnsetDivFactor),
      static_graph_(false),
      comm_hook_(nullptr),
      thread_local_state_(at::ThreadLocalState()),
      ddp_debug_level_(parseDistDebugLevel()),
      param_names_(std::move(paramNames)) {

  // Check whether the module is multi_device_module
  // See if this module is a multi-device module
  {
    std::set<int> unique_devices;
    for (const auto& v : replicas_[0]) { // iterate over the tensors
      auto device_idx = int(v.device().index()); // get the tensor's device
      if (unique_devices.find(device_idx) == unique_devices.end()) {
        unique_devices.insert(device_idx); // insert the device into a set
        if (unique_devices.size() > 1) { // more than one device in the set means a multi-device module
          is_multi_device_module_ = true;
          break;
        }
      }
    }
  }

  // If `expect_sparse_gradients` is not specified, initialize it such that
  // we do not expect sparse gradients for any parameter.
  if (expect_sparse_gradients_.empty()) {
    expect_sparse_gradients_ = std::vector<std::vector<bool>>(
        replicas_.size(), std::vector<bool>(replicas_[0].size(), false));
  }

  // Initialize variable bucketing.
  // This can be reinitialized later after capturing runtime information.
  {
    std::lock_guard<std::mutex> lock(mutex_);
    initialize_buckets(std::move(bucket_indices)); // Initialize the bucket
  }

  // All variables are expected to have their `grad_fn` set to the gradient
  // accumulation function (since they are leafs in the autograd graph).
  // We store pointers to these functions such that we can check if they are
  // used in an autograd pass. If they are not, we know their grad tensors
  // can be marked as ready for reduction.
  {
    const auto replica_count = replicas_.size();
    grad_accumulators_.resize(replica_count);
    for (size_t replica_index = 0; replica_index < replica_count; // only replicas_[0] is valid
         replica_index++) {
      const auto variable_count = replicas_[replica_index].size(); // number of tensors
      grad_accumulators_[replica_index].resize(variable_count); // allocate memory for grad_accumulators_
        
      for (size_t variable_index = 0; variable_index < variable_count;
           variable_index++) { // Iterating over tensors, variable_index is the index of the tensor
        auto& variable = replicas_[replica_index][variable_index]; // Get the specific tensor
        const auto index = VariableIndex(replica_index, variable_index); // Each tensor generates a VariableIndex

        // The gradient accumulator function is lazily initialized once.
        // Therefore we can use its presence in the autograd graph as
        // evidence that the parameter has participated in an iteration.
        auto grad_accumulator =
            torch::autograd::impl::grad_accumulator(variable); // Get grad_accumulator_ for Variable::AutogradMeta

#ifndef _WIN32
        using torch::distributed::autograd::ThreadLocalDistAutogradContext;
#endif
        // Hook to execute after the gradient accumulator has executed.
        hooks_.emplace_back(
            // The accumulator adds a hook, which hangs above the Autograd graph and is responsible for gradient synchronization when backward.
            // after grad_accumulator is executed, autograd_hook will run
            grad_accumulator->add_post_hook(
                torch::make_unique<torch::autograd::utils::LambdaPostHook>(
                    [=](const torch::autograd::variable_list& outputs,
                        const torch::autograd::variable_list& /* unused */) {
#ifndef _WIN32
                      this->rpc_context_.set(
                          ThreadLocalDistAutogradContext::getContextPtr());
#endif
                      this->autograd_hook(index); // call the Reducer's autograd_hook
                      return outputs;
                    })),
            grad_accumulator);

        // Map raw function pointer to replica index and parameter index.
        // This is used later on when the autograd graph is traversed
        // to check for parameters for which no gradient is computed, if
        // find_unused_parameters=True.
        // Note that the mapping of gradient accumulator to variable should be
        // one to one as we deduplicate shared parameters before constructing
        // Reducer.
          
        // gradAccToVariableMap_ accumulates grad_accumulator & index. Thus, it will be convenient to find unused parameters in autograd graph traversal in the future
        if (find_unused_parameters_) {
          gradAccToVariableMap_[grad_accumulator.get()] = index;
        }

        numGradHooksTriggeredMap_[index] = 0;

        // The gradient accumulator is stored as weak_ptr in the autograd
        // metadata of the variable, so we have to keep it alive here for
        // the raw pointer to be valid.
        TORCH_CHECK(
            grad_accumulators_[replica_index][variable_index] == nullptr,
            c10::str(
                "Reducer tried to register duplicate grad accumulator for replica ",
                replica_index,
                " variable ",
                variable_index));
        grad_accumulators_[replica_index][variable_index] =
            std::move(grad_accumulator);
      }
    }
  }

  // Initialize backward stats vector.
  {
    const auto replica_count = replicas_.size();
    backward_stats_.resize(replica_count);
    const auto variable_count = replicas_[0].size();
    std::for_each(
        backward_stats_.begin(),
        backward_stats_.end(),
        [=](std::vector<int64_t>& v) { v.resize(variable_count); });
  }

  // See Note [Skip allreducing local_used_maps_dev]
  if (find_unused_parameters_) {
    initialize_local_used_map();
  }
}

Let’s look at each part in detail.

2.2 Initializing a Bucket

The initialize_buckets method initializes the buckets; for each bucket it adds one model replica, and for each model replica it adds its list of tensors:

  • Set rpc_context_ from the distributed autograd context.

    • If initialize_buckets is called inside the DDP constructor, it does not matter whether the RPC context pointer is null, because grad will not be mutated.
    • If initialize_buckets is called during the training loop, for example inside rebuild_buckets, then grad might be mutated and point to a bucket_view, so it needs to check whether the RPC context pointer is null.
    • If the RPC context pointer is null, mutate variable.grad() directly; otherwise, mutate the gradient inside the RPC context.
  • Clear buckets_ and variable_locators_.

  • Resize variable_locators_ so that each variable has a bucket index.

  • Compute bucket_count = bucket_indices.size() and replica_count = replicas_.size().

  • Iterate bucket_index from 0 to bucket_count, initializing the buckets one by one.

    • Create a Bucket bucket.
    • If bucket_indices[bucket_index].size() == 1, the bucket expects a single sparse gradient, so set bucket.expect_sparse_gradient accordingly.
    • Iterate replica_index from 0 to replica_count, initializing one BucketReplica at a time.
      • Create a BucketReplica replica.
      • If the bucket expects a single sparse gradient, then:
        • Use bucket_indices[bucket_index].front() to take the first element of the vector as variable_index.
        • Use variable_index to get the corresponding variable from the replica.
        • Set the replica's variable list to replica.variables = {variable}; this replica contains only one variable.
      • Otherwise the bucket holds dense gradients, then:
        • Iterate over the bucket's variables, using replicas_[replica_index][variable_index] to get each variable.
        • Record the variable's device and data type.
        • Append the variable to the replica: replica.variables.push_back(variable).
        • Record per-variable metadata related to the flat contents; for example, offsets stores each tensor's offset within the flat bucket contents.
        • Allocate memory for replica.contents.
        • Call initialize_bucket_views(replica, replica.contents) to initialize contents and views.
      • Add the replica to the bucket with bucket.replicas.push_back(std::move(replica)).
    • Iterate over the variables in the bucket via bucket_indices[bucket_index].
      • Set reducer.variable_locators_ so the Reducer knows how to locate a variable within a bucket: bucket_index is the position in the buckets_ list and identifies a bucket in buckets_; intra_bucket_index is the variable's index within the bucket replica's vectors.
    • Set bucket.variable_indices = std::move(bucket_indices[bucket_index]).
    • Add the bucket to the Reducer with buckets_.push_back(std::move(bucket)).

The specific code is:

void Reducer::initialize_buckets(
    std::vector<std::vector<size_t>> bucket_indices) {
  // If initialize_buckets is called inside DDP constructor, then
  // it does not matter rpc context ptr is nullptr or not, as grad
  // will not be mutated.
  // If initialize_buckets is called during training loop, e.g, inside
  // rebuild_buckets(), since grad could be mutated and be pointed to
  // bucket_view, then it needs to check rpc context ptr is nullptr or not,
  // If rpc context ptr is nullptr, mutate variable.grad(); otherwise,
  // mutate grad in rpc context.
#ifndef _WIN32
  using torch::distributed::autograd::ThreadLocalDistAutogradContext;
  this->rpc_context_.set(ThreadLocalDistAutogradContext::getContextPtr());
#endif

  // This shouldn't be called if we're expecting autograd hooks to fire.
  TORCH_CHECK(
      !expect_autograd_hooks_,
      "`initialize_buckets` must NOT be called during autograd execution.");

  // Clear current bucket assignment.
  buckets_.clear();
  variable_locators_.clear();

  // Ensure we have a bucket index for every variable.
  variable_locators_.resize(replicas_[0].size());

  // Iterate over buckets.
  const auto bucket_count = bucket_indices.size();
  const auto replica_count = replicas_.size();
  buckets_.reserve(bucket_count);
  // Increments from 0 to bucket_count
  for (size_t bucket_index = 0; bucket_index < bucket_count; bucket_index++) {
    Bucket bucket; // Create a bucket

    // TODO(@pietern): Validate indices.
    // Must be non-empty, unique, and unique across buckets.
    TORCH_CHECK(
        bucket_indices[bucket_index].size() > 0, "Empty bucket specified.");

    // Variables that expect sparse gradients must have their own bucket.
    if (bucket_indices[bucket_index].size() == 1) {
      // This bucket expects a single sparse gradient
      const auto variable_index = bucket_indices[bucket_index].front();
      bucket.expect_sparse_gradient = expect_sparse_gradients_[0][variable_index];
    } else {
      for (const auto variable_index : bucket_indices[bucket_index]) {
        TORCH_CHECK(
            !expect_sparse_gradients_[0][variable_index],
            "Buckets with more than one variable cannot include variables "."that expect a sparse gradient."); }}// Iterate over model replicas. Increments from 0 to replica_count iterate over the number of model replicas, doing the same for each
    for (size_t replica_index = 0; replica_index < replica_count;
         replica_index++) {
      BucketReplica replica; // Make a copy

      if (bucket.expect_sparse_gradient) {
        // Indicate that this bucket expects a Single Sparse gradient
        const auto variable_index = bucket_indices[bucket_index].front(); // get the index of the tensor
        const auto& variable = replicas_[replica_index][variable_index]; // get the tensor
        TORCH_INTERNAL_ASSERT(bucket_indices[bucket_index].size() == 1);
        replica.variables = {variable}; // This copy contains only one variable
      } else {
        at::TensorOptions options;
        // The start index of the variable in the flattened tensor.
        size_t offset = 0;

        // Reserve enough space for the per-variable fields stored in bucket
        // replica for efficiency.
        const size_t num_variables = bucket_indices[bucket_index].size();
        replica.variables.reserve(num_variables);
        replica.offsets.reserve(num_variables);
        replica.lengths.reserve(num_variables);
        replica.sizes_vec.reserve(num_variables);

        // Iterate over bucket variables.
        for (const auto variable_index : bucket_indices[bucket_index]) { // Iterate over the variable in the bucket
          TORCH_CHECK(
              variable_index < replicas_[replica_index].size(),
              "Out of range variable index specified.");
          const auto& variable = replicas_[replica_index][variable_index];
          if (!options.has_device()) {
            options = options.device(variable.device());
          } else {
            TORCH_CHECK(
                variable.device() == options.device(),
                "All parameters in a bucket must be "."placed on the same device.");
          }
          if (!options.has_dtype()) {
            options = options.dtype(variable.dtype());
          } else {
            TORCH_CHECK(
                variable.dtype() == options.dtype(),
                "All parameters in a bucket must have the same dtype.");
          }
          
          const auto length = variable.numel();
          // Set variables for the replica
          replica.variables.push_back(variable); // A new variable is added here, so we finally know the number of variables in the bucket
          // Set some meta information about the replica variable
          replica.offsets.push_back(offset);
          replica.lengths.push_back(length);
          replica.sizes_vec.push_back(variable.sizes());
          offset += length;
        }

        // Allocate bucket contents tensor.
        replica.contents = at::empty({static_cast<long>(offset)}, options);

        initialize_bucket_views(replica, replica.contents); // initialize contents and views
      }

      // Add bucket replica to enclosing bucket.
      bucket.replicas.push_back(std::move(replica)); // Add a new replica to the bucket's replica list
    }

    // Map participating variables to this bucket.
    // This is identical across replicas so we only need to do this once.
    size_t intra_bucket_index = 0;
    for (const auto variable_index : bucket_indices[bucket_index]) { // Iterate over the variable in the bucket
      TORCH_CHECK(
          variable_index < variable_locators_.size(),
          "Out of range variable index specified.");
      // so the Reducer knows how to locate this variable within a bucket
      variable_locators_[variable_index] =
          VariableLocator(bucket_index, intra_bucket_index++);
    }
    bucket.variable_indices = std::move(bucket_indices[bucket_index]);

    buckets_.push_back(std::move(bucket)); // insert the bucket into the Reducer
  }
}
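To make the bookkeeping concrete, here is a small Python sketch of how bucket_indices is turned into a per-variable locator. The names are illustrative stand-ins for the real C++ structures, not actual PyTorch code.

# For every variable index, remember which bucket it lives in and its
# position inside that bucket (the role of variable_locators_).
bucket_indices = [[4, 5, 6], [2, 3], [0, 1]]   # toy example, already reversed

variable_locators = {}
for bucket_index, indices in enumerate(bucket_indices):
    for intra_bucket_index, variable_index in enumerate(indices):
        variable_locators[variable_index] = (bucket_index, intra_bucket_index)

# When the gradient of variable 3 becomes ready, the Reducer can immediately
# find its bucket and its slot inside that bucket:
print(variable_locators[3])   # (1, 1)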

2.3 Initializing the View

initialize_bucket_views sets up the replica's contents and views.

// (see Note: "Gradient Layout Contract" in initialize_buckets).
void Reducer::initialize_bucket_views(
    Reducer::BucketReplica& replica,
    at::Tensor& contents) {
  for (size_t i = 0; i < replica.variables.size(); i++) {
    auto& v = replica.variables[i];
    const auto offset = replica.offsets[i];
    const auto length = replica.lengths[i];
    if (v.is_non_overlapping_and_dense()) { // Dense type tensor
      // If the param's memory is dense, match its layout, anticipating
      // the autograd engine (AccumulateGrad) will also create gradients
      // matching its layout.
      replica.bucket_views_in.push_back( // Replica.bucket_views_in contains views
          contents.as_strided(v.sizes(), v.strides(), offset));
    } else { // non-dense tensors
      // Fall back to a C-style contiguous view, again anticipating
      // AccumulateGrad will do the same when stashing grads for non-dense
      // params.
      replica.bucket_views_in.push_back( // Replica.bucket_views_in contains views
          contents.narrow(0, offset, length).view(v.sizes()));
    }
    // By default `bucket_views_out` and `bucket_views_in` are
    // essentially the same thing.
    replica.bucket_views_out = replica.bucket_views_in; // Out is also a view

    // If gradient_as_bucket_view_ is set as true, then there are two cases to
    // handle: initialize_bucket_views could be called inside initialize_buckets
    // when rebuild_buckets, if grad has already been defined/calculated in
    // previous iteration, old grad needs to be copied into new bucket_view and
    // let grad point to the new bucket_view, initialize_bucket_views could also
    // be called inside initialize_buckets during construction. Grads are not
    // defined during construction time, in this case, do not let grad point to
    // bucket_view, because grads should be kept as being undefined for globally
    // unused parameters.
    if (gradient_as_bucket_view_) {
      auto& bucket_view = replica.bucket_views_in.back();
      runGradCallbackForVariable(v, [&](auto& grad) {
        if (grad.defined() && !grad.is_alias_of(bucket_view)) {
          bucket_view.copy_(grad);
          grad = bucket_view; // Gradient has been modified and needs to be written back
          // The grad is modefied and needs to be written back.
          return true;
        }
        // The grad is not modified and does not need to be written back.
        return false; // no need to write back because it was not modified
      });
    }
  }
}

2.3.1 BucketReplica member variables

Let’s start by recalling a few of BucketReplica’s member variables.

  • at::Tensor contents: the flattened contents of the bucket, i.e., the one-dimensional result after flattening.
  • std::vector<at::Tensor> bucket_views_in: provides a way to view the specific gradients inside contents from the input perspective.
  • std::vector<at::Tensor> bucket_views_out: provides a way to view the specific gradients inside contents from the output perspective.

Regarding std::vector<at::Tensor> bucket_views_in and std::vector<at::Tensor> bucket_views_out:

  • These two variables provide ways to manipulate the specific gradients inside contents; in other words, they provide views onto each tensor's gradient within contents. The user treats them as the entry points for moving each gradient's data into and out of contents.
  • In PyTorch, a view shares memory with the original data; it simply rearranges or reinterprets the original data so that part of it, or a reordered form of it, can be accessed directly.

Several PyTorch functions also need to be explained.

  • as_strided: creates a view of an existing tensor with the given sizes and strides (the result is still a tensor). Remember that it is a view, so it shares memory with the original tensor.
  • narrow: returns a tensor that is a narrowed slice of the original along one dimension; it also shares memory with the original tensor. Both are illustrated in the sketch below.
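Here is a small, self-contained example (not DDP code) showing that both functions return views that share storage with a flat buffer, which is exactly what the bucket views rely on:

import torch

contents = torch.zeros(6)                 # flat bucket buffer
v1 = torch.randn(2, 2)                    # pretend parameter gradients
v2 = torch.randn(2)

# Views into the flat buffer at the right offsets
view1 = contents.as_strided(v1.size(), v1.stride(), 0)    # elements 0..3
view2 = contents.narrow(0, 4, 2).view(v2.size())          # elements 4..5

view1.copy_(v1)   # writing through the view fills the flat buffer
view2.copy_(v2)

print(contents)                                  # holds v1 and v2 flattened
print(view1.data_ptr() == contents.data_ptr())   # True: shared storage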

BucketReplica logic is shown below:

+------------------------------------------+
| BucketReplica                            |
|                                          |
|       vector<Tensor> bucket_views_in +--------------------+
|                                          |                |
|                                          |                |
|       vector<Tensor> bucket_views_out +--------------+    |
|                                          |           |    |
|                                          |           |    |
|                                          |           v    v
|                                          |     +-----+----+--------------------------+
|       Tensor contents  +---------------------> |Flattened (Tensor1, Tensor2, Tensor3)|
|                                          |     +-------------------------------------+
|                                          |
|                                          |
|       vector<Tensor> variables  +------------>  [Tensor1,Tensor2,Tensor3]
|                                          |
|                                          |
|                                          |
+------------------------------------------+


2.3.2 Invocation

When is it called? If gradient_as_bucket_view_ is set to true, there are two cases to handle:

  • initialize_bucket_views can be called from initialize_buckets during rebuild_buckets. If grad was already defined/computed in a previous iteration, the old grad is copied into the new bucket_view and grad is made to point to the new bucket_view.
  • initialize_bucket_views can also be called from initialize_buckets during construction. Gradients are not defined at construction time; in that case grad is not made to point to bucket_view, because gradients should remain undefined for globally unused parameters. A tiny sketch of the first case follows.
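The following is an illustrative Python sketch of the first case (it is not DDP's code; contents, offset, and length are assumed to come from the bucket metadata):

import torch

# Sketch of gradient_as_bucket_view: param.grad is redirected to alias its
# slot inside the flat bucket buffer, keeping any previously computed values.
param = torch.nn.Parameter(torch.randn(4))
param.grad = torch.randn(4)                 # pretend a gradient from the last iteration

contents = torch.zeros(10)                  # flat bucket buffer
offset, length = 2, param.numel()
bucket_view = contents.narrow(0, offset, length).view(param.size())

bucket_view.copy_(param.grad)               # copy the old grad into the new view
param.grad = bucket_view                    # grad now aliases the bucket buffer

param.grad.add_(1.0)                        # writing to grad writes into contents
print(contents)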

2.4 Initializing the locally used variable maps

initialize_local_used_map initializes local_used_maps_, which are used to find globally unused parameters.

The gradients of globally unused parameters should stay unchanged during both the forward and backward passes. Detecting unused parameters requires global information, because in DDP a parameter may be absent from one process's operations yet still participate in training in the same iteration on another process. DDP therefore maintains the locally unused parameter information in a bitmap and launches an extra allreduce to gather the global bitmap. Since bitmaps are much smaller than tensors, all parameters in the model share one bitmap instead of creating per-bucket bitmaps. The bitmap lives on the CPU to avoid launching a dedicated CUDA kernel for every update. However, some ProcessGroup backends may not be able to run allreduce on CPU tensors; for example, ProcessGroupNCCL supports only CUDA tensors. Moreover, because DDP should work with any custom ProcessGroup backend, it cannot assume that all backends support CPU tensors. To solve this, DDP maintains another bitmap on the same device as the first model parameter and issues a non-blocking copy to move the CPU bitmap to the device bitmap for the collective communication.

The specific code is as follows:

void Reducer::initialize_local_used_map() {
  const auto replica_count = replicas_.size();
  const auto variable_count = replicas_[0].size();
  local_used_maps_.resize(replica_count);
  local_used_maps_dev_.resize(replica_count);

  for (size_t i = 0; i < replica_count; i++) {
    at::TensorOptions options;
    options = options.dtype(at::kInt);

    // Deliberately don't pin the memory even if local_used_maps_dev_ will
    // be cuda. See Note [local_used_maps_ -> local_used_maps_dev copying]
    local_used_maps_[i] =
        at::zeros({static_cast<long>(variable_count)}, options);

    // This tensor needs to be on the same device as replica because backend
    // such as NCCL may not support CPU tensors, and hence it might not work
    // if we always put it on CPU.
    options = options.device(replicas_[i][0].device());
    local_used_maps_dev_[i] =
        at::empty({static_cast<long>(variable_count)}, options);
  }
}
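Conceptually, the bitmap mechanism works as in the sketch below. This is only an illustration under the assumptions stated in the comments (the function name is made up), not the actual C++ implementation.

import torch
import torch.distributed as dist

# Illustrative sketch of the unused-parameter bitmap: each rank marks which
# parameters produced a gradient locally, then a single allreduce over the
# small int bitmap tells every rank which parameters were used by *any* rank,
# i.e. which ones are globally unused.
def find_globally_unused(num_params, locally_used_indices, device):
    local_used = torch.zeros(num_params, dtype=torch.int, device="cpu")
    local_used[list(locally_used_indices)] = 1

    # Non-blocking copy to the device, since backends such as NCCL only
    # support device tensors for collectives.
    local_used_dev = local_used.to(device, non_blocking=True)
    dist.all_reduce(local_used_dev)          # SUM across ranks

    globally_used = local_used_dev.cpu()
    return [i for i in range(num_params) if globally_used[i] == 0]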

The initialization process is as follows:

                                    +
                                    |
                                    |
                                    v
                  rpc_context_ = ThreadLocalDistAutogradContext
                                    +
                                    |
                                    |
                                    v
                  buckets_ & variable_locators_ (clear & resize)
                                    +
                                    |
                                    |
                                    v
                  for bucket_index in 0 ~ bucket_count :
                                    +
                                    |
                                    |
                                    v
                      init Bucket, set bucket_indices
                                    +
                                    |
                                    |
                                    v
                  for replica_index in 0 ~ replica_count :
                                    +
                                    |
                                    |
                                    v
                           init BucketReplica
                                    +
                                    |
                                    |
                                    v
                  bucket.replicas.push_back(std::move(replica))
                                    +
                                    |
                                    |
                                    v
                    buckets_.push_back(std::move(bucket))

The Reducer obtained is roughly as follows. It should be noted that each bucket has only one BucketReplica:

Reducer
  vector<Bucket> buckets_
    +-- Bucket                          // holds tensor indices 4, 5, 6
    |     vector<size_t> variable_indices = [4, 5, 6]
    |     vector<BucketReplica> replicas
    |       +-- BucketReplica
    |             vector<Tensor> bucket_views_in
    |             vector<Tensor> bucket_views_out
    |             Tensor contents       // Flattened(Tensor4, Tensor5, Tensor6)
    |             vector<Tensor> variables = [Tensor4, Tensor5, Tensor6]
    |
    +-- Bucket                          // holds tensor indices 2, 3
          vector<size_t> variable_indices = [2, 3]
          vector<BucketReplica> replicas
            +-- BucketReplica
                  vector<Tensor> bucket_views_in
                  vector<Tensor> bucket_views_out
                  Tensor contents       // Flattened(Tensor2, Tensor3)
                  vector<Tensor> variables = [Tensor2, Tensor3]

0x03 Static graph

3.1 Why

Although PyTorch builds graphs dynamically, the user can explicitly let DDP know that the training graph is static. This can be set when:

  1. The set of used and unused parameters stays the same throughout the training loop; in this case it does not matter whether the user sets find_unused_parameters to true.

  2. The way the graph is trained does not change throughout the training loop (meaning there is no iteration-dependent control flow). When the graph is declared static, DDP will support cases that were not previously supported, such as:

    1. Reentrant backward passes.
    2. Activation checkpointing multiple times.
    3. Activation checkpointing with find_unused_parameters = true.
    4. Not all output tensors are used for the loss calculation.
    5. There are model parameters outside the forward function.
    6. Potentially improved performance when find_unused_parameters=true or when there are unused parameters, because DDP will not search the graph for unused parameters in every iteration.

3.2 Usage

_set_static_graph configures a static graph. This API should be called after the DistributedDataParallel construction and before the training loop starts; it should also be called in the same way on all ranks. For example:

ddp_model = DistributedDataParallel(model)
ddp_model._set_static_graph()
for i in range(n):
    # ... training loop ...

The code for _set_static_graph is:

def _set_static_graph(self):
    """ Users can explicitly let DDP know the trained graph is static, when
    1) the set of used and unused parameters will not change during the whole
       training loop; in this case, it does not matter whether users set
       find_unsued_parameters = true or not.
    2) how the graph is trained will not change during the whole training loop
       (meaning there is no control flow depending on iterations).
    When graph is set to be static, DDP will support cases that can not be
    supported in the past:
    1) reentrant backwards
    2) activation checkpointing multiple times
    3) activation checkpointing with find_unused_parameters = true.
    4) not all output tensors are used in loss calculation.
    5) there is model parameter that is outside of forward function.
    6) potentially improve performance when find_unsued_parameters = true or
       there are unused parameters, as DDP will not search graph in each
       iteraton to detect unused parameters when static_graph is set to be True.
    This API should be called after DistributedDataParallel construction, and
    before training loops starts. Also it should be called in the same way for
    all ranks. For example:
        ddp_model = DistributedDataParallel(model)
        ddp_model._set_static_graph()
        for i in range(n):
            .....
    """
    self.static_graph = True
    self.reducer._set_static_graph()  # call the Reducer to configure the static graph
    self.logger._set_static_graph()
    if self.find_unused_parameters:
        warnings.warn(
            "You passed find_unused_parameters=true to DistributedDataParallel, "
            "`_set_static_graph` will detect unused parameters automatically, so "
            "you do not need to set find_unused_parameters=true, just be sure these "
            "unused parameters will not change during training loop while calling "
            "`_set_static_graph`."
        )

3.3 Reducer

The Reducer can only treat the graph as static after the first iteration; after all, PyTorch is still dynamic, so one dynamic graph-generation pass is unavoidable.

void Reducer::set_static_graph() {
  std::lock_guard<std::mutex> lock(mutex_);
  TORCH_CHECK(
      num_iterations_ == 0,
      "set_static_graph() should be called before training loop starts "
      "and after DistributedDataParallel is constructed.");
  static_graph_ = true;
  // when static_graph_ is set as true, always initialize_local_used_map
  // and detect the global unused parameters in the first iteration.
  initialize_local_used_map();
}

0x04 Rebuilding buckets

4.1 Why rebuild

Because PyTorch generates the computation graph dynamically, the buckets need to be rebuilt accordingly. However, buckets are only rebuilt once, after the first iteration, and are never rebuilt if find_unused_parameters_ is set.

  // Returns true if we should rebuild buckets, else false. We only rebuild
  // buckets once after the first iteration and never rebuild them if
  // find_unused_parameters_.
  inline bool should_rebuild_buckets() const {
    return (static_graph_ || !find_unused_parameters_) && !has_rebuilt_bucket_;
  }

4.2 Preparing for the rebuild

Let’s start by looking at some of the preparations before rebuilding.

push_rebuilt_params inserts a parameter into the rebuild lists.

void Reducer::push_rebuilt_params(const VariableIndex& index) {
  rebuilt_params_.push_back(
      replicas_[index.replica_index][index.variable_index]);
  rebuilt_param_indices_.push_back(index.variable_index);
}

Next, push_rebuilt_params_for_all_indices traverses each replica and calls push_rebuilt_params for every variable in the replica.

void Reducer::push_rebuilt_params_for_all_indices() {
  std::lock_guard<std::mutex> lock(mutex_);
  if (!should_rebuild_buckets() || !rebuilt_param_indices_.empty()) {
    return;
  }
  const auto replica_count = replicas_.size();
  for (size_t replica_index = 0; replica_index < replica_count;
       ++replica_index) {
    const auto variable_count = replicas_[replica_index].size();
    for (size_t variable_index = 0; variable_index < variable_count;
         ++variable_index) {
      const auto index = VariableIndex(replica_index, variable_index);
      push_rebuilt_params(index);
    }
  }
}

4.3 The rebuild

Let’s look at the reconstruction mechanism.

DDP uses rebuilt_params_ and rebuilt_param_indices_ to rebuild the buckets based on the order in which tensors receive their gradients during backward propagation.

The rebuild_buckets function makes broadcast communication calls and can overlap with the next forward() call, so it can be asynchronous.

  • Rebuilding buckets with find_unused_parameters=true is expected to be asynchronous, because buckets may be rebuilt multiple times: subgraphs are trained and the parameter index order may change more often.
  • With find_unused_parameters=false, buckets are rebuilt only once during the whole training period, so the performance cost is negligible. rebuild_buckets returns true if the buckets were indeed rebuilt.
bool Reducer::rebuild_buckets() {
  // Ensure reduction for previous backwards pass is finished. If user's model
  // has unused parameters for example, this will raise an error recommending to
  // run with find_unused_parameters=True, instead of the size mismatch
  // exception below.
  std::lock_guard<std::mutex> lock(mutex_);
  ensure_prior_reduction_finished();
  if (!should_rebuild_buckets() || rebuilt_params_.empty()) {
    return false;
  }

  std::vector<std::vector<size_t>> rebuilt_bucket_indices;
  std::vector<size_t> bucket_size_limits;
  bucket_size_limits.push_back(kDefaultFirstBucketBytes);
  bucket_size_limits.push_back(bucket_bytes_cap_);
  rebuilt_bucket_indices = compute_bucket_assignment_by_size(
      rebuilt_params_,
      bucket_size_limits,
      expect_sparse_gradients_[0],
      rebuilt_param_indices_);

  // For rebuilt bucket indices, it needs to be synced across all ranks.
  // Broadcast the newly rebuilt bucket indices from rank 0 in default.
  // After syncing up rebuilt bucket indices, initialize buckets for reducer.
  sync_bucket_indices(rebuilt_bucket_indices);

  has_rebuilt_bucket_ = true; // Only rebuild once
  rebuilt_params_.clear();
  rebuilt_param_indices_.clear();
  initialize_buckets(std::move(rebuilt_bucket_indices));
  return true;
}

4.4 When the rebuild is set up

Rebuild is set only if:

  1. It is the first time the buckets are being rebuilt.

  2. static_graph_ is true or find_unused_parameters_ is false.

  3. This backward pass needs to run allreduce.

Here we simply dump tensors and their parameter indices into rebuilt_params_ and rebuilt_param_indices_ in gradient-arrival order. Then, at the end of finalize_backward(), the buckets are rebuilt based on rebuilt_params_ and rebuilt_param_indices_, after which the bucket assignment is broadcast and the buckets are initialized.

Furthermore, we only need to dump the tensors and parameter indices of one replica.

In mark_variable_ready, push_rebuilt_params(index) is called to insert into the rebuild list.

void Reducer::mark_variable_ready(VariableIndex index) {
  // Rebuild bucket only if 1) it is the first time to rebuild bucket 2)
  // static_graph_ is true or find_unused_parameters_ is false,
  // 3) this backward pass needs to run allreduce.
  // Here, we just dump tensors and their parameter indices into
  // rebuilt_params_ and rebuilt_param_indices_ based on gradient arriving
  // order, and then at the end of finalize_backward(), buckets will be
  // rebuilt based on rebuilt_params_ and rebuilt_param_indices_, and then
  // will be broadcasted and initialized. Also we only need to dump tensors
  // and parameter indices of one replica.
  if (should_rebuild_buckets()) {
    push_rebuilt_params(index); // Insert the list
  }

  const auto replica_index = index.replica_index;
  const auto variable_index = index.variable_index;

  if (replica_index == 0) {
    checkAndRaiseMarkedTwiceError(variable_index);
    perIterationReadyParams_.insert(variable_index);
  }
  backward_stats_[replica_index][variable_index] =
      current_time_in_nanos() - cpu_timer_.backward_compute_start_time;

  // Any time we mark a variable ready (be it in line due to unused parameters,
  // or via an autograd hook), we require a call to the finalize function. If
  // this doesn't happen before the next iteration (or call to
  // `prepare_for_backwards`), we know something is wrong.
  require_finalize_ = true;

  const auto& bucket_index = variable_locators_[variable_index];
  auto& bucket = buckets_[bucket_index.bucket_index];
  auto& replica = bucket.replicas[replica_index];

  set_divide_factor();

  if (bucket.expect_sparse_gradient) {
    mark_variable_ready_sparse(index);
  } else {
    mark_variable_ready_dense(index);
  }

  // TODO(@pietern): Make this work for both CPU/CUDA tensors.
  // When using CPU tensors we don't need to do this.
  // // Record event so that we can wait for all of them.
  // auto& event = replica.events[bucket_index.intra_bucket_index];
  // event.record();

  // Check if this was the final gradient for this bucket.
  if (--replica.pending == 0) {
    // Kick off reduction if all replicas for this bucket are ready.
    if (--bucket.pending == 0) {
      mark_bucket_ready(bucket_index.bucket_index);
    }
  }

  // Run finalizer function and kick off reduction for local_used_maps once the
  // final bucket was marked ready.
  if (next_bucket_ == buckets_.size()) {

    if (dynamic_graph_find_unused()) {
      all_reduce_local_used_map();
    }

    // The autograd engine uses the default stream when running callbacks, so we
    // pass in the current CUDA stream in case it is not the default.
    const c10::Stream currentStream = get_current_stream();
    torch::autograd::Engine::get_default_engine().queue_callback([=] {
      std::lock_guard<std::mutex> lock(this->mutex_);
      // Run callback with the current stream
      c10::OptionalStreamGuard currentStreamGuard{currentStream};
      if (should_collect_runtime_stats()) {
        record_backward_compute_end_time();
      }

      // Check that all buckets were completed and had their work kicked off.
      TORCH_INTERNAL_ASSERT(next_bucket_ == buckets_.size());
      this->finalize_backward();
    });
  }
}

4.5 Direct Invocation

The _rebuild_buckets function can also be called directly; for example, the forward function calls it, at most once during the whole training period.

def forward(self, *inputs, **kwargs) :
    with torch.autograd.profiler.record_function("DistributedDataParallel.forward"):
        self.reducer.save_thread_local_state()
        if torch.is_grad_enabled() and self.require_backward_grad_sync:
            self.num_iterations += 1
            self.reducer.prepare_for_forward()
        if self.ddp_uneven_inputs_config.ddp_join_enabled:
            ones = torch.ones(1, device=self.device)
            work = dist.all_reduce(ones, group=self.process_group, async_op=True)
            if self.ddp_uneven_inputs_config.ddp_join_throw_on_early_termination:
                # Active ranks schedule an allreduce with zeros, inactive
                # ranks schedule them with 1. If the result != 0 it
                # indicates at least one rank has terminated and we should
                # throw.
                zeros = torch.zeros(1, device=self.device)
                dist.all_reduce(zeros, group=self.process_group)
                should_throw_stop_iteration = zeros.item()
                if should_throw_stop_iteration:
                    raise RuntimeError(
                        "Detected at least one rank that exhausted inputs. Throwing across all ranks."
                    )
            else:
                self.reducer._set_forward_pass_work_handle(
                    work,
                    self.ddp_uneven_inputs_config.ddp_join_divide_by_initial_world_size,
                )

        # Calling _rebuild_buckets before forward compuation,
        # It may allocate new buckets before deallocating old buckets
        # inside _rebuild_buckets. To save peak memory usage,
        # call _rebuild_buckets before the peak memory usage increases
        # during forward computation.
        # This should be called only once during whole training period.
        
        # The direct call is made here
        if torch.is_grad_enabled() and self.reducer._rebuild_buckets(): # set
            logging.info("Reducer buckets have been rebuilt in this iteration.")

The join method, for example, also calls _rebuild_buckets directly.

@contextmanager
def join(
    self,
    divide_by_initial_world_size=True,
    enable=True,
    throw_on_early_termination=False,
):

    # ... other code omitted ...
    
                    else:
                        # Some DDP process still needs to be joined.
                        if self.ddp_uneven_inputs_config.ddp_join_throw_on_early_termination:
                            # Schedule allreduce telling active ranks to terminate
                            ones = torch.ones(1, device=self.device)
                            dist.all_reduce(ones, group=self.process_group)
                            # Raising StopIteration doesn't throw error in python 3.6
                            # Throws RuntimeError in 3.7+ (PEP 479), so just
                            # raise RuntimeError here.
                            raise RuntimeError(
                                f"Rank {self._distributed_rank} exhausted all inputs."
                            )
                        if is_last_joiner:
                            is_last_joiner = False
                        # It will rebuild buckets only once during training period
                        
                        # call here.
                        self.reducer._rebuild_buckets()
                        # Schedule a corresponding broadcast if we are syncing module
                        # buffers in the forward pass.
                        self._check_and_sync_module_buffers()   

Now that we have touched on Join, let's take a closer look at this concept.

0x05 Join

Join aims to solve the problem of uneven training data across workers: it allows workers with fewer inputs (those that have already joined) to keep participating in collective communication with workers that have not finished yet, by performing shadow (dummy) operations.

5.1 Origin

Behind DDP are the all-reduce operations of collective communication libraries, which synchronize gradients between workers. When the amount of training data differs across ranks, DDP can stall: collective communication requires participation from every rank in the process group, so if one rank runs out of input, the other ranks will hang or raise errors (depending on the backend). Any class that performs synchronous collective communication in every iteration will hit this problem.

Therefore, DDP provides the Join API, a context manager to be used in each rank's training loop. A rank with less data will exhaust its inputs early; it then shadows the collective communication, i.e., it builds dummy all-reduces to match the other ranks. How to create this illusion is specified by the registered hooks.

The general idea is as follows:

  Rank with data                       Joined rank (inputs exhausted)

 +------------------+                 +--------------------------+
 |       Data       |                 |       Data (empty)       |
 +--------+---------+                 +------------+-------------+
          |                                        |
          v                                        v
 +--------+---------+                 +------------+-------------+
 |       Model      |                 |        _JoinHook         |
 +--------+---------+                 |        (main_hook)       |
          |                           +------------+-------------+
          v                                        |
 +--------+---------+                              |
 |  Forward (local) |                              |
 +--------+---------+                              |
          |                                        |
          v                                        |
 +--------+---------+                              |
 | Backward (local) |                              |
 +--------+---------+                              |
          |                                        |
          v                                        v
 +--------+---------+   sync grads    +------------+-------------+
 |     All-Reduce   | <-------------> |    All-Reduce (Dummy)    |
 +--------+---------+                 +--------------------------+
          |
          v
 +--------+---------+
 |  Update Weights  |
 +------------------+
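In code, the shadow boils down to the joined rank issuing matching dummy collectives. The toy sketch below only illustrates the idea; it is not the actual hook implementation.

import torch
import torch.distributed as dist

# A rank that has run out of data keeps matching the other ranks' gradient
# all-reduces with all-reduces of zeros, so no collective call ever blocks.
def sync_or_shadow(grad_bucket, has_data):
    if has_data:
        dist.all_reduce(grad_bucket)                       # real gradient sync
    else:
        dist.all_reduce(torch.zeros_like(grad_bucket))     # dummy all-reduce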

5.2 Usage

5.2.1 DistributedDataParallel

Join can be used together with DistributedDataParallel. In the example below, two workers are started, rank 0 and rank 1; rank 0 receives 5 inputs and rank 1 receives 6 inputs.

Without Join, rank 1 would block on its sixth input, because rank 0 has no input left and rank 1 would wait forever. With Join, this problem does not occur and both ranks finish cleanly.

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.distributed.algorithms.join import Join
from torch.nn.parallel import DistributedDataParallel as DDP

BACKEND = "nccl"
WORLD_SIZE = 2
NUM_INPUTS = 5

def worker(rank) :
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(BACKEND, rank=rank, world_size=WORLD_SIZE)

    model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
    # Rank 1 gets one more input than rank 0
    inputs = [torch.tensor([1]).float() for _ in range(NUM_INPUTS + rank)]

    num_inputs = 0
    with Join([model]):
        for input in inputs:
            num_inputs += 1
            loss = model(input).sum()
            loss.backward()

    print(f"Rank {rank} has exhausted all {num_inputs} of its inputs!")

def main() :
    mp.spawn(worker, nprocs=WORLD_SIZE, join=True)

if __name__ == "__main__":
    main()

This produces the following output (the prints from rank 0 and rank 1 may appear in either order):

Rank 0 has exhausted all 5 of its inputs!
Rank 1 has exhausted all 6 of its inputs!

5.2.2 ZeroRedundancyOptimizer

The Join context works not only with one class, but with multiple classes, such as PyTorch’s ZeroRedundancyOptimizer.

from torch.distributed.optim import ZeroRedundancyOptimizer as ZeRO
from torch.optim import Adam

def worker(rank) :
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(BACKEND, rank=rank, world_size=WORLD_SIZE)

    model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
    optim = ZeRO(model.parameters(), Adam, lr=0.01)
    # Rank 1 gets one more input than rank 0
    inputs = [torch.tensor([1]).float() for _ in range(NUM_INPUTS + rank)]

    num_inputs = 0
    # Pass both `model` and `optim` into `Join()`
    with Join([model, optim]):
        for input in inputs:
            num_inputs += 1
            loss = model(input).sum()
            loss.backward()
            optim.step()

    print(f"Rank {rank} has exhausted all {num_inputs} of its inputs!")

This produces the same output as before. The notable change is that the ZeroRedundancyOptimizer instance must additionally be passed into Join().

Mechanisms such as ZeroRedundancyOptimizer will be analyzed later.

5.3 Principles

PyTorch gives some explanation in the latest documentation at pytorch.org/tutorials/a…

To use it well, we will introduce the Join class and its supporting classes Joinable and JoinHook.

Note: this section is based on the v1.10.0 code.

5.3.1 Joinable

First, classes that are compatible with the Join context manager must inherit from the abstract base class Joinable. In particular, a Joinable must implement:

  • join_hook(self, **kwargs) -> JoinHook

This returns a JoinHook instance for the Joinable, which determines how joined processes should shadow the per-iteration collective communication performed by the Joinable.

  • join_device(self) -> torch.device

This returns the device used by the Join context manager to perform collective communication, e.g., torch.device("cuda:0") or torch.device("cpu").

  • join_process_group(self) -> ProcessGroup

This returns the group of processes that the Join context manager uses to perform the collective communication.

To recap: the JoinHook is responsible for the specific shadowing behavior, while join_device and join_process_group are responsible for the collective communication itself.

Note that join_device and join_process_group are required attributes; they ensure that the context manager can schedule collective communication between joined and un-joined processes. One use is to count the number of un-joined processes with an all-reduce in each iteration. Another use is the mechanism needed to implement throw_on_early_termination=True, which we explain below.

DistributedDataParallel and ZeroRedundancyOptimizer both inherit from Joinable and implement the methods above, which is why we could use them directly in the earlier examples.

class DistributedDataParallel(Module, Joinable):

class ZeroRedundancyOptimizer(Optimizer, Joinable):

DDP is the one feeding the data, so it makes sense for it to inherit Joinable. But why does ZeroRedundancyOptimizer inherit it as well? Because ZeroRedundancyOptimizer can work together with DDP, and ZeroRedundancyOptimizer also performs collective operations internally, so it needs to be managed by Join.

Classes that inherit from Joinable should make sure to call the Joinable constructor, because it initializes a JoinConfig instance that the context manager uses internally to ensure correctness. The JoinConfig is saved in the _join_config field of each Joinable. A minimal sketch of a custom Joinable follows.
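As a concrete illustration, here is a minimal sketch of a custom Joinable in the spirit of the official tutorial's counter example. The class names and the counting logic are our own illustrative choices, not code taken from DDP.

import torch
import torch.distributed as dist
from torch.distributed.algorithms.join import Join, Joinable, JoinHook

class Counter(Joinable):
    """All-reduces a counter every iteration; joined ranks shadow that
    all-reduce with a zero contribution so the collectives stay matched."""
    def __init__(self, device, process_group):
        super().__init__()           # initializes _join_config
        self.device = device
        self.process_group = process_group
        self.total = torch.zeros(1, device=device)

    def __call__(self):
        # Tell the Join context this rank is still active this iteration
        Join.notify_join_context(self)
        one = torch.ones(1, device=self.device)
        dist.all_reduce(one, group=self.process_group)
        self.total += one

    def join_hook(self, **kwargs) -> JoinHook:
        return CounterJoinHook(self)

    @property
    def join_device(self) -> torch.device:
        return self.device

    @property
    def join_process_group(self):
        return self.process_group

class CounterJoinHook(JoinHook):
    def __init__(self, counter):
        super().__init__()
        self.counter = counter

    def main_hook(self):
        # Shadow the all-reduce with zeros so un-joined ranks do not block
        zero = torch.zeros(1, device=self.counter.device)
        dist.all_reduce(zero, group=self.counter.process_group)

With this in place, each rank would run its loop inside with Join([counter]):, just like the DDP examples above, and ranks with fewer iterations can finish early without blocking the others.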

5.3.2 JoinHook

Next, let’s break down the JoinHook class. JoinHook provides two entry points into the context manager:

  • main_hook(self) -> None

This hook is called repeatedly by a rank that has already joined, as long as some rank has not yet joined. It is intended to shadow the collective communication that the Joinable performs in each training iteration (for example, in the forward pass, backward pass, and optimizer step); that is, it defines how the joined ranks keep communicating with the ranks that have not yet joined.

  • post_hook(self, is_last_joiner: bool) -> None

This hook is called once all ranks have joined. It receives an additional bool argument, is_last_joiner, indicating whether this rank was among the last to join. This argument can be useful for synchronization.

5.3.2.1 ZeroRedundancyOptimizer

The main hook built into ZeroRedundancyOptimizer provides a concrete example: since a joined rank is still responsible for updating and synchronizing its shard of the parameters, the main hook still performs the optimizer step.

class _ZeROJoinHook(_JoinHook):
    def __init__(self, zero):
        assert isinstance(zero, ZeroRedundancyOptimizer), \
            "ZeRO join hook requires passing in a ZeroRedundancyOptimizer " \
            "instance as the state"
        self.zero = zero
        super().__init__()

    def main_hook(self):
        """ Performs an optimizer step, which updates the joined process's shard of the parameters and broadcasts those parameters. """
        self.zero.step()
Copy the code

The step function is abbreviated as follows:

def step(
    self,
    closure: Optional[Callable[[], float]] = None,
    **kwargs: Any,
) -> Optional[float]:
    _Join.notify_join_context(self)  # notify the Join context manager, if any
    # Check if the model trainability has changed
    is_trainable_mask = self._get_is_trainable_mask()
    if is_trainable_mask != self._is_trainable_mask:
        self._build_param_buckets()
        self._is_trainable_mask = is_trainable_mask

    # Sync the exposed `param_groups` attributes to the local optimizer in
    # case they have been updated
    self._sync_param_groups(self.param_groups, self.optim.param_groups)

    # Run the optimizer step on this shard only
    if closure is not None:
        loss = self.optim.step(closure=closure, **kwargs)  # type: ignore[call-arg]
    else:
        loss = self.optim.step(**kwargs)

    # Sync all of the updated parameter shards across the ranks
    self._sync_parameters()

    # Sync any updated attributes in the local optimizer to the exposed
    # `param_groups`
    self._sync_param_groups(self.optim.param_groups, self.param_groups)

    return loss
Copy the code

5.3.2.2 DistributedDataParallel

  • The main hook still performs a series of collective operations in order to shadow the per-iteration communication of the ranks that have not yet joined.
  • The post-hook broadcasts the final updated model from one of the last ranks to join, ensuring that the model is the same across all ranks.
class _DDPJoinHook(_JoinHook):
    def __init__(self, ddp, divide_by_initial_world_size):
        """ Sets config variables for internal usage. """
        ddp.logger._set_uneven_input_join()
        self.ddp = ddp
        self.ddp._divide_by_initial_world_size = divide_by_initial_world_size
        super().__init__()

    def main_hook(self):
        """ Shadows the DDP collective communication operations in the forward and backward passes. """
        ddp = self.ddp
        # Buckets are rebuilt only once during a training period
        ddp.reducer._rebuild_buckets()

        # Schedule a broadcast if we are syncing module buffers in the
        # forward pass
        ddp._check_and_sync_module_buffers()

        # Check if need to sync in the backward pass
        work = ddp._check_global_requires_backward_grad_sync(is_joined_rank=True)
        work.wait()
        should_sync_backwards = work.result()[0].item() != 0
        # Forward parameter sync is disabled in the next iteration if we
        # are skipping gradient sync this iteration, so set
        # `require_forward_param_sync` accordingly
        ddp.require_forward_param_sync = should_sync_backwards
        if not should_sync_backwards:
            return

        # Schedule one allreduce per gradient bucket to match the backward
        # pass allreduce
        ddp._match_all_reduce_for_bwd_pass()

        # Check if we need to allreduce locally unused parameters
        if ddp.find_unused_parameters:
            ddp._match_unused_params_allreduce()

        # Rebuilt parameters are pushed only once during a training period
        ddp.reducer._push_all_rebuilt_params()

    def post_hook(self, is_last_joiner: bool):
        """ Syncs the final model to ensure that the model is the same across all processes. """
        self.ddp._sync_final_model(is_last_joiner)
Copy the code

_sync_final_model is where the latest model is broadcast.

# When running in join mode, agrees upon a common rank and broadcasts model
# parameters to all other ranks.
def _sync_final_model(self, is_last_joiner):
    # Agree upon the process that will be the authoritative model copy.
    # The current rank is a candidate for being the authoritative copy if
    # is_last_joiner=True. We break ties via picking the larger rank.
    self._authoritative_rank = self._find_common_rank(
        self._distributed_rank, is_last_joiner
    )
    self._sync_params_and_buffers(authoritative_rank=self._authoritative_rank)
Copy the code
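
The common rank can be agreed upon with a single MAX all-reduce. The following is only a sketch of that idea (not the actual _find_common_rank implementation; the function name is illustrative), and the same pattern appears again in the Counter example later:

import torch
import torch.distributed as dist

def find_common_rank(my_rank, is_candidate, device, process_group):
    # Candidate ranks contribute their own rank, the others contribute -1;
    # the MAX all-reduce therefore agrees on the largest candidate rank.
    rank_tensor = torch.tensor([my_rank if is_candidate else -1], device=device)
    dist.all_reduce(rank_tensor, op=dist.ReduceOp.MAX, group=process_group)
    return int(rank_tensor.item())
Copy the code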

5.3.3 Join

Finally, let’s look at how these base classes fit into the Join class itself.

  • __init__(self, joinables: List[Joinable], enable: bool = True, throw_on_early_termination: bool = False)

As we saw in the previous example, the constructor receives the list of Joinables that participate in the training loop. These should be the classes that perform collective communication in each iteration.

enable is a bool that can be set to False if you know there will be no uneven inputs, in which case the context manager becomes similar to contextlib.nullcontext(). It may also disable join-related computation in the listed Joinables.

throw_on_early_termination is a bool that can be set to True so that each rank raises an exception the moment uneven inputs are detected. This is useful for cases that do not satisfy the context manager's requirements, most typically when collective communications from different classes may be arbitrarily interleaved, such as when DistributedDataParallel is used with a model that has SyncBatchNorm layers. In that case, set this argument to True so that application logic can catch the exception and decide how to proceed, as sketched below.
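
As a hedged sketch of that pattern (reusing the model, optim, and inputs names from the earlier example; this is not necessarily the tutorial's exact code), the exception is caught around the context manager so that each rank can end the epoch cleanly:

try:
    with Join([model, optim], throw_on_early_termination=True):
        for input in inputs:
            loss = model(input).sum()
            loss.backward()
            optim.step()
except RuntimeError:
    # With throw_on_early_termination=True, every rank raises as soon as some
    # rank exhausts its inputs, so application logic decides how to proceed.
    print("Uneven inputs detected; ending the epoch early on this rank.")
Copy the code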

  • The core logic appears in the __exit__() method: while there exists a rank that has not yet joined, it loops and invokes each Joinable's main hook; once all ranks have joined, it calls their post hooks. Both the main hooks and the post hooks are iterated in the order in which the Joinables are passed in.
  • The context manager requires a heartbeat from the processes that have not yet joined. Therefore, each Joinable class should call Join.notify_join_context() before its per-iteration collective communication. The context manager ensures that only the first Joinable passed in actually sends the heartbeat.

5.4 Example

Let's make this concrete with an example. In the following code, each rank prints (1) the number of inputs across all ranks seen before it joins, and (2) the total number of inputs across all ranks.

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.distributed.algorithms.join import Join, Joinable, JoinHook

BACKEND = "nccl"
WORLD_SIZE = 2
NUM_INPUTS = 5

class CounterJoinHook(JoinHook):
    r""" Join hook for :class:`Counter`. Arguments: counter (Counter): the :class:`Counter` object using this hook. sync_max_count (bool): whether to sync the max count once all ranks join. """
    def __init__(self, counter, sync_max_count):
        self.counter = counter
        self.sync_max_count = sync_max_count

    def main_hook(self):
        r""" Shadows the counter's all-reduce by all-reducing a dim-1 zero tensor. """
        t = torch.zeros(1, device=self.counter.device)
        dist.all_reduce(t)

    def post_hook(self, is_last_joiner: bool):
        r""" Synchronizes the max count across all :class:`Counter` s if ``sync_max_count=True``. """
        if not self.sync_max_count:
            return
        rank = dist.get_rank(self.counter.process_group)
        common_rank = self.counter.find_common_rank(rank, is_last_joiner)
        if rank == common_rank:
            self.counter.max_count = self.counter.count.detach().clone()
        dist.broadcast(self.counter.max_count, src=common_rank)

class Counter(Joinable):
    r""" Example :class:`Joinable` that counts the number of training iterations that it participates in. """
    def __init__(self, device, process_group):
        super(Counter, self).__init__()
        self.device = device
        self.process_group = process_group
        self.count = torch.tensor([0], device=device).float()
        self.max_count = torch.tensor([0], device=device).float()

    def __call__(self):
        r""" Counts the number of inputs processed on this iteration by all ranks by all-reducing a dim-1 one tensor; increments its own internal count. """
        Join.notify_join_context(self)
        t = torch.ones(1, device=self.device).float()
        dist.all_reduce(t)
        self.count += t

    def join_hook(self, **kwargs) -> JoinHook:
        r""" Return a join hook that shadows the all-reduce in :meth:`__call__`. This join hook supports the following keyword arguments: sync_max_count (bool, optional): whether to synchronize the maximum count across all ranks once all ranks join; default is ``False``. """
        sync_max_count = kwargs.get("sync_max_count", False)
        return CounterJoinHook(self, sync_max_count)

    @property
    def join_device(self) -> torch.device:
        return self.device

    @property
    def join_process_group(self):
        return self.process_group

    # pick the largest rank among the candidate ranks as the common rank
    def find_common_rank(self, rank, to_consider):
        r""" Returns the max rank of the ones to consider over the process group. """
        common_rank = torch.tensor([rank if to_consider else -1], device=self.device)
        dist.all_reduce(common_rank, op=dist.ReduceOp.MAX, group=self.process_group)
        common_rank = common_rank.item()
        return common_rank

def worker(rank):
    assert torch.cuda.device_count() >= WORLD_SIZE
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(BACKEND, rank=rank, world_size=WORLD_SIZE)

    counter = Counter(torch.device(f"cuda:{rank}"), dist.group.WORLD)
    inputs = [torch.tensor([1]).float() for _ in range(NUM_INPUTS + rank)]

    with Join([counter], sync_max_count=True):
        for _ in inputs:
            counter()

    print(f"{int(counter.count.item())} inputs processed before rank {rank} joined!")
    print(f"{int(counter.max_count.item())} inputs processed across all ranks!")

def main():
    mp.spawn(worker, nprocs=WORLD_SIZE, join=True)

if __name__ == "__main__":
    main()
Copy the code

Since rank 0 sees 5 inputs and rank 1 sees 6, this produces the output below: rank 0 counts 5 × 2 = 10 before it joins (while both ranks are active, each per-iteration all-reduce sums two ones), while rank 1 counts 5 × 2 + 1 = 11, because in its sixth iteration rank 0's main hook contributes a zero.

10 inputs processed before rank 0 joined!
11 inputs processed across all ranks!
11 inputs processed before rank 1 joined!
11 inputs processed across all ranks!
Copy the code

Some key points to highlight:

  • The Counter instance performs an All Reduce operation in each iteration, so:
    • For a rank that has already joined, its main hook also performs a single all-reduce to shadow it. Note that this all-reduce uses a tensor with value 0, so it does not affect the overall result.
    • The ranks that have not yet joined still see this as a valid collective operation.
    • This handles the uneven input.
  • Counter calls Join.notify_join_context() at the beginning of its __call__() method, because that is where its per-iteration collective (the all-reduce) takes place, and the context manager needs to be told that this instance has not yet joined (ranks that have exhausted their inputs no longer reach this call).
  • The is_last_joiner argument is used to determine the broadcast source in the post hook.
  • We pass the sync_max_count keyword argument to the context manager, which forwards it to Counter's join hook.
  • In the post hook, self.counter.max_count is broadcast.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

Pytorch distributed series 3 — distributed training: what does torch.utils.data.distributed.DistributedSampler do?

Pytorch Distributed Series 1 — Clarify the environment variables associated with Torch.distributed. Launch

Pytorch distributed data framework – How to synchronize allel?

Pytorch (distributed) data parallel personal practice – DataParallel/DistributedDataParallel

The nn Pytorch DataParallel

Discuss.pytorch.org/t/dataparal…

Pytorch.org/docs/stable…

PyTorch source code interpretation of distributed training to understand?

PyTorch AutoGrad C++ layer implementation

PYTORCH Automatic Differential (1)

How does PyTorch speed up data parallel training? Distributed secrets revealed

Pytorch Distributed training (two init_process_group)

Pytorch.org/tutorials/i…

Pytorch.org/docs/master…

Pytorch.org/tutorials/i…

DP & DDP for PyTorch source Code Interpretation: Model parallel and Distributed training parsing

Parameters and Buffers in the Pytorch model

PyTorch Developer Day 2020 PyTorch Distributed Data Parallel (DDP)

Learn more about The Hook mechanic in PyTorch

[English subtitles] An in-depth look at Pytorch AutoGrad

DISTRIBUTED TRAINING WITH UNEVEN INPUTS USING THE JOIN CONTEXT MANAGER

Talk about ZeroRedundancyOptimizer and Join in torch1.10