在本機端建構 TFX 管線

TFX 可讓您更輕鬆地將機器學習 (ML) 工作流程編排為管線,以便:

  • 將您的 ML 流程自動化,讓您可以定期重新訓練、評估及部署模型。
  • 建立 ML 管線,其中包含模型效能的深入分析,以及新訓練模型的驗證,以確保效能和可靠性。
  • 監控訓練資料中的異常狀況,並消除訓練-服務偏移
  • 透過以不同的超參數集執行管線,提高實驗速度。

典型的管線開發流程始於本機,包括資料分析和元件設定,然後再部署到生產環境。本指南說明在本機端建構管線的兩種方式。

  • 自訂 TFX 管線範本,以符合 ML 工作流程的需求。TFX 管線範本是預先建構的工作流程,示範使用 TFX 標準元件的最佳做法。
  • 使用 TFX 建構管線。在此使用案例中,您定義管線,而無需從範本開始。

在開發管線時,您可以使用 LocalDagRunner 執行管線。接著,一旦管線元件經過完善定義和測試,您就可以使用生產環境級協調器,例如 Kubeflow 或 Airflow。

事前準備

TFX 是 Python 套件,因此您需要設定 Python 開發環境,例如虛擬環境或 Docker 容器。然後

pip install tfx

如果您是 TFX 管線新手,請先進一步瞭解 TFX 管線的核心概念,再繼續。

使用範本建構管線

TFX 管線範本提供一組預先建構的管線定義,您可以針對自己的使用案例自訂,讓您更輕鬆地開始管線開發。

以下章節說明如何建立範本副本並自訂以符合您的需求。

建立管線範本的副本

  1. 查看可用的 TFX 管線範本清單

    tfx template list
    
  2. 從清單中選取範本

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    取代下列項目

    • template:您要複製的範本名稱。
    • pipeline-name:要建立的管線名稱。
    • destination-path:要將範本複製到其中的路徑。

    進一步瞭解 tfx template copy 指令

  3. 管線範本的副本已在您指定的路徑中建立。

探索管線範本

本節概述範本建立的基礎結構。

  1. 探索已複製到管線根目錄的目錄和檔案

    • 包含下列項目的 pipeline 目錄:
      • pipeline.py - 定義管線,並列出正在使用的元件
      • configs.py - 保留組態詳細資料,例如資料來源或正在使用的協調器
    • data 目錄
      • 這通常包含 data.csv 檔案,這是 ExampleGen 的預設來源。configs.py 中可以變更資料來源。
    • models 目錄,其中包含預先處理程式碼和模型實作

    • 範本會複製本機環境和 Kubeflow 的 DAG 執行器。

    • 部分範本也包含 Python Notebook,方便您使用機器學習中繼資料探索資料和成品。

  2. 在您的管線目錄中執行下列指令

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    此指令會使用 LocalDagRunner 建立管線執行,這會將下列目錄新增至您的管線

    • tfx_metadata 目錄,其中包含在本機端使用的 ML 中繼資料儲存區。
    • tfx_pipeline_output 目錄,其中包含管線的檔案輸出。
  3. 開啟管線的 pipeline/configs.py 檔案並檢閱內容。此指令碼定義管線和元件函式使用的組態選項。您可以在此指定資料來源位置或執行中的訓練步驟數等項目。

  4. 開啟管線的 pipeline/pipeline.py 檔案並檢閱內容。此指令碼會建立 TFX 管線。最初,管線僅包含 ExampleGen 元件。

    • 按照 pipeline.pyTODO 註解中的操作說明,將更多步驟新增至管線。
  5. 開啟 local_runner.py 檔案並檢閱內容。此指令碼會建立管線執行,並指定執行的參數,例如 data_pathpreprocessing_fn

  6. 您已檢閱範本建立的基礎結構,並使用 LocalDagRunner 建立管線執行。接下來,自訂範本以符合您的需求。

自訂管線

本節概述如何開始自訂範本。

  1. 設計您的管線。範本提供的基礎結構可協助您使用 TFX 標準元件實作表格資料的管線。如果您要將現有的 ML 工作流程移至管線,可能需要修訂程式碼,才能充分利用TFX 標準元件。您可能也需要建立自訂元件,以實作工作流程獨有的功能,或 TFX 標準元件尚不支援的功能。

  2. 設計管線後,請使用下列流程以疊代方式自訂管線。從將資料擷取到管線中的元件開始,通常是 ExampleGen 元件。

    1. 自訂管線或元件以符合您的使用案例。這些自訂項目可能包含如下變更:

      • 變更管線參數。
      • 在管線中新增或移除元件。
      • 取代資料輸入來源。此資料來源可以是檔案,也可以是服務 (例如 BigQuery) 的查詢。
      • 變更管線中元件的組態。
      • 變更元件的自訂函式。
    2. 使用 local_runner.py 指令碼在本機端執行元件,如果您使用不同的協調器,則使用另一個適當的 DAG 執行器。如果指令碼失敗,請偵錯失敗並重試執行指令碼。

    3. 一旦此自訂項目運作正常,請繼續進行下一個自訂項目。

  3. 以疊代方式工作,您可以自訂範本工作流程中的每個步驟,以符合您的需求。

建構自訂管線

使用下列操作說明進一步瞭解如何在不使用範本的情況下建構自訂管線。

  1. 設計您的管線。TFX 標準元件提供經過驗證的功能,可協助您實作完整的 ML 工作流程。如果您要將現有的 ML 工作流程移至管線,可能需要修訂程式碼,才能充分利用 TFX 標準元件。您可能也需要建立自訂元件,以實作資料擴增等功能。

  2. 使用下列範例建立指令碼檔案,以定義您的管線。本指南將此檔案稱為 my_pipeline.py

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()
    

    在接下來的步驟中,您將在 create_pipeline 中定義管線,並使用本機執行器在本機端執行管線。

    使用下列流程以疊代方式建構管線。

    1. 自訂管線或元件以符合您的使用案例。這些自訂項目可能包含如下變更:

      • 變更管線參數。
      • 在管線中新增或移除元件。
      • 取代資料輸入檔案。
      • 變更管線中元件的組態。
      • 變更元件的自訂函式。
    2. 使用本機執行器在本機端執行元件,或直接執行指令碼。如果指令碼失敗,請偵錯失敗並重試執行指令碼。

    3. 一旦此自訂項目運作正常,請繼續進行下一個自訂項目。

    從管線工作流程中的第一個節點開始,通常第一個節點會將資料擷取到管線中。

  3. 將工作流程中的第一個節點新增至管線。在此範例中,管線使用 ExampleGen 標準元件從 ./data 目錄載入 CSV。

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        enable_cache=ENABLE_CACHE,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
        )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    

    CsvExampleGen 會使用指定資料路徑中 CSV 的資料,建立序列化範例記錄。方法是使用資料根目錄設定 CsvExampleGen 元件的 input_base 參數。

  4. 在與 my_pipeline.py 相同的目錄中建立 data 目錄。將小型 CSV 檔案新增至 data 目錄。

  5. 使用下列指令執行 my_pipeline.py 指令碼。

    python my_pipeline.py
    

    結果應如下所示

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. 繼續以疊代方式將元件新增至管線。