![]() |
![]() |
![]() |
![]() |
總覽
本教學課程示範如何使用 Keras 模型和 Model.fit
API,透過 tf.distribute.MultiWorkerMirroredStrategy
API 執行多 Worker 分散式訓練。藉由此策略的協助,原本設計在單一 Worker 上執行的 Keras 模型,可以順暢地在多個 Worker 上運作,而且程式碼變更極少。
若要瞭解如何將 MultiWorkerMirroredStrategy
與 Keras 和自訂訓練迴圈搭配使用,請參閱搭配 Keras 和 MultiWorkerMirroredStrategy 的自訂訓練迴圈。
本教學課程包含一個使用兩個 Worker 的最簡多 Worker 範例,僅供示範用途。
選擇合適的策略
在深入探討之前,請確認 tf.distribute.MultiWorkerMirroredStrategy
是否適合您的加速器和訓練。以下是兩種常見的使用資料平行處理分散訓練的方式
- 同步訓練:訓練步驟會在 Worker 和副本之間同步處理,例如
tf.distribute.MirroredStrategy
、tf.distribute.TPUStrategy
和tf.distribute.MultiWorkerMirroredStrategy
。所有 Worker 會同步處理不同輸入資料切片的訓練,並在每個步驟彙總梯度。 - 非同步訓練:訓練步驟並未嚴格同步處理,例如
tf.distribute.experimental.ParameterServerStrategy
。所有 Worker 會獨立處理輸入資料的訓練,並以非同步方式更新變數。
如果您正在尋找不含 TPU 的多 Worker 同步訓練,tf.distribute.MultiWorkerMirroredStrategy
是您的理想選擇。它會在所有 Worker 的每個裝置上,建立模型層中所有變數的副本。它會使用 CollectiveOps
(一種 TensorFlow 運算,用於集體通訊) 來彙總梯度,並保持變數同步。有興趣者可以查看 tf.distribute.experimental.CommunicationOptions
參數,以瞭解集體實作選項。
如需 tf.distribute.Strategy
API 的總覽,請參閱TensorFlow 中的分散式訓練。
設定
從一些必要的匯入開始
import json
import os
import sys
匯入 TensorFlow 之前,先對環境進行一些變更
- 在真實世界的應用程式中,每個 Worker 都會在不同的機器上。在本教學課程中,所有 Worker 都會在這部機器上執行。因此,停用所有 GPU,以防止所有 Worker 嘗試使用同一個 GPU 而造成錯誤。
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
- 重設
TF_CONFIG
環境變數 (稍後您會進一步瞭解)
os.environ.pop('TF_CONFIG', None)
- 確認目前的目錄位於 Python 的路徑中,這可讓筆記本稍後匯入
%%writefile
寫入的檔案
if '.' not in sys.path:
sys.path.insert(0, '.')
安裝 tf-nightly
,因為從 TensorFlow 2.10 開始,在特定步驟中使用 tf.keras.callbacks.BackupAndRestore
中的 save_freq
引數儲存檢查點的頻率才推出
pip install tf-nightly
最後,匯入 TensorFlow
import tensorflow as tf
資料集和模型定義
接下來,建立一個 mnist_setup.py
檔案,其中包含簡單的模型和資料集設定。本教學課程中的 Worker 處理程序會使用這個 Python 檔案
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
在單一 Worker 上進行模型訓練
嘗試訓練模型幾個週期,並觀察單一 Worker 的結果,以確保一切運作正常。隨著訓練進度,損失應會下降,而準確度應會提高。
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
多 Worker 設定
現在,讓我們進入多 Worker 訓練的世界。
具有工作和任務的叢集
在 TensorFlow 中,分散式訓練牽涉到具有多個工作的 'cluster'
,而每個工作可能有一個或多個 'task'
。
您會需要 TF_CONFIG
設定環境變數,才能在多部機器上進行訓練,而每部機器可能具有不同的角色。TF_CONFIG
是一個 JSON 字串,用於指定叢集中每個 Worker 的叢集設定。
TF_CONFIG
變數有兩個元件:'cluster'
和 'task'
。
'cluster'
對於所有 Worker 都是相同的,並提供有關訓練叢集的資訊,這是一個字典,其中包含不同類型的工作,例如'worker'
或'chief'
。- 在使用
tf.distribute.MultiWorkerMirroredStrategy
進行多 Worker 訓練時,通常會有一個'worker'
承擔更多責任,例如儲存檢查點和寫入 TensorBoard 的摘要檔案,以及一般'worker'
所做的事情。這類'worker'
稱為主要 Worker (工作名稱為'chief'
)。 - 慣例上,
'index'
為0
的 Worker 會是'chief'
。
- 在使用
'task'
提供目前任務的相關資訊,而且每個 Worker 都不同。它會指定該 Worker 的'type'
和'index'
。
以下是設定範例
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
請注意,tf_config
只是 Python 中的本機變數。若要將其用於訓練設定,請將其序列化為 JSON,並放入 TF_CONFIG
環境變數中。
json.dumps(tf_config)
在上述設定範例中,您將工作 'type'
設定為 'worker'
,並將工作 'index'
設定為 0
。因此,這部機器是第一個 Worker。它會被指派為 'chief'
Worker。
實際上,您會在外部 IP 位址/連接埠上建立多個 Worker,並據此在每個 Worker 上設定 TF_CONFIG
變數。為了便於說明,本教學課程將示範如何在 localhost
上設定具有兩個 Worker 的 TF_CONFIG
變數
- 第一個 (
'chief'
) Worker 的TF_CONFIG
如上所示。 - 對於第二個 Worker,您會將
tf_config['task']['index']=1
設定為
筆記本中的環境變數和子處理程序
子處理程序會繼承其父項的環境變數。因此,如果您在這個 Jupyter 筆記本處理程序中設定環境變數
os.environ['GREETINGS'] = 'Hello TensorFlow!'
...那麼您可以從子處理程序存取環境變數
echo ${GREETINGS}
在下一個章節中,您會使用此方法將 TF_CONFIG
傳遞至 Worker 子處理程序。在真實世界的情境中,您永遠不會真的以這種方式啟動工作,本教學課程只是示範如何使用最簡多 Worker 範例來完成。
訓練模型
若要訓練模型,首先建立 tf.distribute.MultiWorkerMirroredStrategy
的執行個體
strategy = tf.distribute.MultiWorkerMirroredStrategy()
透過將 tf.distribute.Strategy
API 整合到 tf.keras
中,您將對分散訓練到多個 Worker 所做的唯一變更,就是將模型建構和 model.compile()
呼叫封閉在 strategy.scope()
內部。分散式策略的範圍會決定變數的建立方式和位置;對於 MultiWorkerMirroredStrategy
而言,建立的變數是 MirroredVariable
,而且會在每個 Worker 上複製。
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
若要實際使用 MultiWorkerMirroredStrategy
執行,您需要執行 Worker 處理程序,並將 TF_CONFIG
傳遞給這些處理程序。
如同先前寫入的 mnist_setup.py
檔案,以下是每個 Worker 將執行的 main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
在上述程式碼片段中,請注意傳遞至 Dataset.batch
的 global_batch_size
設定為 per_worker_batch_size * num_workers
。這可確保每個 Worker 處理 per_worker_batch_size
個範例的批次,而不論 Worker 數量為何。
目前的目錄現在同時包含這兩個 Python 檔案
ls *.py
將 TF_CONFIG
序列化為 JSON,並將其新增至環境變數
os.environ['TF_CONFIG'] = json.dumps(tf_config)
現在,您可以啟動 Worker 處理程序,以執行 main.py
並使用 TF_CONFIG
# first kill any previous runs
%killbgscripts
python main.py &> job_0.log
關於上述指令,有幾件事要注意
- 它使用
%%bash
,這是 筆記本「magic」,可執行一些 bash 指令。 - 它使用
--bg
旗標在背景執行bash
處理程序,因為此 Worker 不會終止。它會等待所有 Worker,然後才開始。
背景 Worker 處理程序不會將輸出列印到此筆記本,因此 &>
會將其輸出重新導向至檔案,以便您稍後檢查記錄檔中發生的情況。
因此,請稍候幾秒鐘讓處理程序啟動
import time
time.sleep(10)
現在,檢查到目前為止 Worker 的記錄檔輸出內容
cat job_0.log
記錄檔的最後一行應為:Started server with target: grpc://127.0.0.1:12345
。第一個 Worker 現在已就緒,並等待所有其他 Worker 準備好繼續進行。
因此,請更新 tf_config
,以供第二個 Worker 的處理程序接收
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
啟動第二個 Worker。由於所有 Worker 都已啟動,這將會開始訓練 (因此不需要將此處理程序放在背景中)
python main.py
如果您重新檢查第一個 Worker 寫入的記錄,您會瞭解到它參與了該模型的訓練
cat job_0.log
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
深入瞭解多 Worker 訓練
到目前為止,您已瞭解如何執行基本的多 Worker 設定。本教學課程的其餘部分會詳細介紹其他因素,這些因素對於實際使用案例可能很有用或很重要。
資料集分片
在多 Worker 訓練中,需要資料集分片,以確保收斂和效能。
前一個章節中的範例仰賴 tf.distribute.Strategy
API 提供的預設自動分片。您可以透過設定 tf.data.experimental.DistributeOptions
的 tf.data.experimental.AutoShardPolicy
,來控制分片。
若要進一步瞭解自動分片,請參閱分散式輸入指南。
以下快速範例說明如何關閉自動分片,以便每個副本處理每個範例 (不建議)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
評估
如果您也將 validation_data
傳遞至 Model.fit
,則每個週期都會在訓練和評估之間交替。評估工作會分散在同一組 Worker 中,而其結果會彙總並提供給所有 Worker。
與訓練類似,驗證資料集會在檔案層級自動分片。您需要在驗證資料集中設定全域批次大小,並設定 validation_steps
。
建議重複的資料集 (透過呼叫 tf.data.Dataset.repeat
) 用於評估。
或者,您也可以建立另一個任務,定期讀取檢查點並執行評估。這就是 Estimator 的作用。但這不是執行評估的建議方式,因此省略其詳細資訊。
效能
若要調整多 Worker 訓練的效能,您可以嘗試下列方法
tf.distribute.MultiWorkerMirroredStrategy
提供多個 集體通訊實作RING
使用 gRPC 作為跨主機通訊層,實作以環狀為基礎的集體通訊。NCCL
使用 NVIDIA Collective Communication Library 來實作集體通訊。AUTO
將選擇權延後至執行階段。
集體實作的最佳選擇取決於 GPU 數量、GPU 類型和叢集中的網路互連。若要覆寫自動選擇,請指定
MultiWorkerMirroredStrategy
建構函式的communication_options
參數。例如communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
盡可能將變數轉換為
tf.float
- 官方 ResNet 模型包含範例,說明如何執行此操作。
容錯能力
在同步訓練中,如果其中一個 Worker 失敗,而且沒有失敗復原機制,叢集就會失敗。
將 Keras 與 tf.distribute.Strategy
搭配使用,在 Worker 停止運作或變得不穩定的情況下,具有容錯能力的優點。您可以透過在您選擇的分散式檔案系統中保留訓練狀態來執行此操作,以便在先前失敗或遭到搶佔的執行個體重新啟動時,復原訓練狀態。
當 Worker 變得無法使用時,其他 Worker 也會失敗 (可能在逾時之後)。在這種情況下,需要重新啟動無法使用的 Worker,以及其他已失敗的 Worker。
ModelCheckpoint
回呼
ModelCheckpoint
回呼不再提供容錯能力功能,請改用 BackupAndRestore
回呼。
仍然可以使用 ModelCheckpoint
回呼來儲存檢查點。但是,如果訓練中斷或成功完成,為了從檢查點繼續訓練,使用者有責任手動載入模型。
或者,使用者可以選擇在 ModelCheckpoint
回呼之外儲存和還原模型/權重。
模型儲存和載入
若要使用 model.save
或 tf.saved_model.save
儲存模型,則每個 Worker 的儲存目的地都必須不同。
- 對於非主要 Worker,您需要將模型儲存到暫時目錄。
- 對於主要 Worker,您需要儲存到提供的模型目錄。
Worker 上的暫時目錄必須是唯一的,以防止多個 Worker 嘗試寫入同一個位置而造成錯誤。
所有目錄中儲存的模型都相同,通常只有主要 Worker 儲存的模型才應參考用於還原或服務。
您應該具備一些清理邏輯,可在訓練完成後刪除 Worker 建立的暫時目錄。
同時在主要 Worker 和 Worker 上儲存的原因是,您可能會在檢查點期間彙總變數,這需要主要 Worker 和 Worker 都參與 allreduce 通訊協定。另一方面,讓主要 Worker 和 Worker 儲存到同一個模型目錄,會因為爭用而導致錯誤。
使用 MultiWorkerMirroredStrategy
時,程式會在每個 Worker 上執行,為了瞭解目前的 Worker 是否為主要 Worker,它會利用叢集解析器物件,該物件具有屬性 task_type
和 task_id
task_type
會告訴您目前的工作類型 (例如,'worker'
)。task_id
會告訴您 Worker 的識別碼。task_id == 0
的 Worker 會指定為主要 Worker。
在下列程式碼片段中,write_filepath
函式提供要寫入的檔案路徑,這取決於 Worker 的 task_id
- 對於主要 Worker (具有
task_id == 0
),它會寫入原始檔案路徑。 - 對於其他 Worker,它會建立暫時目錄—
temp_dir
—在目錄路徑中使用task_id
進行寫入
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configurations.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type` is `None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
如此一來,您現在就可以儲存了
multi_worker_model.save(write_model_path)
如上所述,稍後模型應僅從主要 Worker 儲存到的檔案路徑載入。因此,請移除非主要 Worker 儲存的暫時檔案
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
現在,當需要載入時,請使用方便的 tf.keras.models.load_model
API,並繼續後續工作。
在這裡,假設只使用單一 Worker 載入並繼續訓練,在這種情況下,您不會在另一個 strategy.scope()
內呼叫 tf.keras.models.load_model
(請注意 strategy = tf.distribute.MultiWorkerMirroredStrategy()
,如先前定義)
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
檢查點儲存和還原
另一方面,檢查點允許您儲存模型的權重並還原這些權重,而無需儲存整個模型。
在這裡,您將建立一個 tf.train.Checkpoint
,追蹤由 tf.train.CheckpointManager
管理的模型,以便只保留最新的檢查點
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
設定 CheckpointManager
後,您現在可以儲存並移除非主要 Worker 儲存的檢查點
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
現在,當您需要還原模型時,可以使用方便的 tf.train.latest_checkpoint
函式,找到儲存的最新檢查點。還原檢查點後,您可以繼續訓練。
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
BackupAndRestore
回呼
tf.keras.callbacks.BackupAndRestore
回呼透過在 BackupAndRestore
的 backup_dir
引數下的暫時檢查點檔案中備份模型和目前的訓練狀態,來提供容錯能力功能。
一旦任務中斷並重新啟動,BackupAndRestore
回呼函數會還原上次的檢查點,您可以從上次儲存訓練狀態的 epoch 和步驟的開頭繼續訓練。
若要使用此功能,請在 tf.keras.callbacks.BackupAndRestore
的 Model.fit
呼叫中提供一個執行個體。
使用 MultiWorkerMirroredStrategy
時,如果某個 worker 中斷,整個叢集會暫停,直到中斷的 worker 重新啟動為止。其他 worker 也會重新啟動,而中斷的 worker 將會重新加入叢集。然後,每個 worker 都會讀取先前儲存的檢查點檔案並恢復其先前的狀態,從而使叢集恢復同步。接著,訓練將會繼續。分散式資料集迭代器狀態將會重新初始化,而不會還原。
BackupAndRestore
回呼函數使用 CheckpointManager
來儲存和還原訓練狀態,這會產生一個名為 checkpoint 的檔案,用於追蹤現有的檢查點以及最新的檢查點。因此,為了避免名稱衝突,backup_dir
不應重複使用來儲存其他檢查點。
目前,BackupAndRestore
回呼函數支援不使用策略的單一 worker 訓練—MirroredStrategy
—以及使用 MultiWorkerMirroredStrategy
的多 worker 訓練。
以下是多 worker 訓練和單一 worker 訓練的兩個範例
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback. The training state
# is backed up at epoch boundaries by default.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
如果 BackupAndRestore
回呼函數中的 save_freq
引數設定為 'epoch'
,則模型會在每個 epoch 之後備份。
# The training state is backed up at epoch boundaries because `save_freq` is
# set to `epoch`.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
如果 BackupAndRestore
回呼函數中的 save_freq
引數設定為大於 0
的整數值,則模型會在每 save_freq
個批次之後備份。
# The training state is backed up at every 30 steps because `save_freq` is set
# to an integer value of `30`.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup', save_freq=30)]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
如果您檢查在 BackupAndRestore
中指定的 backup_dir
目錄,您可能會注意到一些暫時產生的檢查點檔案。這些檔案是還原先前遺失的執行個體所必需的,並且在您的訓練成功結束時,它們將會由程式庫在 Model.fit
結束時移除。
其他資源
- TensorFlow 中的分散式訓練指南概述了可用的分散策略。
- 搭配 Keras 和 MultiWorkerMirroredStrategy 的自訂訓練迴圈教學課程示範如何搭配 Keras 和自訂訓練迴圈使用
MultiWorkerMirroredStrategy
。 - 查看官方模型,其中許多模型可以設定為執行多種分散策略。
- 使用 tf.function 提升效能指南提供了有關其他策略和工具的資訊,例如 TensorFlow Profiler,您可以使用它來最佳化 TensorFlow 模型的效能。