0x00 Abstract

This article draws on the following two official documents to explain how TensorFlow performs distributed training:

tensorflow.google.cn/guide/distr…

github.com/tensorflow/… (This is the latest official update.)

The other articles in this series are:

Heterogeneous Distributed Learning based on the TensorFlow distributed paper [translation]

Implementation of Control Flow in TensorFlow

TensorFlow Distributed environment (1) — overall architecture

TensorFlow distributed environment (2) — Master static logic

TensorFlow distributed environment (3) — Worker static logic

TensorFlow distributed environment (4) — WorkerCache

TensorFlow distributed environment (5) — Session

TensorFlow distributed environment (6) — Master dynamic logic

TensorFlow distributed environment (7) — Worker dynamic logic

TensorFlow distributed environment (8) — communication mechanism

1. Overview

tf.distribute.Strategy is a TensorFlow API for distributed training across multiple GPUs, multiple machines, or TPUs. Using this API, you can take existing models and training code and implement single-machine multi-card, multi-machine multi-card, and other forms of distributed training with only a few code changes.

tf.distribute.Strategy aims to achieve the following objectives:

  • Cover user use cases along different dimensions.
  • Be easy to use and support many kinds of users (researchers, ML engineers, etc.).
  • Provide high performance out of the box.
  • Stay decoupled from user model code so that strategies can be switched easily.
  • Support custom training loops, Estimator, and Keras.
  • Support eager execution.

tf.distribute.Strategy can be used with high-level APIs such as Keras Model.fit, and can also be used to distribute custom training loops (and, in general, any computation that uses TensorFlow). For example, model construction and the model.compile() call are wrapped inside strategy.scope().

In TensorFlow 2.x, programs can run eagerly or as a computational graph via tf.function. tf.distribute.Strategy supports both execution modes but works best with tf.function. Eager mode is recommended only for debugging and is not supported by tf.distribute.TPUStrategy. Although most of this guide discusses training, the API can also be used to distribute evaluation and prediction across different platforms.

You only need to change a small amount of code to use tf.distribute.Strategy, because the underlying components of TensorFlow have been made strategy-aware. These components include variables, layers, models, optimizers, metrics, summaries, and checkpoints.

In this guide, we will cover the various types of strategies and how to use them in different situations.

2. Strategy types

tf.distribute.Strategy is intended to cover many use cases on different axes. Some of these combinations are currently supported, and others will be added in the future. Some of these axes include:

  • Synchronous vs. asynchronous training: these are two common approaches to distributed training via data parallelism. In synchronous training, all workers train over different slices of the input data in sync and aggregate gradients at each step. In asynchronous training, all workers train over the input data independently and update variables asynchronously. In general, synchronous training is implemented via all-reduce, while asynchronous training is implemented with a parameter server architecture.
  • Hardware platform: you may want to scale training to multiple GPUs on a single machine, to multiple machines in a network (each with zero or more GPUs), or to Cloud TPUs.

To support these use cases, TensorFlow provides MirroredStrategy, TPUStrategy, MultiWorkerMirroredStrategy, ParameterServerStrategy, CentralStorageStrategy, and OneDeviceStrategy, six strategies in total. The next sections explain which of these are currently supported in which scenarios. Here is a quick overview:

Note: Experimental support means that compatibility of the API is not guaranteed.

Note: Support for Estimator is limited. Basic training and evaluation are experimental, and advanced features such as scaffolding are not implemented. If a use case is not covered, you should use Keras or a custom training loop. Estimator is not recommended for new code, because Estimator's code follows the "v1.Session" style, which is difficult to write correctly and can behave unexpectedly, especially when combined with TF 2 code.

2.1 MirroredStrategy

tf.distribute.MirroredStrategy supports synchronous distributed training across multiple GPUs on one machine (single-machine multi-card data parallelism). This strategy creates one model replica per GPU device. Each variable in the model is mirrored across all replicas; together they form a single conceptual variable called MirroredVariable. These variables stay in sync with each other by applying identical updates.

MirroredVariable's synchronized updates only speed up computation; they do not actually share variables in memory the way CPU parallelism does. In other words, multi-GPU parallelism only increases speed; it does not let you hold more data. If you add more data, you will still run out of GPU memory.

MirroredStrategy uses an efficient all-reduce algorithm to pass variable updates between devices. The all-reduce algorithm aggregates tensors across all devices by adding them up and makes the result available on every device. This fused algorithm is very efficient and can significantly reduce the overhead of synchronization. Many all-reduce algorithms and implementations are available, depending on the type of communication available between devices. By default, NVIDIA NCCL is used for the all-reduce. You can choose one of the other provided options, or write your own.
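To make the all-reduce semantics concrete, here is a minimal sketch (not taken from the official guide) that uses the replica context to sum a per-replica value across all replicas of a MirroredStrategy; with two GPUs, each replica would see 2.0 after the all-reduce.

import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()  # uses all visible GPUs

@tf.function
def replica_fn():
  # Each replica contributes 1.0; the all-reduce sums the contributions
  # and makes the result available on every replica.
  ctx = tf.distribute.get_replica_context()
  return ctx.all_reduce(tf.distribute.ReduceOp.SUM, tf.constant(1.0))

# With N replicas, each replica sees the value N.
print(strategy.run(replica_fn))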

Details are as follows:

Figure 1: MirroredStrategy. From TensorFlow

Here’s the easiest way to create MirroredStrategy:

mirrored_strategy = tf.distribute.MirroredStrategy()

This creates a MirroredStrategy instance that uses all GPUs visible to TensorFlow and uses NCCL for cross-device communication. If you only want to use some of the GPUs on your machine, you can do this:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

If you want to override the cross-device communication, you can provide an instance of tf.distribute.CrossDeviceOps through the cross_device_ops parameter.

At present, besides the default option tf.distribute.NcclAllReduce, two other options are available: tf.distribute.HierarchicalCopyAllReduce and tf.distribute.ReductionToOneDevice.

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

2.2 TPUStrategy

You can use tf.distribute.experimental.TPUStrategy to run TensorFlow training on Tensor Processing Units (TPUs). TPUs are Google's proprietary ASICs, designed to dramatically accelerate machine learning workloads. They are available through Google Colab, the TensorFlow Research Cloud, and the Cloud TPU platform.

In terms of distributed training architecture, TPUStrategy is similar to MirroredStrategy: it implements synchronous distributed training. TPUs provide an efficient implementation of all-reduce and other collective operations across multiple TPU cores, and TPUStrategy makes use of it.

Here’s how to instantiate TPUStrategy:

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.experimental.TPUStrategy(cluster_resolver)

The TPUClusterResolver instance helps locate the TPUs. In Colab, you do not need to specify any arguments for it.

To use it for Cloud TPU, you must:

  • Specify the name of your TPU resource in the tpu argument.
  • Explicitly initialize the TPU system at the start of the program. This is required before TPUs can be used for computation. Initializing the TPU system also clears the TPU memory, so complete this step first to avoid losing state.

2.3 MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategy is very similar to MirroredStrategy. It implements synchronous distributed training across multiple worker processes (a multi-machine, multi-card version), each of which may have multiple GPUs. Like MirroredStrategy, it creates copies of all model variables on every device of every worker.

Figure 2: MultiWorkerMirroredStrategy. From TensorFlow

It uses CollectiveOps as the multi-worker all-reduce communication method to keep variables in sync. A collective op is a single op in the TensorFlow graph that can automatically select an all-reduce algorithm at runtime based on the hardware, the network topology, and the tensor sizes.

Figure 3: MultiWorkerMirroredStrategy data flow. From TensorFlow

It also implements other performance optimizations. For example, static optimization can transform multiple all-reduces on small tensors into fewer all-reduces on large tensors. In addition, we are designing a plug-in architecture for it so that you can use algorithms that are better optimized for your hardware in the future as plug-ins.

The following is the simplest way to create MultiWorkerMirroredStrategy:

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

MultiWorkerMirroredStrategy currently offers you two different collective-op implementations. CollectiveCommunication.RING implements ring-based collectives using gRPC as the communication layer and supports both CPUs and GPUs. CollectiveCommunication.NCCL uses NCCL to implement the collectives. CollectiveCommunication.AUTO defers the choice to runtime. The best choice of collective implementation depends on the number and kind of GPUs and on the network interconnect in the cluster. You can specify it as follows:

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)

or

if distribution_strategy == "multi_worker_mirrored":
  return tf.distribute.experimental.MultiWorkerMirroredStrategy(
      communication=_collective_communication(all_reduce_alg))

Details are as follows:

def _collective_communication(all_reduce_alg):
  """Return a CollectiveCommunication based on all_reduce_alg.

  Args:
    all_reduce_alg: a string specifying which collective communication to
      pick, or None.

  Returns:
    tf.distribute.experimental.CollectiveCommunication object

  Raises:
    ValueError: if all_reduce_alg not in [None, "ring", "nccl"]
  """
  collective_communication_options = {
      None: tf.distribute.experimental.CollectiveCommunication.AUTO,
      "ring": tf.distribute.experimental.CollectiveCommunication.RING,
      "nccl": tf.distribute.experimental.CollectiveCommunication.NCCL
  }
  if all_reduce_alg not in collective_communication_options:
    raise ValueError(
        "When used with multi_worker_mirrored, valid values for "
        "all_reduce_alg are [ring, nccl]. Supplied value: {}".format(
            all_reduce_alg))
  return collective_communication_options[all_reduce_alg]

Compared with multi-GPU training, one major difference in multi-worker training is the multi-worker setup. The TF_CONFIG environment variable is the standard way in TensorFlow to specify the cluster configuration for each worker process that is part of the cluster.
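For example, a minimal TF_CONFIG for a two-worker MultiWorkerMirroredStrategy job might look like the sketch below (host names and ports are placeholders); every process sets the same cluster description but its own task index. Section 5.1 discusses TF_CONFIG in more detail.

import json
import os

# Hypothetical hosts and ports; each process sets its own "index".
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["worker0.example.com:12345", "worker1.example.com:12345"]
    },
    "task": {"type": "worker", "index": 0}  # 0 on the first worker, 1 on the second
})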

2.4 CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy also performs synchronous training. Variables are not mirrored; instead they are all placed on the CPU, while the model and computation are replicated across all local GPUs (this is in-graph replication: one computation graph covers multiple model replicas). If there is only one GPU, all variables and computation are placed on that GPU. This handles the case where an embedding is too large to fit on a GPU. For example, the figure below shows a single machine with multiple GPUs.

Figure 4: CentralStorageStrategy. From TensorFlow

You can create an instance of CentralStorageStrategy with the following code:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()

This creates a CentralStorageStrategy instance that uses all visible GPUs and the CPU. Updates to variables on the replicas are aggregated before being applied to the variables.

Note: This strategy is experimental, and we are currently working to make it available for more scenarios. Stay tuned for future API changes. The advantage of CentralStorageStrategy is that the GPU load is balanced, but the communication cost between CPU and GPU is high.

2.5 ParameterServerStrategy

Parameter server training is a common data-parallel method that scales to multiple machines. A parameter server training cluster consists of workers and parameter servers. The parameter servers create and manage variables centrally during training (each model variable is placed on a parameter server), and the workers read and update those variables at each step. Computation is replicated across all GPUs of all workers (note: the V1 version of this strategy only works with the Estimator API).

In TensorFlow 2, parameter server training uses an architecture based on a central coordinator, implemented through the tf.distribute.experimental.coordinator.ClusterCoordinator class.

Parameter server training in TensorFlow 2 is asynchronous: variables are read and updated independently by each worker, without any synchronization. Because the workers do not depend on each other, the strategy can tolerate worker failures, which is helpful when using preemptible servers.

In this implementation, the workers and parameter servers run tf.distribute.Server instances and listen for tasks from the coordinator. The coordinator is responsible for creating resources, dispatching training tasks, writing checkpoints, and handling task failures.
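As a sketch of what this looks like on a worker or parameter server task (following the pattern in the official parameter server training tutorial; the TFConfigClusterResolver is assumed to read TF_CONFIG), the process simply starts a server and waits for work from the coordinator:

# On a "worker" or "ps" task: start a tf.distribute.Server and block,
# waiting for the coordinator to dispatch work.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol="grpc",
    start=True)
server.join()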

Figure 5: ParameterServerStrategy. From TensorFlow

To run training on the coordinator, you define the training step using a ParameterServerStrategy object and then use a ClusterCoordinator to dispatch the training steps to remote workers. Here is the simplest way to create them:

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

Note: If you use TFConfigClusterResolver, you need to configure the TF_CONFIG environment variable. It is similar to the TF_CONFIG used by MultiWorkerMirroredStrategy, but there are additional caveats. In TensorFlow 1, ParameterServerStrategy is only available via the tf.compat.v1.distribute.experimental.ParameterServerStrategy symbol and can only be used with Estimator.

Note: This strategy is experimental and is currently under active development.
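Once the strategy and coordinator are created, training steps are dispatched to workers with coordinator.schedule, which returns a RemoteValue immediately, and coordinator.join waits for all scheduled steps to finish. A minimal sketch, in which dataset_fn, train_step (a tf.function that takes a per-worker iterator), and num_steps are assumed to be defined by the user:

# Build one dataset per worker from a user-defined dataset_fn.
per_worker_dataset = coordinator.create_per_worker_dataset(dataset_fn)
per_worker_iterator = iter(per_worker_dataset)

for _ in range(num_steps):
  # schedule() returns a RemoteValue immediately; the step is executed
  # asynchronously on one of the workers.
  coordinator.schedule(train_step, args=(per_worker_iterator,))

# Block until all scheduled steps have been processed.
coordinator.join()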

2.6 Other Strategies

In addition to the strategies above, there are two other strategies that may be helpful for prototyping and debugging with the tf.distribute APIs.

2.6.1 Default Strategy

The default strategy is the distribution strategy in effect when no explicit strategy is in scope. It implements the tf.distribute.Strategy interface but is a pass-through and provides no actual distribution. For example, strategy.run(fn) simply calls fn. Code written with this strategy behaves exactly like code written without any strategy. You can think of it as a "no-op" strategy.

The default strategy is a singleton; no further instances of it can be created. It can be obtained by calling tf.distribute.get_strategy() outside any explicit strategy's scope (the same API that returns the current strategy inside an explicit strategy's scope).

default_strategy = tf.distribute.get_strategy()

This strategy has two main uses:

  • It allows you to write distribution-aware library code unconditionally. For example, in an optimizer we can call tf.distribute.get_strategy() and use that strategy to reduce gradients; it always returns a strategy object on which we can call the Strategy.reduce API.
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM".1.)  # reduce some values
  • Similar to library code, it lets user programs work both with and without a distribution strategy, without requiring conditional logic. The following code snippet shows this:
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # use default strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # do something interesting
  print(tf.Variable(1.))

2.6.2 OneDeviceStrategy

tf.distribute.OneDeviceStrategy is a strategy that places all variables and computation on a single specified device.

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

This strategy differs from the default strategy in several ways. With the default strategy, the variable placement logic is unchanged compared with running TensorFlow without any distribution strategy. With OneDeviceStrategy, however, all variables created in its scope are explicitly placed on the specified device. Moreover, any function called through OneDeviceStrategy.run is also placed on the specified device.

Input distributed through this strategy is prefetched to the specified device, whereas the default strategy does no input distribution. Like the default strategy, OneDeviceStrategy can be used to test your code before switching to a strategy that actually distributes work across multiple devices/machines. It exercises the distribution strategy machinery more than the default strategy does, but it is not as useful as MirroredStrategy or TPUStrategy. If you want your code to behave as if there were no strategy at all, use the default strategy.
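A small sketch (not from the official guide) of the placement behavior: variables created inside the scope, and functions executed through strategy.run, end up on the specified device.

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

with strategy.scope():
  v = tf.Variable(1.0)
print(v.device)  # expected to report the GPU device, e.g. ".../GPU:0"

def step_fn():
  # Ops run through strategy.run are also placed on the specified device.
  return v + 1.0

print(strategy.run(step_fn))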

So far, we’ve discussed the different strategies available and how to instantiate them. In the next few sections, we’ll discuss different ways to use them for distributed training. We’ll show short snippets of code in this guide, along with links to the full tutorial that you can run from start to finish.

3. Use in tf.keras.Model.fit

We have integrated tf.distribute.Strategy into tf.keras, TensorFlow's high-level API for building and training models. Because the strategy is integrated into the tf.keras backend, you can use Model.fit for seamless distributed training within the Keras training framework. You need to make the following changes to your code:

  1. Create an instance of the appropriate tf.distribute.Strategy.
  2. Move the creation of the Keras model, optimizer, and metrics inside strategy.scope.

We support all types of Keras models: Sequential, Functional, and subclassed. Here is the code that creates a very simple Keras model with a Dense layer:

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')

In this example we use MirroredStrategy, so the code can run on a machine with multiple GPUs. strategy.scope() tells Keras which strategy to use for distributed training. Creating models/optimizers/metrics inside this scope creates distributed variables instead of regular variables. Once this is set up, you can fit the model as usual. MirroredStrategy takes care of replicating the model's training onto the available GPUs and aggregating the gradients.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)

We use tf.data.Dataset here to provide the training and evaluation input. You can also use NumPy arrays:

import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)

In both cases (Dataset or NumPy), each batch of the given input is split evenly across the replicas. For example, if you use MirroredStrategy with two GPUs, each batch of size 10 is split evenly between the two GPUs, with each GPU receiving 5 input examples per step. Each epoch trains faster as you add more GPUs. When adding more accelerators, you usually need to increase the batch size to make effective use of the extra computing power, and you may also need to retune the learning rate for your model. You can use strategy.num_replicas_in_sync to get the number of replicas.

# Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

Which strategies are currently supported with Keras Model.fit is listed in the support matrix of the official guide.

4. Use in custom training loops

As you can see, using tf.distribute.Strategy with Keras Model.fit requires changing only a few lines of code. With a little more effort, you can also use tf.distribute.Strategy in a custom training loop.

If you need more flexibility and control over your training loop than Estimator or Keras provide, you can write a custom training loop. For example, when using a GAN, you may want to take a different number of generator or discriminator steps in each round. Similarly, high-level frameworks are not well suited to reinforcement learning training. To support custom training loops, we provide a set of core methods through the tf.distribute.Strategy classes. Using them may require minor restructuring of the code at first, but once that is done, you can switch between GPUs, TPUs, and multiple machines simply by changing the strategy instance.

We'll illustrate this use case with a short code snippet: a simple training example that uses the same Keras model as before. First, create the model and optimizer within the strategy's scope. This ensures that any variables created by the model and the optimizer are mirrored variables.

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

Next, we create the input dataset and call tf.distribute.Strategy.experimental_distribute_dataset to distribute the dataset according to the strategy.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

Then we define a training step. We use tf.GradientTape to compute gradients and the optimizer to apply them to update the model variables. To distribute this training step, we put it into a train_step function and pass it, together with the input obtained from the dist_dataset created earlier, to tf.distribute.Strategy.run:

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                                  axis=None)

Note the following points in the above code:

  1. We use tf.nn.compute_average_loss to compute the loss. tf.nn.compute_average_loss sums the per-example losses and divides the sum by global_batch_size. This is important because later, after the gradients are computed on each replica, they are aggregated across replicas by summing them.
  2. We use the tf.distribute.Strategy.reduce API to aggregate the results returned by tf.distribute.Strategy.run. tf.distribute.Strategy.run returns one result from each local replica in the strategy, and there are several ways to consume them: you can reduce them to get an aggregated value, or you can call tf.distribute.Strategy.experimental_local_results to obtain the list of values contained in the result, one per local replica (see the sketch after this list).
  3. When apply_gradients is called within a distribution strategy scope, its behavior is modified: before applying gradients on each parallel instance during synchronous training, it performs a sum-over-all-replicas of the gradients.
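For point 2, here is a minimal sketch contrasting the two ways of consuming the result of strategy.run (assuming dist_inputs is one element taken from dist_dataset):

per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))

# Option 1: reduce across replicas to a single aggregated value.
total_loss = mirrored_strategy.reduce(
    tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

# Option 2: get a tuple with one local result per replica.
local_losses = mirrored_strategy.experimental_local_results(per_replica_losses)
print(total_loss, local_losses)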

Finally, when we have defined the training steps, we can iterate over the dist_dataset and run the training in a loop:

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))

In the example above, we fed training input by iterating over dist_dataset. We also provide tf.distribute.Strategy.experimental_make_numpy_dataset to support NumPy input; you can use this API to create a dataset before calling tf.distribute.Strategy.experimental_distribute_dataset.
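As a sketch of the idea, the standard tf.data.Dataset.from_tensor_slices API (used here instead of the experimental helper named above) can wrap NumPy arrays into a dataset that is then distributed like any other:

import numpy as np

np_inputs, np_targets = np.ones((100, 1)), np.ones((100, 1))

# Wrap the NumPy arrays in a tf.data.Dataset, batch by the global batch
# size, then distribute the dataset exactly as before.
numpy_dataset = tf.data.Dataset.from_tensor_slices(
    (np_inputs, np_targets)).batch(global_batch_size)
dist_numpy_dataset = mirrored_strategy.experimental_distribute_dataset(numpy_dataset)

for dist_inputs in dist_numpy_dataset:
  distributed_train_step(dist_inputs)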

Another way to iterate over the data is to use an iterator explicitly. You might do this when you want to run a given number of steps rather than iterating over the entire dataset. The iteration above can be modified to first create an iterator and then explicitly call next() on it to get the input data.

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))

The above covers the simplest case of using the tf.distribute.Strategy API to distribute a custom training loop.

5. Other topics

In this section, we cover topics related to multiple use cases.

5.1 Setting the TF_CONFIG Environment Variable

For multi-worker training, as mentioned earlier, you need to set the TF_CONFIG environment variable for each binary running in the cluster. The TF_CONFIG environment variable is a JSON string that specifies the tasks that make up the cluster, their addresses, and the role of each task in the cluster. We provide a Kubernetes template in the TensorFlow /ecosystem repository to set TF_CONFIG for your training task.

TF_CONFIG has two components: Cluster and Task.

  • cluster provides information about the training cluster; it is a dictionary of different types of jobs, such as worker. In multi-worker training, there is usually one worker that takes on more responsibilities than the regular workers, such as saving checkpoints and writing summary files for TensorBoard. Such a worker is called the "chief" worker, and by convention the worker with index 0 is designated the chief worker (in fact, this is how tf.distribute.Strategy is implemented).
  • task, on the other hand, provides information about the current task. The first component, cluster, is the same for all workers, while the second component, task, differs for each worker and specifies that worker's type and index.

An example of TF_CONFIG is as follows:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port"."host2:port"."host3:port"]."ps": ["host4:port"."host5:port"]},"task": {"type": "worker"."index": 1}})Copy the code

This TF_CONFIG specifies that the cluster contains three workers and two ps tasks, along with their hosts and ports. The "task" part specifies the role of the current task in the cluster: worker 1 (the second worker). The valid roles in a cluster are "chief", "worker", "ps", and "evaluator". There should be no "ps" jobs except when using tf.distribute.experimental.ParameterServerStrategy.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

0xFF Reference

Use TensorFlow for distributed training

github.com/tensorflow/…

Getting started with TensorFlow 4: Initial distributed training