In my previous article, TensorFlow 2.x Overview of Distributed Training, I briefly introduced the distributed training strategies that TensorFlow supports and how distributed training is implemented. Building on that article, this one focuses on implementing multi-node distributed training with Keras and on the details that need attention during distributed training.

End-to-end example

First, we will implement an end-to-end example of multi-node distributed training based on Keras to get a high-level picture of how it works. The example uses the MultiWorkerMirroredStrategy distribution strategy, and its code is shown below:

import os
import json
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from absl import app, flags
import numpy as np

FLAGS = flags.FLAGS
flags.DEFINE_string("logs", "logs", "logs dir")
flags.DEFINE_integer("index", 0, "worker index")

class ThreeLayerMLP(keras.Model):
    def __init__(self, name=None):
        super().__init__(name=name)
        self.dense_1 = layers.Dense(32, activation='relu', name='dense_1')
        self.dense_2 = layers.Dense(16, activation='relu', name='dense_2')
        self.pred_layer = layers.Dense(
            1,
            activation='sigmoid',
            name='predictions',
        )

    def call(self, inputs):
        x = self.dense_1(inputs)
        x = self.dense_2(x)
        return self.pred_layer(x)

def prepare_data():
    # Fixed seed so that every worker generates identical data
    np.random.seed(0)

    x_train, y_train = (
        np.random.random((6000, 32)),
        np.random.randint(2, size=(6000, 1)),
    )
    x_val, y_val = (
        np.random.random((1000, 32)),
        np.random.randint(2, size=(1000, 1)),
    )
    return ((x_train, y_train), (x_val, y_val))

def main(argv):
    del argv  # Unused args
    tf_config = {
        "cluster": {
            "worker": ["localhost:12345", "localhost:12346"],
        },
        "task": {
            "index": FLAGS.index,
            "type": "worker"
        }
    }
    os.environ["TF_CONFIG"] = json.dumps(tf_config)
    print(json.loads(os.environ["TF_CONFIG"]))

    # distributed strategy
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    BATCH_SIZE_PER_REPLICA = 64
    BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
    print('Number of devices: %d' % strategy.num_replicas_in_sync)

    with strategy.scope():
        model = ThreeLayerMLP(name='3_layer_mlp')
        model.compile(
            loss=tf.keras.losses.BinaryCrossentropy(),
            optimizer=keras.optimizers.RMSprop(),
            metrics=["AUC"],
        )

    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=FLAGS.logs,
        histogram_freq=1,
        update_freq='batch',
    )

    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(FLAGS.logs, "checkpoints", "ckpt"),
    )

    ((x_train, y_train), (x_val, y_val)) = prepare_data()
    model.fit(
        x_train,
        y_train,
        epochs=5,
        batch_size=BATCH_SIZE,
        validation_data=(x_val, y_val),
        callbacks=[tensorboard_callback, checkpoint_callback],
    )

    model_dir = os.path.join(FLAGS.logs, "models", str(FLAGS.index))
    model.save(model_dir)

if __name__ == '__main__':
    app.run(main)

In distributed training, all worker nodes run the same training code; the only difference is the value of the TF_CONFIG environment variable. In the example, we first define two worker nodes and then control the task index of each one through the index flag, so that the two worker nodes have different TF_CONFIG values at run time and distributed training can proceed correctly.

I described the principle of multi-node distributed training in the distributed training overview: each worker node in the cluster first starts its own gRPC service according to the ip:port information in TF_CONFIG, then listens for the gRPC services of the other worker nodes and tries to establish communication with them. Distributed training only starts in earnest once all worker nodes are ready.

Based on this principle, we run the following commands in two separate terminal windows to start distributed training locally. Note that TensorFlow must be installed beforehand. TensorFlow 2.2.0 is used here (the latest stable version at the time of writing).

python worker.py --index=0
python worker.py --index=1

After distributed training finishes, subsequent steps such as model analysis and serving can be carried out. Next, let me expand on the details of multi-node distributed training with Keras.

Preparation before training

Three important preparations need to be completed before multi-node distributed training with Keras: model construction, definition of the TF_CONFIG environment variable, and data preparation.

Model building

Model construction for distributed training is basically the same as for local training. In the example above, I used subclassing to build a Keras model with three fully connected layers. Of course, you can also build the model with the Sequential or Functional API; MultiWorkerMirroredStrategy supports both. More details on model building can be found in TensorFlow 2.x Model Building based on Keras and will not be covered here.

After model construction is completed, we should first train the model locally to verify its correctness, and then apply it to distributed training after there are no problems.

TF_CONFIG definition

Because MultiWorkerMirroredStrategy is a data-parallel distributed training strategy, every worker node holds all the parameters of the model. At the end of each training batch, the worker nodes need to communicate with each other to update the model's variable values. This requires a way to define the communication address of each node so that connections can be established between them.

The TF_CONFIG environment variable solves this communication problem. It is a JSON-formatted string with two parts, cluster and task. The cluster part defines a list of nodes for each role, where each value in a list is the gRPC address of a node. The task part defines the role type and index of the current node. MultiWorkerMirroredStrategy only includes the worker role, so only the worker-related information needs to be defined. An example TF_CONFIG is as follows:

{
  "cluster": {
    "worker": ["localhost:12345", "localhost:12346"]
  },
  "task": {
    "index": 0,
    "type": "worker"
  }
}

In code, assign it with os.environ["TF_CONFIG"] = json.dumps(tf_config); TensorFlow parses the TF_CONFIG information automatically.

The cluster part of TF_CONFIG is identical for all nodes in distributed training, while the task part differs because each node has its own role and index. TensorFlow uses the index to look up the node's gRPC address in the role list and starts the gRPC service there. The index must therefore be in the range [0, len(role) - 1].
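
The rule above can be sketched as a small helper that builds the TF_CONFIG string for one worker. The function name build_tf_config and the host list are hypothetical; only the JSON layout is taken from the example above.

```python
import json


def build_tf_config(workers, index):
    """Return the TF_CONFIG JSON string for one worker (hypothetical helper)."""
    # The index must point at an existing entry in the worker list.
    if not 0 <= index < len(workers):
        raise ValueError(
            "index %d is outside [0, %d]" % (index, len(workers) - 1))
    return json.dumps({
        "cluster": {"worker": list(workers)},        # identical on every node
        "task": {"type": "worker", "index": index},  # differs per node
    })


workers = ["localhost:12345", "localhost:12346"]
configs = [json.loads(build_tf_config(workers, i)) for i in range(len(workers))]
```

Each worker would assign its own string to os.environ["TF_CONFIG"] before creating the strategy.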

In the worker role list, the node with index 0 is automatically treated as the chief node, which is responsible for some tasks besides training, such as saving checkpoint files and writing TensorBoard logs.
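
If your own code needs to know whether it runs on the chief, a check along the following lines may be enough. It is a minimal sketch that assumes, as described above, that the worker with index 0 is the chief; clusters that define a separate chief role are not handled, and the name is_chief is made up for illustration.

```python
import json


def is_chief(tf_config_json):
    """Return True if this TF_CONFIG describes the worker with index 0."""
    task = json.loads(tf_config_json).get("task", {})
    return task.get("type") == "worker" and task.get("index") == 0


def example_config(index):
    # Example value; in a real job the string comes from the TF_CONFIG
    # environment variable of the current process.
    return json.dumps({
        "cluster": {"worker": ["localhost:12345", "localhost:12346"]},
        "task": {"type": "worker", "index": index},
    })


chief_flags = [is_chief(example_config(i)) for i in range(2)]
```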

In the example above, I only defined two local worker nodes to facilitate local distributed training tests. In real distributed training, we would typically create multiple nodes with different IP addresses and ports, and each node should have an appropriate TF_CONFIG value. We can either manually set the TF_CONFIG value of each node, or we can use some distributed training frameworks such as XLearning to assist us to complete this work.

Note that TensorFlow parses the TF_CONFIG environment variable automatically when the MultiWorkerMirroredStrategy instance is created, in order to start its gRPC service. The value of TF_CONFIG must therefore be set before the tf.distribute.Strategy instance is created; otherwise distributed training will fail to start.

Data preparation

As with local training, the data for distributed training can be either NumPy arrays or a dataset. NumPy data is generally used for testing, while in real training scenarios a dataset is generally used as the model input for the sake of data processing efficiency.

In the example above, I called np.random.seed(0) in the data-generating function prepare_data() to fix the random seed, so that every worker node generates the same data at run time, which better simulates a real training scenario.

A dataset can be generated from NumPy data with the from_tensor_slices() method, but in practice we more often read from files, such as TFRecord files or text line files. The tf.data module provides two classes for these two file types, tf.data.TFRecordDataset and tf.data.TextLineDataset. Their basic usage is as follows:

dataset = tf.data.TFRecordDataset(["file1.tfrecords", "file2.tfrecords"])
dataset = tf.data.TextLineDataset(["file1.txt", "file2.txt"])

Generally speaking, the data files used in training are produced by distributed data-cleaning jobs such as MapReduce. The number of files depends on the number of Mapper and Reducer tasks (there may be many), and the files end up stored in HDFS or another distributed file system.

TensorFlow's Dataset API supports reading data directly from an HDFS path. However, we may only know the HDFS path and not the names of all the data files under it. Even if we do know the file names, it is impractical to list them all and pass them as arguments to TextLineDataset.

To solve this problem, we can use the list_files method of Dataset. It finds all files that match the given glob pattern and returns their names as a dataset. That dataset can then be passed to TextLineDataset to read the data in the files, as shown in the following example:

filenames = tf.data.Dataset.list_files(FLAGS.input + '/part-*')
dataset = tf.data.TextLineDataset(filenames, num_parallel_reads=4)
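
The pattern-based listing above can be tried locally without HDFS. The sketch below writes a few made-up part-* files to a temporary directory (the file names and contents are assumptions for illustration) and reads them back through list_files and TextLineDataset.

```python
import os
import tempfile

import tensorflow as tf

# Create a throwaway directory with two small "part-*" files (made-up data).
input_dir = tempfile.mkdtemp()
for i in range(2):
    with open(os.path.join(input_dir, "part-%05d" % i), "w") as f:
        f.write("row-%d-a\nrow-%d-b\n" % (i, i))

# A file that does not match the pattern and should therefore be ignored.
with open(os.path.join(input_dir, "_SUCCESS"), "w") as f:
    f.write("ignored\n")

# shuffle=False keeps the file order deterministic for this demonstration.
filenames = tf.data.Dataset.list_files(input_dir + "/part-*", shuffle=False)
dataset = tf.data.TextLineDataset(filenames)

# Each element is one line of text, returned as bytes.
lines = sorted(line.decode("utf-8") for line in dataset.as_numpy_iterator())
```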

For the data read by using TextLineDataset, each row of data may contain multiple data values representing different features, so it generally needs to be segmented to meet the input requirements of model training. In this case, the map method of dataset can be used to complete data conversion.

The map method processes each row of data in the dataset and converts it to the desired data format. Assuming that each row in the data file contains 33 columns, separated by TAB, with the first 32 columns representing features and the last one representing labels, to convert the data into the input format available for the example above, use the following code:

def labeler(record):
    fields = tf.io.decode_csv(
        record,
        record_defaults=['0.0'] * 33,
        field_delim='\t',
    )
    data = tf.strings.to_number(fields[0:32], out_type=tf.float32)
    label = tf.strings.to_number([fields[32]], out_type=tf.float32)
    return data, label

dataset = dataset.map(labeler)

Because the model in the example above has a single input, I merged the 32 feature columns into one model input during the data transformation. For a multi-input model, the features need to be kept separate so that each column is identified by its own field name. We could still use the method above to split each field and return a dataset of dictionaries, but it is more convenient to use the approach from the Load CSV guide directly, which is more straightforward and easier to understand. The sample code is as follows (see the official documentation for parameter descriptions):

column_names = [str(i) for i in range(33)]
dataset = tf.data.experimental.make_csv_dataset(
    FLAGS.input + '/part-*',
    column_names=column_names,
    label_name='32',
    batch_size=64,
    field_delim='\t',
    header=False,
)

In addition, if you want to generate TFRecord files with MapReduce jobs, you can use the tensorflow-hadoop library from the TensorFlow ecosystem. As with text files, TFRecordDataset can be combined with the list_files() method to read the data files in a given directory.

As for splitting training and validation data: if the two sets are written to separate locations when the data files are generated, we can simply specify different input paths when loading, for example loading training data from a "train" directory and validation data from a "validation" directory. This is the recommended way to split the data.

However, sometimes the training and validation data are not separated in advance. In that case the following method can be used as an alternative to achieve the same goal. Taking an 80/20 split between training and validation data as an example, the code is as follows:

def is_validation(x, y):
    # x is the element index added by enumerate(); every fifth element is validation
    return x % 5 == 0

def is_train(x, y):
    return not is_validation(x, y)

recover = lambda x, y: y

filenames = tf.data.Dataset.list_files(FLAGS.input + '/part-*')
dataset = tf.data.TextLineDataset(filenames, num_parallel_reads=4)
train_dataset = dataset.enumerate().filter(is_train).map(recover)
validation_dataset = dataset.enumerate().filter(is_validation).map(recover)

This uses the enumerate and filter methods of Dataset. The enumerate method first numbers the elements of the dataset, the filter method then selects training or validation data according to the given rule, and finally the map method restores the data to its original format.
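
To see which rows land in each split, the same number-then-filter rule can be replayed in plain Python. This only illustrates the index arithmetic, not the tf.data API; the function name split_rows and the row values are made up.

```python
def split_rows(rows, every=5):
    """Send each row whose index satisfies index % every == 0 to validation."""
    train, validation = [], []
    for index, row in enumerate(rows):
        if index % every == 0:
            validation.append(row)
        else:
            train.append(row)
    return train, validation


rows = ["row-%d" % i for i in range(10)]
train_rows, validation_rows = split_rows(rows)
```

With every=5, one row in five goes to validation, which gives the 80/20 ratio.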

However, it should be noted that during the training, each worker node will traverse all the data of the dataset, which will have a great impact on the performance of the training. Therefore, this data division method is not recommended in practical use.

Start training

With all the preparation done, we can start distributed training. First, model construction and compilation (model.compile()) must be placed inside the strategy.scope() of the distribution strategy, which determines how and where the model variables are created. We can then start training by calling the model's fit method with the training data.

The training process of each worker node is not significantly different from the local training process. However, as the overall training is distributed, some details are slightly different from the local training. In order to better grasp the distributed training process, we need to have a general understanding of these details.

Data sharding strategy

In multi-node distributed training, it is necessary to segment input data according to the number of worker nodes, which can ensure the convergence of the model and the efficiency of training, which is also in line with the concept of data parallelism mentioned in the previous article.

In the example above, we did not shard the data manually but passed it directly to the fit method, because TensorFlow automatically shards the input data across the worker nodes without manual intervention.

TensorFlow distributed training has two main data sharding policies, FILE and DATA. The former shards by file, while the latter shards by data element. They can be set as follows:

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

If it is not set explicitly, the policy defaults to AutoShardPolicy.AUTO, which means that the TensorFlow runtime selects a suitable data sharding policy automatically.

In AUTO mode, TensorFlow first tries to shard with the FILE policy. If there are no files to shard, it falls back to the DATA policy. A dataset that is read from files, for example with TFRecordDataset or TextLineDataset, is sharded with the FILE policy. A dataset generated in another way, such as from_tensor_slices(), is sharded with the DATA policy.

With the FILE policy, each worker node receives an even share of the data files and processes only those. However, if some worker node is not assigned at least one data file, the training program reports an error and the whole training process terminates. Therefore, when files are the data source and the FILE policy is in effect (whether specified manually or selected automatically by TensorFlow), make sure that every worker node can be assigned at least one data file.
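
A pre-flight check for this condition could look like the sketch below. It assumes files are dealt out to workers round-robin by position, which is an assumption made for illustration; the real assignment is decided by TensorFlow. The name assign_files and the file names are hypothetical.

```python
def assign_files(files, num_workers):
    """Deal files to workers round-robin and fail if any worker gets none."""
    assignment = {w: files[w::num_workers] for w in range(num_workers)}
    empty = [w for w, assigned in assignment.items() if not assigned]
    if empty:
        raise ValueError("workers without data files: %s" % empty)
    return assignment


files = ["part-00000", "part-00001", "part-00002"]
assignment = assign_files(files, num_workers=2)
```

Running such a check before launching a job makes the "fewer files than workers" case fail early instead of in the middle of training.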

With the DATA policy, TensorFlow applies a shard operation to the dataset. Each worker node traverses all the data, discards the elements that do not belong to it, and trains on the part it is responsible for. The shard operation is similar to the enumerate + filter approach above: for a dataset, dataset.shard(n, i) returns the elements whose index satisfies index % n == i, where n is the number of worker nodes and i is the index of the current worker node.
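
The index % n == i rule can be illustrated with a plain-Python stand-in for the shard operation. This is not the tf.data implementation, only the selection rule described above applied to a list.

```python
def shard(elements, num_shards, index):
    """Keep the elements whose position satisfies position % num_shards == index."""
    # Note that the whole input is traversed, even though most of it is dropped.
    return [element for position, element in enumerate(elements)
            if position % num_shards == index]


data = list(range(10))
worker_0 = shard(data, 2, 0)
worker_1 = shard(data, 2, 1)
```

Both workers iterate over all ten elements and keep five each, which is the redundant traversal that makes this policy slower than file-based sharding.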

Compared with the FILE policy, the DATA policy has a large impact on overall training performance because every worker node performs a redundant traversal of the data. It is therefore better to split the input data into files in advance and use the file-based sharding policy for distributed training.

For a dataset generated from files, we can set the sharding policy to DATA (typically when we cannot guarantee that every worker node will be assigned at least one file). For a dataset that is not generated from files, however, we cannot set the policy to FILE; TensorFlow will report an error at run time because it cannot find any files to shard.

If you do not want to use either of these sharding policies, you can disable automatic sharding by setting the policy to AutoShardPolicy.OFF. Each worker node then receives all the data, and how to process it is entirely up to you.

In addition, if a worker node has more than one device, such as multiple GPUs, the node's data is also sharded automatically among those GPUs. Even if you have disabled data sharding between worker nodes, sharding between multiple devices on the same node is not affected.

Note that the sharding policy can only be set manually for datasets. NumPy data is sharded automatically by TensorFlow and can only use the DATA policy.

Because validation is also distributed across multiple nodes, validation data is sharded with the same policy as training data by default, although a different policy can be specified in the way described above. Currently, model.predict() does not support distributed execution, so the worker will report an error if this method is called during distributed training.

Data batch processing

In the article "Local Training and Evaluation of Keras model", we introduced how NumPy and dataset inputs are handled when training a model locally. Distributed training handles them slightly differently, mainly in some settings related to batching.

In the example above, NumPy data is used as the model input. To set BATCH_SIZE, I first set the batch size of each replica (BATCH_SIZE_PER_REPLICA) and then multiplied it by strategy.num_replicas_in_sync, the number of replicas training in sync (equal to the number of worker nodes when each node has one device), to get the global batch size (BATCH_SIZE). The reason is that the effective batch size on each worker node in distributed training is the global batch size divided by the number of nodes, that is, BATCH_SIZE_PER_REPLICA. Choosing a suitable BATCH_SIZE_PER_REPLICA for each worker first avoids the situation where a poorly chosen global BATCH_SIZE makes each worker node compute inefficiently and drags down overall distributed training performance.
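
The relationship between the two batch sizes is simple arithmetic, sketched here with the values from the example (64 per replica, two replicas). The function names are made up for illustration.

```python
def global_batch_size(per_replica_batch_size, num_replicas_in_sync):
    """Batch size to pass to fit() so each replica sees per_replica_batch_size."""
    return per_replica_batch_size * num_replicas_in_sync


def effective_per_replica(global_batch, num_replicas_in_sync):
    """Share of one global batch that each replica processes."""
    return global_batch // num_replicas_in_sync


batch_size = global_batch_size(64, 2)
per_replica = effective_per_replica(batch_size, 2)
```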

Similarly, if a dataset is used as input, BATCH_SIZE should be set in the same way.

When a dataset is used as the training input, the steps_per_epoch parameter must be specified to limit the number of steps in each epoch, because MultiWorkerMirroredStrategy does not yet support handling the last incomplete batch. If this parameter is not specified, the training program reports an error during execution.

Once steps_per_epoch is specified, at the start of each epoch TensorFlow continues reading from the position in the dataset where the previous epoch ended, and keeps doing so until training finishes. It is therefore best to call .repeat() on the training data to ensure there is enough data to complete N epochs.
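
Whether a single pass over the data is enough can be estimated with a small calculation. The sketch below counts only full batches and uses a hypothetical helper name; the sample counts come from the end-to-end example, and the step counts are illustrative.

```python
def needs_repeat(num_samples, global_batch_size, steps_per_epoch, epochs):
    """True if one pass over the data cannot supply steps_per_epoch * epochs batches."""
    available_batches = num_samples // global_batch_size  # full batches only
    return available_batches < steps_per_epoch * epochs


# 6000 samples with a global batch of 128 give 46 full batches.
five_epochs = needs_repeat(6000, 128, steps_per_epoch=10, epochs=5)  # needs 50
four_epochs = needs_repeat(6000, 128, steps_per_epoch=10, epochs=4)  # needs 40
```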

If the training data is not repeated, steps_per_epoch should be set according to the actual amount of training data on each worker and BATCH_SIZE_PER_REPLICA. When the data is split unevenly among worker nodes, steps_per_epoch should be based on the worker node with the least data, so that one worker running out of data does not disrupt the overall training.
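
One way to derive such a value is sketched below. It assumes the per-worker sample counts are known in advance and that the data is not repeated, so the smallest worker must have enough data for all epochs. The helper name and the sample counts are made up.

```python
def safe_steps_per_epoch(samples_per_worker, per_worker_batch_size, epochs=1):
    """Largest steps_per_epoch that the worker with the least data can sustain."""
    smallest = min(samples_per_worker)
    return smallest // (per_worker_batch_size * epochs)


# Two workers holding 3000 and 2800 samples, 64 samples per worker per step.
steps_one_epoch = safe_steps_per_epoch([3000, 2800], 64)
steps_five_epochs = safe_steps_per_epoch([3000, 2800], 64, epochs=5)
```

With five epochs the budget per epoch shrinks, because each epoch continues from where the previous one stopped rather than starting over.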

The same is true for validation data supplied as a dataset: the validation_steps parameter must be specified, and its value should be set according to the amount of validation data and BATCH_SIZE. Unlike training, validation uses the same validation data in every epoch to keep the results fair and comparable. Although a validation dataset does not strictly need to be repeated, the official documentation recommends calling repeat to avoid data exhaustion caused by an ill-chosen validation_steps setting.

Fault-tolerant processing

In synchronous distributed training without a fault tolerance mechanism, the whole training job fails when a worker node crashes or its resources are preempted by other tasks. Fortunately, TensorFlow provides fault tolerance for Keras-based distributed training. Because the training state is saved to files, the whole cluster can be restored to its previous training state after a failed worker node restarts, so training can continue.

To use the fault tolerance mechanism of Keras multi-node distributed training, you need to use the ModelCheckpoint callback function. This callback saves the model’s training state and checkpoint files to the specified file directory. The specific usage is as follows:

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join(FLAGS.logs, "checkpoints", "ckpt"))

model.fit(
    train_dataset,
    epochs=5,
    steps_per_epoch=10,
    validation_data=validation_dataset,
    validation_steps=5,
    callbacks=[tensorboard_callback, checkpoint_callback],
)

Because it is a synchronous training mode, during the restart of the failed node, all nodes participating in the distributed training will stop training until the restart of the failed node is complete. After the failed node is added to the cluster again, other nodes will restart, and all nodes will restore the training status from the checkpoint file at the same time, so that the synchronization between nodes is restored and training can continue as usual.

Some temporary checkpoint files used during node recovery are created in the checkpoint directory. They are deleted automatically after training completes.

Path to save the model file

In multi-node distributed training, when you save the model using model.save() or tf.saved_model.save(), you might expect the chief node to do it alone, but that is not the case. All worker nodes participating in the training perform the save operation. If every worker node uses the same model storage path, they will all try to write files to that location when saving, resulting in race conditions and hard-to-diagnose errors.

The correct approach is to provide different storage paths for each worker node, for example, worker index can be used as part of the storage path of each node model. In fact, the model files saved by all worker nodes are the same, but usually we choose to use the model file saved by the Chief node to reconstruct the model and serve the model. We should also set up some cleanup logic to automatically clean up model files saved by non-Chief nodes after the training is complete.
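
The per-worker path and the cleanup step could be sketched as follows. This assumes all model directories are visible from the place where cleanup runs (for example a shared file system) and that cleanup happens only after every worker has finished saving. The helper names are hypothetical, and empty placeholder directories stand in for saved models.

```python
import os
import shutil
import tempfile


def worker_model_dir(base_dir, task_index):
    """Per-worker save path, mirroring the layout used in the example."""
    return os.path.join(base_dir, "models", str(task_index))


def remove_non_chief_models(base_dir, chief_index=0):
    """Delete every models/<index> directory except the chief's."""
    models_root = os.path.join(base_dir, "models")
    for name in os.listdir(models_root):
        if name != str(chief_index):
            shutil.rmtree(os.path.join(models_root, name))


# Demonstration with empty placeholder directories instead of real models.
base_dir = tempfile.mkdtemp()
for index in range(3):
    os.makedirs(worker_model_dir(base_dir, index))
remove_non_chief_models(base_dir)
remaining = sorted(os.listdir(os.path.join(base_dir, "models")))
```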

For callbacks such as Checkpoint and Tensorboard, we can set only a global path, because TensorFlow automatically handles file paths for us. In the actual distributed training, TensorFlow will automatically create a temporary directory for each worker node under the global directory to store their respective information, and will automatically clear the temporary directory after the training is completed. Finally, only the files related to the Chief node are kept (you can watch for changes in file structure in related directories during training). This is also done because if more than one node writes to a directory at the same time, some read/write errors may occur.

Q&A

During multi-node distributed training, the following problems may be encountered. Here are some solutions:

  1. Problem: RuntimeError: Collective ops must be configured at program startup.

    Solution: This runtime error means that other TensorFlow nodes (ops) were created before the distribution strategy instance (MultiWorkerMirroredStrategy here) was created and initialized, for example by creating the input dataset ahead of time. The fix is to move the creation of those other TensorFlow nodes to after the distribution strategy has been initialized.

  2. Problem: ValueError: When dataset is sharded across workers, please specify a reasonable steps_per_epoch such that all workers will train the same number of steps and each step can get data from dataset without EOF. This is required for allreduce to succeed. We will handle the last partial batch in the future.

    Solution: For dataset inputs, the steps_per_epoch parameter for the training data and the validation_steps parameter for the validation data must be specified in the fit method; otherwise training reports this error. The value of steps_per_epoch can be roughly estimated from the overall data volume.

  3. Problem: WARNING: Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least steps_per_epoch * epochs batches (in this case, 1000 batches). You may need to use the repeat() function when building your dataset.

    UnboundLocalError: local variable 'logs' referenced before assignment.

    Solution: This problem indicates that the training or validation data has been exhausted and training cannot continue. Calling repeat() on the training and validation datasets resolves it.

  4. Problem: Why are the loss and metrics values printed in the training logs of multiple worker nodes exactly the same?

    Solution: This is expected. At the end of each training batch, the model variables on all worker nodes are updated synchronously, and the log is printed after the variable update completes, so the loss and metrics values printed for each batch are the same on every worker.

  5. Problem: How to tune the performance of distributed training?

    Solution: The performance of distributed training is directly related to the speed of variable updates between nodes. You can start by choosing a suitable communication implementation such as ring all-reduce or NCCL. If the network bandwidth between nodes becomes the bottleneck, upgrade to 10 Gigabit Ethernet NICs. In addition, it is recommended to convert all variables in the model to tf.float32, which may improve computational efficiency.

References

  1. Multi-worker training with Keras
  2. MapReduce TFRecords OutputFormat
  3. Build TensorFlow input pipelines
  4. tf.data.Dataset
  5. tf.data.TextLineDataset
  6. Load CSV data
  7. tf.data.experimental.make_csv_dataset
  8. tf.data.Options
  9. tf.data.experimental.DistributeOptions
  10. tf.data.experimental.AutoShardPolicy
  11. tf.distribute.Strategy