0x00 Abstract

GPipe is a parallel training library for neural networks built on Lingvo (Google's TensorFlow-based framework focused on sequence models) that supports very large models. This article introduces its basic functionality and its pipelining mechanism.

0x01 Overview

1.1 What is GPipe

GPipe is a parallel training library for neural networks built on Lingvo (Google's framework developed on top of TensorFlow and focused on sequence models: github.com/tensorflow/…), designed to support very large models. Its characteristics are as follows:

  • GPipe divides an L layer network into K composite layers. Each composite layer runs on a separate TPU core.
  • The K composite layers can only be executed sequentially, but GPipe introduces a pipeline parallelism strategy to alleviate this sequential-execution performance problem, further subdividing each mini-batch into smaller micro-batches to improve parallelism.
  • GPipe also uses a simple and effective technique called Recomputation to reduce memory, further allowing training of larger models.

1.2 The challenge

A deep learning framework is essentially a graph compilation and execution engine built on operator expressions over tensors. It provides tensor definitions, unary and binary operations, and other mathematical primitives, and it performs gradient derivation and model updates automatically according to the back-propagation algorithm. As batches of data flow through the computation graph during training, the model learns the internal correlations of the data and thereby acquires the "intelligent" perception and judgment abilities for the corresponding scenario.

The goal of DNN training is to obtain a high-precision model in the shortest possible time. This goal can be achieved through two indicators:

  • Statistical efficiency: the number of epochs required to reach the desired accuracy;
  • Hardware efficiency: the time required to complete a single epoch. The total training time needed to reach the desired accuracy is simply the product of these two quantities.

A GPU mainly provides two kinds of resources: compute and memory bandwidth. Training large models therefore faces two basic challenges: memory efficiency and computational efficiency.

The ultimate goal of performance optimization in a deep learning framework is to train models as fast as possible, shortening training time and saving both the model development cycle and the user's time.

0x02 Parallel mechanism

The industry uses parallel mechanisms to achieve optimization.

2.1 Mechanism classification and trade-offs

This section mainly refers to the following articles:

Efficient Large-Scale Language Model Training on GPU Clusters

DeepSpeed: Extreme-scale model training for everyone

DeepSpeed: a large-scale model training tool available to all

PipeDream: Fast and Efficient Pipeline Parallel DNN Training

In the paper "Efficient Large-Scale Language Model Training on GPU Clusters", NVIDIA introduces three parallelism techniques required for distributed training of very large models:

  • Data Parallelism
  • Tensor Model Parallelism
  • Pipeline Model Parallelism

2.1.1 Data Parallelism

Data Parallelism is the most common method. Its characteristics are as follows:

  • The model is replicated on multiple worker machines, and each GPU maintains a full copy of the model.
  • The input dataset is partitioned across multiple GPUs. Each batch of input training data is divided among the data-parallel workers, and each worker processes a subset of the training data.
  • Collective communication primitives or parameter servers are used to periodically synchronize weights with the other GPUs.
  • After backpropagation, gradients must be communicated and reduced to ensure that the optimizer performs the same update on each worker. That is, the weight updates computed by individual workers are aggregated to obtain a final update reflecting all inputs.
  • The amount of data passed by each aggregation is proportional to the size of the model.

Data parallelism has several obvious advantages, including high computational efficiency and low implementation workload, which makes data parallelism training work well on some popular models with high computational communication ratio, but there are several important trends that threaten its effectiveness:

  • Memory efficiency: data parallelism replicates the model and optimizer across all workers, so its memory efficiency is low.
  • Computational efficiency: as parallelism increases, the amount of computation performed by each worker stays constant, so data parallelism scales almost linearly at small scale. However, the cost of reducing gradients across workers grows with the model size, so when the model is large or the communication bandwidth is low, computational efficiency is limited. The rapid growth of GPU compute further shifts the training bottleneck toward cross-machine communication. In addition, the frequency of parameter synchronization affects both statistical and hardware efficiency.
  • Scalability: increasing model size increases the traffic per aggregation. In fact, some widely used models are large enough that communication time exceeds computation time, limiting scalability and dominating total training time. Moreover, the global batch size of data parallelism grows with the number of workers, and we often cannot keep increasing the batch size without hurting convergence.

2.1.2 Model parallelism

Model parallelism has traditionally been used during training for models that are too large to be kept in worker memory or cache. Its characteristics are as follows:

  • Model parallelism involves dividing the model among workers so that each worker evaluates and updates only a subset of model parameters. This can be divided into inter-layer parallelism and intra-layer model parallelism.
  • Inter-layer model parallelism partitions the model's layers among multiple workers.
  • Intra-layer model parallelism splits the parameters of each layer across multiple devices. Some papers call this "tensor-level model parallelism": the variables of a layer such as a Linear/Dense layer are sliced, so a large tensor is divided into several smaller tensors that are computed in parallel.
  • Inter-layer values (activations and gradients) are often the only data that needs to be communicated across machines.

By its very nature, the computation and communication of model parallelism vary from model structure to model structure, so there is a lot of work to implement.

However, even if model parallelism can train very large models, traditional model parallelism can lead to severe underutilization of computing resources because it only actively uses one worker at a time (if each layer is assigned to one worker), or cannot overlap computation and communication (if each layer is partitioned).

  • Memory efficiency: model parallelism partitions parameters and activations among the model-parallel workers, so memory usage can be reduced in proportion to the number of workers. Crucially, this is the only way to reduce the activation memory of a single network layer.
  • Computational efficiency: model-parallel DNN training can leave GPU resources severely underutilized. Because activations must be communicated in every forward and backward pass, the computational efficiency of model parallelism is low; it requires high communication bandwidth and does not scale well beyond nodes with limited bandwidth. In addition, each model-parallel worker performs less computation between communication stages, which further hurts efficiency. Model parallelism is therefore often combined with data parallelism to trade off memory against computational efficiency.
  • Development efficiency: the burden of partitioning a model across multiple GPUs falls on the programmer, and deciding how best to partition a DNN among workers is challenging even for the most experienced practitioners, often leading to additional inefficiency. Some recent work has explored using reinforcement learning to automatically determine device placement for model parallelism. Unfortunately, such online decision techniques are time- and resource-intensive, and they do not seamlessly combine pipelining, data parallelism, and model parallelism.

2.1.3 Pipeline Parallelism

Pipeline Model Parallelism is called pipeline-level Model Parallelism in some papers.

  • The whole network is divided into stages, with different stages placed on different devices; earlier and later stages work on different batches and run in parallel in a "relay" fashion.
  • Pipeline parallelism divides the layers of a model into stages that can be processed in parallel. When a stage completes the forward pass of a micro-batch, its activations are communicated to the next stage in the pipeline. Similarly, when the next stage completes its backward pass, gradients are communicated backward through the pipeline. Multiple micro-batches must be in flight simultaneously to ensure that the pipeline stages compute in parallel.
  • Pipeline-parallel training may provide high DNN training performance when data parallelism struggles. In particular, communication between workers can be limited to the activations (in the forward pass) and gradients (in the backward pass) between adjacent layers assigned to different workers.

But pipelined parallelism still has some problems:

  • Memory efficiency: the memory reduction from pipeline parallelism is proportional to the number of pipeline stages, so the model size can scale linearly with the number of workers. However, pipeline parallelism does not reduce the memory footprint of each layer's activations. In addition, each worker must store the activations of every micro-batch in flight, which causes the activation memory of the first pipeline stage to be roughly the same as the total activation memory of a single mini-batch.
  • Computational efficiency: pipeline parallelism has the lowest communication volume, since the traffic is only proportional to the activation size at each stage boundary. However, it cannot be scaled indefinitely. As with model parallelism, increasing the pipeline size reduces the amount of computation per stage, which reduces the compute-to-communication ratio. To achieve good computational efficiency, pipeline parallelism also requires the compute load of each stage to be perfectly balanced. In addition, pipeline parallelism incurs bubble overhead at the beginning and end of each batch because the pipeline must be filled and drained.
  • Development efficiency: the bidirectional nature of DNN training (a forward pass followed by a backward pass through the same layers) makes pipelining challenging. More importantly, a naive pipelining mechanism introduces computations on stale weights, resulting in a final model with lower accuracy than data-parallel training.

2.2 How to Use it

Given a specific neural network model and a batch of computing resources, there are many ways to map from task to device, but different mapping schemes have different running efficiency. Which solution is optimal depends both on the nature of the job itself and on the topology of the underlying hardware.

Neural network consists of many local computation structures. Generally speaking, different operators of the same neural network may be suitable for different parallel modes. Whether a local computation uses data parallelism or model parallelism depends on the computation-transfer ratio of the local task. Such as:

  • For some operators (such as convolution), the number of parameters is small but the number of intermediate results is large. To reduce the transmission volume, the most cost-effective approach is data parallelism: the data is partitioned, different devices process different data, and parameters are synchronized between devices only occasionally.
  • Other operators, whose intermediate results are smaller than their parameters, are better suited to model parallelism.
  • For yet other operators, both the parameters and the intermediate results are large, and pipeline parallelism (the relay form) may be optimal.
  • Compared with using a single parallel mode for one operator, an operator can also use multiple parallel modes at the same time to further reduce transmission. For example, for a large hidden layer, both the data matrix and the model matrix may be partitioned simultaneously (see the sketch after this list).
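As a rough illustration of this computation-to-transfer trade-off, the sketch below (illustrative shapes only, not taken from any paper) compares what data parallelism must synchronize (parameters/gradients) with what model parallelism must move (activations) for a convolution versus a large fully connected layer:

# Back-of-the-envelope comparison (illustrative shapes only): what each scheme
# has to transmit per step for two kinds of operators.

# 3x3 convolution, 256 -> 256 channels, 56x56 feature map, batch size 32:
conv_params = 3 * 3 * 256 * 256     # ~0.6M values: gradient volume for data parallelism
conv_acts = 32 * 256 * 56 * 56      # ~25.7M values: activation volume for model parallelism
# Few parameters, large activations -> data parallelism transmits far less.

# Fully connected layer, 4096 -> 4096, batch size 32:
fc_params = 4096 * 4096             # ~16.8M values
fc_acts = 32 * 4096                 # ~0.13M values
# Many parameters, small activations -> splitting the model (tensor parallelism) transmits far less.

print(conv_params, conv_acts, fc_params, fc_acts)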

Therefore, choosing the optimal parallel mode for each task is a very complex problem, which needs to be analyzed on a case-by-case basis.

0x03 Manually specifying the parallel mode in PyTorch

At present, most existing deep learning frameworks provide native support for data parallelism, but their support for model parallelism is incomplete. If users want to place model parameters on different devices, they often have to manually specify how the model is split and hand-write the data communication logic.

Let's look at how this is specified manually in PyTorch, following the official tutorial:

Pytorch.org/tutorials/i…

3.1 Basic Knowledge

PyTorch uses the Tensor as its basic unit, which matches the intuition of an algorithm engineer writing Python scripts, and it models and trains in an object-oriented way. You can assign to a Tensor and slice it, and it is as easy to use as NumPy.

PyTorch takes a single-device perspective: a Tensor on one device has nothing to do with a Tensor on another device. For the simplest form of data parallelism this design has no obvious flaws, since the script on each device is a perfectly symmetric mirror of the others; each device runs the same script over its own batch, and an AllReduce plus the Optimizer step completes data parallelism. This is PyTorch's DDP (DistributedDataParallel) module.

But if you want to slice a Tensor across different machines in the distributed case, you have to build the transport manually, which is close to programming against the physical hardware directly, so the barrier to distributed use is higher.
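For reference, a minimal sketch of the data-parallel path mentioned above using PyTorch's DDP module; it assumes the process group has already been initialized (for example via torchrun) and that each process owns one GPU:

import torch
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

# Assumes torch.distributed.init_process_group(...) has already been called
# and this process's GPU has been selected with torch.cuda.set_device(...).
model = nn.Linear(10, 5).to('cuda')
ddp_model = DDP(model)                                # every rank holds a full replica
optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.001)

outputs = ddp_model(torch.randn(20, 10).to('cuda'))   # each rank sees its own data shard
loss = outputs.sum()
loss.backward()                                       # gradients are all-reduced across ranks here
optimizer.step()                                      # identical update on every replica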

3.2 Characteristics

With model parallelism, PyTorch splits a single model across different GPUs rather than replicating the entire model on each GPU. (Concretely, suppose model m contains 10 layers: with DataParallel, each GPU holds a copy of all 10 layers, whereas with model parallelism on two GPUs, each GPU hosts 5 layers.)

The high-level idea of model parallelism is to place different sub-networks of the model on different devices and implement the forward method accordingly to move intermediate outputs across devices. Because only part of the model runs on any single device, a set of devices can collectively serve a larger model.

3.3 Basic Usage

Let's start with a toy model containing two linear layers. To run this model on two GPUs, simply place each linear layer on a different GPU, and move the inputs and intermediate outputs to the matching devices.

import torch
import torch.nn as nn
import torch.optim as optim


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = torch.nn.Linear(10, 10).to('cuda:0')  # first linear layer on GPU 0
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to('cuda:1')   # second linear layer on GPU 1

    def forward(self, x):
        x = self.relu(self.net1(x.to('cuda:0')))
        return self.net2(x.to('cuda:1'))

Note that the ToyModel above is very similar to a single-GPU implementation, except for the five to(device) calls that place the linear layers and tensors on the appropriate devices. That is the only part of the model that needs to change. backward() and torch.optim will automatically take care of gradients as if the model were on one GPU. You only need to make sure that the labels and the outputs are on the same device when calling the loss function.

model = ToyModel()
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)

optimizer.zero_grad()
outputs = model(torch.randn(20, 10))
labels = torch.randn(20, 5).to('cuda:1')  # ToyModel's output is on 'cuda:1', so the labels must also be on 'cuda:1'
loss_fn(outputs, labels).backward()
optimizer.step()

3.4 Apply model parallelization to existing modules

It is also possible to run an existing single-GPU module on multiple GPUs with only a few line changes. The code below shows how to decompose torchvision.models.resnet50() onto two GPUs. The idea is to inherit from the existing ResNet module and split its layers across the two GPUs during construction. Then override the forward method to stitch the two sub-networks together by moving the intermediate outputs accordingly.

from torchvision.models.resnet import ResNet, Bottleneck

num_classes = 1000


class ModelParallelResNet50(ResNet):
    def __init__(self, *args, **kwargs):
        super(ModelParallelResNet50, self).__init__(
            Bottleneck, [3, 4, 6, 3], num_classes=num_classes, *args, **kwargs)

        self.seq1 = nn.Sequential(
            self.conv1,
            self.bn1,
            self.relu,
            self.maxpool,
            self.layer1,
            self.layer2
        ).to('cuda:0')  # place the first half on the 1st GPU

        self.seq2 = nn.Sequential(
            self.layer3,
            self.layer4,
            self.avgpool,
        ).to('cuda:1')  # place the second half on the 2nd GPU

        self.fc.to('cuda:1')

    def forward(self, x):
        x = self.seq2(self.seq1(x).to('cuda:1'))
        return self.fc(x.view(x.size(0), -1))

During the entire execution, one of the two GPUs is always idle. To solve this problem, one option is to further divide each batch into a pipeline of splits, so that when one split reaches the second sub-network, the next split can be fed into the first sub-network. This way, two consecutive splits can run simultaneously on the two GPUs.

The above implementation solves the problem when the model is too large to fit on a single GPU. However, you may have noticed that if the model does fit, it will run slower than on a single GPU, because only one of the two GPUs is working at any time while the other sits idle. Performance deteriorates further because the intermediate output must be copied from cuda:0 to cuda:1 between layer2 and layer3.

3.5 Acceleration through Pipelining Inputs

In the following experiment, we further divide each batch of 120 images into splits of 20 images. Since PyTorch launches CUDA operations asynchronously, the implementation achieves concurrency without spawning multiple threads.

# timeit and numpy are used by the timing code at the end of this block; in
# the original tutorial they are imported earlier.
import timeit
import numpy as np

class PipelineParallelResNet50(ModelParallelResNet50):
    def __init__(self, split_size=20, *args, **kwargs):
        super(PipelineParallelResNet50, self).__init__(*args, **kwargs)
        self.split_size = split_size
​
    def forward(self, x):
        splits = iter(x.split(self.split_size, dim=0))
        s_next = next(splits)
        s_prev = self.seq1(s_next).to('cuda:1')
        ret = []
​
        for s_next in splits:
            # A. s_prev runs on cuda:1
            s_prev = self.seq2(s_prev)
            ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))
​
            # B. s_next runs on cuda:0, which can run concurrently with A
            s_prev = self.seq1(s_next).to('cuda:1')
​
        s_prev = self.seq2(s_prev)
        ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))
​
        return torch.cat(ret)
​
​
setup = "model = PipelineParallelResNet50()"
pp_run_times = timeit.repeat(
    stmt, setup, number=1, repeat=num_repeat, globals=globals())
pp_mean, pp_std = np.mean(pp_run_times), np.std(pp_run_times)
​
plot([mp_mean, rn_mean, pp_mean],
     [mp_std, rn_std, pp_std],
     ['Model Parallel', 'Single GPU', 'Pipelining Model Parallel'],
     'mp_vs_rn_vs_pp.png')
Copy the code

Note that the device-to-device tensor copy operation is synchronized over current Streams on both the source and target devices. If you create multiple streams, you must ensure that the replication operations are properly synchronized. Writing to the source tensor or reading/writing to the target tensor before completing the replication operation may result in uncertain behavior. The above implementation only uses the default stream on both source and target devices, so there is no need to force additional synchronizations.

0x04 Key Technologies

Because the candidate set of parallel strategies for each model is exponential, manually selecting an appropriate strategy costs algorithm engineers a great deal of time and computing resources, and there are too many related issues for them to consider, such as how to allocate memory, how layers interact, how to reduce communication cost, how to map the original mathematical model, how to determine tensor shapes, and how to determine inputs and outputs.

Therefore, automatic parallelism (how to automatically solve the problem of parallel strategy selection from the framework level) has become a research hotspot.

Automatic parallelism is expected to free algorithm engineers from selecting and configuring parallel strategies by building a cost model to predict and choose a good parallel strategy (it cannot be guaranteed to be optimal, because picking the optimal strategy is an NP-hard problem).

Therefore, there are several necessary parallel techniques for distributed model training at present:

  • Pipeline parallelism, in particular how to set up the pipeline automatically;
  • Gradient accumulation;
  • Backward recomputation;
  • The 1F1B strategy (we will analyze it with PipeDream);

Let's look at how these techniques are used in the GPipe code.

0x05 Basics & Support Systems

5.1 Lingvo framework

5.1.1 Core Components

Lingvo’s core components are as follows:

  • Models: A Model is an abstract collection that contains one or more tasks. The Model acts as a wrapper layer for Task. For the multi-Tasks Model, the Model controls which variables will be shared between tasks and how each Task is sampled during training.
  • Tasks: A task is a description of a complete optimization problem, such as image classification or speech recognition. Contains the Input Generator.
  • Layers: A Layer represents an arbitrary function with trainable parameters. A Layer can contain other Layers as children. Softmax, LSTM, Attention, and even a Task are examples of Layers.
  • Params: This object contains the hyperparameters of the model. Layers, Tasks, and Models are all constructed using specifications in Params. Params are hierarchical. An object’s Params can contain the Params of its child object.
  • NestedMap: A dictionary structure for passing data. Most of the Python objects in your code are either instances of Tensor, or a subclass of BaseLayer or NestedMap.

5.1.2 Model definition

In Lingvo, a network is a nested structure of layers. Most classes in Lingvo are subclasses of [Lingvo/core/base_layer.py] BaseLayer.

  • Params: Used to configure the class, defines the keys needed for the configuration, which should be defined when the object is created. Params objects can also contain Params objects that are used to configure child layers. Each Layer class has a Params classMethod, which creates a new Params object, configures the layer with defined keys, and gives reasonable defaults for keys.

    Properties in Params objects include:

    • cls: the Python class associated with this Params object; it can be used to construct instances of the class;
    • name: The name of the layer;
    • dtype: The default data type to use when creating variables.
  • __init__ constructor: All sublayers and variables should be created here.

  • CreateVariable: Method for creating variables. Each Layer is responsible for creating and managing its own variable.

  • CreateChild: Method for creating child layers.

  • FProp: All layers have an FProp() function that performs the layer's forward propagation and is called when computing the forward step. Because it may execute on different devices during distributed training, for performance reasons Lingvo accesses variables through the theta argument passed to FProp rather than through self.vars or self.theta.

  • FPropMeta: Returns metadata about this layer's FProp computation; meta.flops gives the estimated number of floating point operations for a given input tensor (a schematic sketch follows this list).
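Putting these pieces together, here is a schematic sketch of how a toy layer would use Params, CreateVariable, FProp, and FPropMeta. It is not taken from the Lingvo codebase, and the exact variable-creation API differs slightly across Lingvo versions, so treat it as an illustration of the conventions described above rather than as reference code:

# Schematic sketch of a toy Lingvo-style linear layer; names and the exact
# variable-creation call are illustrative, not copied from the Lingvo sources.
class MyLinearLayer(base_layer.BaseLayer):

  @classmethod
  def Params(cls):
    p = super().Params()
    p.Define('input_dim', 0, 'Input dimension.')
    p.Define('output_dim', 0, 'Output dimension.')
    return p

  def __init__(self, params):
    super().__init__(params)
    p = self.params
    # Each layer creates and manages its own variables.
    self.CreateVariable(
        'w',
        py_utils.WeightParams(
            shape=[p.input_dim, p.output_dim],
            init=py_utils.WeightInit.Xavier(),
            dtype=p.dtype))

  def FProp(self, theta, inputs):
    # Variables are read through theta, not self.vars, so that FProp can run
    # on a different device during distributed training.
    return tf.matmul(inputs, theta.w)

  @classmethod
  def FPropMeta(cls, p, inputs):
    b, d = inputs                       # inputs is a tshape.Shape: [batch, dim]
    flops = b * d * p.output_dim * 2    # mul/add counts as 2 flops
    return py_utils.NestedMap(
        flops=flops, out_shapes=(tshape.Shape([b, p.output_dim]),))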

For putting a model into practice, two indicators are particularly important:

  • The compute (FLOPs) required for forward propagation, which reflects the demand on hardware such as GPUs;
  • The number of parameters, which reflects the amount of memory occupied.

Next, let's see how to calculate the memory needed for model training, and how to estimate the compute cost (needed later for pipeline parallelism).

5.2 Computing Memory

5.2.1 Overall analysis

We mainly follow the ideas of "ZeRO: Memory Optimizations Toward Training Trillion Parameter Models".

During model training, most of the memory is consumed in one of three ways:

  • i) Activations;
  • ii) OGP states, i.e., the optimizer states, parameter gradients, and the parameters themselves;
  • iii) Temporary buffers.

The input data itself consumes little memory, because we usually read it with iterators rather than loading the entire dataset into GPU memory at once. This ensures that the memory consumed by each input batch is insignificant compared with the network parameters.

Let’s go through them one by one.

5.2.2 Activations

Activations have the following characteristics:

  • The extra memory consumed by activations grows with the batch size. Even with a batch size of 1, training a trillion-parameter model generates more than 1 TB of activation memory.
  • Existing solutions such as Training Deep Nets with Sublinear Memory Cost can eliminate almost all of the memory required for activations, at the cost of roughly 33% recomputation overhead. This technique is called gradient checkpointing, also known as rematerialization or re-forward (see the sketch below).
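For instance, a minimal sketch of this technique in PyTorch using torch.utils.checkpoint (a different framework than the one discussed here, used purely to illustrate the idea): intermediate activations inside each checkpointed segment are discarded after the forward pass and recomputed during the backward pass.

import torch
import torch.nn as nn
from torch.utils.checkpoint import checkpoint_sequential

# Eight blocks; without checkpointing every intermediate activation is kept.
model = nn.Sequential(*[nn.Sequential(nn.Linear(1024, 1024), nn.ReLU())
                        for _ in range(8)])
x = torch.randn(16, 1024, requires_grad=True)

# Split the blocks into 2 segments; only the segment boundaries keep
# activations, everything inside is recomputed during backward.
out = checkpoint_sequential(model, 2, x)
out.sum().backward()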

5.2.3 OGP states

5.2.3.1 Parameters of the model

The parameters of the model itself refer to the Weight and Bias of each network layer, which will be occupied after the model is loaded. It should also be noted that some layers have parameters, such as CNN, RNN; Some layers have no parameters, such as activation layer, pooling layer, etc.

5.2.3.2 Optimizer Parameters

Optimizer parameters are the parameters generated during optimization, i.e., back propagation. These mainly refer to dW, the gradients. With SGD their size equals that of the parameters, so during optimization the memory occupied by model parameters roughly doubles.

It is worth noting that different optimizers need to store different optimization state. For Adam, because momentum and variance also need to be saved, the memory for model parameters is roughly quadrupled during optimization.

5.2.3.3 Example

For the OGP states, let's take Adam as an example: mixed-precision training with Adam for a model with ψ parameters.

  • Model parameters: FP16 copy of the parameters, memory requirement 2 ψ bytes.
  • Gradient: The fp16 copy of the gradient requires 2 ψ bytes of memory.
  • Optimizer state: FP32 copies of parameters, Momentum, and Variance, with memory requirements of 4 ψ, 4 ψ, and 4 ψ bytes, respectively. Let’s express the memory multiplier of the optimizer states in terms of K, that is, the extra memory required to store them is K ψ bytes.

Overall, the OGP states require 2ψ + 2ψ + Kψ = 16ψ bytes of memory (K = 12 for mixed-precision Adam).

Details are as follows:

Blue is the parameter, orange is the gradient, and green is the optimizer state.

In the memory consumption formula, ψ represents the model size (number of parameters), K represents the memory multiplier of the optimizer state, and Nd represents the degree of data parallelism. In this example, we assume model size ψ = 7.5 billion, mixed precision training based on Adam optimizer, data parallelism Nd=64 (i.e. 64 Gpus), K=12.

For a model like GPT-2 with 1.5 billion parameters, this results in a memory requirement of at least 24 GB, far more than the 3 GB needed to store the fp16 parameters alone.
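A quick numeric check of the formula above for this GPT-2-sized example:

# Worked example of the OGP-state memory formula: 2ψ + 2ψ + Kψ, with K = 12
# for mixed-precision Adam, applied to GPT-2's ~1.5 billion parameters.
psi = 1.5e9                     # number of parameters
K = 12                          # fp32 params + momentum + variance

fp16_params = 2 * psi           # bytes for the fp16 parameter copy
fp16_grads = 2 * psi            # bytes for the fp16 gradients
optimizer_state = K * psi       # bytes for Adam's fp32 state

total = fp16_params + fp16_grads + optimizer_state   # = 16 * psi
print(total / 1e9)              # 24.0 GB, versus 2 * psi = 3 GB for the fp16 params alone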

5.2.4 Temporary buffer

Temporary buffers are buffers used to store temporary results, for example, fp32 buffers would require 6GB of memory for a GPT-2 model with 1.5 billion parameters.

5.3 Estimating compute (FLOPs)

5.3.1 Background

  • FLOPS: floating point operations per second, a measure of hardware performance.
  • FLOPs: note the lowercase s, floating point operations; it measures the complexity of an algorithm/model.

The compute required for forward propagation is measured in FLOPs. How are FLOPs calculated?

As we know, forward propagation involves operations such as convolution, pooling, BatchNorm, ReLU, and upsampling. Each of these has a corresponding compute cost, and convolution accounts for the largest share. So let's take the convolution operation as an example and look at its compute cost.

Derivation: the convolution layer computes wx + b, which we split into two parts. First, consider the computation of wx:

Where:

  • k is the convolution kernel size;
  • c is the number of input feature maps (input channels);

For a single unit on the output feature map, there are:

k * k * c multiplications and (k * k * c - 1) additions

If the output feature map has resolution H * W and o feature maps are output, the total number of output units is H * W * o.

Therefore, when computing wx, the convolution layer performs:

k * k * c * H * W * o multiplications      -- (1)
(k * k * c - 1) * H * W * o additions      -- (2)

Next, consider the computation contributed by the bias term b:

Since b involves only additions (each unit on the output feature map performs one bias addition), computing the bias term requires:

H * W * o additions                        -- (3)

Summing up the multiplications and additions for wx and b of the convolution layer:

Multiplications, formula (1):

k * k * c * H * W * o

Additions, formula (2) + formula (3):

(k * k * c - 1) * H * W * o + H * W * o = k * k * c * H * W * o

It can be seen that formula (2) + formula (3) = formula (1).

Therefore, for a convolutional layer with a bias term, the compute cost of this layer is:

k * k * c * H * W * o multiplications and the same number of additions, i.e., 2 * k * k * c * H * W * o operations in total (consistent with the "mul/add counts as 2 flops" convention in the Lingvo code below).
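A small numeric sanity check of this derivation (arbitrary example shapes):

# Multiplications and additions of a conv layer with bias, per the derivation above.
def conv_layer_ops(k, c, H, W, o):
    mults = k * k * c * H * W * o                      # formula (1)
    adds = (k * k * c - 1) * H * W * o + H * W * o     # formula (2) + formula (3)
    assert adds == mults                               # (2) + (3) == (1)
    return mults + adds                                # mul/add counted separately

# Example: 3x3 kernel, 64 input channels, 56x56 output, 128 output feature maps.
print(conv_layer_ops(3, 64, 56, 56, 128))              # 462,422,016 operations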

5.3.2 Lingvo

In Lingvo, the compute estimate is provided by each class's FPropMeta method, implemented according to that class's characteristics. Let's look at a few examples of how FLOPS are computed.

Conv2DLayerNoPadding is calculated as follows:

  @classmethod
  def FPropMeta(cls, p, inputs):
    py_utils.CheckShapes((inputs,))
    b, h, w, c = inputs
    fh, fw, ic, oc = p.filter_shape
    assert ic == c
    sh, sw = p.filter_stride
    if p.padding == 'SAME':
      oh = sympy.ceiling(h / sh)
      ow = sympy.ceiling(w / sw)
    else:
      oh = sympy.ceiling((h - fh + 1) / sh)
      ow = sympy.ceiling((w - fw + 1) / sw)
    flops = b * oh * ow * fh * fw * ic * oc * 2  # mul/add counts as 2 flop.
    outputs = tshape.Shape([b, oh, ow, oc])
    return py_utils.NestedMap(flops=flops, out_shapes=(outputs,))

The DropoutLayer is calculated as follows:

​
  @classmethod
  def FPropMeta(cls, p, inputs, *args):
    py_utils.CheckShapes((inputs,))
    flops_per_element = 10  # Approximately 10 flops per element.
    return py_utils.NestedMap(
        flops=inputs.num_elements() * flops_per_element, out_shapes=(inputs,))
​

The FLOPS of BatchNormLayer are calculated as follows.

  @classmethod
  def FPropMeta(cls, p, inputs, padding=None):
    py_utils.CheckShapes((inputs,))
    return py_utils.NestedMap(
        flops=inputs.num_elements() * _BN_FLOPS_PER_ELEMENT,
        out_shapes=(inputs,))

ActivationLayer is calculated as follows:

  @classmethod
  def FPropMeta(cls, p, inputs):
    py_utils.CheckShapes((inputs,))
    return py_utils.NestedMap(
        flops=inputs.num_elements() * GetFlops(p.activation),
        out_shapes=(inputs,))

0x06 Pipeline

6.1 Background

6.1.1 Pain points

Communication problems

Both data parallelism and model parallelism will carry out fully connected communication between corresponding machines. When the number of machines increases, the communication cost and delay will be unbearable.

For example, in parameter server usage, the three-step process is as follows: Pull weight > Compute new weight > Push new weight.

If these three steps are performed sequentially (communicate, then compute), then no matter how fast or slow the communication is, this overhead makes each iteration take longer in a distributed environment than in the single-machine version (Ethernet bandwidth or latency only changes how much longer, it never reduces it to zero). Therefore, overlapping communication with computation to hide communication time is practically a mandatory step, and overlapping computation and transmission to improve device utilization is a challenge.

Underutilization

In principle, we can train larger DNN models on GPU or TPU through parallel computation. However, due to the sequential nature of DNN, this method may result in only one accelerator being active during the calculation, and the computing power of the equipment cannot be fully utilized.

6.1.2 How to design the system

Going back to the training process of neural networks, how do you design systems to overlap computation and communication?

There are two features that can be used in backward propagation:

  • First, neural network computation proceeds layer by layer; whether in the forward or the backward pass, one layer must finish before the next can be computed;
  • Second, during backward propagation, once a layer has received the inputs it needs from its neighboring layer, its computation no longer depends on that layer.

Therefore, pipelined parallelism is introduced according to this characteristic.

Both data parallelism and model parallelism have all devices computing the same part of the network at the same time, while pipeline parallelism splits the task into several stages with a clear order and distributes different stages to different computing devices, so that each device is only responsible for a contiguous slice of the network. The model network is distributed asymmetrically across the devices, which complete the network computation in a "relay" fashion. Only one tensor needs to be transmitted between each stage and the next, and the amount of data each machine transmits is independent of the total network size, the number of machines, and the degree of parallelism.

In this mode where multiple devices relay to complete a network calculation, larger models or larger Batch sizes can be supported. If communication and calculation overlap is good, it can solve the problem of communication overhead between machines.

In general, pipeline parallelism has advantages for typical large-model training. It transmits little data, only the sum of what must be passed between adjacent stages, unlike data parallelism and model parallelism, whose communication volume is related to the whole computation graph; pipeline parallelism therefore tends to be used on machines with limited bandwidth. In some cases, however, combining data parallelism with model parallelism is better than using model parallelism or pipeline parallelism alone. Data parallelism and model parallelism also have optimizations that let computation time cover transmission time.

6.2 Overview of the GPipe pipeline

6.2.1 Key Points

Conceptually, GPipe is a distributed machine learning library that trains with synchronous stochastic gradient descent and pipeline parallelism, and it is suitable for any deep neural network (DNN) consisting of multiple ordered layers. GPipe splits the model across different accelerators and automatically divides a mini-batch of training examples into smaller micro-batches, which allows GPipe's accelerators to operate in parallel and maximizes scalability during training.

GPipe has several key points:

  • Network partitioning: a network of L layers is divided into K partitions, each running on an independent TPU core, with some network communication operations inserted between the partitions.
  • Pipeline parallelism: the same pipelining technique used in CPUs is applied to better overlap computation and network communication; that is, the mini-batch of training samples is automatically divided into smaller micro-batches that run through the pipeline so that the TPU cores can operate in parallel.
  • Gradient accumulation: gradients are accumulated across micro-batches, so the number of partitions does not affect model quality (see the sketch after this list).
  • Re-materialization (recomputation): during the forward pass, GPipe only records the outputs at the stage boundaries; when computing gradients, it re-executes the forward computation to obtain each operator's forward results and then computes the gradients. This is similar to OpenAI's open-source gradient-checkpointing, except that GPipe implements it on TPU while OpenAI's version runs only on GPU.
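A minimal single-process sketch of the micro-batching and gradient-accumulation idea described above. Device placement and the actual pipeline schedule are omitted, and this is PyTorch rather than the Lingvo/TPU implementation; it is only meant to show why accumulating gradients across micro-batches reproduces the synchronous mini-batch update:

import torch
import torch.nn as nn

# Two "stages" standing in for the K network partitions.
stages = nn.ModuleList([nn.Linear(10, 10), nn.Linear(10, 5)])
optimizer = torch.optim.SGD(stages.parameters(), lr=0.01)
loss_fn = nn.MSELoss()

x, y = torch.randn(32, 10), torch.randn(32, 5)
num_micro_batches = 4

optimizer.zero_grad()
for xb, yb in zip(x.chunk(num_micro_batches), y.chunk(num_micro_batches)):
    out = xb
    for stage in stages:              # in GPipe each stage runs on its own core
        out = stage(out)
    loss = loss_fn(out, yb) / num_micro_batches
    loss.backward()                   # gradients accumulate across micro-batches
optimizer.step()                      # one synchronous update per mini-batch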

6.2.2 Diagram

  • On the left is the original model.

  • The GPipe figure shows a neural network with multiple ordered layers divided across four accelerators. Fk is the composite forward function of the k-th partition, and Bk is its corresponding back-propagation function. Bk depends on Bk+1 from the layer above and on the intermediate activations of Fk.

    • In the model above, we can see how the sequential nature of the network can lead to under-utilization of resources.
    • The following model shows the GPipe approach, in which an example of a small batch input is divided into smaller microbatches that can be processed simultaneously by an accelerator.

6.2.3 Questions

Following the ideas of the paper, we raise a few questions here and will look for the answers in the analysis that follows.

  • How are stages divided?

    • The model is divided into several consecutive stages, each corresponding to one device. This allows the model size to exceed the memory of a single device, because each device only needs to hold part of the model's parameters and computation;
    • Because the stages run as a pipeline, the slowest stage becomes the bottleneck of the whole system, so the compute load must be divided as evenly as possible.
  • On what basis is the pipeline split?

    • The mini-batch is further divided into smaller micro-batches, and the pipeline scheme processes one micro-batch at a time: as soon as a micro-batch's result is obtained, it is sent to the downstream device while the next micro-batch is processed, which reduces the bubbles (a device's idle time is called a bubble).
  • How to do Re-materialization?

6.3 Splitting stages based on compute cost

Neural networks have a useful property: their running time varies little across different inputs, so their compute cost, running time, parameter sizes, and so on can be predicted. GPipe splits the graph based on compute cost and assigns different layers to different devices.

6.3.1 PartitionSequentialLayers

PartitionSequentialLayers decomposes a layer composed of sequential layers, with the goal of giving each partition roughly the same FLOPs; the ultimate aim is that the compute load assigned to every GPU is as equal as possible.

  • The input is a layer param or a list of layer params;
  • The output is a list of FeatureExtractionLayer params;

Logic is:

  • If params is a single layer, flatten it into a list subs of its sub-layers; if it is already a list, use it directly;

  • FPropMeta computes the shapes and cumulative FLOPs of the subs list, which are stored in histo;

  • A normalized cumulative histogram of the layers' cost is computed from histo;

  • Build a parts variable;

    • This variable is an array of size num_partitions, and each item of the array is itself an array;
    • subs are assigned to the items of parts according to the histogram, so that each parts[i] holds some of the layers; operators with low compute cost are merged into one part, with the goal of making the compute cost of each final part as equal as possible.
  • Convert parts into a list of FeatureExtractionLayer params;

def PartitionSequentialLayers(params, num_partitions, *shapes):
  r"""Partition a layer composed of sequential layers.

  This routine strives to partition layers so that each partition costs roughly
  the same flops given the input shapes.

  Args:
    params: A layer param or a list of layer param.
    num_partitions: The desired number of partitions.
    *shapes: A tuple of tshape.Shape representing input tensors to the first
      layer.

  Returns:
    A list of FeatureExtractionLayer params.
  """

  # Recursively concatenate SequentialLayer into a list.
  # SequentialLayer is a layer that links several layers in sequence.
  def FlattenSeq(p):
    if isinstance(p, list):
      return p
    if p.cls not in [builder_layers.SequentialLayer, FeatureExtractionLayer]:
      return [p.Copy()]
    subs = []
    for _ in range(p.repeat):
      for _p in p.sub:
        subs += FlattenSeq(_p)
    return subs

  # If params is a single layer, build a new list subs of its sub-layers;
  # if it is already a list, it is returned directly.
  subs = FlattenSeq(params)

  assert len(shapes) == 1
  tf.logging.info('num_partitions: {} input_shape: {}'.format(
      num_partitions, shapes[0]))

  # Use FPropMeta to compute the shapes and cumulative flops of the subs list,
  # stored in histo.
  # Computes the estimate cost for each sub layer.
  # Suppose there are 7 sub-layers whose flops are 10,40,30,10,20,50,10.
  total, histo, output_shapes = 0, [], []
  for i, s in enumerate(subs):
    s.name = 'cell_%03d' % i
    meta = s.cls.FPropMeta(s, *shapes)
    total += meta.flops
    histo.append(total)
    output_shapes.append(meta.out_shapes)
    shapes = meta.out_shapes
  tf.logging.vlog(1, 'len %d histogram = %s', len(subs), histo)
  # Then histo is [10,50,80,90,110,160,170].

  # Compute the normalized cumulative histogram of the layers' cost from histo.
  # Computes the normalized cumulative histogram of the layer's cost.
  histo_pct = [float(x / total) for x in histo]
  tf.logging.vlog(1, 'cost pct = %s', histo_pct)
  # histo_pct is [1/17, 5/17, 8/17, 9/17, 11/17, 16/17, 1];
  # suppose num_partitions is 3.

  # Build a parts variable of size num_partitions; assign subs to the items of
  # parts according to the histogram so that each parts[i] holds some layers.
  # i-th sub layer is put into partition j, where j is roughly i-th cumulative
  # histogram times num_partitions.
  parts = [[] for _ in range(num_partitions)]
  parts_cost = [0] * num_partitions
  pre_hist_cost = 0
  for i, s in enumerate(subs):
    # Find the index of s's cost in the histogram; j is the partition for s.
    # For each i, histo_pct[i] * num_partitions is:
    # [3/17, 15/17, 24/17, 27/17, 33/17, 48/17, 3], so j is [0,0,1,1,1,2,2].
    j = min(int(histo_pct[i] * num_partitions), num_partitions - 1)
    # The boundary at parts[j] where j > 0
    if j > 0 and not parts[j]:
      parts_cost[j - 1] = histo_pct[i - 1] - pre_hist_cost
      pre_hist_cost = histo_pct[i - 1]
    parts[j].append(s)

  # The sub-layers end up in 3 parts: [1,2], [3,4,5], [6,7].
  # Convert parts into a list of FeatureExtractionLayer params.
  parts_cost[num_partitions - 1] = 1.0 - pre_hist_cost
  seqs = []
  for i, pa in enumerate(parts):
    tf.logging.info('Partition %d #subs %d #cost %.3f', i, len(pa),
                    parts_cost[i])
    seqs.append(FeatureExtractionLayer.Params().Set(name='d%d' % i, sub=pa))
  return seqs

6.3.2 FeatureExtractionLayer

The FeatureExtractionLayer is used in the code above, which simply returns a sequence of layers.

FeatureExtractionLayer extracts features from a layer sequence:

  • It connects layers into a sequence;
  • It can fetch and forward activation endpoints;
class FeatureExtractionLayer(base_layer.BaseLayer):
  """A layer that extracts features from a sequence of layers.

  FeatureExtractionLayer is a layer which connects a few layers in a sequence.
  It is also capable of fetching and forwarding activation endpoints.
  # TODO(huangyp): Make it a sublayer of builder_layers.SequentialLayer
  """

  @classmethod
  def Params(cls):
    p = super().Params()
    p.Define('variable_name_prefix', '',
             'Prefix for variable names in sub layers')
    p.Define('sub', [], 'A list of layers\' params.')
    p.Define('num_act_inputs', 0, 'Number of activation inputs.')
    p.Define('num_act_outputs', 0, 'Number of activation outputs.')
    p.Define('act_fetch_layers', [],
             'Names of fetch layers that cached extra activations')
    return p

  def __init__(self, params):
    super().__init__(params)
    p = self.params
    assert p.num_act_inputs >= 0
    assert p.num_act_outputs >= 0
    p.act_fetch_layers = p.act_fetch_layers or []
    assert p.num_act_outputs == p.num_act_inputs + len(p.act_fetch_layers)
    self._seq = []
    for sub in p.sub:
      assert sub.name
      sub.name = p.variable_name_prefix + sub.name
      self.CreateChild(sub.name, sub)
      self._seq.append((sub.name, self.children[sub.name]))

  def FProp(self, theta, *args):
    # Forward propagation of this layer: run FProp of each sub-layer in order.
    p = self.params
    assert len(args) > p.num_act_inputs
    out_args = args[:-p.num_act_inputs] if p.num_act_inputs > 0 else args
    extra_args = args[-p.num_act_inputs:] if p.num_act_inputs > 0 else ()
    for (name, ch) in self._seq:
      th = theta[name]
      out_args = _ToTuple(out_args)
      out_args = ch.FProp(th, *out_args)
    # Append fetched activations to fprop outputs.
    for fetch_layer in p.act_fetch_layers:
      assert fetch_layer in self.children
      activation = self.children[fetch_layer].activation
      if isinstance(activation, (tuple, list)):
        activation = activation[0]
      # Add the fetched activation endpoint.
      extra_args += (activation,)
    if extra_args:
      out_args = _ToTuple(out_args) + extra_args
    return out_args

  @classmethod
  def FPropMeta(cls, p, *args):
    assert len(args) > p.num_act_inputs
    seq_args = args[:-p.num_act_inputs] if p.num_act_inputs > 0 else args
    extra_args = args[-p.num_act_inputs:] if p.num_act_inputs > 0 else ()
    total = 0
    act_fetch_metas = {}
    for sub in p.sub:
      meta = sub.cls.FPropMeta(sub, *seq_args)
      if sub.name in p.act_fetch_layers:
        act_fetch_metas[sub.name] = meta.out_shapes[0]
      total += meta.flops
      seq_args = meta.out_shapes
    for fetch_layer in p.act_fetch_layers:
      extra_args += (act_fetch_metas[fetch_layer],)
    return py_utils.NestedMap(flops=total, out_shapes=seq_args + extra_args)

The calculation process is shown in the figure below. For specific values, please refer to the examples in the above codes:

+--------------+     +--------------+           +--------------+
|              |     |              |           |              |
| sub-layer 1  |     | sub-layer 2  |    ...    | sub-layer n  |
|              |     |              |           |              |
+------+-------+     +------+-------+           +------+-------+
       |                    |                          |
       | FPropMeta          | FPropMeta                | FPropMeta
       v                    v                          v
    flops 1              flops 2                    flops n
       +                    +                          +
       |                    |                          |
       +--------------------+--------------------------+
                            |
                            v
              for i, s in enumerate(subs):
                  total += meta.flops
                  histo.append(total)
              histo = [10,50,80,90,110,160,170]
                            |
                            v
              Computes the normalized cumulative
              histogram of the layer's cost
              histo_pct = [float(x / total) for x in histo]
              histo_pct = [1/17, 5/17, 8/17, 9/17, 11/17, 16/17, 1]
                            |
                            v
              Assign the layers to partitions
              based on the histogram
              [1,2], [3,4,5], [6,7]
                            |
                            v
+---------------------------------------------------+
| parts                                             |
|                                                   |
|  +--------------+  +-------------+  +-----------+ |
|  | sub-layer 1  |  | sub-layer 3 |  |sub-layer 6| |
|  |              |  |             |  |           | |
|  | sub-layer 2  |  | sub-layer 4 |  |sub-layer 7| |
|  |              |  |             |  |           | |
|  |              |  | sub-layer 5 |  |           | |
|  +--------------+  +-------------+  +-----------+ |
+---------------------------------------------------+

6.4 Pipeline Allocation

6.4.1 Base class SeqLayer

The purpose of this layer is to deploy each child cell in cell_tpl to a worker device using a round-robin policy.

Params include:

  • before_tpl: config for the CNN layers that run before pipelining;
  • cell_tpl: a list of FeatureExtractionLayer layers;
6.4.1.1 Initialization

The logic for initializing the function is:

  • Iterate over before_tpl, calling CreateChild for each item to build its sub-layer, and append the item to _before_layers;
  • Iterate over cell_tpl, calling CreateChild for each item to build its sub-layer, and append the item to _cells;
  def __init__(self, params):
    super().__init__(params)
    p = self.params
    self._before_layers = []
    self._cells = []
    # Iterate over before_tpl; CreateChild builds each item's sub-layer and the
    # item is appended to _before_layers.
    for l in p.before_tpl:
      self.CreateChild(l.name, l)
      self._before_layers.append((l.name, self.children[l.name]))
    # Iterate over cell_tpl; CreateChild builds each item's sub-layer and the
    # item is appended to _cells.
    for l in p.cell_tpl:
      self.CreateChild(l.name, l)
      self._cells.append((l.name, self.children[l.name]))
6.4.1.2 _CreateChildrenVariables

Build variables. The logic is as follows:

  • If TPU is used

    • use cluster.WorkerDeviceInModelSplit(0) as before_tpl_device, i.e., the first device in the cluster;
    • iterate over the other devices in the cluster and assign them to cell_devices;
  • Iterate over _before_layers and deploy each variable to before_tpl_device;

  • Iterate over _cells, deploying each of these variables to cell_devices;

  def _CreateChildrenVariables(self):
    p = self.params
    num_cells = len(p.cell_tpl)
    before_tpl_device = ''
    cell_devices = [''] * num_cells
    if py_utils.use_tpu():
      # When running on TPU, use cluster.WorkerDeviceInModelSplit(0) as
      # before_tpl_device, i.e., the first device in the cluster.
      cluster = self.cluster
      before_tpl_device = cluster.WorkerDeviceInModelSplit(0)
      # The other devices in the cluster are assigned to cell_devices.
      cell_devices = [
          cluster.WorkerDeviceInModelSplit(i) for i in range(num_cells)
      ]
    # Place the variables of _before_layers on before_tpl_device.
    for unused_name, l in self._before_layers:
      with tf.device(before_tpl_device):
        l.InstantiateVariables()
    # Place the variables of each cell on its corresponding device.
    for i, (unused_name, l) in enumerate(self._cells):
      with tf.device(cell_devices[i]):
        l.InstantiateVariables()
    super()._CreateChildrenVariables()
6.4.1.3 FProp

Forward propagation code, the specific logic is as follows:

  • Iterate over _before_layers, calling its FProp for each of them;
  • Iterate over _cells and, for each cell, call its FProp on cluster.WorkerDeviceInModelSplit(i);
  def FProp(self, theta, *args):
    """Round-robin every children cells in cell_tpl among worker devices.

    Args:
      theta: A NestedMap object containing weights' values of this layer and
        its children layers.
      *args: Input args

    Returns:
      A list contains one tensor of [batch_size, feature_height, feature_width,
        channel].
    """
    num_layers = len(self.params.cell_tpl)
    cluster = self.cluster

    # Run FProp of each layer in _before_layers.
    for (name, l) in self._before_layers:
      l_theta = theta[name]
      args = _ToTuple(args)
      args = l.FProp(l_theta, *args)
    # Run FProp of each cell on cluster.WorkerDeviceInModelSplit(i).
    for i in range(num_layers):
      with tf.device(cluster.WorkerDeviceInModelSplit(i)):
        cell_name, cell = self._cells[i]
        args = _ToTuple(args)
        args = cell.FProp(theta[cell_name], *args)
    return args
6.4.1.4 Implementation

The code of SeqLayer is as follows:

class SeqLayer(base_layer.BaseLayer):
  """Round-robin every children cells in cell_tpl among worker devices."""
​
  @classmethod
  def Params(cls):
    p = super().Params()
    p.Define('before_tpl', [],
             'Config for the CNN layers that runs before pipelining.')
    p.Define('cell_tpl', [], 'A list of FeatureExtractionLayer layers.')
    return p
​
  def __init__(self, params):
    super().__init__(params)
    p = self.params
    self._before_layers = []
    self._cells = []
    for l in p.before_tpl:
      self.CreateChild(l.name, l)
      self._before_layers.append((l.name, self.children[l.name]))
    for l in p.cell_tpl:
      self.CreateChild(l.name, l)
      self._cells.append((l.name, self.children[l.name]))
​
  def _CreateChildrenVariables(self):
    p = self.params
​
    num_cells = len(p.cell_tpl)
    before_tpl_device = ''
    cell_devices = [''] * num_cells
    if py_utils.use_tpu():
      cluster = self.cluster
      before_tpl_device = cluster.WorkerDeviceInModelSplit(0)
      cell_devices = [
          cluster.WorkerDeviceInModelSplit(i) for i in range(num_cells)
      ]
​
    for unused_name, l in self._before_layers:
      with tf.device(before_tpl_device):
        l.InstantiateVariables()
​
    for i, (unused_name, l) in enumerate(self._cells):
      with tf.device(cell_devices[i]):
        l.InstantiateVariables()
​
    super()._CreateChildrenVariables()
​
  def FProp(self, theta, *args):
    """Round-robin every children cells in cell_tpl among worker devices.
​
    Args:
      theta: A NestedMap object containing weights' values of this layer and its
        children layers.
      *args: Input args
​
    Returns:
      A list contains one tensor of [batch_size, feature_height, feature_width,
        channel].
    """
    num_layers = len(self.params.cell_tpl)
    cluster = self.cluster
​
    for (name, l) in self._before_layers:
      l_theta = theta[name]
      args = _ToTuple(args)
      args = l.FProp(l_theta, *args)
    for i in range(num_layers):
      with tf.device(cluster.WorkerDeviceInModelSplit(i)):
        cell_name, cell = self._cells[i]
        args = _ToTuple(args)
        args = cell.FProp(theta[cell_name], *args)
​
    return args

6.4.2 Allocating PipeliningLayer

PipeliningLayer is a descendant of SeqLayer.

  • At the head of the pipeline is device[0], which handles the work before pipelining.
  • In the middle of the pipeline is a series of devices that process the individual micro-batches.
  • At the tail of the pipeline is device[-1], which is responsible for reassembling the shapes and producing the final output tensor.
6.4.2.1 Get the output shape of the middle layer

_CalculateOutputShapes Computes the output shape of the intermediate layer. The specific logic is as follows:

  • Iterate over _before_layers, calling each layer's FPropMeta to get its output shapes, which are appended to the state_shapes array;
  • Iterate over _cells, calling each cell's FPropMeta to get its output shapes, which are appended to the state_shapes array;
  def _CalculateOutputShapes(self, input_shapes):
    """Calculate the output shape of intermediate layers.

    Given the FPropMeta function in each FeatureExtractionLayer, calculates
    the shapes of outputs of that layer. This is used to recover the shape
    information in StackedRecurrent.

    Args:
      input_shapes: NestedMap or tuple of input TensorShapes.

    Returns:
      Return a list of K + 1 NestedMaps or lists of tShape where K is the
      number of partitions.
    """
    p = self.params
    shapes = []

    # Converts TensorShape to tshape.Shape.
    def _ToTShape(x):
      if x is None:
        return None
      return tshape.Shape(x.as_list())

    shapes = py_utils.Transform(_ToTShape, input_shapes)
    shapes = _ToTuple(shapes)

    state_shapes = []
    # Iterate over _before_layers; call FPropMeta of each layer to get its
    # output shapes, then record them in state_shapes.
    for (_, cell) in self._before_layers:
      shapes = cell.FPropMeta(cell.params, *shapes).out_shapes
    state_shapes.append(shapes[0] if p.nested_map_fprop else shapes)

    # Iterate over _cells; call FPropMeta of each cell to get its output
    # shapes and append them to state_shapes.
    for (_, cell) in self._cells:
      shapes = cell.FPropMeta(cell.params, *shapes).out_shapes
      state_shapes.append(shapes[0] if p.nested_map_fprop else shapes)
    return state_shapes
6.4.2.2 Get the data type

The _get_state_dtype is used to get the data type.

  def _get_state_dtype(self, *args):
    if self.params.state_dtype:
      return self.params.state_dtype
    if self.params.nested_map_fprop:
      inputs = args[0].Filter(lambda x: x is not None)
      return py_utils.Flatten(inputs)[0].dtype
    return args[0].dtype
6.4.2.3 Get input shape

GPipe first splits a mini-batch into even smaller micro-batches and then pipelines the execution of each micro-batch through the cells.

The _get_input_shapes function obtains the shapes of the inputs. Its logic is as follows:

  • The input input_tensors is obtained from args;

  • Iterate through input_tensors, find the first tensor that is not None, and take its batch size as mini_batch_size;

  • Read micro_batch_size from the parameter p.micro_batch_size;

  • If micro_batch_size is not set:

    • If p.num_micro_batches is larger than mini_batch_size, set p.num_micro_batches to mini_batch_size;
    • Set micro_batch_size to mini_batch_size // p.num_micro_batches;
  • Iterate over input_tensors again; for each tensor, take its shape list input_shape and set the batch_dim entry of input_shape to micro_batch_size;

  • If p.nested_map_fprop is set, pack input_shapes into the recursive nested structure of the inputs;

  • Return input_shapes;

  def _get_input_shapes(self, *args):
    p = self.params
    if p.nested_map_fprop:
      assert len(args) == 1
      assert isinstance(args[0], py_utils.NestedMap)
      input_tensors = py_utils.Flatten(args[0])
    else:
      input_tensors = _ToTuple(args)
    # Get batch size from the first tensor which is not None.
    mini_batch_size = None
    for input_tensor in input_tensors:
      if input_tensor is not None:
        mini_batch_size = input_tensor.get_shape().as_list()[p.batch_dim]
    assert mini_batch_size is not None
    micro_batch_size = p.micro_batch_size
    if not micro_batch_size:
      # If p.num_micro_batches is larger than mini_batch_size, clamp it to
      # mini_batch_size, then derive micro_batch_size from the quotient.
      if p.num_micro_batches > mini_batch_size:
        p.num_micro_batches = mini_batch_size
      micro_batch_size = mini_batch_size // p.num_micro_batches
    if mini_batch_size is not None:
      if micro_batch_size * p.num_micro_batches != mini_batch_size:
        raise ValueError('micro_batch_size * num_micro_batches != batch_size.')

    # Walk over input_tensors; for each tensor take its shape list and replace
    # the batch_dim entry with micro_batch_size.
    input_shapes = ()
    for input_tensor in input_tensors:
      if input_tensor is not None:
        input_shape = input_tensor.get_shape().as_list()
        input_shape[p.batch_dim] = micro_batch_size
        input_shapes += (tf.TensorShape(input_shape),)
      else:
        input_shapes += (None,)

    # If p.nested_map_fprop is set, pack input_shapes back into the original
    # nested structure.
    if p.nested_map_fprop:
      input_shapes = py_utils.Pack(args[0], input_shapes)
    return input_shapes
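To make the arithmetic concrete, here is a minimal stand-alone sketch (not the Lingvo code itself; the helper name and values are made up) that mirrors the micro-batch size rule above:

def derive_micro_batch_size(mini_batch_size, num_micro_batches,
                            micro_batch_size=None):
  """Simplified version of the micro-batch arithmetic above (illustrative)."""
  if not micro_batch_size:
    if num_micro_batches > mini_batch_size:
      num_micro_batches = mini_batch_size
    micro_batch_size = mini_batch_size // num_micro_batches
  if micro_batch_size * num_micro_batches != mini_batch_size:
    raise ValueError('micro_batch_size * num_micro_batches != batch_size.')
  return micro_batch_size, num_micro_batches

print(derive_micro_batch_size(32, 4))    # (8, 4)  -> four micro-batches of 8
print(derive_micro_batch_size(32, 64))   # (1, 32) -> clamped to the batch size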
6.4.2.4 FProp

FProp is the forward propagation function that runs the child cells on multiple devices in a pipelined fashion. Its logic is as follows:

  • Do some preparatory work:

    • Get the number of child cells;
    • Get the cluster;
    • Compute the input shapes and dtype;
    • Compute the output shapes from the input shapes;
  • Traverse the intermediate layers:

    • For each cell, add it to the accumulator layers and build the corresponding cell function;
    • Set up its initial state for the subsequent StackedRecurrent run;
    • Append the device corresponding to cell_idx to the devices list;
  • Set up a few variables for the devices in the middle of the pipeline (excluding the head and the tail);

  • On the first device:

    • Iterate over _before_layers, run each layer's FProp, and collect the result as previous;
    • Build inputs from previous by splitting the tensors with _StackAndSplit (a small sketch of this split follows the full code below);
    • Set the inputs for the downstream pipeline devices;
  • On the intermediate devices, run recurrent.StackedRecurrent;

  • On the last device, aggregate the micro-batch outputs back into full-batch output tensors:

    • If nested_map_fprop is set, the reshape is mapped over the last entry of state_shapes;

    • Otherwise, iterate over the outputs and restore the shape of each one.

  def FProp(self, theta, *args):
    """Run multiple cells in different devices in a pipelining manner.

    Args:
      theta: A NestedMap object containing weights' values of this layer and its
        children layers.
      *args: Non-keyworded variable length argument list of input tensors.

    Returns:
      A list of output tensors
    """
    # TODO(huangyp): handle optional None inputs.
    p = self.params
    if self.do_eval and self.cluster.num_devices_per_split == 1:
      # do_eval with a single device per split: just run all layers serially.
      outputs = copy.copy(args)
      for (name, l) in self._before_layers + self._cells:
        outputs = _ToTuple(outputs)
        outputs = l.FProp(theta[name], *outputs)
      return outputs

    num_cells = len(p.cell_tpl)  # number of child cells
    cluster = self.cluster  # the cluster

    # Compute shapes of input and output tensors.
    # Get the input shapes and dtype.
    input_shapes = self._get_input_shapes(*args)
    state_dtype = self._get_state_dtype(*args)
    # Compute the output shapes from the input shapes.
    state_shapes = self._CalculateOutputShapes(input_shapes)
    tf.logging.info('state_shapes={}'.format(state_shapes))

    def GetCellFn(i):  # For the i-th layer, return a function that is executed inside StackedRecurrent.
      """Get the ith feature extraction layer."""

      def CellFn(theta, state0, inputs):
        """A cell fn is exectued inside of StackedRecurrent."""
        # Without digging into StackedRecurrent, this function uses cell.FProp
        # to compute the outputs and returns a state that contains the outputs
        # plus the micro-batch tensor.
        del state0

        def _FPropInputSetShape(name, t_shape):  # set the shape on an input
          if t_shape is None:
            return None
          inputs[name].set_shape(t_shape.ToTensorShape().as_list())
          return inputs[name]

        if p.nested_map_fprop:
          # pylint: disable=protected-access
          fprop_inputs = state_shapes[i]._RecursiveMap(_FPropInputSetShape)
          # pylint: enable=protected-access
        else:
          fprop_inputs = []
          for input_idx, input_shape in enumerate(state_shapes[i]):
            name = 's{}'.format(input_idx)
            fprop_inputs.append(_FPropInputSetShape(name, input_shape))

        with py_utils.RemoveAssertContext(remove=True):
          with CellFnFPropOpReplacementWrapper():
            tf.logging.info('cell {} input {}'.format(i, fprop_inputs))
            mb_tensor = inputs[_MICRO_BATCH_STATE_NAME]  # the incoming micro-batch tensor
            SetOverWriteGlobalStep(mb_tensor)
            _, cell = self._cells[i]
            fprop_inputs = _ToTuple(fprop_inputs)
            outputs = cell.FProp(theta, *fprop_inputs)  # compute the outputs

        if p.nested_map_fprop:
          assert py_utils.IsCompatible(outputs, state_shapes[i + 1])
          state1 = outputs.Filter(lambda x: x is not None)
        else:
          state1 = py_utils.NestedMap()
          outputs = _ToTuple(outputs)
          assert len(outputs) == len(state_shapes[i + 1])
          for output_idx in range(len(outputs)):
            if outputs[output_idx] is not None:
              name = 's{}'.format(output_idx)
              state1[name] = outputs[output_idx]
        state1[_MICRO_BATCH_STATE_NAME] = mb_tensor
        return state1, py_utils.NestedMap()

      return CellFn

    cell_fns = []
    accumulator_layers = []  # for gradient accumulation
    thetas = []
    init_states = []
    devices = []
    # Walk over the child cells and append the device for each cell_idx to the
    # devices list.
    for cell_idx in range(num_cells):  # iterate over child cells
      cell_name, cell = self._cells[cell_idx]  # a concrete cell
      accumulator_layers.append(cell)  # add the cell to the accumulator layers
      cell_fns.append(GetCellFn(cell_idx))  # build the per-cell function
      thetas.append(theta[cell_name])  # add theta

      # Returns a zero tensor with shape t_shape and dtype state_dtype.
      def _TfZeros(t_shape):
        if t_shape is None:
          return None
        return tf.zeros(t_shape.ToTensorShape().as_list(), dtype=state_dtype)

      # Set up the initial state for the subsequent StackedRecurrent run.
      if p.nested_map_fprop:
        init_state = py_utils.Transform(_TfZeros, state_shapes[cell_idx + 1])
        init_state = init_state.Filter(lambda x: x is not None)
      else:
        init_state = py_utils.NestedMap()
        for output_idx, state in enumerate(state_shapes[cell_idx + 1]):
          state = _TfZeros(state)
          if state is not None:
            name = 's{}'.format(output_idx)
            init_state[name] = state
      init_state[_MICRO_BATCH_STATE_NAME] = tf.cast(0, dtype=state_dtype)
      init_states.append(init_state)

      # Append the device corresponding to cell_idx to the devices list.
      devices.append(cluster.WorkerDeviceInModelSplit(cell_idx))

    # Set up a few variables for the devices in the middle of the pipeline
    # (excluding the head and the tail).
    cell_grads = [None] * num_cells
    cell_outs = [lambda x: x] * num_cells
    cell_out_grads = [lambda x: x] * num_cells

    # On the first device:
    with tf.device(devices[0]):
      previous = _ToTuple(args)
      # Iterate over _before_layers, run each layer's FProp, and collect the
      # result as `previous`.
      for (name, l) in self._before_layers:
        previous = l.FProp(theta[name], *previous)
        previous = _ToTuple(previous)

      def _StackAndSplit(x):
        # Split tensors into microbatches.
        if x is None:
          return None
        # tf.split slices the tensor along the batch dimension; tf.stack then
        # makes the micro-batch index the leading dimension.
        return tf.stack(tf.split(x, p.num_micro_batches, axis=p.batch_dim))

      # Continue from `previous` and build `inputs`, i.e. split the tensors
      # with _StackAndSplit.
      if p.nested_map_fprop:  # nested case: only previous[0] is processed
        inputs = py_utils.Transform(_StackAndSplit, previous[0])  # split with _StackAndSplit
        inputs = inputs.Filter(lambda x: x is not None)
      else:  # non-nested case
        inputs = py_utils.NestedMap()
        for output_idx, output_tensor in enumerate(previous):  # iterate over the first stage's outputs
          output_tensor = _StackAndSplit(output_tensor)  # split with _StackAndSplit
          if output_tensor is not None:
            name = 's{}'.format(output_idx)
            inputs[name] = output_tensor
      gs_tensor = py_utils.GetGlobalStep()
      # Set up the inputs for the downstream pipeline devices.
      inputs[_MICRO_BATCH_STATE_NAME] = tf.stack([
          tf.cast(gs_tensor * p.num_micro_batches + t, dtype=state_dtype)
          for t in range(p.num_micro_batches)
      ])

    # Run on the intermediate devices.
    tf.logging.info('pipeline input = {}'.format(inputs))
    output_state, _ = recurrent.StackedRecurrent( 
        devices=devices,
        cell_fns=cell_fns,
        cell_grads=cell_grads,
        cell_outs=cell_outs,
        cell_out_grads=cell_out_grads,
        thetas=thetas,
        init_states=init_states,
        inputs=inputs,
        accumulator_layers=accumulator_layers,
        unused_acc_state=True)

    # On the last device, reassemble the micro-batch outputs into the final
    # output tensors.
    with tf.device(devices[-1]):
      def _ReshapeRetVal(name, t_shape):  # merge the micro-batch shapes into the final output
        """Restore shape for tensors in microbatches."""
        if t_shape is None:
          return None
        output_tensor = output_state[name]
        if p.batch_dim != 0:
          perm = list(range(1, p.batch_dim + 1)) + [0]
          perm += list(range(p.batch_dim + 1, t_shape.rank + 1))
          output_tensor = tf.transpose(output_tensor, perm=perm)
        output_shape = t_shape.ToTensorShape().as_list()
        output_shape[p.batch_dim] *= p.num_micro_batches
        output_tensor = tf.reshape(output_tensor, output_shape)
        return output_tensor

      # Construct the final return values from output_state.
      if p.nested_map_fprop:  # nested case: map the reshape over the last entry of state_shapes
        # pylint: disable=protected-access
        output_tensors = state_shapes[-1]._RecursiveMap(_ReshapeRetVal)  # restore shapes
        # pylint: enable=protected-access
      else:
        output_tensors = []
        # Iterate over the outputs and restore the shape of each one.
        for output_idx, state_shape in enumerate(state_shapes[-1]):
          output_name = 's{}'.format(output_idx)
          output_tensor = _ReshapeRetVal(output_name, state_shape)  # restore shape
          output_tensors.append(output_tensor)
        if len(output_tensors) == 1:
          output_tensors = output_tensors[0]
        else:
          output_tensors = tuple(output_tensors)
        
      tf.logging.info('pipeline output = {}'.format(output_tensors))
      return output_tensors

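To see what _StackAndSplit does to a mini-batch before it enters StackedRecurrent, here is a minimal TensorFlow sketch with made-up shapes (assuming batch_dim = 0): a [8, 16] mini-batch with num_micro_batches = 4 becomes a [4, 2, 16] tensor, one slice per micro-batch.

import tensorflow as tf

# Minimal sketch of the stack-and-split step (illustrative shapes only).
num_micro_batches = 4
x = tf.ones([8, 16])            # a mini-batch of 8 examples with 16 features

# tf.split slices the mini-batch into 4 micro-batches of shape [2, 16];
# tf.stack makes the micro-batch index the leading dimension.
micro_batches = tf.stack(tf.split(x, num_micro_batches, axis=0))
print(micro_batches.shape)      # (4, 2, 16)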
6.4.2.5 Class definition

The specific code is as follows:

class PipeliningLayer(SeqLayer):
  """Pipelining a sequence of layers on multiple devices."""
​
  @classmethod
  def Params(cls):
    p = super().Params()
    p.Define('num_micro_batches', 1, 'Number of micro batches.')
    p.Define('micro_batch_size', None, 'Size of a micro batch.')
    p.Define('batch_dim', 0, 'The batch dimension.')
    p.Define('state_dtype', None, 'Externally specify dtype for states.')
    p.Define(
        'nested_map_fprop', False, 'Whether arguments and returns of '
        'cell fprop functions are nested maps')
    return p
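A hypothetical configuration might look like the sketch below (the values are made up; in practice cell_tpl must also be populated with the per-partition sub-layer Params):

# Hypothetical configuration sketch of a PipeliningLayer (illustrative only).
p = PipeliningLayer.Params()
p.name = 'pipeline'
p.num_micro_batches = 4      # cut each mini-batch into 4 micro-batches
p.micro_batch_size = None    # derive it as mini_batch_size // num_micro_batches
p.batch_dim = 0              # the batch dimension to split along
p.nested_map_fprop = False   # inputs/outputs are plain tuples, not NestedMaps
# p.cell_tpl would list the per-device sub-layer Params (omitted here).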

The specific FProp function logic is shown below:

FProp:

    _CalculateOutputShapes
              +
              |
              v
        state_shapes
              +
              |
              v
    for cell_idx in range(num_cells):
              +
              |
              v
    devices.append(WorkerDeviceInModelSplit(cell_idx))
              +
              |
              v
    with tf.device(devices[0])
              +
              |
              v
    recurrent.StackedRecurrent(cell_outs)
              +
              |
              v
    with tf.device(devices[-1])
              +
              |
              v
        output_tensors

The Device pipeline logic is as follows:

devices[0]
    +
    |
    v
+----------------------+-------------------------+
|Pipeline              |                         |
|    devices[1]        |                         |
|        +             |                         |
|        |             |                         |
|        v             |  cell_grads[1~n]        |
|    devices[2]        |                         |
|        +             |  cell_outs[1~n]         |
|        |             |                         |
|        v             |  cell_out_grads[1~n]    |
|    devices[3]        |                         |
|        +             |                         |
|        |             |                         |
|        v             |                         |
|    devices[4]        |                         |
+----------------------+-------------------------+
    +
    |
    v
devices[-1]
6.4.2.6 Usage

The source code provides an example, GPipeBatchMajorTransformerStack, which simply inherits from PipeliningLayer.

class GPipeBatchMajorTransformerStack(PipeliningLayer):
  """Stacked self- multi-head attention and fully connected layers.

  With optional layer normalization applied to the final output.

  See 'Attention Is All You Need' https://arxiv.org/abs/1706.03762
  for the details.

  Implements a gipe stack for batch major transformer variant.
  """

The FProp of GPipeBatchMajorTransformerStack returns a list of output tensors; inside it, the following code calls PipeliningLayer's FProp.

logits = super().FProp(theta, source_input, source_paddings, target_input,
                       target_paddings, encoder_self_atten_segment_mask,
                       decoder_self_atten_segment_mask,
                       decoder_cross_atten_segment_mask, source_segment_pos,
                       target_segment_pos)

The specific code is as follows:

  def FProp(self,
            theta,
            source_input,
            source_paddings,
            target_input=None,
            target_paddings=None,
            source_segment_id=None,
            target_segment_id=None,
            labels=None,
            label_weights=None,
            source_segment_pos=None,
            target_segment_pos=None):
    p = self.params
    if p.num_decoder_layers > 0:
      assert target_input is not None
      assert target_paddings is not None
    target_time = tf.shape(target_input)[1]
    batch = tf.shape(target_input)[0]
    encoder_self_atten_segment_mask = None
    decoder_self_atten_segment_mask = None
    decoder_cross_atten_segment_mask = None

    # Prepare segment masks from segment ids.
    if p.packed_input:
      dtype = py_utils.FPropDtype(p)
      assert source_segment_id is not None, (
          'Need to specify src_segment_id if packed input is supported.')
      assert source_segment_pos is not None, (
          'Need to specify src_segment_pos for packed input and embeddings.')
      encoder_self_atten_segment_mask = batch_major_attention.SegmentMask(
          source_segment_id, source_segment_id, dtype, False)
      if target_segment_id is not None:
        decoder_self_atten_segment_mask = batch_major_attention.SegmentMask(
            target_segment_id, target_segment_id, dtype, False)
        causal_padding = tf.expand_dims(
            tf.tile(
                tf.expand_dims(
                    batch_major_attention.CausalPadding(
                        target_time, dtype=dtype), 0), [batch, 1, 1]), 1)
        decoder_self_atten_segment_mask = tf.math.maximum(
            causal_padding, decoder_self_atten_segment_mask)
        decoder_cross_atten_segment_mask = batch_major_attention.SegmentMask(
            target_segment_id, source_segment_id, dtype, False)

    # FProp through the PipeliningLayer completes the pipeline operation.
    logits = super().FProp(theta, source_input, source_paddings, target_input,
                           target_paddings, encoder_self_atten_segment_mask,
                           decoder_self_atten_segment_mask,
                           decoder_cross_atten_segment_mask, source_segment_pos,
                           target_segment_pos)
    label_weights = tf.reshape(label_weights, [-1])
    target_probs = None
    if p.label_smoothing:
      target_probs = self.smoother.FProp(
          theta.smoother, target_paddings, labels, target_ids=None)
      target_probs = tf.reshape(target_probs, [-1, p.softmax_tpl.num_classes])
    reshaped_logits = tf.reshape(logits, [-1, p.softmax_tpl.num_classes])
    tgt_labels = tf.reshape(labels, [-1])
    num_splits = len(p.splits)
    softmax = self.children['cell_{}'.format(num_splits - 1)].softmax
    softmax_theta = theta['cell_{}'.format(num_splits - 1)].softmax
    per_example_xent, _ = softmax.XentLossFromLogits(
        softmax_theta,
        reshaped_logits,
        class_weights=tf.reshape(label_weights, [-1]),
        class_ids=tgt_labels,
        class_probabilities=target_probs)
    xent_shape = tf.shape(logits)[:2]
    per_example_xent = tf.reshape(per_example_xent, xent_shape)
    return per_example_xent, logits

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts
