0x00 Abstract

We have already covered some of DDP's supporting modules, which lays the groundwork for this article. This article covers the Python-world code and the initialization part of the C++ world; the core C++ code will be described in later articles.

Other articles in this series are as follows:

PyTorch distributed (1)—— History and Overview

PyTorch how to use GPU

PyTorch distributed (2) —– DataParallel – gradient

PyTorch distributed (3) —– DataParallel – gradient

PyTorch distributed (4)—— Distributed application concepts

PyTorch distributed (5) —— DistributedDataParallel —— what to use

PyTorch distributed (6) —— DistributedDataParallel —— gradients

PyTorch distributed (7) —— DistributedDataParallel —— process groups

PyTorch distributed (8) —— DistributedDataParallel

0x01 Review

1.1 Data Parallelism

DDP is an implementation of data-parallel training. To refresh everyone's memory, let's first look at the overall flow of data parallelism again, taken from the FairScale GitHub source.

1.2 DDP architecture

The following text is from pytorch.org/docs/master…

Here are the DDP implementation components. The stack diagram shows the structure of the code.

We follow the architecture from the top down.

1.2.1 Distributed Data parallelism

At the top is the distributed data parallel component.

  • distributed.py:
    • This is the Python entry point for DDP. It implements the initialization steps and the forward function of the nn.parallel.DistributedDataParallel module, which calls into the C++ library.
    • Its _sync_param function performs intra-process parameter synchronization when one DDP process runs on multiple devices, and it also broadcasts the model buffers from rank 0 to all other processes.
    • Inter-process parameter synchronization is implemented in Reducer.cpp.
  • comm.h: implements the coalesced broadcast helper function, which is called during initialization to broadcast the model state and to synchronize model buffers before forward propagation.
  • reducer.h: provides the core implementation of gradient synchronization during backward propagation. It has three entry-point functions:
    • Reducer: its constructor is called in distributed.py, and it registers Reducer::autograd_hook() with the gradient accumulators.
    • autograd_hook(): the autograd engine calls this function when a gradient becomes ready (a rough Python analogue of this mechanism is sketched after this list).
    • prepare_for_backward(): called in distributed.py at the end of the DDP forward pass. If find_unused_parameters is set to True in the DDP constructor, DDP traverses the autograd graph to find unused parameters.
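DDP registers these hooks on the C++ gradient accumulators, as later articles will show. As a rough Python analogue (an illustration only, using the public Tensor.register_hook API instead of DDP's actual grad-accumulator mechanism), the idea that "a callback fires per parameter as soon as its gradient is ready" looks like this:

import torch
import torch.nn as nn

model = nn.Linear(10, 5)

# Register one hook per parameter; each fires when that parameter's gradient is produced.
for name, p in model.named_parameters():
    p.register_hook(lambda grad, name=name: print(f"gradient ready: {name}"))

loss = model(torch.randn(2, 10)).sum()
loss.backward()  # prints "gradient ready: ..." for weight and bias during the backward pass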

1.2.2 Process groups

The following are two process-related components.

  • ProcessGroup.hpp: contains the abstract API of all process-group implementations. The c10d library provides three out-of-the-box implementations: ProcessGroupGloo, ProcessGroupNCCL, and ProcessGroupMPI. DistributedDataParallel uses ProcessGroup::broadcast() to send the model state from the rank 0 process to the other processes during initialization, and ProcessGroup::allreduce() to sum gradients.
  • Store.hpp: assists the rendezvous service used by process-group instances to find each other.
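To make the relationship between the Store and the ProcessGroup concrete, here is a minimal Python-world sketch (an illustration only, assuming two processes started by hand, the rank passed on the command line, and an arbitrary 127.0.0.1:29500 rendezvous address):

import sys
from datetime import timedelta

import torch
import torch.distributed as dist

rank, world_size = int(sys.argv[1]), 2

# The Store is the rendezvous point; the rank 0 process acts as the server.
store = dist.TCPStore("127.0.0.1", 29500, world_size, rank == 0, timedelta(seconds=30))

# The ProcessGroup is built on top of the Store.
dist.init_process_group("gloo", store=store, rank=rank, world_size=world_size)

t = torch.full((3,), float(rank))
dist.broadcast(t, src=0)   # ProcessGroup::broadcast: everyone now holds rank 0's zeros
dist.all_reduce(t)         # ProcessGroup::allreduce: sums the tensors across ranks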

1.3 Overall implementation of DDP

Combining the paper with pytorch.org/docs/master…, let's take a look at the overall DDP implementation.

We summarize the steps of one gradient iteration in DistributedDataParallel as follows:

  • Prerequisite:

    • DDP relies on c10d ProcessGroup for communication. Therefore, the application must create a ProcessGroup instance before constructing DDP.
  • Constructor:

    • The rank 0 process references the local module and broadcasts the model's state_dict() parameters to all processes, ensuring that all processes train from the same initial values with identical model replicas.
    • Each DDP process creates a local Reducer, which will handle gradient synchronization during the backward pass.
    • To improve communication efficiency, the Reducer organizes parameter gradients into buckets and reduces one bucket at a time.
      • Buckets are initialized and parameters are assigned to buckets in reverse order, which improves communication efficiency.
      • You can configure the bucket size by setting the bucket_cap_mb parameter in the DDP constructor.
      • The mapping from parameter gradients to buckets is determined at construction time, based on the bucket size limit and the parameter sizes. Model parameters are allocated to buckets in (roughly) the reverse order of Model.parameters() of the given model. The reason for using the reverse order is that DDP expects gradients to become ready in approximately that order during the backward pass.
      • As an example, grad0 and grad1 may fall into bucket1 while the other two gradients fall into bucket0. Of course, this assumption may not always hold; when it does not, it can hurt DDP backward speed because the Reducer cannot start communication as early as possible.
    • Besides bucketing, the Reducer also registers autograd hooks during construction, one per parameter. These hooks fire during the backward pass when the gradient becomes ready. Specifically, for each parameter DDP obtains its grad_accumulator and registers an autograd_hook.
  • Forward Pass:

    • Each process reads its own training data; the DistributedSampler ensures that each process reads different data.
    • DDP takes the input and passes it to the local model.
    • The model computes the forward pass and produces the output out. Computation now happens on each process (CUDA device).
    • If find_unused_parameters is set to True, DDP analyzes the output of the local model and traverses the computation graph starting from out, marking unused parameters as ready. Because the computation graph can change every iteration, this traversal is performed every iteration.
      • This mode allows running backward on a subgraph of the model: DDP traverses the autograd graph from the model output out and marks all unused parameters as ready, to reduce the parameters involved in the backward pass.
      • During the backward pass, the Reducer only waits for unready parameters, but it still reduces all buckets. Marking a parameter gradient as ready does not help DDP skip buckets, but it prevents DDP from waiting forever during the backward pass for gradients that will never be produced.
      • Note that traversing the autograd graph introduces extra overhead, so applications should only set find_unused_parameters to True when necessary.
    • Return out. Unlike DP, the model's network output does not need to be gathered to rank 0.
  • Backward Pass:

    • backward() is called directly on the loss Tensor, which is outside DDP's control. DDP uses the autograd hooks registered at construction time to trigger gradient synchronization. When a gradient becomes ready, the corresponding DDP hook on that gradient accumulator fires.
    • All-reduce is performed in autograd_hook. If the parameter index is param_index, the hook uses param_index to locate the parameter and marks it as ready. If all gradients in a bucket are ready, the bucket is ready.
    • When all gradients in a bucket are ready, the Reducer starts an asynchronous allreduce on that bucket to compute the mean of the gradients across all processes.
    • When all buckets are ready, the Reducer blocks, waiting for all allreduce operations to complete. When this is done, the averaged gradients are written to the param.grad field of every parameter.
    • The gradients of all processes have been reduced, so after backward propagation completes, the grad fields of the same parameter in different DDP processes should be identical.
    • Unlike DP, there is no need to broadcast parameters after each iteration. However, buffers still need to be broadcast from rank 0 to the other processes at each iteration.
  • Optimizer Step:

    • From the optimizer’s point of view, it is optimizing the local model.

    • Model copies on all DDP processes can be kept in sync because they all start in the same state and have the same mean gradient in each iteration.
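Putting the steps above together, a minimal end-to-end sketch looks like the following (an illustration only, assuming a gloo backend and that MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE are provided by the launcher; the toy model here is just a plain Linear layer):

import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

def run():
    # Prerequisite: create the ProcessGroup before constructing DDP.
    dist.init_process_group(backend="gloo")

    # Constructor: rank 0's state_dict() is broadcast and a Reducer is built.
    model = nn.Linear(10, 5)
    ddp_model = DDP(model)

    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.001)

    for _ in range(3):
        # Forward pass: each process feeds its own data to its local model.
        outputs = ddp_model(torch.randn(20, 10))
        # Backward pass: the autograd hooks trigger the bucketed all-reduce of gradients.
        loss_fn(outputs, torch.randn(20, 5)).backward()
        # Optimizer step: each process updates its local model with the averaged gradients.
        optimizer.step()
        optimizer.zero_grad()

    dist.destroy_process_group()

if __name__ == "__main__":
    run()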

0x02 Initialization

Since the Python world can attach member variables to a class in many places, we start with __init__.

2.1 __init__

Its core logic is:

  • Set the device type.

  • Set the device IDs.

  • Set self.process_group; by default it is GroupMember.WORLD.

  • Configure various class member variables.

  • Check the parameters.

  • Set the bucket size.

  • Build parameters.

  • Broadcast the state_dict() of rank 0 to other workers to ensure that the initial model states of all workers are the same.

  • Reducer is established.

    The specific code is as follows:

class DistributedDataParallel(Module):

    def __init__(
        self,
        module,
        device_ids=None,
        output_device=None,
        dim=0,
        broadcast_buffers=True,
        process_group=None,
        bucket_cap_mb=25,
        find_unused_parameters=False,
        check_reduction=False,
        gradient_as_bucket_view=False,
    ):

        super(DistributedDataParallel, self).__init__()

        # Set the device type
        self.is_multi_device_module = len({p.device for p in module.parameters()}) > 1
        distinct_device_types = {p.device.type for p in module.parameters()}
        self.device_type = list(distinct_device_types)[0]

        # Set device IDs
        if (
            device_ids is None
            or len(device_ids) == 0  # For backward compatibility.
            or self.device_type == "cpu"
            or self.is_multi_device_module
        ):

            self.device_ids = None
            self.output_device = None
        else:
            self.device_ids = [_get_device_index(x, True) for x in device_ids]
            if output_device is None:
                output_device = device_ids[0]
            self.output_device = _get_device_index(output_device, True)

        # set process group
        if process_group is None:
            self.process_group = _get_default_group()
        else:
            self.process_group = process_group

        # Configure various member variables
        self.static_graph = False
        self.dim = dim
        self.module = module
        self.device = list(self.module.parameters())[0].device
        self.broadcast_buffers = broadcast_buffers
        self.find_unused_parameters = find_unused_parameters
        self.require_backward_grad_sync = True
        self.require_forward_param_sync = True
        self.ddp_uneven_inputs_config = _DDPUnevenInputsConfig(
            ddp_join_enabled=False,
            ddp_join_divide_by_initial_world_size=False,
            ddp_join_throw_on_early_termination=False,
        )
        self.gradient_as_bucket_view = gradient_as_bucket_view
        if hasattr(module, "_ddp_params_and_buffers_to_ignore"):
            self.parameters_to_ignore = module._ddp_params_and_buffers_to_ignore
        else:
            self.parameters_to_ignore = []

        # to check the parameters
        # Check that a module does not have Uninitialized parameters
        for param in module.parameters():
            if isinstance(param, torch.nn.parameter.UninitializedParameter):
                raise RuntimeError(
                    "Modules with uninitialized parameters can't be used with `DistributedDataParallel`. "
                    "Run a dummy forward pass to correctly initialize the modules"
                )
        # used for intra-node param sync and inter-node sync as well
        self.broadcast_bucket_size = int(250 * 1024 * 1024)

        # reduction bucket size
        self.bucket_bytes_cap = int(bucket_cap_mb * 1024 * 1024)
        # Whether to perform input tensor CPU to GPU copies on a side-stream
        self.use_side_stream_for_tensor_copies = (
            os.environ.get("PYTORCH_DDP_USE_SIDE_STREAM", "1") == "1"
        )

        # Build parameters
        # TODO(wayi@): Remove this field since SPMD is no longer supported,
        # and also remove all the relevant unnecessary loops.
        # Module replication within process (single-process multi device)
        # this is not supported in the future
        self._module_copies = [self.module]
        # Build parameters for reducer.
        parameters, expect_sparse_gradient = self._build_params_for_reducer()
        # Verify model equivalence.
        dist._verify_model_across_ranks(self.process_group, parameters)
        
        
        # Sync params and buffers. Ensures all DDP models start off at the same value.
        # Broadcast the state_dict() of rank 0 to other workers to ensure that all workers have the same initial model state;
        self._sync_params_and_buffers(authoritative_rank=0)
        
        # In debug mode, build a mapping of parameter index -> parameter.
        if dist._get_debug_mode() != dist._DistributedDebugLevel.OFF:
            param_to_name_mapping = self._build_param_to_name_mapping(parameters)
        else:
            param_to_name_mapping = {}
            
        # Builds reducer.
        self._ddp_init_helper(parameters, expect_sparse_gradient, param_to_name_mapping)

Let’s choose some important steps to analyze.

2.2 Construction Parameters

For DDP, the first key step is building the parameters. Note that if the current setup is single-machine multi-GPU, i.e. single-process multi-device (just like DP), the model would need to be replicated within the process.

However, this will not be supported in the future and will be removed. So parameters is the set of parameters of [ToyModel], and parameters[0] is the parameter list of ToyModel. BucketReplica will be discussed later.

    # TODO(wayi@): Remove this field since SPMD is no longer supported,
    # and also remove all the relevant unnecessary loops.
    # Module replication within process (single-process multi device)
    
    self._module_copies = [self.module]  # Build a list such as [ToyModel]
    # Build parameters for reducer.
    parameters, expect_sparse_gradient = self._build_params_for_reducer()

Let’s look at the important parameters in the model:

  • Parameters: parameters that need to be updated by the optimizer during back propagation. We can get them via model.parameters().
  • Buffers: parameters that do not need to be updated by the optimizer during back propagation. We can get them via model.buffers().
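A small illustration of the difference (a sketch assuming a model with a BatchNorm layer, whose running statistics are buffers):

import torch.nn as nn

model = nn.Sequential(nn.Linear(10, 10), nn.BatchNorm1d(10))

# Parameters: updated by the optimizer during back propagation.
print([name for name, _ in model.named_parameters()])
# ['0.weight', '0.bias', '1.weight', '1.bias']

# Buffers: not updated by the optimizer; DDP broadcasts them from rank 0 instead
# of all-reducing their gradients.
print([name for name, _ in model.named_buffers()])
# ['1.running_mean', '1.running_var', '1.num_batches_tracked']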

2.2.1 _build_params_for_reducer

_build_params_for_reducer builds the parameters for the Reducer. The logic is as follows:

  • Iterate over _module_copies to get the (module, parameter) list modules_and_parameters. Only parameters that require gradients and are not in the ignore list are included.
  • Use a set to deduplicate parameters that might be shared across child modules.
  • Build the parameter list.
  • Check whether each module expects a sparse gradient, and store the results in expect_sparse_gradient.
  • Collect the module parameters, which, together with the buffers below, are used for synchronization to other workers.
  • Collect the module buffers; modules_buffers will be used in subsequent synchronization.
  • Return the parameter list and expect_sparse_gradient.
self._module_copies = [self.module]

def _build_params_for_reducer(self):
        
        # Build tuple of (module, parameter) for all parameters that require grads.
        modules_and_parameters = [
            [
                (module, parameter)
                # Get the list of modules
                for module_name, module in replica.named_modules()
                # Get the parameter list; keep parameters that require grad and are not in the ignore list
                for parameter in [
                    param
                    # Note that we access module.named_parameters instead of
                    # parameters(module). parameters(module) is only needed in the
                    # single-process multi device case, where it accesses replicated
                    # parameters through _former_parameters.
                    for param_name, param in module.named_parameters(recurse=False)
                    if param.requires_grad
                    and f"{module_name}.{param_name}" not in self.parameters_to_ignore
                ]
            ]
            for replica in self._module_copies
        ]

        # Deduplicate any parameters that might be shared across child modules.
        # Use a set to deduplicate parameters that might be shared across modules
        memo = set()
        modules_and_parameters = [
            # "p not in memo" is the deduplication check.
            # "not memo.add(p)" is always True, and it's only there to cause "add(p)" if needed.
            [(m, p) for m, p in replica_mps if p not in memo and not memo.add(p)]
            for replica_mps in modules_and_parameters
        ]

        # Build list of parameters.
        # Build the parameter list
        parameters = [
            list(parameter for _, parameter in replica)
            for replica in modules_and_parameters
        ]

        # Checks if a module will produce a sparse gradient.
        def produces_sparse_gradient(module):
            if isinstance(module, torch.nn.Embedding) or isinstance(
                module, torch.nn.EmbeddingBag
            ):
                return module.sparse
            return False

        # Build list of booleans indicating whether or not to expect sparse
        # gradients for the corresponding parameters.
        # Whether the corresponding parameters expect sparse gradients
        expect_sparse_gradient = [
            list(produces_sparse_gradient(module) for module, _ in replica)
            for replica in modules_and_parameters
        ]

        # The following modules_params and modules_buffers are used for
        # param/buffer sync in _sync_params.
        # Collect module parameters, which, together with the buffers below, are used for synchronization to other workers
        self.modules_params = [
            list(self._get_parameters(m)) for m in self._module_copies
        ]
        # Collect buffers for modules, filtering out buffers that should be ignored.
        # Collect the module's buffers; modules_buffers will be used in later synchronization
        named_module_buffers = [
            [(buffer, buffer_name) for buffer_name, buffer in m.named_buffers()]
            for m in self._module_copies
        ]
        self.modules_buffers = [
            [
                buffer
                for (buffer, buffer_name) in module_buffers
                if buffer_name not in self.parameters_to_ignore
            ]
            for module_buffers in named_module_buffers
        ]

        return parameters, expect_sparse_gradient

An example of parameters is shown below. You can see that only element [0] is meaningful; element [0] itself contains four elements:

parameters = {list: 1}
 0 = {list: 4}
  0 = {Parameter: 10} Parameter containing:\ntensor([[ 4.0381e-02, 3.8828e-02, 1 ...
  1 = {Parameter: 10} Parameter containing:\ntensor([ 0.0438, 0.2033, 0.2771, 0.0721, ...
  2 = {Parameter: 5} Parameter containing:\ntensor([[ 0.0094, 0.1319, 0.0713, 0.3155, ...
  3 = {Parameter: 5} Parameter containing:\ntensor([ 0.0008, 0.0582, 0.1245, 0.2538, ...
  __len__ = {int} 4
 __len__ = {int} 1

2.2.2 modules_buffers

Where is self.modules_buffers used? It is used later to broadcast buffers, for example:

    # When running in join mode, checks and performs sync of module buffers if
    # the models have buffers that should be synchronized in the forward pass.
    def _check_and_sync_module_buffers(self):
        if self.will_sync_module_buffers():
            authoritative_rank = self._find_common_rank(self._distributed_rank, False)
            self._distributed_broadcast_coalesced(
                self.modules_buffers[0], self.broadcast_bucket_size, authoritative_rank
            )

_find_common_rank is used here to find a common rank among the valid ranks currently used by DDP (the maximum one).

def _find_common_rank(self, input_rank, rank_cond):
    # -1 indicates that this rank is not under consideration to be the
    # common_rank
    rank_to_use = torch.tensor(
        [input_rank if rank_cond else -1],
        device=self.device,
    )
    # Use the MAX operation to get the maximum rank
    dist.all_reduce(rank_to_use, op=ReduceOp.MAX, group=self.process_group)
    if rank_to_use.item() == -1:
        raise ValueError(
            "BUG! Expected rank_cond to be true for at least one process."
        )
    return rank_to_use.item()  # Return the common rank (the maximum valid rank)
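The same trick can be sketched outside DDP (an illustration only, assuming the process group has already been initialized): every rank contributes its own rank if it satisfies the condition, otherwise -1, and a MAX all-reduce picks the largest valid rank as the common rank.

import torch
import torch.distributed as dist

def find_common_rank(condition: bool) -> int:
    # -1 means "this rank is not a candidate".
    rank_to_use = torch.tensor([dist.get_rank() if condition else -1])
    dist.all_reduce(rank_to_use, op=dist.ReduceOp.MAX)  # take the maximum over all ranks
    return rank_to_use.item()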

2.3 Validation Model

Next comes the validation phase.

2.3.1 Background

Since the following code will be used, let's first look at some background knowledge on broadcast. If you are not familiar with this part, you may wonder: if all ranks call the same broadcast code, how can the broadcast go from rank 0 to the other ranks?

process_group->broadcast(vec)->wait(); // Broadcast rank 0's meta to the corresponding devices

Let's look at torch/lib/c10d/ProcessGroupMPI.cpp. As you can see, it uses MPI's MPI_Bcast API for the broadcast operation, where opts.rootRank is the key.

c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupMPI::broadcast(
    std::vector<at::Tensor>& tensors,
    const BroadcastOptions& opts) {
  checkSingleTensor(tensors);
  std::function<void(std::unique_ptr<WorkEntry>&)> runFunc =
      [opts, this](std::unique_ptr<WorkEntry>& entry) {
        auto data = (entry->src)[0];
        c10::DeviceGuard guard(data.device());
        std::unique_lock<std::mutex> globalLock(pgGlobalMutex_);
        MPI_CHECK(MPI_Bcast( // Call the MPI API
            data.data_ptr(),
            data.numel(),
            mpiDatatype.at(data.scalar_type()),
            opts.rootRank, // Here is the key, just broadcast other ranks from root
            pgComm_));
      };
  auto entry = std::make_unique<WorkEntry>(&tensors, &tensors, std::move(runFunc));
  return enqueue(
      std::move(entry),
      "mpi:broadcast",
      c10::optional<std::vector<at::Tensor>>(tensors));
}

Opts is an instance of BroadcastOptions.

class BroadcastOptions:
    rootRank: int
    rootTensor: int
    timeout: timedelta

In the C++ world it corresponds to the following:

struct BroadcastOptions {
  int rootRank = 0;
  int rootTensor = 0;
  std::chrono::milliseconds timeout = kUnsetTimeout;
};

Because BroadcastOptions is default-initialized by C++ with rootRank = 0, every rank ends up calling MPI_Bcast with root rank 0, i.e. the broadcast goes from rank 0 to the other ranks.

c10::intrusive_ptr<ProcessGroup::Work> broadcast(
    std::vector<at::Tensor>& data,
    const BroadcastOptions& opts = BroadcastOptions()) override;
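The same semantics are visible from the Python world. In the sketch below (an illustration only, assuming the process group is already initialized), every rank runs the exact same line, and the src argument alone decides who is the source of the broadcast:

import torch
import torch.distributed as dist

rank = dist.get_rank()
tensor = torch.full((4,), float(rank))  # rank 0 holds zeros, rank 1 holds ones, ...

dist.broadcast(tensor, src=0)  # after this call every rank holds rank 0's zeros
print(f"rank {rank}: {tensor}")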

2.3.2 Specific code

Let’s look at how to validate the model.

_verify_model_across_ranks verifies that the parameters of the model (replica 0) have the same sizes/strides across processes after the broadcast.

    # Verify model equivalence.
    dist._verify_model_across_ranks(self.process_group, parameters)

The code below shows that _verify_model_across_ranks actually calls verify_replica0_across_processes.

module.def(
    "_verify_model_across_ranks",
    &::c10d::verify_replica0_across_processes,
    py::arg("process_group"),
    py::arg("replicas"),
    py::call_guard<py::gil_scoped_release>());

In verify_replica0_across_processes, model_replicas is the parameters built earlier, and the logic is as follows:

  • First, build metadata from model_replicas.
  • Then clone metadata into metadata_dev.
  • Then broadcast the metadata_dev of process 0 to the corresponding devices.
    • Each process runs the same code, but in process_group->broadcast only rank 0 is the root_rank, so effectively only rank 0 broadcasts.
    • After the broadcast, all processes have the same metadata_dev, namely the data of process 0.
  • Then copy metadata_dev back into control and compare control with model_replicas[0] to see whether it matches the original.
    • Check that control has the same sizes as model_replicas[0].
    • An accessor is used here: LibTorch uses accessor for fast element access when the Tensor is on the CPU, and packed_accessor when the Tensor is on the GPU. This is mentioned in the section "Core developers take a full look at PyTorch's internals".

The specific code is as follows:

// Verifies corresponding params in replica 0 have the same sizes/strides
// across processes.
void verify_replica0_across_processes(
    c10::intrusive_ptr<c10d::ProcessGroup> process_group,
    std::vector<std::vector<at::Tensor>> model_replicas) {
  size_t i = 0;
  for (const auto& t : model_replicas[0]) {
    i += 2 * t.dim();
  }
  at::TensorOptions options;
  options = options.dtype(at::kLong);
  auto metadata = at::empty({static_cast<long>(i)}, options);

  // Technically, process 0 is the broadcast source, so only process 0 needs
  // to populate metadata. But no harm keeping work aligned across processes.
  auto metadata_accessor = metadata.accessor<int64_t, 1>();
  i = 0;
  // Copy the sizes/strides of model_replicas[0] into metadata_accessor
  for (const auto& t : model_replicas[0]) {
    for (const auto& sz : t.sizes()) {
      metadata_accessor[i++] = sz;
    }
    for (const auto& str : t.strides()) {
      metadata_accessor[i++] = str;
    }
  }

  // Then clone metadata to metadata_dev
  auto metadata_dev = metadata.clone().to(model_replicas[0][0].device());
  std::vector<at::Tensor> vec{metadata_dev};
  // Broadcast metadata_dev
  process_group->broadcast(vec)->wait(); // Broadcast the meta of process 0 to the corresponding devices

  // After that, metadata_dev is the same for all processes
  // Technically, process 0 doesn't need to double-check metadata, because it
  // was the source. But no harm keeping work aligned.
  auto control = at::empty({static_cast<long>(i)}, options);
  // Copy metadata_dev back to control
  control.copy_(metadata_dev, /*non_blocking=*/false);

  // Then compare control with model_replicas[0] to see if it matches the original
  auto control_accessor = control.accessor<int64_t, 1>();
  i = 0;
  for (size_t p = 0; p < model_replicas[0].size(); p++) {
    const auto& t = model_replicas[0][p];
    // I'd like to include which process we are in the message,
    // but ProcessGroup::getRank is not public!
    for (const auto& sz : t.sizes()) {
      TORCH_CHECK(
          sz == control_accessor[i++],
          "replicas[0][",
          p,
          "] in this process"
          " with sizes ",
          t.sizes(),
          " appears not to match sizes of the same param in process 0.");
    }
    for (const auto& str : t.strides()) {
      TORCH_CHECK(
          str == control_accessor[i++],
          "replicas[0][",
          p,
          "] in this process"
          " with strides ",
          t.strides(),
          " appears not to match strides of the same param in process 0.");
    }
  }
}

2.4 Broadcast Status

The next step is to broadcast the model’s initial parameters and variables from rank 0 to other ranks.

    # Sync params and buffers. Ensures all DDP models start off at the same value.
    # Broadcast the state_dict() of rank 0 to other workers to ensure that all workers have the same initial model state;
    self._sync_params_and_buffers(authoritative_rank=0)

2.4.1 state_dict

Let’s see what we need to broadcast first.

PyTorch's state_dict is a dictionary object that maps each layer of the model to its corresponding parameters, such as the weights and biases of each layer. Only layers with trainable parameters (convolutional layers, linear layers, etc.) have entries in the model's state_dict; layers without parameters of their own (such as pooling or ReLU layers) do not. For example, consider the model below.

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

Its state_dict is as follows:

self.module.state_dict() = {OrderedDict: 4} 
 'net1.weight' = {Tensor: 10} tensor([[ 0.2687, 0.0840, -0.1032, 0.3079, 0.0385, -0.0495, -0.3068, -0.1271,\n         -0.1067, -0.1966],\n        [-0.1203, 0.1789, 0.0666, 0.1882, 0.1335, 0.1921, -0.1145, -0.1781,\n          0.0661, -0.2339],\n        [ 0.1865, -0.2076, 0.2071, 0 ...
 'net1.bias' = {Tensor: 10} tensor([ 0.2146, -0.1599, 0.2350, -0.2843, -0.0773, -0.2151, 0.1864, -0.3068,\n        -0.2093, 0.1365])
 'net2.weight' = {Tensor: 5} tensor([[ 0.1922, -0.0148, -0.1884, 0.2124, -0.1361, 0.0172, -0.2371, 0.1946,\n          0.2047, -0.2697],\n        [-0.2690, 0.1372, 0.2269, 0.0436, -0.1353, -0.2054, -0.2418, -0.2300,\n          0.1987, 0.0007],\n        [ 0.0995, -0.2659, -0.2374, -0 ...
 'net2.bias' = {Tensor: 5} tensor([0.1488, 0.0791, 0.1667, 0.1449, 0.0545])

2.4.2 _sync_params_and_buffers

_sync_params_and_buffers collects trainable parameters from the state_dict of the module, and then broadcasts these parameters.

The specific code is:

    def _sync_params_and_buffers(self, authoritative_rank=0):
        module_states = []
        for name, param in self.module.state_dict().items():
            if name not in self.parameters_to_ignore:
                module_states.append(param)

# module_states = {list: 4} [tensor ([[0.2687, 0.0840, 0.1032, 0.3079, 0.0385, 0.0495, 0.3068, 0.1271, \ n - 0.1067, 0.1966], and \ n [0.1203, 0.1789, 0.0666, 0.1882, 0.1335, 0.1921, 0.1145, 0.1781, 0.0661 \ n, 0.2339], the \ n [0.1865, 0.2076, 0.2071,
                
        if len(module_states) > 0:
            self._distributed_broadcast_coalesced(
                module_states, self.broadcast_bucket_size, authoritative_rank
            )

_distributed_broadcast_coalesced calls dist._broadcast_coalesced:

import torch.distributed as dist

def _distributed_broadcast_coalesced(
        self, tensors, buffer_size, authoritative_rank=0
    ):
        dist._broadcast_coalesced(
            self.process_group, tensors, buffer_size, authoritative_rank
        )

2.4.3 dist._broadcast_coalesced

Let's follow the code, starting with torch/distributed/__init__.py, which imports _broadcast_coalesced.

if is_available():
    from torch._C._distributed_c10d import (
        Store,
        FileStore,
        TCPStore,
        ProcessGroup,
        PrefixStore,
        Reducer,
        Logger,
        BuiltinCommHookType,
        GradBucket,
        _DEFAULT_FIRST_BUCKET_BYTES,
        _register_comm_hook,
        _register_builtin_comm_hook,
        _broadcast_coalesced, # Import here
        _compute_bucket_assignment_by_size,
        _verify_model_across_ranks,
        _test_python_store,
        _DistributedDebugLevel,
        _get_debug_mode
    )
    if sys.platform != 'win32':
        from torch._C._distributed_c10d import (
            HashStore,
            _round_robin_process_groups,
        )

    from .distributed_c10d import *  # noqa: F403
    # Variables prefixed with underscore are not auto imported
    # See the comment in `distributed_c10d.py` above `_backend` on why we expose
    # this.

    from .distributed_c10d import _backend, _all_gather_base

We continue to torch/csrc/distributed/c10d/init.cpp:

  module.def(
      "_broadcast_coalesced".// Define a lambda such that the pybind11 prototype can take a std::vector
      // for the tensor list argument, but still pass it to the underlying
      // function as a c10::ArrayRef.
      [](c10::intrusive_ptr<::c10d::ProcessGroup> process_group,
         std::vector<at::Tensor> tensors, // NOLINT
         size_t buffer_size,
         int rank) {
        broadcast_coalesced( // here
            std::move(process_group), tensors, buffer_size, rank);
      },
      py::arg("process_group"),
      py::arg("tensors"),
      py::arg("buffer_size"),
      // The source of truth rank to broadcast the tensors from.
      py::arg("src") = 0,
      py::call_guard<py::gil_scoped_release>());

Finally comes torch/lib/c10d/comm.cpp, where the tensor is broadcast using ProcessGroup.

// Broadcast many tensors to all processes in the process group.
void broadcast_coalesced(
    c10::intrusive_ptr<c10d::ProcessGroup> process_group,
    at::TensorList tensors,
    size_t buffer_size,
    int rank) {
  // Coalesce tensors into buckets taking into account the maximum buffer size.
  // This routine is multi-device aware, so the tensors can be split across
  // multiple devices and can contain a mix of CPU and CUDA tensors.
  // Count buckets first
  const auto buckets =
      compute_bucket_assignment_by_size(tensors.vec(), {buffer_size});

  // Returns tensor at specified index in input tensor list.
  const auto lookup = [&tensors](size_t index) { return tensors[index]; };

  // We maintain a maximum of 2 in flight broadcast operations to avoid
  // allocating too much memory (in case the specified tensors are very large).
  std::deque<BroadcastWork> in_flight; // The queue of in-flight broadcast works
  constexpr auto max_in_flight = 2;
  for (const auto& bucket : buckets) { // Iterate over the buckets
    if (in_flight.size() >= max_in_flight) { // Cap in-flight broadcasts at 2 to avoid using too much memory
      in_flight.front().finish(); // Wait for the oldest broadcast to finish
      in_flight.pop_front();
    }
    in_flight.emplace_back(process_group, c10::fmap(bucket, lookup), rank);
  }

  while (!in_flight.empty()) {
    in_flight.front().finish();
    in_flight.pop_front();
  }
}

BroadcastWork uses a ProcessGroup to broadcast tensors and keeps track of the in-flight broadcast request.

class BroadcastWork {
 public:
  BroadcastWork(
      const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
      std::vector<at::Tensor> bucket_tensors,
      int root_rank = 0)
      : bucket_tensors_(std::move(bucket_tensors)),
        flat_tensor_({torch::utils::flatten_dense_tensors(bucket_tensors_)}) {
    BroadcastOptions broadcastOptions;
    broadcastOptions.rootRank = root_rank;
    work_ = process_group->broadcast(flat_tensor_, broadcastOptions);
  }

  void finish() {
    work_->wait();

    // Copy the output of the broadcast operation back.
    auto output_tensors = torch::utils::unflatten_dense_tensors(
        flat_tensor_.front(), bucket_tensors_);
    TORCH_INTERNAL_ASSERT(output_tensors.size() == bucket_tensors_.size());
    for (size_t i = 0; i < output_tensors.size(); i++) {
      bucket_tensors_[i].copy_(output_tensors[i], /*non_blocking=*/true);
    }
  }

 protected:
  // The list of tensors to broadcast. They are guaranteed to be
  // placed on the same device and have the same dtype.
  std::vector<at::Tensor> bucket_tensors_;

  // The vector with a single flattened tensor containing the contents
  // of the tensors in bucket_tensors_. It must be stored in a vector
  // because c10d::ProcessGroup::broadcast takes a vector argument.
  std::vector<at::Tensor> flat_tensor_;

 private:
  // The broadcast work that is kicked off upon construction.
  c10::intrusive_ptr<c10d::ProcessGroup::Work> work_;
};
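The flatten -> broadcast -> copy-back pattern of BroadcastWork can also be sketched at the Python level (an illustration only, assuming an initialized process group; the torch._utils helpers are internal APIs, used here just to mirror flatten_dense_tensors/unflatten_dense_tensors):

import torch
import torch.distributed as dist
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors

def broadcast_bucket(bucket_tensors, root_rank=0):
    # Flatten the bucket into one contiguous tensor so it can be broadcast in a single call.
    flat = _flatten_dense_tensors(bucket_tensors)
    dist.broadcast(flat, src=root_rank)
    # Copy the broadcast result back into the original tensors.
    for t, synced in zip(bucket_tensors,
                         _unflatten_dense_tensors(flat, bucket_tensors)):
        t.copy_(synced)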

2.5 Initializing Functions

The business functions are then initialized with a call to _ddp_init_helper.

2.5.1 _ddp_init_helper

The _ddp_init_helper function initializes the business functions. Its main logic is as follows:

  • Split the parameters into buckets, distributing them into buckets as evenly as possible in the reverse order of the forward pass (parameters used later in the forward pass produce their gradients earlier in the backward pass), which improves communication and reduction speed.
  • Reset the bucket splitting status.
  • Generate a Reducer, which internally registers autograd_hook, which is used for gradient synchronization during backpropagation;
  • Logging configuration;
  • Pass the DDP Handle to the SyncBatchNorm Layer;

The specific code is as follows:

    def _ddp_init_helper(self, parameters, expect_sparse_gradient, param_to_name_mapping):
        """
        Initialization helper function that does the following:
        (1) bucketing the parameters for reductions
        (2) resetting the bucketing states
        (3) registering the grad hooks
        (4) Logging construction-time DDP logging data
        (5) passing a handle of DDP to SyncBatchNorm Layer
        """
        self.num_iterations = 0
        # The bucket size limit is specified in the constructor.
        # Additionally, we allow for a single small bucket for parameters
        # that are defined first, such that their gradients don't spill into
        # a much larger bucket, adding unnecessary latency after gradient
        # computation finishes. Experiments showed 1MB is a reasonable value.
        bucket_indices = dist._compute_bucket_assignment_by_size(
            parameters[0],
            [dist._DEFAULT_FIRST_BUCKET_BYTES, self.bucket_bytes_cap],
            expect_sparse_gradient[0],
        )

        # Note: reverse list of buckets because we want to approximate the
        # order in which their gradients are produced, and assume they
        # are used in the forward pass in the order they are defined.
        self.reducer = dist.Reducer(
            parameters,
            list(reversed(bucket_indices)), # Use bucket index
            self.process_group,
            expect_sparse_gradient,
            self.bucket_bytes_cap,
            self.find_unused_parameters,
            self.gradient_as_bucket_view,
            param_to_name_mapping,
        )

        self.logger = dist.Logger(self.reducer)

        # Set logging data that can be got during construction time.
        self.logger.set_construction_data_and_log(
            self.module.__class__.__name__,
            [] if self.device_ids is None else self.device_ids,
            -1 if self.output_device is None else self.output_device,
            self.broadcast_buffers,
        )

        # passing a handle to torch.nn.SyncBatchNorm layer
        self._passing_sync_batchnorm_handle(self._module_copies)

2.5.2 Calculating Buckets

First, _compute_bucket_assignment_by_size completes the bucket assignment. Here parameters[0] is the corresponding list of tensors.

_DEFAULT_FIRST_BUCKET_BYTES = 1048576
# reduction bucket size
self.bucket_bytes_cap = int(bucket_cap_mb * 1024 * 1024)
        
bucket_indices = dist._compute_bucket_assignment_by_size(
            parameters[0],
            # The bucket size limit is a list
            [dist._DEFAULT_FIRST_BUCKET_BYTES, self.bucket_bytes_cap],
            expect_sparse_gradient[0],
        )
2.5.2.1 Content of the paper

We are going to analyze it based on the content of the paper.

The idea of gradient bucketing is based on the observation that collective communication is more efficient on large tensors.

Experiments showed that DDP can achieve higher throughput and lower latency if it waits for a short period of time and buckets multiple gradients into one AllReduce operation, rather than launching a dedicated AllReduce as soon as each gradient becomes available. This is especially useful for models with many small parameters. However, DDP should not communicate all gradients in one single AllReduce either, otherwise no communication can start until the computation is complete.

Parameter-to-bucket mapping has a significant impact on DDP speed. On every backward pass, the tensors of all parameter gradients are copied into buckets, and the averaged gradients are copied back after AllReduce. To speed up these copies, buckets are always created on the same device as the parameters. If the model spans multiple devices, DDP takes device affinity into account to ensure that all parameters in the same bucket are on the same device. The order of AllReduce also affects the result, because it determines how much communication can overlap with computation. DDP launches AllReduce in the reverse order of model.parameters().

Therefore, to improve communication efficiency, the DDP Reducer organizes parameter gradients into buckets and reduces one bucket at a time. The mapping from parameter gradients to buckets is determined at construction time, based on the bucket size limit and the parameter sizes. You can set bucket_cap_mb to configure the bucket size.

Model parameters are assigned to buckets in (roughly) the reverse order of model.parameters() of the given model. The reasons for using the reverse order are:

  • The backward pass proceeds in the reverse order of the forward computation.
  • DDP expects gradients to become ready during the backward pass in approximately the reverse order of the forward pass.
2.5.2.2 Grouping basis

DDP groups tensors with (type, device) as the key, because tensors on different devices should not be in the same bucket, and tensors in the same bucket should have the same type. Using type and device as the key ensures that tensors of the same type on the same device are allocated to the same buckets.

// Tensors may be coalesced into buckets. Buckets must contain tensors of
// the same type, on the same device, so a bucket can be identified by a
// composite key of a tensor's type identifier and its device.
struct BucketKey {
  BucketKey(c10::ScalarType type, c10::Device device)
      : type(std::move(type)), device(std::move(device)) {}

  const c10::ScalarType type;
  const c10::Device device;

  // See torch/csrc/utils/hash.h for dispatch code.
  static size_t hash(const BucketKey& key) {
    return c10::get_hash(key.type, key.device); // Use type and device as the key
  }
};
2.5.2.3 compute_bucket_assignment_by_size

The key data structure is as follows; a BucketAccumulator can be considered an actual bucket.

struct BucketAccumulator {
    std::vector<size_t> indices; // Bucket contents: a list of tensor indices
    size_t size = 0; // Bucket size, such as several MB
  }; // The logical contents of the bucket

  // Keep vector of indices and size accumulator by tensor type and device.
std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>>
      buckets; // A list of all buckets. Each actual bucket can be considered a BucketAccumulator

Let’s look at the specific logic of compute_bucket_assignment_by_size:

  • Define bucket_size_limit_iterators, which keeps an iterator into the size-limit list per tensor type and device.
  • Define buckets, the map of all buckets currently being filled; each actual bucket can be considered a BucketAccumulator.
  • Iterate over all tensors passed in:
    • Each tensor gets an index, starting at 0 and increasing up to tensors.size(); if tensor_indices was passed in, the index is taken from there instead.
    • If the tensor is expected to produce a sparse gradient, put it into a bucket of its own, because it cannot fit together with other tensors.
    • Use the tensor's type and device to build the bucket key and find the corresponding bucket.
      • Append the tensor index to the indices list of the BucketAccumulator.
      • Increase the bucket size.
    • If necessary, initialize the size-limit iterator for this key.
    • Get the current minimum size limit.
    • If the bucket size is greater than this limit, the bucket has reached its capacity, and its contents should be moved to a new bucket.
      • Strictly speaking, the move to a new bucket is only logical: accumulation continues in the same BucketAccumulator, because the type and device have not changed; but the indices of the old bucket have been moved into result, so the accumulator is now empty.
      • Insert the contents of the bucket into result; that is, when a bucket grows too large, its contents are appended to result first.
      • Reset the accumulator. Because bucket is a reference, this effectively empties the original bucket; the same key keeps being used, but the old contents have already been transferred to result.
      • Advance to the next size limit for this type/device.
    • After the loop, insert the indices of the remaining (partially filled) buckets into result; some buckets were already inserted into result above.
    • Sort result:
      • If tensor_indices is not empty, the tensors are already in gradient-ready order, so no sorting is needed.
      • If tensor_indices is empty, sort by the smallest index in each bucket, assuming that the order of the tensors is the order in which they are used (or the reverse of the order in which their gradients are produced). This sorting ensures that the buckets are ready in consecutive order.
      • Recall that the Reducer is later created with list(reversed(bucket_indices)).
    • Finally result is returned: each element is a vector of tensor indices forming one bucket, and the buckets are sorted by their smallest index.
std::vector<std::vector<size_t>> compute_bucket_assignment_by_size(
    const std::vector<at::Tensor>& tensors,
    const std::vector<size_t>& bucket_size_limits, // Bucket size limits
    const std::vector<bool>& expect_sparse_gradient,
    const std::vector<int64_t>& tensor_indices) { // tensor_indices is not passed in during initialization
  // Either expect_sparse_gradient is not specified or it has as many elements
  // as the vector with tensors.
  TORCH_INTERNAL_ASSERT(
      expect_sparse_gradient.empty() ||
      (tensors.size() == expect_sparse_gradient.size()));
  TORCH_INTERNAL_ASSERT(tensors.size() > 0);

  std::vector<std::vector<size_t>> result;
  result.reserve(tensors.size()); // Reserve space

  // Keep iterator into the size_limit vector by tensor type and device.
  // This is done so that we can use the consecutive bucket limits per type.
  std::unordered_map<
      BucketKey,
      std::vector<size_t>::const_iterator,
      c10::hash<BucketKey>>
      bucket_size_limit_iterators;

  // Local accumulator type for a single bucket.
  struct BucketAccumulator {
    std::vector<size_t> indices; // Bucket contents: a list of tensor indices
    size_t size = 0; // Bucket size, e.g. a few MB
  }; // The logical contents of a bucket

  // Keep vector of indices and size accumulator by tensor type and device.
  std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>>
      buckets; // All buckets being filled; each actual bucket can be considered a BucketAccumulator

  for (size_t i = 0; i < tensors.size(); i++) { // Iterate over all tensors passed in
    const auto& tensor = tensors[i]; // Get the tensor
    TORCH_CHECK(!tensor.is_sparse(), "No support for sparse tensors.");

    // when tensor_indices is empty, the index of tensors[i] assigned to
    // bucket is i, otherwise the tensor index is tensor_indices[i].
    auto tensor_index = i; // Each tensor gets an index, starting at 0 and going up to tensors.size()
    if (!tensor_indices.empty()) {
      tensor_index = tensor_indices[i]; // If indices were passed in, take the tensor index from there
    }
    // If we expect a sparse gradient to be produced for this tensor, it cannot
    // be grouped together with other gradients and gets its own bucket.
    // A tensor with an expected sparse gradient goes into a bucket of its own
    if (!expect_sparse_gradient.empty() &&
        expect_sparse_gradient[tensor_index]) {
      result.push_back({tensor_index});
      continue;
    }

    auto key = BucketKey(tensor.scalar_type(), tensor.device()); // Build the bucket key from the tensor's type and device
    auto& bucket = buckets[key]; // Get the BucketAccumulator
    bucket.indices.push_back(tensor_index); // Append the tensor index to the bucket's index list
    bucket.size += tensor.numel() * tensor.element_size(); // Increase the bucket size

    // Initialize bucket size limit iterator if necessary.
    // bucket_size_limits is [_DEFAULT_FIRST_BUCKET_BYTES, int(bucket_cap_mb * 1024 * 1024)]
    if (bucket_size_limit_iterators.count(key) == 0) {
      bucket_size_limit_iterators[key] = bucket_size_limits.begin();
    }

    auto& bucket_size_limit_iterator = bucket_size_limit_iterators[key];
    const auto bucket_size_limit = *bucket_size_limit_iterator; // The current size limit
    if (bucket.size >= bucket_size_limit) {
      // The bucket has reached its capacity. Logically its contents move to a new bucket, but
      // accumulation continues in the same BucketAccumulator (same type and device); its indices
      // have been moved into result, so the accumulator is now empty.
      result.emplace_back(std::move(bucket.indices)); // Full buckets are appended to result first
      bucket = BucketAccumulator(); // Reset the accumulator; bucket is a reference, so this empties the old bucket, whose contents were already moved into result

      // Advance to the next bucket size limit for this type/device.
      auto next = bucket_size_limit_iterator + 1;
      if (next != bucket_size_limits.end()) {
        bucket_size_limit_iterator = next;
      }
    }
  }

  // Add remaining buckets. Append the leftover, partially filled buckets to result,
  // since some buckets were already inserted into result above.
  for (auto& it : buckets) {
    auto& bucket = it.second;
    if (!bucket.indices.empty()) {
      result.emplace_back(std::move(bucket.indices));
    }
  }

  // If tensor_indices is not empty, the order of the tensors is in the gradient
  // ready order, so no need to sort.
  // If tensor_indices is empty, sort resulting buckets by the minimum tensor
  // index they include. We assume that the order of the tensors is the order in
  // which they are used (or the reverse order in which their gradients are
  // produced). This sorting step ensures that the buckets are ready in
  // consecutive order.
  // (Recall that the Reducer is later created with list(reversed(bucket_indices)).)
  if (tensor_indices.empty()) {
    std::sort(
        result.begin(),
        result.end(),
        [](const std::vector<size_t>& a, const std::vector<size_t>& b) {
          // Sort any two buckets by their smallest tensor index
          const auto amin = std::min_element(a.begin(), a.end()); // The smallest index in a
          const auto bmin = std::min_element(b.begin(), b.end()); // The smallest index in b
          return *amin < *bmin;
        });
  }

  return result; // Each element of result is one bucket (a vector of tensor indices), sorted by smallest index
}

The final result: each element of result corresponds to one bucket and contains the tensor indices belonging to that bucket, and the buckets are sorted by their smallest tensor index.

Note: because the tensors passed in are parameters[0], and parameters[0] follows the order returned by parameters(), model parameters are allocated to buckets in (roughly) the reverse order of model.parameters() of the given model. The reason for using the reverse order is that DDP expects gradients to become ready in roughly that order during the backward pass. Eventually DDP launches AllReduce in the reverse order of model.parameters().

+-----------------------------------------------------------------------+
|                                                                       |
|  <tensor index 1, tensor index 2, tensor index 3, tensor index 4>     |
|                                                                       |
|  <tensor index 5, tensor index 6, tensor index 7>                     |
|                                                                       |
|  ...                                                                  |
|                                                                       |
|  <tensor index 8, tensor index 9, tensor index 10, tensor index 11>   |
|                                                                       |
+-----------------------------------------------------------------------+
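To make the logic concrete, here is a simplified pure-Python sketch of the same greedy bucketing (an illustration only: it ignores sparse gradients and the final sort, and it is not the code DDP actually runs):

import torch

def compute_buckets(tensors, size_limits):
    result = []
    buckets = {}    # (dtype, device) -> indices of the bucket currently being filled
    sizes = {}      # (dtype, device) -> accumulated bytes of that bucket
    limit_idx = {}  # (dtype, device) -> position in size_limits

    for i, t in enumerate(tensors):
        key = (t.dtype, t.device)
        buckets.setdefault(key, []).append(i)
        sizes[key] = sizes.get(key, 0) + t.numel() * t.element_size()
        limit = size_limits[limit_idx.setdefault(key, 0)]
        if sizes[key] >= limit:
            result.append(buckets[key])        # the bucket is full, flush it into the result
            buckets[key], sizes[key] = [], 0   # keep accumulating under the same key
            limit_idx[key] = min(limit_idx[key] + 1, len(size_limits) - 1)

    # Flush the remaining, partially filled buckets.
    result.extend(b for b in buckets.values() if b)
    return result

params = [torch.randn(1024, 1024) for _ in range(6)]  # 4 MB each (float32)
print(compute_buckets(params, [1024 * 1024, 25 * 1024 * 1024]))
# [[0], [1, 2, 3, 4, 5]] -- the small first bucket fills immediately, the rest share one bucket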

2.5.3 Reducer

The following code generates a Reducer.

    self.reducer = dist.Reducer(
        parameters,
        list(reversed(bucket_indices)), # Use bucket index
        self.process_group,
        expect_sparse_gradient,
        self.bucket_bytes_cap,
        self.find_unused_parameters,
        self.gradient_as_bucket_view,
        param_to_name_mapping,
    )

Reducer will be introduced in detail in subsequent articles.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

PyTorch distributed series 3 — what does torch.utils.data.distributed.DistributedSampler do in distributed training?

PyTorch distributed series 1 — clarify the environment variables associated with torch.distributed.launch

PyTorch distributed data parallel framework — how does it synchronize?

Pytorch (distributed) data parallel personal practice – DataParallel/DistributedDataParallel

PyTorch nn.DataParallel

Discuss.pytorch.org/t/dataparal…

Pytorch.org/docs/stable…

PyTorch source code interpretation: understanding distributed training

PyTorch AutoGrad C++ layer implementation

PyTorch automatic differentiation (1)

How does PyTorch speed up data parallel training? Distributed secrets revealed

PyTorch distributed training (2): init_process_group

Pytorch.org/tutorials/i…

Pytorch.org/docs/master…

Pytorch.org/tutorials/i…

PyTorch source code interpretation of DP & DDP: model parallelism and distributed training analysis

Parameters and Buffers in the Pytorch model

Pytorch.org/docs/master…