使用 TPU

在 TensorFlow.org 上檢視 在 Google Colab 中執行 在 GitHub 上檢視原始碼 下載筆記本

本指南示範如何使用 張量處理單元 (TPU) 和 TPU Pod (由專用高速網路介面連接的 TPU 裝置集合),透過 tf.keras 和自訂訓練迴圈執行基本訓練。

TPU 是 Google 客製化開發的應用程式特定積體電路 (ASIC),用於加速機器學習工作負載。它們可透過 Google ColabTPU Research CloudCloud TPU 取得。

設定

在您執行此 Colab 筆記本之前,請檢查您的筆記本設定,確認您的硬體加速器是 TPU:「執行階段」>「變更執行階段類型」>「硬體加速器」>「TPU」。

匯入一些必要的程式庫,包括 TensorFlow Datasets

import tensorflow as tf

import os
import tensorflow_datasets as tfds
2023-06-09 12:13:32.486552: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT

TPU 初始化

TPU 通常是 Cloud TPU 工作站,這與執行使用者 Python 程式的本機程序不同。因此,您需要執行一些初始化工作來連線到遠端叢集並初始化 TPU。請注意,tf.distribute.cluster_resolver.TPUClusterResolvertpu 引數是 Colab 專用的特殊位址。如果您在 Google Compute Engine (GCE) 上執行程式碼,則應改為傳入您的 Cloud TPU 名稱。

resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
INFO:tensorflow:Deallocate tpu buffers before initializing tpu system.
2023-06-09 12:13:34.011755: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:266] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Deallocate tpu buffers before initializing tpu system.
INFO:tensorflow:Initializing the TPU system: grpc://10.25.167.66:8470
INFO:tensorflow:Initializing the TPU system: grpc://10.25.167.66:8470
INFO:tensorflow:Finished initializing TPU system.
INFO:tensorflow:Finished initializing TPU system.
All devices:  [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU')]

手動裝置放置

在 TPU 初始化之後,您可以使用手動裝置放置將運算放置在單一 TPU 裝置上

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])

with tf.device('/TPU:0'):
  c = tf.matmul(a, b)

print("c device: ", c.device)
print(c)
c device:  /job:worker/replica:0/task:0/device:TPU:0
tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32)

分散策略

通常,您會以資料平行方式在多個 TPU 上執行模型。為了在多個 TPU (以及多個 GPU 或多部機器) 上分散模型,TensorFlow 提供了 tf.distribute.Strategy API。您可以取代您的分散策略,模型將在任何給定的 (TPU) 裝置上執行。在 TensorFlow 分散式訓練指南中瞭解更多資訊。

使用 tf.distribute.TPUStrategy 選項可實作同步分散式訓練。TPU 提供其自身的高效率 all-reduce 和其他跨多個 TPU 核心的集體運算實作,這些實作用於 TPUStrategy 中。

為了示範這一點,建立一個 tf.distribute.TPUStrategy 物件

strategy = tf.distribute.TPUStrategy(resolver)
INFO:tensorflow:Found TPU system:
INFO:tensorflow:Found TPU system:
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)

為了複製運算使其可以在所有 TPU 核心中執行,您可以將其傳遞到 Strategy.run API 中。以下範例顯示所有核心接收相同的輸入 (a, b),並在每個核心上獨立執行矩陣乘法。輸出將是來自所有複本的值。

@tf.function
def matmul_fn(x, y):
  z = tf.matmul(x, y)
  return z

z = strategy.run(matmul_fn, args=(a, b))
print(z)
PerReplica:{
  0: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  1: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  2: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  3: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  4: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  5: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  6: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  7: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32)
}

在 TPU 上進行分類

涵蓋基本概念後,請考慮一個更具體的範例。本節示範如何使用分散策略—tf.distribute.TPUStrategy—在 Cloud TPU 上訓練 Keras 模型。

定義 Keras 模型

從 MNIST 資料集上影像分類的 Sequential Keras 模型定義開始。這與您在 CPU 或 GPU 上訓練時使用的模型沒有什麼不同。請注意,Keras 模型建立需要位於 Strategy.scope 內,以便可以在每個 TPU 裝置上建立變數。程式碼的其他部分不需要位於 Strategy 範圍內。

def create_model():
  return tf.keras.Sequential(
      [tf.keras.layers.Conv2D(256, 3, activation='relu', input_shape=(28, 28, 1)),
       tf.keras.layers.Conv2D(256, 3, activation='relu'),
       tf.keras.layers.Flatten(),
       tf.keras.layers.Dense(256, activation='relu'),
       tf.keras.layers.Dense(128, activation='relu'),
       tf.keras.layers.Dense(10)])

載入資料集

在使用 Cloud TPU 時,有效率地使用 tf.data.Dataset API 至關重要。您可以在輸入管道效能指南中瞭解更多關於資料集效能的資訊。

如果您使用的是 TPU 節點,則需要將 TensorFlow Dataset 讀取的所有資料檔案儲存在 Google Cloud Storage (GCS) 儲存桶中。如果您使用的是 TPU VM,則可以將資料儲存在任何您喜歡的位置。有關 TPU 節點和 TPU VM 的更多資訊,請參閱 TPU 系統架構文件。

對於大多數使用案例,建議將您的資料轉換為 TFRecord 格式,並使用 tf.data.TFRecordDataset 讀取它。請查看 TFRecord 和 tf.Example 教學課程,以瞭解如何執行此操作的詳細資訊。這不是硬性要求,您可以使用其他資料集讀取器,例如 tf.data.FixedLengthRecordDatasettf.data.TextLineDataset

您可以使用 tf.data.Dataset.cache 將整個小型資料集載入記憶體中。

無論使用哪種資料格式,都強烈建議您使用約 100MB 的大型檔案。這在網路環境中尤其重要,因為開啟檔案的額外負荷明顯更高。

如下面的程式碼所示,您應該使用 Tensorflow Datasets tfds.load 模組來取得 MNIST 訓練和測試資料的副本。請注意,try_gcs 指定使用公用 GCS 儲存桶中可用的副本。如果您未指定此項,TPU 將無法存取下載的資料。

def get_dataset(batch_size, is_training=True):
  split = 'train' if is_training else 'test'
  dataset, info = tfds.load(name='mnist', split=split, with_info=True,
                            as_supervised=True, try_gcs=True)

  # Normalize the input data.
  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255.0
    return image, label

  dataset = dataset.map(scale)

  # Only shuffle and repeat the dataset in training. The advantage of having an
  # infinite dataset for training is to avoid the potential last partial batch
  # in each epoch, so that you don't need to think about scaling the gradients
  # based on the actual batch size.
  if is_training:
    dataset = dataset.shuffle(10000)
    dataset = dataset.repeat()

  dataset = dataset.batch(batch_size)

  return dataset

使用 Keras 高階 API 訓練模型

您可以使用 Keras Model.fitModel.compile API 訓練模型。此步驟中沒有任何 TPU 特定的內容—您編寫程式碼的方式就好像您使用的是多個 GPU 和 MirroredStrategy 而不是 TPUStrategy。您可以在 Keras 分散式訓練教學課程中瞭解更多資訊。

with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size

train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset,
          validation_steps=validation_steps)
Epoch 1/5
300/300 [==============================] - 17s 32ms/step - loss: 0.1235 - sparse_categorical_accuracy: 0.9620 - val_loss: 0.0462 - val_sparse_categorical_accuracy: 0.9856
Epoch 2/5
300/300 [==============================] - 7s 24ms/step - loss: 0.0333 - sparse_categorical_accuracy: 0.9894 - val_loss: 0.0401 - val_sparse_categorical_accuracy: 0.9878
Epoch 3/5
300/300 [==============================] - 7s 24ms/step - loss: 0.0186 - sparse_categorical_accuracy: 0.9938 - val_loss: 0.0352 - val_sparse_categorical_accuracy: 0.9900
Epoch 4/5
300/300 [==============================] - 7s 25ms/step - loss: 0.0127 - sparse_categorical_accuracy: 0.9957 - val_loss: 0.0482 - val_sparse_categorical_accuracy: 0.9879
Epoch 5/5
300/300 [==============================] - 7s 24ms/step - loss: 0.0111 - sparse_categorical_accuracy: 0.9962 - val_loss: 0.0448 - val_sparse_categorical_accuracy: 0.9894
<keras.callbacks.History at 0x7f79107c8d30>

為了減少 Python 額外負荷並最大化 TPU 的效能,請將 steps_per_execution 引數傳遞給 Keras Model.compile。在此範例中,它將輸送量提高了約 50%

with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                # Anything between 2 and `steps_per_epoch` could help here.
                steps_per_execution = 50,
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset,
          validation_steps=validation_steps)
Epoch 1/5
300/300 [==============================] - 14s 45ms/step - loss: 0.1306 - sparse_categorical_accuracy: 0.9591 - val_loss: 0.0420 - val_sparse_categorical_accuracy: 0.9863
Epoch 2/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0333 - sparse_categorical_accuracy: 0.9900 - val_loss: 0.0502 - val_sparse_categorical_accuracy: 0.9846
Epoch 3/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0193 - sparse_categorical_accuracy: 0.9936 - val_loss: 0.0406 - val_sparse_categorical_accuracy: 0.9879
Epoch 4/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0135 - sparse_categorical_accuracy: 0.9955 - val_loss: 0.0416 - val_sparse_categorical_accuracy: 0.9882
Epoch 5/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0110 - sparse_categorical_accuracy: 0.9962 - val_loss: 0.0463 - val_sparse_categorical_accuracy: 0.9882
<keras.callbacks.History at 0x7f7898488e20>

使用自訂訓練迴圈訓練模型

您也可以直接使用 tf.functiontf.distribute API 建立和訓練模型。您可以使用 Strategy.experimental_distribute_datasets_from_function API,根據資料集函式分散 tf.data.Dataset。請注意,在以下範例中,傳遞到 Dataset 的批次大小是每個複本的批次大小,而不是全域批次大小。若要瞭解更多資訊,請查看使用 tf.distribute.Strategy 進行自訂訓練教學課程。

首先,建立模型、資料集和 tf.function

# Create the model, optimizer and metrics inside the `tf.distribute.Strategy`
# scope, so that the variables can be mirrored on each device.
with strategy.scope():
  model = create_model()
  optimizer = tf.keras.optimizers.Adam()
  training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
  training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      'training_accuracy', dtype=tf.float32)

# Calculate per replica batch size, and distribute the `tf.data.Dataset`s
# on each TPU worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync

train_dataset = strategy.experimental_distribute_datasets_from_function(
    lambda _: get_dataset(per_replica_batch_size, is_training=True))

@tf.function
def train_step(iterator):
  """The step function for one training step."""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  strategy.run(step_fn, args=(next(iterator),))
WARNING:tensorflow:From /tmpfs/tmp/ipykernel_9094/1509474074.py:14: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
rename to distribute_datasets_from_function
WARNING:tensorflow:From /tmpfs/tmp/ipykernel_9094/1509474074.py:14: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
rename to distribute_datasets_from_function

然後,執行訓練迴圈

steps_per_eval = 10000 // batch_size

train_iterator = iter(train_dataset)
for epoch in range(5):
  print('Epoch: {}/5'.format(epoch))

  for step in range(steps_per_epoch):
    train_step(train_iterator)
  print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
  training_loss.reset_states()
  training_accuracy.reset_states()
Epoch: 0/5
Current step: 300, training loss: 0.1465, accuracy: 95.4%
Epoch: 1/5
Current step: 600, training loss: 0.035, accuracy: 98.94%
Epoch: 2/5
Current step: 900, training loss: 0.0197, accuracy: 99.39%
Epoch: 3/5
Current step: 1200, training loss: 0.0126, accuracy: 99.59%
Epoch: 4/5
Current step: 1500, training loss: 0.0109, accuracy: 99.64%

使用 tf.function 內的多個步驟提升效能

您可以透過在 tf.function 中執行多個步驟來提升效能。這是透過將 Strategy.run 呼叫與 tf.function 內的 tf.range 包裝在一起來實現的,AutoGraph 會將其轉換為 TPU 工作站上的 tf.while_loop。您可以在使用 tf.function 提升效能指南中瞭解更多關於 tf.function 的資訊。

儘管效能有所提升,但與在 tf.function 中執行單一步驟相比,此方法存在權衡取捨。在 tf.function 中執行多個步驟的彈性較差—您無法在步驟中急切地執行或使用任意 Python 程式碼。

@tf.function
def train_multiple_steps(iterator, steps):
  """The step function for one training step."""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  for _ in tf.range(steps):
    strategy.run(step_fn, args=(next(iterator),))

# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))

print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
Current step: 1800, training loss: 0.009, accuracy: 99.72%

後續步驟

若要瞭解更多關於 Cloud TPU 以及如何使用它們的資訊