Apache Beam 提供一個框架,用於執行在各種執行引擎上執行的批次和串流資料處理作業。多個 TFX 程式庫使用 Beam 來執行工作,這可在運算叢集之間實現高度擴充性。Beam 包含對各種執行引擎或「執行器」的支援,包括直接執行器,該執行器在單一運算節點上執行,並且對於開發、測試或小型部署非常有用。Beam 提供一個抽象層,使 TFX 能夠在任何受支援的執行器上執行,而無需修改程式碼。TFX 使用 Beam Python API,因此它僅限於 Python API 支援的執行器。
部署與擴充性
隨著工作負載需求增加,Beam 可以擴充到大型運算叢集之間的大規模部署。這僅受限於底層執行器的擴充性。大型部署中的執行器通常會部署到容器協調系統 (例如 Kubernetes 或 Apache Mesos),以自動化應用程式部署、擴充和管理。
請參閱 Apache Beam 文件,以取得關於 Apache Beam 的更多資訊。
對於 Google Cloud 使用者,建議使用 Dataflow 執行器,它透過資源自動擴充、動態工作負載重新平衡、與其他 Google Cloud 服務的深度整合、內建安全性和監控,提供無伺服器且符合成本效益的平台。
自訂 Python 程式碼和依附元件
在 TFX 管道中使用 Beam 的一個顯著複雜性是處理自訂程式碼和/或額外 Python 模組所需的依附元件。以下是一些可能發生此問題的範例:
- preprocessing_fn 需要參照使用者自己的 Python 模組
- Evaluator 元件的自訂擷取器
- 從 TFX 元件子類別化的自訂模組
TFX 依賴 Beam 對 管理 Python 管道依附元件 的支援來處理 Python 依附元件。目前有兩種方法可以管理此問題:
- 以來源套件形式提供 Python 程式碼和依附元件
- [僅限 Dataflow] 使用容器映像檔作為工作站
接下來將討論這些內容。
以來源套件形式提供 Python 程式碼和依附元件
建議以下使用者使用此方法:
- 熟悉 Python 封裝,且
- 僅使用 Python 原始程式碼 (即沒有 C 模組或共用程式庫)。
請遵循 管理 Python 管道依附元件 中的路徑之一,使用以下 beam_pipeline_args 之一來提供此資訊:
- --setup_file
- --extra_package
- --requirements_file
注意:在上述任何情況下,請確保 tfx
的版本與列為依附元件的版本相同。
[僅限 Dataflow] 使用容器映像檔作為工作站
TFX 0.26.0 及更高版本實驗性支援使用 自訂容器映像檔 作為 Dataflow 工作站。
若要使用此功能,您必須:
- 建構一個 Docker 映像檔,其中預先安裝了
tfx
和使用者的自訂程式碼與依附元件。- 對於 (1) 使用
tfx>=0.26
和 (2) 使用 Python 3.7 開發管道的使用者來說,最簡單的方法是擴充對應版本的官方tensorflow/tfx
映像檔
- 對於 (1) 使用
# You can use a build-arg to dynamically pass in the
# version of TFX being used to your Dockerfile.
ARG TFX_VERSION
FROM tensorflow/tfx:${TFX_VERSION}
# COPY your code and dependencies in
- 將建構的映像檔推送至 Dataflow 使用的專案可存取的容器映像檔登錄檔。
- Google Cloud 使用者可以考慮使用 Cloud Build,它能很好地自動執行上述步驟。
- 提供以下
beam_pipeline_args
:
beam_pipeline_args.extend([
'--runner=DataflowRunner',
'--project={project-id}',
'--worker_harness_container_image={image-ref}',
'--experiments=use_runner_v2',
])
TODO(b/171733562):一旦 use_runner_v2 成為 Dataflow 的預設選項,請移除它。
TODO(b/179738639):在 https://issues.apache.org/jira/browse/BEAM-5440 之後,建立關於如何在本地測試自訂容器的文件
Beam 管道引數
多個 TFX 元件依賴 Beam 進行分散式資料處理。它們透過 beam_pipeline_args
進行設定,這是在管道建立期間指定的
my_pipeline = Pipeline(
...,
beam_pipeline_args=[...])
TFX 0.30 及更高版本新增了一個介面 with_beam_pipeline_args
,用於擴充每個元件的管道層級 Beam 引數
example_gen = CsvExampleGen(input_base=data_root).with_beam_pipeline_args([...])