Kashif Rasul, Research scientist, Zalando Research

Source | TensorFlow official WeChat account

Like most AI research departments, Zalando Research recognizes the importance of experimenting with ideas and rapid prototyping. As datasets get larger and larger, it becomes essential to know how to train deep learning models quickly and efficiently with the shared resources we have.

TensorFlow's Estimator API is useful for training models on multiple GPUs in a distributed environment, and this article focuses on that workflow. We first train a custom estimator written with tf.keras on the small Fashion-MNIST dataset, and then conclude with a more practical use case.

Note: The TensorFlow team has been working on another cool new feature (still on the master branch as I write this) that lets you train a tf.keras model with just a few extra lines of code, without having to convert the model into an estimator first! That workflow is great too, but here I will focus on the Estimator API. Which one you choose is up to you! Note: relevant link github.com/tensorflow/…
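
As a rough sketch of what that in-progress workflow looked like at the time of writing (this assumes the experimental distribute argument to compile that lived on master, so treat the exact API as an assumption, not a recommendation):

# Hedged sketch, NOT this article's workflow: train a tf.keras model directly
# with a DistributionStrategy. Assumes the experimental `distribute` argument
# to compile() that was on master at the time; the released API may differ.
strategy = tf.contrib.distribute.MirroredStrategy()
model.compile(loss='categorical_crossentropy',
              optimizer=tf.train.AdamOptimizer(),
              metrics=['accuracy'],
              distribute=strategy)
model.fit(train_images, train_labels, epochs=5, batch_size=512)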

TL;DR: Essentially, what we need to remember is that for a tf.keras.Model we just convert it with tf.keras.estimator.model_to_estimator, and we can then use the tf.estimator API for training. Once the conversion is done, we can use the mechanisms the estimator provides to train the model on different hardware configurations.
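
In code, the whole workflow boils down to this (a minimal sketch built from the same calls used later in this article, with the input_fn we define below):

# Minimal sketch: wrap a compiled tf.keras model as an estimator, then train
# it through the tf.estimator API.
estimator = tf.keras.estimator.model_to_estimator(model)
estimator.train(input_fn=lambda: input_fn(train_images, train_labels,
                                          epochs=5, batch_size=512))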

You can download the notebook with the code from this article and run it yourself. Note: notebook link github.com/kashif/tf-k…


import os
import time

#! pip install -q -U tensorflow-gpu
import tensorflow as tf

import numpy as np

Import the Fashion-MNIST data set

Let's replace MNIST with the Fashion-MNIST dataset, which contains 70,000 grayscale images of Zalando fashion articles. Getting the training and test data is as simple as:

(train_images, train_labels), (test_images, test_labels) = \
    tf.keras.datasets.fashion_mnist.load_data()

We want to convert the pixel values of these images from a number between 0 and 255 to a number between 0 and 1, and convert the dataset to [B, H, W, C] format, where B is the number of images in a batch, H and W are the height and width, and C is the number of channels of our dataset (1 for grayscale):

TRAINING_SIZE = len(train_images)
TEST_SIZE = len(test_images)

train_images = np.asarray(train_images, dtype=np.float32) / 255
# Convert the train images and add channels
train_images = train_images.reshape((TRAINING_SIZE, 28, 28, 1))

test_images = np.asarray(test_images, dtype=np.float32) / 255
# Convert the test images and add channels
test_images = test_images.reshape((TEST_SIZE, 28, 28, 1))

Next, we want to convert the labels from integers (for example, 2, or a pullover) to one-hot encodings (for example, (0, 0, 1, 0, 0, 0, 0, 0, 0, 0)). To do this, we use tf.keras.utils.to_categorical:

# How many categories we are predicting from (0-9)
LABEL_DIMENSIONS = 10

train_labels = tf.keras.utils.to_categorical(train_labels, 
                                            LABEL_DIMENSIONS)

test_labels = tf.keras.utils.to_categorical(test_labels,
                                           LABEL_DIMENSIONS)

# Cast the labels to floats, needed later
train_labels = train_labels.astype(np.float32)
test_labels = test_labels.astype(np.float32)

Build the tf.keras model

We will use the Keras functional API to create the neural network. Keras is a high-level API for building and training deep learning models, with a modular design that makes it easy to use and easy to extend. tf.keras is TensorFlow's implementation of this API, with support for eager execution, tf.data pipelines, and estimators.

For the architecture we will use a ConvNet. Speaking very generally, a ConvNet is a stack of convolutional layers (Conv2D) and pooling layers (MaxPooling2D). Most importantly, a ConvNet takes each training example as a 3D tensor of shape (height, width, channels), which for a grayscale image starts with channels = 1, and returns a 3D tensor.

So, after the ConvNet part, we need to flatten the tensor and add dense layers, the last of which returns a vector of size LABEL_DIMENSIONS with a tf.nn.softmax activation:

inputs = tf.keras.Input(shape=(28, 28, 1))  # Returns a placeholder

x = tf.keras.layers.Conv2D(filters=32, 
                          kernel_size=(3, 3), 
                          activation=tf.nn.relu)(inputs)

x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)

x = tf.keras.layers.Conv2D(filters=64, 
                          kernel_size=(3, 3), 
                          activation=tf.nn.relu)(x)

x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)

x = tf.keras.layers.Conv2D(filters=64, 
                          kernel_size=(3, 3), 
                          activation=tf.nn.relu)(x)

x = tf.keras.layers.Flatten()(x)

x = tf.keras.layers.Dense(64, activation=tf.nn.relu)(x)
predictions = tf.keras.layers.Dense(LABEL_DIMENSIONS,
                                   activation=tf.nn.softmax)(x)

Now we can define our learning model, choose the optimizer (we pick one from TensorFlow rather than from tf.keras.optimizers), and compile it:

model = tf.keras.Model(inputs=inputs, outputs=predictions)

optimizer = tf.train.AdamOptimizer(learning_rate=0.001)

model.compile(loss='categorical_crossentropy',
              optimizer=optimizer,
              metrics=['accuracy'])

Creating an estimator

To create the estimator from the compiled Keras model, we call the model_to_estimator method. Note that the initial model state of the Keras model is preserved in the created estimator.

So what are the advantages of estimators? To begin with:

- You can run estimator-based models on a local host or in a distributed multi-GPU environment without changing your model;
- Estimators simplify sharing implementations between model developers;
- Estimators build the graph for you, so, a bit like eager execution, there is no explicit session.

So how do we train our simple tf.keras model on multiple GPUs? We can use the tf.contrib.distribute.MirroredStrategy paradigm, which does in-graph replication with synchronous training. For more information on this strategy, see the Distributed TensorFlow training talk. Note: Distributed TensorFlow link www.youtube.com/watch?v=bRM…

Basically, each worker GPU has a copy of the network and takes a subset of the data on which it computes local gradients, then waits for all the workers to finish in a synchronous fashion. The workers then communicate their local gradients to each other via a ring all-reduce operation, which is typically optimized to reduce network bandwidth and increase throughput. Once all gradients have arrived, each worker averages them and updates its parameters, and the next step begins. This is ideal when you have multiple GPUs on a single node connected by a fast interconnect.
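
To make the arithmetic of that synchronous step concrete, here is a toy NumPy sketch (purely illustrative, not part of MirroredStrategy's API):

import numpy as np

# Each worker GPU computes a local gradient on its own slice of the batch.
local_gradients = [np.array([0.2, -0.4]),  # worker 0
                   np.array([0.6,  0.0])]  # worker 1

# Ring all-reduce makes the averaged gradient available to every worker...
mean_gradient = np.mean(local_gradients, axis=0)

# ...and each worker applies the same update before the next step.
learning_rate = 0.001
params = np.zeros(2)
params -= learning_rate * mean_gradient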

To use this strategy, we first create an estimator from the compiled tf.keras model and give it the MirroredStrategy configuration via RunConfig. By default, this configuration uses all GPUs, but you can also pass it a num_gpus option to use a specific number of GPUs:

NUM_GPUS = 2

strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)

estimator = tf.keras.estimator.model_to_estimator(model,
                                                 config=config)

Create the estimator input function

To pipe data into the estimator, we need to define a data-import function that returns a tf.data dataset of (images, labels) batches. The function below takes NumPy arrays and returns the dataset via an ETL procedure.

Note that at the end we also call the prefetch method, which buffers the data to the GPUs during training so that the next batch is ready and waiting for the GPUs, rather than the GPUs waiting for data at each iteration. The GPUs might still be underutilized, and to improve this we can use fused versions of the transformation operations, such as shuffle_and_repeat, instead of two separate operations (see the sketch after the code below). However, I have chosen the simple use case here:

def input_fn(images, labels, epochs, batch_size):

    # Convert the inputs to a Dataset. (E)
    ds = tf.data.Dataset.from_tensor_slices((images, labels))

    # Shuffle, repeat, and batch the examples. (T)
    SHUFFLE_SIZE = 5000
    ds = ds.shuffle(SHUFFLE_SIZE).repeat(epochs).batch(batch_size)
    ds = ds.prefetch(2)

    # Return the dataset. (L)
    return ds
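
For reference, here is what the fused variant mentioned above would look like; tf.contrib.data.shuffle_and_repeat is the TF 1.x op we also use in the larger example later on:

# Sketch of the fused alternative: one op instead of separate shuffle/repeat.
ds = ds.apply(tf.contrib.data.shuffle_and_repeat(SHUFFLE_SIZE, epochs))
ds = ds.batch(batch_size)
ds = ds.prefetch(2)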

Training the estimator

First, we define a SessionRunHook class to record the times of each iteration of stochastic gradient descent:

class TimeHistory(tf.train.SessionRunHook):
    def begin(self):
        self.times = []

    def before_run(self, run_context):
        self.iter_time_start = time.time()

    def after_run(self, run_context, run_values):
        self.times.append(time.time() - self.iter_time_start)

Here's the highlight! We can call the train function on the estimator, giving it our input_fn (with the batch size and the number of epochs we want to train for) and, via the hooks argument, an instance of the TimeHistory class we defined:

time_hist = TimeHistory()

BATCH_SIZE = 512
EPOCHS = 5

estimator.train(lambda:input_fn(train_images,
                               train_labels,
                               epochs=EPOCHS,
                               batch_size=BATCH_SIZE),
               hooks=[time_hist])

Performance

Now we can use the timing hook to compute the total training time and the average number of images trained on per second (the average throughput):

total_time = sum(time_hist.times)
print(f"total time with {NUM_GPUS} GPU(s): {total_time} seconds")

avg_time_per_batch = np.mean(time_hist.times)
print(f"{BATCH_SIZE*NUM_GPUS/avg_time_per_batch} images/second with {NUM_GPUS} GPU(s)")

Fashion-MNIST training throughput and total time when training on K80 GPUs with different NUM_GPUS values, showing poor scaling

Evaluating the estimator

To check the performance of our model, we call the evaluate method on the estimator:

estimator.evaluate(lambda:input_fn(test_images, 
                                  test_labels,
                                  epochs=1,
                                  batch_size=BATCH_SIZE))

Retinal OCT (optical coherence tomography) images example

To test the scaling behavior of the model on a larger dataset, we use the retinal OCT images dataset, one of Kaggle's many large datasets. The dataset consists of cross-sectional images of living retinas, divided into four categories: NORMAL, CNV, DME, and DRUSEN:

Representative optical coherence tomography images (from Identifying Medical Diagnoses and Treatable Diseases by Image-Based Deep Learning by Kermany et al.)

The dataset consists of 84,495 JPEG images, mostly of size 512×496, and can be downloaded via the Kaggle CLI. Note: CLI link github.com/Kaggle/kagg…

#! pip install kaggle
#! kaggle datasets download -d paultimothymooney/kermany2018

After downloading, the training and test set image classes are in their respective folders, so we can define the file patterns as:

labels = ['CNV', 'DME', 'DRUSEN', 'NORMAL']

train_folder = os.path.join('OCT2017', 'train', '**', '*.jpeg')
test_folder = os.path.join('OCT2017', 'test', '**', '*.jpeg')

Next, we'll write our estimator's input function, which takes any file pattern and returns scaled images and one-hot encoded labels as a tf.data.Dataset. This time we follow the best practices from the input pipeline performance guide. Note in particular that if prefetch_buffer_size is None, TensorFlow automatically uses an optimal prefetch buffer size. Note: input pipeline performance guide link www.tensorflow.org/performance…

def input_fn(file_pattern, labels,
             image_size=(224, 224),
             shuffle=False,
             batch_size=64,
             num_epochs=None,
             buffer_size=4096,
             prefetch_buffer_size=None):

    table = tf.contrib.lookup.index_table_from_tensor(mapping=tf.constant(labels))
    num_classes = len(labels)

    def _map_func(filename):
        label = tf.string_split([filename], delimiter=os.sep).values[-2]
        image = tf.image.decode_jpeg(tf.read_file(filename), channels=3)
        image = tf.image.convert_image_dtype(image, dtype=tf.float32)
        image = tf.image.resize_images(image, size=image_size)
        return (image, tf.one_hot(table.lookup(label), num_classes))

    dataset = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle)

    if num_epochs is not None and shuffle:
        dataset = dataset.apply(
            tf.contrib.data.shuffle_and_repeat(buffer_size, num_epochs))
    elif shuffle:
        dataset = dataset.shuffle(buffer_size)
    elif num_epochs is not None:
        dataset = dataset.repeat(num_epochs)

    dataset = dataset.apply(
        tf.contrib.data.map_and_batch(map_func=_map_func,
                                      batch_size=batch_size,
                                      num_parallel_calls=os.cpu_count()))
    dataset = dataset.prefetch(buffer_size=prefetch_buffer_size)

    return dataset

To train the model this time, we will use a pre-trained VGG16 and retrain only its last 5 layers:

keras_vgg16 = tf.keras.applications.VGG16(input_shape=(224, 224, 3),
                                          include_top=False)

output = keras_vgg16.output
output = tf.keras.layers.Flatten()(output)
prediction = tf.keras.layers.Dense(len(labels),
                                   activation=tf.nn.softmax)(output)

model = tf.keras.Model(inputs=keras_vgg16.input, outputs=prediction)

for layer in keras_vgg16.layers[:-4]:
    layer.trainable = False

Now we have everything we need to follow the steps above and train our model in a few minutes using NUM_GPUS GPUs:

model.compile(loss='categorical_crossentropy',
              optimizer=tf.train.AdamOptimizer(),
              metrics=['accuracy'])

NUM_GPUS = 2
strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = tf.keras.estimator.model_to_estimator(model,
                                                  config=config)

BATCH_SIZE = 64
EPOCHS = 1

estimator.train(input_fn=lambda: input_fn(train_folder,
                                          labels,
                                          shuffle=True,
                                          batch_size=BATCH_SIZE,
                                          buffer_size=2048,
                                          num_epochs=EPOCHS,
                                          prefetch_buffer_size=4),
                hooks=[time_hist])

After training, we can evaluate the accuracy on the test set, which should be around 95% (not bad for an initial baseline):

estimator.evaluate(input_fn=lambda:input_fn(test_folder,
                                           labels, 
                                           shuffle=False,
                                           batch_size=BATCH_SIZE,
                                           buffer_size=1024,
                                           num_epochs=1))

Retinal OCT training throughput and total time when training on K80 GPUs with different NUM_GPUS values, showing linear scaling

Conclusion

We've covered how to easily train Keras deep learning models on multiple GPUs using the Estimator API, how to write an input pipeline that follows best practices to get the most out of our resources (linear scaling), and how to time our training throughput via hooks.

It is important to note that in the end what we mainly care about is the test-set error. You may notice that test-set accuracy drops as the NUM_GPUS value increases. One reason is that MirroredStrategy effectively trains the model with a batch size of BATCH_SIZE*NUM_GPUS, so as we increase the number of GPUs we probably need to adjust BATCH_SIZE or the learning rate. For the plots, all hyperparameters except NUM_GPUS were kept fixed, but in practice we would need to tune them.
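
As an illustration of the kind of adjustment meant here, one common heuristic (the linear scaling rule, which this article does not validate) scales the learning rate with the effective batch size:

# Hypothetical illustration only: the effective batch is BATCH_SIZE * NUM_GPUS,
# so scale the single-GPU learning rate proportionally.
BASE_LEARNING_RATE = 0.001  # rate tuned for one GPU
optimizer = tf.train.AdamOptimizer(
    learning_rate=BASE_LEARNING_RATE * NUM_GPUS)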

The size of the dataset and of the model also affect how well these schemes scale. GPUs have poor bandwidth when reading or writing small amounts of data, especially older GPUs such as the K80, which can lead to the situation shown in the Fashion-MNIST figure above.

Thank you

Thanks to the TensorFlow team, especially Josh Gordon, and to my colleagues at Zalando Research, especially Duncan Blythe, Gokhan Yildirim, and Sebastian Heinz, for their help with the drafts.