![]() |
![]() |
![]() |
![]() |
本教學課程示範如何搭配自訂訓練迴圈使用 tf.distribute.Strategy
(TensorFlow API,提供在多個處理單元 (GPU、多部機器或 TPU) 間分配訓練作業的抽象概念)。在本範例中,您將在 Fashion MNIST 資料集上訓練簡單的卷積神經網路,該資料集包含 70,000 張大小為 28 x 28 的圖片。
自訂訓練迴圈提供彈性,並可更完善地掌控訓練作業。此外,也能更輕鬆地偵錯模型和訓練迴圈。
# Import TensorFlow
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
下載 Fashion MNIST 資料集
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
# Add a dimension to the array -> new shape == (28, 28, 1)
# This is done because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]
# Scale the images to the [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
建立策略以分配變數和圖表
tf.distribute.MirroredStrategy
策略如何運作?
- 所有變數和模型圖表都會在副本間複製。
- 輸入會在副本間平均分配。
- 每個副本都會針對收到的輸入計算損失和梯度。
- 梯度會透過加總在所有副本間同步處理。
- 同步處理後,系統會對每個副本上的變數副本進行相同的更新。
# If the list of devices is not specified in
# `tf.distribute.MirroredStrategy` constructor, they will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
設定輸入管線
BUFFER_SIZE = len(train_images)
BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 10
建立資料集並加以分配
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
建立模型
使用 tf.keras.Sequential
建立模型。您也可以使用模型子類別化 API 或 Functional API 來執行此操作。
def create_model():
regularizer = tf.keras.regularizers.L2(1e-5)
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3,
activation='relu',
kernel_regularizer=regularizer),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3,
activation='relu',
kernel_regularizer=regularizer),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64,
activation='relu',
kernel_regularizer=regularizer),
tf.keras.layers.Dense(10, kernel_regularizer=regularizer)
])
return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
定義損失函數
回想一下,損失函數包含一或兩個部分
- 預測損失會測量模型的預測與一批訓練範例的訓練標籤之間的差距。系統會針對每個已標記的範例計算預測損失,然後透過計算平均值來減少批次中的預測損失。
- 或者,您可以將正規化損失項新增至預測損失,以引導模型遠離訓練資料的過度擬合。常見的選擇是 L2 正規化,它會新增一個小的固定倍數 (所有模型權重的平方和),且與範例數量無關。上述模型使用 L2 正規化來示範其在以下訓練迴圈中的處理方式。
若要在具有單一 GPU/CPU 的單一機器上進行訓練,其運作方式如下
- 系統會針對批次中的每個範例計算預測損失、在批次間加總,然後除以批次大小。
- 正規化損失會新增至預測損失。
- 系統會計算總損失相對於每個模型權重的梯度,而最佳化工具會根據對應的梯度更新每個模型權重。
使用 tf.distribute.Strategy
時,輸入批次會在副本之間分割。舉例來說,假設您有 4 個 GPU,每個 GPU 都有一個模型副本。一個包含 256 個輸入範例的批次會在 4 個副本之間平均分配,因此每個副本都會取得大小為 64 的批次:我們有 256 = 4*64
,或一般而言 GLOBAL_BATCH_SIZE = num_replicas_in_sync * BATCH_SIZE_PER_REPLICA
。
每個副本都會根據其取得的訓練範例計算損失,並計算損失相對於每個模型權重的梯度。最佳化工具會負責在副本間加總這些梯度,然後再使用這些梯度來更新每個副本上的模型權重副本。
那麼,使用 tf.distribute.Strategy
時,應如何計算損失?
- 每個副本都會計算分配給它的所有範例的預測損失,加總結果,然後除以
num_replicas_in_sync * BATCH_SIZE_PER_REPLICA
,或等效的GLOBAL_BATCH_SIZE
。 - 每個副本都會計算正規化損失,並將其除以
num_replicas_in_sync
。
與非分散式訓練相比,所有每個副本的損失項都會縮減 1/num_replicas_in_sync
倍。另一方面,所有損失項 (或更確切地說,其梯度) 都會在副本數量之間加總,然後最佳化工具才會套用這些損失項。實際上,每個副本上的最佳化工具使用的梯度,就如同已發生 GLOBAL_BATCH_SIZE
的非分散式計算一樣。這與 Keras Model.fit
的分散式和非分散式行為一致。請參閱搭配 Keras 的分散式訓練教學課程,瞭解較大的全域批次大小如何擴充學習速率。
如何在 TensorFlow 中執行此操作?
損失縮減和縮放會在 Keras
Model.compile
和Model.fit
中自動完成如果您正在編寫自訂訓練迴圈 (如本教學課程所示),則應使用
tf.nn.compute_average_loss
加總每個範例的損失,並將總和除以全域批次大小。此函式會採用每個範例的損失和選用的範例權重做為引數,並傳回縮放後的損失。如果使用
tf.keras.losses
類別 (如下列範例所示),則需要明確指定損失縮減為NONE
或SUM
其中之一。預設的AUTO
和SUM_OVER_BATCH_SIZE
在Model.fit
之外不允許使用。- 不允許使用
AUTO
,因為使用者應明確思考他們想要的縮減,以確保在分散式情況下是正確的。 - 不允許使用
SUM_OVER_BATCH_SIZE
,因為目前它只會除以每個副本的批次大小,並將除以副本數量的部分留給使用者,這可能很容易遺漏。因此,您需要自行明確執行縮減。
- 不允許使用
如果您要為具有非空白
Model.losses
清單 (例如權重正規化工具) 的模型編寫自訂訓練迴圈,則應將它們加總,並將總和除以副本數量。您可以使用tf.nn.scale_regularization_loss
函式來執行此操作。模型程式碼本身仍不知道副本數量。但是,模型可以使用 Keras API (例如
Layer.add_loss(...)
和Layer(activity_regularizer=...)
) 定義與輸入相關的正規化損失。對於Layer.add_loss(...)
,建模程式碼需負責將加總的每個範例的項除以每個副本的(!) 批次大小,例如,使用tf.math.reduce_mean()
。
with strategy.scope():
# Set reduction to `NONE` so you can do the reduction yourself.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)
def compute_loss(labels, predictions, model_losses):
per_example_loss = loss_object(labels, predictions)
loss = tf.nn.compute_average_loss(per_example_loss)
if model_losses:
loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))
return loss
特殊情況
進階使用者也應考量下列特殊情況。
輸入批次短於
GLOBAL_BATCH_SIZE
會在多個位置產生令人不快的邊緣情況。在實務上,通常最好避免這些情況,方法是允許批次使用Dataset.repeat().batch()
跨越 epoch 邊界,並依步驟計數 (而非資料集結尾) 定義近似的 epoch。或者,Dataset.batch(drop_remainder=True)
會維護 epoch 的概念,但會捨棄最後幾個範例。為了說明,本範例採用較困難的方法,並允許短批次,以便每個訓練 epoch 都完全包含每個訓練範例一次。
tf.nn.compute_average_loss()
應使用哪個分母?- 根據預設,在上述範例程式碼中以及在
Keras.fit()
中,預測損失的總和會除以num_replicas_in_sync
乘以副本上看到的實際批次大小 (空白批次會靜默忽略)。這保留了預測損失與正規化損失之間的平衡。這特別適用於使用與輸入相關的正規化損失的模型。純 L2 正規化只會將權重衰減疊加到預測損失的梯度上,因此比較不需要這種平衡。 - 在實務上,許多自訂訓練迴圈會將常數 Python 值傳遞到
tf.nn.compute_average_loss(..., global_batch_size=GLOBAL_BATCH_SIZE)
以用作分母。這保留了批次之間訓練範例的相對權重。如果沒有它,短批次中較小的分母會有效地加權這些批次中的範例。(在 TensorFlow 2.13 之前,如果某些副本收到的實際批次大小為零,也需要這樣做才能避免 NaN。)
如果避免短批次 (如上所述),則這兩個選項是等效的。
- 根據預設,在上述範例程式碼中以及在
多維
labels
需要您在每個範例中預測數量之間平均per_example_loss
。以輸入圖片的所有像素的分類工作為例,其中predictions
的形狀為(batch_size, H, W, n_classes)
,而labels
的形狀為(batch_size, H, W)
。您需要更新per_example_loss
,如下所示:per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)
定義指標以追蹤損失和準確性
這些指標會追蹤測試損失以及訓練和測試準確性。您可以使用 .result()
隨時取得累積的統計資料。
with strategy.scope():
test_loss = tf.keras.metrics.Mean(name='test_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='test_accuracy')
訓練迴圈
# A model, an optimizer, and a checkpoint must be created under `strategy.scope`.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
images, labels = inputs
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = compute_loss(labels, predictions, model.losses)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_accuracy.update_state(labels, predictions)
return loss
def test_step(inputs):
images, labels = inputs
predictions = model(images, training=False)
t_loss = loss_object(labels, predictions)
test_loss.update_state(t_loss)
test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
axis=None)
@tf.function
def distributed_test_step(dataset_inputs):
return strategy.run(test_step, args=(dataset_inputs,))
for epoch in range(EPOCHS):
# TRAIN LOOP
total_loss = 0.0
num_batches = 0
for x in train_dist_dataset:
total_loss += distributed_train_step(x)
num_batches += 1
train_loss = total_loss / num_batches
# TEST LOOP
for x in test_dist_dataset:
distributed_test_step(x)
if epoch % 2 == 0:
checkpoint.save(checkpoint_prefix)
template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
"Test Accuracy: {}")
print(template.format(epoch + 1, train_loss,
train_accuracy.result() * 100, test_loss.result(),
test_accuracy.result() * 100))
test_loss.reset_states()
train_accuracy.reset_states()
test_accuracy.reset_states()
上述範例中需要注意的事項
- 使用
for x in ...
結構疊代train_dist_dataset
和test_dist_dataset
。 - 縮放後的損失是
distributed_train_step
的傳回值。此值會使用tf.distribute.Strategy.reduce
呼叫在副本間彙總,然後透過加總tf.distribute.Strategy.reduce
呼叫的傳回值,在批次間彙總。 tf.keras.Metrics
應在train_step
和test_step
內更新,這些步驟由tf.distribute.Strategy.run
執行。tf.distribute.Strategy.run
會傳回策略中每個本機副本的結果,而且有多種方法可使用此結果。您可以執行tf.distribute.Strategy.reduce
以取得彙總值。您也可以執行tf.distribute.Strategy.experimental_local_results
以取得結果中包含的值清單,每個本機副本一個值。
還原最新的檢查點並進行測試
使用 tf.distribute.Strategy
檢查點的模型可以使用或不使用策略來還原。
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='eval_accuracy')
new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
predictions = new_model(images, training=False)
eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
for images, labels in test_dataset:
eval_step(images, labels)
print('Accuracy after restoring the saved model without strategy: {}'.format(
eval_accuracy.result() * 100))
疊代資料集的其他方式
使用疊代器
如果您想要疊代給定步驟數,而不是疊代整個資料集,則可以使用 iter
呼叫建立疊代器,並明確呼叫疊代器上的 next
。您可以選擇在 tf.function
內外疊代資料集。以下是小型程式碼片段,示範如何在 tf.function
之外使用疊代器疊代資料集。
for _ in range(EPOCHS):
total_loss = 0.0
num_batches = 0
train_iter = iter(train_dist_dataset)
for _ in range(10):
total_loss += distributed_train_step(next(train_iter))
num_batches += 1
average_train_loss = total_loss / num_batches
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print(template.format(epoch + 1, average_train_loss, train_accuracy.result() * 100))
train_accuracy.reset_states()
在 tf.function
內疊代
您也可以使用 for x in ...
結構或透過建立疊代器 (就像您在上方所做的那樣),在 tf.function
內疊代整個輸入 train_dist_dataset
。下列範例示範如何使用 @tf.function
裝飾器包裝一個 epoch 的訓練,並在函式內疊代 train_dist_dataset
。
@tf.function
def distributed_train_epoch(dataset):
total_loss = 0.0
num_batches = 0
for x in dataset:
per_replica_losses = strategy.run(train_step, args=(x,))
total_loss += strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
num_batches += 1
return total_loss / tf.cast(num_batches, dtype=tf.float32)
for epoch in range(EPOCHS):
train_loss = distributed_train_epoch(train_dist_dataset)
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print(template.format(epoch + 1, train_loss, train_accuracy.result() * 100))
train_accuracy.reset_states()
追蹤跨副本的訓練損失
由於執行的損失縮放計算,因此不建議使用 tf.keras.metrics.Mean
來追蹤跨不同副本的訓練損失。
例如,如果您執行具有下列特徵的訓練工作
- 兩個副本
- 每個副本上處理兩個範例
- 產生的損失值:每個副本上 [2, 3] 和 [4, 5]
- 全域批次大小 = 4
透過損失縮放,您可以透過新增損失值,然後除以全域批次大小,來計算每個副本上的每個範例損失值。在本例中:(2 + 3) / 4 = 1.25
和 (4 + 5) / 4 = 2.25
。
如果您使用 tf.keras.metrics.Mean
來追蹤跨兩個副本的損失,則結果會有所不同。在本範例中,您最終會得到 3.50 的 total
和 2 的 count
,當在指標上呼叫 result()
時,會產生 total
/count
= 1.75。使用 tf.keras.Metrics
計算的損失會再縮放一個等於同步副本數量的因素。
指南和範例
以下是一些搭配自訂訓練迴圈使用分散式策略的範例
- 分散式訓練指南
- 使用
MirroredStrategy
的 DenseNet 範例。 - 使用
MirroredStrategy
和TPUStrategy
訓練的 BERT 範例。此範例對於瞭解如何從檢查點載入,以及在分散式訓練期間產生定期檢查點等特別有幫助。 - 使用
MirroredStrategy
訓練的 NCF 範例,可以使用keras_use_ctl
旗標啟用該範例。 - 使用
MirroredStrategy
訓練的 NMT 範例。
您可以在分散式策略指南中的「範例和教學課程」下方找到更多範例。
後續步驟
- 在您的模型上試用新的
tf.distribute.Strategy
API。 - 瀏覽 使用
tf.function
提升效能和 TensorFlow Profiler 指南,進一步瞭解最佳化 TensorFlow 模型效能的工具。 - 查看 TensorFlow 中的分散式訓練指南,其中概述了可用的分散式策略。