運用 ML Metadata 提升 ML 工程效率

在 TensorFlow.org 上檢視 在 Google Colab 中執行 在 GitHub 上檢視原始碼 下載筆記本

假設您設定了生產環境 ML 管線來分類企鵝。這個管線會擷取您的訓練資料、訓練及評估模型,然後將模型推送至生產環境。

然而,當您稍後嘗試將這個模型用於包含不同種類企鵝的較大型資料集時,您會發現模型行為不如預期,並開始錯誤分類物種。

此時,您會想知道

  • 當唯一可用的成品是生產環境中的模型時,最有效率的偵錯模型方式是什麼?
  • 是使用哪個訓練資料集來訓練模型?
  • 哪個訓練執行作業導致這個錯誤模型?
  • 模型評估結果在哪裡?
  • 要從哪裡開始偵錯?

ML Metadata (MLMD) 是一個程式庫,可運用與 ML 模型相關聯的中繼資料,協助您解答這些問題及其他問題。一個有用的類比是將這個中繼資料視為軟體開發中的記錄。MLMD 可讓您可靠地追蹤與 ML 管線各種元件相關聯的成品和沿襲。

在本教學課程中,您會設定 TFX 管線來建立模型,根據企鵝的體重、喙的長度和深度以及鰭狀肢的長度,將企鵝分類為三個物種。然後,您會使用 MLMD 來追蹤管線元件的沿襲。

Colab 中的 TFX 管線

Colab 是一種輕量級開發環境,與生產環境有顯著差異。在生產環境中,您可能會有多個管線元件,例如資料擷取、轉換、模型訓練、執行記錄等,這些元件分散在多個分散式系統中。在本教學課程中,您應該注意 Orchestration 和 Metadata 儲存空間存在顯著差異 - 它們都在 Colab 中在本機處理。如要進一步瞭解 Colab 中的 TFX,請按一下這裡

設定

首先,我們會安裝及匯入必要的套件、設定路徑,並下載資料。

升級 Pip

為了避免在本機執行時升級系統中的 Pip,請檢查以確認我們是否在 Colab 中執行。本機系統當然可以個別升級。

try:
  import colab
  !pip install --upgrade pip
except:
  pass

安裝及匯入 TFX

pip install -q tfx

匯入套件

您是否重新啟動執行階段?

如果您使用 Google Colab,第一次執行上述儲存格時,您必須按一下上方的「RESTART RUNTIME」按鈕或使用「Runtime > Restart runtime ...」選單來重新啟動執行階段。這是因為 Colab 載入套件的方式。

import os
import tempfile
import urllib
import pandas as pd

import tensorflow_model_analysis as tfma
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
2024-04-30 10:32:39.287985: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-04-30 10:32:39.288034: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-04-30 10:32:39.289482: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered

檢查 TFX 和 MLMD 版本。

from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import ml_metadata as mlmd
print('MLMD version: {}'.format(mlmd.__version__))
TFX version: 1.15.0
MLMD version: 1.15.0

下載資料集

在這個 Colab 中,我們使用 Palmer Penguins 資料集,這個資料集可在 Github 上找到。我們處理這個資料集的方式是排除任何不完整的記錄,並捨棄 islandsex 欄,以及將標籤轉換為 int32。這個資料集包含 334 筆企鵝的體重、喙的長度和深度以及鰭狀肢長度的記錄。您可以使用這些資料將企鵝分類為三個物種之一。

DATA_PATH = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_root = tempfile.mkdtemp(prefix='tfx-data')
_data_filepath = os.path.join(_data_root, "penguins_processed.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)
('/tmpfs/tmp/tfx-data4bx2jr3d/penguins_processed.csv',
 <http.client.HTTPMessage at 0x7f82e19047c0>)

建立 InteractiveContext

如要在這個筆記本中以互動方式執行 TFX 元件,請建立 InteractiveContextInteractiveContext 使用暫時目錄和暫時性 MLMD 資料庫執行個體。請注意,在 Colab 環境之外呼叫 InteractiveContext 不會執行任何作業。

一般而言,最好將類似的管線執行作業分組在 Context 下。

interactive_context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmpfs/tmp/tfx-interactive-2024-04-30T10_32_43.981209-5usg33le as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmpfs/tmp/tfx-interactive-2024-04-30T10_32_43.981209-5usg33le/metadata.sqlite.

建構 TFX 管線

TFX 管線包含多個元件,這些元件會執行 ML 工作流程的不同層面。在這個筆記本中,您會建立及執行 ExampleGenStatisticsGenSchemaGenTrainer 元件,並使用 EvaluatorPusher 元件來評估及推送已訓練模型。

如要進一步瞭解 TFX 管線元件,請參閱元件教學課程

例項化並執行 ExampleGen 元件

example_gen = tfx.components.CsvExampleGen(input_base=_data_root)
interactive_context.run(example_gen)
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.

例項化並執行 StatisticsGen 元件

statistics_gen = tfx.components.StatisticsGen(
    examples=example_gen.outputs['examples'])
interactive_context.run(statistics_gen)

例項化並執行 SchemaGen 元件

infer_schema = tfx.components.SchemaGen(
    statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
interactive_context.run(infer_schema)

例項化並執行 Trainer 元件

# Define the module file for the Trainer component
trainer_module_file = 'penguin_trainer.py'
%%writefile {trainer_module_file}

# Define the training algorithm for the Trainer module file
import os
from typing import List, Text

import tensorflow as tf
from tensorflow import keras

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

# Features used for classification - culmen length and depth, flipper length,
# body mass, and species.

_LABEL_KEY = 'species'

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]


def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema, batch_size: int) -> tf.data.Dataset:
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY), schema).repeat()


def _build_keras_model():
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  d = keras.layers.Dense(8, activation='relu')(d)
  d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)
  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])
  return model


def run_fn(fn_args: tfx.components.FnArgs):
  schema = schema_pb2.Schema()
  tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema)
  train_dataset = _input_fn(
      fn_args.train_files, fn_args.data_accessor, schema, batch_size=10)
  eval_dataset = _input_fn(
      fn_args.eval_files, fn_args.data_accessor, schema, batch_size=10)
  model = _build_keras_model()
  model.fit(
      train_dataset,
      epochs=int(fn_args.train_steps / 20),
      steps_per_epoch=20,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

執行 Trainer 元件。

trainer = tfx.components.Trainer(
    module_file=os.path.abspath(trainer_module_file),
    examples=example_gen.outputs['examples'],
    schema=infer_schema.outputs['schema'],
    train_args=tfx.proto.TrainArgs(num_steps=100),
    eval_args=tfx.proto.EvalArgs(num_steps=50))
interactive_context.run(trainer)
running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying penguin_trainer.py -> build/lib
installing to /tmpfs/tmp/tmp2bjhph4h
running install
running install_lib
copying build/lib/penguin_trainer.py -> /tmpfs/tmp/tmp2bjhph4h
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /tmpfs/tmp/tmp2bjhph4h/tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4-py3.9.egg-info
running install_scripts
creating /tmpfs/tmp/tmp2bjhph4h/tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4.dist-info/WHEEL
creating '/tmpfs/tmp/tmp1r3ydm1_/tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4-py3-none-any.whl' and adding '/tmpfs/tmp/tmp2bjhph4h' to it
adding 'penguin_trainer.py'
adding 'tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4.dist-info/METADATA'
adding 'tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4.dist-info/WHEEL'
adding 'tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4.dist-info/top_level.txt'
adding 'tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4.dist-info/RECORD'
removing /tmpfs/tmp/tmp2bjhph4h
/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/setuptools/_distutils/cmd.py:66: SetuptoolsDeprecationWarning: setup.py install is deprecated.
!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer or other
        standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()
Processing /tmpfs/tmp/tfx-interactive-2024-04-30T10_32_43.981209-5usg33le/_wheels/tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4-py3-none-any.whl
Installing collected packages: tfx-user-code-Trainer
Successfully installed tfx-user-code-Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tfx_bsl/tfxio/tf_example_record.py:343: parse_example_dataset (from tensorflow.python.data.experimental.ops.parsing_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.map(tf.io.parse_example(...))` instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tfx_bsl/tfxio/tf_example_record.py:343: parse_example_dataset (from tensorflow.python.data.experimental.ops.parsing_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.map(tf.io.parse_example(...))` instead.
Epoch 1/5
WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
I0000 00:00:1714473175.420733  172568 device_compiler.h:186] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.
20/20 [==============================] - 2s 17ms/step - loss: 0.9629 - sparse_categorical_accuracy: 0.7000 - val_loss: 0.8934 - val_sparse_categorical_accuracy: 0.7600
Epoch 2/5
20/20 [==============================] - 0s 9ms/step - loss: 0.7868 - sparse_categorical_accuracy: 0.7650 - val_loss: 0.7069 - val_sparse_categorical_accuracy: 0.7700
Epoch 3/5
20/20 [==============================] - 0s 9ms/step - loss: 0.5864 - sparse_categorical_accuracy: 0.8150 - val_loss: 0.5397 - val_sparse_categorical_accuracy: 0.7800
Epoch 4/5
20/20 [==============================] - 0s 10ms/step - loss: 0.4492 - sparse_categorical_accuracy: 0.8150 - val_loss: 0.4520 - val_sparse_categorical_accuracy: 0.7800
Epoch 5/5
20/20 [==============================] - 0s 9ms/step - loss: 0.4016 - sparse_categorical_accuracy: 0.7900 - val_loss: 0.3730 - val_sparse_categorical_accuracy: 0.8200
INFO:tensorflow:Assets written to: /tmpfs/tmp/tfx-interactive-2024-04-30T10_32_43.981209-5usg33le/Trainer/model/4/Format-Serving/assets
INFO:tensorflow:Assets written to: /tmpfs/tmp/tfx-interactive-2024-04-30T10_32_43.981209-5usg33le/Trainer/model/4/Format-Serving/assets

評估及推送模型

使用 Evaluator 元件評估模型並「祝福」模型,然後再使用 Pusher 元件將模型推送至服務目錄。

_serving_model_dir = os.path.join(tempfile.mkdtemp(),
                                  'serving_model/penguins_classification')
eval_config = tfma.EvalConfig(
    model_specs=[
        tfma.ModelSpec(label_key='species', signature_name='serving_default')
    ],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(
                class_name='SparseCategoricalAccuracy',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.6})))
        ])
    ],
    slicing_specs=[tfma.SlicingSpec()])
evaluator = tfx.components.Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    schema=infer_schema.outputs['schema'],
    eval_config=eval_config)
interactive_context.run(evaluator)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow_model_analysis/writers/metrics_plots_and_validations_writer.py:112: tf_record_iterator (from tensorflow.python.lib.io.tf_record) is deprecated and will be removed in a future version.
Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow_model_analysis/writers/metrics_plots_and_validations_writer.py:112: tf_record_iterator (from tensorflow.python.lib.io.tf_record) is deprecated and will be removed in a future version.
Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`
pusher = tfx.components.Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=tfx.proto.PushDestination(
        filesystem=tfx.proto.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))
interactive_context.run(pusher)

執行 TFX 管線會填入 MLMD 資料庫。在下一個章節中,您會使用 MLMD API 查詢這個資料庫以取得中繼資料資訊。

查詢 MLMD 資料庫

MLMD 資料庫儲存三種中繼資料

  • 關於管線的中繼資料和與管線元件相關聯的沿襲資訊
  • 關於管線執行期間產生的成品的中繼資料
  • 關於管線執行作業的中繼資料

典型的生產環境管線會在有新資料送達時提供多個模型。當您在提供的模型中遇到錯誤結果時,可以查詢 MLMD 資料庫來隔離錯誤模型。然後,您可以追蹤與這些模型對應的管線元件沿襲,以偵錯模型

使用先前定義的 InteractiveContext 設定中繼資料 (MD) 儲存空間,以查詢 MLMD 資料庫。

connection_config = interactive_context.metadata_connection_config
store = mlmd.MetadataStore(connection_config)

# All TFX artifacts are stored in the base directory
base_dir = connection_config.sqlite.filename_uri.split('metadata.sqlite')[0]

建立一些輔助函式,以檢視來自 MD 儲存空間的資料。

def display_types(types):
  # Helper function to render dataframes for the artifact and execution types
  table = {'id': [], 'name': []}
  for a_type in types:
    table['id'].append(a_type.id)
    table['name'].append(a_type.name)
  return pd.DataFrame(data=table)
def display_artifacts(store, artifacts):
  # Helper function to render dataframes for the input artifacts
  table = {'artifact id': [], 'type': [], 'uri': []}
  for a in artifacts:
    table['artifact id'].append(a.id)
    artifact_type = store.get_artifact_types_by_id([a.type_id])[0]
    table['type'].append(artifact_type.name)
    table['uri'].append(a.uri.replace(base_dir, './'))
  return pd.DataFrame(data=table)
def display_properties(store, node):
  # Helper function to render dataframes for artifact and execution properties
  table = {'property': [], 'value': []}
  for k, v in node.properties.items():
    table['property'].append(k)
    table['value'].append(
        v.string_value if v.HasField('string_value') else v.int_value)
  for k, v in node.custom_properties.items():
    table['property'].append(k)
    table['value'].append(
        v.string_value if v.HasField('string_value') else v.int_value)
  return pd.DataFrame(data=table)

首先,查詢 MD 儲存空間以取得其所有已儲存 ArtifactTypes 的清單。

display_types(store.get_artifact_types())

接下來,查詢所有 PushedModel 成品。

pushed_models = store.get_artifacts_by_type("PushedModel")
display_artifacts(store, pushed_models)

查詢 MD 儲存空間以取得最新推送的模型。本教學課程只有一個推送的模型。

pushed_model = pushed_models[-1]
display_properties(store, pushed_model)

偵錯推送模型的第一步是查看推送了哪個已訓練模型,以及查看使用哪個訓練資料來訓練該模型。

MLMD 提供遍歷 API 來逐步執行沿襲圖,您可以使用這些 API 來分析模型沿襲。

def get_one_hop_parent_artifacts(store, artifacts):
  # Get a list of artifacts within a 1-hop of the artifacts of interest
  artifact_ids = [artifact.id for artifact in artifacts]
  executions_ids = set(
      event.execution_id
      for event in store.get_events_by_artifact_ids(artifact_ids)
      if event.type == mlmd.proto.Event.OUTPUT)
  artifacts_ids = set(
      event.artifact_id
      for event in store.get_events_by_execution_ids(executions_ids)
      if event.type == mlmd.proto.Event.INPUT)
  return [artifact for artifact in store.get_artifacts_by_id(artifacts_ids)]

查詢推送模型的父項成品。

parent_artifacts = get_one_hop_parent_artifacts(store, [pushed_model])
display_artifacts(store, parent_artifacts)

查詢模型的屬性。

exported_model = parent_artifacts[0]
display_properties(store, exported_model)

查詢模型的上游成品。

model_parents = get_one_hop_parent_artifacts(store, [exported_model])
display_artifacts(store, model_parents)

取得模型訓練時使用的訓練資料。

used_data = model_parents[0]
display_properties(store, used_data)

現在您有了模型訓練時使用的訓練資料,請再次查詢資料庫以尋找訓練步驟 (執行作業)。查詢 MD 儲存空間以取得已註冊執行類型清單。

display_types(store.get_execution_types())

訓練步驟是名為 tfx.components.trainer.component.Trainer 的 ExecutionType。遍歷 MD 儲存空間以取得與推送模型對應的訓練器執行作業。

def find_producer_execution(store, artifact):
  executions_ids = set(
      event.execution_id
      for event in store.get_events_by_artifact_ids([artifact.id])
      if event.type == mlmd.proto.Event.OUTPUT)
  return store.get_executions_by_id(executions_ids)[0]

trainer = find_producer_execution(store, exported_model)
display_properties(store, trainer)

摘要

在本教學課程中,您已瞭解如何運用 MLMD 來追蹤 TFX 管線元件的沿襲並解決問題。

如要進一步瞭解如何使用 MLMD,請查看下列其他資源