0x00 Abstract

NVIDIA Megatron is a PyTorch-based distributed training framework used to train large Transformer language models. By combining data parallelism, tensor parallelism and pipeline parallelism, it can train models at the scale of GPT-3, and the mechanisms behind it are worth analyzing. This series of five articles walks through the papers and the source code together with you. This article looks at how Megatron arranges the execution order of each pipeline stage.

Other articles in this series are:

Model Parallel Distributed Training Megatron (1) — Thesis & Foundation

Model parallel distributed training Megatron (2) — Overall architecture

Model parallel distributed training Megatron (3) — Model parallel implementation

Model parallel distributed training Megatron (4) — How to set up various parallelism

0x01 Background

In pipeline training, how to arrange the execution order of each pipeline stage is a key question, so here we look at how the schedule is made.

In Megatron, get_forward_backward_func selects the pipeline schedule used during training, which comes in two flavors: flush and interleaving. We analyze only the flush schedule; interested readers can study the interleaving schedule on their own.

def get_forward_backward_func() :
    args = get_args()
    if mpu.get_pipeline_model_parallel_world_size() > 1:
        if args.virtual_pipeline_model_parallel_size is not None:
            forward_backward_func = forward_backward_pipelining_with_interleaving
        else:
            forward_backward_func = forward_backward_pipelining_without_interleaving
    else:
        forward_backward_func = forward_backward_no_pipelining
    return forward_backward_func

In summary, Megatron's schedule is based on PipeDream-2BW with periodic flushes.

  • PipeDream-2BW maintains two versions of the model weights in the pipeline; "2BW" stands for double-buffered weights. PipeDream-2BW generates a new model version once every m microbatches (m ≥ d, the pipeline depth). Because some of the remaining backward passes still depend on the old model version, the new version cannot immediately replace the old one, but since only two versions are kept, the memory footprint is greatly reduced.
  • PipeDream-Flush adds periodic, globally synchronized pipeline flushes on top of PipeDream-2BW, similar to GPipe. This further reduces the memory footprint (only one version of the model weights is maintained) at the cost of some throughput.

0x02 The papers

Megatron corresponds to the papers Memory-Efficient Pipeline-Parallel DNN Training and Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM, so we start from the papers. Note: the following discussion reflects the publication date of the original papers; since the various open-source systems have kept evolving, statements about other open-source systems may no longer be completely accurate today.

2.1 Introduction

Recently, pipeline model parallelism has been proposed to accelerate model-parallel training. GPipe (Huang et al., 2019) and PipeDream (Harlap et al., 2018; Narayanan et al., 2019) push multiple inputs in sequence through a series of workers, each responsible for one model partition, which allows different workers to process different inputs in parallel.

  • Naive pipelining can cause the model not to converge, because the forward and backward passes of a particular input may see inconsistent weight versions; prior work balances memory footprint and throughput in different ways to avoid this.
  • GPipe maintains a single weight version, but periodically flushes the pipeline (Figure 1a) so that the weights can be updated after a batch has passed through the pipeline. These flushes limit overall throughput because resources sit idle.
  • PipeDream does not flush the pipeline periodically, but stores multiple weight versions, which increases throughput but also the memory footprint, making it impossible to train very large models due to memory limits.

Therefore, training large models efficiently requires an approach with both high throughput and low memory footprint. In addition, the performance of a pipeline-parallel system depends on how the operators of the DNN model are partitioned over the workers. This is challenging for three reasons:

  • Memory capacity constraints: the parameters and intermediate activations associated with a model partition must fit within the accelerator's main device memory.
  • Heterogeneous Network Interconnection: Today’s training deployments have a heterogeneous network topology with higher bandwidth links between devices on the same server.
  • Large search space for how operators are placed: As the model size increases, splitting operator graphs becomes computationally expensive because of the exponential number of different partitioning methods.

Figure 1a. Timelines of different pipeline-parallel executions. Backward propagation is assumed to take twice as long as forward propagation; forward propagation is shown in blue and backward propagation in green. Numbers are microbatch IDs, time is shown along the x-axis, and per-worker utilization along the y-axis. GPipe maintains a single weight version but periodically flushes the pipeline. PipeDream does not introduce periodic pipeline flushes, but maintains multiple weight versions.

In this paper, the authors introduce PipeDream-2BW, an efficient pipeline-parallel training system for DNN models. PipeDream-2BW achieves high throughput and low memory footprint through two key contributions. First, the authors propose double-buffered weight updates (2BW), a technique that reduces the training memory footprint while avoiding pipeline flushes.

The authors exploit the fact that the gradients generated by each input do not need to be applied to the weights immediately; they can be accumulated into "coalesced" gradients, which limits the number of weight versions that must be retained. Instead of flushing the pipeline before using newly updated weights, 2BW applies the new weights to inputs newly entering the pipeline, while applying the previous weight version (called the shadow version) to inputs that are already in flight.

Double-buffering each worker's weights yields a pipelining scheme with higher throughput than GPipe (no pipeline flushes) and better memory efficiency than PipeDream (at most 2 weight versions here, whereas a depth-d PipeDream pipeline stores up to d weight versions in the worst case).

The authors also introduce a variant of 2BW (called PipeDream-Flush) that trades some throughput for an even lower memory footprint and vanilla weight-update semantics.

2.2 Background

In this section, the authors give a brief overview of distributed training techniques for DNN models.

  • Data parallelism.
    • Data parallelism is used to scale model training. With data parallelism (Xing et al., 2015), each worker has a copy of the whole model, the input dataset is sharded across workers, and workers periodically aggregate their gradients to ensure that all workers see a consistent version of the weights. Data parallelism alone cannot train large models that do not fit on a single worker, but it can be applied to smaller model partitions.
    • Data-parallel scale-out usually works well, but has two limitations: a) beyond a certain point, the per-GPU batch size becomes too small, which reduces GPU utilization and increases communication cost; b) the maximum number of devices that can be used is the batch size, which limits the number of accelerators available for training. Various model-parallel techniques have therefore been proposed to address these two challenges.
  • Model parallelism. For large models that do not fit on a single worker, model-parallel training is generally used.
    • With model parallelism (Dean et al., 2012; Chilimbi et al., 2014), the weight parameters of the model are split across the available workers (the matrix multiplications within each Transformer layer are split across multiple GPUs), and intermediate activations and gradients are communicated between workers. Inter-layer model parallelism under-utilizes resources, because at most one worker is active at any point in time.
    • Tensor (intra-layer) model parallelism (Shoeybi et al., 2019) tends to put expensive all-to-all communication on the critical path. Because the all-reduce communication required for tensor parallelism would have to go over inter-server links, which are slower than the high-bandwidth NVLink available inside a multi-GPU server, it is natural to limit the number of model partitions to the number of GPUs in a single server. Also, a high degree of model parallelism creates many small matrix multiplications (GEMMs), which may reduce GPU utilization.
    • FlexFlow (Jia et al., 2018) shows how to split a model graph using model and data parallelism, but resource utilization remains low when model parallelism is used.
  • Pipeline parallelism. To address the shortcomings of model parallelism, recent work such as PipeDream and GPipe proposed pipeline parallelism.
    • With pipeline parallelism, multiple inputs (rather than one) are injected into a pipeline composed of inter-layer model partitions. This makes better use of the compute resources.
    • A batch is split into smaller microbatches, which are pipelined through the stages. Layers can be assigned to workers in various ways, and the forward and backward passes of the inputs can follow various schedules.
    • However, naive pipelining can lead to a weight-version mismatch between the forward and backward passes of a particular input. Specifically, if the weights are always updated immediately to the latest version, an input in the pipeline may see, during its backward pass, weights that were updated after its forward pass, resulting in incorrect gradient computations.
    • Layer assignment and scheduling policies lead to different performance trade-offs. Regardless of the schedule, to maintain strict optimizer semantics the optimizer step must be synchronized across devices, which leads to a pipeline flush at the end of every batch, during which in-flight microbatches are allowed to finish (and no new microbatches are injected). Pipeline flushes can take up to 50% of the time, depending on the number of microbatches injected into the pipeline; the higher the ratio of microbatches to pipeline depth, the shorter the flush. Therefore, a larger batch size is usually required to achieve high efficiency.

Users can train their large models using a variety of techniques, each with different trade-offs. In addition, these techniques can be combined. However, combining them leads to non-trivial interactions, which require careful reasoning to get good performance, i.e., to maximize the training throughput of a large model at a given batch size while maintaining strict optimizer semantics.

Achieving high throughput at scale requires innovation and careful engineering along multiple axes: efficient kernel implementations that make training compute-bound rather than memory-bound; intelligent partitioning of the compute graph over devices to reduce the number of bytes sent over network links while limiting device idle time; and domain-specific communication optimizations and fast hardware (state-of-the-art GPUs and high-bandwidth links between GPUs on the same and different servers).

In addition, the authors studied the interactions between the various components that affect throughput, both empirically and analytically. Based on these studies, they offer the following guidelines on how to configure distributed training:

  • Different forms of parallelism interact in complex ways: the parallelization strategy affects communication volume, the compute efficiency of the executed kernels, and the idle time workers spend waiting due to pipeline flushes (pipeline bubbles). For example, a suboptimal combination of tensor and pipeline model parallelism can reduce throughput by up to 2×, even when the network links between servers have high bandwidth; tensor model parallelism works well within a multi-GPU server, but pipeline model parallelism must be used for larger models.

  • The pipeline-parallel schedule affects communication volume, pipeline bubble size, and the amount of memory used to store activations.

    • The values of hyperparameters such as the microbatch size affect the memory footprint, the arithmetic efficiency of the kernels executed on a worker, and the pipeline bubble size. The optimal microbatch size is problem-dependent, and a suitable value can improve throughput by about 15%.
    • Distributed training is communication-intensive. Slower links between nodes, or more communication-heavy partitions, will hinder performance scaling.
    • Instead of exploring how to automatically search the space of parallelization strategies (as FlexFlow, PipeDream, Tarnawski and DAPPLE do), the paper suggests using heuristics that work well in practice.

2.3 The pipeline weight problem

Let's review the pipeline weight problem. Below is the naive pipeline execution, which is essentially asynchronous SGD.

2.3.1 Problem 1

The first problem we encounter is that, in general, the computation of the second iteration should be based on the model updated by the first iteration. But as the figure below shows, for machine 1, when the second iteration begins (dark blue 2 in the red circle), the backward pass of the first iteration (light green 1) has not yet started.

2.3.2 Problem 2

The second problem is that, for machine 2, when it runs the forward pass of the 5th mini-batch (blue 5 in the second row), the computation is based on weights that have been updated twice (blue 5 in the second row is preceded by two green squares, meaning the weights have been updated twice).

However, when it runs the backward pass of the 5th mini-batch (light green 5 in the second row), the weights it uses have been updated 4 times (light green 1, 2, 3 and 4 earlier in the second row update the weights 4 times in total). This conflicts with the assumptions of single-node deep learning and degrades the training result.

To solve these problems, the PipeDream authors proposed Weight Stashing to ensure that the same weight version is used for the forward and backward passes of the same input (Figure 1b). Specifically, each machine stashes several versions of the weights; whichever version an input's forward pass uses, its backward pass uses the same version.

In the figure above, machine 1 needs to store 4 versions of the weights, machine 2 needs 3, machine 3 needs 2, and machine 4 needs 1. In the worst case, the total number of stored weight versions is d, where d is the pipeline depth, which is too high a memory footprint for a large model. With PipeDream's default weight-update semantics, each stage's weight update has a different delay term, and no gradient accumulation is performed within the pipeline.
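To make the bookkeeping concrete, here is a toy sketch of weight stashing (the class and names are made up for illustration and are not PipeDream source code): each stage remembers which weight version an input saw in its forward pass and reuses exactly that version for its backward pass.

# Toy illustration of weight stashing; hypothetical names, not PipeDream source.
class WeightStash:
    def __init__(self, initial_weights):
        self.current = initial_weights   # latest weights on this stage
        self.stash = {}                  # microbatch id -> weight version used in its forward pass

    def forward_weights(self, microbatch_id):
        # Remember which version this microbatch sees in its forward pass.
        self.stash[microbatch_id] = self.current
        return self.current

    def backward_weights(self, microbatch_id):
        # The backward pass reuses exactly the stashed version, then drops it.
        return self.stash.pop(microbatch_id)

    def apply_update(self, new_weights):
        # New versions only affect microbatches that enter the pipeline later.
        self.current = new_weights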

2.3.3 Problem 3

Another problem: take the 5th mini-batch (dark blue 5). When machine 1 computes 5, the weights it uses have been updated once (there is one green square before it). But when machine 2 computes 5, the weights it uses have been updated twice (two green squares before it).

The solution: for each forward pass, every machine computes based on the least-updated weight version. For example, machine 2 simply ignores the weight update produced by green 2, and machine 3 ignores the updates produced by greens 2 and 3, so that all of them use the weights produced by the update from green 1 (the yellow box 1 in the figure).

2.4 PipeDream-2BW system design

PipeDream-2BW uses memory-efficient pipeline parallelism to train large models that do not fit on a single accelerator. Its double-buffered weight updates (2BW) and flush mechanism ensure high throughput, low memory footprint, and weight-update semantics similar to data parallelism. PipeDream-2BW splits the model into multiple stages over multiple workers and replicates each stage the same number of times (with data-parallel updates between replicas of the same stage). This kind of parallel pipeline is well suited to models, such as Transformers, that repeat the same layer a fixed number of times.

2.4.1 Comparison with GPipe

GPipe maintains a single version of the model weights. An input batch is split into smaller microbatches. Weight gradients are accumulated rather than applied immediately, and the pipeline is flushed periodically so that multiple weight versions never need to be maintained. GPipe provides weight-update semantics similar to data parallelism. Figure 1a of the original paper shows the GPipe execution timeline. Periodic pipeline flushes can be expensive and limit throughput. One way to mitigate this overhead is to do additional gradient accumulation within the pipeline, but this is not always feasible: a) at large scale factors, the minimum supported batch size is large (proportional to the scale factor), and very large batch sizes affect the convergence of all models; b) GPipe needs to keep activations in memory in proportion to the batch size.

2.4.2 Double-Buffered Weight Updates (2BW)

PipeDream-2BW combines 1F1B scheduling (Narayanan et al., 2019) with a novel double-buffered weight update (2BW) scheme, in which each worker alternates between forward and backward passes of different inputs, ensuring that the same weight version is used for the forward and backward passes of any particular input (Figure 2 of the original paper). 2BW has a lower memory footprint than PipeDream and GPipe, and it avoids GPipe's expensive pipeline flushes.

The gradients are computed with smaller microbatches. For any input microbatch, PipeDream-2BW uses the same weight version for the forward and backward passes of that input. Updates are accumulated over multiple microbatches before being applied at batch granularity, which limits the number of weight versions that are generated and maintained. Figure 2 shows an example 2BW timeline.

PipeDream-2BW generates a new weight version once every m microbatches (m ≥ d, where d is the pipeline depth). For simplicity, the authors first assume m = d (d = 4 in Figure 2). The new weight version cannot be used immediately. In particular, in-flight inputs cannot use the latest weight version for their backward passes (for example, input 7 on worker 3 at t = 21), because the forward passes of these inputs were already launched at various stages with the older weight version.

Therefore, newly generated weight versions need to be buffered for future use. However, at most 2 weight versions need to be maintained in total, because the weight version used to generate a new version can be discarded immediately (future inputs passing through this stage no longer use the old version). For example, in Figure 2, each worker can discard W(0) after processing the backward pass of input 8, because all forward and backward passes of subsequent inputs use newer weight versions.

Given an input microbatch k (indexed starting from 1), the weight version used is max(⌊(k−1)/m⌋ − 1, 0), where m is the number of microbatches per batch (4 in Figure 2). This weight version is the same for the forward and backward passes of input k. m can be any number ≥ d; additional gradient accumulation (larger m) increases the global batch size.
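As a minimal sketch of this version rule (my own helper function, not Megatron or PipeDream code):

def weight_version_2bw(k, m):
    """Weight version used by microbatch k (1-indexed) under 2BW,
    where m is the number of microbatches per batch (m >= pipeline depth d)."""
    return max((k - 1) // m - 1, 0)

# With m = 4 as in Figure 2: microbatches 1-8 use version 0, 9-12 use version 1, and so on.
print([weight_version_2bw(k, 4) for k in range(1, 13)])
# [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1]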

Figure 2 of the original paper. The timeline shows the PipeDream-2BW double-buffered weight update (2BW) scheme along the x-axis. Without loss of generality, backward propagation is assumed to take twice as long as forward propagation. PipeDream-2BW stores only two weight versions on each worker, reducing the total memory footprint and eliminating the need for expensive pipeline flushes. W_i^{(v)} denotes the weights on worker i with version v (which incorporates the weight gradients generated from input v). New weight versions are generated in the green boxes. W_4^{(4)} is first used in the forward pass of input 9.

In the figure above, "Before" refers to the two weight versions the system holds before discarding the old version, and "After" refers to the weight versions it holds after discarding it.

2.4.3 Weight Updates with Flushes (PipeDream-Flush)

The authors also propose a second memory-efficient pipelining schedule, called PipeDream-Flush. It has an even lower memory footprint than 2BW and provides vanilla optimizer semantics, at the cost of lower throughput. The schedule reuses PipeDream's (Narayanan et al., 2019) 1F1B schedule, but maintains a single weight version and introduces periodic pipeline flushes to ensure consistent weight versions during weight updates. Figure 3 shows timelines of PipeDream-Flush and GPipe with two pipeline stages.

Why 1F1B? Because it limits the number of in-flight microbatches to the pipeline depth d, rather than the number of microbatches m as in GPipe, 1F1B is memory-efficient. To reduce bubble time, we generally want m >> d.

Memory footprint. With PipeDream-Flush, the total number of in-flight "active" input activations is at most the pipeline depth, giving it a lower memory footprint than GPipe, which must keep input activations for all m microbatches over which gradients are averaged. PipeDream-Flush's memory footprint is also lower than PipeDream-2BW's, since it only needs to maintain one weight version (versus two for PipeDream-2BW).

Semantics. Periodic pipeline flushes ensure that weight updates are performed with gradients computed using the latest weight version. This makes the weight update take the form W^{(t+1)} = W^{(t)} − ν · ∇f(W^{(t)}).
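In plain PyTorch terms, these semantics look like the following sketch (an illustration only, not Megatron's actual optimizer path): the gradients of all microbatches of a batch are accumulated first, and the weights are updated once per batch using gradients computed against the latest weight version.

import torch

def flush_style_update(model, optimizer, microbatches, loss_fn):
    # Illustrative only: accumulate gradients over all microbatches of a batch,
    # then apply a single step, i.e. W(t+1) = W(t) - lr * grad f(W(t)).
    optimizer.zero_grad()
    for inputs, targets in microbatches:
        loss = loss_fn(model(inputs), targets) / len(microbatches)
        loss.backward()      # gradients accumulate in .grad
    optimizer.step()         # one weight update per batch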

Figure 3 of the original paper. Timelines of GPipe and PipeDream-Flush with two pipeline stages. Both GPipe and PipeDream-Flush use pipeline flushes; PipeDream-Flush alternates forward and backward passes in steady state, keeping the memory footprint low by limiting stashed activations to in-flight microbatches only.

0x03 PipeDream-Flush

When virtual_pipeline_model_parallel_size is not set, the Flush schedule is used; its implementation is forward_backward_pipelining_without_interleaving.

def get_forward_backward_func() :
    args = get_args()
    if mpu.get_pipeline_model_parallel_world_size() > 1:
        if args.virtual_pipeline_model_parallel_size is not None:
            forward_backward_func = forward_backward_pipelining_with_interleaving
        else:
            # Flush schedule
            forward_backward_func = forward_backward_pipelining_without_interleaving
    else:
        forward_backward_func = forward_backward_no_pipelining
    return forward_backward_func

Why 1F1B? The authors note that 1F1B is memory-efficient because it limits the number of in-flight microbatches to the pipeline depth d, rather than the number of microbatches m as in GPipe. To reduce bubble time, we generally want m >> d.

3.1 General Idea

3.1.1 Default Plan

GPipe proposes an execution plan in which the forward passes of all microbatches in a batch are executed first, followed by the backward passes of all microbatches (Figure 3). We can quantify the size of the GPipe pipeline bubble (t_pb). Denote the number of microbatches per batch as m, the number of pipeline stages (the number of devices used for pipeline parallelism) as p, the ideal time per iteration as t_id (assuming perfect or ideal scaling), and the times to execute the forward and backward pass of a single microbatch as t_f and t_b.

In this plan, pipeline bubbles contain:

  • p − 1 forward passes at the start of the batch.
  • p − 1 backward passes at the end of the batch.

The total time spent in the pipeline bubble is t_pb = (p − 1) · (t_f + t_b), and the ideal processing time of the batch is t_id = m · (t_f + t_b). Therefore, the fraction of ideal computation time spent in the pipeline bubble is:


Bubble\ time\ fraction\ (pipeline\ bubble\ size) = \frac{t_{pb}}{t_{id}} = \frac{p-1}{m}

Figure 3: The GPipe pipeline plan, with all microbatches (shown by their numbers) propagated forward (blue) and then backward (green). Gray areas are the pipeline bubble. For simplicity, backward propagation is assumed to take twice as long as forward propagation; the efficiency of the pipeline plan does not depend on this factor. Each batch in this example consists of eight microbatches, and the number in each blue or green box is the unique identifier of the corresponding microbatch (for example, the first batch consists of microbatches 1−8, the second batch of microbatches 9−16, and so on). The optimizer is stepped and the weight parameters are updated during the pipeline flush to ensure strict optimizer semantics.

To make the bubble time fraction small, we need m >> p. But with such a large m, this approach has a high memory footprint, because it requires the intermediate activations (or, with activation recomputation, just the input activations of each pipeline stage) to be kept in memory for all m microbatches throughout the life of a training iteration.
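A quick back-of-the-envelope check of the formula (a throwaway helper, not library code):

def bubble_fraction(p, m):
    """Pipeline bubble fraction t_pb / t_id = (p - 1) / m."""
    return (p - 1) / m

# With p = 4 pipeline stages:
print(bubble_fraction(4, 8))   # 0.375   -> 37.5% of ideal compute time lost to the bubble
print(bubble_fraction(4, 64))  # ~0.047  -> a larger m shrinks the bubble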

3.1.2 PipeDream-Flush plan

PipeDream-Flush breaks an iteration into three phases:

  • Warmup forward passes: in this phase, each worker except the last stage performs its own number of forward passes and sends the activations to its downstream stage, until the last stage has been activated. This schedule limits the number of in-flight microbatches (microbatches whose backward pass has not finished and whose activations must therefore be kept) to the pipeline depth, rather than to the number of microbatches in a batch.

  • Run 1F1B in steady state: after entering the steady state, each worker alternates one forward pass with one backward pass (1F1B).

  • Cooldown backward passes: this phase drains the in-flight microbatches; it performs only backward computations and sends the gradients to the upstream stage.

This new plan spends the same amount of time in the bubble as GPipe, but the maximum number of outstanding forward passes equals the number of pipeline stages. Therefore, the plan only needs to keep activations for p or fewer microbatches (versus m microbatches for the GPipe plan). As a result, PipeDream-Flush is much more memory-efficient than GPipe when m >> p.
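The memory argument can be summarized with a rough bound (a sketch of the upper bounds stated above, not measured numbers):

def peak_stashed_microbatches(schedule, p, m):
    """Rough upper bound on how many microbatches' activations a stage must keep."""
    if schedule == "gpipe":
        return m            # all forward passes finish before any backward pass starts
    if schedule == "1f1b":
        return min(p, m)    # bounded by the pipeline depth
    raise ValueError(schedule)

print(peak_stashed_microbatches("gpipe", p=4, m=64))  # 64
print(peak_stashed_microbatches("1f1b", p=4, m=64))   # 4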

We first give the specific code as follows, and then we will analyze it step by step.

def forward_backward_pipelining_without_interleaving(forward_step_func, data_iterator, model, optimizer, timers, forward_only) :
    """Run non-interleaved 1F1B schedule, with communication between pipeline stages. Returns dictionary with losses if the last stage, empty dict otherwise."""
    timers = get_timers()

    assert len(model) == 1
    model = model[0]

    # Compute number of warmup microbatches.
    num_microbatches = get_num_microbatches()
    num_warmup_microbatches = \
        (mpu.get_pipeline_model_parallel_world_size() -
         mpu.get_pipeline_model_parallel_rank() - 1)
    num_warmup_microbatches = min(
        num_warmup_microbatches,
        num_microbatches)
    num_microbatches_remaining = \
        num_microbatches - num_warmup_microbatches

    unwrapped_model = unwrap_model(
        model, (torchDDP, LocalDDP, Float16Module))
    model_type = unwrapped_model.model_type
    rank = mpu.get_pipeline_model_parallel_rank()
    recv_tensor_shapes = get_tensor_shapes(rank-1, model_type)
    send_tensor_shapes = get_tensor_shapes(rank, model_type)

    # Input, output tensors only need to be saved when doing backward passes
    input_tensors = None
    output_tensors = None
    if not forward_only:
        input_tensors = []
        output_tensors = []
    losses_reduced = []

    # Run warmup forward passes.
    for i in range(num_warmup_microbatches):
        input_tensor = recv_forward(recv_tensor_shapes, timers=timers)
        output_tensor = forward_step(forward_step_func, data_iterator, model,
                                     input_tensor, losses_reduced)
        send_forward(output_tensor, send_tensor_shapes, timers=timers)

        if not forward_only:
            input_tensors.append(input_tensor)
            output_tensors.append(output_tensor)

    # Before running 1F1B, need to receive first forward tensor.
    # If all microbatches are run in warmup / cooldown phase, then no need to
    # receive this tensor here.
    if num_microbatches_remaining > 0:
        input_tensor = recv_forward(recv_tensor_shapes, timers=timers)

    # Run 1F1B in steady state.
    for i in range(num_microbatches_remaining):
        last_iteration = (i == (num_microbatches_remaining - 1))

        output_tensor = forward_step(forward_step_func, data_iterator, model,
                                     input_tensor, losses_reduced)
        if forward_only:
            send_forward(output_tensor, send_tensor_shapes, timers=timers)

            if not last_iteration:
                input_tensor = recv_forward(recv_tensor_shapes, timers=timers)

        else:
            output_tensor_grad = \
                send_forward_recv_backward(output_tensor,
                                           send_tensor_shapes,
                                           timers=timers)

            # Add input_tensor and output_tensor to end of list.
            input_tensors.append(input_tensor)
            output_tensors.append(output_tensor)

            # Pop input_tensor and output_tensor from the start of the list for
            # the backward pass.
            input_tensor = input_tensors.pop(0)
            output_tensor = output_tensors.pop(0)

            input_tensor_grad = \
                backward_step(optimizer, input_tensor, output_tensor,
                              output_tensor_grad)

            if last_iteration:
                input_tensor = None
                send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)
            else:
                input_tensor = \
                    send_backward_recv_forward(
                        input_tensor_grad, recv_tensor_shapes, timers=timers)

    # Run cooldown backward passes.
    if not forward_only:
        for i in range(num_warmup_microbatches):
            input_tensor = input_tensors.pop(0)
            output_tensor = output_tensors.pop(0)

            output_tensor_grad = recv_backward(send_tensor_shapes, timers=timers)

            input_tensor_grad = \
                backward_step(optimizer, input_tensor, output_tensor,
                              output_tensor_grad)

            send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)

    return losses_reduced

3.2 Startup Phase

This is done on each worker. Each worker has a different rank, and the specific logic is as follows:

  • First, determine the number of microbatches this worker needs to run in the warm-up phase: min(world_size − rank − 1, num_microbatches). Since rank increases stage by stage, the number of warm-up microbatches decreases stage by stage until it reaches 0. For example, if the pipeline world size is 5, the rank range is 0 ~ 4, and the number of microbatches is 4, then the warm-up counts from the first stage onward are 5-0-1 = 4, 5-1-1 = 3, 5-2-1 = 2, 5-3-1 = 1, 5-4-1 = 0.
  • Second, compute the number of microbatches processed in the steady-state phase.
  • If backward passes are needed, two FIFO queues are created: input_tensors stores the activations received from upstream, and output_tensors stores the activations sent downstream.
timers = get_timers()

assert len(model) == 1
model = model[0]

# Compute number of warmup microbatches.
num_microbatches = get_num_microbatches() # get the number of microbatches
# Number of microbatches this worker runs in the warm-up phase:
#   min(world_size - rank - 1, num_microbatches)
# Since rank increases stage by stage, the number of warm-up microbatches decreases
# until it reaches 0, at which point the stage goes straight into the steady state.
# For example, if the world size is 5, ranks range over 0 ~ 4 and there are 4 microbatches,
# then the warm-up counts from the first stage onward are 5-0-1, 5-1-1, 5-2-1, 5-3-1, 5-4-1.
num_warmup_microbatches = \
    (mpu.get_pipeline_model_parallel_world_size() -
     mpu.get_pipeline_model_parallel_rank() - 1) 
num_warmup_microbatches = min(
    num_warmup_microbatches,
    num_microbatches) 
# Calculate the number of microbatches needed for the steady-state phase
num_microbatches_remaining = \
    num_microbatches - num_warmup_microbatches

unwrapped_model = unwrap_model(
    model, (torchDDP, LocalDDP, Float16Module))
model_type = unwrapped_model.model_type
rank = mpu.get_pipeline_model_parallel_rank()
recv_tensor_shapes = get_tensor_shapes(rank-1, model_type)
send_tensor_shapes = get_tensor_shapes(rank, model_type)

# Input, output tensors only need to be saved when doing backward passes
# If backward passes are needed, set up two queues:
# input_tensors stores activations received from upstream, output_tensors stores activations sent downstream
input_tensors = None
output_tensors = None
if not forward_only:
    input_tensors = []
    output_tensors = []
losses_reduced = []

3.3 Warm-up Phase

The warm-up phase loops over the number of microbatches this worker needs to execute during warm-up, and for each one:

  • Get the input activation from upstream.
  • Perform the forward computation locally; the activation received from upstream is the input of this stage.
  • Send the local activation downstream.
  • If backward propagation is required, each worker stores the upstream activation in input_tensors and the activation sent downstream in output_tensors.
  • The earlier stages run as many forward passes as possible so that the later stages can start 1F1B immediately.
# Run warmup forward passes.
for i in range(num_warmup_microbatches):
    # Get the input activation from upstream
    input_tensor = recv_forward(recv_tensor_shapes, timers=timers)
    # Forward computation; the upstream activation is this stage's input
    output_tensor = forward_step(forward_step_func, data_iterator, model,
                                 input_tensor, losses_reduced)
    # Send the local activation downstream
    send_forward(output_tensor, send_tensor_shapes, timers=timers)

    if not forward_only:
        input_tensors.append(input_tensor)    # save the upstream activation
        output_tensors.append(output_tensor)  # save the locally computed activation that was sent downstream

recv_forward returns None on the first stage, because it has no upstream; otherwise it returns the upstream activation.

def recv_forward(tensor_shapes, timers) :
    input_tensors = []
    for tensor_shape in tensor_shapes:
        if tensor_shape is None:
            input_tensors.append(None)
        else:
            input_tensors.append(p2p_communication.recv_forward(tensor_shape,
                                                                timers=timers))
    return input_tensors

3.4 Communication Module

3.4.1 Basic communication methods

Pipeline parallelism requires inter-stage P2P communication. Its core is the _communicate function, which wraps PyTorch's basic communication primitives and provides bidirectional P2P communication between stages for pipeline parallelism. A number of API methods are built on top of it. The function is well commented and quite clear. One important point: how does each stage know its upstream and downstream ranks in the pipeline? Through calls such as mpu.get_pipeline_model_parallel_next_rank().

The _communicate code is as follows:

def _communicate(tensor_send_next, tensor_send_prev, recv_prev, recv_next,
                 tensor_shape,
                 use_ring_exchange=False,
                 dtype_=None) :
    """Communicate tensors between stages. Used as helper method in other communication methods that are used in megatron/schedules.py. Takes the following arguments: tensor_send_next: tensor to send to next rank (no tensor sent if set to None). tensor_send_prev: tensor to send to prev rank (no tensor sent if set to None). recv_prev: boolean for whether tensor should be received from previous rank. recv_next: boolean for whether tensor should be received from next rank. tensor_shape: shape of tensor to receive (this method assumes that all tensors sent and received in a single function call are the same shape). use_ring_exchange: boolean for whether torch.distributed.ring_exchange() API should be used. dtype_: optional, this is used when the tensor that needs to be communicated is different from args.params_dtype. Returns: (tensor_recv_prev, tensor_recv_next) """
    args = get_args()

    # Create placeholder tensors for receive in forward and backward directions
    # if needed.
    tensor_recv_prev = None
    tensor_recv_next = None

    # Some legacy inference code doesn't set the tensor shape, do so now
    # for the normal values for gpt/bert. This could be removed if inference
    # code is changed to provide tensor_shape.
    if tensor_shape is None:
        tensor_shape = (args.seq_length, args.micro_batch_size, args.hidden_size)

    override_scatter_gather_tensors_in_pipeline = False
    if args.scatter_gather_tensors_in_pipeline:
        tensor_chunk_shape = reduce(operator.mul, tensor_shape, 1)
        if tensor_chunk_shape % mpu.get_tensor_model_parallel_world_size() == 0:
            tensor_chunk_shape = tensor_chunk_shape // \
                mpu.get_tensor_model_parallel_world_size()
        else:
            tensor_chunk_shape = tensor_shape
            override_scatter_gather_tensors_in_pipeline = True
    else:
        tensor_chunk_shape = tensor_shape
    dtype = args.params_dtype
    if args.fp32_residual_connection:
        dtype = torch.float

    requires_grad = True
    if dtype_ is not None:
        dtype = dtype_
        requires_grad = False

    # If we need to receive tensors, allocate empty tensors first; the received data will be stored in them
    if recv_prev:
        tensor_recv_prev = torch.empty(tensor_chunk_shape,
                                       requires_grad=requires_grad,
                                       device=torch.cuda.current_device(),
                                       dtype=dtype)
    if recv_next:
        tensor_recv_next = torch.empty(tensor_chunk_shape,
                                       requires_grad=requires_grad,
                                       device=torch.cuda.current_device(),
                                       dtype=dtype)

    # Split tensor into smaller chunks if using scatter-gather optimization.
    if not override_scatter_gather_tensors_in_pipeline and \
            args.scatter_gather_tensors_in_pipeline:
        if tensor_send_next is not None:
            tensor_send_next = mpu.split_tensor_into_1d_equal_chunks(tensor_send_next)

        if tensor_send_prev is not None:
            tensor_send_prev = mpu.split_tensor_into_1d_equal_chunks(tensor_send_prev)

    # Send tensors in both the forward and backward directions as appropriate.
    if use_ring_exchange:
        # Use ring exchange if requested (only available in newer PyTorch builds)
        torch.distributed.ring_exchange(tensor_send_prev=tensor_send_prev,
                                        tensor_recv_prev=tensor_recv_prev,
                                        tensor_send_next=tensor_send_next,
                                        tensor_recv_next=tensor_recv_next,
                                        group=mpu.get_pipeline_model_parallel_group())
    else:
        # Build torch.distributed.P2POp operations targeting the previous/next pipeline ranks
        ops = []
        if tensor_send_prev is not None:
            send_prev_op = torch.distributed.P2POp(
                torch.distributed.isend, tensor_send_prev,
                mpu.get_pipeline_model_parallel_prev_rank())
            ops.append(send_prev_op)
        if tensor_recv_prev is not None:
            recv_prev_op = torch.distributed.P2POp(
                torch.distributed.irecv, tensor_recv_prev,
                mpu.get_pipeline_model_parallel_prev_rank())
            ops.append(recv_prev_op)
        if tensor_send_next is not None:
            send_next_op = torch.distributed.P2POp(
                torch.distributed.isend, tensor_send_next,
                mpu.get_pipeline_model_parallel_next_rank())
            ops.append(send_next_op)
        if tensor_recv_next is not None:
            recv_next_op = torch.distributed.P2POp(
                torch.distributed.irecv, tensor_recv_next,
                mpu.get_pipeline_model_parallel_next_rank())
            ops.append(recv_next_op)
         
        # Issue the batched send/recv operations
        if len(ops) > 0:
            reqs = torch.distributed.batch_isend_irecv(ops)
            for req in reqs:
                req.wait()  # wait to synchronize
                
    # To protect against race condition when using batch_isend_irecv().
    torch.cuda.synchronize()

    # If using scatter-gather optimization, gather smaller chunks.
    # Special optimization mentioned in the paper: since the tensor has already been all-reduced,
    # it can be split (scattered) before sending, and the downstream side gathers the chunks back
    # into the full tensor. Interested readers can dig deeper into the paper and the code.
    if not override_scatter_gather_tensors_in_pipeline and \
            args.scatter_gather_tensors_in_pipeline:
        if recv_prev:
            tensor_recv_prev = mpu.gather_split_1d_tensor(
                tensor_recv_prev).view(tensor_shape).requires_grad_()

        if recv_next:
            tensor_recv_next = mpu.gather_split_1d_tensor(
                tensor_recv_next).view(tensor_shape).requires_grad_()

    return tensor_recv_prev, tensor_recv_next

3.4.2 API

On top of _communicate, many API functions are wrapped; they mainly differ in how they handle the parameters. For example:

def send_backward_recv_forward(input_tensor_grad, tensor_shape=None, timers=None) :
    """Batched send and recv with previous rank in pipeline."""
    if mpu.is_pipeline_first_stage():
        input_tensor = None
    else:
        input_tensor, _ = _communicate(
            tensor_send_next=None,
            tensor_send_prev=input_tensor_grad,
            recv_prev=True,
            recv_next=False,
            tensor_shape=tensor_shape)
    return input_tensor

3.4.3 Upstream and downstream of the pipeline

The following functions determine a stage's upstream and downstream ranks. From the previous article we know that if the current rank is 2 and its pipeline rank group is [g2, g6, g10, g14], then the downstream of rank 2 is rank 6 (a small usage sketch follows the code below).


def get_pipeline_model_parallel_first_rank() :
    return _PIPELINE_GLOBAL_RANKS[0]

def get_pipeline_model_parallel_last_rank() :
    last_rank_local = get_pipeline_model_parallel_world_size() - 1
    return _PIPELINE_GLOBAL_RANKS[last_rank_local]

def get_pipeline_model_parallel_next_rank() :
    rank_in_pipeline = get_pipeline_model_parallel_rank()
    world_size = get_pipeline_model_parallel_world_size()
    return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline + 1) % world_size]

def get_pipeline_model_parallel_prev_rank() :
    rank_in_pipeline = get_pipeline_model_parallel_rank()
    world_size = get_pipeline_model_parallel_world_size()
    return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline - 1) % world_size]
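As a usage sketch (standalone toy code, not the real mpu module), with a pipeline rank group such as [2, 6, 10, 14] the next/prev lookups behave like this:

# Toy illustration of how next/prev ranks are derived from a pipeline group.
_PIPELINE_GLOBAL_RANKS = [2, 6, 10, 14]   # assumed example group, as in the text

def next_rank(rank_in_pipeline):
    world_size = len(_PIPELINE_GLOBAL_RANKS)
    return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline + 1) % world_size]

def prev_rank(rank_in_pipeline):
    world_size = len(_PIPELINE_GLOBAL_RANKS)
    return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline - 1) % world_size]

print(next_rank(0))   # 6  -> downstream of global rank 2
print(prev_rank(0))   # 14 -> wraps around to the last stage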

3.5 Steady-State Phase

The overall logic of the steady-state phase is: forward computation -> send the activation downstream & receive a gradient from downstream -> backward computation -> send the locally computed gradient upstream & receive an activation from upstream.

3.5.1 Logic

The specific logic of the steady-state phase is as follows:

  1. forward_step: take a microbatch (upstream activation) and perform the local forward computation.
  2. send_forward:
    1. If running forward only, call send_forward to send the local result downstream.
    2. Otherwise, call send_forward_recv_backward: send the local result downstream and receive its gradient from downstream.
  3. Each worker appends the upstream activation to input_tensors and the activation sent downstream to output_tensors.
  4. backward_step: local backward computation.
    1. Pop the earliest unprocessed upstream activation from the queue.
    2. Pop the corresponding local activation from the queue.
    3. Use (upstream activation, local activation, downstream gradient) to run the backward computation of the earliest unprocessed microbatch and obtain the local gradient.
  5. send_backward:
    1. If this is the last microbatch, just pass the local gradient input_tensor_grad to the upstream of the forward computation.
    2. Otherwise, call send_backward_recv_forward to pass the local gradient input_tensor_grad upstream and receive another activation from upstream.
  6. Jump back to 1 and continue with the next microbatch (upstream activation).
# Before running 1F1B, need to receive first forward tensor.
# If all microbatches are run in warmup / cooldown phase, then no need to
# receive this tensor here.
if num_microbatches_remaining > 0:
    # The steady state still has microbatches to run, so fetch the activation from the previous stage
    input_tensor = recv_forward(recv_tensor_shapes, timers=timers)

# Run 1F1B in steady state.
for i in range(num_microbatches_remaining):
    last_iteration = (i == (num_microbatches_remaining - 1))

    # forward calculation
    output_tensor = forward_step(forward_step_func, data_iterator, model,
                                 input_tensor, losses_reduced)
    if forward_only:
        send_forward(output_tensor, send_tensor_shapes, timers=timers)

        if not last_iteration:
            input_tensor = recv_forward(recv_tensor_shapes, timers=timers)

    else:
        # Send the intermediate activation downstream and get its gradient back from downstream
        output_tensor_grad = \
            send_forward_recv_backward(output_tensor,
                                       send_tensor_shapes,
                                       timers=timers)
          
        # Add input_tensor and output_tensor to end of list.
        input_tensors.append(input_tensor)    # save the upstream activation to the queue
        output_tensors.append(output_tensor)  # save the locally computed activation sent downstream to the queue

        # Pop input_tensor and output_tensor from the start of the list for
        # the backward pass.        
        input_tensor = input_tensors.pop(0)    # pop the earliest unprocessed upstream activation from the queue
        output_tensor = output_tensors.pop(0)  # pop the corresponding local activation from the queue

        # Backward computation: use (upstream activation, local activation, downstream gradient)
        # to run the backward pass of the earliest unprocessed microbatch and obtain the local gradient
        input_tensor_grad = \
            backward_step(optimizer, input_tensor, output_tensor,
                          output_tensor_grad) # The gradient from downstream is here

        if last_iteration:
            input_tensor = None
            # If this is the last microbatch, just pass the local gradient input_tensor_grad to the upstream stage
            send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)
        else:
            # Otherwise, pass the local gradient input_tensor_grad upstream, then fetch another activation from upstream
            input_tensor = \
                send_backward_recv_forward(
                    input_tensor_grad, recv_tensor_shapes, timers=timers)

3.5.2 Serial execution

Here send_forward_recv_backward does exactly what its name says: it sends to the downstream stage and receives from it.

def send_forward_recv_backward(output_tensors, tensor_shapes, timers) :
    if not isinstance(output_tensors, list):
        output_tensors = [output_tensors]
    output_tensor_grads = []
    for (output_tensor, tensor_shape) in zip(output_tensors, tensor_shapes):
        if tensor_shape is None:
            output_tensor_grads.append(None)
            continue
        # Send your own activation and get the gradient downstream
        output_tensor_grad = p2p_communication.send_forward_recv_backward(
                output_tensor, tensor_shape, timers=timers)
        output_tensor_grads.append(output_tensor_grad)
    return output_tensor_grads # return gradient

Note that for a single worker, both send and recv are blocking, so communication and computation are serialized here and do not overlap. This is fine: in the preceding warm-up phase the forward passes have already propagated from worker 0 to worker d, so worker d can take an input, process it, immediately run the backward computation, and return the gradient upstream. The serialization therefore does not matter. We can also see this from the figure in the paper:

Figure: PipeDream-Flush alternates forward and backward passes in steady state, keeping the memory footprint low by limiting stashed activations to in-flight microbatches only. From the figure we can see:

  • The execution sequence of Worker 1 is: 1 FW (warmup), 2 FW, 1 BW, 3 FW, 2 BW, 4 FW, 3 BW, 4 BW (cooldown)
  • The execution sequence of Worker 2 is: 1 FW, 1 BW, 2 FW, 2 BW, 3 FW, 3 BW, 4 FW, 4 BW; Worker 2 enters the steady state directly. A toy sketch reproducing this ordering follows the list.
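The ordering above can be reproduced with a toy scheduler that only mimics the phase arithmetic of forward_backward_pipelining_without_interleaving (no communication, purely illustrative):

def one_f_one_b_order(rank, world_size, num_microbatches):
    """Steps ('F'/'B', microbatch id) a stage executes under non-interleaved 1F1B."""
    warmup = min(world_size - rank - 1, num_microbatches)
    remaining = num_microbatches - warmup
    steps = [("F", i + 1) for i in range(warmup)]          # warm-up forward passes
    for i in range(remaining):                             # steady state: 1F then 1B
        steps.append(("F", warmup + i + 1))
        steps.append(("B", i + 1))
    for i in range(warmup):                                # cooldown backward passes
        steps.append(("B", remaining + i + 1))
    return steps

print(one_f_one_b_order(rank=0, world_size=2, num_microbatches=4))
# [('F', 1), ('F', 2), ('B', 1), ('F', 3), ('B', 2), ('F', 4), ('B', 3), ('B', 4)]
print(one_f_one_b_order(rank=1, world_size=2, num_microbatches=4))
# [('F', 1), ('B', 1), ('F', 2), ('B', 2), ('F', 3), ('B', 3), ('F', 4), ('B', 4)]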

3.6 Cooldown Phase

The cooldown phase is symmetric to the warm-up phase and also runs num_warmup_microbatches steps, but only performs backward passes. This phase simply pops from the queues, since what remains are the backward passes that have not yet been drained: pop the upstream activation and the activation that was sent downstream, then do the gradient computation.

# Run cooldown backward passes.
if not forward_only:
    for i in range(num_warmup_microbatches):
        input_tensor = input_tensors.pop(0)
        output_tensor = output_tensors.pop(0)

        output_tensor_grad = recv_backward(send_tensor_shapes, timers=timers)
        input_tensor_grad = \
            backward_step(optimizer, input_tensor, output_tensor,
                          output_tensor_grad)

        send_backward(input_tensor_grad, recv_tensor_shapes, timers=timers)

return losses_reduced

3.7 Where is Flush?

We need to look at megatron/training.py, which contains the logic of a single training step. The flush is completed when update_successful, grad_norm, num_zeros_in_grad = optimizer.step() is executed, i.e., when the optimizer updates the weights.

def train_step(forward_step_func, data_iterator, model, optimizer, lr_scheduler) :
    """Single training step."""
    args = get_args()
    timers = get_timers()

    # 1. Zero the gradient
    # Set grad to zero.
    if args.DDP_impl == 'local' and args.use_contiguous_buffers_in_local_ddp:
        for partition in model:
            partition.zero_grad_buffer()
    optimizer.zero_grad()

    # 2. Propagating forward and backward, which for this chapter is called forward_backward_pipelining_without_interleaving
    forward_backward_func = get_forward_backward_func()
    losses_reduced = forward_backward_func(
        forward_step_func, data_iterator, model,
        optimizer, timers, forward_only=False)

    # At this point the whole pipeline has run; losses and gradients have been computed
    # Empty unused memory
    if args.empty_unused_memory_level >= 1:
        torch.cuda.empty_cache()

    # 3. All-reduce of data parallelism
    # All-reduce if needed.
    if args.DDP_impl == 'local':
        for model_module in model:
            model_module.allreduce_gradients()

    # All-reduce word_embeddings' grad across first and last stages to ensure
    # that word_embeddings parameters stay in sync.
    # This should only run for models that support pipelined model parallelism
    # (BERT and GPT-2).
    # 4. All-reduce of the embedding layer. Weight sharing is also carried out in the embedding layer, so all-reduce is carried out to ensure the uniformity of parameters
    if mpu.is_rank_in_embedding_group(ignore_virtual=True) and \
            mpu.get_pipeline_model_parallel_world_size() > 1:
        if mpu.is_pipeline_first_stage(ignore_virtual=True):
            unwrapped_model = model[0]
        elif mpu.is_pipeline_last_stage(ignore_virtual=True):
            unwrapped_model = model[-1]
        else:  # We do not support the interleaved schedule for T5 yet.
            unwrapped_model = model[0]
        unwrapped_model = unwrap_model(
            unwrapped_model, (torchDDP, LocalDDP, Float16Module))

        if unwrapped_model.share_word_embeddings:
            word_embeddings_weight = unwrapped_model.word_embeddings_weight()
            if args.DDP_impl == 'local':
                grad = word_embeddings_weight.main_grad
            else:
                grad = word_embeddings_weight.grad
            torch.distributed.all_reduce(grad, group=mpu.get_embedding_group())

    # Update parameters.
    # 5. Update the parameter.
    update_successful, grad_norm, num_zeros_in_grad = optimizer.step()

    # Update learning rate.
    if update_successful:
        increment = get_num_microbatches() * \
                    args.micro_batch_size * \
                    args.data_parallel_size
        lr_scheduler.step(increment=increment)
        skipped_iter = 0
    else:
        skipped_iter = 1

    # Empty unused memory
    if args.empty_unused_memory_level >= 2:
        torch.cuda.empty_cache()

    if mpu.is_pipeline_last_stage(ignore_virtual=True):
        # Average loss across microbatches.
        loss_reduced = {}
        for key in losses_reduced[0]:
            losses_reduced_for_key = [x[key] for x in losses_reduced]
            loss_reduced[key] = sum(losses_reduced_for_key) / len(losses_reduced_for_key)
        return loss_reduced, skipped_iter, grad_norm, num_zeros_in_grad
    return {}, skipped_iter, grad_norm, num_zeros_in_grad

Now that the NVIDIA Megatron analysis is complete, let's use NVIDIA HugeCTR next to see how to handle large sparse embeddings.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF References

Megatron Papers and Code Analysis (2)

Megatron Papers and Code Analysis (1)

Megatron-LM


Megatron learning summary

GTC 2020: Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism

PipeDream-Flush

PipeDream: Data parallelism + pipeline

PipeDream-interleaved