使用 Keras 和 MultiWorkerMirroredStrategy 的自訂訓練迴圈

在 TensorFlow.org 上檢視 在 Google Colab 中執行 在 GitHub 上檢視原始碼 下載筆記本

總覽

本教學課程示範如何使用 Keras 模型和自訂訓練迴圈,透過 自訂訓練迴圈 tf.distribute.Strategy API 執行多 worker 分散式訓練。訓練迴圈透過 tf.distribute.MultiWorkerMirroredStrategy 分散,讓設計在單一 worker 上執行的 tf.keras 模型能夠順暢地在多個 worker 上運作,且程式碼變更幅度極小。自訂訓練迴圈提供彈性和對訓練的更大控制權,同時也讓模型偵錯更容易。深入瞭解編寫基本訓練迴圈從頭開始編寫訓練迴圈自訂訓練

如果您想瞭解如何將 MultiWorkerMirroredStrategytf.keras.Model.fit 搭配使用,請參閱本教學課程

TensorFlow 分散式訓練指南概述 TensorFlow 支援的分散策略,適合想要深入瞭解 tf.distribute.Strategy API 的使用者。

設定

首先,匯入一些必要的程式庫。

import json
import os
import sys

在匯入 TensorFlow 之前,請先對環境進行一些變更

  • 停用所有 GPU。這可防止所有 worker 嘗試使用相同 GPU 造成的錯誤。在實際應用中,每個 worker 都會在不同的機器上。
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  • 重設 'TF_CONFIG' 環境變數 (稍後您會看到更多相關資訊)。
os.environ.pop('TF_CONFIG', None)
  • 確定目前的目錄位於 Python 的路徑中。這可讓筆記本稍後匯入 %%writefile 寫入的檔案。
if '.' not in sys.path:
  sys.path.insert(0, '.')

現在匯入 TensorFlow。

import tensorflow as tf

資料集與模型定義

接下來,建立一個 mnist.py 檔案,其中包含簡單的模型和資料集設定。本教學課程中的 worker 程序會使用這個 Python 檔案

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  regularizer = tf.keras.regularizers.L2(1e-5)
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3,
                             activation='relu',
                             kernel_regularizer=regularizer),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128,
                            activation='relu',
                            kernel_regularizer=regularizer),
      tf.keras.layers.Dense(10, kernel_regularizer=regularizer)
  ])

多 worker 設定

現在讓我們進入多 worker 訓練的世界。在 TensorFlow 中,在多部機器上進行訓練需要 'TF_CONFIG' 環境變數。每部機器可能扮演不同的角色。以下使用的 'TF_CONFIG' 變數是 JSON 字串,用於指定叢集中每個 worker 的叢集設定。這是指定叢集的預設方法,使用 cluster_resolver.TFConfigClusterResolver,但在 distribute.cluster_resolver 模組中還有其他選項。如要進一步瞭解如何設定 'TF_CONFIG' 變數,請參閱分散式訓練指南

描述您的叢集

以下是組態範例

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

請注意,tf_config 只是 Python 中的本機變數。如要將其用於訓練組態,請將其序列化為 JSON 並放在 'TF_CONFIG' 環境變數中。以下是與 'TF_CONFIG' 相同的 JSON 字串序列化版本

json.dumps(tf_config)

'TF_CONFIG' 有兩個元件:'cluster''task'

  • 'cluster' 對所有 worker 而言都相同,並提供訓練叢集的相關資訊,這是一個包含不同類型工作 (例如 'worker') 的字典。在使用 MultiWorkerMirroredStrategy 進行多 worker 訓練時,通常會有一個 'worker' 承擔較多責任,例如儲存檢查點和寫入 TensorBoard 的摘要檔案,以及一般 'worker' 的工作。這類 worker 稱為 'chief' worker,通常會指定 'index' 為 0 的 'worker' 作為 chief worker。

  • 'task' 提供目前工作的相關資訊,而且每個 worker 都不同。它會指定該 worker 的 'type''index'

在本範例中,您將工作 'type' 設為 'worker',並將工作 'index' 設為 0。這部機器是第一個 worker,將被指定為 chief worker,並執行比其他 worker 更多的工作。請注意,其他機器也需要設定 'TF_CONFIG' 環境變數,而且應該具有相同的 'cluster' 字典,但工作 'type' 或工作 'index' 則會根據這些機器的角色而有所不同。

為了說明用途,本教學課程示範如何在 'localhost' 上設定具有兩個 worker 的 'TF_CONFIG'。實際上,使用者會在外部 IP 位址/連接埠上建立多個 worker,並在每個 worker 上適當設定 'TF_CONFIG'

本範例使用兩個 worker。第一個 worker 的 'TF_CONFIG' 如上所示。對於第二個 worker,請設定 tf_config['task']['index']=1

筆記本中的環境變數和子程序

子程序會繼承其父程序的環境變數。因此,如果您在這個 Jupyter Notebook 程序中設定環境變數

os.environ['GREETINGS'] = 'Hello TensorFlow!'

然後您就可以從子程序存取環境變數

echo ${GREETINGS}

在下一節中,您將使用此方法將 'TF_CONFIG' 傳遞至 worker 子程序。您永遠不會真正以這種方式啟動工作,但這足以用於本教學課程的目的:示範最基本的多 worker 範例。

MultiWorkerMirroredStrategy

在訓練模型之前,請先建立 tf.distribute.MultiWorkerMirroredStrategy 的執行個體

strategy = tf.distribute.MultiWorkerMirroredStrategy()

使用 tf.distribute.Strategy.scope 指定在建構模型時應使用策略。這可讓策略控制變數放置等事項,它會在所有 worker 的每個裝置上,在模型層中建立所有變數的副本。

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

跨 worker 自動分片您的資料

在多 worker 訓練中,資料集分片是確保收斂和可重現性所必需的。分片表示將整個資料集的子集交給每個 worker,這有助於建立類似於在單一 worker 上訓練的體驗。在以下範例中,您依賴 tf.distribute 的預設自動分片政策。您也可以透過設定 tf.data.experimental.DistributeOptionstf.data.experimental.AutoShardPolicy 自訂它。如要進一步瞭解,請參閱分散式輸入教學課程的分片章節。

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

定義自訂訓練迴圈並訓練模型

指定最佳化工具

with strategy.scope():
  # The creation of optimizer and train_accuracy needs to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

使用 tf.function 定義訓練步驟

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_example_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(per_example_loss)
      model_losses = multi_worker_model.losses
      if model_losses:
        loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

檢查點儲存與還原

當您編寫自訂訓練迴圈時,需要手動處理檢查點儲存,而不是依賴 Keras 回呼。請注意,對於 MultiWorkerMirroredStrategy,儲存檢查點或完整模型需要所有 worker 的參與,因為嘗試僅在 chief worker 上儲存可能會導致死鎖。Worker 也需要寫入不同的路徑,以避免互相覆寫。以下範例說明如何設定目錄

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

建立一個 tf.train.Checkpoint 來追蹤模型,該模型由 tf.train.CheckpointManager 管理,以便僅保留最新的檢查點

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# Normally, you don't need to manually instantiate a `ClusterSpec`, but in this
# illustrative example you did not set `'TF_CONFIG'` before initializing the
# strategy. Check out the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

現在,當您需要還原檢查點時,可以使用便利的 tf.train.latest_checkpoint 函式 (或呼叫 tf.train.CheckpointManager.restore_or_initialize) 找到儲存的最新檢查點。

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

還原檢查點後,您可以繼續訓練您的自訂訓練迴圈。

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)

程式碼總覽

總結目前討論的所有程序

  1. 您建立 worker 程序。
  2. 'TF_CONFIG' 傳遞至 worker 程序。
  3. 讓每個 worker 程序執行以下包含訓練程式碼的指令碼。

檔案:main.py

目前的目錄現在包含兩個 Python 檔案

ls *.py

因此,將 'TF_CONFIG' JSON 序列化並新增至環境變數

os.environ['TF_CONFIG'] = json.dumps(tf_config)

現在,您可以啟動 worker 程序,該程序將執行 main.py 並使用 'TF_CONFIG'

# first kill any previous runs
%killbgscripts
python main.py &> job_0.log

關於上述指令,有幾件事要注意

  1. 它使用 %%bash,這是筆記本「magic」指令,可執行一些 bash 指令。
  2. 它使用 --bg 旗標在背景執行 bash 程序,因為此 worker 不會終止。它會等待所有 worker 就緒後才開始。

背景 worker 程序不會將輸出列印到此筆記本。&> 會將其輸出重新導向至檔案,以便您可以檢查發生了什麼事。

等待幾秒鐘讓程序啟動

import time
time.sleep(20)

現在,檢查到目前為止 worker 記錄檔的輸出

cat job_0.log

記錄檔的最後一行應顯示:Started server with target: grpc://127.0.0.1:12345。第一個 worker 現在已就緒,並正在等待所有其他 worker 就緒以繼續執行。

更新 tf_config 以供第二個 worker 程序接收

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

現在啟動第二個 worker。由於所有 worker 都已啟動,這將開始訓練 (因此不需要將此程序放在背景執行)

python main.py > /dev/null 2>&1

如果您重新檢查第一個 worker 寫入的記錄,請注意它參與了該模型的訓練

cat job_0.log
# Delete the `'TF_CONFIG'`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts

深入瞭解多 worker 訓練

本教學課程示範了多 worker 設定的自訂訓練迴圈工作流程。其他主題的詳細說明可在適用於自訂訓練迴圈的使用 Keras 進行多 worker 訓練 (tf.keras.Model.fit) 教學課程中找到。

瞭解詳情

  1. TensorFlow 分散式訓練指南概述了可用的分散策略。
  2. 官方模型,其中許多模型可以設定為執行多種分散策略。
  3. tf.function 指南中的效能章節提供關於其他策略和工具的資訊,您可以使用這些策略和工具來最佳化 TensorFlow 模型的效能。