使用 DTensors 與 Keras

在 TensorFlow.org 上檢視 在 Google Colab 中執行 在 GitHub 上檢視原始碼 下載筆記本

總覽

在本教學課程中,您將學習如何搭配 Keras 使用 DTensors。

透過 DTensor 與 Keras 的整合,您可以重複使用現有的 Keras 層和模型,以建構及訓練分散式機器學習模型。

您將使用 MNIST 資料訓練多層分類模型。將示範如何為子類別模型、循序模型和函式模型設定版面配置。

本教學課程假設您已閱讀 DTensor 程式設計指南,並且熟悉基本的 DTensor 概念,例如 MeshLayout

本教學課程以使用 Keras 在 MNIST 上訓練神經網路為基礎。

設定

DTensor (自 2.9.0 版本起,tf.experimental.dtensor) 已成為 TensorFlow 的一部分。

首先,安裝或升級 TensorFlow Datasets

pip install --quiet --upgrade tensorflow-datasets

接著,匯入 TensorFlow 和 dtensor,並將 TensorFlow 設定為使用 8 個虛擬 CPU。

即使這個範例使用虛擬 CPU,DTensor 在 CPU、GPU 或 TPU 裝置上的運作方式也相同。

import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.experimental import dtensor
def configure_virtual_cpus(ncpu):
  phy_devices = tf.config.list_physical_devices('CPU')
  tf.config.set_logical_device_configuration(
        phy_devices[0], 
        [tf.config.LogicalDeviceConfiguration()] * ncpu)

configure_virtual_cpus(8)
tf.config.list_logical_devices('CPU')

devices = [f'CPU:{i}' for i in range(8)]

決定性虛擬亂數產生器

您應該注意的一件事是,DTensor API 要求每個執行的用戶端都具有相同的隨機種子,以便在初始化權重時能有決定性行為。您可以透過 tf.keras.utils.set_random_seed() 在 Keras 中設定全域種子來達成此目的。

tf.keras.backend.experimental.enable_tf_random_generator()
tf.keras.utils.set_random_seed(1337)

建立資料平行 Mesh

本教學課程示範資料平行訓練。調整為模型平行訓練和空間平行訓練可以像切換到一組不同的 Layout 物件一樣簡單。如需資料平行以外的分散式訓練相關資訊,請參閱搭配 DTensors 的分散式訓練教學課程。

資料平行訓練是一種常用的平行訓練方案,例如 tf.distribute.MirroredStrategy 也使用此方案。

透過 DTensor,資料平行訓練迴圈會使用由單一「批次」維度組成的 Mesh,其中每個裝置都會執行模型副本,並接收來自全域批次的碎片。

mesh = dtensor.create_mesh([("batch", 8)], devices=devices)

由於每個裝置都會執行完整的模型副本,因此模型變數應在整個 mesh 中完全複製 (未分片)。例如,此 Mesh 上等級 2 權重的完全複製版面配置如下

example_weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh)  # or
example_weight_layout = dtensor.Layout.replicated(mesh, rank=2)

Mesh 上等級 2 資料張量的版面配置會沿著第一個維度分片 (有時稱為 batch_sharded),

example_data_layout = dtensor.Layout(['batch', dtensor.UNSHARDED], mesh)  # or
example_data_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)

使用版面配置建立 Keras 層

在資料平行方案中,您通常會使用完全複製的版面配置來建立模型權重,以便模型的每個副本都可以使用分片的輸入資料進行計算。

為了設定圖層權重的版面配置資訊,Keras 在大多數內建圖層的建構函式中公開了一個額外參數。

以下範例會使用完全複製的權重版面配置建立小型圖片分類模型。您可以在 tf.keras.layers.Dense 中透過引數 kernel_layoutbias_layout 指定版面配置資訊 kernelbias。大多數內建的 keras 層都已準備好明確指定圖層權重的 Layout

unsharded_layout_2d = dtensor.Layout.replicated(mesh, 2)
unsharded_layout_1d = dtensor.Layout.replicated(mesh, 1)
model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, 
                        activation='relu',
                        name='d1',
                        kernel_layout=unsharded_layout_2d, 
                        bias_layout=unsharded_layout_1d),
  tf.keras.layers.Dense(10,
                        name='d2',
                        kernel_layout=unsharded_layout_2d, 
                        bias_layout=unsharded_layout_1d)
])

您可以透過檢查權重上的 layout 屬性來檢查版面配置資訊。

for weight in model.weights:
  print(f'Weight name: {weight.name} with layout: {weight.layout}')
  break

載入資料集並建構輸入管線

載入 MNIST 資料集,並為其設定一些預先處理輸入管線。資料集本身不包含任何 DTensor 版面配置資訊。

(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)
def normalize_img(image, label):
  """Normalizes images: `uint8` -> `float32`."""
  return tf.cast(image, tf.float32) / 255., label
batch_size = 128

ds_train = ds_train.map(
    normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(batch_size)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(
    normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(batch_size)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)

定義模型的訓練邏輯

接下來,定義模型的訓練和評估邏輯。

在 TensorFlow 2.9 版中,您必須為啟用 DTensor 的 Keras 模型編寫自訂訓練迴圈。這是為了將輸入資料與適當的版面配置資訊封裝在一起,而這並未與 Keras 的標準 tf.keras.Model.fit()tf.keras.Model.eval() 函式整合。您將在即將發行的版本中獲得更多 tf.data 支援。

@tf.function
def train_step(model, x, y, optimizer, metrics):
  with tf.GradientTape() as tape:
    logits = model(x, training=True)
    # tf.reduce_sum sums the batch sharded per-example loss to a replicated
    # global loss (scalar).
    loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
        y, logits, from_logits=True))

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  for metric in metrics.values():
    metric.update_state(y_true=y, y_pred=logits)

  loss_per_sample = loss / len(x)
  results = {'loss': loss_per_sample}
  return results
@tf.function
def eval_step(model, x, y, metrics):
  logits = model(x, training=False)
  loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
        y, logits, from_logits=True))

  for metric in metrics.values():
    metric.update_state(y_true=y, y_pred=logits)

  loss_per_sample = loss / len(x)
  results = {'eval_loss': loss_per_sample}
  return results
def pack_dtensor_inputs(images, labels, image_layout, label_layout):
  num_local_devices = image_layout.mesh.num_local_devices()
  images = tf.split(images, num_local_devices)
  labels = tf.split(labels, num_local_devices)
  images = dtensor.pack(images, image_layout)
  labels = dtensor.pack(labels, label_layout)
  return  images, labels

指標和最佳化工具

將 DTensor API 與 Keras MetricOptimizer 搭配使用時,您需要提供額外的 mesh 資訊,以便任何內部狀態變數和張量都能與模型中的變數搭配運作。

  • 對於最佳化工具,DTensor 導入了新的實驗性命名空間 keras.dtensor.experimental.optimizers,其中許多現有的 Keras Optimizer 都已擴充為接收額外的 mesh 引數。在未來的版本中,可能會與 Keras 核心最佳化工具合併。

  • 對於指標,您可以直接在建構函式中將 mesh 指定為引數,使其成為與 DTensor 相容的 Metric

optimizer = tf.keras.dtensor.experimental.optimizers.Adam(0.01, mesh=mesh)
metrics = {'accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
eval_metrics = {'eval_accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}

訓練模型

以下範例示範如何將輸入管線中的資料在批次維度上分片,並使用具有完全複製權重的模型進行訓練。

經過 3 個 epoch 後,模型應能達到約 97% 的準確度

num_epochs = 3

image_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=4)
label_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)

for epoch in range(num_epochs):
  print("============================") 
  print("Epoch: ", epoch)
  for metric in metrics.values():
    metric.reset_state()
  step = 0
  results = {}
  pbar = tf.keras.utils.Progbar(target=None, stateful_metrics=[])
  for input in ds_train:
    images, labels = input[0], input[1]
    images, labels = pack_dtensor_inputs(
        images, labels, image_layout, label_layout)

    results.update(train_step(model, images, labels, optimizer, metrics))
    for metric_name, metric in metrics.items():
      results[metric_name] = metric.result()

    pbar.update(step, values=results.items(), finalize=False)
    step += 1
  pbar.update(step, values=results.items(), finalize=True)

  for metric in eval_metrics.values():
    metric.reset_state()
  for input in ds_test:
    images, labels = input[0], input[1]
    images, labels = pack_dtensor_inputs(
        images, labels, image_layout, label_layout)
    results.update(eval_step(model, images, labels, eval_metrics))

  for metric_name, metric in eval_metrics.items():
    results[metric_name] = metric.result()

  for metric_name, metric in results.items():
    print(f"{metric_name}: {metric.numpy()}")

為現有的模型程式碼指定版面配置

您通常會有適用於您的使用案例的模型。為模型中每個個別層指定 Layout 資訊會需要大量工作,且需要進行許多編輯。

為了協助您輕鬆轉換現有的 Keras 模型以搭配 DTensor API 運作,您可以使用新的 tf.keras.dtensor.experimental.LayoutMap API,讓您從全域角度指定 Layout

首先,您需要建立 LayoutMap 執行個體,這是一個類似字典的物件,其中包含您想要為模型權重指定的所有 Layout

LayoutMap 在初始化時需要 Mesh 執行個體,可用於為任何未設定版面配置的權重提供預設的複製 Layout。如果您希望所有模型權重都只是完全複製,您可以提供空的 LayoutMap,預設 mesh 將用於建立複製的 Layout

LayoutMap 使用字串做為鍵,並使用 Layout 做為值。一般 Python 字典與此類別之間存在行為差異。在擷取值時,字串鍵會被視為規則運算式。

子類別模型

請考慮使用 Keras 子類別模型語法定義的下列模型。

class SubclassedModel(tf.keras.Model):

  def __init__(self, name=None):
    super().__init__(name=name)
    self.feature = tf.keras.layers.Dense(16)
    self.feature_2 = tf.keras.layers.Dense(24)
    self.dropout = tf.keras.layers.Dropout(0.1)

  def call(self, inputs, training=None):
    x = self.feature(inputs)
    x = self.dropout(x, training=training)
    return self.feature_2(x)

此模型中有 4 個權重,分別是兩個 Dense 層的 kernelbias。它們各自根據物件路徑對應

  • model.feature.kernel
  • model.feature.bias
  • model.feature_2.kernel
  • model.feature_2.bias

現在定義下列 LayoutMap 並將其套用至模型

layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)

layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)

with layout_map.scope():
  subclassed_model = SubclassedModel()

模型權重是在第一次呼叫時建立的,因此請使用 DTensor 輸入呼叫模型,並確認權重具有預期的版面配置

dtensor_input = dtensor.copy_to_mesh(tf.zeros((16, 16)), layout=unsharded_layout_2d)
# Trigger the weights creation for subclass model
subclassed_model(dtensor_input)

print(subclassed_model.feature.kernel.layout)

如此一來,您就可以快速將 Layout 對應至您的模型,而無需更新任何現有的程式碼。

循序模型和函式模型

對於 Keras 函式和循序模型,您也可以使用 tf.keras.dtensor.experimental.LayoutMap

layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)

layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
  inputs = tf.keras.Input((16,), batch_size=16)
  x = tf.keras.layers.Dense(16, name='feature')(inputs)
  x = tf.keras.layers.Dropout(0.1)(x)
  output = tf.keras.layers.Dense(32, name='feature_2')(x)
  model = tf.keras.Model(inputs, output)

print(model.layers[1].kernel.layout)
with layout_map.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(16, name='feature', input_shape=(16,)),
      tf.keras.layers.Dropout(0.1),
      tf.keras.layers.Dense(32, name='feature_2')
  ])

print(model.layers[2].kernel.layout)