0x00 Abstract

In the previous articles, we introduced PipeDream's overall architecture, the profile stage, the compute partition stage, the model conversion stage, the runtime engine, and the communication module. This is the last article in the PipeDream series; it introduces the 1F1B strategy, which is PipeDream's biggest contribution.

Other articles in this pipeline parallelism series are linked below:

Deep learning pipeline parallelism GPipe (1): basic pipeline implementation

Deep learning pipeline parallelism GPipe (2): gradient accumulation

Deep learning pipeline parallelism GPipe (3): recomputation

Deep learning pipeline parallelism PipeDream (1): profile stage

Deep learning pipeline parallelism PipeDream (2): compute partitioning

Deep learning pipeline parallelism PipeDream (3): model conversion

Deep learning pipeline parallelism PipeDream (4): runtime engine

Deep learning pipeline parallelism PipeDream (5): communication module

0x01 Pipelining Comparison

First, let’s compare the various pipelines analyzed so far.

1.1 Generic Pipeline

The basic unit of a DNN model is the layer. PipeDream divides these layers into stages: each stage consists of a set of consecutive layers in the model. PipeDream deploys different stages of the model on different machines, and each stage may have a different replication factor. A stage performs the forward and backward passes for all the layers it contains. PipeDream calls the stage containing the input layer the input stage, and the stage containing the output layer the output stage.

In the simplest case, as in traditional model-parallel training, only one minibatch is active in the system. The figure above shows the computation timeline of a configuration with four machines; such a pipeline can be regarded as a generic pipeline.

  • In the forward phase, each stage performs the forward pass of a minibatch through the layers of that stage and sends the result to the next stage. After finishing the forward pass, the output stage computes the minibatch loss.
  • In the backward phase, each stage performs the backward pass and passes the gradients back to the previous stage, one stage at a time, forming a backward channel.

1.2 GPipe Pipeline

Since PipeDream is based on GPipe, we will also look at the problems with GPipe.

GPipe's pipeline-parallel training scheme is shown in the following diagram:

  • The layers being trained are divided into stages, each of which contains a consecutive set of layers of the model.
  • The input minibatch is split into m micro-batches. Much like AllReduce, as soon as one piece is computed it is transmitted to the next node, and the parameters are updated synchronously at the end.
  • GPipe also uses existing techniques such as gradient accumulation to improve memory efficiency, and trades compute for memory by discarding the stored activations between the forward and backward passes and recomputing them when the backward pass needs them.

GPipe's pipeline has several problems:

  • Excessive pipeline flushes increase idle time.
  • If M is small, GPipe's hardware efficiency drops because of recomputation overhead and frequent pipeline flushes, so M is generally set fairly large (see the quick calculation after this list).
  • Caching m sets of activations increases memory usage, because the activations produced by each micro-batch's forward pass are consumed by its backward pass and therefore must be kept in memory.
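
To get a feel for why M should be large, here is a rough back-of-the-envelope sketch (an illustration, not code from GPipe or PipeDream); it uses the standard estimate that with K stages and M micro-batches the bubble (idle) fraction of a GPipe-style pipeline is about (K - 1) / (M + K - 1):

def gpipe_bubble_fraction(num_stages, num_microbatches):
    # Idle fraction of a GPipe-style pipeline: (K - 1) / (M + K - 1),
    # ignoring recomputation and communication costs.
    k, m = num_stages, num_microbatches
    return (k - 1) / (m + k - 1)

for m in (1, 4, 8, 32):
    print("K=4, M=%2d: bubble ~ %.0f%%" % (m, 100 * gpipe_bubble_fraction(4, m)))
# K=4, M= 1: bubble ~ 75%
# K=4, M= 4: bubble ~ 43%
# K=4, M= 8: bubble ~ 27%
# K=4, M=32: bubble ~ 9%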

1.3 1F1B pipeline

PipeDream's 1F1B (One Forward pass followed by One Backward pass) strategy solves the activation-caching problem: the number of cached activations depends only on the number of stages, which further saves GPU memory.

Pipeline parallelism places different layers of the model on different machines (nodes) and performs the forward and backward computations sequentially across them.

The goal of PipeDream is to combine pipeline parallelism, model parallelism, and data parallelism in a way that minimizes the total training time. However, to make this approach effective for large DNN models and reap the potential benefits of pipeline-parallel training, PipeDream must overcome three major challenges:

  1. How to automatically partition the work (the model's layers) across the available compute resources.
  2. How to schedule computation to maximize throughput while ensuring that the training task makes forward progress.
  3. How to ensure that training remains effective despite the asynchrony introduced by pipelining.

1F1B addresses the latter two challenges.

1.3.1 Idea

Let's analyze the idea behind the 1F1B strategy.

The ultimate goal is to reduce the number of cached activations and thus the GPU memory footprint, so that larger models can be trained.

The current dilemma is that, even with checkpointing, the activations of a forward pass cannot be released until the corresponding backward pass has completed.

The solution is to shorten the lifetime of each activation: have each micro-batch finish its backward pass as early as possible, so that its activations can be released as early as possible.

Note: the PipeDream paper uses the term minibatch, so PipeDream's minibatch corresponds to GPipe's micro-batch. From here on, we use minibatch.

The solution is:

  • Have the last stage (Machine 4 in the figure below) start the backward pass of a minibatch immediately after finishing its forward pass, so that the other stages can also start their backward passes as early as possible; this is the 1F1B strategy. It is somewhat like turning one overall synchronization into asynchronous processing of many small chunks of data, each of which is updated independently.
  • In the steady state of 1F1B, forward and backward passes strictly alternate on each machine, so every GPU always has some minibatch being processed, which keeps resource utilization high (the pipeline is balanced, pipeline stalls are negligible, there is no pipeline flush, and parameter updates at each stage happen at a fixed cadence).
  • To cope with the asynchrony introduced by pipelining, 1F1B uses different versions of the weights to keep training effective.
  • PipeDream also extends 1F1B: for stages that use data parallelism, round-robin scheduling is used to assign work to the devices within the same stage, which guarantees that the forward and backward passes of a given batch of data happen on the same machine. This is 1F1B-RR (one-forward-one-backward-round-robin).

In effect, the 1F1B strategy turns batch-level synchronization into asynchronous processing of minibatches: after a minibatch's forward pass, its backward pass runs immediately; after a minibatch's backward pass, the corresponding worker updates its gradients; and all workers run concurrently, as sketched below. This can be interpreted as a shift from BSP execution to ASP execution.
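
As a concrete illustration, here is a minimal sketch (illustrative only, not PipeDream's scheduler) that prints the per-stage order of operations under 1F1B, assuming 4 equal stages, no replication, and 6 minibatches. It matches the pattern in the figures below: warm-up forward passes, strict 1F/1B alternation, then a drain of the remaining backward passes.

def one_f_one_b_schedule(num_stages, stage, num_minibatches):
    # Operation sequence of `stage` (0-indexed) under 1F1B: a warm-up of
    # forward passes, then strict 1F/1B alternation, then a drain of the
    # remaining backward passes.
    num_warmup = num_stages - stage
    schedule, fwd, bwd = [], 0, 0
    for _ in range(min(num_warmup, num_minibatches)):   # warm-up forwards
        fwd += 1
        schedule.append("F%d" % fwd)
    while fwd < num_minibatches:                        # steady state: 1B then 1F
        bwd += 1
        schedule.append("B%d" % bwd)
        fwd += 1
        schedule.append("F%d" % fwd)
    while bwd < num_minibatches:                        # drain remaining backwards
        bwd += 1
        schedule.append("B%d" % bwd)
    return schedule

for s in range(4):
    print("Machine %d:" % (s + 1), " ".join(one_f_one_b_schedule(4, s, 6)))
# Machine 1: F1 F2 F3 F4 B1 F5 B2 F6 B3 B4 B5 B6
# Machine 4: F1 B1 F2 B2 F3 B3 F4 B4 F5 B5 F6 B6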

1.3.2 Illustration

Below is a pipeline with 1F1B implemented.

  • Divide a batch into multiple minibatches, e.g., 1, 2, 3, 4.
  • Insert the minibatches into the pipeline one by one.
  • Machine 1 first computes the forward pass of blue 1 and then sends blue 1 to Machine 2 to continue.
  • Machine 2 then computes the forward pass of blue 1 and sends it on to Machine 3, while Machine 1 computes the forward pass of blue 2.
  • When blue 1 has traversed Machines 1 to 4 from top to bottom, its forward propagation is complete and its backward propagation begins, shown as the first green 1 on Machine 4, which is then propagated back through Machines 3 to 1.
  • When green 1 reaches Machine 1, minibatch 1 has completed all of its backward propagation.
  • After each machine completes the backward pass of a minibatch, it performs a gradient update locally.
  • Only a small amount of boundary data is transmitted between machines, so computation and communication can proceed in parallel.

Note that the following figure shows both the startup phase and the steady phase, which we will refer to later.

0x02 PipeDream Implementation

First, an example with four GPUs is shown; the figure also includes the timeline of one of the GPUs (Machine 3). There is some overlap between computation and the communication of gradients/activations.

2.1 Overall Logic

Let's take one training run as an example, illustrated with the figure below.

We need to introduce the term NOAM, the number of optimal active minibatches.

NUM_OPT_ACTIVE_MINIBATCHES (NOAM) = ⌈ (# machines)/(# machines in the input stage) ⌉

Its meaning: based on the partition generated by our algorithm, this is the minimum number of minibatches per input-stage replica needed to keep the pipeline full in steady state.

The figure above shows the corresponding pipeline timeline. The pipeline has 4 stages running on different machines, so the NOAM of this configuration is 4.
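
As a quick sketch of the formula (the function name is just for illustration), NOAM for this configuration works out as follows:

import math

def noam(num_machines, num_machines_in_input_stage):
    # NUM_OPT_ACTIVE_MINIBATCHES = ceil(# machines / # machines in the input stage)
    return math.ceil(num_machines / num_machines_in_input_stage)

print(noam(4, 1))  # 4: with 4 machines and an unreplicated input stage, keep 4 minibatches in flight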

Let’s analyze the operation steps in detail.

  • In the startup phase at the beginning of training (Startup State in the figure), the input stage first admits enough minibatches (i.e., NOAM of them) to guarantee that each device has work to process once the pipeline reaches steady state. In the figure above, the input stage injects four minibatches, which propagate toward the output stage.
  • As soon as the output stage finishes the forward pass of the first minibatch (Machine 4's first blue 1), it performs the backward pass of that same minibatch (Machine 4's first green 1).
  • It then alternates the forward and backward passes of subsequent minibatches (i.e., forward 2, backward 2, forward 3, backward 3, and so on for Machine 4).
  • As backward passes start to propagate to the earlier stages of the pipeline (from Machine 3 back to Machine 1), each stage begins to alternate between the forward and backward passes of different minibatches.
  • In steady state, every machine is busy with either the forward or the backward pass of some minibatch.

2.2 Weight Problem

Pipelined training introduces two kinds of parameter inconsistency, because the execution is effectively ASP: without coordination, it only gets messier:

  • In a native PipeDream pipeline, each stage performs the forward pass with one version of the parameters and the backward pass with a different version; that is, the parameters used for the forward and backward passes of the same minibatch are inconsistent. For example, as shown above:

    • When minibatch 5 enters Worker 1, its forward pass runs after the backward pass of minibatch 1, i.e., its forward computation uses the parameters updated by minibatch 1's backward pass.
    • However, minibatch 5's backward pass runs only after the backward passes of minibatches 2, 3, and 4 have executed, so the parameters used at that point incorporate the updates from minibatches 1, 2, 3, and 4.
    • As a result, the forward and backward passes of minibatch 5 use inconsistent parameters. In other words, on Machine 1, both blue 5 and green 5 in the first row ought to use the parameters as updated after green 1.
  • The same minibatch performs the same operation (the forward pass, or the backward pass) on different stages using different versions of the parameters. Also as shown in the figure above:

    • The forward pass of minibatch 5 on Worker 1 (blue 5) runs after the backward pass of minibatch 1.
    • However, the forward pass of minibatch 5 on Worker 2 (blue 5) runs after the backward passes of minibatches 1 and 2 have completed.
    • As a result, minibatch 5 uses inconsistent parameter versions for its forward pass across the two stages.

To solve these two problems, PipeDream adopts two techniques: weight stashing and vertical sync.

  • Weight stashing: maintain multiple versions of the weights, one per active minibatch. Each stage runs the forward pass of an incoming minibatch with the latest version of its weights; after the forward pass, these weights are saved and later used for the backward pass of the same minibatch. Weight stashing guarantees that, within a stage, the forward and backward passes of a given minibatch use the same version of the model parameters, but it does not guarantee consistency of the parameters for a given minibatch across stages.
  • Vertical Sync: each minibatch enters the pipeline tagged with the latest parameter version at the input stage, and that version number is used throughout the minibatch's lifetime, i.e., the same version is used in every stage (instead of each stage's latest local version, which is what weight stashing alone would use). This achieves parameter consistency across stages.

2.3 Weight Stashing

Here is an example:

Worker 1, Worker 2, ... each has its own weights, denoted W_1^(j), W_2^(j), ...; in this notation, the subscript i in W_i^(j) denotes the i-th worker and the superscript (j) denotes the weight version produced after the backward pass of the j-th minibatch.

Within one stage (i.e., on each worker):

  • Every backward pass updates the weights, and the next forward pass uses the latest available version. In other words, a worker's weights are updated whenever a new green backward pass finishes, and subsequent new work should be based on these new weights.
  • After a forward pass finishes, the weights it used are saved for the backward pass of the same minibatch.
  • Weight stashing guarantees that, within a stage, the forward and backward passes of a given minibatch use the same version of the model parameters (a toy sketch follows this list).
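
To make weight stashing concrete, here is a toy sketch of the bookkeeping inside one stage (illustrative only; the real logic lives in OptimizerWithWeightStashing, shown in section 3.5). The toy stage computes y = w * x, and the gradient it sends upstream is grad_y * w, so the backward pass must reuse exactly the weight version that its forward pass used:

import copy

class StageWeightStash:
    # Toy weight stashing for one pipeline stage (illustrative only).
    def __init__(self, w=1.0, lr=0.1):
        self.w = w            # latest weights of this stage
        self.lr = lr
        self.stash = {}       # minibatch id -> (weight version, input) used in its forward pass

    def forward(self, mb, x):
        self.stash[mb] = (copy.deepcopy(self.w), x)   # stash the version used for this minibatch
        return self.w * x

    def backward(self, mb, grad_y):
        w_used, x_used = self.stash.pop(mb)           # restore the stashed version
        grad_w = grad_y * x_used                      # dL/dw, using the stashed activation
        grad_x = grad_y * w_used                      # dL/dx, using the stashed weights
        self.w -= self.lr * grad_w                    # apply the update to the *latest* weights
        return grad_x

stage = StageWeightStash()
stage.forward(1, x=2.0)          # F1
stage.forward(2, x=3.0)          # F2: under 1F1B several minibatches are in flight
stage.backward(1, grad_y=0.5)    # B1 reuses exactly the weight version stashed by F1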

Let's walk through the figure:

The blue 5 in Worker 1's row depends on the green 1 that precedes it in the same row. When that first green 1 on Worker 1 finishes, minibatch 1 has completed its 4 forward and 4 backward passes through the pipeline, so Worker 1 now has a new weight version, W_1^(1). Both the forward (blue 5) and backward (green 5) passes of minibatch 5 on Worker 1 should therefore be computed with this new version, so the new version needs to be recorded.

The blue 5 in Worker 2's row depends on the green 2 that precedes it in the same row. Similarly, when the first green 2 on Worker 1 finishes, minibatch 2 has completed its 4 forward and 4 backward passes through the pipeline, so Worker 1 has another new weight version, W_1^(2). The forward pass of minibatch 6 and its (unlabeled) green backward pass should be based on this new version, so it also needs to be recorded.

For Worker 3, from its own point of view, by the time it runs the forward pass of minibatch 5 its weights have already been updated again (most recently by the green backward pass of minibatch 3), so this version needs to be recorded for the future backward pass of minibatch 5.

Similarly, Worker 1 needs to record W_1^(1), W_1^(2), W_1^(3), W_1^(4), and so on: every new version, corresponding to the weights produced after the backward passes of minibatches 1, 2, 3, and 4 on Worker 1.

If the markdown formulas do not display correctly, please see www.cnblogs.com/rossiXYZ/p/…

2.4 Vertical Sync

The remaining problem: the forward pass of minibatch 5 on Worker 1 uses the parameters as updated after minibatch 1's backward pass, while its computation on Worker 2 uses the parameters as updated after minibatch 2's backward pass, so the final aggregation becomes inconsistent again.

The Vertical Sync mechanism works as follows: each minibatch entering the pipeline is associated with the latest weight version seen at the input stage. This version information flows along with the activations and gradients as the minibatch moves forward through the pipeline stages. In every stage, the forward pass is computed with this saved version rather than with the stage's latest local version (which is what weight stashing alone would use). After the backward pass has been computed with the same saved version, each stage independently applies its weight update, creates the latest weights, and can then delete the old version.
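
A toy sketch of this version tagging (illustrative only, not PipeDream's code): the input stage attaches a version number to the minibatch, every stage uses that version for both passes, and each stage then independently produces a new latest version:

class VerticalSyncStage:
    # Toy vertical sync for one pipeline stage (illustrative only).
    def __init__(self):
        self.weights = {0: 1.0}   # version id -> this stage's weights (a toy scalar)
        self.latest = 0

    def forward(self, x, version):
        # Use the version tagged at the input stage, not necessarily self.latest.
        return self.weights[version] * x

    def backward(self, grad_y, x_used, version):
        w_used = self.weights[version]      # same version as the forward pass
        # Each stage independently applies the update, creating a new latest version.
        self.latest += 1
        self.weights[self.latest] = w_used - 0.1 * (grad_y * x_used)
        return grad_y * w_used              # gradient sent to the previous stage

stage = VerticalSyncStage()
y = stage.forward(2.0, version=0)           # minibatch tagged with version 0 at the input stage
stage.backward(0.5, x_used=2.0, version=0)  # its backward pass uses version 0 as well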

The following diagram illustrates this:

When computing minibatch 5, every worker is forced to use its own parameters as they were after minibatch 1's backward pass. Specifically:

For Worker 2, the forward pass of minibatch 5 uses this stage's weights as updated after the backward pass of minibatch 1 (green 1).

Similarly, for Worker 3, the forward pass of minibatch 5 uses this stage's weights as updated after the backward pass of minibatch 1 (green 1), and the same holds for Worker 4.

However, such synchronization wastes a lot of computation. For example, minibatch 5 uses the weights produced by minibatch 1's backward pass, so the weight updates from the backward passes of minibatches 2/3/4 are computed for nothing. Therefore, Vertical Sync is disabled by default: parameter versions are then not fully consistent across stages, but thanks to weight stashing, all parameters used within a stage remain valid.

2.5 Buffers

Buffer handling is explained here.

Parameter state. For each stage, PipeDream keeps all parameters associated with the layers assigned to that stage directly in GPU memory. The parameters of each layer are stored separately, and each layer is assigned a unique ID. If the stage is not replicated, PipeDream applies the updates to the most recent version of the parameter data stored in GPU memory whenever a weight update becomes available in the provided GPU buffer. If the stage is replicated, the weight update is copied to host memory and then sent to the parameter server. When a new version of the parameters becomes available, the previous version is not immediately discarded, as part of the weight stashing scheme; parameter data is discarded only after the backward pass that uses the newer parameters has been performed.

Intermediate state. Each layer's intermediate data is also assigned a unique blob ID. When intermediate data is received from the previous stage (or, in the case of the input stage, from disk), PipeDream copies it to GPU memory and places a pointer to the associated buffer in a work queue. Intermediate data from the forward pass is not discarded until the associated minibatch completes its backward pass at this stage. Intermediate data from the backward pass is released as soon as the ML worker has finished using it and, if needed, after it has been sent to the next stage. Because of these different requirements, stages in PipeDream typically manage intermediate data from multiple in-flight forward-pass versions, but only from the single backward-pass version that is currently running.
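
A toy sketch of the intermediate-state bookkeeping described above (illustrative only, not PipeDream's code): forward-pass outputs are kept per minibatch until that minibatch's backward pass finishes at this stage, so several forward versions coexist while only one backward version is live:

from collections import OrderedDict

class IntermediateState:
    # Toy per-minibatch bookkeeping of forward activations at one stage (illustrative only).
    def __init__(self):
        self.forward_buffers = OrderedDict()   # minibatch id -> {blob name: tensor}

    def on_forward(self, minibatch_id, activations):
        # Keep forward outputs until this minibatch's backward pass completes here.
        self.forward_buffers[minibatch_id] = activations

    def on_backward_done(self, minibatch_id):
        # Backward pass finished at this stage: release the stashed forward data.
        self.forward_buffers.pop(minibatch_id, None)

state = IntermediateState()
state.on_forward(1, {"out1": [0.1, 0.2]})
state.on_forward(2, {"out1": [0.3, 0.4]})   # under 1F1B several forward versions coexist
state.on_backward_done(1)                    # minibatch 1's activations can now be freed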

0x03 Code

3.1 Overall Code

We use runtime/translation/main_with_runtime.py for the analysis.

Some minor code is omitted below.

The general logic for using the runtime in runtime/translation/main_with_runtime.py is as follows:

  • Parse the input parameters.

  • Load and generate the model.

  • Build models based on modules.

  • Configure parameters such as input size, Batch size, etc.

  • Walk through each layer of the model (skip the last Loss layer).

    • The input tensor is constructed by iterating through each layer of input.
    • Build the output by calling the forward function corresponding to the stage.
    • Iterate over the output of each layer, setting its type and shape.
  • Build the output value tensor type.

  • Load the configuration file.

  • Build a StageRuntime.

  • Build the optimizer; it uses AdamWithWeightStashing or SGDWithWeightStashing, i.e., weight stashing is in effect.

  • Load the dataset.

  • Train, save checkpoint.

The overall code is as follows:

def main():
    # Parse input arguments
    global args, best_prec1
    args = parser.parse_args()
​
    # Special case handling for GNMT model
    l2_promote()
​
    torch.cuda.set_device(args.local_rank)
​
    # build tokenizer
    tokenizer = Tokenizer(os.path.join(args.data_dir, config.VOCAB_FNAME))
​
    # define loss function
    criterion = build_gnmt_criterion(
        vocab_size=tokenizer.vocab_size, padding_idx=config.PAD, smoothing=0.1)
​
    # create stages of the model
    # Load and build the model
    module = importlib.import_module(args.module)
    args.arch = module.arch()
    # Build the model from the module
    model = module.model(criterion)
​
    # Configure input size, batch size, etc. based on the arguments
    input_size = [args.max_length_train, args.batch_size]
    training_tensor_shapes = {"input0": input_size, "input1": [args.batch_size],
                              "input2": input_size, "target": [args.max_length_train * args.batch_size],
                              "target_length": [args.batch_size]}
    dtypes = {"input0": torch.int64, "input1": torch.int64, "input2": torch.int64,
              "target": torch.int64, "target_length": torch.int32}
    inputs_module_destinations = {"input0": 0, "input1": 0, "input2": 0}
    target_tensor_names = {"target", "target_length"}
    
    # Iterate over each layer of the model (skip the last loss layer)
    for module_id, (stage, inputs, outputs) in enumerate(model[:-1]):  # Skip last layer (loss).
        input_tensors = []
        # Iterate over each layer's inputs and build the input tensors
        for module_input in inputs:
            if module_input in inputs_module_destinations:
                inputs_module_destinations[module_input] = module_id
​
            input_tensor = torch.ones(tuple(training_tensor_shapes[module_input]),
                                      dtype=dtypes[module_input])#.cuda()
            input_tensors.append(input_tensor)
        #stage.cuda()
        # PyTorch should not maintain metadata for a backward pass on
        # synthetic inputs. Without the following line, the runtime is
        # as much as 1.5x slower in a full DP configuration.
        with torch.no_grad():
            # Build the outputs by calling the stage's forward function
            output_tensors = stage(*tuple(input_tensors))
        if not type(output_tensors) is tuple:
            output_tensors = [output_tensors]
        # Iterate over each layer's outputs and record their dtypes and shapes
        for output, output_tensor in zip(outputs,
                                         list(output_tensors)):
            # output is ['out2', 'out1']
            training_tensor_shapes[output] = list(output_tensor.size())
            dtypes[output] = output_tensor.dtype
​
    # Build the output tensor types
    eval_tensor_shapes = {}
    for key in training_tensor_shapes:
        eval_tensor_shapes[key] = tuple(
            training_tensor_shapes[key])
        training_tensor_shapes[key] = tuple(
            training_tensor_shapes[key])
​
    # Load the configuration file
    configuration_maps = {
        'module_to_stage_map': None,
        'stage_to_rank_map': None,
        'stage_to_depth_map': None
    }
    if args.config_path is not None:
        json_config_file = json.load(open(args.config_path, 'r'))
        configuration_maps['module_to_stage_map'] = json_config_file.get("module_to_stage_map", None)
        configuration_maps['stage_to_rank_map'] = json_config_file.get("stage_to_rank_map", None)
        configuration_maps['stage_to_rank_map'] = {
            int(k): v for (k, v) in configuration_maps['stage_to_rank_map'].items()}
        configuration_maps['stage_to_depth_map'] = json_config_file.get("stage_to_depth_map", None)
​
    # Build a StageRuntime
    r = runtime.StageRuntime(
        model=model, distributed_backend=args.distributed_backend,
        fp16=args.fp16, loss_scale=args.loss_scale,
        training_tensor_shapes=training_tensor_shapes,
        eval_tensor_shapes=eval_tensor_shapes,
        training_tensor_dtypes=dtypes,
        inputs_module_destinations=inputs_module_destinations,
        target_tensor_names=target_tensor_names,
        configuration_maps=configuration_maps,
        master_addr=args.master_addr,
        rank=args.rank, local_rank=args.local_rank,
        num_ranks_in_server=args.num_ranks_in_server,
        verbose_freq=args.verbose_frequency,
        model_type=runtime.TRANSLATION,
        enable_recompute=args.recompute)
​
    # stage needed to determine if current stage is the first stage
    # num_stages needed to determine if current stage is the last stage
    # num_ranks needed to determine number of warmup_minibatches in case of pipelining
    args.stage = r.stage
    args.num_stages = r.num_stages
    args.num_ranks = r.num_ranks
    if not is_first_stage():
        args.synthetic_data = True
​
    # define optimizer
    if args.no_input_pipelining:
        num_versions = 1
    else:
        # number of versions is the total number of machines following the current
        # stage, shared amongst all replicas in this stage
        num_versions = r.num_warmup_minibatches + 1
​
    # if specified, resume from checkpoint
    if args.resume:
        checkpoint_file_path = "%s.%d.pth.tar" % (args.resume, r.stage)
        assert os.path.isfile(checkpoint_file_path)
        print("=> loading checkpoint '{}'".format(checkpoint_file_path))
        checkpoint = torch.load(checkpoint_file_path)
        args.start_epoch = checkpoint['epoch']
        best_prec1 = checkpoint['best_prec1']
        r.load_state_dict(checkpoint['state_dict'])
        print("=> loaded checkpoint '{}' (epoch {})"
                .format(checkpoint_file_path, checkpoint['epoch']))
​
    # TODO: make this configurable by args
    # Build the optimizer, using AdamWithWeightStashing or SGDWithWeightStashing
    use_adam_optimizer = True
    if use_adam_optimizer:
        optimizer = adam.AdamWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, betas=(0.9,0.999),
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency,
            macrobatch=args.macrobatch)
    else:
        optimizer = sgd.SGDWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, momentum=args.momentum,
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency)
​
    if args.resume:
        optimizer.load_state_dict(checkpoint['optimizer'])
​
    cudnn.benchmark = True
​
    # Load the datasets
    train_dataset = LazyParallelDataset(
        src_fname=os.path.join(args.data_dir, config.SRC_TRAIN_FNAME),
        tgt_fname=os.path.join(args.data_dir, config.TGT_TRAIN_FNAME),
        tokenizer=tokenizer,
        min_len=args.min_length_train,
        max_len=args.max_length_train,
        sort=False,
        max_size=None)
​
    val_dataset = ParallelDataset(
        src_fname=os.path.join(args.data_dir, config.SRC_VAL_FNAME),
        tgt_fname=os.path.join(args.data_dir, config.TGT_VAL_FNAME),
        tokenizer=tokenizer,
        min_len=args.min_length_train,
        max_len=args.max_length_train,
        sort=True)
​
    distributed_sampler = False
    if configuration_maps['stage_to_rank_map'] is not None:
        num_ranks_in_first_stage = len(configuration_maps['stage_to_rank_map'][0])
        if num_ranks_in_first_stage > 1:
            distributed_sampler = True
​
    # TODO: fix random seeds
    train_loader = train_dataset.get_loader(
        batch_size=args.batch_size, seeds=range(args.epochs),
        batch_first=False, shuffle=True,
        bucketing=not args.no_bucketing, num_workers=args.workers,
        world_size=r.num_ranks_in_first_stage,
        rank=r.rank_in_stage if r.stage == 0 else 0
    )
​
    val_loader = val_dataset.get_loader(
        batch_size=args.batch_size, batch_first=False,
        shuffle=True, num_workers=args.workers,
        world_size=r.num_ranks_in_first_stage,
        seeds=range(args.epochs),
        rank=r.rank_in_stage if r.stage == 0 else 0
    )
​
    # if checkpoint is loaded, start by running validation
    if args.resume:
        assert args.start_epoch > 0
        validate(val_loader, r, args.start_epoch-1)
​
    # Train and save checkpoints
    for epoch in range(args.start_epoch, args.epochs):
        if distributed_sampler:
            train_loader.sampler.set_epoch(epoch)
        adjust_learning_rate(optimizer, epoch, args.epochs, r, args.lr_policy)
​
        # train or run forward pass only for one epoch
        if args.forward_only:
            validate(val_loader, r, epoch)
        else:
            train(train_loader, r, optimizer, epoch)
​
            # evaluate on validation set
            prec1 = validate(val_loader, r, epoch)
            if r.stage != r.num_stages: prec1 = 0
​
            # remember best prec@1 and save checkpoint
            best_prec1 = max(prec1, best_prec1)
​
            should_save_checkpoint = args.checkpoint_dir_not_nfs or r.rank_in_stage == 0
            if args.checkpoint_dir and should_save_checkpoint:
                save_checkpoint({
                    'epoch': epoch + 1,
                    'arch': args.arch,
                    'state_dict': r.state_dict(),
                    'best_prec1': best_prec1,
                    'optimizer' : optimizer.state_dict(),
                    'tokenizer': tokenizer.get_state()
                }, args.checkpoint_dir, r.stage, epoch)

3.2 Training function

Let's look at the train function.

  • First comes the startup warm-up phase, which runs until the output stage has completed the forward pass of the first minibatch; this corresponds to the Startup State in the figure above.

  • Then the forward and backward passes of subsequent minibatches are carried out alternately; from this point on we are in the Steady State of the figure above. At each stage, for each minibatch:

    • Run the forward pass and push the minibatch to the downstream worker. That is the 1F.
    • If this is the last stage, update the loss.
    • Zero the gradients.
    • Load the stashed weights.
    • Run the backward pass. That is the 1B.
    • Restore the latest weights and run the optimizer step; at this point one 1F1B iteration is complete.
    • Move on to the next minibatch.
  • Finally, the remaining backward passes are finished, matching the forward passes done during the warm-up phase.

def train(train_loader, r, optimizer, epoch):
    batch_time = AverageMeter()
    losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()

    # switch to train mode
    n = r.num_iterations(loader_size=len(train_loader))
    if args.num_minibatches is not None:
        n = min(n, args.num_minibatches)
    r.train(n)
    if not is_first_stage(): train_loader = None
    r.set_loader(train_loader)

    end = time.time()
    epoch_start_time = time.time()

    if args.no_input_pipelining:
        num_warmup_minibatches = 0
    else:
        num_warmup_minibatches = r.num_warmup_minibatches

    # Start num_warmup_minibatches forward passes: the warm-up runs until the
    # output stage finishes the forward pass of the first minibatch,
    # corresponding to the Startup State in the figure above.
    for i in range(num_warmup_minibatches):
        r.run_forward()

    # Steady state: alternate forward and backward passes.
    for i in range(n - num_warmup_minibatches):
        # perform forward pass
        r.run_forward()

        if is_last_stage():
            # measure accuracy and record loss
            output, target, loss, num_tokens = r.output, r.target, r.loss.item(), r.num_tokens()
            losses.update(loss, num_tokens)

            # measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()
            epoch_time = (end - epoch_start_time) / 3600.0
            full_epoch_time = (epoch_time / float(i+1)) * float(n)
        else:
            # (logging for non-last stages omitted)
            pass

        # perform backward pass
        if args.fp16:
            r.zero_grad()
        else:
            optimizer.zero_grad()       # zero the gradients

        optimizer.load_old_params()     # load the stashed weights
        r.run_backward()                # backward pass
        optimizer.load_new_params()     # restore the latest weights
        optimizer.step()                # update the parameters

    # Finish the remaining backward passes, matching the warm-up forward passes.
    for i in range(num_warmup_minibatches):
        optimizer.zero_grad()
        optimizer.load_old_params()     # load the stashed weights
        r.run_backward()                # backward pass
        optimizer.load_new_params()     # restore the latest weights
        optimizer.step()                # update the parameters

    # wait for all helper threads to complete
    r.wait()

The parameter r above is a StageRuntime, so let's look at its run_forward and run_backward methods.

3.3 Forward propagation

Here are the run_forward and _run_forward methods of the StageRuntime class, which do the forward propagation.

def run_forward(self, recompute_step=False):
    """Run forward pass."""
    # Receive tensors from previous worker / stage.
    self.receive_tensors_forward()
    tensors = self.tensors[-1]

    # Run forward pass.
    self._run_forward(tensors)

    # Send tensors forward to the next stage.
    self.send_tensors_forward()
    self.forward_stats.reset_stats()
    self.forward_minibatch_id += 1

def _run_forward(self, tensors):
    # Perform forward pass through model (self.modules_with_dependencies already
    # has modules in topological order).
    # Get the modules and their corresponding input/output names.
    modules = self.modules_with_dependencies.modules()
    all_input_names = self.modules_with_dependencies.all_input_names()
    all_output_names = self.modules_with_dependencies.all_output_names()
    for i, (module, input_names, output_names) in \
            enumerate(zip(modules, all_input_names, all_output_names)):
        if i == (len(modules) - 1) and self.is_criterion:
            # If layer is criterion (loss).
            if self.model_type == SPEECH_TO_TEXT:
                output = tensors["output"].transpose(0, 1).float()
                output_sizes = tensors["output_sizes"].cpu()
                target = tensors["target"].cpu()
                target_sizes = tensors["target_length"].cpu()
                input0_size = tensors["input0_size"].cpu()
                module_outputs = [module(output, target, output_sizes, target_sizes) / input0_size[0]]
            else:
                module_outputs = [module(tensors[input_name], tensors["target"])
                                  for input_name in input_names]
                module_outputs = [sum(module_outputs)]
        else:
            # If layer is not the criterion, run the module's forward pass.
            module_outputs = module(*[tensors[input_name]
                                      for input_name in input_names])
            if not isinstance(module_outputs, tuple):
                module_outputs = (module_outputs,)
            module_outputs = list(module_outputs)

        # Put the computed results into tensors, keyed by output name.
        for (output_name, module_output) in zip(output_names, module_outputs):
            tensors[output_name] = module_output

    self.output = tensors[input_names[0]]
    if self.is_criterion and self.model_type == TRANSLATION:
        loss_per_batch = tensors[output_names[0]] * tensors[self.criterion_input_name].size(1)
        loss_per_token = loss_per_batch / tensors["target_length"][0].item()
        self.loss = loss_per_token
    elif self.is_criterion:
        self.loss = tensors[output_names[0]]
    else:
        self.loss = 1

3.4 Back Propagation

Running the engine's run_backward completes the backward computation.

def run_backward(self):
    # Receive input gradients needed for backward pass.
    self.receive_tensors_backward()
    # Backward pass through modules in reverse order.
    inputs = {}
    outputs = {}
    input_gradients = {}
    output_gradients = {}

    # Get input and output names spanning all modules in this stage.
    all_input_names_set = set()
    all_output_names_set = set()

    # Get the modules and their corresponding input/output names.
    modules = self.modules_with_dependencies.modules()
    all_input_names = self.modules_with_dependencies.all_input_names()
    all_output_names = self.modules_with_dependencies.all_output_names()

    for (input_names, output_names) in zip(all_input_names, all_output_names):
        for input_name in input_names:
            all_input_names_set.add(input_name)
        for output_name in output_names:
            all_output_names_set.add(output_name)

    tensors = self.tensors.pop(0)
    # Set inputs, outputs, and output_gradients.
    # Only set outputs/output_gradients for tensors that are not inputs of
    # other modules in this stage.
    # Similarly, only set inputs for tensors that are not outputs of other
    # modules in this stage.
    for (module, input_names, output_names) in \
        zip(reversed(modules), reversed(all_input_names), reversed(all_output_names)):
        for output_name in output_names:
            if output_name not in all_input_names_set:
                if output_name not in self.gradients:
                    output_gradients[output_name] = None
                else:
                    output_gradients[output_name] = self.gradients[output_name]
                if tensors[output_name].requires_grad:
                    outputs[output_name] = tensors[output_name]
        for input_name in input_names:
            if input_name not in all_output_names_set:
                inputs[input_name] = tensors[input_name]

    # Hook to record input gradients.
    def hook_wrapper(input_name):
        def hook(input_gradient):
            input_gradients[input_name] = input_gradient
        return hook

    for input_name in inputs:
        if input_name != "input0" and input_name != "input1" and input_name != "input2" \
                and inputs[input_name].requires_grad:
            inputs[input_name].register_hook(hook_wrapper(input_name))

    if "loss" in outputs:
        outputs["loss"] *= self.loss_scale

    # Perform backward pass.
    torch.autograd.backward(tuple([outputs[output_name] for output_name in outputs]),
                            grad_tensors=tuple([output_gradients[output_name]
                                                for output_name in outputs]))

    # Input tensors don't need gradients.
    for input_name in inputs:
        if not inputs[input_name].requires_grad:
            self.gradients[input_name] = inputs[input_name]
            continue

        if input_name != "input0" and input_name != "input1" and input_name != "input2" and input_name != "input":
            self.gradients[input_name] = input_gradients[input_name]

    # Send output gradients.
    self.send_tensors_backward()   # send self.gradients to the next stage of the backward graph
    if self.verbose_freq > 0 and self.backward_minibatch_id % self.verbose_freq == 0:
        self.backward_stats.print_stats()
    self.backward_stats.reset_stats()
    self.backward_minibatch_id += 1

Let's use the diagrams below to deepen the impression.

Sending logic:

 StageRuntime            CommunicationHandler              send_helper_thread
​
      +                           +                                 +
      |                           |                                 |
      | 1                         |                                 |
      v                           |                                 |
 run_backward                     |                                 |
      |                           |                                 |
      | 2                         |                                 |
      |                           |                    wait on backward_send_queues
      v                  3        v                                 |
send_tensors_backward +--------> send                               |
                                  |                                 |
                                  |                                 |
                                  |  4                              |
                                  v               5                 v
               backward_send_queues.add(tensor) +----> tensor = queue.remove()
                                                notify              |
                                                                    |
                                                                    | 6
                                                                    v
                                                                  _send
                                                                    |
                                                                    | 7
                                                                    |
                                                                    v
                                                                 dist.send

Receiving logic:

 StageRuntime            CommunicationHandler              recv_helper_thread

      +                           +                                 +
      |                           |                                 |
      | 1                         |                                 | 4
      v                           |                                 v
 run_backward                     |                               _recv
      |                           |                                 |
      | 2                         |                                 | 5
      |                           |                                 v
      |                           |                    dist.recv / dist.broadcast
      v                  3        v                                 |
 receive_tensors_backward +-----> recv                              |
                                  |                                 |
                                  |                                 |
                     wait on backward_receive_queues                |
                                  |                                 |
                                  v               6                 v
                backward_receive_queues.remove() <------+ queue.add(tensor)
                                  |              notify
                                  | 7 return
                                  v
                gradients[output_name]

3.5 Weight Stashing

Weight stashing is implemented by OptimizerWithWeightStashing.

A lot of minor code is omitted; load_old_params and load_new_params are called during training.

class OptimizerWithWeightStashing(torch.optim.Optimizer):
    """Wrapper class that adds weight stashing to a vanilla torch.optim.Optimizer.

    Arguments:
        - optim_name: the name of optimizer, required to create the corresponding
                      base_optimizer (torch.optim.{optim_name}).
        - optimizer_args: the keyword arguments passed to base_optimizer.
    """

    def __init__(self, optim_name, modules, master_parameters, model_parameters,
                 loss_scale, num_versions, verbose_freq=0, macrobatch=False,
                 **optimizer_args):
        self.modules = modules
        self.master_parameters = master_parameters
        self.model_parameters = model_parameters  # model_parameters is None if not fp16.
        self.loss_scale = loss_scale

        # Only need at most 2 versions if using macrobatching.
        if macrobatch:
            num_versions = min(2, num_versions)
        self.num_versions = num_versions
        self.base_optimizer = getattr(torch.optim, optim_name)(
            master_parameters, **optimizer_args)
        self.latest_version = Version()
        self.current_version = Version()
        self.initialize_queue()
        self.verbose_freq = verbose_freq
        self.batch_counter = 0

        # If macrobatching, push and pop versions at the right rate.
        if macrobatch:
            self.update_interval = self.num_versions
        else:
            self.update_interval = 1

    def initialize_queue(self):
        self.queue = deque(maxlen=self.num_versions)
        for i in range(self.num_versions):
            self.queue.append(self.get_params(clone=True))  # stash the initial weight versions
        self.buffered_state_dicts = self.queue[0][0]

    def load_old_params(self):
        if self.num_versions > 1:
            self.set_params(*self.queue[0])   # load the stashed (oldest) weights

    def load_new_params(self):
        if self.num_versions > 1:
            self.set_params(*self.queue[-1])  # load the latest weights

    def zero_grad(self):
        # Only reset gradients at the start of each update interval.
        if self.batch_counter % self.update_interval == 0:
            self.base_optimizer.zero_grad()

    def step(self, closure=None):
        """Performs a single optimization step.

        Arguments:
            closure (callable, optional): A closure that reevaluates the model
                                          and returns the loss.
        """
        # Only update the gradients every update_interval steps.
        if self.batch_counter % self.update_interval != self.update_interval - 1:
            self.batch_counter += 1
            return None

        loss = self.base_optimizer.step()
        # A new weight version has been produced, so bump the version number.
        self.latest_version = self.latest_version.incr()
        if self.num_versions > 1:
            self.buffered_state_dicts = self.queue[0][0]
            self.queue.append(self.get_params(clone=False))

        self.batch_counter += 1
        return loss

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

Lingvo framework day reading notes

Tensorflow implements the accumulation of multiple minibatch-computed gradients before propagating them back

Gradient accumulation is realized by tensorflow2

Tenfold model calculation time increased by only 20% : OpenAI open source gradient replacement plugin

PipeDream: Fast and Efficient Pipeline Parallel DNN Training

Paper interpretation series 5: PipeDream from Microsoft, Stanford, et al. for fast training of large-scale neural networks

cs231n.github.io/neural-netw…

www.cnblogs.com/geekfx/p/14…

Video memory optimization techniques during training — OP merge and Gradient Checkpoint

PyTorch Note 04: customizing torch.autograd.Function

PyTorch tutorial Autograd

A simple definition and case study of torch.autograd.Function

PyTorch custom extensions (2): torch.autograd.Function implements a custom layer

torch.autograd: gradient computation in detail

Back Propagation

CS231n Course Notes Translation: Backpropagation notes

Pytorch Distributed training

torch.distributed

Why is the GPT-3 model difficult to reproduce? This is probably the best design for a distributed AI framework

Su Qidong – Pipedream