本指南介紹 tf.Transform
的基本概念,以及如何使用這些概念。本指南將:
- 定義預先處理函式,也就是將原始資料轉換為用於訓練機器學習模型的資料的管道邏輯描述。
- 展示用於轉換資料的 Apache Beam 實作方式,方法是將預先處理函式轉換為 Beam 管道。
- 展示其他使用範例。
設定
pip install -U tensorflow_transform
pip install pyarrow
import pkg_resources
import importlib
importlib.reload(pkg_resources)
<module 'pkg_resources' from '/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/pkg_resources/__init__.py'>
import os
import tempfile
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils
from tfx_bsl.public import tfxio
2023-04-13 09:15:54.685940: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory 2023-04-13 09:15:54.686060: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory 2023-04-13 09:15:54.686073: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
定義預先處理函式
預先處理函式是 tf.Transform
最重要的概念。預先處理函式是資料集轉換的邏輯描述。預先處理函式接受並傳回張量字典,其中張量表示 Tensor
或 SparseTensor
。以下兩種函式可用於定義預先處理函式:
- 任何接受並傳回張量的函式。這些函式會將 TensorFlow 運算新增至圖形,以將原始資料轉換為已轉換資料。
tf.Transform
提供的任何分析器。分析器也會接受並傳回張量,但與 TensorFlow 函式不同的是,分析器不會將運算新增至圖形。分析器會讓tf.Transform
在 TensorFlow 外部計算完整傳遞運算。分析器會使用整個資料集的輸入張量值,產生一個常數張量做為輸出傳回。例如,tft.min
會計算資料集上張量的最小值。tf.Transform
提供一組固定的分析器,但未來版本將會擴充。
預先處理函式範例
使用者可以結合分析器和一般 TensorFlow 函式,建立彈性的管道來轉換資料。以下預先處理函式會以不同方式轉換三個特徵,並結合其中兩個特徵:
def preprocessing_fn(inputs):
x = inputs['x']
y = inputs['y']
s = inputs['s']
x_centered = x - tft.mean(x)
y_normalized = tft.scale_to_0_1(y)
s_integerized = tft.compute_and_apply_vocabulary(s)
x_centered_times_y_normalized = x_centered * y_normalized
return {
'x_centered': x_centered,
'y_normalized': y_normalized,
'x_centered_times_y_normalized': x_centered_times_y_normalized,
's_integerized': s_integerized
}
在這裡,x
、y
和 s
是代表輸入特徵的 Tensor
。第一個建立的新張量 x_centered
是透過將 tft.mean
套用至 x
,然後從 x
中減去這個值來建構。tft.mean(x)
會傳回一個張量,代表張量 x
的平均值。x_centered
是減去平均值後的張量 x
。
第二個新張量 y_normalized
的建立方式類似,但使用的是便利方法 tft.scale_to_0_1
。這個方法的作用類似於計算 x_centered
,也就是計算最大值和最小值,並使用這些值來縮放 y
。
張量 s_integerized
示範字串操作的範例。在這個範例中,我們會取得字串並將其對應至整數。這使用了便利函式 tft.compute_and_apply_vocabulary
。這個函式會使用分析器來計算輸入字串所採用的唯一值,然後使用 TensorFlow 運算將輸入字串轉換為唯一值表格中的索引。
最後一欄顯示可以使用 TensorFlow 運算,透過結合張量來建立新特徵。
預先處理函式定義了資料集上的一系列運算。為了套用管道,我們仰賴 tf.Transform
API 的具體實作方式。Apache Beam 實作方式提供 PTransform
,可將使用者的預先處理函式套用至資料。tf.Transform
使用者的典型工作流程是建構預先處理函式,然後將其納入較大的 Beam 管道中,藉此建立用於訓練的資料。
批次處理
批次處理是 TensorFlow 的重要環節。由於 tf.Transform
的目標之一是提供可用於預先處理的 TensorFlow 圖形,以便納入服務圖形 (以及選擇性的訓練圖形),因此批次處理也是 tf.Transform
中的重要概念。
雖然在以上範例中不明顯,但使用者定義的預先處理函式會傳遞代表批次的張量,而不是個別執行個體,就像使用 TensorFlow 進行訓練和服務時一樣。另一方面,分析器會針對整個資料集執行運算,傳回單一值,而不是一批值。x
是形狀為 (batch_size,)
的 Tensor
,而 tft.mean(x)
是形狀為 ()
的 Tensor
。減法 x - tft.mean(x)
會進行廣播,其中 tft.mean(x)
的值會從 x
代表的批次中每個元素減去。
Apache Beam 實作方式
雖然預先處理函式的目的是做為在多個資料處理架構上實作的預先處理管道邏輯描述,但 tf.Transform
提供在 Apache Beam 上使用的標準實作方式。這個實作方式示範實作方式所需的功能。這個功能沒有正式的 API,因此每個實作方式都可以使用對其特定資料處理架構來說慣用的 API。
Apache Beam 實作方式提供兩個 PTransform
,用於處理預先處理函式的資料。以下顯示複合 PTransform
的用法 - tft_beam.AnalyzeAndTransformDataset
raw_data = [
{'x': 1, 'y': 1, 's': 'hello'},
{'x': 2, 'y': 2, 's': 'world'},
{'x': 3, 'y': 3, 's': 'hello'}
]
raw_data_metadata = dataset_metadata.DatasetMetadata(
schema_utils.schema_from_feature_spec({
'y': tf.io.FixedLenFeature([], tf.float32),
'x': tf.io.FixedLenFeature([], tf.float32),
's': tf.io.FixedLenFeature([], tf.string),
}))
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
transformed_dataset, transform_fn = (
(raw_data, raw_data_metadata) |
tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features. WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow_transform/tf_utils.py:324: Tensor.experimental_ref (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version. Instructions for updating: Use ref() instead. 2023-04-13 09:15:56.867283: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow_transform/tf_utils.py:324: Tensor.experimental_ref (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version. Instructions for updating: Use ref() instead. WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpzu0d2pwa.json', '--HistoryManager.hist_file=:memory:'] INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpdhm3m_yu/tftransform_tmp/88750e1500194862a87b2f23e04367bc/assets INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpdhm3m_yu/tftransform_tmp/88750e1500194862a87b2f23e04367bc/assets INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpdhm3m_yu/tftransform_tmp/8fad0af5a26242cc9733a752a7652277/assets INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpdhm3m_yu/tftransform_tmp/8fad0af5a26242cc9733a752a7652277/assets INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:tensorflow_text is not available.
transformed_data, transformed_metadata = transformed_dataset
transformed_data
內容如下所示,其中包含與原始資料格式相同的轉換後資料欄。尤其是,s_integerized
的值是 [0, 1, 0]
,這些值取決於 hello
和 world
這兩個字詞如何對應至整數,而這是具決定性的。對於 x_centered
欄,我們減去了平均值,因此 x
欄的值 (原本是 [1.0, 2.0, 3.0]
) 變成 [-1.0, 0.0, 1.0]
。同樣地,其餘資料欄也符合其預期值。
transformed_data
[{'s_integerized': 0, 'x_centered': -1.0, 'x_centered_times_y_normalized': -0.0, 'y_normalized': 0.0}, {'s_integerized': 1, 'x_centered': 0.0, 'x_centered_times_y_normalized': 0.0, 'y_normalized': 0.5}, {'s_integerized': 0, 'x_centered': 1.0, 'x_centered_times_y_normalized': 1.0, 'y_normalized': 1.0}]
raw_data
和 transformed_data
都是資料集。接下來兩節將說明 Beam 實作方式如何表示資料集,以及如何將資料讀取和寫入磁碟。另一個傳回值 transform_fn
代表套用至資料的轉換,詳情請見下文。
tft_beam.AnalyzeAndTransformDataset
類別是實作方式提供的兩個基本轉換 (tft_beam.AnalyzeDataset
和 tft_beam.TransformDataset
) 的組合。因此,以下兩個程式碼片段是等效的:
my_data = (raw_data, raw_data_metadata)
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
transformed_data, transform_fn = (
my_data | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpzu0d2pwa.json', '--HistoryManager.hist_file=:memory:'] INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8afa0l99/tftransform_tmp/8dc250e431e848a386d53f050ae886df/assets INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8afa0l99/tftransform_tmp/8dc250e431e848a386d53f050ae886df/assets INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8afa0l99/tftransform_tmp/46d2e23e8b9745219e9812f9b7f5aee1/assets INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8afa0l99/tftransform_tmp/46d2e23e8b9745219e9812f9b7f5aee1/assets INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:tensorflow_text is not available.
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
transform_fn = my_data | tft_beam.AnalyzeDataset(preprocessing_fn)
transformed_data = (my_data, transform_fn) | tft_beam.TransformDataset()
WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpzu0d2pwa.json', '--HistoryManager.hist_file=:memory:'] INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpoezjiky4/tftransform_tmp/2f6feb69b15d4a429fa4f56dd7fb02a3/assets INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpoezjiky4/tftransform_tmp/2f6feb69b15d4a429fa4f56dd7fb02a3/assets INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpoezjiky4/tftransform_tmp/26cbcc6000e947c798b5af9ad57c0b42/assets INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpoezjiky4/tftransform_tmp/26cbcc6000e947c798b5af9ad57c0b42/assets WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:tensorflow:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpzu0d2pwa.json', '--HistoryManager.hist_file=:memory:'] INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:tensorflow_text is not available.
transform_fn
是一個純函式,代表套用至資料集每個資料列的運算。尤其是,分析器值已計算並視為常數。在範例中,transform_fn
包含 x
欄的平均值、y
欄的最小值和最大值,以及用於將字串對應至整數的詞彙表做為常數。
tf.Transform
的一個重要功能是,transform_fn
代表跨資料列的地圖,也就是套用至每個資料列的純函式。所有用於彙總資料列的計算都在 AnalyzeDataset
中完成。此外,transform_fn
會表示為 TensorFlow Graph
,可嵌入服務圖形中。
在此特殊情況下,提供 AnalyzeAndTransformDataset
是為了進行最佳化。這與 scikit-learn 中使用的模式相同,提供 fit
、transform
和 fit_transform
方法。
資料格式和結構定義
TFT Beam 實作方式接受兩種不同的輸入資料格式。「執行個體字典」格式 (如以上範例以及 simple.ipynb 和 simple_example.py 中所示) 是一種直覺式的格式,適用於小型資料集,而 TFXIO (Apache Arrow) 格式則提供更優異的效能,適用於大型資料集。
隨附於 PCollection
的「中繼資料」會告知 Beam 實作方式 PCollection
的格式。
(raw_data, raw_data_metadata) | tft.AnalyzeDataset(...)
- 如果
raw_data_metadata
是dataset_metadata.DatasetMetadata
(請參閱下方的「「執行個體字典」格式」章節),則raw_data
預期會採用「執行個體字典」格式。 - 如果
raw_data_metadata
是tfxio.TensorAdapterConfig
(請參閱下方的「TFXIO 格式」章節),則raw_data
預期會採用 TFXIO 格式。
「執行個體字典」格式
先前的程式碼範例使用此格式。中繼資料包含結構定義,可定義資料的版面配置,以及資料從各種格式讀取和寫入的方式。即使是這種記憶體內格式也不是自我描述的,而且需要結構定義才能解譯為張量。
以下再次說明範例資料的結構定義:
import tensorflow_transform as tft
raw_data_metadata = tft.DatasetMetadata.from_feature_spec({
's': tf.io.FixedLenFeature([], tf.string),
'y': tf.io.FixedLenFeature([], tf.float32),
'x': tf.io.FixedLenFeature([], tf.float32),
})
Schema
Proto 包含將資料從磁碟或記憶體內格式剖析為張量所需資訊。Proto 通常是透過呼叫 schema_utils.schema_from_feature_spec
和將特徵索引鍵對應至 tf.io.FixedLenFeature
、tf.io.VarLenFeature
和 tf.io.SparseFeature
值來建構。詳情請參閱 tf.parse_example
的說明文件。
在上方,我們使用 tf.io.FixedLenFeature
指出每個特徵都包含固定數量的值,在本例中為單一純量值。由於 tf.Transform
會將執行個體分批處理,因此代表特徵的實際 Tensor
形狀會是 (None,)
,其中不明維度是批次維度。
TFXIO 格式
採用此格式時,資料預期會包含在 pyarrow.RecordBatch
中。對於表格資料,我們的 Apache Beam 實作方式接受 Arrow RecordBatch
,其包含下列類型的資料欄:
pa.list_(<primitive>)
,其中<primitive>
是pa.int64()
、pa.float32()
pa.binary()
或pa.large_binary()
。pa.large_list(<primitive>)
我們在上方使用的玩具輸入資料集,在表示為 RecordBatch
時,看起來會像這樣:
import pyarrow as pa
raw_data = [
pa.record_batch(
data=[
pa.array([[1], [2], [3]], pa.list_(pa.float32())),
pa.array([[1], [2], [3]], pa.list_(pa.float32())),
pa.array([['hello'], ['world'], ['hello']], pa.list_(pa.binary())),
],
names=['x', 'y', 's'])
]
與隨附於「執行個體字典」格式的 dataset_metadata.DatasetMetadata
執行個體類似,tfxio.TensorAdapterConfig
必須隨附於 RecordBatch
。它包含 RecordBatch
的 Arrow 結構定義,以及 tfxio.TensorRepresentations
,以明確判斷 RecordBatch
中的資料欄要如何解譯為 TensorFlow 張量 (包括但不限於 tf.Tensor
、tf.SparseTensor
)。
tfxio.TensorRepresentations
是 Dict[str, tensorflow_metadata.proto.v0.schema_pb2.TensorRepresentation]
的類型別名,可建立 preprocessing_fn
接受的張量與 RecordBatch
中的資料欄之間的關係。例如:
from google.protobuf import text_format
from tensorflow_metadata.proto.v0 import schema_pb2
tensor_representation = {
'x': text_format.Parse(
"""dense_tensor { column_name: "col1" shape { dim { size: 2 } } }""",
schema_pb2.TensorRepresentation())
}
表示 preprocessing_fn
中的 inputs['x']
應為密集 tf.Tensor
,其值來自輸入 RecordBatch
中名為 'col1'
的資料欄,且其 (批次) 形狀應為 [batch_size, 2]
。
schema_pb2.TensorRepresentation
是在 TensorFlow Metadata 中定義的 Protobuf。
與 TensorFlow 相容
tf.Transform
支援將 transform_fn
匯出為 SavedModel,範例請參閱 簡易教學課程。在 0.30
版本之前的預設行為是匯出 TF 1.x SavedModel。從 0.30
版本開始,除非明確停用 TF 2.x 行為 (透過呼叫 tf.compat.v1.disable_v2_behavior()
),否則預設行為是匯出 TF 2.x SavedModel。
如果使用 TF 1.x 概念 (例如 tf.estimator
和 tf.Sessions
),您可以將 force_tf_compat_v1=True
傳遞至 tft_beam.Context
(如果將 tf.Transform
做為獨立程式庫使用) 或 TFX 中的 Transform 元件,藉此保留先前的行為。
將 transform_fn
匯出為 TF 2.x SavedModel 時,preprocessing_fn
預期可使用 tf.function
追蹤。此外,如果遠端執行管道 (例如使用 DataflowRunner
),請確保 preprocessing_fn
和任何依附元件都已正確封裝,如這裡所述。
使用 tf.Transform
匯出 TF 2.x SavedModel 的已知問題記錄在這裡。
搭配 Apache Beam 的輸入和輸出
到目前為止,我們已在 Python 清單 (包含 RecordBatch
或執行個體字典) 中看到輸入和輸出資料。這是一種簡化方式,仰賴 Apache Beam 使用清單以及其主要資料表示法 PCollection
的能力。
PCollection
是資料表示法,構成 Beam 管道的一部分。Beam 管道的形成方式是套用各種 PTransform
,包括 AnalyzeDataset
和 TransformDataset
,然後執行管道。PCollection
不是在主要二進位檔的記憶體中建立,而是在工作站之間分散 (雖然本節使用記憶體內執行模式)。
預先建立的 PCollection
來源 (TFXIO
)
我們的實作方式接受的 RecordBatch
格式是其他 TFX 程式庫接受的常見格式。因此,TFX 提供便利的「來源」(又稱 TFXIO
),可讀取磁碟上各種格式的檔案並產生 RecordBatch
,也可以提供 tfxio.TensorAdapterConfig
,包括推論的 tfxio.TensorRepresentations
。
這些 TFXIO
可以在套件 tfx_bsl
(tfx_bsl.public.tfxio
) 中找到。
範例:「人口普查收入」資料集
以下範例需要讀取和寫入磁碟上的資料,並以 PCollection
(而非清單) 表示資料,請參閱:census_example.py
。下方說明如何下載資料並執行此範例。「人口普查收入」資料集由 UCI Machine Learning Repository 提供。這個資料集包含類別和數值資料。
以下是一些程式碼,可用於下載和預覽此資料:
wget https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.data
--2023-04-13 09:16:10-- https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.data Resolving storage.googleapis.com (storage.googleapis.com)... 172.217.203.128, 74.125.141.128, 142.250.98.128, ... Connecting to storage.googleapis.com (storage.googleapis.com)|172.217.203.128|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 3974305 (3.8M) [application/octet-stream] Saving to: ‘adult.data’ adult.data 100%[===================>] 3.79M --.-KB/s in 0.02s 2023-04-13 09:16:10 (153 MB/s) - ‘adult.data’ saved [3974305/3974305]
import pandas as pd
train_data_file = "adult.data"
以下儲存格中隱藏了一些設定程式碼。
pd.read_csv(train_data_file, names = ORDERED_CSV_COLUMNS).head()
資料集的資料欄不是類別資料欄,就是數值資料欄。這個資料集說明了一個分類問題:預測個人年收入是否超過 5 萬美元的最後一欄。不過,從 tf.Transform
的角度來看,這個標籤只不過是另一個類別資料欄。
我們使用預先建立的 tfxio.BeamRecordCsvTFXIO
將 CSV 行轉換為 RecordBatch
。TFXIO
需要兩項重要資訊:
- TensorFlow Metadata 結構定義
tfmd.proto.v0.shema_pb2
,其中包含每個 CSV 資料欄的類型和形狀資訊。schema_pb2.TensorRepresentation
是結構定義的選用部分;如果未提供 (本例中即是如此),則會從類型和形狀資訊推論而得。您可以透過使用我們提供的輔助函式從 TF 剖析規格轉換 (本範例中所示),或執行 TensorFlow Data Validation,來取得結構定義。 - 資料欄名稱清單,順序與 CSV 檔中的顯示順序相同。請注意,這些名稱必須與結構定義中的特徵名稱相符。
pip install -U -q tfx_bsl
from tfx_bsl.public import tfxio
from tfx_bsl.coders.example_coder import RecordBatchToExamples
import apache_beam as beam
pipeline = beam.Pipeline()
csv_tfxio = tfxio.BeamRecordCsvTFXIO(
physical_format='text', column_names=ORDERED_CSV_COLUMNS, schema=SCHEMA)
raw_data = (
pipeline
| 'ReadTrainData' >> beam.io.ReadFromText(
train_data_file, coder=beam.coders.BytesCoder())
| 'FixCommasTrainData' >> beam.Map(
lambda line: line.replace(b', ', b','))
| 'DecodeTrainData' >> csv_tfxio.BeamSource())
raw_data
<PCollection[[21]: DecodeTrainData/RawRecordToRecordBatch/CollectRecordBatchTelemetry/ProfileRecordBatches.None] at 0x7feeaa6fd5b0>
請注意,我們必須在讀取 CSV 行之後執行一些額外的修正。否則,我們可以仰賴 tfxio.CsvTFXIO
來處理讀取檔案和轉換為 RecordBatch
這兩項作業
csv_tfxio = tfxio.CsvTFXIO(train_data_file,
telemetry_descriptors=[], #???
column_names=ORDERED_CSV_COLUMNS,
schema=SCHEMA)
p2 = beam.Pipeline()
raw_data_2 = p2 | 'TFXIORead' >> csv_tfxio.BeamSource()
這個資料集的預先處理方式與先前的範例類似,差別在於預先處理函式是以程式設計方式產生,而不是手動指定每個資料欄。在下方的預先處理函式中,NUMERICAL_COLUMNS
和 CATEGORICAL_COLUMNS
是包含數值和類別資料欄名稱的清單:
NUM_OOV_BUCKETS = 1
def preprocessing_fn(inputs):
"""Preprocess input columns into transformed columns."""
# Since we are modifying some features and leaving others unchanged, we
# start by setting `outputs` to a copy of `inputs.
outputs = inputs.copy()
# Scale numeric columns to have range [0, 1].
for key in NUMERIC_FEATURE_KEYS:
outputs[key] = tft.scale_to_0_1(outputs[key])
# For all categorical columns except the label column, we generate a
# vocabulary but do not modify the feature. This vocabulary is instead
# used in the trainer, by means of a feature column, to convert the feature
# from a string to an integer id.
for key in CATEGORICAL_FEATURE_KEYS:
outputs[key] = tft.compute_and_apply_vocabulary(
tf.strings.strip(inputs[key]),
num_oov_buckets=NUM_OOV_BUCKETS,
vocab_filename=key)
# For the label column we provide the mapping from string to index.
with tf.init_scope():
# `init_scope` - Only initialize the table once.
initializer = tf.lookup.KeyValueTensorInitializer(
keys=['>50K', '<=50K'],
values=tf.cast(tf.range(2), tf.int64),
key_dtype=tf.string,
value_dtype=tf.int64)
table = tf.lookup.StaticHashTable(initializer, default_value=-1)
outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])
return outputs
與先前範例的一個不同之處在於,標籤資料欄手動指定從字串到索引的對應。因此,'>50'
會對應至 0
,而 '<=50K'
會對應至 1
,因為知道訓練模型中的哪個索引對應至哪個標籤很有用。
record_batches
變數代表 pyarrow.RecordBatch
的 PCollection
。tensor_adapter_config
由 csv_tfxio
提供,這是從 SCHEMA
推論而得 (並最終在本範例中從 TF 剖析規格推論而得)。
最後一個階段是將轉換後的資料寫入磁碟,這與讀取原始資料的形式類似。用於執行這項作業的結構定義是 tft_beam.AnalyzeAndTransformDataset
輸出的其中一部分,會推論輸出資料的結構定義。下方顯示寫入磁碟的程式碼。結構定義是中繼資料的一部分,但在 tf.Transform
API 中可互換使用這兩者 (也就是將中繼資料傳遞至 tft.coders.ExampleProtoCoder
)。請注意,這會寫入不同的格式。請勿使用 textio.WriteToText
,改用 Beam 內建的 TFRecord
格式支援,並使用編碼器將資料編碼為 Example
Proto。這是較適合用於訓練的格式,如下一節所示。transformed_eval_data_base
提供寫入的個別分片的基礎檔案名稱。
raw_dataset = (raw_data, csv_tfxio.TensorAdapterConfig())
working_dir = tempfile.mkdtemp()
with tft_beam.Context(temp_dir=working_dir):
transformed_dataset, transform_fn = (
raw_dataset | tft_beam.AnalyzeAndTransformDataset(
preprocessing_fn, output_record_batches=True))
output_dir = tempfile.mkdtemp()
transformed_data, _ = transformed_dataset
_ = (
transformed_data
| 'EncodeTrainData' >>
beam.FlatMapTuple(lambda batch, _: RecordBatchToExamples(batch))
| 'WriteTrainData' >> beam.io.WriteToTFRecord(
os.path.join(output_dir , 'transformed.tfrecord')))
除了訓練資料之外,transform_fn
也會與中繼資料一起寫出:
_ = (
transform_fn
| 'WriteTransformFn' >> tft_beam.WriteTransformFn(output_dir))
使用 pipeline.run().wait_until_finish()
執行整個 Beam 管道。在此之前,Beam 管道代表延遲的分散式計算。它提供將會執行的操作指示,但指示尚未執行。最後這個呼叫會執行指定的管道。
result = pipeline.run().wait_until_finish()
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmphiyrst4f/tftransform_tmp/c633cd0eb0c14a2bba2bc6f7ba556ce3/assets INFO:tensorflow:Assets written to: /tmpfs/tmp/tmphiyrst4f/tftransform_tmp/c633cd0eb0c14a2bba2bc6f7ba556ce3/assets INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:Assets written to: /tmpfs/tmp/tmphiyrst4f/tftransform_tmp/9080e8c73e2443fea34d6505feed4129/assets INFO:tensorflow:Assets written to: /tmpfs/tmp/tmphiyrst4f/tftransform_tmp/9080e8c73e2443fea34d6505feed4129/assets INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:tensorflow_text is not available. WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
執行管道之後,輸出目錄會包含兩項成品:
- 轉換後的資料,以及描述資料的中繼資料。
- 包含產生的
preprocessing_fn
的tf.saved_model
ls {output_dir}
transform_fn transformed.tfrecord-00000-of-00001 transformed_metadata
如要瞭解如何使用這些成品,請參閱進階預先處理教學課程。