![]() |
![]() |
![]() |
![]() |
總覽
在本教學課程中,您將學習如何搭配 Keras 使用 DTensors。
透過 DTensor 與 Keras 的整合,您可以重複使用現有的 Keras 層和模型,以建構及訓練分散式機器學習模型。
您將使用 MNIST 資料訓練多層分類模型。將示範如何為子類別模型、循序模型和函式模型設定版面配置。
本教學課程假設您已閱讀 DTensor 程式設計指南,並且熟悉基本的 DTensor 概念,例如 Mesh
和 Layout
。
本教學課程以使用 Keras 在 MNIST 上訓練神經網路為基礎。
設定
DTensor (自 2.9.0 版本起,tf.experimental.dtensor
) 已成為 TensorFlow 的一部分。
首先,安裝或升級 TensorFlow Datasets
pip install --quiet --upgrade tensorflow-datasets
接著,匯入 TensorFlow 和 dtensor
,並將 TensorFlow 設定為使用 8 個虛擬 CPU。
即使這個範例使用虛擬 CPU,DTensor 在 CPU、GPU 或 TPU 裝置上的運作方式也相同。
import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.experimental import dtensor
def configure_virtual_cpus(ncpu):
phy_devices = tf.config.list_physical_devices('CPU')
tf.config.set_logical_device_configuration(
phy_devices[0],
[tf.config.LogicalDeviceConfiguration()] * ncpu)
configure_virtual_cpus(8)
tf.config.list_logical_devices('CPU')
devices = [f'CPU:{i}' for i in range(8)]
決定性虛擬亂數產生器
您應該注意的一件事是,DTensor API 要求每個執行的用戶端都具有相同的隨機種子,以便在初始化權重時能有決定性行為。您可以透過 tf.keras.utils.set_random_seed()
在 Keras 中設定全域種子來達成此目的。
tf.keras.backend.experimental.enable_tf_random_generator()
tf.keras.utils.set_random_seed(1337)
建立資料平行 Mesh
本教學課程示範資料平行訓練。調整為模型平行訓練和空間平行訓練可以像切換到一組不同的 Layout
物件一樣簡單。如需資料平行以外的分散式訓練相關資訊,請參閱搭配 DTensors 的分散式訓練教學課程。
資料平行訓練是一種常用的平行訓練方案,例如 tf.distribute.MirroredStrategy
也使用此方案。
透過 DTensor,資料平行訓練迴圈會使用由單一「批次」維度組成的 Mesh
,其中每個裝置都會執行模型副本,並接收來自全域批次的碎片。
mesh = dtensor.create_mesh([("batch", 8)], devices=devices)
由於每個裝置都會執行完整的模型副本,因此模型變數應在整個 mesh 中完全複製 (未分片)。例如,此 Mesh
上等級 2 權重的完全複製版面配置如下
example_weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh) # or
example_weight_layout = dtensor.Layout.replicated(mesh, rank=2)
此 Mesh
上等級 2 資料張量的版面配置會沿著第一個維度分片 (有時稱為 batch_sharded
),
example_data_layout = dtensor.Layout(['batch', dtensor.UNSHARDED], mesh) # or
example_data_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
使用版面配置建立 Keras 層
在資料平行方案中,您通常會使用完全複製的版面配置來建立模型權重,以便模型的每個副本都可以使用分片的輸入資料進行計算。
為了設定圖層權重的版面配置資訊,Keras 在大多數內建圖層的建構函式中公開了一個額外參數。
以下範例會使用完全複製的權重版面配置建立小型圖片分類模型。您可以在 tf.keras.layers.Dense
中透過引數 kernel_layout
和 bias_layout
指定版面配置資訊 kernel
和 bias
。大多數內建的 keras 層都已準備好明確指定圖層權重的 Layout
。
unsharded_layout_2d = dtensor.Layout.replicated(mesh, 2)
unsharded_layout_1d = dtensor.Layout.replicated(mesh, 1)
model = tf.keras.models.Sequential([
tf.keras.layers.Flatten(input_shape=(28, 28)),
tf.keras.layers.Dense(128,
activation='relu',
name='d1',
kernel_layout=unsharded_layout_2d,
bias_layout=unsharded_layout_1d),
tf.keras.layers.Dense(10,
name='d2',
kernel_layout=unsharded_layout_2d,
bias_layout=unsharded_layout_1d)
])
您可以透過檢查權重上的 layout
屬性來檢查版面配置資訊。
for weight in model.weights:
print(f'Weight name: {weight.name} with layout: {weight.layout}')
break
載入資料集並建構輸入管線
載入 MNIST 資料集,並為其設定一些預先處理輸入管線。資料集本身不包含任何 DTensor 版面配置資訊。
(ds_train, ds_test), ds_info = tfds.load(
'mnist',
split=['train', 'test'],
shuffle_files=True,
as_supervised=True,
with_info=True,
)
def normalize_img(image, label):
"""Normalizes images: `uint8` -> `float32`."""
return tf.cast(image, tf.float32) / 255., label
batch_size = 128
ds_train = ds_train.map(
normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(batch_size)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(
normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(batch_size)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)
定義模型的訓練邏輯
接下來,定義模型的訓練和評估邏輯。
在 TensorFlow 2.9 版中,您必須為啟用 DTensor 的 Keras 模型編寫自訂訓練迴圈。這是為了將輸入資料與適當的版面配置資訊封裝在一起,而這並未與 Keras 的標準 tf.keras.Model.fit()
或 tf.keras.Model.eval()
函式整合。您將在即將發行的版本中獲得更多 tf.data
支援。
@tf.function
def train_step(model, x, y, optimizer, metrics):
with tf.GradientTape() as tape:
logits = model(x, training=True)
# tf.reduce_sum sums the batch sharded per-example loss to a replicated
# global loss (scalar).
loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
y, logits, from_logits=True))
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
for metric in metrics.values():
metric.update_state(y_true=y, y_pred=logits)
loss_per_sample = loss / len(x)
results = {'loss': loss_per_sample}
return results
@tf.function
def eval_step(model, x, y, metrics):
logits = model(x, training=False)
loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
y, logits, from_logits=True))
for metric in metrics.values():
metric.update_state(y_true=y, y_pred=logits)
loss_per_sample = loss / len(x)
results = {'eval_loss': loss_per_sample}
return results
def pack_dtensor_inputs(images, labels, image_layout, label_layout):
num_local_devices = image_layout.mesh.num_local_devices()
images = tf.split(images, num_local_devices)
labels = tf.split(labels, num_local_devices)
images = dtensor.pack(images, image_layout)
labels = dtensor.pack(labels, label_layout)
return images, labels
指標和最佳化工具
將 DTensor API 與 Keras Metric
和 Optimizer
搭配使用時,您需要提供額外的 mesh 資訊,以便任何內部狀態變數和張量都能與模型中的變數搭配運作。
對於最佳化工具,DTensor 導入了新的實驗性命名空間
keras.dtensor.experimental.optimizers
,其中許多現有的 Keras Optimizer 都已擴充為接收額外的mesh
引數。在未來的版本中,可能會與 Keras 核心最佳化工具合併。對於指標,您可以直接在建構函式中將
mesh
指定為引數,使其成為與 DTensor 相容的Metric
。
optimizer = tf.keras.dtensor.experimental.optimizers.Adam(0.01, mesh=mesh)
metrics = {'accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
eval_metrics = {'eval_accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
訓練模型
以下範例示範如何將輸入管線中的資料在批次維度上分片,並使用具有完全複製權重的模型進行訓練。
經過 3 個 epoch 後,模型應能達到約 97% 的準確度
num_epochs = 3
image_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=4)
label_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
for epoch in range(num_epochs):
print("============================")
print("Epoch: ", epoch)
for metric in metrics.values():
metric.reset_state()
step = 0
results = {}
pbar = tf.keras.utils.Progbar(target=None, stateful_metrics=[])
for input in ds_train:
images, labels = input[0], input[1]
images, labels = pack_dtensor_inputs(
images, labels, image_layout, label_layout)
results.update(train_step(model, images, labels, optimizer, metrics))
for metric_name, metric in metrics.items():
results[metric_name] = metric.result()
pbar.update(step, values=results.items(), finalize=False)
step += 1
pbar.update(step, values=results.items(), finalize=True)
for metric in eval_metrics.values():
metric.reset_state()
for input in ds_test:
images, labels = input[0], input[1]
images, labels = pack_dtensor_inputs(
images, labels, image_layout, label_layout)
results.update(eval_step(model, images, labels, eval_metrics))
for metric_name, metric in eval_metrics.items():
results[metric_name] = metric.result()
for metric_name, metric in results.items():
print(f"{metric_name}: {metric.numpy()}")
為現有的模型程式碼指定版面配置
您通常會有適用於您的使用案例的模型。為模型中每個個別層指定 Layout
資訊會需要大量工作,且需要進行許多編輯。
為了協助您輕鬆轉換現有的 Keras 模型以搭配 DTensor API 運作,您可以使用新的 tf.keras.dtensor.experimental.LayoutMap
API,讓您從全域角度指定 Layout
。
首先,您需要建立 LayoutMap
執行個體,這是一個類似字典的物件,其中包含您想要為模型權重指定的所有 Layout
。
LayoutMap
在初始化時需要 Mesh
執行個體,可用於為任何未設定版面配置的權重提供預設的複製 Layout
。如果您希望所有模型權重都只是完全複製,您可以提供空的 LayoutMap
,預設 mesh 將用於建立複製的 Layout
。
LayoutMap
使用字串做為鍵,並使用 Layout
做為值。一般 Python 字典與此類別之間存在行為差異。在擷取值時,字串鍵會被視為規則運算式。
子類別模型
請考慮使用 Keras 子類別模型語法定義的下列模型。
class SubclassedModel(tf.keras.Model):
def __init__(self, name=None):
super().__init__(name=name)
self.feature = tf.keras.layers.Dense(16)
self.feature_2 = tf.keras.layers.Dense(24)
self.dropout = tf.keras.layers.Dropout(0.1)
def call(self, inputs, training=None):
x = self.feature(inputs)
x = self.dropout(x, training=training)
return self.feature_2(x)
此模型中有 4 個權重,分別是兩個 Dense
層的 kernel
和 bias
。它們各自根據物件路徑對應
model.feature.kernel
model.feature.bias
model.feature_2.kernel
model.feature_2.bias
現在定義下列 LayoutMap
並將其套用至模型
layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)
layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
subclassed_model = SubclassedModel()
模型權重是在第一次呼叫時建立的,因此請使用 DTensor 輸入呼叫模型,並確認權重具有預期的版面配置
dtensor_input = dtensor.copy_to_mesh(tf.zeros((16, 16)), layout=unsharded_layout_2d)
# Trigger the weights creation for subclass model
subclassed_model(dtensor_input)
print(subclassed_model.feature.kernel.layout)
如此一來,您就可以快速將 Layout
對應至您的模型,而無需更新任何現有的程式碼。
循序模型和函式模型
對於 Keras 函式和循序模型,您也可以使用 tf.keras.dtensor.experimental.LayoutMap
。
layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)
layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
inputs = tf.keras.Input((16,), batch_size=16)
x = tf.keras.layers.Dense(16, name='feature')(inputs)
x = tf.keras.layers.Dropout(0.1)(x)
output = tf.keras.layers.Dense(32, name='feature_2')(x)
model = tf.keras.Model(inputs, output)
print(model.layers[1].kernel.layout)
with layout_map.scope():
model = tf.keras.Sequential([
tf.keras.layers.Dense(16, name='feature', input_shape=(16,)),
tf.keras.layers.Dropout(0.1),
tf.keras.layers.Dense(32, name='feature_2')
])
print(model.layers[2].kernel.layout)