0x00 Abstract

Gradient accumulation is a technique for increasing the effective batch size during training. Micro-batches are run through forward and backward propagation several times locally to accumulate gradients, and only then are the gradient reduction and the optimizer update performed. It is a common strategy for amortizing communication costs. This article compares the implementations in several frameworks/libraries to give you a deeper understanding of this technique.

Other articles in this series are as follows:

Deep learning pipeline parallelism GPipe (1) -- basic pipeline implementation

0x01 Overview

1.1 Review

As mentioned above, several parallel techniques are currently essential for distributed model training:

  • Pipeline parallelism, in particular how to partition the pipeline automatically;
  • Gradient Accumulation;
  • Backward recomputation (activation checkpointing);
  • The 1F1B strategy (we will analyze it using PipeDream);

In the previous article, we described how GPipe implements pipeline parallelism. In this article, we introduce gradient accumulation.

0x02 Basic Concepts

Gradient accumulation is a common strategy for amortizing communication costs. It uses micro-batches locally: forward and backward propagation are run several times to accumulate gradients, and only then are the gradient reduction and the optimizer update performed, which is equivalent to enlarging the batch size by N times.

2.1 Background

A deep learning model is composed of many interconnected layers through which samples are propagated; this propagation consists of two processes: the forward pass and the backward pass.

  • The forward pass computes the output from the input. At each step, samples are propagated forward through the layers. After propagating through all layers, the network produces predictions for the samples and then calculates a loss value for each sample, which answers “how wrong is the network about this sample?”.
  • Then comes the backward pass. In this process, the neural network calculates the gradients of these loss values with respect to the model parameters. You can think of it as a gradient accumulation process.
  • Finally, these gradients are used to compute the update for each model parameter.

During training, the number of samples processed at a time is specified by the hyperparameter batch size, which has a great influence on the final model quality. Under certain conditions, the larger the batch size, the more stable the training.

2.2 Motivation

Gradient accumulation is, as the name implies, accumulating gradients. Why accumulate them? Because memory is not large enough.

When training a model, feeding all the training data into the model at once often causes out-of-memory errors. In that case, a large batch has to be split into several smaller batches of data (the technical term is mini-batch). The problem with small batches is that instead of computing gradients and updating parameters once after all the data has been fed in, gradients are now computed for every small batch. To keep the effective batch size large while updating parameters less frequently, gradient accumulation was introduced. In other words:

  • The whole dataset is divided into multiple batches;
  • Each batch is divided into several small batches, and each small batch is fed to the neural network in turn;
  • A gradient is computed for each small batch, but the optimizer does not update the parameters after every back propagation;
  • After several small batches (i.e., all the small batches in one batch), the accumulated sum of the gradients computed for each small batch is used for the optimizer parameter update, and then the gradients are zeroed out.

This achieves roughly the same effect as feeding the whole batch into the model at once for training.

2.3 Essence

Gradient accumulation essentially computes gradients with batch_size / accumulation_steps samples at a time, then updates the network parameters after accumulating accumulation_steps such gradients, achieving an effect similar to using the full batch_size. When using it, note that the learning rate may need to be enlarged appropriately.

In other words:

  • First, the entire dataset is divided into multiple batches, each with batch size = 32, and assume accumulation steps = 8;
  • Because batch_size = 32 is too large to fit on a single GPU, we use batch_size = 32 / 8 = 4 in forward propagation to compute gradients;
  • In this way, each batch is divided into multiple micro-batches of size 4, and each micro-batch is fed to the neural network one by one;
  • The gradient is computed for each micro-batch, but the optimizer is not stepped after every back propagation (the loss is also divided by 8 during back propagation);
  • After accumulation_steps micro-batches (that is, all the micro-batches in a batch), the gradients accumulated from each micro-batch are used for the optimizer step, which updates the parameters;
  • Finally, the gradients are zeroed out;
  • Then the next batch is processed.

This achieves the same training effect as feeding a batch of size 32 into the model at once.
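
To make the equivalence concrete, here is a minimal sketch (not from the original article; a toy linear model with random data, assuming the loss is the mean over the batch) that checks that accumulating over 8 micro-batches of size 4, with each loss divided by 8, yields the same gradient as one batch of 32:

import torch

torch.manual_seed(0)
model = torch.nn.Linear(10, 1)
loss_fn = torch.nn.MSELoss()          # mean over the batch
data, target = torch.randn(32, 10), torch.randn(32, 1)

# (a) one big batch of 32
model.zero_grad()
loss_fn(model(data), target).backward()
grad_big = model.weight.grad.clone()

# (b) 8 micro-batches of 4, each loss divided by accumulation_steps
model.zero_grad()
accumulation_steps = 8
for i in range(accumulation_steps):
    sl = slice(i * 4, (i + 1) * 4)
    loss = loss_fn(model(data[sl]), target[sl]) / accumulation_steps
    loss.backward()                   # gradients keep accumulating in .grad
grad_accum = model.weight.grad.clone()

print(torch.allclose(grad_big, grad_accum, atol=1e-6))   # True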

Details are as follows:

+------------------------------------------------------------------------------+
|                                 GLOBAL BATCH                                 |
+------------------------------------------------------------------------------+
        |
        v
+--------------+      +--------------+      +--------------+      +--------------+
| MINI BATCH 0 +----->+ MINI BATCH 1 +----->+ MINI BATCH 2 +----->+ MINI BATCH 3 |
+------+-------+      +------+-------+      +------+-------+      +------+-------+
       |                     |                     |                     |
       v                     v                     v                     v
  +---------+           +---------+           +---------+           +---------+
  | grad 0  |           | grad 1  |           | grad 2  |           | grad 3  |
  +----+----+           +----+----+           +----+----+           +----+----+
       |                     |                     |                     |
       v                     v                     v                     v
+------------------------------------------------------------------------------+
|                           GLOBAL BATCH GRADIENTS                             |
+------------------------------------------------------------------------------+

+------------------------------------------------------------------------------------>  Time

2.4 Comparison with Data Parallelism

Micro-batching is highly similar to data parallelism:

  • Data parallelism is parallelism in space: the data is split into several tensors that are fed to multiple devices at the same time for parallel computation, and the gradients are then summed (reduced) to update the model.
  • Micro-batching is parallelism in time: a tensor is split into several smaller tensors, which enter the same device in sequence and are computed in sequence.

When the total batch size is the same and the degree of data parallelism equals the number of accumulated micro-batches, data parallelism and gradient accumulation are mathematically equivalent.
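
To sketch the equivalence (assuming the loss is averaged over a global batch of B = N × b samples):

    grad_global = (1/N) * (grad_1 + grad_2 + ... + grad_N)

where grad_k is the average gradient over micro-batch k of size b. Whether the N terms are computed simultaneously on N devices and then reduced (data parallelism), or one after another on a single device (gradient accumulation), the sum, and therefore the parameter update, is the same.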

By accumulating the gradients of multiple micro-batches, the forward computation of the next micro-batch does not depend on the backward computation of the previous micro-batch, so both can proceed smoothly (of course, the last micro-batch of a large batch still triggers this dependency).

2.5 Problems Solved

Gradient accumulation solves a number of problems:

  • On a single card, gradient accumulation splits a large batch size into several equivalent micro-batches, thereby saving GPU memory.
  • With data parallelism, gradient accumulation solves the problem of the overhead of backward gradient synchronization being too large (as the number of machines and devices increases, the AllReduce cost of synchronizing gradients grows), because gradient synchronization becomes a sparse operation, which improves the speedup ratio of data parallelism.
  • With pipeline parallelism, gradient accumulation allows different stages to execute different micro-batches in parallel. By accumulating the gradients of multiple micro-batches, the forward computation of the next micro-batch does not need to wait for the backward computation of the previous micro-batch, so the computation of each stage is not blocked and can proceed unimpeded (of course, this dependency is still triggered by the last micro-batch of a large batch), achieving the goal of pipelining.

0x03 PyTorch Gradient accumulation

3.1 Automatic Accumulation

PyTorch accumulates gradients by default. That is, PyTorch computes gradients on every backward() call, but the gradients are not automatically zeroed out; if they are not zeroed manually, they keep accumulating.
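
A minimal sketch of this default behavior (a toy example, not from the original article):

import torch

x = torch.tensor([1.0, 2.0], requires_grad=True)
(x * x).sum().backward()
print(x.grad)          # tensor([2., 4.])
(x * x).sum().backward()
print(x.grad)          # tensor([4., 8.])  accumulated, not overwritten
x.grad.zero_()         # gradients must be zeroed manually before the next iteration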

As for why PyTorch behaves this way, there is an explanation at discuss.pytorch.org/t/why-do-we… . Combining it with other explanations, we can roughly draw the following conclusions:

  • By PyTorch’s design, every forward computation that produces predictions builds a computation graph for back propagation, which stores the intermediate results needed for back propagation; when .backward() is called, the graph is freed from memory.
  • With gradient accumulation, multi-task training can be performed while keeping at most one computation graph in memory. In multi-task training, after the shared tensors have been used in multiple forward computations, calling .backward() for the different tasks makes the gradients of those tensors accumulate automatically.
  • Another reason is that when memory is insufficient, the gradients of several batches can be stacked as if they came from one large batch before iterating, because the gradients obtained in the two cases are equivalent.
  • Because of PyTorch’s dynamic graphs and autograd, there is no exact point at which a forward pass is known to have stopped, since you cannot tell when one computation ends and a new one starts. So automatically zeroing the gradients would be tricky.

3.2 Code Examples

Here is an example of traditional code:

for i,(images,target) in enumerate(train_loader):
    # 1. input output
    images = images.cuda(non_blocking=True)
    target = torch.from_numpy(np.array(target)).float().cuda(non_blocking=True)
    outputs = model(images)
    loss = criterion(outputs,target)
  
    # 2. backward
    optimizer.zero_grad()   # reset gradient
    loss.backward()
    optimizer.step()

Then an example of gradient accumulation is given:

  • Obtain the loss: input the images and labels, compute the predictions, and calculate the loss function;
  • loss.backward() performs back propagation and computes the current gradients;
  • Repeat steps 1-2 several times without zeroing the gradients, so that the gradients accumulate on top of the existing gradients;
  • After the gradients have been accumulated a certain number of times, first call optimizer.step() to update the network parameters based on the accumulated gradients, then call optimizer.zero_grad() to clear the past gradients in preparation for the next round of accumulation.

for i, (images, target) in enumerate(train_loader):
    # 1. input output
    images = images.cuda(non_blocking=True)
    target = torch.from_numpy(np.array(target)).float().cuda(non_blocking=True)
    outputs = model(images)               # forward propagation
    loss = criterion(outputs, target)     # calculate loss

    # 2. backward
    loss.backward()                       # back propagation, gradients accumulate on the existing ones

    # 3. update parameters of net
    if ((i + 1) % accumulation) == 0:
        # optimizer the net
        optimizer.step()                  # update the network parameters
        optimizer.zero_grad()             # reset gradient

3.3 Gradient Accumulation with DistributedDataParallel

DistributedDataParallel (DDP) implements data parallelism at the module level. It uses the communication collectives in the torch.distributed package to synchronize gradients, parameters, and buffers. Parallelism is available both within a process and across processes.

Gradient accumulation can also be used in this setting, but it needs adjustments to remain efficient.

3.3.1 Gradient Accumulation on a Single Card

Let us first recall how gradient accumulation is done in the ordinary single-card case.

for data in train_loader:
    # one gradient accumulation cycle
    optimizer.zero_grad()
    for _ in range(K):
        prediction = model(data / K)            # data/K stands for one of the K micro-batches
        loss = loss_fn(prediction, label) / K
        loss.backward()                         # accumulate gradients, do not apply the update yet
    optimizer.step()                            # apply the accumulated gradients, update the network parameters once

At the loss.backward() statement, DDP performs the gradient reduction all_reduce.

Because there are K steps in each gradient accumulation cycle, there are K all_reduce operations. But in each cycle optimizer.step() is called only once, which means that of the K loss.backward() calls only the last all_reduce is actually needed; the previous K-1 all_reduce operations are wasted.

3.3.2 How DDP Speeds This Up

So we wonder whether loss.backward() has a switch so that for the first K-1 calls we only do back propagation without gradient synchronization.

DDP has this problem in mind and provides a context manager no_sync() that temporarily disables gradient synchronization. Within a no_sync() context, DDP does not synchronize gradients; the first forward-backward pass after exiting the no_sync() context is synchronized.

The final code is as follows:

model = DDP(model)

for data in train_loader:
    # one gradient accumulation cycle
    optimizer.zero_grad()
    for _ in range(K - 1):
        # the first K-1 steps do not synchronize gradients (only accumulate them)
        with model.no_sync():
            prediction = model(data / K)
            loss = loss_fn(prediction, label) / K
            loss.backward()
    # the K-th step synchronizes gradients (and accumulates them)
    prediction = model(data / K)
    loss = loss_fn(prediction, label) / K
    loss.backward()
    optimizer.step()    # apply the accumulated gradients, update the network parameters once

3.3.3 no_sync implementation

The code for no_sync is as follows:

    @contextmanager
    def no_sync(self):
        r"""
        A context manager to disable gradient synchronizations across DDP
        processes. Within this context, gradients will be accumulated on module
        variables, which will later be synchronized in the first
        forward-backward pass exiting the context.
​
        Example::
​
            >>> ddp = torch.nn.parallel.DistributedDataParallel(model, pg)
            >>> with ddp.no_sync():
            >>>   for input in inputs:
            >>>     ddp(input).backward()  # no synchronization, accumulate grads
            >>> ddp(another_input).backward()  # synchronize grads
        """
        old_require_backward_grad_sync = self.require_backward_grad_sync
        self.require_backward_grad_sync = False
        try:
            yield
        finally:
            self.require_backward_grad_sync = old_require_backward_grad_sync

How is it used? require_backward_grad_sync is used in the forward function: when it is True, reducer.prepare_for_forward() and reducer.prepare_for_backward() are called, and require_forward_param_sync is set to True.

def forward(self, *inputs, **kwargs):
    with torch.autograd.profiler.record_function("DistributedDataParallel.forward"):
        self.reducer.save_thread_local_state()
        if torch.is_grad_enabled() and self.require_backward_grad_sync:
            self.logger.set_runtime_stats_and_log()
            self.num_iterations += 1
            self.reducer.prepare_for_forward()

        # ... (forward computation omitted)

        if torch.is_grad_enabled() and self.require_backward_grad_sync:
            self.require_forward_param_sync = True
            if self.find_unused_parameters and not self.static_graph:
                # Do not need to populate this for static graph.
                self.reducer.prepare_for_backward(list(_find_tensors(output)))
            else:
                self.reducer.prepare_for_backward([])
        else:
            self.require_forward_param_sync = False

        # omit some code

Now look at the two Reducer methods.

prepare_for_forward() only does bookkeeping for statistics and can be ignored here.

void Reducer::prepare_for_forward() {
  std::lock_guard<std::mutex> lock(mutex_);
  num_iterations_++;
  if (should_collect_runtime_stats()) {
    record_forward_compute_start_time();
  }
}

prepare_for_backward() does resetting and preparation work; the part relevant to gradient accumulation is expect_autograd_hooks_ = true.

void Reducer::prepare_for_backward(
    const std::vector<torch::autograd::Variable>& outputs) {
  std::lock_guard<std::mutex> lock(mutex_);

  // Reset accounting.
  expect_autograd_hooks_ = true;   // key for gradient accumulation
  reset_bucket_counting();

  // Reset unused parameter accounting.
  has_marked_unused_parameters_ = false;
  // Reset per iteration marked ready parameters.
  perIterationReadyParams_.clear();

  // If static graph is not set, search graph to detect unused parameters.
  // When static graph is set, unused_parameters_ will be detected and will
  // not change after 1st iteration.
  // If static_graph_ = false and find_unused_parameters_ is false,
  // we assume that autograd hooks for ALL variables will be called,
  // and we don't have to search the autograd graph for presence of these hooks.
  if (dynamic_graph_find_unused()) {
    unused_parameters_.clear();
    search_unused_parameters(outputs);
  }
}

How is expect_autograd_hooks_ used? It is checked in Reducer::autograd_hook: if no all-reduce operation is expected, the hook returns directly.

void Reducer::autograd_hook(VariableIndex index) {
  std::lock_guard<std::mutex> lock(this->mutex_);

  // Carry over thread local state from main thread. This allows for
  // thread-local flags such as profiler enabled to be configure correctly.
  at::ThreadLocalStateGuard g(thread_local_state_);

  // Ignore if we don't expect to be called.
  // This may be the case if the user wants to accumulate gradients
  // for number of iterations before reducing them.
  if (!expect_autograd_hooks_) {
    // If the all-reduce operation is not required, return directly.
    return;
  }

  // omit subsequent code
}

It’s a little convoluted, so let’s sort it out:

A step has two operations: forward and backward.

  • When forward runs with require_backward_grad_sync == True:

    • require_forward_param_sync is set to True;
    • reducer.prepare_for_forward() and reducer.prepare_for_backward() are called;
    • prepare_for_backward() sets expect_autograd_hooks_ = true. expect_autograd_hooks_ is the key.
  • When backward runs:

    • If expect_autograd_hooks_ == true, the all-reduce operation is performed during backward;
    • Otherwise, the hook returns directly and no all-reduce is performed.

As shown below,

  • The upper half is the forward logic, i.e., the forward() function;
  • The lower half is the backward logic, i.e., the Reducer::autograd_hook() function;
  • expect_autograd_hooks_ is the key that connects forward and backward.

 forward
+--------------------------------------------------------------------------------+
| forward()                                                                      |
|                                                                                |
|       require_backward_grad_sync == True ?                                     |
|              +                             +                                   |
|              | Yes                         | No                                |
|              v                             |                                   |
|       reducer.prepare_for_forward          |                                   |
|              +                             |                                   |
|              |                             |                                   |
|              v                             |                                   |
|       reducer.prepare_for_backward         |                                   |
|              +                             |                                   |
|              |                             |                                   |
|              v                             v                                   |
|  expect_autograd_hooks_ = true     expect_autograd_hooks_ = false              |
|              +                             +                                   |
+--------------|-----------------------------|-----------------------------------+
               |                             |
 backward      |                             |
+--------------|-----------------------------|-----------------------------------+
|              v                             v                                   |
|  Reducer::autograd_hook()                                                      |
|              +                                                                 |
|              |                                                                 |
|              v                                                                 |
|       expect_autograd_hooks_ == True ?                                         |
|              +                             +                                   |
|              | Yes                         | No                                |
|              v                             v                                   |
|         Do All-Reduce                    Return                                |
+--------------------------------------------------------------------------------+

The no_sync() operation sets require_backward_grad_sync = False, which eventually leads to expect_autograd_hooks_ = false. As a result, no all-reduce operation is performed during backward.

0x04 TensorFlow Implementation

In PyTorch, gradients accumulate by default as long as they are not zeroed, so this is easy to do. In TensorFlow it is not as straightforward.

We get the following sample code from StackOverflow:

tvs = tf.trainable_variables()
# Create variables (not trainable) to record the accumulated gradient of each variable, initialized to zeros
accum_vars = [tf.Variable(tf.zeros_like(tv.initialized_value()), trainable=False)
              for tv in tvs]
# Op that resets the accumulators to zero
zero_ops = [tv.assign(tf.zeros_like(tv)) for tv in accum_vars]

## Use the optimizer's compute_gradients to calculate the gradients
gvs = opt.compute_gradients(rmse, tvs)

## Accumulate the current gradients onto the previously defined accumulator variables
accum_ops = [accum_vars[i].assign_add(gv[0]) for i, gv in enumerate(gvs)]

## Apply the accumulated gradients to update the parameters
train_step = opt.apply_gradients([(accum_vars[i], gv[1]) for i, gv in enumerate(gvs)])

# In the training loop: reset the accumulators, accumulate gradients over
# n_minibatches mini-batches, then update with the accumulated gradients
sess.run(zero_ops)
for i in xrange(n_minibatches):
    sess.run(accum_ops, feed_dict={X: xs[i], y: ys[i]})
sess.run(train_step)
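
For reference (not from the original article), here is a rough sketch of the same idea using TensorFlow 2.x and tf.GradientTape; model, loss_fn, opt, dataset, and n_minibatches are assumed to be defined elsewhere:

import tensorflow as tf

# One accumulator variable per trainable variable, initialized to zeros
accum_vars = [tf.Variable(tf.zeros_like(v), trainable=False)
              for v in model.trainable_variables]

for step, (x, y) in enumerate(dataset):
    with tf.GradientTape() as tape:
        loss = loss_fn(y, model(x)) / n_minibatches   # scale the loss
    grads = tape.gradient(loss, model.trainable_variables)
    for accum, grad in zip(accum_vars, grads):
        accum.assign_add(grad)                        # accumulate gradients
    if (step + 1) % n_minibatches == 0:
        opt.apply_gradients(zip(accum_vars, model.trainable_variables))
        for accum in accum_vars:
            accum.assign(tf.zeros_like(accum))        # reset the accumulators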

0x05 GPipe Implementation

In GPipe’s pipeline parallelism, each “time point” can process different micro-batches on multiple stages simultaneously. The number in each box in the figure indicates the index of the micro-batch; the same micro-batch still passes through all stages serially. In this scheme, the idle time of each device is only about 25%.

(Figure: GPipe pipeline schedule; boxes are micro-batches flowing through the stages over time.)

The specific code is as follows:

5.1 Optimizer

The concrete implementation is GradientAggregationOptimizer in lingvo/core/optimizer.py. The key code is apply_gradients, whose logic is:

  • If _num_micro_batches is 1, the wrapped optimizer's apply_gradients is called directly and no accumulation is performed;

  • Iterate over the list of grads_and_vars, accumulating gradients;

  • accum_step implements the gradient accumulation condition:

    • If the number of accumulated micro-batches has been reached, _ApplyAndReset is called:

      • apply_gradients is called to apply the gradients;
      • zero_op is called to clear the gradients;
    • Otherwise, _Accum is called, whose no_op does nothing.

The specific code is as follows:

def apply_gradients(self, grads_and_vars, global_step=None, name=None):
    if self._num_micro_batches == 1:
        return self._opt.apply_gradients(grads_and_vars, global_step)
    global_step = global_step or py_utils.GetOrCreateGlobalStepVar()
    with tf.init_scope():
        self._create_slots([v for (_, v) in grads_and_vars])

    accums = []
    variables = []

    # iterate over grads_and_vars, accumulating gradients
    for g, v in grads_and_vars:
        accum = self.get_slot(v, 'grad_accum')
        variables.append(v)
        # pytype: disable=attribute-error
        if isinstance(g, tf.IndexedSlices):
            scaled_grad = tf.IndexedSlices(
                g.values / self._num_micro_batches,
                g.indices,
                dense_shape=g.dense_shape)
        else:
            scaled_grad = g / self._num_micro_batches
        accum_tensor = accum.read_value()
        accums.append(accum.assign(accum_tensor + scaled_grad))
        # pytype: enable=attribute-error

    def _ApplyAndReset():
        normalized_accums = accums
        if self._apply_crs_to_grad:
            normalized_accums = [
                tf.tpu.cross_replica_sum(accum.read_value()) for accum in accums
            ]
        apply_op = self._opt.apply_gradients(
            list(zip(normalized_accums, variables)))
        with tf.control_dependencies([apply_op]):
            zero_op = [tf.assign(accum, tf.zeros_like(accum)) for accum in accums]
        return tf.group(zero_op, tf.assign_add(global_step, 1))

    def _Accum():
        return tf.no_op()

    # Accumulation condition: when the number of micro-batches is reached,
    # apply the accumulated gradients and reset them; otherwise just accumulate.
    accum_step = tf.cond(
        tf.equal(
            tf.math.floormod(self._counter + 1, self._num_micro_batches), 0),
        _ApplyAndReset,  # Apply the accumulated gradients and reset.
        _Accum)  # Accumulate gradients.

    with tf.control_dependencies([tf.group(accums)]):
        return tf.group(accum_step, tf.assign_add(self._counter, 1))

5.2 Wrapper

ShardedAdam wraps GradientAggregationOptimizer around ShardedAdamOptimizer, so users can use it directly.

class ShardedAdam(optimizer.Adam):
    """Adam optimizer wrapper that shards the slot variables."""

    @classmethod
    def Params(cls):
        params = super().Params()
        params.Define('num_micro_batches', 1, 'Number of accumulated batches.')
        return params

    def GetOptimizer(self, lr):
        p = self.params
        opt = ShardedAdamOptimizer(
            learning_rate=lr,
            beta1=p.beta1,
            beta2=p.beta2,
            epsilon=p.epsilon,
            name=p.name)
        if p.num_micro_batches > 1:
            tf.logging.info('Applying gradient aggregation.')
            # accumulate gradients, then apply them
            opt = optimizer.GradientAggregationOptimizer(
                opt, p.num_micro_batches, apply_crs_to_grad=True)
        self._cached_opt = opt
        return opt

5.3 Application

DenseLm12kWide41BAdam16x16 shows how to use ShardedAdam.

@model_registry.RegisterSingleTaskModel
class DenseLm12kWide41BAdam16x16(DenseLm128B16x16):
    """41B params LM model with 2D split and ADAM optimizer on v3-512."""

    # Each layer has 1.6875B parameters.
    SEQUENCE_LENGTH = 2048
    NUM_DEVICES_PER_SPLIT = 512
    BATCH_DIM_PER_DEVICE = 0.5  # Total batch size 256
    DEVICE_MESH_SHAPE = [16, 32]
    DEVICE_MESH = gshard_utils.GetNonPod2dMesh(DEVICE_MESH_SHAPE, [16, 16, 2])
    NUM_TRANSFORMER_LAYERS = 24
    HIDDEN_DIM = 48 * 1024
    MODEL_DIM = 12 * 1024
    NUM_HEADS = 96
    ATTENTION_KEY_VALUE_DIM = 128
    GATED_GELU = False
    POSITIONAL_EMBEDDING = True
    NUM_MICRO_BATCHES = 1

    def Task(self):
        p = super().Task()
        # Use ShardedAdam and configure gradient accumulation
        p.train.optimizer = ShardedAdam.Params().Set(
            beta1=0.9,
            beta2=0.999,
            epsilon=1e-6,
            num_micro_batches=self.NUM_MICRO_BATCHES)
        return p

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

PyTorch DDP series 3: Combat and Skills