0x00 Abstract

In this article, we introduce the DataParallel-based parallel operations and back-propagation.

Other articles in this series are as follows:

Automatic Differentiation of Deep Learning Tools (1)

Automatic Differentiation of Deep Learning Tools (2)

Automatic differentiation of Deep Learning Tools (3) — Interpretation of examples

PyTorch implements forward propagation (1) — Base class (1)

PyTorch implements forward propagation (2) — Base class (2)

PyTorch how to implement forward propagation (3) – implementation

How to implement back propagation (1)—- call engine

Pytorch how to implement backward propagation (2)—- engine static structure

Pytorch how to implement backward propagation (3)—- engine dynamic logic

PyTorch how to implement backward propagation (4)—- specific algorithm

PyTorch distributed (1)—— History and Overview

PyTorch how to use GPU

PyTorch distributed (2) —– DataParallel – gradient

0x01 Forward Operation

Recalling the current forward diagram: replicate calls Broadcast.forward, which stores input_device and num_inputs in its context.

+------------------------------------------------------------------------+
| DataParallel.forward                                                    |
|                                                                         |
|    replicate +---------------> parallel_apply            gather         |
|                                                                         |
+------------------------------------------------------------------------+

+---------------------------+
| Broadcast                 |
|                           |
|   forward() +-----------> |
|                           |
|   +-------------------+   |
|   | ctx               |   |
|   |   input_device    |   |
|   |   num_inputs      |   |
|   +-------------------+   |
|                           |
+---------------------------+

1.1 Parallel operation

So far we have used scatter to split and copy the data from device[0] to the different cards, and replicate to copy the model from device[0] to the different cards, so every card now holds the same model and its own slice of data. Next, each card needs to run forward on its own data; this is the job of parallel_apply.

# copy the model onto the GPUs that actually received inputs
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
# run forward on every replica in parallel
outputs = self.parallel_apply(replicas, inputs, kwargs)
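
To connect this with user code, here is a minimal usage sketch (the toy model, tensor shapes, and device ids are invented for illustration, and at least two visible GPUs are assumed); calling the wrapped model is what triggers the replicate / parallel_apply / gather sequence discussed here:

import torch
import torch.nn as nn

model = nn.Linear(10, 5).cuda(0)                    # parameters live on device[0]
dp_model = nn.DataParallel(model, device_ids=[0, 1], output_device=0)

x = torch.randn(32, 10, device="cuda:0")            # the whole batch starts on device[0]
y = dp_model(x)                                     # DataParallel.forward: scatter ->
                                                    # replicate -> parallel_apply -> gather
print(y.shape, y.device)                            # torch.Size([32, 5]) cuda:0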

In our propagation diagram, this corresponds to the parallel_apply step.

parallel_apply is implemented with threading: it takes the replicas and input data prepared earlier, starts one thread per replica in a for loop to run forward propagation, and finally collects and returns the results.

def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
    # make sure every GPU gets its own module, inputs and kwargs
    assert len(modules) == len(inputs)
    if kwargs_tup is not None:
        assert len(modules) == len(kwargs_tup)
    else:
        kwargs_tup = ({},) * len(modules)
    if devices is not None:
        assert len(modules) == len(devices)
    else:
        devices = [None] * len(modules)
    devices = [_get_device_index(x, True) for x in devices]
    lock = threading.Lock()
    results = {}
    grad_enabled, autocast_enabled = torch.is_grad_enabled(), torch.is_autocast_enabled()

    # the worker that each thread runs
    def _worker(i, module, input, kwargs, device=None):
        torch.set_grad_enabled(grad_enabled)
        if device is None:
            device = get_a_var(input).get_device()
        try:
            # set the current device
            with torch.cuda.device(device), autocast(enabled=autocast_enabled):
                # this also avoids accidental slicing of `input` if it is a Tensor
                if not isinstance(input, (list, tuple)):
                    input = (input,)
                output = module(*input, **kwargs)
            with lock:
                results[i] = output
        except Exception:
            with lock:
                results[i] = ExceptionWrapper(
                    where="in replica {} on device {}".format(i, device))

    if len(modules) > 1:
        # one process drives multiple GPUs, so start one thread per replica
        threads = [threading.Thread(target=_worker,
                                    args=(i, module, input, kwargs, device))
                   for i, (module, input, kwargs, device) in
                   enumerate(zip(modules, inputs, kwargs_tup, devices))]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    else:
        _worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])

    outputs = []
    for i in range(len(inputs)):
        output = results[i]
        # error handling
        if isinstance(output, ExceptionWrapper):
            output.reraise()
        outputs.append(output)
    # return the n outputs
    return outputs

At this point the forward propagation corresponds to the figure below: the parallel operation calls the forward method of each module replica.

+------------------------------------------------------------------------+
| DataParallel.forward                                                    |
|                                                                         |
|        1                          2                      3              |
|    replicate +---------------> parallel_apply            gather         |
|                                                                         |
+------------------------------------------------------------------------+

+---------------------------+   +---------------------+
| Broadcast                 |   | module              |
|                           |   |                     |
|               1           |   |            2        |
|   forward() +-----------> |   |  forward() +------->|
|                           |   |                     |
|   +-------------------+   |   |                     |
|   | ctx               |   |   |                     |
|   |   input_device    |   |   |                     |
|   |   num_inputs      |   |   |                     |
|   +-------------------+   |   |                     |
|                           |   |                     |
+---------------------------+   +---------------------+

1.2 Gather

So far, scatter has distributed the data, replicate has copied the model, and parallel_apply has run forward on each card, so every card has produced its own output.

All that remains is to merge the outputs of this distributed computation onto device[0], i.e. self.output_device.

replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
outputs = self.parallel_apply(replicas, inputs, kwargs)
# gather the outputs onto devices[0]
return self.gather(outputs, self.output_device)
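
The same pipeline can be driven by hand with the functions that DataParallel.forward calls internally. The sketch below is only illustrative (it assumes two visible GPUs; the module and shapes are invented):

import torch
from torch.nn.parallel import scatter, replicate, parallel_apply, gather

module = torch.nn.Linear(10, 5).cuda(0)
device_ids = [0, 1]

x = torch.randn(32, 10, device="cuda:0")
inputs = scatter(x, device_ids)                      # split the batch, 16 rows per GPU
replicas = replicate(module, device_ids[:len(inputs)])
outputs = parallel_apply(replicas, inputs)           # one thread per replica
result = gather(outputs, 0)                          # concatenate back on device[0]
print(result.shape, result.device)                   # torch.Size([32, 5]) cuda:0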

In our propagation diagram, this corresponds to the gather step.

Let’s see how the results are gathered to device[0] and how device[0] acts as a parameter server.

1.2.1 Python world

gather mainly calls Gather.apply(target_device, dim, *outputs) to complete the collection.

def gather(outputs, target_device, dim=0):
    r"""
    Gathers tensors from different GPUs on a specified device
    (-1 means the CPU).
    """
    def gather_map(outputs):
        out = outputs[0]
        if isinstance(out, torch.Tensor):
            # calls the Gather function below
            return Gather.apply(target_device, dim, *outputs)
        if out is None:
            return None
        if isinstance(out, dict):
            return type(out)(((k, gather_map([d[k] for d in outputs]))
                              for k in out))
        return type(out)(map(gather_map, zip(*outputs)))

    # Recursive function calls like this create reference cycles.
    # Setting the function to None clears the refcycle.
    try:
        res = gather_map(outputs)
    finally:
        gather_map = None
    return res
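
gather_map recurses into tuples, lists, and dicts, so per-GPU outputs that are dicts of tensors are merged key by key. A small sketch (two visible GPUs assumed; the names and shapes are invented):

import torch
from torch.nn.parallel import gather

out0 = {"logits": torch.randn(2, 4, device="cuda:0")}
out1 = {"logits": torch.randn(2, 4, device="cuda:1")}

merged = gather([out0, out1], 0, dim=0)              # gather_map walks the dicts
print(merged["logits"].shape, merged["logits"].device)
# torch.Size([4, 4]) cuda:0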

Gather calls comm.gather to do the actual work, and comm.gather takes us into the C++ world.

We omit some verification code.

class Gather(Function):

    @staticmethod
    def forward(ctx, target_device, dim, *inputs):
        # target_device is device[0]
        target_device = _get_device_index(target_device, True)
        # store several variables in the context; they are needed in backward
        ctx.target_device = target_device
        ctx.dim = dim
        ctx.input_gpus = tuple(i.get_device() for i in inputs)
        if all(t.dim() == 0 for t in inputs) and dim == 0:
            inputs = tuple(t.view(1) for t in inputs)
            ctx.unsqueezed_scalar = True
        else:
            ctx.unsqueezed_scalar = False
        ctx.input_sizes = tuple(i.size(ctx.dim) for i in inputs)
        # enter the C++ world and concentrate the outputs on GPU 0
        return comm.gather(inputs, ctx.dim, ctx.target_device)

    @staticmethod
    def backward(ctx, grad_output):
        # note: backward uses the variables saved in the context during forward
        scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes,
                                        ctx.dim, grad_output)
        if ctx.unsqueezed_scalar:
            scattered_grads = tuple(g[0] for g in scattered_grads)
        return (None, None) + scattered_grads

Now the forward calculation is shown as follows:

gather calls Gather.forward, which stores input_gpus, input_sizes, and dim in ctx; these will be used later during backward propagation.

+------------------------------------------------------------------------+
| DataParallel.forward                                                    |
|                                                                         |
|        1                          2                         3           |
|    replicate +---------------> parallel_apply +-----------> gather      |
|                                                                         |
+------------------------------------------------------------------------+

+---------------------------+   +---------------------+   +------------------------+
| Broadcast                 |   | module              |   | Gather                 |
|                           |   |                     |   |                        |
|               1           |   |            2        |   |            3           |
|   forward() +-----------> |   |  forward() +------->|   |  forward()             |
|                           |   |                     |   |                        |
|   +-------------------+   |   |                     |   |  +------------------+  |
|   | ctx               |   |   |                     |   |  | ctx              |  |
|   |   input_device    |   |   |                     |   |  |   input_gpus     |  |
|   |   num_inputs      |   |   |                     |   |  |   input_sizes    |  |
|   +-------------------+   |   |                     |   |  |   dim            |  |
|                           |   |                     |   |  +------------------+  |
+---------------------------+   +---------------------+   +------------------------+

1.2.2 C++ world

This C++ gather function calls _gather_out_impl to complete the copy operation.

at::Tensor gather(
    at::TensorList tensors,
    int64_t dim,
    c10::optional<int32_t> destination_index) {
  // destination_index is the index of device[0]
  int64_t total_size = 0;
  auto& first = tensors.front();
  const auto first_size = first.sizes();
  dim = at::maybe_wrap_dim(dim, first);
  std::vector<int64_t> expected_size(first_size.begin(), first_size.end());
  auto memory_format = first.suggest_memory_format();
  for (size_t i = 0; i < tensors.size(); i++) {
    const auto& tensor = tensors[i];
    expected_size[dim] = tensor.size(dim);
    total_size += tensor.size(dim);
    if (memory_format != MemoryFormat::Contiguous &&
        tensor.suggest_memory_format() != memory_format) {
      memory_format = MemoryFormat::Contiguous;
    }
  }
  expected_size[dim] = total_size;
  at::Device device(DeviceType::CPU);
  // pick the target device, i.e. device[0]
  if (!destination_index || *destination_index != -1) {
    device = at::Device(
        DeviceType::CUDA, destination_index ? *destination_index : -1);
  }

  // first, build an empty target tensor `result` on the target device
  at::Tensor result =
      at::empty(expected_size, first.options().device(device), memory_format);
  return _gather_out_impl(tensors, result, dim); // then gather into result
}

_gather_out_impl performs the concrete gather operation: it copies the input tensors onto the target tensor, i.e. onto GPU 0.

// ***************** Gather *******************
//
// Gather a list of CUDA tensors on one or more devices to a target tensor or
// device, either CPU or CUDA.

// no checks
static inline at::Tensor& _gather_out_impl(
    at::TensorList tensors,
    at::Tensor& out_tensor,
    int64_t dim) {
  std::vector<int64_t> chunk_sizes;
  chunk_sizes.reserve(tensors.size());
  for (auto& tensor : tensors) {
    chunk_sizes.push_back(tensor.size(dim));
  }
  auto chunks =
      out_tensor.split_with_sizes(/*split_sizes=*/chunk_sizes, /*dim=*/dim);
  for (size_t i = 0; i < tensors.size(); i++) {
    // copy onto GPU 0
    chunks[i].copy_(tensors[i], /*non_blocking=*/out_tensor.is_cuda());
  }
  return out_tensor;
}
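
This copy path can also be exercised directly from Python through torch.cuda.comm.gather; a small sketch, assuming two visible GPUs:

import torch
import torch.cuda.comm as comm

t0 = torch.ones(2, 3, device="cuda:0")
t1 = torch.ones(2, 3, device="cuda:1") * 2

out = comm.gather([t0, t1], dim=0, destination=0)    # concatenated along dim 0 on GPU 0
print(out.shape, out.device)                         # torch.Size([4, 3]) cuda:0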

0x02 Calculate the loss

Now that the forward outputs have been gathered onto device[0], we can compute the loss there and start backward propagation. Computing the loss on device[0] can be seen as the bridge between forward computation and backward propagation; in effect, it is the starting point of backward propagation.

Let’s take a look at the sample code, which contains the key points:

  1. The data is already on the default GPU, GPU 0.
  2. prediction is the forward output gathered onto GPU 0.
  3. Use loss = criterion(prediction, target_var) to compute the loss on the default GPU.
  4. Start backpropagation with loss.backward().
for batch_idx, (data, label) in pbar:
    if args.cuda:
        data, label = data.cuda(), label.cuda()   # 1. the data is on the default GPU, i.e. GPU 0
    data_v = Variable(data)
    target_var = Variable(label)
    prediction = model(data_v, target_var, args)  # 2. DataParallel.forward()
    # prediction is gathered from the GPUs onto the main GPU; each GPU handles
    # batch_size / len(device_ids) samples, and the results are aggregated on
    # the main GPU after forward propagation.
    loss = criterion(prediction, target_var)      # 3. compute the loss on the default GPU
    optimizer.zero_grad()
    loss.backward()                               # 4. start backward propagation
    optimizer.step()

0x03 Backward Propagation

We have walked through the forward part above and computed the loss; next we follow the loss.backward() line in the code above.

3.1 Distributing gradients

We first come to the gradient-distribution step, which scatters the gradient of the loss back to the GPUs so that the subsequent backward propagation can run independently on each GPU.

3.1.1 Gather.backward

As mentioned above, prediction is the forward output gathered onto GPU 0, and the loss is computed from prediction, so backward propagation starts from loss.backward(). Walking backward from the output, the first operation we reach is gather, i.e. the backward function of Gather, whose core is Scatter.apply.

class Gather(Function):

    @staticmethod
    def forward(ctx, target_device, dim, *inputs):
        # target_device is device[0]
        target_device = _get_device_index(target_device, True)
        # store several variables in the context for later use in backward
        ctx.target_device = target_device
        ctx.dim = dim
        ctx.input_gpus = tuple(i.get_device() for i in inputs)
        if all(t.dim() == 0 for t in inputs) and dim == 0:
            inputs = tuple(t.view(1) for t in inputs)
            ctx.unsqueezed_scalar = True
        else:
            ctx.unsqueezed_scalar = False
        ctx.input_sizes = tuple(i.size(ctx.dim) for i in inputs)
        # enters the C++ world and concentrates the outputs on GPU 0
        return comm.gather(inputs, ctx.dim, ctx.target_device)

    @staticmethod
    def backward(ctx, grad_output):
        # fetch the variables stored in the context during forward propagation
        scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes,
                                        ctx.dim, grad_output)
        if ctx.unsqueezed_scalar:
            scattered_grads = tuple(g[0] for g in scattered_grads)
        return (None, None) + scattered_grads

As the code shows, backward uses ctx.input_gpus, ctx.input_sizes, and ctx.dim saved during forward propagation, together with grad_output, and calls Scatter.apply.
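
Because Gather's backward is Scatter, the gradient that flows back through the gathered tensor is automatically sliced and sent back to the GPU that produced each piece. A minimal sketch (two visible GPUs assumed; the shapes are invented):

import torch
from torch.nn.parallel import gather

x0 = torch.randn(2, 3, device="cuda:0", requires_grad=True)
x1 = torch.randn(2, 3, device="cuda:1", requires_grad=True)

out = gather([x0, x1], 0, dim=0)                     # shape (4, 3) on cuda:0
out.sum().backward()                                 # Gather.backward -> Scatter.apply
print(x0.grad.device, x1.grad.device)                # cuda:0 cuda:1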

In the figure below, forward propagation is at the top, backward propagation is at the bottom, and in the middle are the code modules used by both forward and backward propagation.

+------------------------------------------------------------------------+
| DataParallel.forward                                                    |
|                                                                         |
|        1                          2                         3           |
|    replicate +---------------> parallel_apply +-----------> gather      |
|                                                                         |
+------------------------------------------------------------------------+

+---------------------------+   +---------------------+   +------------------------+
| Broadcast                 |   | module              |   | Gather                 |
|                           |   |                     |   |                        |
|               1           |   |            2        |   |            3           |
|   forward() +-----------> |   |  forward() +------->|   |  forward()             |
|                           |   |                     |   |                        |
|   +-------------------+   |   |                     |   |  +------------------+  |
|   | ctx               |   |   |                     |   |  | ctx              |  |
|   |   input_device    |   |   |                     |   |  |   input_gpus     |  |
|   |   num_inputs      |   |   |                     |   |  |   input_sizes    |  |
|   +-------------------+   |   |                     |   |  |   dim            |  |
|                           |   |                     |   |  +------------------+  |
|                           |   |                     |   |                        |
|                           |   |                     |   | <------+ backward()    |
|                           |   |                     |   |      3                 |
+---------------------------+   +---------------------+   +------------------------+

+------------------------------------------------------------------------+
| loss.backward()                                                         |
|                                                 3                       |
|                                        <-----------------+              |
|                                                                         |
+------------------------------------------------------------------------+

3.1.2 Scatter

Scatter.apply actually calls Scatter.forward, which does the following:

  • The relevant devices are set up: input_device (the source device) is recorded in the context, and target_gpus (the target devices) are passed in from Gather.backward.
  • The streams of the target devices are obtained (when copying from the CPU, the copies run on background streams).
  • comm.scatter is called to distribute the gradients to the target devices.
class Scatter(Function):

    @staticmethod
    def forward(ctx, target_gpus, chunk_sizes, dim, input):
        target_gpus = [_get_device_index(x, True) for x in target_gpus]
        ctx.dim = dim
        ctx.input_device = input.get_device() if input.device.type != "cpu" else -1
        streams = None
        if torch.cuda.is_available() and ctx.input_device == -1:
            # Perform CPU to GPU copies in a background stream
            streams = [_get_stream(device) for device in target_gpus]
        # distribute to the target GPUs
        outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)
        # Synchronize with the copy stream
        if streams is not None:
            for i, output in enumerate(outputs):
                with torch.cuda.device(target_gpus[i]):
                    main_stream = torch.cuda.current_stream()
                    main_stream.wait_stream(streams[i])
                    output.record_stream(main_stream)
        return outputs

    @staticmethod
    def backward(ctx, *grad_output):
        return None, None, None, Gather.apply(ctx.input_device, ctx.dim, *grad_output)

3.1.3 C++

The line outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams) in the Python code above takes us straight into the C++ world. The code is located in torch/csrc/cuda/comm.cpp.

What scatter does is split the tensor into chunks and send each chunk to the stream of its target device.

std::vector<at::Tensor> scatter(
    const at::Tensor& tensor,
    at::IntArrayRef devices,
    const c10::optional<std::vector<int64_t>>& chunk_sizes,
    int64_t dim,
    const c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>>&
        streams) {
  dim = at::maybe_wrap_dim(dim, tensor);
  
  // split the tensor into chunks
  std::vector<at::Tensor> chunks = chunk_sizes
      ? tensor.split_with_sizes(/*split_sizes=*/*chunk_sizes, /*dim=*/dim)
      : tensor.chunk(/*chunks=*/devices.size(), /*dim=*/dim);
  
  at::cuda::OptionalCUDAStreamGuard cuda_guard;
  for (size_t i = 0; i < chunks.size(); ++i) {
    const auto device_index = static_cast<int16_t>(devices[i]);
    if (device_index != tensor.get_device()) {
      if (i < (streams ? streams->size() : 0U) && (*streams)[i]) {
        cuda_guard.reset_stream(*(*streams)[i]);
      }
      // copy the chunk to its device, using that device's stream
      chunks[i] = chunks[i].to( 
          {DeviceType::CUDA, device_index},
          /*non_blocking=*/true,
          /*copy=*/false,
          /*memory_format=*/at::MemoryFormat::Preserve);
    }
  }
  return chunks;
}
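The same split can be exercised from Python through torch.cuda.comm.scatter; a small sketch, assuming two visible GPUs:

import torch
import torch.cuda.comm as comm

grad = torch.arange(12.0, device="cuda:0").reshape(6, 2)
chunks = comm.scatter(grad, devices=[0, 1], dim=0)   # two chunks of shape (3, 2)
print([c.device for c in chunks])                    # chunks live on cuda:0 and cuda:1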

3.2 Parallel backward propagation

Now that the gradient has been distributed to each GPU, the parallel backward propagation proper begins. This stage runs backward propagation on every GPU in parallel and computes the parameter gradients.

This stage calls the backward of the original model, which corresponds to label 4 in the figure below:

+------------------------------------------------------------------------+
| DataParallel.forward                                                    |
|                                                                         |
|        1                          2                         3           |
|    replicate +---------------> parallel_apply +-----------> gather      |
|                                                                         |
+------------------------------------------------------------------------+

+---------------------------+   +---------------------+   +------------------------+
| Broadcast                 |   | module              |   | Gather                 |
|                           |   |                     |   |                        |
|               1           |   |            2        |   |            3           |
|   forward() +-----------> |   |  forward() +------->|   |  forward()             |
|                           |   |                     |   |                        |
|   +-------------------+   |   |                     |   |  +------------------+  |
|   | ctx               |   |   |                     |   |  | ctx              |  |
|   |   input_device    |   |   |                     |   |  |   input_gpus     |  |
|   |   num_inputs      |   |   |                     |   |  |   input_sizes    |  |
|   +-------------------+   |   |                     |   |  |   dim            |  |
|                           |   |                     |   |  +------------------+  |
|                           |   |                     |   |                        |
|                           |   | <----+ backward()   |   | <------+ backward()    |
|                           |   |     4               |   |      3                 |
+---------------------------+   +---------------------+   +------------------------+

+------------------------------------------------------------------------+
| loss.backward()                                                         |
|                          4                      3                       |
|                  <----------------+    <-----------------+              |
|                                                                         |
+------------------------------------------------------------------------+

3.3 Merging gradients

This stage merges the gradients onto GPU 0; the expanded overall flow is shown in the flow chart later in this section.

3.3.1 Broadcast.backward

This part corresponds to the backward propagation of Broadcast.

class Broadcast(Function):

    @staticmethod
    def forward(ctx, target_gpus, *inputs):
        target_gpus = [_get_device_index(x, True) for x in target_gpus]
        ctx.target_gpus = target_gpus
        if len(inputs) == 0:
            return tuple()
        ctx.num_inputs = len(inputs)
        # the inputs are on device[0], so record the source device, i.e. GPU 0
        ctx.input_device = inputs[0].get_device()
        # broadcast to the other GPUs
        outputs = comm.broadcast_coalesced(inputs, ctx.target_gpus)
        # mark the outputs that do not need a gradient in the context
        non_differentiables = []
        for idx, input_requires_grad in enumerate(ctx.needs_input_grad[1:]):
            if not input_requires_grad:
                for output in outputs:
                    non_differentiables.append(output[idx])
        ctx.mark_non_differentiable(*non_differentiables)
        return tuple([t for tensors in outputs for t in tensors])

    @staticmethod
    def backward(ctx, *grad_outputs):
        # ctx.input_device is the previously stored GPU 0
        return (None,) + ReduceAddCoalesced.apply(ctx.input_device, ctx.num_inputs,
                                                  *grad_outputs)
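
The key property of Broadcast is therefore that gradients flowing back through the broadcast copies are reduce-added onto the source device. A minimal sketch (two visible GPUs assumed; Broadcast lives in the internal module torch.nn.parallel._functions, so this is only for illustration and may change between versions):

import torch
from torch.nn.parallel._functions import Broadcast

w = torch.randn(3, device="cuda:0", requires_grad=True)
w0, w1 = Broadcast.apply([0, 1], w)                  # copies of w on cuda:0 and cuda:1

loss = w0.sum() + (2 * w1).sum().to("cuda:0")
loss.backward()                                      # ReduceAddCoalesced adds both grads
print(w.grad)                                        # tensor([3., 3., 3.], device='cuda:0')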

Therefore, we can expand the flow chart:

+------------------------------------------------------------------------+
| DataParallel.forward                                                    |
|                                                                         |
|        1                          2                         3           |
|    replicate +---------------> parallel_apply +-----------> gather      |
|                                                                         |
+------------------------------------------------------------------------+

+---------------------------+   +---------------------+   +------------------------+
| Broadcast                 |   | module              |   | Gather                 |
|                           |   |                     |   |                        |
|               1           |   |            2        |   |            3           |
|   forward() +-----------> |   |  forward() +------->|   |  forward()             |
|                           |   |                     |   |                        |
|   +-------------------+   |   |                     |   |  +------------------+  |
|   | ctx               |   |   |                     |   |  | ctx              |  |
|   |   input_device    |   |   |                     |   |  |   input_gpus     |  |
|   |   num_inputs      |   |   |                     |   |  |   input_sizes    |  |
|   +-------------------+   |   |                     |   |  |   dim            |  |
|                           |   |                     |   |  +------------------+  |
|                           |   |                     |   |                        |
|   backward()              |   | <----+ backward()   |   | <------+ backward()    |
|       5                   |   |     4               |   |      3                 |
+---------------------------+   +---------------------+   +------------------------+

+------------------------------------------------------------------------+
| loss.backward()                                                         |
|        5                 4                      3                       |
|  <----------+    <----------------+    <-----------------+              |
|                                                                         |
+------------------------------------------------------------------------+

3.3.2 ReduceAddCoalesced

Broadcast.backward calls ReduceAddCoalesced.apply, which lands in ReduceAddCoalesced.forward and merges the gradients onto the destination device, namely GPU 0.

class ReduceAddCoalesced(Function):

    @staticmethod
    def forward(ctx, destination, num_inputs, *grads):
        # destination is GPU 0
        ctx.target_gpus = [grads[i].get_device()
                           for i in range(0, len(grads), num_inputs)]
        grads_ = [grads[i:i + num_inputs]
                  for i in range(0, len(grads), num_inputs)]
        # merge-add the gradients onto the destination device, i.e. GPU 0
        return comm.reduce_add_coalesced(grads_, destination)

    @staticmethod
    def backward(ctx, *grad_outputs):
        return (None, None,) + Broadcast.apply(ctx.target_gpus, *grad_outputs)

3.3.3 comm.reduce_add_coalesced

reduce_add_coalesced sums the gradients coming from multiple GPUs: small tensors are first coalesced into buffers and then added together.

def reduce_add_coalesced(inputs, destination=None, buffer_size=10485760):
    """Sums tensors from multiple GPUs.

    Small tensors are first coalesced into a buffer to reduce the number
    of synchronizations.

    Args:
        inputs (Iterable[Iterable[Tensor]]): iterable of iterables that
            contain tensors from a single device.
        destination (int, optional): a device on which the output will be
            placed (default: current device).
        buffer_size (int): maximum size of the buffer used for coalescing

    Returns:
        A tuple of tensors containing an elementwise sum of each group of
        inputs, placed on the ``destination`` device.
    """
    dense_tensors: List[List] = [[] for _ in inputs]  # shape (num_gpus, num_tensors)
    output = []
    ref_order = []
    # process sparse ones first since they may have different sizes on different gpus
    for tensor_at_gpus in zip(*inputs):
        if all(t.is_sparse for t in tensor_at_gpus):
            result = reduce_add(tensor_at_gpus, destination)  # this will be sparse too
            output.append(result)
            ref_order.append(tensor_at_gpus[0])
        else:
            for coll, t in zip(dense_tensors, tensor_at_gpus):
                coll.append(t.to_dense() if t.is_sparse else t)
            ref_order.append(dense_tensors[0][-1])
    itrs = [_take_tensors(tensors, buffer_size) for tensors in dense_tensors]
    # now the dense ones, which have consistent sizes
    for chunks in zip(*itrs):
        flat_tensors = [_flatten_dense_tensors(chunk) for chunk in chunks]  # (num_gpus,)
        # merge-add onto the destination device
        flat_result = reduce_add(flat_tensors, destination)
        for t in _unflatten_dense_tensors(flat_result, chunks[0]):
            # The unflattened tensors do not share storage, and we don't expose
            # base flat tensor anyways, so give them different version counters.
            # See NOTE [ Version Counter in comm.*_coalesced ]
            output.append(t.data)
    return tuple(_reorder_tensors_as(output, ref_order))
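
A small sketch of how it behaves (two visible GPUs assumed, values invented): each inner list holds one GPU's tensors, and the result is their elementwise sum on the destination device.

import torch
import torch.cuda.comm as comm

grads_gpu0 = [torch.ones(2, 2, device="cuda:0"), torch.ones(3, device="cuda:0")]
grads_gpu1 = [torch.full((2, 2), 2.0, device="cuda:1"), torch.full((3,), 2.0, device="cuda:1")]

summed = comm.reduce_add_coalesced([grads_gpu0, grads_gpu1], destination=0)
print([t.device for t in summed])                    # both results on cuda:0
print(summed[0])                                     # every element is 3.0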

3.4 Updating model parameters

The function of this stage is to update the model parameters: gradient descent runs on the main GPU and updates the parameters there.

In addition, since the model parameters are updated only on the master GPU while the other GPUs are left untouched at this point, the updated parameters have to be copied to the other GPUs again; this happens at the beginning of the next pass of the for loop, and so on.

The corresponding example code is:

for batch_idx, (data, label) in pbar:             # 6. the next iteration copies the updated parameters again
    if args.cuda:
        data, label = data.cuda(), label.cuda()   # 1. the data is on the default GPU, i.e. GPU 0
    data_v = Variable(data)
    target_var = Variable(label)
    prediction = model(data_v, target_var, args)  # 2. DataParallel.forward()
    # prediction is gathered from the GPUs onto the main GPU; each GPU handles
    # batch_size / len(device_ids) samples, and the results are aggregated on
    # the main GPU after forward propagation.
    loss = criterion(prediction, target_var)      # 3. compute the loss on the default GPU
    optimizer.zero_grad()
    loss.backward()                               # 4. start backward propagation
    optimizer.step()                              # 5. update the model

0x04 Summary

To summarize the process: initially the data and the model are placed on the default GPU, GPU 0, and each iteration then runs the steps below, which correspond to the figure that follows (a minimal code sketch is given after the figure).

  1. Scatter distributes the data to the other GPUs.
  2. Replicate copies the model to the other GPUs.
  3. parallel_apply starts multiple threads for the forward computation.
  4. Gather collects the outputs onto GPU 0.
  5. GPU 0 computes the loss.
  6. Scatter distributes the gradient to the other GPUs.
  7. Each model replica runs backward to compute its gradients.
  8. The gradients are reduced onto GPU 0.
  9. optimizer.step updates the model.
main thread (forward)

Forward ----> scatter +-----------> replicate +-----------> parallel_apply +-----------> gather
                | 1                   | 2                     | 3                           | 4
                v                     v                       v  (thread 1/2/3)             v
         GPU1 GPU2 GPU3        GPU1 GPU2 GPU3          GPU1 GPU2 GPU3                      GPU1

main thread (backward)

loss = criterion(...) +----> scatter +----> model.backward() +----> reduce gradient +----> optimizer.step
           | 5                  | 6             | 7                      ^ 8                    | 9
           v                    v               v  (thread 1/2/3)        |                      v
         GPU1            GPU1 GPU2 GPU3    GPU1 GPU2 GPU3         GPU1 GPU2 GPU3              GPU1
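
The nine steps map onto a minimal training loop like the sketch below (two visible GPUs and a made-up model and batch are assumed; Variable is no longer needed in modern PyTorch):

import torch
import torch.nn as nn

model = nn.Linear(10, 2).cuda(0)                     # model starts on GPU 0
dp_model = nn.DataParallel(model, device_ids=[0, 1])
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

data = torch.randn(64, 10, device="cuda:0")          # data starts on GPU 0
label = torch.randint(0, 2, (64,), device="cuda:0")

prediction = dp_model(data)          # steps 1-4: scatter, replicate, parallel_apply, gather
loss = criterion(prediction, label)  # step 5: loss on GPU 0
optimizer.zero_grad()
loss.backward()                      # steps 6-8: scatter grad, parallel backward, reduce to GPU 0
optimizer.step()                     # step 9: update parameters on GPU 0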


At this point the analysis of DP is complete; in the next article we will introduce some DDP-related material.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

PyTorch source code interpretation torch. Optim: optimization algorithm interface details

Pytorch (distributed) data parallel personal practice – DataParallel/DistributedDataParallel

The nn Pytorch DataParallel

PyTorch source code interpretation of distributed training to understand?

Discuss.pytorch.org/t/dataparal…

PyTorch DDP series 2: implementation principle and source code analysis

Pytorch-CUDA From Getting Started to Giving Up (Part 2)

Pytorch pits: The differences between assignment, shallow copy, and deep copy, and the pits for model.state_dict() and model.load_state_dict()

DP & DDP for PyTorch source Code Interpretation: Model parallel and Distributed training parsing