Author: Ye Hu

Editor: Zhao Yifan

Preface

When training a model, the first thing to deal with is loading and preprocessing the training data. This process is called the input pipeline (reference: www.tensorflow.org/performance… ), and it typically consists of three stages:

  1. Extract: read data from a storage medium, either local (such as a hard disk) or remote (such as a distributed storage system like HDFS).

  2. Transform: use the CPU to parse and preprocess the extracted data, such as image decompression, data augmentation or other transformations, followed by random shuffling and batching.

  3. Load: load the preprocessed data onto the accelerator (such as a GPU) to train the model.

The input pipeline is very important for speeding up model training. If the CPU cannot prepare data as fast as the GPU consumes it, preprocessing becomes the bottleneck of training. The input pipeline itself also leaves much room for optimization. For example, in a naive training loop the GPU is idle while the CPU preprocesses data, and the CPU is idle while the GPU trains the model, as shown below:

The time spent on one training step is then the sum of the CPU preprocessing time and the GPU training time, which obviously wastes resources. An improved approach is to overlap the two processes: while the GPU is performing training step N, the CPU is already preparing the data for step N+1, as shown in the figure below:

Obviously, this design makes full use of both CPU and GPU and reduces idle resources. In addition, when there are multiple CPU cores, CPU parallelization (multithreading) can further speed up preprocessing, because the preprocessing of each training sample is usually independent of the others. For input pipeline optimization, refer to the Pipeline Performance Guide on the TensorFlow website (www.tensorflow.org/performance… ).
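With the tf.data API introduced below, both optimizations can be expressed in a couple of lines. Here is a minimal sketch; the file name and `parse_fn` are illustrative placeholders, not code from this article:

```python
import tensorflow as tf

# Hypothetical parse function, for illustration only.
def parse_fn(serialized_example):
    feats = tf.parse_single_example(
        serialized_example, features={"image": tf.FixedLenFeature([], tf.string)})
    return tf.decode_raw(feats["image"], tf.uint8)

dataset = tf.data.TFRecordDataset(["train.tfrecords"])  # Extract
dataset = dataset.map(parse_fn, num_parallel_calls=4)   # Transform: parse on 4 CPU threads
dataset = dataset.batch(32)
dataset = dataset.prefetch(1)  # Load: keep one batch ready ahead of the consumer,
                               # overlapping CPU preprocessing with GPU training
```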

Fortunately, the latest TensorFlow release provides a set of APIs called tf.data to help us quickly build efficient and flexible input pipelines. Previously, the most common way to load training data in TensorFlow was Feeding (www.tensorflow.org/api_guides…

Here we use the MNIST example to explain how to use tf.data to build a concise and efficient input pipeline. Before that, we first introduce how to create TFRecords files, the standard file format supported by TensorFlow.

1

Creating TFRecords files

TFRecords is TensorFlow's standard data format: a binary format based on protobuf. The basic element of a TFRecords file is tf.train.Example, which corresponds to one sample in the dataset. Each Example contains Features, which stores the sample's features; each feature is a key-value pair of the feature name and its actual value. Here is an Example:

```
// An Example for a movie recommendation application:
features {
  feature { key: "age" value { float_list { value: 29.0 } } }
  feature { key: "movie"
            value { bytes_list { value: "The Shawshank Redemption"
                                 value: "Fight Club" } } }
  feature { key: "movie_ratings" value { float_list { value: 9.0 value: 9.7 } } }
  feature { key: "suggestion" value { bytes_list { value: "Inception" } } }
  feature { key: "suggestion_purchased" value { float_list { value: 1.0 } } }
  feature { key: "purchase_price" value { float_list { value: 9.99 } } }
}
```

The above is a sample from a movie recommendation system. It contains six features, each of key-value type, where the key is the feature name and the value is the feature value. A value can be one of three types: bytes_list, float_list, and int64_list, which store byte, float, and integer values respectively (see here: github.com/tensorflow/…
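To make this concrete, the movie Example above could be constructed in Python roughly as follows (a sketch: the field values come from the listing above, and everything else is the plain tf.train API):

```python
import tensorflow as tf

example = tf.train.Example(features=tf.train.Features(feature={
    "age": tf.train.Feature(float_list=tf.train.FloatList(value=[29.0])),
    "movie": tf.train.Feature(bytes_list=tf.train.BytesList(
        value=[b"The Shawshank Redemption", b"Fight Club"])),
    "movie_ratings": tf.train.Feature(float_list=tf.train.FloatList(value=[9.0, 9.7])),
    "suggestion": tf.train.Feature(bytes_list=tf.train.BytesList(value=[b"Inception"])),
    "suggestion_purchased": tf.train.Feature(float_list=tf.train.FloatList(value=[1.0])),
    "purchase_price": tf.train.Feature(float_list=tf.train.FloatList(value=[9.99])),
}))
print(example)  # prints the text-format protobuf shown in the listing above
```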

As the owner of this standard format, TensorFlow of course provides a Python interface for creating TFRecords files. Let's create the TFRecords files corresponding to the MNIST dataset. For MNIST, each Example needs to store two features: one is the image's pixel values, which can be stored as bytes since each pixel fits in one byte; the other is the image's label, which can be stored as int64. We therefore first define helper functions for these two types:

```python
# int64
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# bytes
def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
```

Writing the files is done with the tf.python_io.TFRecordWriter class; the specific code is as follows:

```python
def convert_to_TFRecords(dataset, name):
    """Convert mnist dataset to TFRecords"""
    images, labels = dataset.images, dataset.labels
    n_examples = dataset.num_examples

    filename = os.path.join(DIR, name + ".tfrecords")
    print("Writing", filename)
    with tf.python_io.TFRecordWriter(filename) as writer:
        for index in range(n_examples):
            image_bytes = images[index].tostring()
            label = labels[index]
            example = tf.train.Example(features=tf.train.Features(
                feature={"image": _bytes_feature(image_bytes),
                         "label": _int64_feature(label)}))
            writer.write(example.SerializeToString())
```

The MNIST dataset is split into train, validation, and test sets. Use the above function to create the three TFRecords files:

```python
mnist_datasets = mnist.read_data_sets("mnist_data", dtype=tf.uint8, reshape=False)
convert_to_TFRecords(mnist_datasets.train, "train")
convert_to_TFRecords(mnist_datasets.validation, "validation")
convert_to_TFRecords(mnist_datasets.test, "test")
```

Now we have created three TFRecords files, but how do we read them back? Of course, TF provides an interface for reading TFRecords files: tf.python_io.tf_record_iterator, which takes a TFRecords file path and returns an iterator. Each element it yields is a serialized Example string, which can be parsed with tf.train.Example:

```python
def read_TFRecords_test(name):
    filename = os.path.join(DIR, name + ".tfrecords")
    record_itr = tf.python_io.tf_record_iterator(path=filename)
    for r in record_itr:
        example = tf.train.Example()
        example.ParseFromString(r)

        label = example.features.feature["label"].int64_list.value[0]
        print("Label", label)
        image_bytes = example.features.feature["image"].bytes_list.value[0]
        img = np.fromstring(image_bytes, dtype=np.uint8).reshape((28, 28))
        print(img)
        plt.imshow(img, cmap="gray")
        plt.show()
        break  # only show the first sample
```

The above is pure-Python reading, which is not the correct way to consume TFRecords files in training. Since this is the official standard data format, TF also provides ways to build an input pipeline directly from TFRecords files.

2

Reading TFRecords files with a queue

Before tf.data, the mechanism was QueueRunner, a file-queue mechanism that works as shown below:

![images](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/99d1c519af5c4fb4b2d569255934a515~tplv-k3u1fbpfcp-zoom-1.image)

The file-queue mechanism has two stages: the first stage shuffles the input filenames and enqueues them in a filename queue; the second stage dequeues a file from that queue, reads and decodes it, produces sample data, and enqueues the samples into an example queue. You can define multiple readers to read from multiple files concurrently, and each training step dequeues a batch of samples from the example queue. TF provides the matching APIs for this whole process; note that the input pipeline is embedded directly in the training Graph and is part of the whole model. For TFRecords files, the reader is tf.TFRecordReader. The specific implementation code is as follows:

```python
def read_example(filename_queue):
    """Read one example from filename_queue"""
    reader = tf.TFRecordReader()
    key, value = reader.read(filename_queue)
    features = tf.parse_single_example(value,
                                       features={"image": tf.FixedLenFeature([], tf.string),
                                                 "label": tf.FixedLenFeature([], tf.int64)})
    image = tf.decode_raw(features["image"], tf.uint8)
    image = tf.reshape(image, [28, 28])
    label = tf.cast(features["label"], tf.int32)
    return image, label

if __name__ == "__main__":
    queue = tf.train.string_input_producer(["TFRecords/train.tfrecords"], num_epochs=10)
    image, label = read_example(queue)

    img_batch, label_batch = tf.train.shuffle_batch([image, label], batch_size=32, capacity=5000,
                                                    min_after_dequeue=2000, num_threads=4)
    with tf.Session() as sess:
        sess.run(tf.local_variables_initializer())
        sess.run(tf.global_variables_initializer())

        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        try:
            while not coord.should_stop():
                # Run training steps or whatever
                images, labels = sess.run([img_batch, label_batch])
                print(images.shape, labels.shape)
        except tf.errors.OutOfRangeError:
            print("Done training -- epoch limit reached")

        coord.request_stop()
        coord.join(threads)
```

As for the queue mechanism, the code is admittedly confusing, but in practice you can easily adapt the official boilerplate to your own dataset. Now, however, with tf.data, the above process can be implemented more elegantly.

3

tf.data basics

Using tf.data makes it easier to create efficient input pipelines, and its API is friendlier than the queue mechanism's, mainly because tf.data provides higher-level abstractions. The first abstraction is tf.data.Dataset, which represents a set of data; each element of the set consists of one or more Tensors, normally corresponding to one training sample. The second is tf.data.Iterator, an iterator object that produces one sample from the Dataset each time iterator.get_next() is called. With these two abstractions, using tf.data boils down to three steps:

1. Create a Dataset instance;
2. Create an Iterator that iterates over the Dataset;
3. Repeatedly pull samples from the Iterator and feed them into the model for training.

TF provides many ways to create a Dataset. Here are some of them:

```python
# from a numpy array
dataset1 = tf.data.Dataset.from_tensor_slices(np.random.randn(5, 10))
print(dataset1.output_types)   # ==> "tf.float64"
print(dataset1.output_shapes)  # ==> "(10,)"

# from Tensors
dataset2 = tf.data.Dataset.from_tensor_slices((tf.random_uniform([4]),
                                               tf.random_uniform([4, 100], maxval=100,
                                                                 dtype=tf.int32)))
print(dataset2.output_types)   # ==> "(tf.float32, tf.int32)"
print(dataset2.output_shapes)  # ==> "((), (100,))"

# from TFRecords files
filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset3 = tf.data.TFRecordDataset(filenames)
```

More importantly, a Dataset can undergo a series of transformations and supports chained calls, which is important for data preprocessing:

```python
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(...)   # parse or preprocess the data, e.g. normalize
dataset = dataset.repeat()   # repeat the dataset, i.e. iterate over it for many epochs
dataset = dataset.batch(32)  # form batches of 32 samples
```
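As a concrete illustration of chaining, here is a minimal, self-contained sketch on an in-memory dataset (the numbers are arbitrary):

```python
import numpy as np
import tensorflow as tf

dataset = (tf.data.Dataset.from_tensor_slices(np.arange(100))
           .map(lambda x: x * 2)      # preprocess each element
           .shuffle(buffer_size=100)  # shuffle individual samples
           .repeat(2)                 # two passes (epochs) over the data
           .batch(32))                # emit batches of up to 32 elements
```

Note that the order matters: shuffling before batching shuffles individual samples, whereas shuffling after batching would only shuffle whole batches.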

After the Dataset is created, we need an Iterator to iterate over it; the Iterator returns elements that can be fed to the model for training. TF supports four iterator types: **one-shot**, **initializable**, **reinitializable**, and **feedable**. Let's look at them one by one.

**One-shot Iterator**

This is the simplest Iterator. It iterates through the whole dataset only once and needs no explicit initialization. Here is an example:

```python
dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()
with tf.Session() as sess:
    for i in range(10):
        print(sess.run(next_element))  # 0, 1, ..., 9
```
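If you don't know the number of elements in advance, you can instead rely on the tf.errors.OutOfRangeError that an exhausted iterator raises (this behavior is discussed again before the MNIST example); a minimal sketch:

```python
import numpy as np
import tensorflow as tf

dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()
with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_element))
    except tf.errors.OutOfRangeError:
        print("iterator exhausted")  # loop ends when the dataset runs out
```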

**Initializable Iterator**

Compared with the one-shot iterator, it requires explicit initialization before use, which in turn allows parameterization: by feeding a different parameter at each initialization, you get a simple parameterized dataset. Here is an example:

```python
max_value = tf.placeholder(tf.int64, [])
dataset = tf.data.Dataset.range(max_value)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()
with tf.Session() as sess:
    # must explicitly initialize the iterator first
    sess.run(iterator.initializer, feed_dict={max_value: 10})
    for i in range(10):
        print(sess.run(next_element))  # 0, 1, ..., 9
```

**Reinitializable Iterator**

Compared with the initializable iterator, it can be initialized from different Dataset objects. Sometimes you need both a training set and a test set, which contain different data; you can then define two Datasets with the same structure (types and shapes), create one generic Iterator with tf.data.Iterator.from_structure, and initialize it from whichever Dataset you want to use. Here is an example:

```python
train_data = np.random.randn(100, 5)
test_data = np.random.randn(20, 5)
train_dataset = tf.data.Dataset.from_tensor_slices(train_data)
test_dataset = tf.data.Dataset.from_tensor_slices(test_data)

# create a reinitializable iterator from the common structure
re_iterator = tf.data.Iterator.from_structure(train_dataset.output_types,
                                              train_dataset.output_shapes)
next_element = re_iterator.get_next()
train_init_op = re_iterator.make_initializer(train_dataset)
test_init_op = re_iterator.make_initializer(test_dataset)

with tf.Session() as sess:
    # train
    sess.run(train_init_op)
    for i in range(100):
        print(sess.run(next_element))
    # test
    sess.run(test_init_op)
    for i in range(20):
        print(sess.run(next_element))
```

**Feedable Iterator**

The reinitializable iterator can switch between different Datasets, but it must be reinitialized at each switch. The feedable iterator instead supports feeding in different iterators: you switch between datasets by switching the iterator's string handle, and each iterator's state is preserved across switches. This is more flexible than the reinitializable iterator. Here is an example:

```python
train_data = np.random.randn(100, 5)
val_data = np.random.randn(20, 5)
n_epochs = 20
train_dataset = tf.data.Dataset.from_tensor_slices(train_data).repeat(n_epochs)
val_dataset = tf.data.Dataset.from_tensor_slices(val_data)

# create a feedable iterator from a string handle placeholder
handle = tf.placeholder(tf.string, [])
feed_iterator = tf.data.Iterator.from_string_handle(handle, train_dataset.output_types,
                                                    train_dataset.output_shapes)
next_element = feed_iterator.get_next()

# create different iterators
train_iterator = train_dataset.make_one_shot_iterator()
val_iterator = val_dataset.make_initializable_iterator()

with tf.Session() as sess:
    # generate the handles for the two iterators
    train_handle = sess.run(train_iterator.string_handle())
    val_handle = sess.run(val_iterator.string_handle())
    # train
    for n in range(n_epochs):
        for i in range(100):
            print(i, sess.run(next_element, feed_dict={handle: train_handle}))
        # validation
        sess.run(val_iterator.initializer)
        for i in range(20):
            print(sess.run(next_element, feed_dict={handle: val_handle}))
```

That is about all the basic knowledge of tf.data; for more, refer to the official documentation. One more thing worth noting: once an iterator's elements are exhausted, it raises a tf.errors.OutOfRangeError. Of course, you usually know how many elements your iterator will produce, in which case you don't have to catch this error to implement the termination condition. Next, we use tf.data to implement a complete MNIST training process.

4

A complete MNIST example

We use a feedable iterator to implement the MNIST training process, creating two Datasets: one for the training set and one for the validation set; the validation set does not need shuffling. First we create the helper functions for the Dataset, which mainly parse the TFRecords files and normalize the images:

```python
def decode(serialized_example):
    """decode the serialized example"""
    features = tf.parse_single_example(serialized_example,
                                       features={"image": tf.FixedLenFeature([], tf.string),
                                                 "label": tf.FixedLenFeature([], tf.int64)})
    image = tf.decode_raw(features["image"], tf.uint8)
    image = tf.cast(image, tf.float32)
    image = tf.reshape(image, [784])
    label = tf.cast(features["label"], tf.int64)
    return image, label

def normalize(image, label):
    """normalize the image to [-0.5, 0.5]"""
    image = image / 255.0 - 0.5
    return image, label
```

Then define the function that creates a Dataset; the parameters differ between the training set and the validation set:

```python
def create_dataset(filename, batch_size=64, is_shuffle=False, n_repeats=0):
    """create dataset for train and validation dataset"""
    dataset = tf.data.TFRecordDataset(filename)
    if n_repeats > 0:
        dataset = dataset.repeat(n_repeats)  # for train
    dataset = dataset.map(decode).map(normalize)  # decode and normalize
    if is_shuffle:
        dataset = dataset.shuffle(1000 + 3 * batch_size)  # shuffle
    dataset = dataset.batch(batch_size)
    return dataset
```

We use a simple fully-connected network as the MNIST classification model:

```python
def model(inputs, hidden_sizes=(500, 500)):
    h1, h2 = hidden_sizes
    net = tf.layers.dense(inputs, h1, activation=tf.nn.relu)
    net = tf.layers.dense(net, h2, activation=tf.nn.relu)
    net = tf.layers.dense(net, 10, activation=None)
    return net
```

Then comes the main training code:

```python
n_train_examples = 55000
n_val_examples = 5000
n_epochs = 50
batch_size = 64

train_dataset = create_dataset("TFRecords/train.tfrecords", batch_size=batch_size,
                               is_shuffle=True, n_repeats=n_epochs)
val_dataset = create_dataset("TFRecords/validation.tfrecords", batch_size=batch_size)

# create a feedable iterator
handle = tf.placeholder(tf.string, [])
feed_iterator = tf.data.Iterator.from_string_handle(handle, train_dataset.output_types,
                                                    train_dataset.output_shapes)
images, labels = feed_iterator.get_next()

# create different iterators
train_iterator = train_dataset.make_one_shot_iterator()
val_iterator = val_dataset.make_initializable_iterator()

# create the model
logits = model(images, [500, 500])
loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels, logits=logits)
loss = tf.reduce_mean(loss)
train_op = tf.train.AdamOptimizer(learning_rate=1e-04).minimize(loss)
predictions = tf.argmax(logits, axis=1)
accuracy = tf.reduce_mean(tf.cast(tf.equal(predictions, labels), tf.float32))

init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
with tf.Session() as sess:
    sess.run(init_op)
    # generate the corresponding handles
    train_handle = sess.run(train_iterator.string_handle())
    val_handle = sess.run(val_iterator.string_handle())
    # train
    for n in range(n_epochs):
        ls = []
        for i in range(n_train_examples // batch_size):
            _, l = sess.run([train_op, loss], feed_dict={handle: train_handle})
            ls.append(l)
        print("Epoch %d, train loss: %f" % (n, np.mean(ls)))
        if (n + 1) % 10 == 0:
            sess.run(val_iterator.initializer)
            accs = []
            for i in range(n_val_examples // batch_size):
                acc = sess.run(accuracy, feed_dict={handle: val_handle})
                accs.append(acc)
            print("\t validation accuracy: %f" % (np.mean(accs)))
```

About 98% accuracy can be achieved on the validation set. If you are a TensorFlow user, give tf.data a try. Of course, the examples above don't cover everything about tf.data; if you want to go further, see the official documentation.

References

1. [Programmer's guide: Importing data](https://www.tensorflow.org/programmers_guide/datasets)
2. [How to use Dataset in TensorFlow](https://towardsdatascience.com/how-to-use-dataset-in-tensorflow-c758ef9e4428)
3. [Reading data](https://www.tensorflow.org/api_guides/python/reading_data)
4. [Performance: Datasets performance](https://www.tensorflow.org/performance/datasets_performance)
5. [Introduction to Artificial Neural Networks and Deep Learning: A Practical Guide with Applications in Python](https://github.com/rasbt/deep-learning-book/)