作者: fchollet
import tensorflow as tf
import keras
from keras import layers
import numpy as np
Keras 提供預設的訓練和評估迴圈 fit()
和 evaluate()
如果您想要自訂模型的學習演算法,同時仍利用 fit()
的便利性 (例如,使用 fit()
訓練 GAN),您可以子類別化 Model
類別並實作您自己的 train_step()
方法,該方法會在 fit()
期間重複呼叫。這在指南自訂 fit() 中發生的情況中涵蓋。
使用 GradientTape
在 GradientTape
範圍內呼叫模型可讓您檢索層的可訓練權重相對於損失值的梯度。使用最佳化工具執行個體,您可以使用這些梯度來更新這些變數 (您可以使用 model.trainable_weights
讓我們考慮一個簡單的 MNIST 模型
inputs = keras.Input(shape=(784,), name="digits")
x1 = layers.Dense(64, activation="relu")(inputs)
x2 = layers.Dense(64, activation="relu")(x1)
outputs = layers.Dense(10, name="predictions")(x2)
model = keras.Model(inputs=inputs, outputs=outputs)
# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# Prepare the training dataset.
batch_size = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))
# Reserve 10,000 samples for validation.
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]
# Prepare the training dataset.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)
# Prepare the validation dataset.
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)
- 我們開啟一個
迴圈,該迴圈會疊代週期 - 對於每個週期,我們開啟一個
迴圈,該迴圈會在資料集上以批次疊代 - 對於每個批次,我們開啟一個
範圍 - 在此範圍內,我們呼叫模型 (正向傳遞) 並計算損失
- 在範圍之外,我們檢索模型權重相對於損失的梯度
- 最後,我們使用最佳化工具根據梯度更新模型權重
epochs = 2
for epoch in range(epochs):
print("\nStart of epoch %d" % (epoch,))
# Iterate over the batches of the dataset.
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
# Open a GradientTape to record the operations run
# during the forward pass, which enables auto-differentiation.
with tf.GradientTape() as tape:
# Run the forward pass of the layer.
# The operations that the layer applies
# to its inputs are going to be recorded
# on the GradientTape.
logits = model(x_batch_train, training=True) # Logits for this minibatch
# Compute the loss value for this minibatch.
loss_value = loss_fn(y_batch_train, logits)
# Use the gradient tape to automatically retrieve
# the gradients of the trainable variables with respect to the loss.
grads = tape.gradient(loss_value, model.trainable_weights)
# Run one step of gradient descent by updating
# the value of the variables to minimize the loss.
optimizer.apply_gradients(zip(grads, model.trainable_weights))
# Log every 200 batches.
if step % 200 == 0:
"Training loss (for one batch) at step %d: %.4f"
% (step, float(loss_value))
print("Seen so far: %s samples" % ((step + 1) * batch_size))
Start of epoch 0 WARNING:tensorflow:5 out of the last 5 calls to <function _BaseOptimizer._update_step_xla at 0x7f51fe36a4c0> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has reduce_retracing=True option that can avoid unnecessary retracing. For (3), please refer to https://tensorflow.dev.org.tw/guide/function#controlling_retracing and https://tensorflow.dev.org.tw/api_docs/python/tf/function for more details. WARNING:tensorflow:6 out of the last 6 calls to <function _BaseOptimizer._update_step_xla at 0x7f51fe36a4c0> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has reduce_retracing=True option that can avoid unnecessary retracing. For (3), please refer to https://tensorflow.dev.org.tw/guide/function#controlling_retracing and https://tensorflow.dev.org.tw/api_docs/python/tf/function for more details. Training loss (for one batch) at step 0: 131.3794 Seen so far: 64 samples Training loss (for one batch) at step 200: 1.2871 Seen so far: 12864 samples Training loss (for one batch) at step 400: 1.2652 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.8800 Seen so far: 38464 samples Start of epoch 1 Training loss (for one batch) at step 0: 0.8296 Seen so far: 64 samples Training loss (for one batch) at step 200: 1.3322 Seen so far: 12864 samples Training loss (for one batch) at step 400: 1.0486 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.6610 Seen so far: 38464 samples
您可以在從頭開始編寫的此類訓練迴圈中輕鬆重複使用內建度量 (或您編寫的自訂度量)。以下是流程
- 在迴圈開始時具現化度量
- 在每個批次之後呼叫
- 當您需要顯示度量的目前值時,呼叫
- 當您需要清除度量的狀態時 (通常在週期結束時),呼叫
讓我們使用此知識來計算每個週期結束時驗證資料的 SparseCategoricalAccuracy
# Get model
inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = layers.Dense(10, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs)
# Instantiate an optimizer to train the model.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# Prepare the metrics.
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()
import time
epochs = 2
for epoch in range(epochs):
print("\nStart of epoch %d" % (epoch,))
start_time = time.time()
# Iterate over the batches of the dataset.
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
with tf.GradientTape() as tape:
logits = model(x_batch_train, training=True)
loss_value = loss_fn(y_batch_train, logits)
grads = tape.gradient(loss_value, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
# Update training metric.
train_acc_metric.update_state(y_batch_train, logits)
# Log every 200 batches.
if step % 200 == 0:
"Training loss (for one batch) at step %d: %.4f"
% (step, float(loss_value))
print("Seen so far: %d samples" % ((step + 1) * batch_size))
# Display metrics at the end of each epoch.
train_acc = train_acc_metric.result()
print("Training acc over epoch: %.4f" % (float(train_acc),))
# Reset training metrics at the end of each epoch
# Run a validation loop at the end of each epoch.
for x_batch_val, y_batch_val in val_dataset:
val_logits = model(x_batch_val, training=False)
# Update val metrics
val_acc_metric.update_state(y_batch_val, val_logits)
val_acc = val_acc_metric.result()
print("Validation acc: %.4f" % (float(val_acc),))
print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0 Training loss (for one batch) at step 0: 106.2691 Seen so far: 64 samples Training loss (for one batch) at step 200: 0.9259 Seen so far: 12864 samples Training loss (for one batch) at step 400: 0.9347 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.7641 Seen so far: 38464 samples Training acc over epoch: 0.7332 Validation acc: 0.8325 Time taken: 10.95s Start of epoch 1 Training loss (for one batch) at step 0: 0.5238 Seen so far: 64 samples Training loss (for one batch) at step 200: 0.7125 Seen so far: 12864 samples Training loss (for one batch) at step 400: 0.5705 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.6006 Seen so far: 38464 samples Training acc over epoch: 0.8424 Validation acc: 0.8525 Time taken: 10.59s
使用 tf.function
TensorFlow 2 中的預設執行階段是 即時執行。因此,我們上面的訓練迴圈會以即時方式執行。
您可以將任何以張量作為輸入的函式編譯成靜態圖形。只需在其上新增 @tf.function
def train_step(x, y):
with tf.GradientTape() as tape:
logits = model(x, training=True)
loss_value = loss_fn(y, logits)
grads = tape.gradient(loss_value, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
train_acc_metric.update_state(y, logits)
return loss_value
def test_step(x, y):
val_logits = model(x, training=False)
val_acc_metric.update_state(y, val_logits)
import time
epochs = 2
for epoch in range(epochs):
print("\nStart of epoch %d" % (epoch,))
start_time = time.time()
# Iterate over the batches of the dataset.
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
loss_value = train_step(x_batch_train, y_batch_train)
# Log every 200 batches.
if step % 200 == 0:
"Training loss (for one batch) at step %d: %.4f"
% (step, float(loss_value))
print("Seen so far: %d samples" % ((step + 1) * batch_size))
# Display metrics at the end of each epoch.
train_acc = train_acc_metric.result()
print("Training acc over epoch: %.4f" % (float(train_acc),))
# Reset training metrics at the end of each epoch
# Run a validation loop at the end of each epoch.
for x_batch_val, y_batch_val in val_dataset:
test_step(x_batch_val, y_batch_val)
val_acc = val_acc_metric.result()
print("Validation acc: %.4f" % (float(val_acc),))
print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0 Training loss (for one batch) at step 0: 0.5162 Seen so far: 64 samples Training loss (for one batch) at step 200: 0.4599 Seen so far: 12864 samples Training loss (for one batch) at step 400: 0.3975 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.2557 Seen so far: 38464 samples Training acc over epoch: 0.8747 Validation acc: 0.8545 Time taken: 1.85s Start of epoch 1 Training loss (for one batch) at step 0: 0.6145 Seen so far: 64 samples Training loss (for one batch) at step 200: 0.3751 Seen so far: 12864 samples Training loss (for one batch) at step 400: 0.3464 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.4128 Seen so far: 38464 samples Training acc over epoch: 0.8919 Validation acc: 0.8996 Time taken: 1.34s
層和模型會遞迴追蹤在正向傳遞期間由呼叫 self.add_loss(value)
的層建立的任何損失。產生的純量損失值清單可透過正向傳遞結束時的屬性 model.losses
class ActivityRegularizationLayer(layers.Layer):
def call(self, inputs):
self.add_loss(1e-2 * tf.reduce_sum(inputs))
return inputs
inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu")(inputs)
# Insert activity regularization as a layer
x = ActivityRegularizationLayer()(x)
x = layers.Dense(64, activation="relu")(x)
outputs = layers.Dense(10, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs)
def train_step(x, y):
with tf.GradientTape() as tape:
logits = model(x, training=True)
loss_value = loss_fn(y, logits)
# Add any extra losses created during the forward pass.
loss_value += sum(model.losses)
grads = tape.gradient(loss_value, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
train_acc_metric.update_state(y, logits)
return loss_value
總之,以下是一個簡單的端對端範例,它將您在本指南中學到的所有內容結合在一起:在 MNIST 數字上訓練的 DCGAN。
端對端範例:從頭開始的 GAN 訓練迴圈
您可能熟悉生成對抗網路 (GAN)。GAN 可以產生看起來幾乎真實的新影像,方法是學習影像訓練資料集的潛在分佈 (影像的「潛在空間」)。
GAN 由兩個部分組成:「產生器」模型,將潛在空間中的點對應到影像空間中的點;「鑑別器」模型,一種分類器,可以分辨真實影像 (來自訓練資料集) 和偽造影像 (產生器網路的輸出) 之間的差異。
GAN 訓練迴圈如下所示
1) 訓練鑑別器。- 在潛在空間中取樣一批隨機點。- 透過「產生器」模型將點轉換為偽造影像。- 取得一批真實影像,並將其與產生的影像結合。- 訓練「鑑別器」模型以分類產生的影像與真實影像。
2) 訓練產生器。- 在潛在空間中取樣隨機點。- 透過「產生器」網路將點轉換為偽造影像。- 取得一批真實影像,並將其與產生的影像結合。- 訓練「產生器」模型以「欺騙」鑑別器,並將偽造影像分類為真實影像。
如需 GAN 運作方式的更詳細總覽,請參閱Deep Learning with Python。
discriminator = keras.Sequential(
keras.Input(shape=(28, 28, 1)),
layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
Model: "discriminator" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= conv2d (Conv2D) (None, 14, 14, 64) 640 leaky_re_lu (LeakyReLU) (None, 14, 14, 64) 0 conv2d_1 (Conv2D) (None, 7, 7, 128) 73856 leaky_re_lu_1 (LeakyReLU) (None, 7, 7, 128) 0 global_max_pooling2d (Glob (None, 128) 0 alMaxPooling2D) dense_4 (Dense) (None, 1) 129 ================================================================= Total params: 74625 (291.50 KB) Trainable params: 74625 (291.50 KB) Non-trainable params: 0 (0.00 Byte) _________________________________________________________________
然後讓我們建立一個產生器網路,將潛在向量轉換為形狀為 (28, 28, 1)
的輸出 (表示 MNIST 數字)
latent_dim = 128
generator = keras.Sequential(
# We want to generate 128 coefficients to reshape into a 7x7x128 map
layers.Dense(7 * 7 * 128),
layers.Reshape((7, 7, 128)),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
這是關鍵部分:訓練迴圈。如您所見,它非常簡單。訓練步驟函式僅需 17 行。
# Instantiate one optimizer for the discriminator and another for the generator.
d_optimizer = keras.optimizers.Adam(learning_rate=0.0003)
g_optimizer = keras.optimizers.Adam(learning_rate=0.0004)
# Instantiate a loss function.
loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)
def train_step(real_images):
# Sample random points in the latent space
random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
# Decode them to fake images
generated_images = generator(random_latent_vectors)
# Combine them with real images
combined_images = tf.concat([generated_images, real_images], axis=0)
# Assemble labels discriminating real from fake images
labels = tf.concat(
[tf.ones((batch_size, 1)), tf.zeros((real_images.shape[0], 1))], axis=0
# Add random noise to the labels - important trick!
labels += 0.05 * tf.random.uniform(labels.shape)
# Train the discriminator
with tf.GradientTape() as tape:
predictions = discriminator(combined_images)
d_loss = loss_fn(labels, predictions)
grads = tape.gradient(d_loss, discriminator.trainable_weights)
d_optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))
# Sample random points in the latent space
random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
# Assemble labels that say "all real images"
misleading_labels = tf.zeros((batch_size, 1))
# Train the generator (note that we should *not* update the weights
# of the discriminator)!
with tf.GradientTape() as tape:
predictions = discriminator(generator(random_latent_vectors))
g_loss = loss_fn(misleading_labels, predictions)
grads = tape.gradient(g_loss, generator.trainable_weights)
g_optimizer.apply_gradients(zip(grads, generator.trainable_weights))
return d_loss, g_loss, generated_images
讓我們透過重複在影像批次上呼叫 train_step
來訓練我們的 GAN。
由於我們的鑑別器和產生器是 convnet,因此您會想要在 GPU 上執行此程式碼。
import os
# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)
epochs = 1 # In practice you need at least 20 epochs to generate nice digits.
save_dir = "./"
for epoch in range(epochs):
print("\nStart epoch", epoch)
for step, real_images in enumerate(dataset):
# Train the discriminator & generator on one batch of real images.
d_loss, g_loss, generated_images = train_step(real_images)
# Logging.
if step % 200 == 0:
# Print metrics
print("discriminator loss at step %d: %.2f" % (step, d_loss))
print("adversarial loss at step %d: %.2f" % (step, g_loss))
# Save one generated image
img = keras.utils.array_to_img(generated_images[0] * 255.0, scale=False)
img.save(os.path.join(save_dir, "generated_img" + str(step) + ".png"))
# To limit execution time we stop after 10 steps.
# Remove the lines below to actually train the model!
if step > 10:
Start epoch 0 discriminator loss at step 0: 0.72 adversarial loss at step 0: 0.72
就是這樣!在 Colab GPU 上僅訓練約 30 秒後,您將獲得外觀不錯的偽造 MNIST 數字。