建構完全自訂元件

本指南說明如何使用 TFX API 建構完全自訂元件。完全自訂元件可讓您透過定義元件規格、執行器和元件介面類別來建構元件。這種方法可讓您重複使用和擴充標準元件,以符合您的需求。

如果您是 TFX 管線的新手,請進一步瞭解 TFX 管線的核心概念

自訂執行器或自訂元件

如果只需要自訂處理邏輯,且元件的輸入、輸出和執行屬性與現有元件相同,則自訂執行器已足夠。當任何輸入、輸出或執行屬性與任何現有 TFX 元件不同時,則需要完全自訂元件。

如何建立自訂元件?

開發完全自訂元件需要

  • 為新元件定義一組輸入和輸出成品規格。特別是,輸入成品的類型應與產生成品的元件的輸出成品類型一致,且輸出成品的類型應與取用成品的元件的輸入成品類型一致 (若有的話)。
  • 新元件所需的非成品執行參數。

ComponentSpec

ComponentSpec 類別透過定義元件的輸入和輸出成品,以及用於元件執行的參數,來定義元件合約。它包含三個部分

  • INPUTS:輸入成品的類型參數字典,這些成品會傳遞至元件執行器。一般來說,輸入成品是來自上游元件的輸出,因此共用相同的類型。
  • OUTPUTS:元件產生的輸出成品的類型參數字典。
  • PARAMETERS:額外 ExecutionParameter 項目的字典,這些項目將傳遞至元件執行器。這些是非成品參數,我們希望在管線 DSL 中彈性定義,並傳遞至執行。

以下是 ComponentSpec 的範例

class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  PARAMETERS = {
      # These are parameters that will be passed in the call to
      # create an instance of this component.
      'name': ExecutionParameter(type=Text),
  }
  INPUTS = {
      # This will be a dictionary with input artifacts, including URIs
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      # This will be a dictionary which this component will populate
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }

Executor

接下來,為新元件編寫執行器程式碼。基本上,需要建立 base_executor.BaseExecutor 的新子類別,並覆寫其 Do 函式。在 Do 函式中,傳入的引數 input_dictoutput_dictexec_properties 會分別對應至 ComponentSpec 中定義的 INPUTSOUTPUTSPARAMETERS。對於 exec_properties,值可以直接透過字典查閱來擷取。對於 input_dictoutput_dict 中的成品,artifact_utils 類別中提供方便的函式,可用於擷取成品執行個體或成品 URI。

class Executor(base_executor.BaseExecutor):
  """Executor for HelloComponent."""

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...

    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = artifact_utils.get_split_uri([artifact], split)
        split_to_instance[split] = uri

    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)

單元測試自訂執行器

可以建立自訂執行器的單元測試,類似於這個範例

元件介面

既然最複雜的部分已完成,下一步是將這些部分組合成元件介面,以便在管線中使用元件。有幾個步驟

  • 將元件介面設為 base_component.BaseComponent 的子類別
  • 將類別變數 SPEC_CLASS 指派給先前定義的 ComponentSpec 類別
  • 將類別變數 EXECUTOR_SPEC 指派給先前定義的 Executor 類別
  • 透過使用函式的引數來建構 ComponentSpec 類別的執行個體,並使用該值 (以及選用的名稱) 叫用 super 函式,來定義 __init__() 建構函式

建立元件的執行個體時,將會叫用 base_component.BaseComponent 類別中的類型檢查邏輯,以確保傳入的引數與 ComponentSpec 類別中定義的類型資訊相容。

from tfx.types import standard_artifacts
from hello_component import executor

class HelloComponent(base_component.BaseComponent):
  """Custom TFX Hello World Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: types.Channel = None,
               output_data: types.Channel = None,
               name: Optional[Text] = None):
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])

    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)

組合成 TFX 管線

最後一步是將新的自訂元件插入 TFX 管線。除了新增新元件的執行個體之外,還需要下列項目

  • 將新元件的上游和下游元件正確連接到它。這可透過在新元件中參考上游元件的輸出,以及在下游元件中參考新元件的輸出來完成
  • 在建構管線時,將新元件執行個體新增至元件清單。

以下範例重點說明了上述變更。完整範例可在 TFX GitHub 存放區中找到。

def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name='HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  ...
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen, ...],
      ...
  )

部署完全自訂元件

除了程式碼變更之外,所有新加入的部分 (ComponentSpecExecutor、元件介面) 都需要在管線執行環境中可存取,才能正確執行管線。