使用 tf.distribute.Strategy 進行自訂訓練

在 TensorFlow.org 上檢視 在 Google Colab 中執行 在 GitHub 上檢視原始碼 下載筆記本

本教學課程示範如何搭配自訂訓練迴圈使用 tf.distribute.Strategy (TensorFlow API,提供在多個處理單元 (GPU、多部機器或 TPU) 間分配訓練作業的抽象概念)。在本範例中,您將在 Fashion MNIST 資料集上訓練簡單的卷積神經網路,該資料集包含 70,000 張大小為 28 x 28 的圖片。

自訂訓練迴圈提供彈性,並可更完善地掌控訓練作業。此外,也能更輕鬆地偵錯模型和訓練迴圈。

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)

下載 Fashion MNIST 資料集

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Add a dimension to the array -> new shape == (28, 28, 1)
# This is done because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Scale the images to the [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

建立策略以分配變數和圖表

tf.distribute.MirroredStrategy 策略如何運作?

  • 所有變數和模型圖表都會在副本間複製。
  • 輸入會在副本間平均分配。
  • 每個副本都會針對收到的輸入計算損失和梯度。
  • 梯度會透過加總在所有副本間同步處理。
  • 同步處理後,系統會對每個副本上的變數副本進行相同的更新。
# If the list of devices is not specified in
# `tf.distribute.MirroredStrategy` constructor, they will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

設定輸入管線

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

建立資料集並加以分配

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

建立模型

使用 tf.keras.Sequential 建立模型。您也可以使用模型子類別化 APIFunctional API 來執行此操作。

def create_model():
  regularizer = tf.keras.regularizers.L2(1e-5)
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3,
                             activation='relu',
                             kernel_regularizer=regularizer),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3,
                             activation='relu',
                             kernel_regularizer=regularizer),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64,
                            activation='relu',
                            kernel_regularizer=regularizer),
      tf.keras.layers.Dense(10, kernel_regularizer=regularizer)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

定義損失函數

回想一下,損失函數包含一或兩個部分

  • 預測損失會測量模型的預測與一批訓練範例的訓練標籤之間的差距。系統會針對每個已標記的範例計算預測損失,然後透過計算平均值來減少批次中的預測損失。
  • 或者,您可以將正規化損失項新增至預測損失,以引導模型遠離訓練資料的過度擬合。常見的選擇是 L2 正規化,它會新增一個小的固定倍數 (所有模型權重的平方和),且與範例數量無關。上述模型使用 L2 正規化來示範其在以下訓練迴圈中的處理方式。

若要在具有單一 GPU/CPU 的單一機器上進行訓練,其運作方式如下

  • 系統會針對批次中的每個範例計算預測損失、在批次間加總,然後除以批次大小。
  • 正規化損失會新增至預測損失。
  • 系統會計算總損失相對於每個模型權重的梯度,而最佳化工具會根據對應的梯度更新每個模型權重。

使用 tf.distribute.Strategy 時,輸入批次會在副本之間分割。舉例來說,假設您有 4 個 GPU,每個 GPU 都有一個模型副本。一個包含 256 個輸入範例的批次會在 4 個副本之間平均分配,因此每個副本都會取得大小為 64 的批次:我們有 256 = 4*64,或一般而言 GLOBAL_BATCH_SIZE = num_replicas_in_sync * BATCH_SIZE_PER_REPLICA

每個副本都會根據其取得的訓練範例計算損失,並計算損失相對於每個模型權重的梯度。最佳化工具會負責在副本間加總這些梯度,然後再使用這些梯度來更新每個副本上的模型權重副本。

那麼,使用 tf.distribute.Strategy 時,應如何計算損失?

  • 每個副本都會計算分配給它的所有範例的預測損失,加總結果,然後除以 num_replicas_in_sync * BATCH_SIZE_PER_REPLICA,或等效的 GLOBAL_BATCH_SIZE
  • 每個副本都會計算正規化損失,並將其除以 num_replicas_in_sync

與非分散式訓練相比,所有每個副本的損失項都會縮減 1/num_replicas_in_sync 倍。另一方面,所有損失項 (或更確切地說,其梯度) 都會在副本數量之間加總,然後最佳化工具才會套用這些損失項。實際上,每個副本上的最佳化工具使用的梯度,就如同已發生 GLOBAL_BATCH_SIZE 的非分散式計算一樣。這與 Keras Model.fit 的分散式和非分散式行為一致。請參閱搭配 Keras 的分散式訓練教學課程,瞭解較大的全域批次大小如何擴充學習速率。

如何在 TensorFlow 中執行此操作?

  • 損失縮減和縮放會在 Keras Model.compileModel.fit 中自動完成

  • 如果您正在編寫自訂訓練迴圈 (如本教學課程所示),則應使用 tf.nn.compute_average_loss 加總每個範例的損失,並將總和除以全域批次大小。此函式會採用每個範例的損失和選用的範例權重做為引數,並傳回縮放後的損失。

  • 如果使用 tf.keras.losses 類別 (如下列範例所示),則需要明確指定損失縮減為 NONESUM 其中之一。預設的 AUTOSUM_OVER_BATCH_SIZEModel.fit 之外不允許使用。

    • 不允許使用 AUTO,因為使用者應明確思考他們想要的縮減,以確保在分散式情況下是正確的。
    • 不允許使用 SUM_OVER_BATCH_SIZE,因為目前它只會除以每個副本的批次大小,並將除以副本數量的部分留給使用者,這可能很容易遺漏。因此,您需要自行明確執行縮減。
  • 如果您要為具有非空白 Model.losses 清單 (例如權重正規化工具) 的模型編寫自訂訓練迴圈,則應將它們加總,並將總和除以副本數量。您可以使用 tf.nn.scale_regularization_loss 函式來執行此操作。模型程式碼本身仍不知道副本數量。

    但是,模型可以使用 Keras API (例如 Layer.add_loss(...)Layer(activity_regularizer=...)) 定義與輸入相關的正規化損失。對於 Layer.add_loss(...),建模程式碼需負責將加總的每個範例的項除以每個副本的(!) 批次大小,例如,使用 tf.math.reduce_mean()

with strategy.scope():
  # Set reduction to `NONE` so you can do the reduction yourself.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions, model_losses):
    per_example_loss = loss_object(labels, predictions)
    loss = tf.nn.compute_average_loss(per_example_loss)
    if model_losses:
      loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))
    return loss

特殊情況

進階使用者也應考量下列特殊情況。

  • 輸入批次短於 GLOBAL_BATCH_SIZE 會在多個位置產生令人不快的邊緣情況。在實務上,通常最好避免這些情況,方法是允許批次使用 Dataset.repeat().batch() 跨越 epoch 邊界,並依步驟計數 (而非資料集結尾) 定義近似的 epoch。或者,Dataset.batch(drop_remainder=True) 會維護 epoch 的概念,但會捨棄最後幾個範例。

    為了說明,本範例採用較困難的方法,並允許短批次,以便每個訓練 epoch 都完全包含每個訓練範例一次。

    tf.nn.compute_average_loss() 應使用哪個分母?

    • 根據預設,在上述範例程式碼中以及在 Keras.fit() 中,預測損失的總和會除以 num_replicas_in_sync 乘以副本上看到的實際批次大小 (空白批次會靜默忽略)。這保留了預測損失與正規化損失之間的平衡。這特別適用於使用與輸入相關的正規化損失的模型。純 L2 正規化只會將權重衰減疊加到預測損失的梯度上,因此比較不需要這種平衡。
    • 在實務上,許多自訂訓練迴圈會將常數 Python 值傳遞到 tf.nn.compute_average_loss(..., global_batch_size=GLOBAL_BATCH_SIZE) 以用作分母。這保留了批次之間訓練範例的相對權重。如果沒有它,短批次中較小的分母會有效地加權這些批次中的範例。(在 TensorFlow 2.13 之前,如果某些副本收到的實際批次大小為零,也需要這樣做才能避免 NaN。)

    如果避免短批次 (如上所述),則這兩個選項是等效的。

  • 多維 labels 需要您在每個範例中預測數量之間平均 per_example_loss。以輸入圖片的所有像素的分類工作為例,其中 predictions 的形狀為 (batch_size, H, W, n_classes),而 labels 的形狀為 (batch_size, H, W)。您需要更新 per_example_loss,如下所示:per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

定義指標以追蹤損失和準確性

這些指標會追蹤測試損失以及訓練和測試準確性。您可以使用 .result() 隨時取得累積的統計資料。

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')

訓練迴圈

# A model, an optimizer, and a checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions, model.losses)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print(template.format(epoch + 1, train_loss,
                         train_accuracy.result() * 100, test_loss.result(),
                         test_accuracy.result() * 100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()

上述範例中需要注意的事項

還原最新的檢查點並進行測試

使用 tf.distribute.Strategy 檢查點的模型可以使用或不使用策略來還原。

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result() * 100))

疊代資料集的其他方式

使用疊代器

如果您想要疊代給定步驟數,而不是疊代整個資料集,則可以使用 iter 呼叫建立疊代器,並明確呼叫疊代器上的 next。您可以選擇在 tf.function 內外疊代資料集。以下是小型程式碼片段,示範如何在 tf.function 之外使用疊代器疊代資料集。

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print(template.format(epoch + 1, average_train_loss, train_accuracy.result() * 100))
  train_accuracy.reset_states()

tf.function 內疊代

您也可以使用 for x in ... 結構或透過建立疊代器 (就像您在上方所做的那樣),在 tf.function 內疊代整個輸入 train_dist_dataset。下列範例示範如何使用 @tf.function 裝飾器包裝一個 epoch 的訓練,並在函式內疊代 train_dist_dataset

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print(template.format(epoch + 1, train_loss, train_accuracy.result() * 100))

  train_accuracy.reset_states()

追蹤跨副本的訓練損失

由於執行的損失縮放計算,因此不建議使用 tf.keras.metrics.Mean 來追蹤跨不同副本的訓練損失。

例如,如果您執行具有下列特徵的訓練工作

  • 兩個副本
  • 每個副本上處理兩個範例
  • 產生的損失值:每個副本上 [2, 3] 和 [4, 5]
  • 全域批次大小 = 4

透過損失縮放,您可以透過新增損失值,然後除以全域批次大小,來計算每個副本上的每個範例損失值。在本例中:(2 + 3) / 4 = 1.25(4 + 5) / 4 = 2.25

如果您使用 tf.keras.metrics.Mean 來追蹤跨兩個副本的損失,則結果會有所不同。在本範例中,您最終會得到 3.50 的 total 和 2 的 count,當在指標上呼叫 result() 時,會產生 total/count = 1.75。使用 tf.keras.Metrics 計算的損失會再縮放一個等於同步副本數量的因素。

指南和範例

以下是一些搭配自訂訓練迴圈使用分散式策略的範例

  1. 分散式訓練指南
  2. 使用 MirroredStrategyDenseNet 範例。
  3. 使用 MirroredStrategyTPUStrategy 訓練的 BERT 範例。此範例對於瞭解如何從檢查點載入,以及在分散式訓練期間產生定期檢查點等特別有幫助。
  4. 使用 MirroredStrategy 訓練的 NCF 範例,可以使用 keras_use_ctl 旗標啟用該範例。
  5. 使用 MirroredStrategy 訓練的 NMT 範例。

您可以在分散式策略指南中的「範例和教學課程」下方找到更多範例。

後續步驟