Apache Beam 和 TFX

Apache Beam 提供一個框架,用於執行在各種執行引擎上執行的批次和串流資料處理作業。多個 TFX 程式庫使用 Beam 來執行工作,這可在運算叢集之間實現高度擴充性。Beam 包含對各種執行引擎或「執行器」的支援,包括直接執行器,該執行器在單一運算節點上執行,並且對於開發、測試或小型部署非常有用。Beam 提供一個抽象層,使 TFX 能夠在任何受支援的執行器上執行,而無需修改程式碼。TFX 使用 Beam Python API,因此它僅限於 Python API 支援的執行器。

部署與擴充性

隨著工作負載需求增加,Beam 可以擴充到大型運算叢集之間的大規模部署。這僅受限於底層執行器的擴充性。大型部署中的執行器通常會部署到容器協調系統 (例如 Kubernetes 或 Apache Mesos),以自動化應用程式部署、擴充和管理。

請參閱 Apache Beam 文件,以取得關於 Apache Beam 的更多資訊。

對於 Google Cloud 使用者,建議使用 Dataflow 執行器,它透過資源自動擴充、動態工作負載重新平衡、與其他 Google Cloud 服務的深度整合、內建安全性和監控,提供無伺服器且符合成本效益的平台。

自訂 Python 程式碼和依附元件

在 TFX 管道中使用 Beam 的一個顯著複雜性是處理自訂程式碼和/或額外 Python 模組所需的依附元件。以下是一些可能發生此問題的範例:

  • preprocessing_fn 需要參照使用者自己的 Python 模組
  • Evaluator 元件的自訂擷取器
  • 從 TFX 元件子類別化的自訂模組

TFX 依賴 Beam 對 管理 Python 管道依附元件 的支援來處理 Python 依附元件。目前有兩種方法可以管理此問題:

  1. 以來源套件形式提供 Python 程式碼和依附元件
  2. [僅限 Dataflow] 使用容器映像檔作為工作站

接下來將討論這些內容。

以來源套件形式提供 Python 程式碼和依附元件

建議以下使用者使用此方法:

  1. 熟悉 Python 封裝,且
  2. 僅使用 Python 原始程式碼 (即沒有 C 模組或共用程式庫)。

請遵循 管理 Python 管道依附元件 中的路徑之一,使用以下 beam_pipeline_args 之一來提供此資訊:

  • --setup_file
  • --extra_package
  • --requirements_file

注意:在上述任何情況下,請確保 tfx 的版本與列為依附元件的版本相同。

[僅限 Dataflow] 使用容器映像檔作為工作站

TFX 0.26.0 及更高版本實驗性支援使用 自訂容器映像檔 作為 Dataflow 工作站。

若要使用此功能,您必須:

  • 建構一個 Docker 映像檔,其中預先安裝了 tfx 和使用者的自訂程式碼與依附元件。
    • 對於 (1) 使用 tfx>=0.26 和 (2) 使用 Python 3.7 開發管道的使用者來說,最簡單的方法是擴充對應版本的官方 tensorflow/tfx 映像檔
# You can use a build-arg to dynamically pass in the
# version of TFX being used to your Dockerfile.

ARG TFX_VERSION
FROM tensorflow/tfx:${TFX_VERSION}
# COPY your code and dependencies in
  • 將建構的映像檔推送至 Dataflow 使用的專案可存取的容器映像檔登錄檔。
    • Google Cloud 使用者可以考慮使用 Cloud Build,它能很好地自動執行上述步驟。
  • 提供以下 beam_pipeline_args
beam_pipeline_args.extend([
    '--runner=DataflowRunner',
    '--project={project-id}',
    '--worker_harness_container_image={image-ref}',
    '--experiments=use_runner_v2',
])

TODO(b/171733562):一旦 use_runner_v2 成為 Dataflow 的預設選項,請移除它。

TODO(b/179738639):在 https://issues.apache.org/jira/browse/BEAM-5440 之後,建立關於如何在本地測試自訂容器的文件

Beam 管道引數

多個 TFX 元件依賴 Beam 進行分散式資料處理。它們透過 beam_pipeline_args 進行設定,這是在管道建立期間指定的

my_pipeline = Pipeline(
    ...,
    beam_pipeline_args=[...])

TFX 0.30 及更高版本新增了一個介面 with_beam_pipeline_args,用於擴充每個元件的管道層級 Beam 引數

example_gen = CsvExampleGen(input_base=data_root).with_beam_pipeline_args([...])