0x00 Summary

The previous article introduced how the Reducer is constructed, along with several important usage scenarios. This article analyzes how forward propagation is implemented.

Other articles in this series are as follows:

PyTorch distributed (1) —— History and Overview

How PyTorch uses GPUs

PyTorch distributed (2) —— DataParallel (part 1)

PyTorch distributed (3) —— DataParallel (part 2)

PyTorch distributed (4) —— Basic concepts of distributed applications

PyTorch distributed (5) —— DistributedDataParallel: overview and how to use it

PyTorch distributed (6) —— DistributedDataParallel: initialization and store

PyTorch distributed (7) —— DistributedDataParallel: process groups

PyTorch distributed (8) —— DistributedDataParallel: the paper

PyTorch distributed (9) —— DistributedDataParallel: initialization

PyTorch distributed (10) —— DistributedDataParallel: the static architecture of Reducer

PyTorch distributed (11) —— DistributedDataParallel: constructing the Reducer and the Join operation

0x01 General Logic

As usual, let's first bring out our trusty reference and look at the overall logic of DDP as described in the paper:

Then an overall forward propagation strategy is given as follows:

Forward Pass:

  • Each process reads its own training data, and the DistributedSampler ensures that each process reads different data.
  • The DDP takes input and passes it to the local model.
  • The model performs its forward computation and the result is denoted out. Computation now happens independently on each process (CUDA device).
  • If find_unused_parameters is set to True, DDP analyzes the output of the local model and traverses the computation graph starting from out, marking unused parameters as ready. Because the computation graph can change every iteration, this traversal is done every time.
    • This mode allows running backward on a subgraph of the model. DDP walks the autograd graph from the model output out and marks all unused parameters as ready, to reduce the number of parameters involved in the backward pass.
    • During the backward pass, the Reducer reduces all buckets and, in the process, waits for parameters that are not yet ready. Marking parameter gradients as ready does not let DDP skip buckets, but it prevents DDP from waiting forever during the backward pass for gradients that will never be produced.
    • Note that traversing the autograd graph introduces extra overhead, so applications should set find_unused_parameters to True only when necessary.
  • Return out. Unlike DP, DDP's model output does not need to be gathered to rank 0. A minimal usage sketch follows this list.
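To make the above flow concrete, here is a minimal training-step sketch (the helper name train_one_epoch and the dataset/loss arguments are illustrative assumptions, not code from DDP itself; it assumes torch.distributed.init_process_group has already been called on each rank). The forward call on the DDP wrapper is exactly the method analyzed in the rest of this article:

import torch
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

def train_one_epoch(rank, model, dataset, loss_fn):
    # Assumes torch.distributed.init_process_group(...) was already called for this rank.
    sampler = DistributedSampler(dataset)          # each process reads different data
    loader = DataLoader(dataset, batch_size=32, sampler=sampler)
    ddp_model = DDP(model.to(rank), device_ids=[rank])
    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)

    for inputs, labels in loader:
        optimizer.zero_grad()
        # Forward pass: this invokes DistributedDataParallel.forward(),
        # which is the method analyzed below.
        outputs = ddp_model(inputs.to(rank))
        loss = loss_fn(outputs, labels.to(rank))
        loss.backward()    # gradients are all-reduced bucket by bucket
        optimizer.step()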

0x02 Python World

As before, we start from the Python code, which is located in torch/nn/parallel/distributed.py.

We omit the join-related logic here and focus only on the main body. The logic of the forward method is as follows:

  • Save thread local state.
  • If gradient synchronization is required, call reducer.prepare_for_forward to prepare for the forward pass.
  • If ddp_join_enabled is configured, handle it accordingly.
  • Use _rebuild_buckets to reset the buckets before forward propagation.
    • Inside _rebuild_buckets, new buckets might be allocated before the old buckets are released.
    • To save peak memory, _rebuild_buckets is called before peak memory usage increases during the forward computation.
  • If synchronization is required, call _sync_params to synchronize parameters and buffers before the forward pass.
  • Perform the forward propagation.
  • If the backward gradient needs to be synchronized, call prepare_for_backward.
    • When the DDP argument find_unused_parameters is True, DDP starts a traversal from the output at the end of the forward pass, marking all unused parameters and setting them to ready in advance. This makes it possible to run backward on a subgraph, at the cost of some extra time.

The specific code is as follows:

def forward(self, *inputs, **kwargs):
    with torch.autograd.profiler.record_function("DistributedDataParallel.forward"):
        # Save thread-local state
        self.reducer.save_thread_local_state()

        # Call the reducer to prepare for the forward pass
        if torch.is_grad_enabled() and self.require_backward_grad_sync:
            self.logger.set_runtime_stats_and_log()
            self.num_iterations += 1
            self.reducer.prepare_for_forward()

        # If ddp_join_enabled is configured, handle it accordingly
        if self.ddp_uneven_inputs_config.ddp_join_enabled:
            ones = torch.ones(1, device=self.device)
            work = dist.all_reduce(ones, group=self.process_group, async_op=True)
            if self.ddp_uneven_inputs_config.ddp_join_throw_on_early_termination:
                # Active ranks schedule an allreduce with zeros, inactive
                # ranks schedule them with 1. If the result != 0 it
                # indicates at least one rank has terminated and we should
                # throw.
                zeros = torch.zeros(1, device=self.device)
                dist.all_reduce(zeros, group=self.process_group)
                should_throw_stop_iteration = zeros.item()
                if should_throw_stop_iteration:
                    raise RuntimeError(
                        "Detected at least one rank that exhausted inputs. Throwing across all ranks."
                    )
            else:
                self.reducer._set_forward_pass_work_handle(  # join related
                    work,
                    self.ddp_uneven_inputs_config.ddp_join_divide_by_initial_world_size,
                )

        # Calling _rebuild_buckets before forward compuation,
        # It may allocate new buckets before deallocating old buckets
        # inside _rebuild_buckets. To save peak memory usage,
        # call _rebuild_buckets before the peak memory usage increases
        # during forward computation.
        # This should be called only once during whole training period.
        #
        # Use _rebuild_buckets to reset the buckets before forward propagation.
        # Within this function, new buckets may be allocated before the old buckets are released.
        # To save peak memory, _rebuild_buckets is called before peak memory usage increases
        # during the forward computation, and only once during the whole training period.
        if torch.is_grad_enabled() and self.reducer._rebuild_buckets():
            logging.info("Reducer buckets have been rebuilt in this iteration.")

        # If forward-pass parameters need to be synchronized, synchronize them
        if self.require_forward_param_sync:
            self._sync_params()

        if self.ddp_uneven_inputs_config.ddp_join_enabled:
            # Notify joined ranks whether they should sync in backwards pass or not.
            self._check_global_requires_backward_grad_sync(is_joined_rank=False)

        # Forward propagation
        if self.device_ids:
            # Multi-device (device_ids) case
            inputs, kwargs = self.to_kwargs(inputs, kwargs, self.device_ids[0])
            output = self.module(*inputs[0], **kwargs[0])
        else:
            output = self.module(*inputs, **kwargs)

        # If the backward gradient needs to be synchronized, call prepare_for_backward.
        # When find_unused_parameters is True, DDP starts a traversal from the output
        # at the end of the forward pass, marking all unused parameters and setting
        # them to ready in advance. This makes a backward pass over a subgraph possible,
        # at the cost of some extra time.
        if torch.is_grad_enabled() and self.require_backward_grad_sync:
            self.require_forward_param_sync = True
            # We'll return the output object verbatim since it is a freeform
            # object. We need to find any tensors in this object, though,
            # because we need to figure out which parameters were used during
            # this forward pass, to ensure we short circuit reduction for any
            # unused parameters. Only if `find_unused_parameters` is set.
            if self.find_unused_parameters and not self.static_graph:
                # Do not need to populate this for static graph.
                self.reducer.prepare_for_backward(list(_find_tensors(output)))
            else:
                self.reducer.prepare_for_backward([])
        else:
            self.require_forward_param_sync = False

    # TODO. Right now we add this sink for static_graph training only. once
    # this feature is stable, we will add this sink for all cases. E.g.
    # This sink can help capture more accurate backward start time as well.
    if self.static_graph and self.num_iterations == 1:
        # Need to grab list of tensors from user output in order to pass
        # to custom autograd function.
        output_tensor_list, treespec = tree_flatten(output)
        passthrough_tensor_list = _DDPSink.apply(
            self.reducer,
            *output_tensor_list
        )
        # Reconstruct output data structure.
        output = tree_unflatten(passthrough_tensor_list, treespec)
    return output

_sync_params is used to synchronize model parameters and buffers; specifically, it uses _distributed_broadcast_coalesced.

def _sync_params(self):
    with torch.no_grad():
        # module buffer sync
        if self.will_sync_module_buffers():
            # Synchronize buffers across processes.
            # If we are running DDP with the join manager, we have to agree
            # upon a rank to sync module buffers from, since rank 0 may
            # already have been joined and have stale module buffers.
            if self.ddp_uneven_inputs_config.ddp_join_enabled:
                authoritative_rank = self._find_common_rank(
                    self._distributed_rank, True
                )
            else:
                # The process with rank 0 is considered the authoritative copy.
                authoritative_rank = 0
            self._distributed_broadcast_coalesced(
                self.modules_buffers[0],
                self.broadcast_bucket_size,
                authoritative_rank,
            )
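_distributed_broadcast_coalesced flattens the buffers into large coalesced tensors and broadcasts them from the authoritative rank. As a rough illustration only (this is not the DDP implementation, and naive_sync_buffers is a made-up name), the effect is similar to broadcasting each buffer with the public torch.distributed API:

import torch
import torch.distributed as dist

def naive_sync_buffers(module: torch.nn.Module, authoritative_rank: int = 0):
    # Simplified sketch: broadcast every buffer one by one from the
    # authoritative rank (assumes the default process group is initialized).
    # The real DDP path coalesces buffers into flat tensors
    # (_distributed_broadcast_coalesced) to reduce the number of communication calls.
    with torch.no_grad():
        for buf in module.buffers():
            dist.broadcast(buf, src=authoritative_rank)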

0x03 C++ World

Let's move on to the C++ world to see how forward propagation is supported there. It can be divided into three parts: preparing for the forward pass, rebuilding the buckets, and preparing for the backward pass.

3.1 Prepare for forward propagation

Here we increment num_iterations_ and record the time.

void Reducer::prepare_for_forward() {
  std::lock_guard<std::mutex> lock(mutex_);
  num_iterations_++; // num_iterations_ is incremented here
  if (should_collect_runtime_stats()) {
    record_forward_compute_start_time();
  }
}

3.2 Rebuild buckets

Next, bucket rebuilding consists of the following steps:

  • Configure the various size limits.
  • Compute the bucket sizes.
  • Synchronize the bucket indices.
  • Initialize the buckets.

bool Reducer::rebuild_buckets() {
  // Ensure reduction for previous backwards pass is finished. If user's model
  // has unused parameters for example, this will raise an error recommending to
  // run with find_unused_parameters=True, instead of the size mismatch
  // exception below.
  std::lock_guard<std::mutex> lock(mutex_);
  ensure_prior_reduction_finished();
  if (!should_rebuild_buckets() || rebuilt_params_.empty()) {
    return false;
  }

  std::vector<std::vector<size_t>> rebuilt_bucket_indices;
  // Configure the various size limits
  std::vector<size_t> bucket_size_limits;
  bucket_size_limits.push_back(kDefaultFirstBucketBytes);
  bucket_size_limits.push_back(bucket_bytes_cap_);
  // Compute the bucket sizes
  rebuilt_bucket_indices = compute_bucket_assignment_by_size(
      rebuilt_params_,
      bucket_size_limits,
      expect_sparse_gradients_[0],
      rebuilt_param_indices_);

  // For rebuilt bucket indices, it needs to be synced across all ranks.
  // Broadcast the newly rebuilt bucket indices from rank 0 in default.
  // After syncing up rebuilt bucket indices, initialize buckets for reducer.
  // Synchronize the bucket indices
  sync_bucket_indices(rebuilt_bucket_indices);

  has_rebuilt_bucket_ = true;
  rebuilt_params_.clear();
  rebuilt_param_indices_.clear();

  // Initialize the buckets
  initialize_buckets(std::move(rebuilt_bucket_indices));
  return true;
}

Let's look at each of these steps.

3.2.1 Compute bucket sizes

The key structure in compute_bucket_assignment_by_size is shown below; a BucketAccumulator can be thought of as an actual bucket.

struct BucketAccumulator {
    std::vector<size_t> indices; // Bucket contents: a list of tensor indices
    size_t size = 0;             // Bucket size, e.g. a few MB
};                               // The logical contents of a bucket

// Keep vector of indices and size accumulator by tensor type and device.
std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>>
    buckets; // All buckets; each actual bucket can be thought of as a BucketAccumulator

Next, let's look at the specific logic of compute_bucket_assignment_by_size:

  • Create the result vector result, and reserve space in it based on the size of the tensors argument.

  • Create buckets, a map of all buckets; each actual bucket can be thought of as a BucketAccumulator.

  • Iterate over all tensors passed in; for each tensor:

    • If tensor_indices was provided, use it to get the tensor's index.
    • If this tensor is expected to produce a sparse gradient, place it in a bucket by itself, because it cannot be grouped with other tensors.
    • Build the bucket key from the tensor's type and device.
    • Look up the bucket for that key and get its BucketAccumulator.
    • Append the index of the new tensor to the bucket's indices, the list of tensor indices.
    • Increase the bucket size accordingly.
    • If necessary, initialize the size-limit iterator to the first size limit.
    • If the bucket's size reaches the current size limit, the bucket is considered full: its indices are moved into result and a new bucket is started.
      • That is, the contents of an over-sized bucket are inserted into result right away.
      • bucket = BucketAccumulator() creates a fresh accumulator; since bucket is a reference, this effectively empties the old bucket, whose contents have already been moved into result.
  • Insert the indices of the remaining (not yet full) buckets into result. Some buckets were already inserted into result directly above.

  • Sort result:

    • If tensor_indices is not empty, the tensors are already in gradient-ready order and no sorting is needed.
    • If tensor_indices is empty, sort by the minimum tensor index, assuming that the order of the tensors is the order in which they are used (or the reverse of the order in which their gradients are produced). This sorting ensures that the buckets become ready in consecutive order.
    • Recall that the Reducer is later constructed with list(reversed(bucket_indices)).

Another thing to note: because tensors here is parameters[0] from the Python code, and parameters[0] follows the order returned by parameters(), DDP ends up launching AllReduce in the reverse order of model.parameters().

std::vector<std::vector<size_t>> compute_bucket_assignment_by_size(
    const std::vector<at::Tensor>& tensors,
    const std::vector<size_t>& bucket_size_limits, // Bucket size limits
    const std::vector<bool>& expect_sparse_gradient,
    const std::vector<int64_t>& tensor_indices) { // tensor_indices is not passed in during initialization
  // Either expect_sparse_gradient is not specified or it has as many elements
  // as the vector with tensors.
  TORCH_INTERNAL_ASSERT(
      expect_sparse_gradient.empty() ||
      (tensors.size() == expect_sparse_gradient.size()));
  TORCH_INTERNAL_ASSERT(tensors.size() > 0);

  std::vector<std::vector<size_t>> result;
  result.reserve(tensors.size()); // Reserve space

  // Keep iterator into the size_limit vector by tensor type and device.
  // This is done so that we can use the consecutive bucket limits per type.
  std::unordered_map<
      BucketKey,
      std::vector<size_t>::const_iterator,
      c10::hash<BucketKey>>
      bucket_size_limit_iterators;

  // Local accumulator type for a single bucket.
  struct BucketAccumulator {
    std::vector<size_t> indices; // Bucket contents: a list of tensor indices
    size_t size = 0; // Bucket size, e.g. a few MB
  }; // The logical contents of a bucket

  // Keep vector of indices and size accumulator by tensor type and device.
  std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>>
      buckets; // All buckets; each actual bucket can be thought of as a BucketAccumulator

  for (size_t i = 0; i < tensors.size(); i++) { // Iterate over all tensors passed in
    const auto& tensor = tensors[i]; // Get the tensor
    TORCH_CHECK(!tensor.is_sparse(), "No support for sparse tensors.");

    // when tensor_indices is empty, the index of tensors[i] assigned to
    // bucket is i, otherwise the tensor index is tensor_indices[i].
    auto tensor_index = i; // By default the tensor index runs from 0 up to tensors.size()
    if (!tensor_indices.empty()) {
      tensor_index = tensor_indices[i]; // If tensor_indices is given, use it as the tensor's index
    }
    // If we expect a sparse gradient to be produced for this tensor, it cannot
    // be grouped together with other gradients and gets its own bucket.
    // If a sparse gradient is expected, place the tensor in a bucket by itself,
    // because it cannot be grouped with other tensors
    if (!expect_sparse_gradient.empty() &&
        expect_sparse_gradient[tensor_index]) {
      result.push_back({tensor_index});
      continue;
    }

    auto key = BucketKey(tensor.scalar_type(), tensor.device()); // Build the bucket key from the tensor's type and device
    auto& bucket = buckets[key]; // Get the BucketAccumulator
    bucket.indices.push_back(tensor_index); // Append the tensor index to the bucket's index list
    bucket.size += tensor.numel() * tensor.element_size(); // Increase the bucket size

    // Initialize bucket size limit iterator if necessary.
    // The limits are [kDefaultFirstBucketBytes, bucket_bytes_cap_], i.e.
    // [_DEFAULT_FIRST_BUCKET_BYTES, int(bucket_cap_mb * 1024 * 1024)] on the Python side.
    if (bucket_size_limit_iterators.count(key) == 0) {
      bucket_size_limit_iterators[key] = bucket_size_limits.begin();
    }

    auto& bucket_size_limit_iterator = bucket_size_limit_iterators[key];
    const auto bucket_size_limit = *bucket_size_limit_iterator; // The current size limit
    if (bucket.size >= bucket_size_limit) {
      // The bucket has reached its size limit, so it is closed and a new one is started;
      // the indices of the old bucket are moved into result.
      result.emplace_back(std::move(bucket.indices)); // An over-sized bucket is flushed into result right away
      bucket = BucketAccumulator(); // Create a new bucket; since bucket is a reference this empties the old one, whose contents are already in result

      // Advance to the next bucket size limit for this type/device.
      auto next = bucket_size_limit_iterator + 1;
      if (next != bucket_size_limits.end()) {
        bucket_size_limit_iterator = next;
      }
    }
  }

  // Add remaining buckets. Insert the indices of the remaining buckets into
  // result; some buckets were already inserted into result directly above.
  for (auto& it : buckets) {
    auto& bucket = it.second;
    if (!bucket.indices.empty()) {
      result.emplace_back(std::move(bucket.indices));
    }
  }

  // If tensor_indices is not empty, the order of the tensors is in the gradient
  // ready order, so no need to sort.
  // If tensor_indices is empty, sort resulting buckets by the minimum tensor
  // index they include. We assume that the order of the tensors is the order in
  // which they are used (or the reverse order in which their gradients are
  // produced). This sorting step ensures that the buckets are ready in
  // consecutive order.
  // Note that the Reducer is later constructed with list(reversed(bucket_indices)).
  if (tensor_indices.empty()) {
    std::sort(
        result.begin(),
        result.end(),
        [](const std::vector<size_t>& a, const std::vector<size_t>& b) {
          // Sort any two buckets by their smallest tensor index
          const auto amin = std::min_element(a.begin(), a.end()); // The smallest index in a
          const auto bmin = std::min_element(b.begin(), b.end()); // The smallest index in b
          return *amin < *bmin;
        });
  }

  return result;
}

The result is that each element of result is a vector of tensor indices for one bucket, and the buckets are sorted by their smallest index from small to large. Model parameters are allocated into buckets in (roughly) the reverse order of model.parameters() of the given model. The reverse order is used because DDP expects gradients to become ready in approximately that order during the backward pass.

+-----------------------------------------------------------------------+
|                                                                       |
|  <tensor index 1, tensor index 2, tensor index 3, tensor index 4>     |
|                                                                       |
|  <tensor index 5, tensor index 6, tensor index 7>                     |
|                                                                       |
|  ......                                                               |
|                                                                       |
|  <tensor index 8, tensor index 9, tensor index 10, tensor index 11>   |
|                                                                       |
+-----------------------------------------------------------------------+
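The greedy bucketing logic itself can be summarized in a few lines of Python. The sketch below is purely illustrative (assign_buckets and its arguments are assumptions, not PyTorch API); it ignores the per-type/per-device keys and sparse gradients, but shows how a bucket is closed as soon as the running size reaches the current limit, and how the limit then advances from the small first-bucket limit to the larger cap:

from typing import List

def assign_buckets(sizes_in_bytes: List[int],
                   size_limits: List[int]) -> List[List[int]]:
    # Greedy bucketing sketch: accumulate tensor indices into the current
    # bucket and close it once the current size limit is reached.
    result, current, current_size = [], [], 0
    limit_idx = 0
    for index, nbytes in enumerate(sizes_in_bytes):
        current.append(index)
        current_size += nbytes
        if current_size >= size_limits[limit_idx]:
            result.append(current)                 # flush the full bucket
            current, current_size = [], 0
            limit_idx = min(limit_idx + 1, len(size_limits) - 1)
    if current:
        result.append(current)                     # flush the remainder
    return result

# Example with a 1 MB first-bucket limit and a 25 MB cap (the usual DDP defaults):
print(assign_buckets([400_000, 800_000, 3_000_000, 30_000_000],
                     [1024 * 1024, 25 * 1024 * 1024]))
# -> [[0, 1], [2, 3]]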

3.2.2 Synchronize bucket indices

Once the bucket sizes have been generated, sync_bucket_indices is used to synchronize the bucket indices. Its logic is as follows:

  • Iterate over the buckets and record each bucket's size into bucket_sizes.
  • Configure TensorOptions.
  • Put the indices and the number of buckets into indices_tensor. Tensors are read and written here through a PyTorch accessor; an accessor is like a tensor, but it hard-codes the tensor's dimensions and dtype as template parameters so that elements can be accessed efficiently.
  • Because process groups such as NCCL only support operations between devices, copy indices_tensor to indices_tensor_device.
  • Broadcast indices_tensor_device.
  • Broadcast the bucket sizes in the same way.
  • After the broadcast, iterate over the buckets and update bucket_indices using num_buckets, bucket_sizes_tensor, and indices_tensor received from rank 0.

void Reducer::sync_bucket_indices(
    std::vector<std::vector<size_t>>& bucket_indices) {

  auto num_buckets = bucket_indices.size();
  std::vector<size_t> bucket_sizes;
  bucket_sizes.reserve(num_buckets);
  int64_t total_size = 0;

  // Iterate over the buckets and record their sizes into bucket_sizes
  for (size_t i = 0; i < num_buckets; i++) {
    auto bucket_size = bucket_indices.at(i).size();
    bucket_sizes.push_back(bucket_size);
    total_size += bucket_size;
  }

  // Configure TensorOptions
  at::TensorOptions options;
  options = options.dtype(at::kInt);
  options = options.device(replicas_[0][0].device());

  // Group indices and num_bucket together into indices_tensor
  // Broadcast this tensor first, as its size is equal among all processes
  // Put the indices and the bucket count into indices_tensor. Tensors are read
  // and written through a PyTorch accessor, which hard-codes the dimensions and
  // dtype as template parameters
  auto indices_tensor = at::empty({total_size + 1}, at::kInt);
  auto indices_accessor = indices_tensor.accessor<int, 1>();
  auto indices_accessor_Index = 0;
  for (size_t i = 0; i < num_buckets; i++) {
    const auto& bucket_size = bucket_indices.at(i).size();
    for (size_t j = 0; j < bucket_size; j++) {
      indices_accessor[indices_accessor_Index++] = bucket_indices[i][j];
    }
  }
  indices_accessor[indices_accessor_Index] = num_buckets;

  // Copy CPU tensor to device tensor, as the process_group_ could be NCCL and
  // it can only broadcast device tensors.
  auto indices_tensor_device = at::empty({total_size + 1}, options);
  // Because process groups such as NCCL only support operations between devices,
  // copy indices_tensor to indices_tensor_device
  indices_tensor_device.copy_(indices_tensor, /*non_blocking=*/true);
  std::vector<at::Tensor> indices_tensor_list = {indices_tensor_device};
  // Broadcast indices_tensor_device
  process_group_->broadcast(indices_tensor_list)->wait();
  indices_tensor.copy_(indices_tensor_list.front(), /*non_blocking=*/false);

  // Update num_buckets after receiving it from rank 0
  num_buckets = indices_accessor[indices_accessor_Index];

  // Broadcast bucket_sizes
  // The bucket sizes are broadcast in the same way
  auto bucket_sizes_tensor = at::empty({(int64_t)num_buckets}, at::kInt);
  auto bucket_sizes_accessor = bucket_sizes_tensor.accessor<int, 1>();
  for (size_t i = 0; i < num_buckets; i++) {
    // For rank != 0, it is possible that local num buckets bucket_sizes.size()
    // is smaller than broadcasted num_buckets
    bucket_sizes_accessor[i] =
        bucket_sizes.at(std::min(i, (bucket_sizes.size() - 1)));
  }
  auto bucket_sizes_tensor_device = at::empty({(int64_t)num_buckets}, options);
  bucket_sizes_tensor_device.copy_(bucket_sizes_tensor, /*non_blocking=*/true);
  std::vector<at::Tensor> bucket_sizes_tensor_list = {
      bucket_sizes_tensor_device};
  process_group_->broadcast(bucket_sizes_tensor_list)->wait();
  bucket_sizes_tensor.copy_(
      bucket_sizes_tensor_list.front(), /*non_blocking=*/false);

  // Clear bucket_indices first, and then update bucket_indices using received
  // num_buckets, bucket_sizes_tensor and indices_tensor from rank 0
  bucket_indices.clear();
  bucket_indices.reserve(num_buckets);
  indices_accessor_Index = 0;
  // Rebuild bucket_indices from num_buckets, bucket_sizes_tensor, and indices_tensor
  for (size_t i = 0; i < num_buckets; i++) {
    const auto& bucket_size = bucket_sizes_accessor[i];
    std::vector<size_t> bucket;
    bucket.reserve(bucket_size);
    for (size_t j = 0; j < bucket_size; j++) {
      bucket.push_back(indices_accessor[indices_accessor_Index++]);
    }
    bucket_indices.emplace_back(std::move(bucket));
  }
}
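The wire format is simple: all indices plus the bucket count are packed into one int tensor and broadcast from rank 0, then the nested lists are rebuilt from the received data. Below is a hedged Python sketch of that protocol using the public API (names are illustrative, not the Reducer code; it assumes an initialized process group and, for simplicity, that every rank produced the same number of buckets, which the real code does not require):

import torch
import torch.distributed as dist

def sync_bucket_indices_sketch(bucket_indices, device):
    # Flatten all indices and append the bucket count, mirroring the layout
    # built by Reducer::sync_bucket_indices.
    flat = [i for bucket in bucket_indices for i in bucket] + [len(bucket_indices)]
    indices_tensor = torch.tensor(flat, dtype=torch.int32, device=device)
    dist.broadcast(indices_tensor, src=0)          # rank 0's layout wins

    bucket_sizes = torch.tensor([len(b) for b in bucket_indices],
                                dtype=torch.int32, device=device)
    dist.broadcast(bucket_sizes, src=0)

    # Rebuild the nested index lists from the broadcast data.
    num_buckets = int(indices_tensor[-1])
    flat_indices = indices_tensor[:-1].tolist()
    rebuilt, pos = [], 0
    for size in bucket_sizes[:num_buckets].tolist():
        rebuilt.append(flat_indices[pos:pos + size])
        pos += size
    return rebuilt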

3.2.3 Initializing a Bucket

Synchronization is followed by initialization of the buckets. This part of the code was analyzed in the previous article, so it is omitted here.

3.3 Prepare for backward propagation

After the forward propagation is complete, prepare_for_backward is called to prepare for the backward propagation.

There are roughly two steps: reset and find unused parameters.

void Reducer::prepare_for_backward(
    const std::vector<torch::autograd::Variable>& outputs) {
  std::lock_guard<std::mutex> lock(mutex_);

  // Record the start time
  cpu_timer_.backward_compute_start_time = current_time_in_nanos();
  if (should_collect_runtime_stats()) {
    record_backward_compute_start_time();
  }

  // Reset accounting.
  expect_autograd_hooks_ = true;
  reset_bucket_counting();

  // Reset unused parameter accounting.
  has_marked_unused_parameters_ = false;
  // Reset per iteration marked ready parameters.
  perIterationReadyParams_.clear(); // Reset the parameters marked ready in each iteration

  // If static graph is not set, search graph to detect unused parameters.
  // When static graph is set, unused_parameters_ will be detected and will
  // not change after 1st iteration.
  // If static_graph_ = false and find_unused_parameters_ is false,
  // we assume that autograd hooks for ALL variables will be called,
  // and we don't have to search the autograd graph for presence of these hooks.
  if (dynamic_graph_find_unused()) {
    unused_parameters_.clear();
    search_unused_parameters(outputs); // Find the parameters that are not used
  }
}

3.3.1 Reset

The buckets are iterated over; for each bucket, the pending state of each model replica is reset. A replica's pending state is determined by the number of variables that replica has in this bucket.

For a static graph, numGradHooksTriggeredMapPerIteration_ is also reset.

void Reducer::reset_bucket_counting() {
  next_bucket_ = 0;
  // Reset num_buckets_ready_ at the beginning of backward computation
  // in each iteration.
  num_buckets_ready_ = 0;

  for (auto& bucket : buckets_) { // Iterate over the buckets
    for (auto& replica : bucket.replicas) {
      // For each bucket, reset the pending state of each replica; it is
      // determined by the number of variables the replica has in this bucket
      replica.pending = replica.variables.size();
    }
    // Reset the bucket's pending state, which is determined by the number of model replicas
    bucket.pending = bucket.replicas.size();
  }

  if (static_graph_) {
    // Reset numGradHooksTriggeredMapPerIteration_
    numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
  }
}

3.3.2 Searching for Unused Parameters

search_unused_parameters implements the "find unused parameters" functionality.

We first look at the Reducer's find_unused_parameters_ member variable. If find_unused_parameters_ is set to true, then at the end of forward propagation DDP traverses the autograd graph backward from the specified outputs, finds all unused parameters, and marks them as ready.

For every parameter, DDP holds a pointer to its gradient accumulation function; parameters that do not show up in the autograd graph will be marked as ready as soon as the first autograd hook is called.

Because the model output may be ignored, this is not done immediately; we only want to start performing reductions once torch.autograd.backward() is called.

As you can see, this is expensive. Why do it? Because the computation graph is dynamic and changes between iterations.

  • During training, a single iteration may use only a subgraph of the model, and because PyTorch builds the graph dynamically, that subgraph can change from iteration to iteration; in other words, some parameters may be skipped in the backward pass.
  • Meanwhile, since all parameters were divided into buckets at the beginning, and the hooks only trigger communication when an entire bucket is ready (i.e., pending == 0), if we did not mark unused parameters as ready, the whole communication process would never complete.

// Traverse the autograd graph starting at the specified output.
// All parameters for which we have a pointer to their gradient accumulation
// functions, but don't show up in the autograd graph will be marked ready for
// for reduction as soon as the first autograd hook is called. This is not
// done immediately because the model output may be ignored, and we only
// want to start performing reductions on `torch.autograd.backward()`.
void Reducer::search_unused_parameters(
    const std::vector<torch::autograd::Variable>& outputs) {
  std::unordered_set<torch::autograd::Node*> seen;
  std::vector<torch::autograd::Node*> queue;

  RECORD_FUNCTION(
      "torch.distributed.ddp.reducer::search_unused_parameters",
      std::vector<c10::IValue>());

  // Seed queue with the grad functions of all outputs.
  for (const auto& output : outputs) {
    const auto& grad_fn = output.grad_fn();
    if (grad_fn) {
      queue.push_back(grad_fn.get()); // Insert the grad functions of all output nodes into the queue
    }
  }

  // Traverse the autograd graph starting at the specified output.
  // For each function, look at its next edges, push the nodes those edges point
  // to onto the queue, and continue. In the end, `seen` holds the grad functions
  // of all nodes reachable from the outputs.
  while (!queue.empty()) {
    auto fn = queue.back();
    queue.pop_back();
    for (const auto& edge : fn->next_edges()) {
      if (auto next_ptr = edge.function.get()) {
        const bool was_inserted = seen.insert(next_ptr).second;
        if (was_inserted) {
          queue.push_back(next_ptr);
        }
      }
    }
  }

  // Find accumulator functions that don't show up in this graph.
  // gradAccToVariableMap_ contains all variables that need to be reduced.
  // Iterate over gradAccToVariableMap_; if an entry is not in `seen`, the
  // parameter is unused, so insert it into unused_parameters_.
  for (const auto& it : gradAccToVariableMap_) {
    // If the accumulator function is present in the graph, we know
    // a gradient will be computed for the corresponding parameter.
    if (seen.count(it.first) == 0) {
      unused_parameters_.push_back(it.second);
    }
  }

  // Warn user about unnecessary perf hit if all parameters were used in
  // forward.
  if (unused_parameters_.empty()) {
    TORCH_WARN_ONCE(
        "find_unused_parameters=True was specified in DDP constructor, "
        "but did not find any unused parameters in the forward pass. This flag "
        "results in an extra traversal of the autograd graph every iteration, "
        " which can adversely affect performance. If your model indeed never "
        "has any unused parameters in the forward pass, consider turning this "
        "flag off. Note that this warning may be a false positive if your model "
        "has flow control causing later iterations to have unused parameters.");
  }
}

At this point, forward propagation is over, and we get the following:

  • The parameters that need gradients have been divided into buckets.
  • The buckets have been rebuilt.
  • Forward propagation is complete.
  • Starting from the specified outputs, the autograd graph has been traversed to find all unused parameters, which are marked as ready.

We will analyze backward propagation in the next article.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

Pytorch distributed series 3 – what does torch.utils.data.distributed.DistributedSampler do during distributed training?

Pytorch distributed series 1 — clarify the environment variables associated with torch.distributed.launch

Pytorch distributed data parallel framework – how does it synchronize?

Pytorch (distributed) data parallel personal practice – DataParallel/DistributedDataParallel

Pytorch nn.DataParallel

Discuss.pytorch.org/t/dataparal…

Pytorch.org/docs/stable…

PyTorch source code interpretation: a look at distributed training

PyTorch AutoGrad C++ layer implementation

PyTorch automatic differentiation (1)

How does PyTorch speed up data parallel training? Distributed secrets revealed

Pytorch distributed training (2: init_process_group)

Pytorch.org/tutorials/i…

Pytorch.org/docs/master…

Pytorch.org/tutorials/i…

DP & DDP for PyTorch source Code Interpretation: Model parallel and Distributed training parsing

Parameters and Buffers in the Pytorch model