使用 TensorFlow Transform 預先處理資料

TensorFlow Extended (TFX) 的特徵工程元件

這個範例 Colab 筆記本提供了一個非常簡單的範例,說明如何使用 TensorFlow Transform (tf.Transform),在模型訓練和生產環境推論服務中使用完全相同的程式碼來預先處理資料。

TensorFlow Transform 是一個用於預先處理 TensorFlow 輸入資料的程式庫,包括建立需要完整傳遞訓練資料集的特徵。例如,使用 TensorFlow Transform,您可以:

  • 使用平均值和標準差來正規化輸入值
  • 透過產生所有輸入值的詞彙表,將字串轉換為整數
  • 根據觀察到的資料分佈,將浮點數指派到值組來將其轉換為整數

TensorFlow 具有對單一範例或一批範例進行操作的內建支援。tf.Transform 擴充了這些功能,以支援完整傳遞整個訓練資料集。

tf.Transform 的輸出會匯出為 TensorFlow 圖表,可用於訓練和推論服務。由於在訓練和推論服務階段都套用相同的轉換,因此使用相同的圖表有助於避免偏差。

升級 Pip

為了避免在本機執行時升級系統中的 Pip,請檢查以確保我們在 Colab 中執行。當然,本機系統可以單獨升級。

try:
  import colab
  !pip install --upgrade pip
except:
  pass

安裝 TensorFlow Transform

pip install -q -U tensorflow_transform
# This cell is only necessary because packages were installed while python was
# running. It avoids the need to restart the runtime when running in Colab.
import pkg_resources
import importlib

importlib.reload(pkg_resources)
/tmpfs/tmp/ipykernel_192169/639106435.py:3: DeprecationWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html
  import pkg_resources
<module 'pkg_resources' from '/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/pkg_resources/__init__.py'>

匯入

import pathlib
import pprint
import tempfile

import tensorflow as tf
import tensorflow_transform as tft

import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils
from tensorflow_transform.keras_lib import tf_keras
2024-04-30 10:54:48.029467: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-04-30 10:54:48.029516: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-04-30 10:54:48.030987: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered

資料:建立一些虛擬資料

我們將為簡單範例建立一些簡單的虛擬資料

  • raw_data 是我們要預先處理的初始原始資料
  • raw_data_metadata 包含綱要,告知我們 raw_data 中每個資料欄的類型。在本例中,它非常簡單。
raw_data = [
      {'x': 1, 'y': 1, 's': 'hello'},
      {'x': 2, 'y': 2, 's': 'world'},
      {'x': 3, 'y': 3, 's': 'hello'}
  ]

raw_data_metadata = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec({
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
        's': tf.io.FixedLenFeature([], tf.string),
    }))

轉換:建立預先處理函式

預先處理函式是 tf.Transform 最重要的概念。預先處理函式是資料集的轉換真正發生的位置。它接受並傳回張量字典,其中張量表示 TensorSparseTensor。通常構成預先處理函式核心的 API 呼叫有兩大類:

  1. TensorFlow 運算:任何接受並傳回張量的函式,通常表示 TensorFlow 運算。這些運算會將 TensorFlow 運算新增至圖表,以每次一個特徵向量的方式將原始資料轉換為已轉換資料。這些運算會在訓練和推論服務期間針對每個範例執行。
  2. Tensorflow Transform 分析器/對應器:tf.Transform 提供的任何分析器/對應器。這些也接受並傳回張量,通常包含 TensorFlow 運算和 Beam 計算的組合,但與 TensorFlow 運算不同的是,它們僅在分析期間的 Beam 管線中執行,需要完整傳遞整個訓練資料集。Beam 計算僅執行一次 (在訓練之前,在分析期間),通常會完整傳遞整個訓練資料集。它們會建立 tf.constant 張量,並將其新增至您的圖表。例如,tft.min 會計算訓練資料集中張量的最小值。
def preprocessing_fn(inputs):
    """Preprocess input columns into transformed columns."""
    x = inputs['x']
    y = inputs['y']
    s = inputs['s']
    x_centered = x - tft.mean(x)
    y_normalized = tft.scale_to_0_1(y)
    s_integerized = tft.compute_and_apply_vocabulary(s)
    x_centered_times_y_normalized = (x_centered * y_normalized)
    return {
        'x_centered': x_centered,
        'y_normalized': y_normalized,
        's_integerized': s_integerized,
        'x_centered_times_y_normalized': x_centered_times_y_normalized,
    }

語法

您幾乎已準備好將所有內容組合在一起並使用 Apache Beam 來執行它。

Apache Beam 使用特殊語法來定義和叫用轉換。例如,在此行中:

result = pass_this | 'name this step' >> to_this_call

方法 to_this_call 正在被叫用,並傳遞稱為 pass_this 的物件,並且 此操作將在堆疊追蹤中稱為 name this stepto_this_call 呼叫的結果會在 result 中傳回。您經常會看到管線的階段像這樣鏈結在一起:

result = apache_beam.Pipeline() | 'first step' >> do_this_first() | 'second step' >> do_this_last()

由於它從新的管線開始,因此您可以像這樣繼續:

next_result = result | 'doing more stuff' >> another_function()

將所有內容組合在一起

現在我們準備好轉換資料了。我們將搭配直接執行器使用 Apache Beam,並提供三個輸入:

  1. raw_data - 我們在上面建立的原始輸入資料
  2. raw_data_metadata - 原始資料的綱要
  3. preprocessing_fn - 我們建立用於執行轉換的函式
def main(output_dir):
  # Ignore the warnings
  with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_dataset, transform_fn = (  # pylint: disable=unused-variable
        (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
            preprocessing_fn))

  transformed_data, transformed_metadata = transformed_dataset  # pylint: disable=unused-variable

  # Save the transform_fn to the output_dir
  _ = (
      transform_fn
      | 'WriteTransformFn' >> tft_beam.WriteTransformFn(output_dir))

  return transformed_data, transformed_metadata
output_dir = pathlib.Path(tempfile.mkdtemp())

transformed_data, transformed_metadata = main(str(output_dir))

print('\nRaw data:\n{}\n'.format(pprint.pformat(raw_data)))
print('Transformed data:\n{}'.format(pprint.pformat(transformed_data)))
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.
WARNING:absl:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:absl:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).
WARNING:absl:You are outputting instance dicts from `TransformDataset` which will not provide optimal performance. Consider setting  `output_record_batches=True` to upgrade to the TFXIO format (Apache Arrow RecordBatch). Encoding functionality in this module works with both formats.
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpgsoge9im.json', '--HistoryManager.hist_file=:memory:']
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8s0_zhbm/tftransform_tmp/c576d13575254973b6f7263cfcf3ffc3/assets
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8s0_zhbm/tftransform_tmp/c576d13575254973b6f7263cfcf3ffc3/assets
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8s0_zhbm/tftransform_tmp/b9fda3835766458d8e33d05f6357bed2/assets
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8s0_zhbm/tftransform_tmp/b9fda3835766458d8e33d05f6357bed2/assets
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:tensorflow_text is not available.
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpgsoge9im.json', '--HistoryManager.hist_file=:memory:']
Raw data:
[{'s': 'hello', 'x': 1, 'y': 1},
 {'s': 'world', 'x': 2, 'y': 2},
 {'s': 'hello', 'x': 3, 'y': 3}]

Transformed data:
[{'s_integerized': 0,
  'x_centered': -1.0,
  'x_centered_times_y_normalized': -0.0,
  'y_normalized': 0.0},
 {'s_integerized': 1,
  'x_centered': 0.0,
  'x_centered_times_y_normalized': 0.0,
  'y_normalized': 0.5},
 {'s_integerized': 0,
  'x_centered': 1.0,
  'x_centered_times_y_normalized': 1.0,
  'y_normalized': 1.0}]

這是正確答案嗎?

先前,我們使用 tf.Transform 來執行此操作:

x_centered = x - tft.mean(x)
y_normalized = tft.scale_to_0_1(y)
s_integerized = tft.compute_and_apply_vocabulary(s)
x_centered_times_y_normalized = (x_centered * y_normalized)
  • x_centered - 輸入為 [1, 2, 3] 時,x 的平均值為 2,我們從 x 中減去它以將 x 值置中於 0。因此,我們的結果 [-1.0, 0.0, 1.0] 是正確的。
  • y_normalized - 我們想要將 y 值縮放介於 0 和 1 之間。我們的輸入為 [1, 2, 3],因此我們的結果 [0.0, 0.5, 1.0] 是正確的。
  • s_integerized - 我們想要將字串對應到詞彙表中的索引,而我們的詞彙表中只有 2 個字詞 ("hello" 和 "world")。因此,輸入為 ["hello", "world", "hello"] 時,我們的結果 [0, 1, 0] 是正確的。由於 "hello" 在此資料中出現頻率最高,因此它將成為詞彙表中的第一個項目。
  • x_centered_times_y_normalized - 我們想要透過使用乘法交叉 x_centeredy_normalized 來建立新特徵。請注意,這會將結果 (而非原始值) 相乘,而我們的新結果 [-0.0, 0.0, 1.0] 是正確的。

使用產生的 transform_fn

ls -l {output_dir}
total 8
drwxr-xr-x 4 kbuilder kbuilder 4096 Apr 30 10:54 transform_fn
drwxr-xr-x 2 kbuilder kbuilder 4096 Apr 30 10:54 transformed_metadata

transform_fn/ 目錄包含 tf.saved_model 實作,其中所有常數 tensorflow-transform 分析結果都內建在圖表中。

可以使用 tf.saved_model.load 直接載入,但這並不容易使用

loaded = tf.saved_model.load(str(output_dir/'transform_fn'))
loaded.signatures['serving_default']
<ConcreteFunction (*, inputs: TensorSpec(shape=(None,), dtype=tf.string, name='inputs'), inputs_1: TensorSpec(shape=(None,), dtype=tf.float32, name='inputs_1'), inputs_2: TensorSpec(shape=(None,), dtype=tf.float32, name='inputs_2')) -> Dict[['x_centered', TensorSpec(shape=(None,), dtype=tf.float32, name='x_centered')], ['s_integerized', TensorSpec(shape=<unknown>, dtype=tf.int64, name='s_integerized')], ['x_centered_times_y_normalized', TensorSpec(shape=(None,), dtype=tf.float32, name='x_centered_times_y_normalized')], ['y_normalized', TensorSpec(shape=(None,), dtype=tf.float32, name='y_normalized')]] at 0x7F452C2C6400>

更好的方法是使用 tft.TFTransformOutput 載入。 TFTransformOutput.transform_features_layer 方法會傳回 tft.TransformFeaturesLayer 物件,可用於套用轉換

tf_transform_output = tft.TFTransformOutput(output_dir)

tft_layer = tf_transform_output.transform_features_layer()
tft_layer
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:tensorflow_text is not available.
<tensorflow_transform.output_wrapper.TransformFeaturesLayer at 0x7f46bc272700>

tft.TransformFeaturesLayer 預期會是批次特徵字典。因此,請從 raw_data 中的 List[Dict[str, Any]] 建立 Dict[str, tf.Tensor]

raw_data_batch = {
    's': tf.constant([ex['s'] for ex in raw_data]),
    'x': tf.constant([ex['x'] for ex in raw_data], dtype=tf.float32),
    'y': tf.constant([ex['y'] for ex in raw_data], dtype=tf.float32),
}

您可以單獨使用 tft.TransformFeaturesLayer

transformed_batch = tft_layer(raw_data_batch)

{key: value.numpy() for key, value in transformed_batch.items()}
{'x_centered': array([-1.,  0.,  1.], dtype=float32),
 'x_centered_times_y_normalized': array([-0.,  0.,  1.], dtype=float32),
 'y_normalized': array([0. , 0.5, 1. ], dtype=float32),
 's_integerized': array([0, 1, 0])}

匯出

更典型的使用案例會使用 tf.Transform 將轉換套用至訓練和評估資料集 (如需範例,請參閱下一個教學課程)。然後,在訓練之後,在匯出模型之前,將 tft.TransformFeaturesLayer 作為第一層附加,以便您可以將其匯出為 tf.saved_model 的一部分。如需具體範例,請繼續閱讀。

範例訓練模型

以下是一個模型,它會:

  1. 取得已轉換的批次
  2. 將它們全部堆疊成一個簡單的 (batch, features) 矩陣
  3. 透過幾個密集層執行它們,以及
  4. 產生 10 個線性輸出。

在實際使用案例中,您會將單熱編碼套用至 s_integerized 特徵。

您可以在由 tf.Transform 轉換的資料集上訓練此模型

class StackDict(tf_keras.layers.Layer):
  def call(self, inputs):
    values = [
        tf.cast(v, tf.float32)
        for k,v in sorted(inputs.items(), key=lambda kv: kv[0])]
    return tf.stack(values, axis=1)
class TrainedModel(tf_keras.Model):
  def __init__(self):
    super().__init__(self)
    self.concat = StackDict()
    self.body = tf_keras.Sequential([
        tf_keras.layers.Dense(64, activation='relu'),
        tf_keras.layers.Dense(64, activation='relu'),
        tf_keras.layers.Dense(10),
    ])

  def call(self, inputs, training=None):
    x = self.concat(inputs)
    return self.body(x, training)
trained_model = TrainedModel()

假設我們已訓練模型。

trained_model.compile(loss=..., optimizer='adam')
trained_model.fit(...)

此模型在已轉換的輸入上執行

trained_model_output = trained_model(transformed_batch)
trained_model_output.shape
TensorShape([3, 10])

範例匯出包裝函式

假設您已訓練上述模型並想要匯出它。

您會想要在匯出的模型中包含轉換函式

class ExportModel(tf.Module):
  def __init__(self, trained_model, input_transform):
    self.trained_model = trained_model
    self.input_transform = input_transform

  @tf.function
  def __call__(self, inputs, training=None):
    x = self.input_transform(inputs)
    return self.trained_model(x)
export_model = ExportModel(trained_model=trained_model,
                           input_transform=tft_layer)

此組合模型適用於原始資料,並產生與直接呼叫已訓練模型完全相同的結果

export_model_output = export_model(raw_data_batch)
export_model_output.shape
TensorShape([3, 10])
tf.reduce_max(abs(export_model_output - trained_model_output)).numpy()
0.0

export_model 包含 tft.TransformFeaturesLayer,並且完全獨立。您可以儲存並在另一個環境中還原它,並且仍然獲得完全相同的結果

import tempfile
model_dir = tempfile.mkdtemp(suffix='tft')

tf.saved_model.save(export_model, model_dir)
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpjz93eylstft/assets
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpjz93eylstft/assets
reloaded = tf.saved_model.load(model_dir)

reloaded_model_output = reloaded(raw_data_batch)
reloaded_model_output.shape
TensorShape([3, 10])
tf.reduce_max(abs(export_model_output - reloaded_model_output)).numpy()
0.0