Making new layers and models via subclassing

Author: fchollet

Setup

import tensorflow as tf
from tensorflow import keras

The Layer class: the combination of state (weights) and some computation

One of the central abstractions in Keras is the Layer class. A layer encapsulates both a state (the layer's "weights") and a transformation from inputs to outputs (a "call", the layer's forward pass).

Here's a densely-connected layer. It has a state: the variables w and b.

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super().__init__()
        self.w = self.add_weight(
            shape=(input_dim, units), initializer="random_normal", trainable=True
        )
        self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

You would use a layer by calling it on some tensor input(s), much like a Python function.

x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
tf.Tensor(
[[-0.02419483 -0.06813122  0.00395634 -0.03124779]
 [-0.02419483 -0.06813122  0.00395634 -0.03124779]], shape=(2, 4), dtype=float32)

Note that the weights w and b are automatically tracked by the layer upon being set as layer attributes:

assert linear_layer.weights == [linear_layer.w, linear_layer.b]

Layers can have non-trainable weights

Besides trainable weights, you can also add non-trainable weights to a layer. Such weights are meant not to be taken into account during backpropagation, when you are training the layer.

Here's how to add and use a non-trainable weight:

class ComputeSum(keras.layers.Layer):
    def __init__(self, input_dim):
        super().__init__()
        self.total = self.add_weight(
            initializer="zeros", shape=(input_dim,), trainable=False
        )

    def call(self, inputs):
        self.total.assign_add(tf.reduce_sum(inputs, axis=0))
        return self.total


x = tf.ones((2, 2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())
y = my_sum(x)
print(y.numpy())
[2. 2.]
[4. 4.]

It's part of layer.weights, but it gets categorized as a non-trainable weight:

print("weights:", len(my_sum.weights))
print("non-trainable weights:", len(my_sum.non_trainable_weights))

# It's not included in the trainable weights:
print("trainable_weights:", my_sum.trainable_weights)
weights: 1
non-trainable weights: 1
trainable_weights: []

Best practice: deferring weight creation until the shape of the inputs is known

Our Linear layer above took an input_dim argument that was used to compute the shape of the weights w and b in __init__():

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super().__init__()
        self.w = self.add_weight(
            shape=(input_dim, units), initializer="random_normal", trainable=True
        )
        self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

In many cases, you may not know in advance the size of your inputs, and you would like to lazily create weights when that value becomes known, some time after instantiating the layer.

In the Keras API, we recommend creating layer weights in the build(self, input_shape) method of your layer. Like this:

class Linear(keras.layers.Layer):
    def __init__(self, units=32):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

The __call__() method of your layer will automatically run build() the first time it is called. You now have a layer that's lazy and thus easier to use:

# At instantiation, we don't know on what inputs this is going to get called
linear_layer = Linear(32)

# The layer's weights are created dynamically the first time the layer is called
y = linear_layer(x)

Implementing build() separately as shown above nicely separates creating weights only once from using weights in every call. However, for some advanced custom layers, it can become impractical to separate state creation and computation. Layer implementers are allowed to defer weight creation to the first __call__(), but need to take care that later calls use the same weights. In addition, since __call__() is likely to be executed for the first time inside a tf.function, any variable creation that takes place in __call__() should be wrapped in a tf.init_scope.
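
Below is a minimal sketch of what this deferred pattern could look like, assuming eager execution or a first call made inside a tf.function. The LazyLinear name and its guard-based weight creation are illustrative, not part of the recommended build() approach above.

class LazyLinear(keras.layers.Layer):
    """Creates its weights on the first __call__() instead of in build()."""

    def __init__(self, units=32):
        super().__init__()
        self.units = units
        self.w = None
        self.b = None

    def call(self, inputs):
        if self.w is None:
            # Only create the variables once; later calls reuse the same weights.
            input_dim = inputs.shape[-1]  # assumes a known static last dimension
            # tf.init_scope lifts variable creation out of a tf.function trace.
            with tf.init_scope():
                self.w = self.add_weight(
                    shape=(input_dim, self.units),
                    initializer="random_normal",
                    trainable=True,
                )
                self.b = self.add_weight(
                    shape=(self.units,), initializer="zeros", trainable=True
                )
        return tf.matmul(inputs, self.w) + self.b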

Layers are recursively composable

If you assign a Layer instance as an attribute of another Layer, the outer layer will start tracking the weights created by the inner layer.

We recommend creating such sublayers in the __init__() method and leaving it to the first __call__() to trigger building their weights.

class MLPBlock(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(1)

    def call(self, inputs):
        x = self.linear_1(inputs)
        x = tf.nn.relu(x)
        x = self.linear_2(x)
        x = tf.nn.relu(x)
        return self.linear_3(x)


mlp = MLPBlock()
y = mlp(tf.ones(shape=(3, 64)))  # The first call to the `mlp` will create the weights
print("weights:", len(mlp.weights))
print("trainable weights:", len(mlp.trainable_weights))
weights: 6
trainable weights: 6

The add_loss() method

When writing the call() method of a layer, you can create loss tensors that you will want to use later, when writing your training loop. This is doable by calling self.add_loss(value):

# A layer that creates an activity regularization loss
class ActivityRegularizationLayer(keras.layers.Layer):
    def __init__(self, rate=1e-2):
        super().__init__()
        self.rate = rate

    def call(self, inputs):
        self.add_loss(self.rate * tf.reduce_mean(inputs))
        return inputs

Note that add_loss() can take the result of plain TensorFlow operations. There is no need to call a Loss object here.

These losses (including those created by any inner layer) can be retrieved via layer.losses. This property is reset at the start of every __call__() to the top-level layer, so that layer.losses always contains the loss values created during the last forward pass.

class OuterLayer(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.activity_reg = ActivityRegularizationLayer(1e-2)

    def call(self, inputs):
        return self.activity_reg(inputs)


layer = OuterLayer()
assert len(layer.losses) == 0  # No losses yet since the layer has never been called

_ = layer(tf.zeros((1, 1)))
assert len(layer.losses) == 1  # We created one loss value

# `layer.losses` gets reset at the start of each __call__
_ = layer(tf.zeros((1, 1)))
assert len(layer.losses) == 1  # This is the loss created during the call above

In addition, the loss property also contains regularization losses created for the weights of any inner layer:

class OuterLayerWithKernelRegularizer(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.dense = keras.layers.Dense(
            32, kernel_regularizer=keras.regularizers.l2(1e-3)
        )

    def call(self, inputs):
        return self.dense(inputs)


layer = OuterLayerWithKernelRegularizer()
_ = layer(tf.zeros((1, 1)))

# This is `1e-3 * sum(layer.dense.kernel ** 2)`,
# created by the `kernel_regularizer` above.
print(layer.losses)
[<tf.Tensor: shape=(), dtype=float32, numpy=0.0017542194>]

These losses are meant to be taken into account when writing training loops, like this:

# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Iterate over the batches of a dataset.
for x_batch_train, y_batch_train in train_dataset:
    with tf.GradientTape() as tape:
        logits = model(x_batch_train)  # Logits for this minibatch
        # Loss value for this minibatch
        loss_value = loss_fn(y_batch_train, logits)
        # Add extra losses created during this forward pass:
        loss_value += sum(model.losses)

    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

For a detailed guide on writing training loops, see the guide to writing a training loop from scratch.

These losses also work seamlessly with fit() (they get automatically summed and added to the main loss, if any):

import numpy as np

inputs = keras.Input(shape=(3,))
outputs = ActivityRegularizationLayer()(inputs)
model = keras.Model(inputs, outputs)

# If there is a loss passed in `compile`, the regularization
# losses get added to it
model.compile(optimizer="adam", loss="mse")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))

# It's also possible not to pass any loss in `compile`,
# since the model already has a loss to minimize, via the `add_loss`
# call during the forward pass!
model.compile(optimizer="adam")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))
1/1 [==============================] - 0s 75ms/step - loss: 0.1081
1/1 [==============================] - 0s 31ms/step - loss: 0.0044
<keras.src.callbacks.History at 0x7fb23c0e3f40>

You can optionally enable serialization on your layers

If you need your custom layers to be serializable as part of a Functional model, you can optionally implement a get_config() method:

class Linear(keras.layers.Layer):
    def __init__(self, units=32):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        return {"units": self.units}


# Now you can recreate the layer from its config:
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'units': 64}

Note that the __init__() method of the base Layer class takes some keyword arguments, in particular a name and a dtype. It's good practice to pass these arguments to the parent class in __init__() and to include them in the layer config:

class Linear(keras.layers.Layer):
    def __init__(self, units=32, **kwargs):
        super().__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        config = super().get_config()
        config.update({"units": self.units})
        return config


layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'name': 'linear_7', 'trainable': True, 'dtype': 'float32', 'units': 64}

If you need more flexibility when deserializing the layer from its config, you can also override the from_config() class method. This is the base implementation of from_config():

def from_config(cls, config):
  return cls(**config)
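
When the config contains a nested Keras object that must be rebuilt before the constructor runs, overriding from_config() is one way to handle it. The following is a hypothetical sketch; the LinearWithInitializer name and its config keys are assumptions for illustration, not part of this guide.

class LinearWithInitializer(keras.layers.Layer):
    def __init__(self, units=32, kernel_initializer="random_normal", **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.kernel_initializer = keras.initializers.get(kernel_initializer)

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer=self.kernel_initializer,
            trainable=True,
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w)

    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "units": self.units,
                # Store the initializer as a JSON-serializable dict.
                "kernel_initializer": keras.initializers.serialize(
                    self.kernel_initializer
                ),
            }
        )
        return config

    @classmethod
    def from_config(cls, config):
        # Rebuild the initializer object before calling the constructor.
        config["kernel_initializer"] = keras.initializers.deserialize(
            config["kernel_initializer"]
        )
        return cls(**config)

In this particular case keras.initializers.get() could also accept the serialized dict directly, so the override mainly illustrates where custom deserialization logic would go.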

To learn more about serialization and saving, see the complete guide to saving and serializing models.

Privileged training argument in the call() method

Some layers, in particular the BatchNormalization layer and the Dropout layer, have different behaviors during training and inference. For such layers, it is standard practice to expose a training (boolean) argument in the call() method.

By exposing this argument in call(), you enable the built-in training and evaluation loops (e.g. fit()) to correctly use the layer in training and inference.

class CustomDropout(keras.layers.Layer):
    def __init__(self, rate, **kwargs):
        super().__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=False):
        if training:
            return tf.nn.dropout(inputs, rate=self.rate)
        return inputs

Privileged mask argument in the call() method

The other privileged argument supported by call() is the mask argument.

You will find it in all Keras RNN layers. A mask is a boolean tensor (one boolean value per timestep in the input) used to skip certain input timesteps when processing timeseries data.

Keras will automatically pass the correct mask argument to __call__() for layers that support it, when a mask is generated by a prior layer. Mask-generating layers are the Embedding layer configured with mask_zero=True, and the Masking layer.

To learn more about masking and how to write masking-enabled layers, check out the guide "Understanding padding and masking". A small mask-consuming sketch follows below.
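
To make this concrete, here is a short sketch of a layer that consumes a mask. The MaskedTemporalMean name is hypothetical; the pattern simply exposes a mask argument in call(), as described above, so that Keras passes along the mask generated by the Embedding layer.

class MaskedTemporalMean(keras.layers.Layer):
    """Averages features over time, skipping masked (padded) timesteps."""

    def call(self, inputs, mask=None):
        # inputs: (batch, timesteps, features); mask: (batch, timesteps) booleans
        if mask is None:
            return tf.reduce_mean(inputs, axis=1)
        mask = tf.cast(mask, inputs.dtype)[:, :, tf.newaxis]
        return tf.reduce_sum(inputs * mask, axis=1) / tf.maximum(
            tf.reduce_sum(mask, axis=1), 1.0
        )


# The Embedding layer generates the mask; Keras passes it to MaskedTemporalMean
# automatically because its call() signature accepts a `mask` argument.
inputs = keras.Input(shape=(None,), dtype="int32")
x = keras.layers.Embedding(input_dim=1000, output_dim=16, mask_zero=True)(inputs)
outputs = MaskedTemporalMean()(x)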

The Model class

In general, you will use the Layer class to define inner computation blocks, and will use the Model class to define the outer model, that is, the object you will train.

For instance, in a ResNet50 model, you would have several ResNet blocks subclassing Layer, and a single Model encompassing the entire ResNet50 network.

The Model class has the same API as Layer, with the following differences:

  • It exposes built-in training, evaluation, and prediction loops (model.fit(), model.evaluate(), model.predict()).
  • It exposes the list of its inner layers, via the model.layers property.
  • It exposes saving and serialization APIs (save(), save_weights()...).

Effectively, the Layer class corresponds to what we refer to in the literature as a "layer" (as in "convolution layer" or "recurrent layer") or as a "block" (as in "ResNet block" or "Inception block").

Meanwhile, the Model class corresponds to what is referred to in the literature as a "model" (as in "deep learning model") or as a "network" (as in "deep neural network").

So if you're wondering, "should I use the Layer class or the Model class?", ask yourself: will I need to call fit() on it? Will I need to call save() on it? If so, go with Model. If not (either because your class is just a block in a bigger system, or because you are writing your own training and saving code), use Layer.

For instance, we could take our mini-ResNet example above and use it to build a Model that we could train with fit(), and that we could save with save_weights():

class ResNet(keras.Model):

    def __init__(self, num_classes=1000):
        super().__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = keras.layers.GlobalAveragePooling2D()
        self.classifier = keras.layers.Dense(num_classes)

    def call(self, inputs):
        x = self.block_1(inputs)
        x = self.block_2(x)
        x = self.global_pool(x)
        return self.classifier(x)


resnet = ResNet()
dataset = ...
resnet.fit(dataset, epochs=10)
resnet.save("filepath.keras")

Putting it all together: an end-to-end example

Here's what you've learned so far:

  • A Layer encapsulates a state (created in __init__() or build()) and some computation (defined in call()).
  • Layers can be recursively nested to create new, bigger computation blocks.
  • Layers can create and track losses (typically regularization losses) via add_loss().
  • The outer container, the thing you want to train, is a Model. A Model is just like a Layer, but with added training and serialization utilities.

Let's put all of these things together into an end-to-end example: we're going to implement a Variational AutoEncoder (VAE), and we'll train it on MNIST digits.

Our VAE will be a subclass of Model, built as a nested composition of layers that subclass Layer. It will feature a regularization loss (KL divergence).

from keras import layers


@keras.saving.register_keras_serializable()
class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


@keras.saving.register_keras_serializable()
class Encoder(layers.Layer):
    """Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""

    def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z


@keras.saving.register_keras_serializable()
class Decoder(layers.Layer):
    """Converts z, the encoded digit vector, back into a readable digit."""

    def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_output = layers.Dense(original_dim, activation="sigmoid")

    def call(self, inputs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)


@keras.saving.register_keras_serializable()
class VariationalAutoEncoder(keras.Model):
    """Combines the encoder and decoder into an end-to-end model for training."""

    def __init__(
        self,
        original_dim,
        intermediate_dim=64,
        latent_dim=32,
        name="autoencoder",
        **kwargs
    ):
        super().__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # Add KL divergence regularization loss.
        kl_loss = -0.5 * tf.reduce_mean(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
        )
        self.add_loss(kl_loss)
        return reconstructed

Let's write a simple training loop on MNIST:

original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = keras.losses.MeanSquaredError()

loss_metric = keras.metrics.Mean()

(x_train, _), _ = keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

epochs = 2

# Iterate over epochs.
for epoch in range(epochs):
    print("Start of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            # Compute reconstruction loss
            loss = mse_loss_fn(x_batch_train, reconstructed)
            loss += sum(vae.losses)  # Add KLD regularization loss

        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))

        loss_metric(loss)

        if step % 100 == 0:
            print("step %d: mean loss = %.4f" % (step, loss_metric.result()))
Start of epoch 0
WARNING:tensorflow:5 out of the last 5 calls to <function _BaseOptimizer._update_step_xla at 0x7fb220066af0> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has reduce_retracing=True option that can avoid unnecessary retracing. For (3), please refer to https://tensorflow.dev.org.tw/guide/function#controlling_retracing and https://tensorflow.dev.org.tw/api_docs/python/tf/function for  more details.
WARNING:tensorflow:6 out of the last 6 calls to <function _BaseOptimizer._update_step_xla at 0x7fb220066af0> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has reduce_retracing=True option that can avoid unnecessary retracing. For (3), please refer to https://tensorflow.dev.org.tw/guide/function#controlling_retracing and https://tensorflow.dev.org.tw/api_docs/python/tf/function for  more details.
step 0: mean loss = 0.3433
step 100: mean loss = 0.1257
step 200: mean loss = 0.0994
step 300: mean loss = 0.0893
step 400: mean loss = 0.0844
step 500: mean loss = 0.0810
step 600: mean loss = 0.0788
step 700: mean loss = 0.0772
step 800: mean loss = 0.0760
step 900: mean loss = 0.0750
Start of epoch 1
step 0: mean loss = 0.0747
step 100: mean loss = 0.0741
step 200: mean loss = 0.0736
step 300: mean loss = 0.0731
step 400: mean loss = 0.0727
step 500: mean loss = 0.0723
step 600: mean loss = 0.0720
step 700: mean loss = 0.0717
step 800: mean loss = 0.0715
step 900: mean loss = 0.0712

Note that since the VAE is a subclass of Model, it features built-in training loops. So you could also have trained it like this:

vae = VariationalAutoEncoder(784, 64, 32)

optimizer = keras.optimizers.Adam(learning_rate=1e-3)

vae.compile(optimizer, loss=keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=2, batch_size=64)
Epoch 1/2
938/938 [==============================] - 4s 3ms/step - loss: 0.0746
Epoch 2/2
938/938 [==============================] - 3s 3ms/step - loss: 0.0676
<keras.src.callbacks.History at 0x7fb1e0533580>