本指南說明如何使用 TFX API 建構完全自訂元件。完全自訂元件可讓您透過定義元件規格、執行器和元件介面類別來建構元件。這種方法可讓您重複使用和擴充標準元件,以符合您的需求。
如果您是 TFX 管線的新手,請進一步瞭解 TFX 管線的核心概念。
自訂執行器或自訂元件
如果只需要自訂處理邏輯,且元件的輸入、輸出和執行屬性與現有元件相同,則自訂執行器已足夠。當任何輸入、輸出或執行屬性與任何現有 TFX 元件不同時,則需要完全自訂元件。
如何建立自訂元件?
開發完全自訂元件需要
- 為新元件定義一組輸入和輸出成品規格。特別是,輸入成品的類型應與產生成品的元件的輸出成品類型一致,且輸出成品的類型應與取用成品的元件的輸入成品類型一致 (若有的話)。
- 新元件所需的非成品執行參數。
ComponentSpec
ComponentSpec
類別透過定義元件的輸入和輸出成品,以及用於元件執行的參數,來定義元件合約。它包含三個部分
- INPUTS:輸入成品的類型參數字典,這些成品會傳遞至元件執行器。一般來說,輸入成品是來自上游元件的輸出,因此共用相同的類型。
- OUTPUTS:元件產生的輸出成品的類型參數字典。
- PARAMETERS:額外 ExecutionParameter 項目的字典,這些項目將傳遞至元件執行器。這些是非成品參數,我們希望在管線 DSL 中彈性定義,並傳遞至執行。
以下是 ComponentSpec 的範例
class HelloComponentSpec(types.ComponentSpec):
"""ComponentSpec for Custom TFX Hello World Component."""
PARAMETERS = {
# These are parameters that will be passed in the call to
# create an instance of this component.
'name': ExecutionParameter(type=Text),
}
INPUTS = {
# This will be a dictionary with input artifacts, including URIs
'input_data': ChannelParameter(type=standard_artifacts.Examples),
}
OUTPUTS = {
# This will be a dictionary which this component will populate
'output_data': ChannelParameter(type=standard_artifacts.Examples),
}
Executor
接下來,為新元件編寫執行器程式碼。基本上,需要建立 base_executor.BaseExecutor
的新子類別,並覆寫其 Do
函式。在 Do
函式中,傳入的引數 input_dict
、output_dict
和 exec_properties
會分別對應至 ComponentSpec 中定義的 INPUTS
、OUTPUTS
和 PARAMETERS
。對於 exec_properties
,值可以直接透過字典查閱來擷取。對於 input_dict
和 output_dict
中的成品,artifact_utils 類別中提供方便的函式,可用於擷取成品執行個體或成品 URI。
class Executor(base_executor.BaseExecutor):
"""Executor for HelloComponent."""
def Do(self, input_dict: Dict[Text, List[types.Artifact]],
output_dict: Dict[Text, List[types.Artifact]],
exec_properties: Dict[Text, Any]) -> None:
...
split_to_instance = {}
for artifact in input_dict['input_data']:
for split in json.loads(artifact.split_names):
uri = artifact_utils.get_split_uri([artifact], split)
split_to_instance[split] = uri
for split, instance in split_to_instance.items():
input_dir = instance
output_dir = artifact_utils.get_split_uri(
output_dict['output_data'], split)
for filename in tf.io.gfile.listdir(input_dir):
input_uri = os.path.join(input_dir, filename)
output_uri = os.path.join(output_dir, filename)
io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)
單元測試自訂執行器
可以建立自訂執行器的單元測試,類似於這個範例。
元件介面
既然最複雜的部分已完成,下一步是將這些部分組合成元件介面,以便在管線中使用元件。有幾個步驟
- 將元件介面設為
base_component.BaseComponent
的子類別 - 將類別變數
SPEC_CLASS
指派給先前定義的ComponentSpec
類別 - 將類別變數
EXECUTOR_SPEC
指派給先前定義的 Executor 類別 - 透過使用函式的引數來建構 ComponentSpec 類別的執行個體,並使用該值 (以及選用的名稱) 叫用 super 函式,來定義
__init__()
建構函式
建立元件的執行個體時,將會叫用 base_component.BaseComponent
類別中的類型檢查邏輯,以確保傳入的引數與 ComponentSpec
類別中定義的類型資訊相容。
from tfx.types import standard_artifacts
from hello_component import executor
class HelloComponent(base_component.BaseComponent):
"""Custom TFX Hello World Component."""
SPEC_CLASS = HelloComponentSpec
EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)
def __init__(self,
input_data: types.Channel = None,
output_data: types.Channel = None,
name: Optional[Text] = None):
if not output_data:
examples_artifact = standard_artifacts.Examples()
examples_artifact.split_names = input_data.get()[0].split_names
output_data = channel_utils.as_channel([examples_artifact])
spec = HelloComponentSpec(input_data=input_data,
output_data=output_data, name=name)
super(HelloComponent, self).__init__(spec=spec)
組合成 TFX 管線
最後一步是將新的自訂元件插入 TFX 管線。除了新增新元件的執行個體之外,還需要下列項目
- 將新元件的上游和下游元件正確連接到它。這可透過在新元件中參考上游元件的輸出,以及在下游元件中參考新元件的輸出來完成
- 在建構管線時,將新元件執行個體新增至元件清單。
以下範例重點說明了上述變更。完整範例可在 TFX GitHub 存放區中找到。
def _create_pipeline():
...
example_gen = CsvExampleGen(input_base=examples)
hello = component.HelloComponent(
input_data=example_gen.outputs['examples'], name='HelloWorld')
statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
...
return pipeline.Pipeline(
...
components=[example_gen, hello, statistics_gen, ...],
...
)
部署完全自訂元件
除了程式碼變更之外,所有新加入的部分 (ComponentSpec
、Executor
、元件介面) 都需要在管線執行環境中可存取,才能正確執行管線。