![]() |
![]() |
![]() |
![]() |
總覽
本教學課程示範如何使用 Keras 模型和自訂訓練迴圈,透過 自訂訓練迴圈 tf.distribute.Strategy API 執行多 worker 分散式訓練。訓練迴圈透過 tf.distribute.MultiWorkerMirroredStrategy
分散,讓設計在單一 worker 上執行的 tf.keras
模型能夠順暢地在多個 worker 上運作,且程式碼變更幅度極小。自訂訓練迴圈提供彈性和對訓練的更大控制權,同時也讓模型偵錯更容易。深入瞭解編寫基本訓練迴圈、從頭開始編寫訓練迴圈和自訂訓練。
如果您想瞭解如何將 MultiWorkerMirroredStrategy
與 tf.keras.Model.fit
搭配使用,請參閱本教學課程。
TensorFlow 分散式訓練指南概述 TensorFlow 支援的分散策略,適合想要深入瞭解 tf.distribute.Strategy
API 的使用者。
設定
首先,匯入一些必要的程式庫。
import json
import os
import sys
在匯入 TensorFlow 之前,請先對環境進行一些變更
- 停用所有 GPU。這可防止所有 worker 嘗試使用相同 GPU 造成的錯誤。在實際應用中,每個 worker 都會在不同的機器上。
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
- 重設
'TF_CONFIG'
環境變數 (稍後您會看到更多相關資訊)。
os.environ.pop('TF_CONFIG', None)
- 確定目前的目錄位於 Python 的路徑中。這可讓筆記本稍後匯入
%%writefile
寫入的檔案。
if '.' not in sys.path:
sys.path.insert(0, '.')
現在匯入 TensorFlow。
import tensorflow as tf
資料集與模型定義
接下來,建立一個 mnist.py
檔案,其中包含簡單的模型和資料集設定。本教學課程中的 worker 程序會使用這個 Python 檔案
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the range [0, 255].
# You need to convert them to float32 with values in the range [0, 1]
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000)
return train_dataset
def dataset_fn(global_batch_size, input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = mnist_dataset(batch_size)
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
return dataset
def build_cnn_model():
regularizer = tf.keras.regularizers.L2(1e-5)
return tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3,
activation='relu',
kernel_regularizer=regularizer),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128,
activation='relu',
kernel_regularizer=regularizer),
tf.keras.layers.Dense(10, kernel_regularizer=regularizer)
])
多 worker 設定
現在讓我們進入多 worker 訓練的世界。在 TensorFlow 中,在多部機器上進行訓練需要 'TF_CONFIG'
環境變數。每部機器可能扮演不同的角色。以下使用的 'TF_CONFIG'
變數是 JSON 字串,用於指定叢集中每個 worker 的叢集設定。這是指定叢集的預設方法,使用 cluster_resolver.TFConfigClusterResolver
,但在 distribute.cluster_resolver
模組中還有其他選項。如要進一步瞭解如何設定 'TF_CONFIG'
變數,請參閱分散式訓練指南。
描述您的叢集
以下是組態範例
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
請注意,tf_config
只是 Python 中的本機變數。如要將其用於訓練組態,請將其序列化為 JSON 並放在 'TF_CONFIG'
環境變數中。以下是與 'TF_CONFIG'
相同的 JSON 字串序列化版本
json.dumps(tf_config)
'TF_CONFIG'
有兩個元件:'cluster'
和 'task'
。
'cluster'
對所有 worker 而言都相同,並提供訓練叢集的相關資訊,這是一個包含不同類型工作 (例如'worker'
) 的字典。在使用MultiWorkerMirroredStrategy
進行多 worker 訓練時,通常會有一個'worker'
承擔較多責任,例如儲存檢查點和寫入 TensorBoard 的摘要檔案,以及一般'worker'
的工作。這類 worker 稱為'chief'
worker,通常會指定'index'
為 0 的'worker'
作為 chief worker。'task'
提供目前工作的相關資訊,而且每個 worker 都不同。它會指定該 worker 的'type'
和'index'
。
在本範例中,您將工作 'type'
設為 'worker'
,並將工作 'index'
設為 0。這部機器是第一個 worker,將被指定為 chief worker,並執行比其他 worker 更多的工作。請注意,其他機器也需要設定 'TF_CONFIG'
環境變數,而且應該具有相同的 'cluster'
字典,但工作 'type'
或工作 'index'
則會根據這些機器的角色而有所不同。
為了說明用途,本教學課程示範如何在 'localhost'
上設定具有兩個 worker 的 'TF_CONFIG'
。實際上,使用者會在外部 IP 位址/連接埠上建立多個 worker,並在每個 worker 上適當設定 'TF_CONFIG'
。
本範例使用兩個 worker。第一個 worker 的 'TF_CONFIG'
如上所示。對於第二個 worker,請設定 tf_config['task']['index']=1
。
筆記本中的環境變數和子程序
子程序會繼承其父程序的環境變數。因此,如果您在這個 Jupyter Notebook 程序中設定環境變數
os.environ['GREETINGS'] = 'Hello TensorFlow!'
然後您就可以從子程序存取環境變數
echo ${GREETINGS}
在下一節中,您將使用此方法將 'TF_CONFIG'
傳遞至 worker 子程序。您永遠不會真正以這種方式啟動工作,但這足以用於本教學課程的目的:示範最基本的多 worker 範例。
MultiWorkerMirroredStrategy
在訓練模型之前,請先建立 tf.distribute.MultiWorkerMirroredStrategy
的執行個體
strategy = tf.distribute.MultiWorkerMirroredStrategy()
使用 tf.distribute.Strategy.scope
指定在建構模型時應使用策略。這可讓策略控制變數放置等事項,它會在所有 worker 的每個裝置上,在模型層中建立所有變數的副本。
import mnist
with strategy.scope():
# Model building needs to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
跨 worker 自動分片您的資料
在多 worker 訓練中,資料集分片是確保收斂和可重現性所必需的。分片表示將整個資料集的子集交給每個 worker,這有助於建立類似於在單一 worker 上訓練的體驗。在以下範例中,您依賴 tf.distribute
的預設自動分片政策。您也可以透過設定 tf.data.experimental.DistributeOptions
的 tf.data.experimental.AutoShardPolicy
自訂它。如要進一步瞭解,請參閱分散式輸入教學課程的分片章節。
per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
with strategy.scope():
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
定義自訂訓練迴圈並訓練模型
指定最佳化工具
with strategy.scope():
# The creation of optimizer and train_accuracy needs to be in
# `strategy.scope()` as well, since they create variables.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
使用 tf.function
定義訓練步驟
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_example_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(per_example_loss)
model_losses = multi_worker_model.losses
if model_losses:
loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
檢查點儲存與還原
當您編寫自訂訓練迴圈時,需要手動處理檢查點儲存,而不是依賴 Keras 回呼。請注意,對於 MultiWorkerMirroredStrategy
,儲存檢查點或完整模型需要所有 worker 的參與,因為嘗試僅在 chief worker 上儲存可能會導致死鎖。Worker 也需要寫入不同的路徑,以避免互相覆寫。以下範例說明如何設定目錄
from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and "chief" not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
建立一個 tf.train.Checkpoint
來追蹤模型,該模型由 tf.train.CheckpointManager
管理,以便僅保留最新的檢查點
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# Normally, you don't need to manually instantiate a `ClusterSpec`, but in this
# illustrative example you did not set `'TF_CONFIG'` before initializing the
# strategy. Check out the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
現在,當您需要還原檢查點時,可以使用便利的 tf.train.latest_checkpoint
函式 (或呼叫 tf.train.CheckpointManager.restore_or_initialize
) 找到儲存的最新檢查點。
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
還原檢查點後,您可以繼續訓練您的自訂訓練迴圈。
num_epochs = 3
num_steps_per_epoch = 70
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
# Once the `CheckpointManager` is set up, you're now ready to save, and remove
# the checkpoints non-chief workers saved.
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
程式碼總覽
總結目前討論的所有程序
- 您建立 worker 程序。
- 將
'TF_CONFIG'
傳遞至 worker 程序。 - 讓每個 worker 程序執行以下包含訓練程式碼的指令碼。
檔案:main.py
目前的目錄現在包含兩個 Python 檔案
ls *.py
因此,將 'TF_CONFIG'
JSON 序列化並新增至環境變數
os.environ['TF_CONFIG'] = json.dumps(tf_config)
現在,您可以啟動 worker 程序,該程序將執行 main.py
並使用 'TF_CONFIG'
# first kill any previous runs
%killbgscripts
python main.py &> job_0.log
關於上述指令,有幾件事要注意
- 它使用
%%bash
,這是筆記本「magic」指令,可執行一些 bash 指令。 - 它使用
--bg
旗標在背景執行bash
程序,因為此 worker 不會終止。它會等待所有 worker 就緒後才開始。
背景 worker 程序不會將輸出列印到此筆記本。&>
會將其輸出重新導向至檔案,以便您可以檢查發生了什麼事。
等待幾秒鐘讓程序啟動
import time
time.sleep(20)
現在,檢查到目前為止 worker 記錄檔的輸出
cat job_0.log
記錄檔的最後一行應顯示:Started server with target: grpc://127.0.0.1:12345
。第一個 worker 現在已就緒,並正在等待所有其他 worker 就緒以繼續執行。
更新 tf_config
以供第二個 worker 程序接收
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
現在啟動第二個 worker。由於所有 worker 都已啟動,這將開始訓練 (因此不需要將此程序放在背景執行)
python main.py > /dev/null 2>&1
如果您重新檢查第一個 worker 寫入的記錄,請注意它參與了該模型的訓練
cat job_0.log
# Delete the `'TF_CONFIG'`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
深入瞭解多 worker 訓練
本教學課程示範了多 worker 設定的自訂訓練迴圈工作流程。其他主題的詳細說明可在適用於自訂訓練迴圈的使用 Keras 進行多 worker 訓練 (tf.keras.Model.fit
) 教學課程中找到。
瞭解詳情
- TensorFlow 分散式訓練指南概述了可用的分散策略。
- 官方模型,其中許多模型可以設定為執行多種分散策略。
tf.function
指南中的效能章節提供關於其他策略和工具的資訊,您可以使用這些策略和工具來最佳化 TensorFlow 模型的效能。