使用 ParameterServerStrategy 進行參數伺服器訓練

在 TensorFlow.org 上檢視 在 Google Colab 中執行 在 GitHub 上檢視原始碼 下載筆記本

總覽

參數伺服器訓練是一種常見的資料平行方法,可在多部機器上擴大規模模型訓練。

參數伺服器訓練叢集包含工作站參數伺服器。變數會在參數伺服器上建立,並由每個步驟中的工作站讀取和更新。根據預設,工作站會獨立讀取和更新這些變數,而不會彼此同步。這就是有時參數伺服器樣式的訓練稱為非同步訓練的原因。

在 TensorFlow 2 中,參數伺服器訓練由 tf.distribute.ParameterServerStrategy 類別提供支援,此類別會將訓練步驟分散到可擴充至數千個工作站 (以及參數伺服器) 的叢集。

支援的訓練方法

主要支援兩種訓練方法

具有工作和任務的叢集

無論選擇哪個 API (Model.fit 或自訂訓練迴圈),TensorFlow 2 中的分散式訓練都涉及包含多個 'jobs''cluster',而每個工作可能有一個或多個 'tasks'

使用參數伺服器訓練時,建議您具備

  • 一個協調器工作 (其工作名稱為 chief)
  • 多個工作站工作 (工作名稱 worker)
  • 多個參數伺服器工作 (工作名稱 ps)

協調器會建立資源、調度訓練任務、寫入檢查點,以及處理任務失敗。工作站參數伺服器會執行 tf.distribute.Server 執行個體,以監聽來自協調器的要求。

搭配 Model.fit API 的參數伺服器訓練

搭配 Model.fit API 的參數伺服器訓練需要協調器使用 tf.distribute.ParameterServerStrategy 物件。與不使用策略或使用其他策略的 Model.fit 用法類似,工作流程包括建立和編譯模型、準備回呼,以及呼叫 Model.fit

搭配自訂訓練迴圈的參數伺服器訓練

使用自訂訓練迴圈時,tf.distribute.coordinator.ClusterCoordinator 類別是協調器使用的主要元件。

ClusterCoordinator 物件提供的最重要 API 是 schedule

  • schedule API 會將 tf.function 排入佇列,並立即傳回類似 Future 的 RemoteValue
  • 排入佇列的函式將在背景執行緒中調度到遠端工作站,且其 RemoteValue 將以非同步方式填入。
  • 由於 schedule 不需要工作站指派,因此傳入的 tf.function 可以在任何可用的工作站上執行。
  • 如果執行函式的工作站在其完成前變成無法使用,則函式將在另一個可用的工作站上重試。
  • 由於此事實以及函式執行並非不可部分完成的事實,因此單一函式呼叫可能會執行多次。

除了調度遠端函式外,ClusterCoordinator 也可協助在所有工作站上建立資料集,並在工作站從失敗中復原時重建這些資料集。

教學課程設定

本教學課程將分支為 Model.fit 和自訂訓練迴圈路徑,您可以選擇符合您需求的路徑。「使用 X 進行訓練」以外的章節適用於這兩種路徑。

pip install portpicker

叢集設定

如上所述,參數伺服器訓練叢集需要一個協調器任務 (執行您的訓練程式)、一或多個工作站,以及參數伺服器任務 (執行 TensorFlow 伺服器—tf.distribute.Server),以及可能執行側車評估的其他評估任務 (請參閱下方的側車評估章節)。設定它們的需求如下

  • 協調器任務需要知道所有其他 TensorFlow 伺服器 (評估器除外) 的位址和連接埠。
  • 工作站和參數伺服器需要知道它們需要監聽哪個連接埠。為了簡化起見,您通常可以在這些任務上建立 TensorFlow 伺服器時傳入完整的叢集資訊。
  • 評估器任務不必知道訓練叢集的設定。如果知道,則不應嘗試連線到訓練叢集。
  • 工作站和參數伺服器的工作類型應分別為 "worker""ps"。由於舊版原因,協調器應使用 "chief" 作為工作類型。

在本教學課程中,您將建立一個程序內叢集,以便在 Colab 中執行整個參數伺服器訓練。您將在稍後的章節中學習如何設定實際叢集

程序內叢集

您將先預先建立多個 TensorFlow 伺服器,稍後再連線到這些伺服器。請注意,這僅用於本教學課程的示範目的,在實際訓練中,伺服器將在 "worker""ps" 機器上啟動。

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

程序內叢集設定經常在單元測試中使用,例如此處

本機測試的另一個選項是在本機機器上啟動程序—請查看搭配 Keras 的多工作站訓練以取得此方法的範例。

例項化 ParameterServerStrategy

在您深入研究訓練程式碼之前,讓我們先例項化 tf.distribute.ParameterServerStrategy 物件。請注意,無論您是要繼續使用 Model.fit 還是自訂訓練迴圈,都必須執行此操作。variable_partitioner 引數將在變數分 shard 章節中說明。

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)

為了將 GPU 用於訓練,請配置每個工作站可見的 GPU。ParameterServerStrategy 將使用每個工作站上的所有可用 GPU,但限制是所有工作站應具有相同數量的可用 GPU。

變數分 shard

變數分 shard 是指將變數分割成多個較小的變數,這些變數稱為shard。當存取這些 shard 時,變數分 shard 可能有助於分散網路負載。當使用可能不適合單一機器記憶體的超大型嵌入時,它也有助於分散一般變數在多個參數伺服器上的運算和儲存。

若要啟用變數分 shard,您可以在建構 ParameterServerStrategy 物件時傳入 variable_partitionervariable_partitioner 將在每次建立變數時叫用,並預期傳回沿著變數每個維度的 shard 數量。提供了一些現成的 variable_partitioner,例如 tf.distribute.experimental.partitioners.MinSizePartitioner。建議使用以大小為基礎的 partitioner,例如 tf.distribute.experimental.partitioners.MinSizePartitioner,以避免分割小型變數,這可能會對模型訓練速度產生負面影響。

當傳入 variable_partitioner,且您在 Strategy.scope 下直接建立變數時,該變數將變成具有 variables 屬性的容器類型,該屬性提供對 shard 清單的存取權。在大多數情況下,此容器將透過串連所有 shard 自動轉換為張量。因此,它可以作為一般變數使用。另一方面,某些 TensorFlow 方法 (例如 tf.nn.embedding_lookup) 為此容器類型提供有效率的實作,且在這些方法中將避免自動串連。

如需更多詳細資訊,請參閱 tf.distribute.ParameterServerStrategy 的 API 文件。

使用 Model.fit 進行訓練

Keras 透過 Model.fit 提供易於使用的訓練 API,可在幕後處理訓練迴圈,並具有可覆寫的 train_step 的彈性,以及提供功能 (例如檢查點儲存或 TensorBoard 摘要儲存) 的回呼。使用 Model.fit,相同的訓練程式碼可以與其他策略搭配使用,只需簡單交換策略物件即可。

輸入資料

搭配 tf.distribute.ParameterServerStrategy 的 Keras Model.fit 可以採用 tf.data.Datasettf.distribute.DistributedDatasettf.keras.utils.experimental.DatasetCreator 形式的輸入資料,其中建議使用 Dataset 以方便使用。但是,如果您在使用 Dataset 時遇到記憶體問題,您可能需要搭配可呼叫的 dataset_fn 引數使用 DatasetCreator (詳細資訊請參閱 tf.keras.utils.experimental.DatasetCreator API 文件)。

如果您將資料集轉換為 tf.data.Dataset,則應使用 Dataset.shuffleDataset.repeat,如下列程式碼範例所示。

  • 搭配參數伺服器訓練的 Keras Model.fit 假設每個工作站接收相同的資料集,但隨機排序方式不同時除外。因此,透過呼叫 Dataset.shuffle,您可以確保更均勻地疊代資料。
  • 由於工作站不會同步處理,因此它們可能會在不同的時間完成處理其資料集。因此,使用參數伺服器訓練定義 epoch 的最簡單方法是使用 Dataset.repeat (在不使用引數呼叫時,會無限期地重複資料集),並在 Model.fit 呼叫中指定 steps_per_epoch 引數。

如需關於 shufflerepeat 的更多詳細資訊,請參閱 tf.data 指南的「訓練工作流程」章節。

global_batch_size = 64

x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))

dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.batch(global_batch_size)
dataset = dataset.prefetch(2)

如果您改為使用 tf.keras.utils.experimental.DatasetCreator 建立資料集,則 dataset_fn 中的程式碼將在輸入裝置 (通常是 CPU) 上針對每個工作站機器叫用。

模型建構和編譯

現在,您將建立 tf.keras.Model (為了示範目的,這是一個簡單的 tf.keras.models.Sequential 模型),然後呼叫 Model.compile 以納入元件,例如最佳化工具、指標,以及其他參數,例如 steps_per_execution

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.legacy.SGD(), loss="mse", steps_per_execution=10)

回呼和訓練

在您呼叫 Keras Model.fit 進行實際訓練之前,請準備任何所需回呼以執行常見任務,例如

  • tf.keras.callbacks.ModelCheckpoint:以特定頻率儲存模型,例如在每個 epoch 之後。
  • tf.keras.callbacks.BackupAndRestore:如果叢集遇到無法使用的情況 (例如中止或搶佔),則透過備份模型和目前 epoch 編號來提供容錯能力。然後,您可以在工作失敗後從重新啟動時還原訓練狀態,並從中斷的 epoch 開始繼續訓練。
  • tf.keras.callbacks.TensorBoard:定期在摘要檔案中寫入模型記錄,這些記錄可以在 TensorBoard 工具中視覺化。
working_dir = "/tmp/my_working_dir"
log_dir = os.path.join(working_dir, "log")
ckpt_filepath = os.path.join(working_dir, "ckpt")
backup_dir = os.path.join(working_dir, "backup")

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dataset, epochs=5, steps_per_epoch=20, callbacks=callbacks)

直接搭配 ClusterCoordinator 使用 (選用)

即使您選擇 Model.fit 訓練路徑,您也可以選擇性地實例化一個 tf.distribute.coordinator.ClusterCoordinator 物件,以排程您想要在工作站上執行的其他函式。請參閱使用自訂訓練迴圈進行訓練章節,以取得更多詳細資訊和範例。

使用自訂訓練迴圈進行訓練

搭配 tf.distribute.Strategy 使用自訂訓練迴圈,可提供極大的彈性來定義訓練迴圈。使用上述定義的 ParameterServerStrategy(作為 strategy),您將使用 tf.distribute.coordinator.ClusterCoordinator 來將訓練步驟的執行分派到遠端工作站。

然後,您將建立模型、定義資料集,並定義步驟函式,就像您在使用其他 tf.distribute.Strategy 的訓練迴圈中所做的那樣。您可以在使用 tf.distribute.Strategy 進行自訂訓練教學課程中找到更多詳細資訊。

為了確保有效率的資料集預先提取,請使用下方將訓練步驟分派到遠端工作站章節中提及的建議分散式資料集建立 API。此外,請務必在 worker_fn 內呼叫 Strategy.run,以充分利用分配給工作站的 GPU。其餘步驟與使用或不使用 GPU 進行訓練的步驟相同。

讓我們在以下步驟中建立這些元件

設定資料

首先,編寫一個建立資料集的函式。

如果您想要使用 Keras 預處理層Tensorflow Transform 層來預處理資料,請在 dataset_fn 外部Strategy.scope 下方建立這些層,就像您對任何其他 Keras 層所做的那樣。這是因為 dataset_fn 將被包裝到 tf.function 中,然後在每個工作站上執行以產生資料管線。

如果您不遵循上述程序,建立這些層可能會建立 Tensorflow 狀態,這些狀態將從 tf.function 中提升到協調器。因此,在工作站上存取它們將導致協調器和工作站之間重複的 RPC 呼叫,並導致嚴重的速度減慢。

將層放置在 Strategy.scope 下方,將改為在所有工作站上建立它們。然後,您將透過 tf.data.Dataset.mapdataset_fn 內套用轉換。請參閱分散式輸入教學課程中的資料預處理,以取得有關使用分散式輸入進行資料預處理的更多資訊。

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

在資料集中產生玩具範例

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

然後,建立包裝在 dataset_fn 中的訓練資料集

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

建立模型

接下來,建立模型和其他物件。請務必在 Strategy.scope 下方建立所有變數。

# These variables created under the `Strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(
      units=1, activation="sigmoid",
      kernel_regularizer=tf.keras.regularizers.L2(1e-4),
  )(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.legacy.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

讓我們確認使用 FixedShardsPartitioner 將所有變數分割成兩個分片,並且每個分片都分配給不同的參數伺服器

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)

print(emb_layer.weights[0].device)
print(emb_layer.weights[1].device)

定義訓練步驟

第三,建立包裝到 tf.function 中的訓練步驟

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
          reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      model_losses = model.losses
      if model_losses:
        loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))
    gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

在上述訓練步驟函式中,在 step_fn 中呼叫 Strategy.runStrategy.reduce 可以支援每個工作站多個 GPU。如果工作站分配了 GPU,Strategy.run 會將資料集分散到多個複本 (GPU) 上。它們對 tf.nn.compute_average_loss() 的平行呼叫會計算一個工作站的複本 (GPU) 之間損失的平均值,與工作站總數無關。

將訓練步驟分派到遠端工作站

ParameterServerStrategy 定義所有計算之後,您將使用 tf.distribute.coordinator.ClusterCoordinator 類別來建立資源,並將訓練步驟分散到遠端工作站。

首先,讓我們建立一個 ClusterCoordinator 物件,並傳入策略物件

coordinator = tf.distribute.coordinator.ClusterCoordinator(strategy)

然後,使用 ClusterCoordinator.create_per_worker_dataset API 建立每個工作站的資料集和迭代器,這會將資料集複寫到所有工作站。在下方的 per_worker_dataset_fn 中,建議將 dataset_fn 包裝到 strategy.distribute_datasets_from_function 中,以允許有效率地將預先提取無縫地傳輸到 GPU。

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)

最後一個步驟是使用 ClusterCoordinator.schedule 將計算分散到遠端工作站

  • schedule 方法會將 tf.function 排入佇列,並立即傳回類似未來的 RemoteValueRemoteValue 會以非同步方式填入。排入佇列的函式將在背景執行緒中分派到遠端工作站,
  • join 方法 (ClusterCoordinator.join) 可用於等待直到所有排程的函式都執行完畢。
num_epochs = 4
steps_per_epoch = 5
for i in range(num_epochs):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))

以下是如何擷取 RemoteValue 的結果

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print("Final loss is %f" % loss.fetch())

或者,您可以啟動所有步驟,並在等待完成時執行某些操作

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

如需此特定範例的完整訓練和服務工作流程,請查看此測試

有關資料集建立的更多資訊

上述程式碼中的資料集是使用 ClusterCoordinator.create_per_worker_dataset API 建立的。它會為每個工作站建立一個資料集,並傳回容器物件。您可以在其上呼叫 iter 方法,以建立每個工作站的迭代器。每個工作站的迭代器包含每個工作站一個迭代器,並且在函式於特定工作站上執行之前,工作站的對應片段將取代為傳遞給 ClusterCoordinator.schedule 方法的函式的輸入引數。

ClusterCoordinator.schedule 方法假設工作站是等效的,因此假設不同工作站上的資料集是相同的(除了它們的隨機排序可能不同)。因此,也建議重複資料集,並排程有限數量的步驟,而不是依賴從資料集接收 OutOfRangeError

另一個重要的注意事項是,tf.data 資料集不支援跨任務邊界的隱含序列化和還原序列化。因此,務必在傳遞給 ClusterCoordinator.create_per_worker_dataset 的函式內建立整個資料集。create_per_worker_dataset API 也可以直接將 tf.data.Datasettf.distribute.DistributedDataset 作為輸入。

評估

使用 tf.distribute.ParameterServerStrategy 訓練執行評估的兩種主要方法是內嵌評估和側車評估。每種方法都有其自身的優缺點,如下所述。如果您沒有偏好,建議使用內嵌評估方法。對於使用 Model.fit 的使用者,Model.evaluate 在幕後使用內嵌(分散式)評估。

內嵌評估

在此方法中,協調器會在訓練和評估之間交替,因此稱為內嵌評估

內嵌評估有幾個優點。例如

  • 它可以支援單一任務無法容納的大型評估模型和評估資料集。
  • 評估結果可用於決定下一個 epoch 的訓練,例如,是否提早停止訓練。

有兩種實作內嵌評估的方法:直接評估和分散式評估。

  • 直接評估:對於小型模型和評估資料集,協調器可以使用協調器上的評估資料集,直接在分散式模型上執行評估
eval_dataset = tf.data.Dataset.from_tensor_slices(
    feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print("Evaluation accuracy: %f" % eval_accuracy.result())
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print("Evaluation accuracy: %f" % eval_accuracy.result())

啟用精確一次評估

tf.distribute.coordinator.ClusterCoordinatorschedulejoin 方法預設不支援訪問保證或精確一次語意。換句話說,在上述範例中,無法保證資料集中的所有評估範例都會被精確評估一次;有些可能不會被訪問,有些可能會被評估多次。

精確一次評估可能更適合用於減少跨 epoch 評估的變異數,並改進透過提早停止、超參數調整或其他方法完成的模型選擇。有不同的方法可以啟用精確一次評估

  • 使用 Model.fit/.evaluate 工作流程,可以透過將引數新增至 Model.compile 來啟用。請參閱 pss_evaluation_shards 引數的文件。
  • 當使用 ParameterServerStrategy 時,tf.data 服務 API 可用於為評估提供精確一次訪問(請參閱 tf.data.experimental.service API 文件中的動態分片章節)。
  • 側車評估預設提供精確一次評估,因為評估發生在單一機器上。但是,這可能比跨多個工作站執行分散式評估慢得多。

對於大多數使用者而言,第一個選項,使用 Model.compile,是建議的解決方案。

精確一次評估有一些限制

  • 不支援使用精確一次訪問保證來編寫自訂分散式評估迴圈。如果您需要這方面的支援,請提交 GitHub 問題。
  • 它無法自動處理使用 Layer.add_metric API 的指標計算。這些指標應從評估中排除,或重新設計為 Metric 物件。

側車評估

tf.distribute.ParameterServerStrategy 訓練中定義和執行評估迴圈的另一種方法稱為側車評估,您可以在其中建立專用的評估器任務,該任務重複讀取檢查點並對最新的檢查點執行評估(有關檢查點的更多詳細資訊,請參閱本指南)。協調器和工作站任務不會花費任何時間在評估上,因此對於固定次數的迭代,整體訓練時間應比使用其他評估方法短。但是,它需要額外的評估器任務和定期檢查點來觸發評估。

若要為側車評估編寫評估迴圈,您有兩個選項

  1. 使用 tf.keras.utils.SidecarEvaluator API。
  2. 建立自訂評估迴圈。

有關選項 1 的更多詳細資訊,請參閱 tf.keras.utils.SidecarEvaluator API 文件。

側車評估僅支援單一任務。這表示

  • 保證每個範例都評估一次。如果評估器被搶佔或重新啟動,它只會從最新的檢查點重新啟動評估迴圈,並且會捨棄重新啟動之前完成的部分評估進度。

  • 但是,在單一任務上執行評估表示完整評估可能需要很長時間。

  • 如果模型的大小太大而無法容納到評估器的記憶體中,則不適用單一側車評估。

另一個注意事項是,tf.keras.utils.SidecarEvaluator 實作和下方的自訂評估迴圈可能會跳過某些檢查點,因為它始終會選取可用的最新檢查點,並且在評估 epoch 期間,訓練叢集可能會產生多個檢查點。您可以編寫一個評估每個檢查點的自訂評估迴圈,但本教學課程未涵蓋。另一方面,如果檢查點產生的頻率低於執行評估所需的時間,則它可能會閒置。

自訂評估迴圈可提供對詳細資訊的更多控制,例如選擇要評估的檢查點,或提供與評估一起執行的任何其他邏輯。以下是一個可能的自訂側車評估迴圈

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epochs)):
    break

真實世界中的叢集

在真實的生產環境中,您將在不同機器上的不同程序中執行所有任務。在每個任務上設定叢集資訊的最簡單方法是設定 "TF_CONFIG" 環境變數,並使用 tf.distribute.cluster_resolver.TFConfigClusterResolver 來剖析 "TF_CONFIG"

如需 "TF_CONFIG" 環境變數的一般描述,請參閱分散式訓練指南中的「設定 TF_CONFIG 環境變數」。

如果您使用 Kubernetes 或其他組態範本啟動訓練任務,則這些範本可能已為您設定 “TF_CONFIG"

設定 "TF_CONFIG" 環境變數

假設您有 3 個工作站和 2 個參數伺服器。那麼工作站 1 的 "TF_CONFIG" 可以是

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

評估器的 "TF_CONFIG" 可以是

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

評估器的上述 "TF_CONFIG" 字串中的 "cluster" 部分是選用的。

如果您對所有任務使用相同的二進位檔

如果您偏好使用單一二進位檔執行所有這些任務,您需要在程式的一開始就讓您的程式分支到不同的角色

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run sidecar evaluation
else:
  # Run the coordinator.

以下程式碼會啟動 TensorFlow 伺服器並等待,這對於 "worker""ps" 角色很有用

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

處理任務失敗

工作站失敗

tf.distribute.coordinator.ClusterCoordinator 自訂訓練迴圈和 Model.fit 方法都為工作站失敗提供內建的容錯能力。工作站復原後,ClusterCoordinator 會在工作站上叫用資料集重新建立。

參數伺服器或協調器失敗

但是,當協調器看到參數伺服器錯誤時,它會立即引發 UnavailableErrorAbortedError。在這種情況下,您可以重新啟動協調器。協調器本身也可能變得無法使用。因此,建議使用某些工具,以免遺失訓練進度

  • 對於 Model.fit,您應該使用 BackupAndRestore 回呼,它會自動處理進度儲存和還原。有關範例,請參閱上方的回呼和訓練章節。

  • 對於自訂訓練迴圈,您應該定期檢查點模型變數,並在訓練開始之前從檢查點載入模型變數(如果有的話)。如果檢查點化了最佳化工具,則可以從 optimizer.iterations 大概推斷訓練進度

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epochs):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

擷取 RemoteValue

如果函式成功執行,則保證擷取 RemoteValue 會成功。這是因為目前傳回值會在函式執行後立即複製到協調器。如果在複製期間發生任何工作站失敗,則會在另一個可用的工作站上重試函式。因此,如果您想要最佳化效能,您可以排程沒有傳回值的函式。

錯誤報告

一旦協調器看到錯誤,例如來自參數伺服器的 UnavailableError 或其他應用程式錯誤,例如來自 tf.debugging.check_numericsInvalidArgument,它會在引發錯誤之前取消所有擱置和排入佇列的函式。擷取其對應的 RemoteValue 將會引發 CancelledError

引發錯誤後,協調器將不會引發相同的錯誤或來自已取消函式的任何錯誤。

效能提升

當您使用 tf.distribute.ParameterServerStrategytf.distribute.coordinator.ClusterCoordinator 進行訓練時,您可能會遇到效能問題,原因有幾個。

一個常見原因是參數伺服器的負載不平衡,並且某些負載過重的參數伺服器已達到容量。也可能有多個根本原因。減輕此問題的一些簡單方法是

  1. 在建構 ParameterServerStrategy 時,透過指定 variable_partitioner 來分片您的大型模型變數。
  2. 避免建立單一步驟中所有參數伺服器都需要的熱點變數,方法是

    1) 在最佳化工具中使用常數學習率或子類別 tf.keras.optimizers.schedules.LearningRateSchedule。這是因為預設行為是學習率將成為放置在特定參數伺服器上的變數,並在每個步驟中由所有其他參數伺服器請求);以及

    2) 使用 tf.keras.optimizers.legacy.Optimizer(標準 tf.keras.optimizers.Optimizer 仍可能導致熱點變數)。

  3. 在將大型詞彙表傳遞到 Keras 預處理層之前,先對其進行隨機排序。

效能問題的另一個可能原因是協調器。schedule/join 的實作是基於 Python 的,因此可能會有執行緒額外負荷。此外,協調器和工作站之間的延遲可能很大。如果是這種情況

  • 對於 Model.fit,您可以將在 Model.compile 中提供的 steps_per_execution 引數設定為大於 1 的值。

  • 對於自訂訓練迴圈,您可以將多個步驟封裝到單一 tf.function

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

隨著程式庫的進一步最佳化,希望大多數使用者在未來都不必手動封裝步驟。

此外,效能提升的一個小技巧是排程沒有傳回值的函式,如上方處理任務失敗章節中所述。

已知限制

大多數已知限制已在以上章節中涵蓋。本節提供摘要。

ParameterServerStrategy 一般

  • 每個任務(包括協調器)都需要 os.environment["grpc_fail_fast"]="use_caller",才能使容錯能力正常運作。
  • 不支援同步參數伺服器訓練。
  • 通常有必要將多個步驟封裝到單一函式中,以實現最佳效能。
  • 不支援透過 tf.saved_model.load 載入包含分片變數的 saved_model。請注意,預期使用 TensorFlow Serving 載入此類 saved_model 可以正常運作(有關詳細資訊,請參閱服務教學課程)。
  • 不支援從參數伺服器失敗中復原,而無需重新啟動協調器任務。
  • tf.lookup.StaticHashTable 的建立(通常由某些 Keras 預處理層使用,例如 tf.keras.layers.IntegerLookuptf.keras.layers.StringLookuptf.keras.layers.TextVectorization)應放置在 Strategy.scope 下方。否則,資源將放置在協調器上,並且從工作站到協調器的查閱 RPC 會產生效能影響。

Model.fit 具體事項

  • Model.fit 中需要 steps_per_epoch 引數。您可以選取提供 epoch 中適當間隔的值。
  • 由於效能原因,ParameterServerStrategy 不支援具有批次層級呼叫的自訂回呼。您應該使用適當選取的 steps_per_epoch 將這些呼叫轉換為 epoch 層級呼叫,以便它們在每 steps_per_epoch 個步驟數時呼叫。內建回呼不受影響:它們的批次層級呼叫已修改為高效能。正在計劃支援 ParameterServerStrategy 的批次層級呼叫。
  • 基於相同原因,與其他策略不同,進度列和指標僅在 epoch 邊界記錄。
  • 不支援 run_eagerly

自訂訓練迴圈具體事項