分散式訓練

分散式訓練是一種模型訓練類型,其中運算資源需求 (例如 CPU、RAM) 分散在多部電腦之間。分散式訓練可加快訓練速度,並在更大的資料集 (最多數十億個範例) 上進行訓練。

分散式訓練也適用於自動超參數最佳化,其中多個模型平行訓練。

在本文件中,您將學習如何

  • 使用分散式訓練訓練 TF-DF 模型。
  • 使用分散式訓練調整 TF-DF 模型的超參數。

限制

截至目前,分散式訓練支援

如何啟用分散式訓練

本節列出啟用分散式訓練的步驟。如需完整範例,請參閱下一節。

ParameterServerStrategy 範圍

模型和資料集在 ParameterServerStrategy 範圍中定義。

strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()
  distributed_train_dataset = strategy.distribute_datasets_from_function(dataset_fn)
model.fit(distributed_train_dataset)

資料集格式

與非分散式訓練一樣,資料集可以下列形式提供

  1. 有限的 TensorFlow 分散式資料集,或
  2. 使用 相容資料集格式 之一的資料集檔案路徑。

使用分片檔案比使用有限的 TensorFlow 分散式資料集方法 (1 行程式碼對比約 20 行程式碼) 簡單得多。但是,只有 TensorFlow 資料集方法支援 TensorFlow 預先處理。如果您的管線不包含任何預先處理,建議使用分片資料集選項。

在這兩種情況下,資料集都應分片成多個檔案,以有效率地分散資料集讀取。

設定工作站

主程序是執行定義 TensorFlow 模型的 Python 程式碼的程式。此程序未執行任何繁重的運算。有效的訓練運算由工作站完成。工作站是執行 TensorFlow 參數伺服器的程序。

應使用工作站的 IP 位址來設定主程序。這可以使用 TF_CONFIG 環境變數或建立 ClusterResolver 來完成。如需更多詳細資訊,請參閱 使用 ParameterServerStrategy 進行參數伺服器訓練

TensorFlow 的 ParameterServerStrategy 定義了兩種工作站:「工作站」和「參數伺服器」。TensorFlow 要求至少實例化每種類型的一個工作站。但是,TF-DF 僅使用「工作站」。因此,需要實例化一個「參數伺服器」,但 TF-DF 不會使用它。例如,TF-DF 訓練的設定可能如下所示

  • 1 個主程序
  • 50 個工作站
  • 1 個參數伺服器

工作站需要存取 TensorFlow 決策樹森林的自訂訓練運算。有兩種選項可啟用存取

  1. 使用預先設定的 TF-DF C++ 參數伺服器 //third_party/tensorflow_decision_forests/tensorflow/distribute:tensorflow_std_server
  2. 透過呼叫 tf.distribute.Server() 建立參數伺服器。在這種情況下,應匯入 TF-DF import tensorflow_decision_forests

範例

本節顯示分散式訓練組態的完整範例。如需更多範例,請查看 TF-DF 單位測試

範例:資料集路徑上的分散式訓練

使用 相容資料集格式 之一將您的資料集劃分為一組分片檔案。建議依下列方式命名檔案:/path/to/dataset/train-<5 位數索引>-of-<總檔案數>,例如

/path/to/dataset/train-00000-of-00100
/path/to/dataset/train-00001-of-00005
/path/to/dataset/train-00002-of-00005
...

為了獲得最高效率,檔案數應至少為工作站數的 10 倍。例如,如果您要使用 100 個工作站進行訓練,請確保資料集分為至少 1000 個檔案。

然後可以使用分片運算式 (例如

  • /path/to/dataset/train@1000
  • /path/to/dataset/train@*

完成分散式訓練的方式如下。在此範例中,資料集儲存為 TensorFlow 範例的 TFRecord (由金鑰 tfrecord+tfe 定義)。

import tensorflow_decision_forests as tfdf
import tensorflow as tf

strategy = tf.distribute.experimental.ParameterServerStrategy(...)

with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

model.fit_on_dataset_path(
    train_path="/path/to/dataset/train@1000",
    label_key="label_key",
    dataset_format="tfrecord+tfe")

print("Trained model")
model.summary()

範例:有限 TensorFlow 分散式資料集上的分散式訓練

TF-DF 預期分散式有限工作站分片 TensorFlow 資料集

  • 分散式:非分散式資料集包裝在 strategy.distribute_datasets_from_function 中。
  • 有限:資料集應完全讀取每個範例一次。資料集不應包含任何 repeat 指令。
  • 工作站分片:每個工作站應讀取資料集的不同部分。

以下是一個範例

import tensorflow_decision_forests as tfdf
import tensorflow as tf


def dataset_fn(context, paths):
  """Create a worker-sharded finite dataset from paths.

  Like for non-distributed training, each example should be visited exactly
  once (and by only one worker) during the training. In addition, for optimal
  training speed, the reading of the examples should be distributed among the
  workers (instead of being read by a single worker, or read and discarded
  multiple times).

  In other words, don't add a "repeat" statement and make sure to shard the
  dataset at the file level and not at the example level.
  """

  # List the dataset files
  ds_path = tf.data.Dataset.from_tensor_slices(paths)

  # Make sure the dataset is used with distributed training.
  assert context is not None


  # Split the among the workers.
  #
  # Note: The "shard" is applied on the file path. The shard should not be
  # applied on the examples directly.
  # Note: You cannot use 'context.num_input_pipelines' with ParameterServerV2.
  current_worker = tfdf.keras.get_worker_idx_and_num_workers(context)
  ds_path = ds_path.shard(
      num_shards=current_worker.num_workers,
      index=current_worker.worker_idx)

  def read_csv_file(path):
    """Reads a single csv file."""

    numerical = tf.constant([0.0], dtype=tf.float32)
    categorical_string = tf.constant(["NA"], dtype=tf.string)
    csv_columns = [
        numerical,  # feature 1
        categorical_string,  # feature 2
        numerical,  # feature 3
        # ... define the features here.
    ]
    return tf.data.experimental.CsvDataset(path, csv_columns, header=True)

  ds_columns = ds_path.interleave(read_csv_file)

  # We assume a binary classification label with the following possible values.
  label_values = ["<=50K", ">50K"]

  # Convert the text labels into integers:
  # "<=50K" => 0
  # ">50K" => 1
  init_label_table = tf.lookup.KeyValueTensorInitializer(
      keys=tf.constant(label_values),
      values=tf.constant(range(label_values), dtype=tf.int64))
  label_table = tf.lookup.StaticVocabularyTable(
      init_label_table, num_oov_buckets=1)

  def extract_label(*columns):
    return columns[0:-1], label_table.lookup(columns[-1])

  ds_dataset = ds_columns.map(extract_label)

  # The batch size has no impact on the quality of the model. However, a larger
  # batch size generally is faster.
  ds_dataset = ds_dataset.batch(500)
  return ds_dataset


strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

  train_dataset = strategy.distribute_datasets_from_function(
      lambda context: dataset_fn(context, [...list of csv files...])
  )

model.fit(train_dataset)

print("Trained model")
model.summary()

範例:資料集路徑上的分散式超參數調整

資料集路徑上的分散式超參數調整與分散式訓練類似。唯一的區別是此選項與非分散式模型相容。例如,您可以分散 (非分散式) 梯度提升樹模型的超參數調整。

with strategy.scope():
  tuner = tfdf.tuner.RandomSearch(num_trials=30, use_predefined_hps=True)
  model = tfdf.keras.GradientBoostedTreesModel(tuner=tuner)

training_history = model.fit_on_dataset_path(
  train_path=train_path,
  label_key=label,
  dataset_format="csv",
  valid_path=test_path)

logging.info("Trained model:")
model.summary()

範例:單元測試

若要單元測試分散式訓練,您可以建立模擬工作站程序。如需更多資訊,請參閱 TF-DF 單位測試 中的方法 _create_in_process_tf_ps_cluster