At the MegEngine Meetup on March 25th, Zhou Yizhuang from Megvii shared the talk "Implementing Complex Parallel Training with MegEngine Distributed Communication Operators".

Live playback link: Complex Parallel Training with MegEngine Distributed Communication Operators – MegEngine Meetup No. 2 – bilibili

The content is divided into four parts: 1. an introduction to MegEngine's distributed communication operators; 2. simple parameter parallelism, used to get familiar with some basic concepts of model parallelism; 3. intra-layer model parallelism; 4. inter-layer model parallelism and pipeline parallelism, including how to implement a simple GPipe. The following is a transcript of the talk. Enjoy~

1. Background

Parallel training is a very important part of deep learning research and business. Much fundamental research can only be done with large-scale computing clusters or even supercomputers. For example, DeepMind's AlphaGo for playing Go, OpenAI's 175-billion-parameter GPT-3 language model, and OpenAI's recently released CLIP and DALL-E were all trained on very large clusters. And because Megvii Research has Brain++, a distributed computing platform, we have also produced a lot of good results. Large models have significant advantages over small models in all kinds of vision and language tasks, so a recent trend is that the larger the model and the data, the better ("big is justice"), which makes large-scale parallel training all the more necessary.

Parallel training, on the one hand, lets us mobilize hundreds or even thousands of GPUs (graphics processing units, also known as "graphics cards" or simply "cards", the most common computing devices in deep learning) for training; on the other hand, it lets us design the most efficient parallel scheme according to the characteristics of the business or the model. These are the practical motivations for the parallel training I will talk about today.

There are three commonly used parallel modes in deep learning, and the relationship between them can be expressed clearly in the figure below.

The first (intra-layer model parallelism) takes advantage of the natural parallelism of matrix multiplication: the matrix multiplication inside each layer (such as a fully connected layer or a convolution layer) is split up and computed in groups along the input/output channel dimension. This is called intra-layer model parallelism.

The second type (inter-layer model parallelism) takes advantage of the serial execution of a neural network: the network is split according to its execution order and the pieces are placed on different devices. For example, ResNet18 has 17 convolutional layers plus a final fully connected layer; if we put the computation of the first nine layers and the last nine layers on two different cards (i.e. GPUs/graphics cards), that is inter-layer model parallelism. These two kinds of model parallelism are "orthogonal": they do not affect each other and can be used at the same time.

In both kinds of model parallelism above, the model parameters are split up: each compute node (a compute node is an abstraction of the underlying computing device; it can be a single card, or a machine with eight cards, i.e. a computer loaded with 8 GPUs) manages only part of the parameters of the whole network and participates only in the computation corresponding to that part.

The last one is the most commonly used, data parallelism, which works along another dimension: the model parameters are shared, but each device receives different data. By adding computing devices we can increase the batch size of a single iteration (i.e. the number of training images) roughly linearly, and thus reduce the time needed to train the model.

The three parallel dimensions are mutually orthogonal, which means that in practice we can use model parallelism and data parallelism at the same time. For small models, data parallelism may be enough; but for large models, the sheer number of parameters and amount of computation make it hard to fit the computation on a single GPU, so the computation must be split across different GPUs, i.e. model parallelism.

2. MegEngine's communication operators

Next, let's get to the main topic of today's talk, starting with the communication operators.

The history of mankind is actually a history of information exchange, which is to say a history of communication: people talking to each other is communication, my broadcasting to you today is communication, and television and radio are of course communication too.

For a deep learning framework, communication is one of the most important capabilities; without it, data parallelism and model parallelism would be hard to implement. Simply put, we have many computing devices (GPUs) and need information to flow among all of them, which requires collective communication: a set of communication rules that is complete under differentiation.

The table lists eight collective communication operators and two point-to-point operators, all of which are MegEngine operators. The eight collective communication operators form a set that is closed under differentiation: the derivative of each one is again one of them. MegEngine provides automatic differentiation for the communication operators, so, just like all the other operators used for computation (convolution, ReLU, transpose, etc.), we are free to insert communication operators into the forward graph and the framework will take care of the derivatives.

For those who do not have the background knowledge, let us introduce the collective communication operators one by one.

Broadcast

Broadcast, as the name suggests, broadcasts data. It represents a data synchronization process that synchronizes information from one GPU to all the other GPUs. This is useful in data parallelism, where the parameters on each card must be identical: we synchronize the parameters via Broadcast at initialization, and we also periodically synchronize some buffers (such as BatchNorm statistics).
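
To make this concrete, here is a minimal sketch of Broadcast synchronizing a tensor from GPU 0 to every other GPU. It assumes MegEngine's megengine.distributed.launcher and megengine.functional.distributed.broadcast; exact names and signatures may differ between versions, so treat it as illustrative rather than the canonical usage.

```python
import megengine as mge
import megengine.distributed as dist
import megengine.functional.distributed as fdist

@dist.launcher  # assumed usage: spawns one worker process per visible GPU
def main():
    rank = dist.get_rank()
    # Every rank starts with a different value; after Broadcast every rank
    # holds rank 0's value. This is how initial weights are kept identical
    # across cards in data parallelism.
    w = mge.tensor([float(rank)] * 4)
    w = fdist.broadcast(w)
    print("rank", rank, "->", w.numpy())  # all ranks print rank 0's values

if __name__ == "__main__":
    main()
```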

ReduceSum

The second one is ReduceSum, also called summation or reduction, which collects the data from all GPUs onto one GPU and adds it up.

Broadcast and ReduceSum, the two communication operators just introduced, are the cornerstone of the Parameter Server, a centralized scheme in which GPU 0 plays the central role: the central parameters are first synchronized to each card via Broadcast for the forward pass, and then the gradients of all cards are collected via Reduce for the parameter update. Broadcast and ReduceSum are each other's derivatives: the derivative of ReduceSum is Broadcast, and the derivative of Broadcast is ReduceSum.

AllReduce

Next, let's introduce AllReduce. While Reduce reduces onto one card, AllReduce reduces onto every card. It can be understood as a combination of Reduce and Broadcast: first Reduce onto one card, then Broadcast to all cards. It can also be understood as every card invoking Reduce at the same time. The derivative of AllReduce is AllReduce itself.

Although AllReduce can be realized with just Reduce and Broadcast, the efficient implementation of AllReduce (ring AllReduce) is the cornerstone of modern distributed deep learning frameworks: its communication time stays essentially flat as the number of GPUs grows, which is what makes large-scale distributed training efficient. In data parallelism, we sum all the gradients with AllReduce and use the result for the model parameter update.
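
Below is a minimal sketch of how AllReduce is typically used in data parallelism to sum the per-card gradients before the update. It is not the talk's code; it assumes megengine.functional.distributed.all_reduce_sum and the GradManager autodiff API, whose details may vary across versions.

```python
import megengine as mge
import megengine.distributed as dist
import megengine.functional as F
import megengine.functional.distributed as fdist
from megengine.autodiff import GradManager

@dist.launcher  # one process per GPU (assumed usage)
def train_step():
    rank, world = dist.get_rank(), dist.get_world_size()
    w = mge.Parameter([[1.0], [1.0]])      # identical copy of W on every card
    x = mge.tensor([[float(rank), 1.0]])   # each card sees a different data shard
    gm = GradManager()
    gm.attach([w])
    with gm:
        loss = (F.matmul(x, w) ** 2).mean()
        gm.backward(loss)
    # Sum the per-card gradients; dividing by the world size gives the average
    # gradient of the whole (virtual) batch, which would be used for the update.
    avg_grad = fdist.all_reduce_sum(w.grad) / world
    print("rank", rank, avg_grad.numpy())

if __name__ == "__main__":
    train_step()
```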

Gather

Gather simply collects the different pieces of information on each card and concatenates them along the first dimension.

AllGather

AllGather is an all-gather. Similar to AllReduce, it can be understood as a Gather followed by a Broadcast.

AllGather is a very important operation in intra-layer model parallelism, because the parameters are on different cards and the data is on different cards: when doing model parallelism, we need to collect all the data or parameters onto a card before the following computation can proceed. This is what AllGather does.

AllToAll

AllToAll is also a common operation in intra-layer model parallelism, especially when switching between model parallelism and data parallelism. It essentially transposes a matrix, as we will explain in more detail later. The derivative of AllToAll is AllToAll itself.

Scatter and ReduceScatter

Finally, Scatter and ReduceScatter are introduced together. Scatter is distribution: it splits the data on one card across the different cards. Scatter and Gather are each other's derivatives.

ReduceScatter can be understood as summation followed by distribution; it and AllGather are each other's derivatives.
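
To recap the pairings mentioned above, the derivative of every collective operator is again a collective operator; this closure under differentiation is what lets the framework differentiate through them automatically. Written out as a small Python table for reference:

```python
# Derivative (gradient) of each collective communication operator,
# as listed in the talk.
GRAD_OF = {
    "Broadcast":     "ReduceSum",
    "ReduceSum":     "Broadcast",
    "AllReduce":     "AllReduce",       # its own derivative
    "AllToAll":      "AllToAll",        # its own derivative
    "Gather":        "Scatter",
    "Scatter":       "Gather",
    "AllGather":     "ReduceScatter",
    "ReduceScatter": "AllGather",
}
```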

3. Simple parameter parallelism

Having introduced MegEngine's communication operators, let's see how they are used, starting with simple parameter parallelism, which involves only the AllGather communication operator.

What is simple parameter parallelism? Let's first review data parallelism with a simple fully connected layer (that is, a matrix multiplication). In data parallelism, W is our model (that is, our weights; each card holds an identical copy) and x is the data. Data parallelism requires us to split the data evenly across the cards: with 2 cards it is split into 2 pieces, x0 and x1; with 4 cards into 4 pieces; and so on. Each card then performs the matrix multiplication to obtain its corresponding result Y.

Simple parameter parallelism is essentially an optimization of data parallelism. Instead of putting the entire model on every card, we put only part of it on each card, and AllGather the parameters scattered across the cards only when they are needed for computation (i.e. during the forward pass).

How is this done? Before the matrix multiplication, we AllGather the parameters that were split across the nodes. After the AllGather, each card holds the full weights and the computation becomes exactly the same as in data parallelism. So the core operation of simple parameter parallelism is AllGather; in essence, we use communication to save GPU memory.

Why does this save GPU memory? Once we draw out the whole derivation, we see that training a parameter actually occupies three pieces of GPU memory: the parameter itself, its gradient, and the optimizer's momentum. So if we use data parallelism for a model with one billion parameters, it will take up 3 × 4 GB = 12 GB of GPU memory (one billion FP32 values occupy 4 GB), leaving no memory at all for training on a 2080 Ti.

Let's study this picture again. We did an AllGather in the forward pass, and since the derivative of AllGather is ReduceScatter, a ReduceScatter is performed in the backward pass. This is different from data parallelism, whose forward pass requires no communication and whose backward pass requires an AllReduce.

We wrote the code for data parallelism and simple parameter parallelism in MegEngine; there are three differences (a sketch follows the list):

  • One difference is the forward pass: on the right (simple parameter parallelism), an AllGather is performed first;
  • Another difference is parameter initialization: in data parallelism we need to synchronize the parameters, so we use Broadcast; in simple parameter parallelism we need to distribute the parameters, so we use Scatter.
  • The last difference is differentiation: data parallelism needs an AllReduce (MegEngine uses an AllReduce callback to support data parallelism), while simple parameter parallelism does not; the automatic differentiator takes care of calling ReduceScatter correctly during the backward pass.
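
Here is the side-by-side sketch referred to above, under the same assumed MegEngine API (the talk's actual code is in the slides and differs in detail; dist.make_allreduce_cb is mentioned only as the usual data-parallel callback and its exact name may vary):

```python
import megengine.functional as F
import megengine.distributed as dist
import megengine.functional.distributed as fdist

# ---------- data parallelism ----------
def dp_init(w_full):
    # 1) initialization: every card gets an identical full copy of W
    return fdist.broadcast(w_full)

def dp_forward(x_shard, w_full):
    # 2) forward: a plain matmul, no communication
    return F.matmul(x_shard, w_full)

# 3) backward: gradients are summed with an AllReduce callback, e.g.
#    gm.attach(params, callbacks=dist.make_allreduce_cb("sum"))

# ---------- simple parameter parallelism ----------
def pp_init(w_full):
    # 1) initialization: W is distributed, each card keeps only its shard
    return fdist.scatter(w_full)

def pp_forward(x_shard, w_shard):
    # 2) forward: AllGather the full W just in time, then matmul
    w_full = fdist.all_gather(w_shard)
    return F.matmul(x_shard, w_full)

# 3) backward: no AllReduce is needed; autodiff inserts a ReduceScatter
#    because it is the derivative of AllGather.
```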

4. Intra-layer model parallelism

Intra-layer model parallelism is more complex in principle. The parameter parallelism we just discussed is actually a special case of intra-layer model parallelism, and a very simple one, since it only AllGathers the parameters. Intra-layer model parallelism actually has several different implementations.

The figure above shows full matrix multiplication, data parallelism, and the two kinds of intra-layer model parallelism.

We know that both matrix multiplication and the convolution layers in a convolutional neural network (a convolution layer can be regarded as a matrix multiplication over the channel dimension) are naturally parallel: in a mathematical matrix multiplication, every row and column can be computed independently, and data parallelism makes full use of this property. We split the data evenly, perform the matrix multiplications on different devices, and finally combine the pieces to get the complete result.

In intra-layer model parallelism, the parameter matrix W of each layer (fully connected/convolution layer) is sliced. One way is to slice along the output dimension (a vertical cut); the other is to slice along the input dimension (a horizontal cut). With the former, each card obtains the results for part of the output dimensions; the latter exploits the low-rank structure of the product: each card's result is a low-rank component of the final result, and these components must later be summed through AllReduce or ReduceScatter.

Next we apply intra-layer model parallelism in a multi-layer neural network, either as pure intra-layer model parallelism or combined with data parallelism into hybrid parallelism.

The first row in the figure above is pure data parallelism. The data is sharded onto the cards at the beginning, and no information exchange is needed afterwards: data parallelism followed by data parallelism requires no special operation.

The second row is pure intra-layer model parallelism. First, each card needs the input feature X for the complete set of samples, and the matrix multiplication then produces a feature Y covering all samples but only part of the output channels. To continue with the next model-parallel matrix multiplication, we must do an AllGather that collects Y along the channel dimension, turning it into a Y with all samples and all channels, which is then multiplied by the model-parallel V. As the network deepens, an AllGather is performed after every matrix multiplication.

The third type, hybrid parallelism, mixes data parallelism with intra-layer model parallelism. We start with model parallelism: the model-parallel matmul takes the full tensor and outputs a Y that is sliced along the output channel, but what we need next is a Y sliced along the batch dimension. How do we get it? When introducing MegEngine's communication operators we mentioned a transpose-like operation, AllToAll, which directly converts the channel-sliced Y into the batch-sliced Y. Then we are back in data parallelism; after a data-parallel matrix multiplication, if we want to do a model-parallel matrix multiplication again, we do an AllGather and obtain the complete feature tensor with all samples and all channels. Once you have mastered this "toggling" with AllToAll and AllGather, you can design and train your own hybrid-parallel models.
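
Below is a rough sketch of that toggling, assuming the megengine.functional.distributed collectives and, in particular, that AllToAll splits and concatenates along the first axis; the weight names (w_shard, v_full, u_shard) are purely illustrative.

```python
import megengine.functional as F
import megengine.distributed as dist
import megengine.functional.distributed as fdist

def mp_then_dp(x_full, w_shard, v_full):
    """Model-parallel matmul followed by a data-parallel matmul."""
    k = dist.get_world_size()
    # model-parallel matmul: all samples, but only this card's output channels
    y = F.matmul(x_full, w_shard)                       # (N, C // k)
    # AllToAll plus a small reshape/transpose (the "data rearrangement"
    # explained in the next section) converts it to this card's slice of the
    # samples with all C channels, i.e. the data-parallel layout
    y = fdist.all_to_all(y)
    n_loc, c_loc = y.shape[0] // k, y.shape[1]
    y = y.reshape(k, n_loc, c_loc).transpose(1, 0, 2).reshape(n_loc, k * c_loc)
    # data-parallel matmul with the full weight V: no communication needed
    return F.matmul(y, v_full)                          # (N // k, D)

def dp_then_mp(z_sample_slice, u_shard):
    """Switch back: AllGather the full features, then a model-parallel matmul."""
    z_full = fdist.all_gather(z_sample_slice)           # (N, D)
    return F.matmul(z_full, u_shard)
```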

Let’s illustrate two application scenarios.

Scenario 1: Fully connected intra-layer model parallelism

Let’s move into a specific scenario where fully connected intra-layer model parallelism is applied to face recognition tasks.

In face recognition tasks, there may be millions upon millions of identities (an identity, or ID, corresponds to one person), which is equivalent to a classification task with an output dimension in the millions. As a result, the final classification FC (fully connected) layer can have a huge number of parameters. For example, with one million IDs and 1024-dimensional face features, the classifier alone occupies 4 GB of GPU memory. As mentioned above, a model with 4 GB of parameters occupies three times that during actual training, i.e. 12 GB, which does not fit on an ordinary graphics card. So we have to split this fully connected layer across the cards: with 8 cards, each card holds only 1.5 GB, which is acceptable. What is special about this scenario? The face feature dimension is very small compared with the parameter matrix, so the cost of AllGather (communicating the features) is much smaller than that of AllReduce (communicating the weights), which makes the scenario particularly well suited to model parallelism.

What exactly does the result Y output by the classifier W mean under model parallelism? Y's vertical dimension is the batch dimension, i.e. how many training samples there are, and its horizontal dimension is the ID dimension, i.e. the category dimension, giving the probability that each sample belongs to each ID. Under model parallelism, however, each card only outputs the probabilities for part of the labels, while the loss function we usually use, CrossEntropy, needs all the class probabilities. So, using the AllToAll operator introduced earlier, we transpose the model-parallel probability matrix and it goes back to the data-parallel format. (Lecturer's note: you don't actually need AllToAll. In the special case of classification tasks AllToAll can be avoided, because communication is very expensive; the cross entropy can be computed with two very cheap communications instead. This is beyond today's scope, but not too difficult, and I'll leave it for you to think about.)

Let’s go straight to the code.

The process has three steps: the first is AllGather, the second is the matrix multiplication, and the third is AllToAll.
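
A sketch of those three steps for the model-parallel classifier follows; w_shard stands for this card's slice of the ID dimension of W, and the collective calls assume megengine.functional.distributed (illustrative, not the talk's exact code).

```python
import megengine.functional as F
import megengine.functional.distributed as fdist

def parallel_fc_forward(feat_shard, w_shard):
    # 1) AllGather: collect the 1024-d features of all samples onto every card
    feat_full = fdist.all_gather(feat_shard)          # (N, 1024)
    # 2) matmul with the local classifier shard: logits for all samples but
    #    only this card's slice of the one million IDs
    logits_slice = F.matmul(feat_full, w_shard)       # (N, num_ids // world_size)
    # 3) AllToAll back towards the data-parallel layout; the remaining
    #    reshape/transpose ("data rearrangement") is explained right below
    return fdist.all_to_all(logits_slice)
```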

So what about the code in the box above, which does a number of reshapes and transposes? This is called data rearrangement, and we will spend another five minutes explaining what it is.

After the AllToAll, what we get is still not the result we want, namely part of the samples with all of the classes; the underlying data layout is not what we expect. Above is a simplified example with 8 categories, numbered 0 to 7, and 4 face images as samples. After model parallelism, card 0 outputs classes 0 to 3 and card 1 outputs classes 4 to 7. After the AllToAll, card 0 holds the values in the order 0,1,2,3,10,11,12,13 (sample 0's classes 0 to 3 followed by sample 1's classes 0 to 3), whereas what we want is 0,1,2,3,4,5,6,7 and then 10 to 17: each sample's class scores laid out contiguously. Fixing the layout requires a reshape and a transpose. For later use I wrote two small wrappers: transpose_mp2dp, which converts from model parallelism to data parallelism, and transpose_dp2mp, which does the opposite. With these two wrappers, the forward code above becomes simple.
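
Here is one way the two wrappers could be implemented (the same rearrangement already appeared inline in the hybrid-parallel sketch earlier). It assumes AllToAll splits its input along the first axis, sends chunk j to rank j, and concatenates what it receives along the first axis, hence the extra reshape/transpose; the names follow the talk, but the bodies are my reconstruction, not the talk's exact code.

```python
import megengine.distributed as dist
import megengine.functional.distributed as fdist

def transpose_mp2dp(y):
    """Model-parallel layout (all samples, a slice of classes) ->
    data-parallel layout (a slice of samples, all classes)."""
    k = dist.get_world_size()
    y = fdist.all_to_all(y)                 # rows: (my samples, class block j), j = 0..k-1
    n_local, c_local = y.shape[0] // k, y.shape[1]
    y = y.reshape(k, n_local, c_local)      # axis 0 indexes the class block
    y = y.transpose(1, 0, 2)                # bring the sample axis to the front
    return y.reshape(n_local, k * c_local)  # each row: one sample, all classes

def transpose_dp2mp(y):
    """Data-parallel layout -> model-parallel layout (inverse of the above)."""
    k = dist.get_world_size()
    n_local, c = y.shape[0], y.shape[1]
    c_local = c // k
    y = y.reshape(n_local, k, c_local).transpose(1, 0, 2)
    y = y.reshape(n_local * k, c_local)
    return fdist.all_to_all(y)
```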

Scenario 2: Group convolution model parallelism

Having covered the fully connected layer, let's now talk about Group Convolution.

Group Convolution is particularly common in mobile models. The difference between group convolution and ordinary convolution is that a group convolution is equivalent to K ordinary convolutions. Say there are three groups: that is equivalent to three ordinary convolutions, each smaller than the original one. You can also see that this is naturally parallel: the red, green and yellow parts in the figure above can be computed separately, on different devices.

The following figure abstracts the difference between convolution and group convolution using the earlier two-dimensional representation: the parameter matrix of a group convolution differs from that of an ordinary convolution in that group convolution is equivalent to a sparse (block-diagonal) matrix multiplication rather than a dense one.

Under data parallelism, just as with ordinary convolution, we slice the data. Under model parallelism, we can directly separate the three groups by color: the first group goes on the first card, the second on the second card, and the third on the third card. On each card, the original group convolution then becomes an ordinary convolution.

If we have ordinary convolutions in front and want to insert a model-parallel group convolution in the middle, how do we switch between these two data layouts?

Very simple: we do a data rearrangement (AllToAll). Since we are going from data parallelism to model parallelism, we call transpose_dp2mp.

If we have multiple group convolutions joined together, we don't actually have to switch back and forth between data and model parallelism for every layer; we only need to handle the head and the tail. So our group convolution block has is_head and is_tail flags: in the forward function we communicate once at is_head and once at is_tail, and the layers in between need no communication at all.
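
A sketch of such a group-convolution block, assuming the 4-D (N, C, H, W) analogues of the transpose_dp2mp / transpose_mp2dp wrappers above and MegEngine's megengine.module API; it is illustrative rather than the talk's exact code.

```python
import megengine.module as M

class ParallelGroupConvBlock(M.Module):
    """One group of a model-parallel group convolution; each rank owns one
    group, so the local computation is an ordinary convolution."""

    def __init__(self, in_ch, out_ch, is_head=False, is_tail=False):
        super().__init__()
        self.conv = M.Conv2d(in_ch, out_ch, kernel_size=3, padding=1)
        self.is_head = is_head
        self.is_tail = is_tail

    def forward(self, x):
        if self.is_head:
            # entering the model-parallel region: data parallel -> model parallel
            x = transpose_dp2mp(x)   # 4-D variant of the wrapper sketched above
        x = self.conv(x)
        if self.is_tail:
            # leaving the model-parallel region: model parallel -> data parallel
            x = transpose_mp2dp(x)
        return x
```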

5. Inter-layer model parallelism

Now we move on to inter-layer model parallelism. We have finished intra-layer model parallelism, having introduced its principles and applications (fully connected layers and group convolution). Inter-layer model parallelism is very different from intra-layer model parallelism; it mainly covers simple model parallelism and pipeline parallelism. Inter-layer model parallelism simply means cutting the network into front, middle and back parts (or even more pieces), like cutting a fish into head, body and tail.

Let's take a quick look at a schematic comparison between data parallelism and inter-layer model parallelism.

Data parallelism splits the data. Inter-layer model parallelism does not cut the data; instead, the front and back parts of the model are placed on different GPUs. This raises a problem: how do we move the output Y of the first GPU onto the second GPU? This requires a send operation. Besides the eight collective communication operators, MegEngine provides two point-to-point communication operators, send and receive, and these two constitute the core operations of inter-layer model parallelism. Next, we mainly talk about send and receive.

If we abstract inter-layer model parallelism into a chart (as shown in the lower part of the figure above), the horizontal axis is computation time and the vertical axis is the computing device (GPU). We can see that there are dependencies between the tasks: the send operation can only happen after GPU 0 finishes its computation; card 1 receives from card 0 and then does its own computation; card 2 receives from card 1; and so on, until the whole forward pass has been run through.

For convenience, we wrapped these in two more functions. The first one sends our computation result to the next GPU; the second one, called by the next GPU, fetches the result from the previous GPU. (Part of what this needs is not yet fully supported in the current MegEngine release; support is coming in the next version, so I implemented it in a simple way inside the wrappers.)
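
A sketch of what those two wrappers might look like, built on megengine.functional.distributed.remote_send / remote_recv. The wrapper names are mine, and remote_recv's signature differs across MegEngine versions; in this sketch the receiver is given the incoming shape and dtype explicitly.

```python
import megengine.distributed as dist
import megengine.functional.distributed as fdist

def send_to_next_gpu(x):
    # called on rank r: ship this card's output to rank r + 1
    return fdist.remote_send(x, dist.get_rank() + 1)

def recv_from_prev_gpu(shape, dtype="float32"):
    # called on rank r: fetch the output produced by rank r - 1.
    # shape/dtype are passed explicitly because, in the version assumed here,
    # the receiver cannot infer them (see the note above).
    return fdist.remote_recv(dist.get_rank() - 1, shape, dtype)
```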

Simple model parallelism

Let's go straight to the code. On the left is ordinary data parallelism with a simple ResNet18 model, which has 17 convolution layers plus one fully connected layer. In simple model parallelism, the first GPU is responsible for the first 5 convolution layers, the second and third GPUs for 4 convolution layers each, and the last GPU for the remaining 4 convolution layers plus the final fully connected layer.

In the forward pass we first check: if we are not on the first GPU, we fetch the data from the previous GPU, and then do our own convolution computation. After getting the result we check again: if this is not the last GPU, we send the data to the next GPU; if it is the last GPU, we simply return.
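
That logic, written out as a sketch using the hypothetical wrappers from the previous section; local_layers stands for this card's slice of ResNet18, and input_shape is the shape of the incoming feature that the receiver needs to know in this sketch.

```python
import megengine.distributed as dist

def model_parallel_forward(local_layers, x, input_shape):
    rank, world = dist.get_rank(), dist.get_world_size()
    if rank > 0:
        # not the first GPU: fetch the intermediate feature from the previous GPU
        x = recv_from_prev_gpu(input_shape)
    y = local_layers(x)                 # this card's convolution (and FC) layers
    if rank < world - 1:
        # not the last GPU: hand the result on to the next GPU
        return send_to_next_gpu(y)
    return y                            # the last GPU returns the prediction
```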

We can use code to show the results of inference and training under simple model parallelism:

During inference, we input a batch of 32 images at 224 resolution; the first three GPUs output intermediate features of the network, and the last GPU outputs the network's predictions. Two things are worth mentioning for training. First, because this is model parallelism, we do not need AllReduce. Second, the first three GPUs pass None when calling gm.backward; in fact, the API was designed so that backward can accept this, so what happens when backward receives None here? Since the forward pass contained a send, automatic differentiation inserts a recv, which waits for the gradient coming from downstream and then performs the normal backward pass.

Pipeline parallelism

Now let's talk about pipeline parallelism. Simple model parallelism has to finish computing all the data of one batch before the next batch can be fed in, so each card actually sits idle for a long time: it is either waiting for a preceding card to finish, or has finished its own part of the batch and is waiting for the next batch of data.

If we split the data of one batch into many smaller pieces, card 0 can compute one small piece first, send it to the next card as soon as it is done, and then compute the next small piece. In this way card 0 and card 1 can compute at the same time, and the idle rate goes down.

That is the core idea of pipeline parallelism. Let's see how it works in code.

For example, here we want to split the data into 4 pieces. We use F.split to cut it into 4 chunks and then iterate over them: if this is the first card, it takes the chunk directly; otherwise it waits to receive the computation result of the previous card. Either way, once we have the data we do our computation and then handle the result, just as in simple model parallelism: if this is not the last GPU, we send the result to the next GPU; if it is the last GPU, we simply return the result.
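
A sketch of that loop with 4 micro-batches, using the same hypothetical wrappers; chunk_shape is the per-chunk feature shape the receiver expects.

```python
import megengine.functional as F
import megengine.distributed as dist

def pipeline_forward(local_layers, x, chunk_shape, num_chunks=4):
    rank, world = dist.get_rank(), dist.get_world_size()
    # only the first GPU actually holds the input batch
    chunks = F.split(x, num_chunks) if rank == 0 else [None] * num_chunks
    outputs = []
    for chunk in chunks:
        if rank > 0:
            # wait for the previous GPU to finish this micro-batch
            chunk = recv_from_prev_gpu(chunk_shape)
        y = local_layers(chunk)
        if rank < world - 1:
            send_to_next_gpu(y)       # pass it on, then start the next micro-batch
        else:
            outputs.append(y)         # the last GPU collects the predictions
    return F.concat(outputs) if outputs else None
```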

This is pipeline parallelism. Of course, pipeline-parallel code in real scenarios needs to consider execution efficiency and is not this simple; for example, asynchronous send/recv would be introduced to reduce waiting time.

We not only want to run inference, we also want to train, and training involves a backward pass. In ordinary model parallelism, the timelines of the forward pass and the backward pass look like the following figure:

We do the forward pass, then the backward pass. In pipeline parallelism, however, the backward pass is also pipelined. There is one special point to watch out for: the re-forward pass (recomputation). If we do not recompute, then all the intermediate results of the forward pass have to be kept until back-propagation finishes before they can be discarded/released, which wastes precious GPU memory. So we might as well throw them away as soon as they are computed; the result has already been handed to the next GPU and is not needed for a while. When an intermediate result is needed again, we simply recompute it (in other words, each card only needs to keep its own input). After recomputation we can run the normal backward pass, obtain the gradient with respect to the input, and pass that gradient to the previous card. The previous card likewise recomputes, runs its backward pass and sends its gradient, until all cards have finished their gradient computation.

This recomputation trick is called checkpointing or sublinear memory: there is checkpoint in PyTorch and sublinear in MegEngine. What we have implemented here is a very coarse-grained sublinear: instead of keeping a few intermediate results and recomputing only parts of the graph, it recomputes the whole thing. That is essentially what GPipe does.

The forward pass uses the same code as before, shown on the left for reference.

The first thing is GradManager, a very important feature of MegEngine: GradManager can differentiate with respect to intermediate features (intermediate results), so we can attach intermediate variables during the computation. In the case of GPipe, what we need is the derivative with respect to the input, so we attach the input data x at the beginning and then run the forward pass (i.e. the recomputation). If this is the last card, we compute the loss and derive the gradients. With grad_to_prev_gpu, we pass the gradient with respect to the input to the previous GPU. The gradient with respect to the later card's input is exactly the gradient dy of the earlier card's output, so we supply it manually through gm.backward(dy=grad), which completes the backward pass on the intermediate GPUs. This is a simple GPipe.
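
Putting it together, here is a sketch of one GPipe-style backward step on a single micro-batch. It assumes GradManager.attach and backward(y, dy) behave as described in the talk, and it uses the hypothetical helpers grad_to_prev_gpu (named after the talk) and recv_grad_from_next_gpu for the point-to-point gradient transfer.

```python
import megengine.distributed as dist
from megengine.autodiff import GradManager

def backward_micro_batch(local_layers, x, params, grad_shape,
                         loss_fn=None, label=None):
    rank, world = dist.get_rank(), dist.get_world_size()
    gm = GradManager()
    gm.attach(params)
    gm.attach([x])                      # we also want the gradient w.r.t. the input
    with gm:
        y = local_layers(x)             # re-forward (recomputation): only x was kept
        if rank == world - 1:
            loss = loss_fn(y, label)    # the last GPU computes the loss ...
            gm.backward(loss)           # ... and starts the backward pass
        else:
            dy = recv_grad_from_next_gpu(grad_shape)  # gradient of y from downstream
            gm.backward(y, dy)          # manually supply dy on intermediate GPUs
    if rank > 0:
        grad_to_prev_gpu(x.grad)        # send the input gradient upstream
```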

If you want to try out GPipe, I wrote the MegEngine Parallel Tutorial on GitHub and you can run it.

Welcome to join us on our QQ group: 1029741705

Please visit the forum: discuss.megengine.org.cn; GitHub project address: github.com/MegEngine/M…