When we need extra control to write custom loss functions, custom metrics, layers, models, initializers, regularization functions, weight constraints, etc., this is when we need to explore TensorFlow and learn about the underlying Python API.

1 TensorFlow Quick view

TensorFlow is a powerful library for numerical computation, particularly suited for large-scale machine learning and fine-tuning it. What does TensorFlow offer? The summary is as follows:

  • Its core is very similar to NumPy, but with GPU support.
  • It supports distributed computing (across multiple devices and servers)
  • It includes a just-in-time (JIT) compiler that allows it to optimize calculations for speed and memory usage. The way it works is it takes a graph from a Python function, optimizes it, and runs it efficiently.
  • Computational diagrams can be exported to a portable format. For example, we can train the TensorFlow model in Linux and then run the TensorFlow model in Android.
  • It implements automatic differentiation and provides some excellent optimizers such as RMSProp and Nadam.

TensorFlow provides more functions based on these core functions. In addition to tf.keras, it also has data loading and preprocessing operations (tF.data, tF.io), image processing operations (TF.image), and signal processing operations (tF.signal). In addition, TensorFlow is not just a library of functions, but the core of a broader ecosystem. First, TensorBoard can be visualized, followed by TensorFlow Extended, a set of libraries built for a production environment for TensorFlow projects, including data validation, preprocessing, model analysis, and service tools. The TensorFlow Hub provides an easy way to download and reuse pre-trained neural networks, and many neural network architectures, some of which have been pre-trained, are available in the TensorFlow model garden.

2 Data structure of TensorFlow

2.1 Use TensorFlow like Numpy

TensorFlow’s API is all about tensors, which flow from one operation to another. Tensors are very similar to Numpy’s NDARray, which is usually a set of dimensions, but it can also hold scalars.

import tensorflow as tf
# create a tensor using tf.constant()
t = tf.constant([[1..2..3.], [4..5..6.]])
Copy the code
The 2022-03-01 20:42:41. 082775: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network  Library (oneDNN) to use the following CPU instructions in performance-critical operations: AVX2 FMA To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.Copy the code
# View shape
print(t.shape)
Check the data type
print(t.dtype)
Copy the code
(2, 3)
<dtype: 'float32'>
Copy the code
# Index works like numpy
t[:, 1:]
Copy the code
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
       [5., 6.]], dtype=float32)>
Copy the code
t[..., 1, tf.newaxis]
Copy the code
<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
       [5.]], dtype=float32)>
Copy the code

The operation of various tensors

t + 10   # add 10, equivalent to tf.add(t, 10)
tf.square(t)  # square
t @ tf.transpose(t) # matrices A (2, 3) * B (3, 2) = C (2, 2), matrix multiplication is equivalent to tf. Matmul () function
Copy the code
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>
Copy the code

Tf.reduce_mean (), tf.reduce_sum(), tf.reduce_max(), tf.math.log() are equivalent to NP.mean (), NP.sum (), NP.max (), nP.log (). In addition, in TensorFlow, tF.transpose (t) must be written, while in Numpy, t.T. In TensorFlow, you create a new tensor using your own copy of transposed data, whereas in Numpy, t.T is just a transposed view of the same data.

In addition, many functions and classes have aliases, such as tf.add() and tf.math.add() are the same function. With one exception, tf.math.log() does not have an alias for tf.log().

The underlying API for Keras

The Keras API has its own low-level API in keras.backend. It contains functions such as square(), exp(), and SQRT (). An example of our use of the Keras function:

from tensorflow import keras
K = keras.backend
K.square(K.transpose(t)) + 10
Copy the code
<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[11., 26.],
       [14., 35.],
       [19., 46.]], dtype=float32)>
Copy the code

2.2 The use of tensors and Numpy

Using tensors to create numpy arrays, using numpy arrays to create tensors

import numpy as np
a = np.array([2..4..5.])
tf.constant(a)   # numpy array transformation tensor
Copy the code
<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>
Copy the code
t.numpy() # Tensor to numpy array
Copy the code
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)
Copy the code
tf.square(a)  # Apply TensorFlow operations to numpy arrays
Copy the code
<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>
Copy the code
np.square(t)  # numpy operation applied to tensors
Copy the code
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)
Copy the code

Note: By default, Numpy uses 64-bit precision, while TensorFlow uses 32-bit precision. So create a tensor from the numpy array, making sure dTYPE = tF.float32 is set

2.3 Type Conversion

Type conversions severely affect performance. TensorFlow does not automatically perform any type conversions, and an exception will be thrown if an operation is performed on a tensor of incompatible types. For example, you cannot add floating point tensors to integer tensors, or even 32-bit floating point and 64-bit floating point.

tf.constant(2.) + tf.constant(40)   An exception is reported
tf.constant(2.) + tf.constant(40, dtype=tf.float64) An exception is reported
Copy the code
--------------------------------------------------------------------------- InvalidArgumentError Traceback (most recent call last) /var/folders/bn/0lpcbtqn6w7159rr0gp940z80000gn/T/ipykernel_7365/1443138918.py in <module> ----> 1 Tf.constant (2.) + tf.constant(40) # error 2 tf.constant(2.) + tf.constant(40, Dtype = tf. Float64) # exception error/opt/anaconda3 envs/keras/lib/python3.9 / site - packages/tensorflow/python/util/traceback_utils py  in error_handler(*args, **kwargs) 151 except Exception as e: 152 filtered_tb = _process_traceback_frames(e.__traceback__) --> 153 raise e.with_traceback(filtered_tb) from None 154 finally: 155 del filtered_tb/opt/anaconda3 / envs/keras/lib/python3.9 / site - packages/tensorflow/python/framework/ops. Py in raise_from_not_ok_status(e, name) 7105 def raise_from_not_ok_status(e, name): 7106 e.message += (" name: " + name if name is not None else "") -> 7107 raise core._status_to_exception(e) from None # pylint: disable=protected-access 7108 7109 InvalidArgumentError: cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a int32 tensor [Op:AddV2]Copy the code
Use tf.cast() if you really need to convert the type.
t2 = tf.constant(40, dtype=tf.float64)
tf.constant(2.0) + tf.cast(t2, tf.float32)
Copy the code
< tf. Tensor: shape = (), dtype = float32, numpy = 42.0 >Copy the code

2.4 variable

We see that the tF. Tensor values are constant and we can’t modify them, which means we can’t use regular tensors for weights in neural networks because they need to be adjusted by back propagation. So we need tF.variable

v = tf.Variable([[1..2..3.], [4..5..6.]])
v
Copy the code
<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>
Copy the code

Methods of adding, subtracting, or modifying variables

v.assign(2*v)
v
Copy the code
<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>
Copy the code
v[0.1].assign(42)
Copy the code
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  6.],
       [ 8., 10., 12.]], dtype=float32)>
Copy the code
v.scatter_nd_update(indices=[[0.0], [1.2]], updates=[100..200.])
Copy the code
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   6.],
       [  8.,  10., 200.]], dtype=float32)>
Copy the code

2.5 Other Data Structures

  • SparseTensor (tf.sparsetensor)
  • TensorArray TensorArray
  • Irregular tensor (TF.Raggedtensor)
  • String tensor (tF.string)
  • Sets (tf.sets)
  • Queue (tF.queue)

3. Customized model and training algorithm

3.1 Custom loss function

We want to use Huber losses, currently Huber losses are not part of the official Keras API but can be used in TF.keras (Keras.losses.Huber instance). However, we customize the loss function by pretending it doesn’t exist.

def huber_fn(y_true, y_pred) :
    error = y_true - y_pred
    is_small_error = tf.abs(error) < 1
    squared_loss = tf.square(error) / 2
    linear_loss = tf.abs(error) - 0.5
    return tf.where(is_small_error, squared_loss, linear_loss)
y_true = tf.constant([[1.], [0]], dtype="float32")
y_pred = tf.constant([[0.8], [0.2]], dtype="float32")
huber_fn(y_true, y_pred)
Copy the code
Tensor: Shape =(2, 1), dType = Float32, Numpy = Array ([[0.02], [0.02]], DType = Float32)>Copy the code

As you can see, the tensor of loss per instance is returned, rather than the average loss of the instance, so that Keras can use category weights or sample weights as required. Now we use this loss when compiling the Keras model, and then train the model.

model.compile(loss=huber_fn, optimizer="nadam") model.fit(X_train, y_train, [...] )Copy the code

That is, for each batch during training, Keras calls the huber_fu() function to calculate the loss and use it to perform the “gradient descent” step. In addition, it tracks total losses since the start of the round and shows average losses.

3.2 Save and load models that contain custom components

Saving models that contain custom loss functions is convenient for Keras. Because Keras stores the name of the function, you need to provide a dictionary that maps the function name to the actual function each time it is loaded.

model = keras.models.load_model("my_model_with_a_custom_loss.h5",
                                custom_objects={"huber_fn":huber_fn}
                               )
Copy the code

What if you want a different threshold? One solution is to create a function that creates the configured loss function:

def create_huber(threshold=1.0) :
    def huber_fn(y_true, y_pred) :
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold*tf.abs(error) - 0.5*threshold**2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

model.compile(loss=create_huber(2.0), optimizer="nadam")
Copy the code

Unfortunately, the threshold is not saved when the model is saved, which means that the threshold must be specified when the model is loaded. Also, note that the name used is “huber_fn”, which is the name of the function named by Keras, not create_huber, as it was created.

model = keras.models.load_model("my_model_with_a_custom_loss.h5",
                                custom_objects={"huber_fn":create_huber(2.0)})Copy the code

We can solve this problem by creating a subclass of the Keras.Losses.Loss class and then implementing its get_config() method

class HuberLoss(keras.losses.Loss) :
    def __init__(self, threshold=1.0, **kwargs) :
        self.threshold = threshold
        super().__init__(**kwargs)
    def call(self, y_true, y_pred) :
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold*tf.abs(error) - 0.5*threshold**2
        return tf.where(is_small_error, squared_loss, linear_loss)
    def get_config(self) :
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}
Copy the code

Explain the above code:

  • The constructor takes **kwargs and passes them to the superclass constructor, which handles the standard hyperparameters: the name of the loss and the reduction algorithm used to aggregate the loss of a single instance. By default, it is “sum_over_batch_size”, which means that the loss will be the sum of the instance loss, weighted by the sample weight (if any), divided by the batch size.
  • The Call () method takes the label and forecast, calculates all instance losses, and returns them.
  • The get_config() method returns a dictionary that maps each hyperparameter name to its value. It first calls the parent’s get_config() method, and then adds the new hyperparameter to the dictionary

We can then use any instance of this class when compiling the model:

model.compile(loss=HuberLoss(2.0), optimizer="nadam")
Copy the code

When the model is saved, the thresholds are saved together, and when the model is loaded, just map the class name to the class itself:

model = keras.models.load_model("my_model_with_a_custom_loss.h5",
                                custom_objects={"HuberLoss":HuberLoss}
                               )
Copy the code

3.3 Custom activation functions, initializations, regularization, and constraints

Most Keras functions, such as losses, regularization, constraints, initializations, measures, activation functions, layers, and even complete models, can be customized in almost the same way. In most cases, you just need to write simple functions with appropriate inputs and outputs.

def my_softplus(z) :
    # custom activation function, equivalent to the tf. The nn. Softplus () or keras. Activations. Softplus ()
    return tf.math.log(tf.exp(z)+1.0)
def my_glorot_initializer(shape, dtype=tf.float32) :
    # custom Glorot initialization, equivalent to the keras. The initializers. Glorot_normal ()
    stddev = tf.sqrt(2. /(shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)
def my_l1_regularizer(weights) :
    # Custom L1 regularization, equivalent to keras.regularizers.l1(0.01)
    return tf.reduce_sum(tf.abs(0.01 * weights))
def my_positive_weights(weights) :
    # custom constraint that ensures all weights are positive, equivalent to tf.nn.relu(weights)
    return tf.where(weights < 0, tf.zeros_like(weights), weights)
Copy the code

The argument depends on the type of the custom function, which can then be used normally, for example:

layer = keras.layers.Dense(30, activation=my_softplus,
                           kernel_initializer=my_glorot_initializer,
                           kernel_regularizer=my_l1_regularizer,
                           kernel_constraint=my_positive_weights
                          )
Copy the code

The activation function will apply the output of this Dense layer and the results will be passed on to the next layer. The weight of the layer will be initialized using the value returned by the initializer. At each training step, the weight is passed to the regularization function to calculate the regularization loss and added to the primary loss to get the final loss for training. Finally, the constraint function will be called after each training step, and the weight of the layer will be replaced by the constraint weight.

If the function has hyperparameters that need to be stored with the model, you need to inherit the appropriate class. Such as: keras. Regularizers. Regularizer, keras. Constraints. The Constraint, keras. Initializers., Initializer, keras. The layers. A Layer. Such as:

This time we don't need to call the superclass constructor or get_config() method because they are not defined by the superclass
class MyL1Regularizer(keras.regularizers.Regularizer) :
    def __init__(self, factor) :
        self.factor = factor
    def __call__(self, weights) :
        return tf.reduce_sum(tf.abs(self.factor * weights))
    def get_config(self) :
        return {"factor": self.factor}
Copy the code

Note: Losses, layers (including activation functions), and models must be fed to implement call() methods, while __call__() methods must be implemented for regularization, initialization, and constraints.

3.4 Customizing Indicators

Losses and indices are not conceptually the same thing: losses (e.g., cross entropy) are gradient dropped to train the model, so they must be differentiable, and the gradient should never be zero anywhere. That’s fine if it’s not easy for humans to explain them. Instead, metrics (e.g., accuracy) are used to evaluate models, and they must be easier to interpret. In most cases, defining a custom index function is exactly the same as defining a custom loss function. In fact, the Huber loss function we created earlier is used as an indicator, such as:

model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])
Copy the code

For each lot during training, Keras calculates this metric and tracks the mean since the start of the round. For example, consider the accuracy of a binary classifier, assuming that the model makes 5 positive predictions in the first batch, 4 of which are correct, then the accuracy is 80%. Assuming that the model looks for that in the second batch and makes 3 positive predictions, 0 is correct, then the accuracy of the second batch is 0%. If you average only the two accuracies, you get 40%. However, this is not the accuracy of the model on these two batches! In fact, of the eight positive predictions (5+3), a total of four are correct (4+0), so the overall accuracy is 50%, not 40%. Therefore, we need an object that can track the number of true and false positives, and that can calculate the ratio at request. This is exactly what the Keras.metrics.Precision class does.

precision = keras.metrics.Precision()
precision([0.1.1.1.0.1.0.1], [1.1.0.1.0.1.0.1])
Copy the code
< tf. Tensor: shape = (), dtype = float32, numpy = 0.8 >Copy the code
precision([0.1.0.0.1.0.1.1], [1.0.1.1.0.0.0.0])
Copy the code
< tf. Tensor: shape = (), dtype = float32, numpy = 0.5 >Copy the code

We can see that a Precision object is created and then used as a function to pass labels and predictions for the first batch and for the second batch to it. For the first batch, 80% accuracy is returned, and after the second batch, 50% is returned. This is called the streaming metric because it is updated batch by batch.

At any time, we can call the result() method to get the current value of the metric. We can also view its variables using the variables property and reset them using the reset_stats() method:

precision.result()
Copy the code
< tf. Tensor: shape = (), dtype = float32, numpy = 0.5 >Copy the code
precision.variables
Copy the code
[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]
Copy the code
precision.reset_states() # variables are reset to 0;
Copy the code
precision.result()
Copy the code
< tf. Tensor: shape = (), dtype = float32, numpy = 0.0 >Copy the code

To create such a streaming variable, subclass the Keras.metrics.Metric class. Below is a simple example of tracking Huber’s total losses and the number of instances seen so far. When asked for the result, it returns the ratio, which is the average Huber loss.

class HuberMetric(keras.metrics.Metric) :
    def __init__(self, threshold=1.0, **kwargs) :
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        self.total = self.add_weight("total", initializer="zeros")
        self.count = self.add_weight("count", initializer="zeros")
    def update_state(self, y_true, y_pred, sample_weight=None) :
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
    def result(self) :
        return self.total / self.count
    def get_config(self) :
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}
Copy the code

Above code explanation:

  • The constructor uses the add_weight() method to create the variables needed to track the measurement status of multiple batches. In this case, the variable includes the sum of all Huber losses, as well as the number of instances seen so far. You can create variables manually if you wish. Keras keeps track of any TF.variable that is set as a property.
  • When you use an instance of this class as a function, the update_state() method is called. Give the label and predicted value of a batch. Given a batch’s label and predicted value, it updates the variables, as the Precision object does.
  • The result() method computes and returns the final result, in this case the average Huber measure for all instances. When you use the measure as a function, first call the updata_state() method, then call the result() method, and return its output.
  • We also implement the get_config() method to ensure that threshold is saved along with the model.
  • The default implementation of the reset_states() method resets all variables to 0.0.
huber = HuberMetric()
y_true = tf.constant([[1.], [0]], dtype="float32")
y_pred = tf.constant([[0.8], [0.2]], dtype="float32")
huber(y_true, y_pred)
Copy the code
< tf. Tensor: shape = (), dtype = float32, numpy = 0.02 >Copy the code
y_true = tf.constant([[1.], [0]], dtype="float32")
y_pred = tf.constant([[0.6], [0.2]], dtype="float32")
huber(y_true, y_pred)   # (0.02 + 0.02 + 0.08 + 0.02) / 4
Copy the code
< tf. Tensor: shape = (), dtype = float32, numpy = 0.034999996 >Copy the code
huber.result()
Copy the code
< tf. Tensor: shape = (), dtype = float32, numpy = 0.034999996 >Copy the code

3.5 Custom Layer

If you want to build an architecture with unique layers, TensorFlow does not provide a default implementation. We need to create a custom layer, or we want to build a repetitive architecture that contains the same layer block repeated many times.

First, some layers have no weight, such as keras.layers.flatten or keras.layers.relu (). If you want to create a custom layer without any weight, the easiest option is to write a function and wrap it in the keras.layers.lambda layer. For example, the following layer will apply an exponential function to its input:

exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))
Copy the code

This custom layer can then be used like any other layer such as the sequential API, functional API, or subclass API, or it can be used as an activation function, and sometimes an exponential layer in the output layer of the regression model.

To build a stateful Layer (a Layer with weights), create a subclass of the Keras.layers. Layer class. For example, implement a simplified version of the Dense layer.

class MyDense(keras.layers.Layer) :
    def __init__(self, units, activation=None, **kwargs) :
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)
    def build(self, batch_input_shape) :
        self.kernel = self.add_weight(
            name="kernel", shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal"
        )
        self.bias = self.add_weight(
            name="bias", shape=[self.units], initializer="zeros"
        )
        super().build(batch_input_shape)  # This has to go back here
        
    def call(self, X) :
        return self.activation(X @ self.kernel + self.bias)
    
    def compute_output_shape(self, batch_input_shape) :
        return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])
    
    def get_config(self) :
        base_config = super().get_config()
        return {**base_config, "units": self.units,
                "activation": keras.activations.serialize(self.activation)}   
Copy the code

Above code explanation:

  • The constructor takes all hyperparameters as parameters, such as units, activation, and importantly, it also accepts **kwargs parameters. It calls the superclass constructor, which is passed to Kwargs: it handles the standard arguments, such as input_Shape, Trainable, and Name, and then saves the hyperarguments as attributes.
  • Use the keras.activation.get() function to convert the activation parameter to the appropriate activation function, which accepts functions or standard strings (e.g., “relu”, “selu”).
  • The build() method creates the layer’s variables by calling the add_weight() method for each weight. The first time you use this layer, the build() method is called. At this point, Keras knows the input shape for that layer and passes it to the build() method, which is usually necessary to create some weights. Also, at the end of the build() method, and only at the end, the parent class’s build() method must be called: this tells Keras that the layer is built (it just sets self.build=True).
  • The call() method performs the required action. Compute the matrix product of input X with layer kernel, add bias vector, and apply activation function to the result to obtain layer output.
  • The compute_output_shape() method returns only the shape of the layer’s output.
  • The get_config() method is just like in the previous custom class. Please note that call keras. Activations. Serialize () keep full configuration of the activation function.

Now we can use the MyDense layer just like any other layer.

Note: the compute_output_shape() method can actually be omitted because tF.keras will automatically infer the output shape, which is required unless the layer is dynamic.

To create a layer with multiple inputs (such as Concatenate), the call() method argument should be a tuple containing all the inputs, and the compute_output_shape() method argument should be a tuple containing the batch shape of each input. To create layers of multiple outputs, the call() method should return a list of outputs, and compute_output_shape() should return a list of batch output shapes. Example: The following layer takes two inputs and returns three:

class MyMultiLayer(keras.layers.Layer) :
    def call(self, X) :
        X1, X2 = X
        return [X1+X2, X1*X2, X1/X2]
    
    def compute_output_shape(self, batch_input_shape) :
        b1, b2 = batch_input_shape
        return [b1, b1, b1]    Follow the slave broadcast rules
Copy the code

You can now use this layer just like any other layer, but only functional and subclass apis, not sequential apis (which only accept layers with one input and one output).

If our layer behaves differently during training and testing, such as: Dropout, we must add the training parameter to the Call () method and use it to decide what to do. Let’s create a layer that adds Gaussian noise during training, but does nothing during testing.

class MyGaussianNoise(keras.layers.Layer) :
    def __init__(self, stddev, **kwargs) :
        super().__init__(**kwargs)
        self.stddev = stddev
    
    def call(self, X, training=None) :
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X+noise
        else:
            return X
    def compute_output_shape(self, batch_input_shape) :
        return batch_input_shape
Copy the code

3.6 Custom Models

Actually, we discussed the creation of custom model classes earlier, when we discussed the subclass API. But when you want to customize an arbitrary model of the ResidualBlock layer (including skipped connections), how do you construct it? As shown in the figure:

class ResidualBlock(keras.layers.Layer) :
    def __init__(self, n_layers, n_neurons, **kwargs) :
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(n_neurons, activation="elu",
                                          kernel_initializer="he_normal"
                                         )
                       for _ in range(n_layers)
                      ]
    def call(self, inputs) :
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z
Copy the code

This layer is special because it contains other layers. This is handled transparently by Keras, which automatically detects hidden properties that contain traceable objects, so their variables are automatically added to the variable list for that layer. Next, we define the model itself using the subclass API:

class ResidualRegressor(keras.Model) :
    def __init__(self, output_dim, **kwargs) :
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2.30)
        self.block2 = ResidualBlock(2.30)
        self.out = keras.layers.Dense(output_dim)
    def call(self, inputs) :
        Z = self.hidden1(inputs)
        for _ in range(1+3):
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)
Copy the code

If you want to save the model using the save() method and load the model using the keras.models.load_model() function. The get_config() method must be implemented in both ResidualBlock and ResidualRegressor classes. Alternatively, you can use the save_weights() and load_weights() methods to save and load weights.

The Model class is a subclass of the Layer class, so you can define and use models just like you define layers, but with some additional functionality, This includes its compile(), fit(), evaluate(), and predict() methods, as well as its get_layers() method, which can return any layer of the model by name or index, and its save() method.

Let’s think about a few things: first, define losses or metrics within the model; Second, how to build custom loops.

3.7 Based on losses and indicators within the model

The custom losses and metrics we defined earlier were based on labels and forecasts (with optional sample weights). But sometimes we may need to define losses in terms of other parts of the model, such as weights or activation of hidden layers. This may be useful for regularizing or monitoring some internal aspects of the model.

To customize losses within the model, calculate against any part of the desired model and pass the results to the add_loss() method.

class ReconstructingRegressor(keras.Model) :
    def __init__(self, output_dim, **kwargs) :
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal")
                       for _ in range(5)
                      ]
        self.out = keras.layers.Dense(output_dim)
    def build(self, batch_input_shape) :
        n_inputs = batch_input_shape[-1]
        self.reconstruct = keras.layers.Dense(n_inputs)
        super().build(batch_input_shape)
    def call(self, inputs) :
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        recon_loss = tf.reduce_mean(tf.square(reconstruct - inputs))
        self.add_loss(0.05 * recon_loss)
        return self.out(Z)
Copy the code

Explanation of the above code:

  • The constructor creates five dense hidden layers and one dense output layer, DNN
  • The build() method creates an additional dense layer that is used to reconstruct the inputs to the model. It must be created here because its number of units must equal the input number and is unknown until the build() method is called.
  • The Call () method processes all five hidden layer inputs and then passes the results to the rebuild layer, resulting in refactoring.
  • The call() method then calculates the reconstruction loss(the mean square error between the reconstruction and the input) and adds it to the loss list of the model using the add_loss() method. Note that multiplying by 0.05 scales down the rebuild, which ensures that the rebuild losses do not make up the bulk of the major losses.

Similarly, we can add custom metrics based inside the model by calculating in any way we want, as long as the result is the output of the metrics object. For example, we can create the keras.metrics.Mean object in the constructor, then call it in the call() method, pass it recon_loss, and finally add it to the model by calling the model add_metric() method. So when you train the model, Keras will show you both the average loss per turn and the average reconstruction error per turn.

3.8 Calculate gradients using automatic differentiation

def f(w1, w2) :
    return 3*w1**2 + 2*w1*w2

w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
    z = f(w1, w2)
gradients = tape.gradient(z, [w1, w2])
gradients
Copy the code
[< tf.tensor: Shape =(), dType =float32, numpy=36.0>, < tf.tensor: Shape =(), dType =float32, numpy=10.0>]Copy the code

After the gradient() method of tape is called, the tape will be automatically erased immediately. Therefore, if we try to call gradient() twice, there will be an exception:

tape.gradient(z, [w1, w2])
Copy the code
--------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) /var/folders/bn/0lpcbtqn6w7159rr0gp940z80000gn/T/ipykernel_7365/2306421709.py in <module> ----> 1 tape.gradient(z, [w1 and w2])/opt/anaconda3 envs/keras/lib/python3.9 / site - packages/tensorflow/python/eager/backprop py in gradient (self, target, sources, output_gradients, unconnected_gradients) 1030 """ 1031 if self._tape is None: -> 1032 raise RuntimeError("A non-persistent GradientTape can only be used to " 1033 "compute one set of gradients (or jacobians)") 1034 if self._recording: RuntimeError: A non-persistent GradientTape can only be used to compute one set of gradients (or jacobians)Copy the code

If gradient() needs to be called several times, the tape must be made persistent and deleted after each use to release resources.

with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)
dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2)
del tape
Copy the code

By default, tape tracks only operations involving variables, and if you try to compute the gradient of Z for any variable other than the variable, the result will be None.

However, you can force tape to observe any tensors you like to record all operations involving them, and then compute gradients against those tensors.

c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
    z = f(c1, c2)
print(tape.gradient(z, [c1, c2]))

# # # # #
with tf.GradientTape() as tape:
    tape.watch(c1)
    tape.watch(c2)
    z = f(c1, c2)
print(tape.gradient(z, [c1, c2]))
Copy the code
[None, None] [< tf.tensor: Shape =(), dType =float32, numpy=36.0>, < tf.tensor: Shape =(), dType =float32, numpy=10.0>]Copy the code

In some cases, we want tissue gradients to propagate back in some parts of the neural network. To do this, you must use the tF.stop_gradient () function. This function returns its input during forward propagation, but does not let the gradient pass through during back propagation.

def f(w1, w2) :
    return 3*w1**2 + tf.stop_gradient(2*w1*w2)
with tf.GradientTape() as tape:
    z = f(w1, w2)  # forward propagation same with or without stop_gradient()
gradients = tape.gradient(z, [w1, w2]) # Backpropagation will ignore the calculation specified by stop_gradient()
gradients
Copy the code
[<tf.Tensor: Shape =(), dType =float32, numpy=30.0>, None]Copy the code

3.9 Custom training cycle

In rare cases, the FIT () method may not be flexible enough to meet your needs. For example, the fit() method uses only one optimizer.

We build a simple model without compiling it, because we will handle the training loop manually;

l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal",
                      kernel_regularizer=l2_reg),
    keras.layers.Dense(1, kernel_regularizer=l2_reg)
])
Copy the code

Create a small function that randomly samples a batch of instances from the training set:

def random_batch(X, y, batch_size=3) :
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]
Copy the code

Define a function that displays the training status, including number of steps, total number of steps, average loss since the start of the turn, and other metrics:

def print_status_bar(iteration, total, loss, metrics=None) :
    metrics = "-".join(["{}: {:4f}".format(m.name, m.result())
                          for m in [loss] + (metrics or [])
                         ])
    end = "" if iteration < total else "\n"
    print("\r{}/{} - ".format(iteration, total) + metrics,
         end=end)
Copy the code

To clarify: \r(enter) and end=”” Make sure the status bar is always printed on the same line. The print_status_bar() function includes a progress bar, but TQDM can also be used.

We need to define some hyperparameters, and then select optimizers, loss functions, and metrics.

fashion_mnist = keras.datasets.fashion_mnist
(X_train_full, y_train_full), (X_test, y_test) = fashion_mnist.load_data()
X_valid, X_train = X_train_full[:5000] /255.0, X_train_full[5000: /255.0
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]
Copy the code

Ready to build custom loops:

for epoch in range(1, n_epochs+1) :print("Epoch {}/{}".format(epoch, n_epochs))
    for step in range(1, n_steps + 1):
        X_batch, y_batch = random_batch(X_train, y_train)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            # Primary loss plus other losses (this model has a regularization loss for each layer)
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        mean_loss(loss)
        # indicators
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
        for metric in [mean_loss] + metrics:
            metric.reset_states()
Copy the code
Epoch 1/5 54976/55000-mean: 11.125340-mean_absolute_error: 3.246988Epoch 2/5 54976/55000-mean: 11.125340-mean_absolute_error: 3.246988Epoch 2/5 54976/55000-mean: 6.180694 - mean_absolute_error: 1.975620Epoch 3/5 54976/55000 - mean: 18.690468-mean_absolute_error: 1.975620Epoch 3/5 54976/55000 - mean: 18.690468-mean_absolute_error: 4.290856Epoch 4/5 54976/55000-mean: 8.1051144-mean_absolute_error: 2.175005Epoch 5/5 54976/55000-mean: 4.059292 - mean_absolute_error: 1.9687572Copy the code

If you set the optimizer’s hyperparameter clipnorm or ClipValue, apply any other transformation to the gradient just before calling the apply_gradients method.

If you want to add weight constraints to the model, such as kernel_constraint or biAS_constraint, apply those constraints after apply_gradients.

for variable in model.variables:
    if variable.constraint is not None:
        variable.assign(variable.constraint(variable))
Copy the code

Most importantly, this training loop does not deal with layers that behave differently during training and testing, such as Dropout. To handle these issues, use the Trining =True call model, ensuring that it is propagated to every layer that needs it.