使用 Orbit 進行訓練

在 TensorFlow.org 上檢視 在 Google Colab 中執行 在 GitHub 上檢視 下載筆記本

這個範例將逐步說明如何使用 Orbit 訓練程式庫微調 BERT 模型。

Orbit 是一個彈性、輕量的程式庫,旨在讓您輕鬆地在 TensorFlow 中編寫自訂訓練迴圈。Orbit 處理常見的模型訓練工作,例如儲存檢查點、執行模型評估,以及設定摘要寫入,同時讓使用者完全掌控內部訓練迴圈的實作。它與 tf.distribute 整合,並支援在不同的裝置類型 (CPU、GPU 和 TPU) 上執行。

tensorflow.org 上的大多數範例都使用自訂訓練迴圈或 Keras 的 model.fit()。如果您的模型很複雜,且您的訓練迴圈需要更高的彈性、控制權或自訂性,Orbit 是 model.fit 的理想替代方案。此外,當有多種不同的模型架構都使用相同的自訂訓練迴圈時,使用 Orbit 可以簡化程式碼。

本教學課程著重於設定與使用 Orbit,而非 BERT、模型建構和資料處理的細節。如需關於這些主題的更深入教學課程,請參閱下列教學課程

安裝 TensorFlow Models 套件

安裝並匯入必要的套件,然後設定訓練模型所需的所有物件。

pip install -q opencv-python
pip install tensorflow>=2.9.0 tf-models-official

tf-models-official 套件同時包含 orbittensorflow_models 模組。

import tensorflow_models as tfm
import orbit
2023-10-17 11:55:57.421119: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-10-17 11:55:57.421164: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-10-17 11:55:57.421203: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered

訓練設定

本教學課程未著重於設定環境、建構模型和最佳化器,以及載入資料。在微調 BERT使用 GLUE 微調 BERT 教學課程中,更詳細地介紹了所有這些技巧。

若要查看本教學課程的訓練設定方式,請展開本節的其餘部分。

匯入必要的套件

Tensorflow Model Garden 匯入 BERT 模型和資料集建構程式庫。

import glob
import os
import pathlib
import tempfile
import time

import numpy as np

import tensorflow as tf
from official.nlp.data import sentence_prediction_dataloader
from official.nlp import optimization

設定分散式策略

雖然如果您在單一機器或 GPU 上執行,tf.distribute 無法協助模型的執行時間,但對於 TPU 而言,這是必要的。設定分散式策略可讓您使用相同的程式碼,而不論組態為何。

logical_device_names = [logical_device.name for logical_device in tf.config.list_logical_devices()]

if 'GPU' in ''.join(logical_device_names):
  strategy = tf.distribute.MirroredStrategy()
elif 'TPU' in ''.join(logical_device_names):
  resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
  tf.config.experimental_connect_to_cluster(resolver)
  tf.tpu.experimental.initialize_tpu_system(resolver)
  strategy = tf.distribute.TPUStrategy(resolver)
else:
  strategy = tf.distribute.OneDeviceStrategy(logical_device_names[0])
2023-10-17 11:56:02.076511: W tensorflow/core/common_runtime/gpu/gpu_device.cc:2211] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://tensorflow.dev.org.tw/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...

如需 TPU 設定的詳細資訊,請參閱 TPU 指南

建立模型和最佳化器

max_seq_length = 128
learning_rate = 3e-5
num_train_epochs = 3
train_batch_size = 32
eval_batch_size = 64

train_data_size = 3668
steps_per_epoch = int(train_data_size / train_batch_size)

train_steps = steps_per_epoch * num_train_epochs
warmup_steps = int(train_steps * 0.1)

print("train batch size: ", train_batch_size)
print("train epochs:     ", num_train_epochs)
print("steps_per_epoch:  ", steps_per_epoch)
train batch size:  32
train epochs:      3
steps_per_epoch:   114
model_dir = pathlib.Path(tempfile.mkdtemp())
print(model_dir)
/tmpfs/tmp/tmpjbwlp79l

建立 BERT 分類器模型和簡單的最佳化器。它們必須在 strategy.scope 內建立,以便可以分散變數。

with strategy.scope():
  encoder_network = tfm.nlp.encoders.build_encoder(
      tfm.nlp.encoders.EncoderConfig(type="bert"))
  classifier_model = tfm.nlp.models.BertClassifier(
      network=encoder_network, num_classes=2)

  optimizer = optimization.create_optimizer(
      init_lr=3e-5,
      num_train_steps=steps_per_epoch * num_train_epochs,
      num_warmup_steps=warmup_steps,
      end_lr=0.0,
      optimizer_type='adamw')
tf.keras.utils.plot_model(classifier_model)

png

從檢查點初始化

bert_dir = 'gs://cloud-tpu-checkpoints/bert/v3/uncased_L-12_H-768_A-12/'
tf.io.gfile.listdir(bert_dir)
['bert_config.json',
 'bert_model.ckpt.data-00000-of-00001',
 'bert_model.ckpt.index',
 'vocab.txt']
bert_checkpoint = bert_dir + 'bert_model.ckpt'
def init_from_ckpt_fn():
  init_checkpoint = tf.train.Checkpoint(**classifier_model.checkpoint_items)
  with strategy.scope():
    (init_checkpoint
     .read(bert_checkpoint)
     .expect_partial()
     .assert_existing_objects_matched())
with strategy.scope():
  init_from_ckpt_fn()

若要使用 Orbit,請建立 tf.train.CheckpointManager 物件。

checkpoint = tf.train.Checkpoint(model=classifier_model, optimizer=optimizer)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint,
    directory=model_dir,
    max_to_keep=5,
    step_counter=optimizer.iterations,
    checkpoint_interval=steps_per_epoch,
    init_fn=init_from_ckpt_fn)

建立分散式資料集

作為本教學課程的捷徑,GLUE/MPRC 資料集 已轉換為一對 TFRecord 檔案,其中包含序列化的 tf.train.Example 原始通訊協定。

資料是使用 這個指令碼 轉換而來。

train_data_path = "gs://download.tensorflow.org/data/model_garden_colab/mrpc_train.tf_record"
eval_data_path = "gs://download.tensorflow.org/data/model_garden_colab/mrpc_eval.tf_record"

def _dataset_fn(input_file_pattern, 
                global_batch_size, 
                is_training, 
                input_context=None):
  data_config = sentence_prediction_dataloader.SentencePredictionDataConfig(
    input_path=input_file_pattern,
    seq_length=max_seq_length,
    global_batch_size=global_batch_size,
    is_training=is_training)
  return sentence_prediction_dataloader.SentencePredictionDataLoader(
      data_config).load(input_context=input_context)

train_dataset = orbit.utils.make_distributed_dataset(
    strategy, _dataset_fn, input_file_pattern=train_data_path,
    global_batch_size=train_batch_size, is_training=True)
eval_dataset = orbit.utils.make_distributed_dataset(
    strategy, _dataset_fn, input_file_pattern=eval_data_path,
    global_batch_size=eval_batch_size, is_training=False)

建立損失函數

def loss_fn(labels, logits):
  """Classification loss."""
  labels = tf.squeeze(labels)
  log_probs = tf.nn.log_softmax(logits, axis=-1)
  one_hot_labels = tf.one_hot(
      tf.cast(labels, dtype=tf.int32), depth=2, dtype=tf.float32)
  per_example_loss = -tf.reduce_sum(
      tf.cast(one_hot_labels, dtype=tf.float32) * log_probs, axis=-1)
  return tf.reduce_mean(per_example_loss)

控制器、訓練器與評估器

使用 Orbit 時,orbit.Controller 類別會驅動訓練。控制器會處理分散式策略、步驟計數、TensorBoard 摘要和檢查點的細節。

若要實作訓練和評估,請傳遞 trainerevaluator,它們是 orbit.AbstractTrainerorbit.AbstractEvaluator 的子類別執行個體。為了與 Orbit 的輕量設計保持一致,這兩個類別具有最小的介面。

控制器會透過呼叫 trainer.train(num_steps)evaluator.evaluate(num_steps) 來驅動訓練和評估。這些 trainevaluate 方法會傳回結果字典以進行記錄。

訓練會分成 num_steps 長度的區塊。這是由控制器的 steps_per_loop 引數設定。使用訓練器和評估器抽象基底類別,num_steps 的意義完全由實作者決定。

一些常見範例包括

  • 讓區塊代表資料集週期邊界,例如預設 keras 設定。
  • 使用它更有效率地將多個訓練步驟與單一 tf.function 呼叫 (例如 Model.compilesteps_per_execution 引數) 分派至加速器。
  • 根據需要細分為較小的區塊。

StandardTrainer 和 StandardEvaluator

Orbit 提供兩個額外的類別 orbit.StandardTrainerorbit.StandardEvaluator,以便為訓練和評估迴圈提供更多結構。

使用 StandardTrainer 時,您只需要設定 train_loop_begintrain_steptrain_loop_end。基底類別會處理迴圈、資料集邏輯和 tf.function (根據其 orbit.StandardTrainerOptions 設定的選項)。這比 orbit.AbstractTrainer 更簡單,後者需要您處理整個迴圈。StandardEvaluator 具有與 StandardTrainer 類似的結構和簡化。

這實際上是 Keras 使用的 steps_per_execution 方法的實作。

將此與 Keras 進行比較,其中訓練分為週期 (資料集上的單次傳遞) 和 steps_per_execution (在 Model.compile 內設定) 兩者。在 Keras 中,指標平均值通常會在一個週期內累積,並在週期之間報告和重設。為了提高效率,steps_per_execution 僅控制每次呼叫進行的訓練步驟數。

在這個簡單的案例中,steps_per_loop (在 StandardTrainer 內) 將處理指標重設和每次呼叫的步驟數。

使用這些基底類別時,最小設定是實作下列方法

  1. StandardTrainer.train_loop_begin - 重設您的訓練指標。
  2. StandardTrainer.train_step - 套用單一梯度更新。
  3. StandardTrainer.train_loop_end - 報告您的訓練指標。

  1. StandardEvaluator.eval_begin - 重設您的評估指標。
  2. StandardEvaluator.eval_step - 執行單一評估步驟。
  3. StandardEvaluator.eval_reduce - 在這個簡單的設定中,這不是必要的。
  4. StandardEvaluator.eval_end - 報告您的評估指標。

根據設定,基底類別可能會將 train_stepeval_step 程式碼包裝在 tf.functiontf.while_loop 中,與標準 Python 相比,這有一些限制。

定義訓練器類別

在本節中,您將為此任務建立 orbit.StandardTrainer 的子類別。

訓練器需要存取訓練資料、模型、最佳化器和分散式策略。將這些作為引數傳遞至初始化程式。

使用 tf.keras.metrics.Mean 定義單一訓練指標 training_loss

def trainer_init(self,
                 train_dataset,
                 model,
                 optimizer,
                 strategy):
  self.strategy = strategy
  with self.strategy.scope():
    self.model = model
    self.optimizer = optimizer
    self.global_step = self.optimizer.iterations


    self.train_loss = tf.keras.metrics.Mean(
        'training_loss', dtype=tf.float32)
    orbit.StandardTrainer.__init__(self, train_dataset)

在開始執行訓練迴圈之前,train_loop_begin 方法將重設 train_loss 指標。

def train_loop_begin(self):
  self.train_loss.reset_states()

train_step 是一個直接的損失計算和梯度更新,由分散式策略執行。這是透過將梯度步驟定義為巢狀函式 (step_fn) 來完成的。

該方法接收 tf.distribute.DistributedIterator 以處理分散式輸入。該方法使用 Strategy.run 來執行 step_fn,並從分散式迭代器饋送它。

def train_step(self, iterator):

  def step_fn(inputs):
    labels = inputs.pop("label_ids")
    with tf.GradientTape() as tape:
      model_outputs = self.model(inputs, training=True)
      # Raw loss is used for reporting in metrics/logs.
      raw_loss = loss_fn(labels, model_outputs)
      # Scales down the loss for gradients to be invariant from replicas.
      loss = raw_loss / self.strategy.num_replicas_in_sync

    grads = tape.gradient(loss, self.model.trainable_variables)
    optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
    # For reporting, the metric takes the mean of losses.
    self.train_loss.update_state(raw_loss)

  self.strategy.run(step_fn, args=(next(iterator),))

orbit.StandardTrainer 處理 @tf.function 和迴圈。

在執行 num_steps 訓練之後,StandardTrainer 會呼叫 train_loop_end。函式會傳回指標結果

def train_loop_end(self):
  return {
      self.train_loss.name: self.train_loss.result(),
  }

使用這些方法建立 orbit.StandardTrainer 的子類別。

class BertClassifierTrainer(orbit.StandardTrainer):
  __init__ = trainer_init
  train_loop_begin = train_loop_begin
  train_step = train_step
  train_loop_end = train_loop_end

定義評估器類別

對於此任務而言,評估器甚至更簡單。它需要存取評估資料集、模型和策略。在儲存對這些物件的參考之後,建構函式只需要建立指標即可。

def evaluator_init(self,
                   eval_dataset,
                   model,
                   strategy):
  self.strategy = strategy
  with self.strategy.scope():
    self.model = model

    self.eval_loss = tf.keras.metrics.Mean(
        'evaluation_loss', dtype=tf.float32)
    self.eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
        name='accuracy', dtype=tf.float32)
    orbit.StandardEvaluator.__init__(self, eval_dataset)

與訓練器類似,eval_begineval_end 方法只需要在迴圈之前重設指標,然後在迴圈之後報告結果即可。

def eval_begin(self):
  self.eval_accuracy.reset_states()
  self.eval_loss.reset_states()

def eval_end(self):
  return {
      self.eval_accuracy.name: self.eval_accuracy.result(),
      self.eval_loss.name: self.eval_loss.result(),
  }

eval_step 方法的運作方式與 train_step 類似。內部 step_fn 定義計算損失和準確度並更新指標的實際工作。外部 eval_step 接收 tf.distribute.DistributedIterator 作為輸入,並使用 Strategy.run 來啟動分散式執行至 step_fn,並從分散式迭代器饋送它。

def eval_step(self, iterator):

  def step_fn(inputs):
    labels = inputs.pop("label_ids")
    model_outputs = self.model(inputs, training=True)
    loss = loss_fn(labels, model_outputs)
    self.eval_loss.update_state(loss)
    self.eval_accuracy.update_state(labels, model_outputs)

  self.strategy.run(step_fn, args=(next(iterator),))

使用這些方法建立 orbit.StandardEvaluator 的子類別。

class BertClassifierEvaluator(orbit.StandardEvaluator):
  __init__ = evaluator_init
  eval_begin = eval_begin
  eval_end = eval_end
  eval_step = eval_step

端對端訓練與評估

若要執行訓練和評估,只需建立訓練器、評估器和 orbit.Controller 執行個體即可。然後呼叫 Controller.train_and_evaluate 方法。

trainer = BertClassifierTrainer(
    train_dataset, classifier_model, optimizer, strategy)

evaluator = BertClassifierEvaluator(
    eval_dataset, classifier_model, strategy)

controller = orbit.Controller(
    trainer=trainer,
    evaluator=evaluator,
    global_step=trainer.global_step,
    steps_per_loop=20,
    checkpoint_manager=checkpoint_manager)

result = controller.train_and_evaluate(
    train_steps=steps_per_epoch * num_train_epochs,
    eval_steps=-1,
    eval_interval=steps_per_epoch)
restoring or initializing model...
INFO:tensorflow:Customized initialization is done through the passed `init_fn`.
INFO:tensorflow:Customized initialization is done through the passed `init_fn`.
train | step:      0 | training until step 114...
2023-10-17 11:56:16.208773: W tensorflow/core/framework/dataset.cc:959] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
train | step:     20 | steps/sec:    0.2 | output: {'training_loss': 0.776852}
saved checkpoint to /tmpfs/tmp/tmpjbwlp79l/ckpt-20.
train | step:     40 | steps/sec:    0.2 | output: {'training_loss': 0.71298754}
train | step:     60 | steps/sec:    0.2 | output: {'training_loss': 0.6112895}
train | step:     80 | steps/sec:    0.2 | output: {'training_loss': 0.57813513}
train | step:    100 | steps/sec:    0.2 | output: {'training_loss': 0.56901103}
train | step:    114 | steps/sec:    0.2 | output: {'training_loss': 0.5472072}
 eval | step:    114 | running complete evaluation...
2023-10-17 12:04:29.320401: W tensorflow/core/framework/dataset.cc:959] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
eval | step:    114 | eval time:   20.0 sec | output: {'accuracy': 0.7630208, 'evaluation_loss': 0.52163863}
train | step:    114 | training until step 228...
train | step:    134 | steps/sec:    0.2 | output: {'training_loss': 0.51722306}
saved checkpoint to /tmpfs/tmp/tmpjbwlp79l/ckpt-134.
train | step:    154 | steps/sec:    0.2 | output: {'training_loss': 0.524362}
train | step:    174 | steps/sec:    0.2 | output: {'training_loss': 0.39253792}
train | step:    194 | steps/sec:    0.2 | output: {'training_loss': 0.35146618}
train | step:    214 | steps/sec:    0.2 | output: {'training_loss': 0.3962813}
train | step:    228 | steps/sec:    0.2 | output: {'training_loss': 0.27635574}
 eval | step:    228 | running complete evaluation...
2023-10-17 12:12:42.261016: W tensorflow/core/framework/dataset.cc:959] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
eval | step:    228 | eval time:   18.7 sec | output: {'accuracy': 0.8020833, 'evaluation_loss': 0.4823281}
train | step:    228 | training until step 342...
train | step:    248 | steps/sec:    0.2 | output: {'training_loss': 0.33371425}
saved checkpoint to /tmpfs/tmp/tmpjbwlp79l/ckpt-248.
train | step:    268 | steps/sec:    0.2 | output: {'training_loss': 0.32890704}
train | step:    288 | steps/sec:    0.2 | output: {'training_loss': 0.21134928}
train | step:    308 | steps/sec:    0.2 | output: {'training_loss': 0.21237397}
train | step:    328 | steps/sec:    0.2 | output: {'training_loss': 0.2372253}
train | step:    342 | steps/sec:    0.2 | output: {'training_loss': 0.18402448}
 eval | step:    342 | running complete evaluation...
2023-10-17 12:20:51.500609: W tensorflow/core/framework/dataset.cc:959] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
eval | step:    342 | eval time:   18.5 sec | output: {'accuracy': 0.8098958, 'evaluation_loss': 0.4728314}
saved checkpoint to /tmpfs/tmp/tmpjbwlp79l/ckpt-342.