TensorFlow Transform 入門

本指南介紹 tf.Transform 的基本概念,以及如何使用這些概念。本指南將:

  • 定義預先處理函式,也就是將原始資料轉換為用於訓練機器學習模型的資料的管線邏輯描述。
  • 展示用於轉換資料的 Apache Beam 實作,方法是將預先處理函式轉換為 Beam 管線
  • 展示其他使用範例。

設定

pip install -U tensorflow_transform
pip install pyarrow
import pkg_resources
import importlib
importlib.reload(pkg_resources)
<module 'pkg_resources' from '/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/pkg_resources/__init__.py'>
import os
import tempfile

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

from tfx_bsl.public import tfxio
2023-04-13 09:15:54.685940: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2023-04-13 09:15:54.686060: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory
2023-04-13 09:15:54.686073: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.

定義預先處理函式

預先處理函式tf.Transform 最重要的概念。預先處理函式是資料集轉換的邏輯描述。預先處理函式接受並傳回張量字典,其中張量是指 TensorSparseTensor。有兩種函式可用來定義預先處理函式:

  1. 任何接受並傳回張量的函式。這些函式會將 TensorFlow 運算新增至圖表中,以便將原始資料轉換為已轉換的資料。
  2. tf.Transform 提供的任何分析器。分析器也會接受並傳回張量,但與 TensorFlow 函式不同的是,分析器不會將運算新增至圖表中。相反地,分析器會導致 tf.Transform 在 TensorFlow 外部計算完整傳遞運算。分析器會使用整個資料集的輸入張量值,產生常數張量,並以輸出形式傳回。例如,tft.min 會計算資料集上張量的最小值。tf.Transform 提供一組固定的分析器,但在未來版本中會擴充。

預先處理函式範例

透過結合分析器和一般 TensorFlow 函式,使用者可以建立彈性的管線來轉換資料。下列預先處理函式會以不同方式轉換三個特徵中的每一個特徵,並結合其中兩個特徵:

def preprocessing_fn(inputs):
  x = inputs['x']
  y = inputs['y']
  s = inputs['s']
  x_centered = x - tft.mean(x)
  y_normalized = tft.scale_to_0_1(y)
  s_integerized = tft.compute_and_apply_vocabulary(s)
  x_centered_times_y_normalized = x_centered * y_normalized
  return {
      'x_centered': x_centered,
      'y_normalized': y_normalized,
      'x_centered_times_y_normalized': x_centered_times_y_normalized,
      's_integerized': s_integerized
  }

在此範例中,xys 是代表輸入特徵的 Tensor。建立的第一個新張量 x_centered 是透過將 tft.mean 套用至 x,然後從 x 中減去此值來建置。tft.mean(x) 會傳回代表張量 x 平均值的張量。x_centered 是減去平均值的張量 x

第二個新張量 y_normalized 的建立方式類似,但使用便利方法 tft.scale_to_0_1。此方法的功能與計算 x_centered 類似,也就是計算最大值和最小值,並使用這些值來縮放 y

張量 s_integerized 顯示字串操控的範例。在此範例中,我們取得字串並將其對應至整數。這會使用便利函式 tft.compute_and_apply_vocabulary。此函式會使用分析器來計算輸入字串採用的唯一值,然後使用 TensorFlow 運算將輸入字串轉換為唯一值表格中的索引。

最後一欄顯示可以使用 TensorFlow 運算來結合張量,藉此建立新特徵。

預先處理函式定義資料集上的一系列運算。為了套用管線,我們仰賴 tf.Transform API 的具體實作。Apache Beam 實作提供 PTransform,可將使用者的預先處理函式套用至資料。tf.Transform 使用者的典型工作流程是建構預先處理函式,然後將其併入較大的 Beam 管線中,藉此建立用於訓練的資料。

批次處理

批次處理是 TensorFlow 的重要環節。tf.Transform 的目標之一是提供 TensorFlow 圖表以進行預先處理,以便將其併入服務圖表中 (以及選擇性地併入訓練圖表中),因此批次處理也是 tf.Transform 中的重要概念。

雖然在上述範例中不明顯,但使用者定義的預先處理函式會傳遞代表批次而非個別執行個體的張量,這與使用 TensorFlow 進行訓練和服務時的情況相同。另一方面,分析器會對整個資料集執行運算,傳回單一值而非批次值。x 是形狀為 (batch_size,)Tensor,而 tft.mean(x) 是形狀為 ()Tensor。減法 x - tft.mean(x) 會廣播,其中 tft.mean(x) 的值會從 x 代表的批次中每個元素減去。

Apache Beam 實作

雖然預先處理函式旨在作為在多個資料處理架構上實作的預先處理管線的邏輯描述,但 tf.Transform 提供在 Apache Beam 上使用的標準實作。此實作示範實作所需的功能。此功能沒有正式的 API,因此每個實作都可以使用適合其特定資料處理架構的 API。

Apache Beam 實作提供兩個 PTransform,用於處理預先處理函式的資料。以下顯示複合 PTransform - tft_beam.AnalyzeAndTransformDataset 的用法:

raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'}
]

raw_data_metadata = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec({
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
        's': tf.io.FixedLenFeature([], tf.string),
    }))

with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
  transformed_dataset, transform_fn = (
      (raw_data, raw_data_metadata) |
      tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow_transform/tf_utils.py:324: Tensor.experimental_ref (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use ref() instead.
2023-04-13 09:15:56.867283: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow_transform/tf_utils.py:324: Tensor.experimental_ref (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use ref() instead.
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpzu0d2pwa.json', '--HistoryManager.hist_file=:memory:']
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpdhm3m_yu/tftransform_tmp/88750e1500194862a87b2f23e04367bc/assets
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpdhm3m_yu/tftransform_tmp/88750e1500194862a87b2f23e04367bc/assets
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpdhm3m_yu/tftransform_tmp/8fad0af5a26242cc9733a752a7652277/assets
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpdhm3m_yu/tftransform_tmp/8fad0af5a26242cc9733a752a7652277/assets
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:tensorflow_text is not available.
transformed_data, transformed_metadata = transformed_dataset

transformed_data 內容如下所示,包含與原始資料格式相同的轉換後欄位。特別是,s_integerized 的值為 [0, 1, 0],這些值取決於單字 helloworld 如何對應至整數,這是具決定性的。對於欄位 x_centered,我們減去了平均值,因此欄位 x 的值 (原本為 [1.0, 2.0, 3.0]) 變成了 [-1.0, 0.0, 1.0]。同樣地,其餘欄位也符合預期值。

transformed_data
[{'s_integerized': 0,
  'x_centered': -1.0,
  'x_centered_times_y_normalized': -0.0,
  'y_normalized': 0.0},
 {'s_integerized': 1,
  'x_centered': 0.0,
  'x_centered_times_y_normalized': 0.0,
  'y_normalized': 0.5},
 {'s_integerized': 0,
  'x_centered': 1.0,
  'x_centered_times_y_normalized': 1.0,
  'y_normalized': 1.0}]

raw_datatransformed_data 都是資料集。接下來兩節說明 Beam 實作如何表示資料集,以及如何從磁碟讀取資料和將資料寫入磁碟。另一個傳回值 transform_fn 代表套用至資料的轉換,詳情如下。

tft_beam.AnalyzeAndTransformDataset 類別是實作提供的兩個基本轉換 tft_beam.AnalyzeDatasettft_beam.TransformDataset 的組合。因此,下列兩個程式碼片段是等效的:

my_data = (raw_data, raw_data_metadata)
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
  transformed_data, transform_fn = (
      my_data | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpzu0d2pwa.json', '--HistoryManager.hist_file=:memory:']
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8afa0l99/tftransform_tmp/8dc250e431e848a386d53f050ae886df/assets
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8afa0l99/tftransform_tmp/8dc250e431e848a386d53f050ae886df/assets
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8afa0l99/tftransform_tmp/46d2e23e8b9745219e9812f9b7f5aee1/assets
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8afa0l99/tftransform_tmp/46d2e23e8b9745219e9812f9b7f5aee1/assets
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:tensorflow_text is not available.
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
  transform_fn = my_data | tft_beam.AnalyzeDataset(preprocessing_fn)
  transformed_data = (my_data, transform_fn) | tft_beam.TransformDataset()
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpzu0d2pwa.json', '--HistoryManager.hist_file=:memory:']
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpoezjiky4/tftransform_tmp/2f6feb69b15d4a429fa4f56dd7fb02a3/assets
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpoezjiky4/tftransform_tmp/2f6feb69b15d4a429fa4f56dd7fb02a3/assets
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpoezjiky4/tftransform_tmp/26cbcc6000e947c798b5af9ad57c0b42/assets
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpoezjiky4/tftransform_tmp/26cbcc6000e947c798b5af9ad57c0b42/assets
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpzu0d2pwa.json', '--HistoryManager.hist_file=:memory:']
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:tensorflow_text is not available.

transform_fn 是純函式,代表套用至資料集每一列的運算。特別是,分析器值已計算並視為常數。在此範例中,transform_fn 包含欄位 x 的平均值、欄位 y 的最小值和最大值,以及用於將字串對應至整數的詞彙表。

tf.Transform 的重要功能是 transform_fn 代表跨列的對應,也就是套用至每個列的純函式。用於彙總列的所有計算都在 AnalyzeDataset 中完成。此外,transform_fn 會表示為 TensorFlow Graph,可以嵌入至服務圖表中。

在此特殊情況下,提供了 AnalyzeAndTransformDataset 以進行最佳化。這與 scikit-learn 中使用的模式相同,提供 fittransformfit_transform 方法。

資料格式和結構定義

TFT Beam 實作接受兩種不同的輸入資料格式。「執行個體字典」格式 (如上述範例和 simple.ipynbsimple_example.py 中所示) 是一種直覺式格式,適用於小型資料集,而 TFXIO (Apache Arrow) 格式提供更佳的效能,適用於大型資料集。

伴隨 PCollection 的「中繼資料」會告知 Beam 實作 PCollection 的格式。

(raw_data, raw_data_metadata) | tft.AnalyzeDataset(...)
  • 如果 raw_data_metadatadataset_metadata.DatasetMetadata (請參閱下方的「『執行個體字典』格式」一節),則 raw_data 預期為「執行個體字典」格式。
  • 如果 raw_data_metadatatfxio.TensorAdapterConfig (請參閱下方的「TFXIO 格式」一節),則 raw_data 預期為 TFXIO 格式。

「執行個體字典」格式

先前的程式碼範例使用此格式。中繼資料包含結構定義,其中定義資料的版面配置,以及如何從各種格式讀取資料和將資料寫入各種格式。即使是這種記憶體內格式也不是自我描述的,也需要結構定義才能解譯為張量。

再次強調,以下是範例資料的結構定義:

import tensorflow_transform as tft

raw_data_metadata = tft.DatasetMetadata.from_feature_spec({
        's': tf.io.FixedLenFeature([], tf.string),
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
    })

Schema Proto 包含將資料從磁碟或記憶體內格式剖析為張量所需資訊。通常會透過呼叫 schema_utils.schema_from_feature_spec 以及將特徵鍵對應至 tf.io.FixedLenFeaturetf.io.VarLenFeaturetf.io.SparseFeature 值來建構。詳情請參閱 tf.parse_example 的說明文件。

在上方範例中,我們使用 tf.io.FixedLenFeature 表示每個特徵都包含固定數量的值,在此範例中為單一純量值。由於 tf.Transform 會批次處理執行個體,因此代表特徵的實際 Tensor 形狀會是 (None,),其中未知維度是批次維度。

TFXIO 格式

使用此格式時,資料預期會包含在 pyarrow.RecordBatch 中。對於表格資料,我們的 Apache Beam 實作接受由下列類型欄位組成的 Arrow RecordBatch

  • pa.list_(<primitive>),其中 <primitive>pa.int64()pa.float32() pa.binary()pa.large_binary()

  • pa.large_list(<primitive>)

我們在上方使用的玩具輸入資料集,以 RecordBatch 表示時,看起來如下所示:

import pyarrow as pa

raw_data = [
    pa.record_batch(
    data=[
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([['hello'], ['world'], ['hello']], pa.list_(pa.binary())),
    ],
    names=['x', 'y', 's'])
]

與伴隨「執行個體字典」格式的 dataset_metadata.DatasetMetadata 執行個體類似,tfxio.TensorAdapterConfig 必須伴隨 RecordBatch。它由 RecordBatch 的 Arrow 結構定義和 tfxio.TensorRepresentations 組成,以唯一判斷 RecordBatch 中的欄位如何解譯為 TensorFlow 張量 (包括但不限於 tf.Tensortf.SparseTensor)。

tfxio.TensorRepresentationsDict[str, tensorflow_metadata.proto.v0.schema_pb2.TensorRepresentation] 的類型別名,可建立 preprocessing_fn 接受的張量與 RecordBatch 中欄位之間的關係。例如:

from google.protobuf import text_format
from tensorflow_metadata.proto.v0 import schema_pb2

tensor_representation = {
    'x': text_format.Parse(
        """dense_tensor { column_name: "col1" shape { dim { size: 2 } } }""",
        schema_pb2.TensorRepresentation())
}

表示 preprocessing_fn 中的 inputs['x'] 應為密集 tf.Tensor,其值來自輸入 RecordBatch 中名為 'col1' 的欄位,且其 (批次) 形狀應為 [batch_size, 2]

schema_pb2.TensorRepresentation 是在 TensorFlow Metadata 中定義的 Protobuf。

與 TensorFlow 的相容性

tf.Transform 支援將 transform_fn 匯出為 SavedModel,範例請參閱 simple tutorial。在 0.30 版本之前的預設行為是匯出 TF 1.x SavedModel。從 0.30 版本開始,預設行為是匯出 TF 2.x SavedModel,除非明確停用 TF 2.x 行為 (透過呼叫 tf.compat.v1.disable_v2_behavior())。

如果使用 TF 1.x 概念 (例如 tf.estimatortf.Sessions),您可以透過將 force_tf_compat_v1=True 傳遞至 tft_beam.Context (如果將 tf.Transform 作為獨立程式庫使用) 或 TFX 中的 Transform 元件,來保留先前的行為。

transform_fn 匯出為 TF 2.x SavedModel 時,preprocessing_fn 預期可使用 tf.function 追蹤。此外,如果遠端執行管線 (例如使用 DataflowRunner),請確保 preprocessing_fn 和任何依附元件已正確封裝,如此處所述。

使用 tf.Transform 匯出 TF 2.x SavedModel 的已知問題已記錄在此處

搭配 Apache Beam 的輸入和輸出

到目前為止,我們已在 Python 清單 (的 RecordBatch 或執行個體字典) 中看到輸入和輸出資料。這是簡化,仰賴 Apache Beam 處理清單的能力,以及其主要資料表示法 PCollection

PCollection 是構成 Beam 管線一部分的資料表示法。Beam 管線是透過套用各種 PTransform (包括 AnalyzeDatasetTransformDataset) 並執行管線來形成。PCollection 不是在主要二進位檔的記憶體中建立,而是在工作站之間散佈 (雖然本節使用記憶體內執行模式)。

預先封裝的 PCollection 來源 (TFXIO)

我們的實作接受的 RecordBatch 格式是其他 TFX 程式庫接受的通用格式。因此,TFX 提供便利的「來源」(又名 TFXIO),可讀取磁碟上各種格式的檔案並產生 RecordBatch,也可以提供 tfxio.TensorAdapterConfig,包括推論的 tfxio.TensorRepresentations

這些 TFXIO 可以在 tfx_bsl 套件中找到 (tfx_bsl.public.tfxio)。

範例:「普查收入」資料集

下列範例需要讀取和寫入磁碟上的資料,並將資料表示為 PCollection (而非清單),請參閱:census_example.py。以下說明如何下載資料並執行此範例。「普查收入」資料集由 UCI 機器學習儲存庫提供。此資料集包含類別和數值資料。

以下是一些下載和預覽此資料的程式碼:

wget https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.data
--2023-04-13 09:16:10--  https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.data
Resolving storage.googleapis.com (storage.googleapis.com)... 172.217.203.128, 74.125.141.128, 142.250.98.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|172.217.203.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3974305 (3.8M) [application/octet-stream]
Saving to: ‘adult.data’

adult.data          100%[===================>]   3.79M  --.-KB/s    in 0.02s   

2023-04-13 09:16:10 (153 MB/s) - ‘adult.data’ saved [3974305/3974305]
import pandas as pd

train_data_file = "adult.data"

以下儲存格中隱藏了一些設定程式碼。

pd.read_csv(train_data_file, names = ORDERED_CSV_COLUMNS).head()

資料集的欄位可以是類別或數值。此資料集描述分類問題:預測個人年收入是否超過 5 萬美元的最後一欄。不過,從 tf.Transform 的角度來看,此標籤只是另一個類別欄位。

我們使用預先封裝的 tfxio.BeamRecordCsvTFXIO 將 CSV 行轉換為 RecordBatchTFXIO 需要兩個重要的資訊:

  • TensorFlow Metadata 結構定義 tfmd.proto.v0.shema_pb2,其中包含每個 CSV 欄位的類型和形狀資訊。schema_pb2.TensorRepresentation 是結構定義的選用部分;如果未提供 (此範例即為如此),則會從類型和形狀資訊推論。您可以透過使用我們提供的輔助函式從 TF 剖析規格轉換 (本範例中顯示),或透過執行 TensorFlow Data Validation,取得結構定義。
  • CSV 檔案中顯示的欄位名稱清單 (依序排列)。請注意,這些名稱必須與結構定義中的特徵名稱相符。
pip install -U -q tfx_bsl
from tfx_bsl.public import tfxio
from tfx_bsl.coders.example_coder import RecordBatchToExamples

import apache_beam as beam
pipeline = beam.Pipeline()

csv_tfxio = tfxio.BeamRecordCsvTFXIO(
    physical_format='text', column_names=ORDERED_CSV_COLUMNS, schema=SCHEMA)

raw_data = (
    pipeline
    | 'ReadTrainData' >> beam.io.ReadFromText(
        train_data_file, coder=beam.coders.BytesCoder())
    | 'FixCommasTrainData' >> beam.Map(
        lambda line: line.replace(b', ', b','))
    | 'DecodeTrainData' >> csv_tfxio.BeamSource())
raw_data
<PCollection[[21]: DecodeTrainData/RawRecordToRecordBatch/CollectRecordBatchTelemetry/ProfileRecordBatches.None] at 0x7feeaa6fd5b0>

請注意,我們必須在讀取 CSV 行後進行一些額外的修正。否則,我們可以仰賴 tfxio.CsvTFXIO 來處理讀取檔案和轉換為 RecordBatch

csv_tfxio = tfxio.CsvTFXIO(train_data_file,
                           telemetry_descriptors=[], #???
                           column_names=ORDERED_CSV_COLUMNS,
                           schema=SCHEMA)

p2 = beam.Pipeline()
raw_data_2 = p2 | 'TFXIORead' >> csv_tfxio.BeamSource()

此資料集的預先處理與先前的範例類似,不同之處在於預先處理函式是以程式設計方式產生,而不是手動指定每個欄位。在下方的預先處理函式中,NUMERICAL_COLUMNSCATEGORICAL_COLUMNS 是包含數值和類別欄位名稱的清單:

NUM_OOV_BUCKETS = 1

def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  # Since we are modifying some features and leaving others unchanged, we
  # start by setting `outputs` to a copy of `inputs.
  outputs = inputs.copy()

  # Scale numeric columns to have range [0, 1].
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tft.scale_to_0_1(outputs[key])

  # For all categorical columns except the label column, we generate a
  # vocabulary but do not modify the feature.  This vocabulary is instead
  # used in the trainer, by means of a feature column, to convert the feature
  # from a string to an integer id.
  for key in CATEGORICAL_FEATURE_KEYS:
    outputs[key] = tft.compute_and_apply_vocabulary(
        tf.strings.strip(inputs[key]),
        num_oov_buckets=NUM_OOV_BUCKETS,
        vocab_filename=key)

  # For the label column we provide the mapping from string to index.
  with tf.init_scope():
    # `init_scope` - Only initialize the table once.
    initializer = tf.lookup.KeyValueTensorInitializer(
        keys=['>50K', '<=50K'],
        values=tf.cast(tf.range(2), tf.int64),
        key_dtype=tf.string,
        value_dtype=tf.int64)
    table = tf.lookup.StaticHashTable(initializer, default_value=-1)

  outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])

  return outputs

與先前範例的一個差異在於標籤欄位手動指定從字串到索引的對應。因此,'>50' 對應至 0,而 '<=50K' 對應至 1,因為瞭解訓練模型中的哪個索引對應至哪個標籤會很有幫助。

record_batches 變數代表 pyarrow.RecordBatchPCollectiontensor_adapter_configcsv_tfxio 提供,這是從 SCHEMA 推論而來 (最終在此範例中從 TF 剖析規格推論而來)。

最後階段是將轉換後的資料寫入磁碟,其形式與讀取原始資料類似。用於執行此作業的結構定義是 tft_beam.AnalyzeAndTransformDataset 輸出的部分,用於推論輸出資料的結構定義。以下顯示寫入磁碟的程式碼。結構定義是中繼資料的一部分,但在 tf.Transform API 中可互換使用這兩者 (也就是將中繼資料傳遞至 tft.coders.ExampleProtoCoder)。請注意,這會寫入不同的格式。請勿使用 textio.WriteToText,而是使用 Beam 內建的 TFRecord 格式支援,並使用編碼器將資料編碼為 Example Proto。這是用於訓練的更佳格式,如下一節所示。transformed_eval_data_base 提供所寫入個別分片的基礎檔案名稱。

raw_dataset = (raw_data, csv_tfxio.TensorAdapterConfig())
working_dir = tempfile.mkdtemp()
with tft_beam.Context(temp_dir=working_dir):
  transformed_dataset, transform_fn = (
      raw_dataset | tft_beam.AnalyzeAndTransformDataset(
          preprocessing_fn, output_record_batches=True))
output_dir = tempfile.mkdtemp()
transformed_data, _ = transformed_dataset

_ = (
    transformed_data
    | 'EncodeTrainData' >>
    beam.FlatMapTuple(lambda batch, _: RecordBatchToExamples(batch))
    | 'WriteTrainData' >> beam.io.WriteToTFRecord(
        os.path.join(output_dir , 'transformed.tfrecord')))

除了訓練資料外,transform_fn 也會與中繼資料一起寫出:

_ = (
    transform_fn
    | 'WriteTransformFn' >> tft_beam.WriteTransformFn(output_dir))

使用 pipeline.run().wait_until_finish() 執行整個 Beam 管線。到目前為止,Beam 管線代表延遲的分散式運算。它提供將執行的指示,但指示尚未執行。最後一次呼叫會執行指定的管線。

result = pipeline.run().wait_until_finish()
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmphiyrst4f/tftransform_tmp/c633cd0eb0c14a2bba2bc6f7ba556ce3/assets
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmphiyrst4f/tftransform_tmp/c633cd0eb0c14a2bba2bc6f7ba556ce3/assets
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmphiyrst4f/tftransform_tmp/9080e8c73e2443fea34d6505feed4129/assets
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmphiyrst4f/tftransform_tmp/9080e8c73e2443fea34d6505feed4129/assets
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:tensorflow_text is not available.
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.

執行管線後,輸出目錄包含兩個成品:

  • 轉換後的資料和描述資料的中繼資料。
  • 包含產生的 preprocessing_fntf.saved_model
ls {output_dir}
transform_fn  transformed.tfrecord-00000-of-00001  transformed_metadata

若要瞭解如何使用這些成品,請參閱進階預先處理教學課程