TFX 中的 TensorFlow 2.x

TensorFlow 2.0 已於 2019 年發布,其中包含Keras 的緊密整合、預設Eager ExecutionPythonic 函式執行,以及其他新功能和改進

本指南提供 TFX 中 TF 2.x 的完整技術總覽。

應使用哪個版本?

TFX 與 TensorFlow 2.x 相容,且 TensorFlow 1.x (尤其是 Estimators) 中存在的高階 API 仍可繼續運作。

在 TensorFlow 2.x 中開始新專案

由於 TensorFlow 2.x 保留了 TensorFlow 1.x 的高階功能,因此即使您不打算使用新功能,在新專案中使用舊版也沒有任何優勢。

因此,如果您要開始新的 TFX 專案,我們建議您使用 TensorFlow 2.x。您可能稍後會想要更新程式碼,因為對 Keras 和其他新功能的完整支援將會推出,而且如果您從 TensorFlow 2.x 開始,而非嘗試在未來從 TensorFlow 1.x 升級,則變更範圍將會受到更多限制。

將現有專案轉換為 TensorFlow 2.x

為 TensorFlow 1.x 撰寫的程式碼在很大程度上與 TensorFlow 2.x 相容,並將繼續在 TFX 中運作。

但是,如果您想要利用 TF 2.x 中可用的改進和新功能,您可以按照移轉至 TF 2.x 的操作說明進行操作。

Estimator

Estimator API 已保留在 TensorFlow 2.x 中,但不是新功能和開發的重點。使用 Estimators 以 TensorFlow 1.x 或 2.x 撰寫的程式碼將會繼續在 TFX 中如預期般運作。

以下是使用純 Estimator 的端對端 TFX 範例:計程車範例 (Estimator)

搭配 model_to_estimator 使用 Keras

Keras 模型可以使用 tf.keras.estimator.model_to_estimator 函式包裝,這可讓它們像 Estimators 一樣運作。若要使用此功能

  1. 建構 Keras 模型。
  2. 將已編譯的模型傳遞至 model_to_estimator
  3. 在 Trainer 中使用 model_to_estimator 的結果,就像您通常使用 Estimator 一樣。
# Build a Keras model.
def _keras_model_builder():
  """Creates a Keras model."""
  ...

  model = tf.keras.Model(inputs=inputs, outputs=output)
  model.compile()

  return model


# Write a typical trainer function
def trainer_fn(trainer_fn_args, schema):
  """Build the estimator, using model_to_estimator."""
  ...

  # Model to estimator
  estimator = tf.keras.estimator.model_to_estimator(
      keras_model=_keras_model_builder(), config=run_config)

  return {
      'estimator': estimator,
      ...
  }

除了 Trainer 的使用者模組檔案外,管線的其餘部分保持不變。

原生 Keras (即不含 model_to_estimator 的 Keras)

範例和 Colab

以下是數個具有原生 Keras 的範例

我們還有每個元件的Keras Colab

TFX 元件

以下章節說明相關的 TFX 元件如何支援原生 Keras。

Transform

Transform 目前對 Keras 模型提供實驗性支援。

Transform 元件本身可用於原生 Keras,而無需變更。preprocessing_fn 定義保持不變,使用 TensorFlowtf.Transform 運算元。

服務函式和評估函式會針對原生 Keras 進行變更。詳細資訊將在以下 Trainer 和 Evaluator 章節中討論。

Trainer

若要設定原生 Keras,需要為 Trainer 元件設定 GenericExecutor,以取代預設以 Estimator 為基礎的執行器。如需詳細資訊,請查看此處

具有 Transform 的 Keras 模組檔案

訓練模組檔案必須包含 run_fn,此函式將由 GenericExecutor 呼叫,典型的 Keras run_fn 看起來會像這樣

def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  # Train and eval files contains transformed examples.
  # _input_fn read dataset based on transformed schema from tft.
  train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor,
                            tf_transform_output.transformed_metadata.schema)
  eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor,
                           tf_transform_output.transformed_metadata.schema)

  model = _build_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

在上述 run_fn 中,匯出訓練後的模型時需要服務簽名,以便模型可以取得原始範例以進行預測。典型的服務函式看起來會像這樣

def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a function that parses a serialized tf.Example."""

  # the layer is added as an attribute to the model in order to make sure that
  # the model assets are handled correctly when exporting.
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Returns the output to be used in the serving signature."""
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)

    transformed_features = model.tft_layer(parsed_features)

    return model(transformed_features)

  return serve_tf_examples_fn

在上述服務函式中,需要使用 tft.TransformFeaturesLayer 層,將 tf.Transform 轉換套用至推論的原始資料。先前 Estimators 所需的 _serving_input_receiver_fn 將不再與 Keras 一起使用。

不含 Transform 的 Keras 模組檔案

這與上述模組檔案類似,但不含轉換

def _get_serve_tf_examples_fn(model, schema):

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    feature_spec = _get_raw_feature_spec(schema)
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
    return model(parsed_features)

  return serve_tf_examples_fn


def run_fn(fn_args: TrainerFnArgs):
  schema = io_utils.parse_pbtxt_file(fn_args.schema_file, schema_pb2.Schema())

  # Train and eval files contains raw examples.
  # _input_fn reads the dataset based on raw data schema.
  train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor, schema)
  eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor, schema)

  model = _build_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model, schema).get_concrete_function(
              tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
tf.distribute.Strategy

目前 TFX 僅支援單一工作站策略 (例如,MirroredStrategyOneDeviceStrategy)。

若要使用分配策略,請建立適當的 tf.distribute.Strategy,並將 Keras 模型的建立和編譯移至策略範圍內。

例如,將上述 model = _build_keras_model() 取代為

  mirrored_strategy = tf.distribute.MirroredStrategy()
  with mirrored_strategy.scope():
    model = _build_keras_model()

  # Rest of the code can be unchanged.
  model.fit(...)

若要驗證 MirroredStrategy 使用的裝置 (CPU/GPU),請啟用資訊層級 TensorFlow 記錄

import logging
logging.getLogger("tensorflow").setLevel(logging.INFO)

您應該可以在記錄中看到 Using MirroredStrategy with devices (...)

Evaluator

在 TFMA v0.2x 中,ModelValidator 和 Evaluator 已合併為單一新 Evaluator 元件。新的 Evaluator 元件可以執行單一模型評估,也可以驗證目前模型與先前模型相比的效能。透過此變更,Pusher 元件現在會取用 Evaluator 而非 ModelValidator 的 blessing 結果。

新的 Evaluator 支援 Keras 模型以及 Estimator 模型。先前所需的 _eval_input_receiver_fn 和評估儲存模型將不再與 Keras 一起使用,因為 Evaluator 現在是以用於服務的相同 SavedModel 為基礎。

請參閱 Evaluator 以取得更多資訊.