A tutorial for training a TensorFlow Recommenders ranking model as a TFX pipeline.
In this notebook-based tutorial, we will create and run a TFX pipeline that trains a ranking model to predict movie ratings using TensorFlow Recommenders (TFRS). The pipeline will consist of three essential TFX components: ExampleGen, Trainer, and Pusher. It covers a minimal ML workflow: importing data, training a model, and exporting the trained TFRS ranking model.
Setup
We first need to install the TFX Python package and download the dataset that we will use for our model.
Upgrade Pip
To avoid upgrading Pip in a system when running locally, check to make sure that we are running in Colab. Local systems can of course be upgraded separately.
import sys
if 'google.colab' in sys.modules:
!pip install --upgrade pip
Install TFX
pip install -U tfx
pip install -U tensorflow-recommenders
Did you restart the runtime?
If you are using Google Colab, the first time that you run the cell above you must restart the runtime by clicking the "RESTART RUNTIME" button above or using the "Runtime > Restart runtime ..." menu. This is because of the way that Colab loads packages.
Later in this tutorial we will also need to write the model code for the Trainer component and save it to a file before defining the pipeline.
Check the TensorFlow and TFX versions.
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
2022-12-14 13:03:42.352068: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered 2022-12-14 13:03:43.224089: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory 2022-12-14 13:03:43.224183: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory 2022-12-14 13:03:43.224193: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly. TensorFlow version: 2.10.1 TFX version: 1.11.0
Set up variables
There are some variables used to define a pipeline. You can customize these variables as you want. By default, all output from the pipeline will be generated under the current directory. For this tutorial, we will use a hard-coded schema instead of generating one with the SchemaGen component; a commented sketch of that alternative follows the next code cell.
import os
PIPELINE_NAME = 'TFRS-ranking'
# Directory where MovieLens 100K rating data lives
DATA_ROOT = os.path.join('data', PIPELINE_NAME)
# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)
from absl import logging
logging.set_verbosity(logging.INFO) # Set default logging level.
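As noted above, this tutorial hard-codes the feature schema inside the trainer module. For reference, here is a minimal sketch of the usual alternative, deriving a schema with the StatisticsGen and SchemaGen components. It is left commented out because it is not part of this tutorial's pipeline (and example_gen is only defined later):
# Commented-out sketch only -- not used in this tutorial's pipeline.
# statistics_gen = tfx.components.StatisticsGen(
#     examples=example_gen.outputs['examples'])
# schema_gen = tfx.components.SchemaGen(
#     statistics=statistics_gen.outputs['statistics'],
#     infer_feature_shape=True)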
Prepare example data
Because TFX does not currently support the TensorFlow Datasets API, we will download the MovieLens 100K dataset manually for use in our TFX pipeline.
There are four numeric features in this dataset:
- userId
- movieId
- rating
- timestamp
We will build a ranking model that predicts the rating of a movie. We will not use the timestamp feature.
Because the TFX ExampleGen reads inputs from a directory, we need to create a directory, copy the dataset into it, and convert the tab-separated u.data file into a CSV file with a header row.
wget https://files.grouplens.org/datasets/movielens/ml-100k.zip
mkdir -p {DATA_ROOT}
unzip ml-100k.zip
echo 'userId,movieId,rating,timestamp' > {DATA_ROOT}/ratings.csv
sed 's/\t/,/g' ml-100k/u.data >> {DATA_ROOT}/ratings.csv
--2022-12-14 13:03:46-- https://files.grouplens.org/datasets/movielens/ml-100k.zip Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152 Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 4924029 (4.7M) [application/zip] Saving to: ‘ml-100k.zip’ ml-100k.zip 100%[===================>] 4.70M 31.0MB/s in 0.2s 2022-12-14 13:03:47 (31.0 MB/s) - ‘ml-100k.zip’ saved [4924029/4924029] Archive: ml-100k.zip creating: ml-100k/ inflating: ml-100k/allbut.pl inflating: ml-100k/mku.sh inflating: ml-100k/README inflating: ml-100k/u.data inflating: ml-100k/u.genre inflating: ml-100k/u.info inflating: ml-100k/u.item inflating: ml-100k/u.occupation inflating: ml-100k/u.user inflating: ml-100k/u1.base inflating: ml-100k/u1.test inflating: ml-100k/u2.base inflating: ml-100k/u2.test inflating: ml-100k/u3.base inflating: ml-100k/u3.test inflating: ml-100k/u4.base inflating: ml-100k/u4.test inflating: ml-100k/u5.base inflating: ml-100k/u5.test inflating: ml-100k/ua.base inflating: ml-100k/ua.test inflating: ml-100k/ub.base inflating: ml-100k/ub.test
Take a quick look at the CSV file.
head {DATA_ROOT}/ratings.csv
userId,movieId,rating,timestamp 196,242,3,881250949 186,302,3,891717742 22,377,1,878887116 244,51,2,880606923 166,346,1,886397596 298,474,4,884182806 115,265,2,881171488 253,465,5,891628467 305,451,3,886324817
You should be able to see four values in each row. For example, the first example means that user "196" gave a rating of 3 to the movie "242".
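As an optional sanity check, here is a small sketch (assuming pandas is available in the environment) that confirms the dataset has 100,000 ratings on a 1-to-5 scale:
import pandas as pd
# Load the CSV we just created and verify its shape and rating range.
ratings_df = pd.read_csv(os.path.join(DATA_ROOT, 'ratings.csv'))
print(ratings_df.shape)  # Expect (100000, 4).
print(ratings_df['rating'].min(), ratings_df['rating'].max())  # Expect 1 and 5.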
Create a pipeline
TFX pipelines are defined using Python APIs. We will define a pipeline consisting of the following three components:
- CsvExampleGen: Reads in data files and converts them to the TFX internal format for further processing. There are multiple ExampleGens for various formats; in this tutorial we will use CsvExampleGen, which takes CSV file input.
- Trainer: Trains an ML model. The Trainer component requires model definition code from users. You can use TensorFlow APIs to specify how to train a model and save it in the SavedModel format.
- Pusher: Copies the trained model outside of the TFX pipeline. The Pusher component can be thought of as the deployment process for the trained ML model.
Before actually defining the pipeline, we need to write the model code for the Trainer component first.
Write model training code
We will build a simple ranking model that predicts movie ratings. This model training code will be saved to a separate file.
In this tutorial we will use the Generic Trainer of TFX, which supports Keras-based models. You need to write a Python file containing a run_fn function, which is the entry point of the Trainer component.
_trainer_module_file = 'tfrs_ranking_trainer.py'
The ranking model we use is almost identical to the one in the Basic Ranking tutorial. The only difference is that we use movie IDs instead of movie titles in the candidate tower.
%%writefile {_trainer_module_file}
from typing import Dict, Text
from typing import List
import numpy as np
import tensorflow as tf
from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_recommenders as tfrs
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
_FEATURE_KEYS = ['userId', 'movieId']
_LABEL_KEY = 'rating'
_FEATURE_SPEC = {
**{
feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
for feature in _FEATURE_KEYS
}, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
class RankingModel(tf.keras.Model):
def __init__(self):
super().__init__()
embedding_dimension = 32
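    # MovieLens 100K contains 943 users and 1,682 movies. Note that the real
    # IDs are 1-based while these vocabularies start at '0', so the largest ID
    # falls into the out-of-vocabulary bucket covered by the "+ 1" in the
    # Embedding sizes below.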
unique_user_ids = np.array(range(943)).astype(str)
unique_movie_ids = np.array(range(1682)).astype(str)
# Compute embeddings for users.
self.user_embeddings = tf.keras.Sequential([
tf.keras.layers.Input(shape=(1,), name='userId', dtype=tf.int64),
tf.keras.layers.Lambda(lambda x: tf.as_string(x)),
tf.keras.layers.StringLookup(
vocabulary=unique_user_ids, mask_token=None),
tf.keras.layers.Embedding(
len(unique_user_ids) + 1, embedding_dimension)
])
# Compute embeddings for movies.
self.movie_embeddings = tf.keras.Sequential([
tf.keras.layers.Input(shape=(1,), name='movieId', dtype=tf.int64),
tf.keras.layers.Lambda(lambda x: tf.as_string(x)),
tf.keras.layers.StringLookup(
vocabulary=unique_movie_ids, mask_token=None),
tf.keras.layers.Embedding(
len(unique_movie_ids) + 1, embedding_dimension)
])
# Compute predictions.
self.ratings = tf.keras.Sequential([
tf.keras.layers.Dense(256, activation='relu'),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(1)
])
def call(self, inputs):
user_id, movie_id = inputs
user_embedding = self.user_embeddings(user_id)
movie_embedding = self.movie_embeddings(movie_id)
return self.ratings(tf.concat([user_embedding, movie_embedding], axis=2))
class MovielensModel(tfrs.models.Model):
def __init__(self):
super().__init__()
self.ranking_model: tf.keras.Model = RankingModel()
self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
loss=tf.keras.losses.MeanSquaredError(),
metrics=[tf.keras.metrics.RootMeanSquaredError()])
def call(self, features: Dict[str, tf.Tensor]) -> tf.Tensor:
return self.ranking_model((features['userId'], features['movieId']))
def compute_loss(self,
features: Dict[Text, tf.Tensor],
training=False) -> tf.Tensor:
labels = features[1]
rating_predictions = self(features[0])
# The task computes the loss and the metrics.
return self.task(labels=labels, predictions=rating_predictions)
def _input_fn(file_pattern: List[str],
data_accessor: tfx.components.DataAccessor,
schema: schema_pb2.Schema,
batch_size: int = 256) -> tf.data.Dataset:
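  # The returned dataset repeats indefinitely; the Trainer bounds iteration
  # with the num_steps values set in the pipeline's train_args and eval_args.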
return data_accessor.tf_dataset_factory(
file_pattern,
tfxio.TensorFlowDatasetOptions(
batch_size=batch_size, label_key=_LABEL_KEY),
schema=schema).repeat()
def _build_keras_model() -> tf.keras.Model:
return MovielensModel()
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
"""Train the model based on given args.
Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)
train_dataset = _input_fn(
fn_args.train_files, fn_args.data_accessor, schema, batch_size=8192)
eval_dataset = _input_fn(
fn_args.eval_files, fn_args.data_accessor, schema, batch_size=4096)
model = _build_keras_model()
model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
epochs = 3,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
model.save(fn_args.serving_model_dir)
Writing tfrs_ranking_trainer.py
Now you have completed all of the preparation steps to build a TFX pipeline.
Write a pipeline definition
We define a function to create a TFX pipeline. A Pipeline object represents a TFX pipeline, which can be run using one of the pipeline orchestration systems that TFX supports.
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
module_file: str, serving_model_dir: str,
metadata_path: str) -> tfx.dsl.Pipeline:
"""Creates a three component pipeline with TFX."""
# Brings data into the pipeline.
example_gen = tfx.components.CsvExampleGen(input_base=data_root)
# Uses user-provided Python function that trains a model.
trainer = tfx.components.Trainer(
module_file=module_file,
examples=example_gen.outputs['examples'],
train_args=tfx.proto.TrainArgs(num_steps=12),
eval_args=tfx.proto.EvalArgs(num_steps=24))
# Pushes the model to a filesystem destination.
pusher = tfx.components.Pusher(
model=trainer.outputs['model'],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=serving_model_dir)))
# Following three components will be included in the pipeline.
components = [
example_gen,
trainer,
pusher,
]
return tfx.dsl.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
metadata_connection_config=tfx.orchestration.metadata
.sqlite_metadata_connection_config(metadata_path),
components=components)
Run the pipeline
TFX supports multiple orchestrators for running pipelines. In this tutorial we will use LocalDagRunner, which is included in the TFX Python package and runs pipelines in a local environment.
Now we create a LocalDagRunner and pass it a Pipeline object created from the function we already defined.
The pipeline runs directly, and you can see logs of its progress, including the ML model training.
tfx.orchestration.LocalDagRunner().run(
_create_pipeline(
pipeline_name=PIPELINE_NAME,
pipeline_root=PIPELINE_ROOT,
data_root=DATA_ROOT,
module_file=_trainer_module_file,
serving_model_dir=SERVING_MODEL_DIR,
metadata_path=METADATA_PATH))
INFO:absl:Generating ephemeral wheel package for '/tmpfs/src/temp/docs/examples/tfrs_ranking_trainer.py' (including modules: ['tfrs_ranking_trainer']). INFO:absl:User module package has hash fingerprint version bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c. INFO:absl:Executing: ['/tmpfs/src/tf_docs_env/bin/python', '/tmpfs/tmp/tmpbks2_kfl/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmpfs/tmp/tmpbvugqpwn', '--dist-dir', '/tmpfs/tmp/tmpfqmceeau'] /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/setuptools/command/install.py:34: SetuptoolsDeprecationWarning: setup.py install is deprecated. Use build and pip and other standards-based tools. warnings.warn( INFO:absl:Successfully built user code wheel distribution at 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'; target user module is 'tfrs_ranking_trainer'. INFO:absl:Full user module path is 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl' INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "Pusher" value { python_class_executable_spec { class_path: "tfx.components.pusher.executor.Executor" } } } executor_specs { key: "Trainer" value { python_class_executable_spec { class_path: "tfx.components.trainer.executor.GenericExecutor" } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "metadata/TFRS-ranking/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "metadata/TFRS-ranking/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "data/TFRS-ranking" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "Trainer" execution_options { caching_options { } } running bdist_wheel running build running build_py creating build creating build/lib copying tfrs_ranking_trainer.py -> build/lib installing to /tmpfs/tmp/tmpbvugqpwn running install running install_lib copying build/lib/tfrs_ranking_trainer.py -> /tmpfs/tmp/tmpbvugqpwn running install_egg_info running egg_info creating tfx_user_code_Trainer.egg-info writing tfx_user_code_Trainer.egg-info/PKG-INFO writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt' reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt' writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt' Copying tfx_user_code_Trainer.egg-info to /tmpfs/tmp/tmpbvugqpwn/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3.9.egg-info running install_scripts creating /tmpfs/tmp/tmpbvugqpwn/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/WHEEL creating '/tmpfs/tmp/tmpfqmceeau/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl' and adding '/tmpfs/tmp/tmpbvugqpwn' to it adding 'tfrs_ranking_trainer.py' adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/METADATA' adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/WHEEL' adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/top_level.txt' adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/RECORD' removing /tmpfs/tmp/tmpbvugqpwn INFO:absl:MetadataStore with DB connection initialized INFO:absl:[CsvExampleGen] Resolved inputs: ({},) INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 1 
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_file_format': 5, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_data_format': 6, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'input_base': 'data/TFRS-ranking', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027'}, execution_output_uri='pipelines/TFRS-ranking/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/CsvExampleGen/.system/stateful_working_dir/2022-12-14T13:03:48.504573', tmp_dir='pipelines/TFRS-ranking/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "data/TFRS-ranking" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "Trainer" execution_options { caching_options { } } , pipeline_info=id: "TFRS-ranking" , pipeline_run_id='2022-12-14T13:03:48.504573') INFO:absl:Generating examples. WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features. INFO:absl:Processing input csv data data/TFRS-ranking/* to TFExample. WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. 
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 1 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 1 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component Trainer is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.trainer.component.Trainer" base_type: TRAIN } id: "Trainer" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Trainer" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } context_queries { type { name: "node" } name { field_value { string_value: "TFRS-ranking.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "model" value { artifact_spec { type { name: "Model" base_type: MODEL } } } } outputs { key: "model_run" value { artifact_spec { type { name: "ModelRun" } } } } } parameters { parameters { key: "custom_config" value { field_value { string_value: "null" } } } parameters { key: "eval_args" value { field_value { string_value: "{\n \"num_steps\": 24\n}" } } } parameters { key: "module_path" value { field_value { string_value: "tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl" } } } parameters { key: "train_args" value { field_value { string_value: "{\n \"num_steps\": 12\n}" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "Pusher" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. 
INFO:absl:[Trainer] Resolved inputs: ({'examples': [Artifact(artifact: id: 1 type_id: 15 uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "state" value { string_value: "published" } } custom_properties { key: "tfx_version" value { string_value: "1.11.0" } } state: LIVE create_time_since_epoch: 1671023043350 last_update_time_since_epoch: 1671023043350 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 2 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1 type_id: 15 uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "state" value { string_value: "published" } } custom_properties { key: "tfx_version" value { string_value: "1.11.0" } } state: LIVE create_time_since_epoch: 1671023043350 last_update_time_since_epoch: 1671023043350 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model_run/2" , artifact_type: name: "ModelRun" )], 'model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model/2" , artifact_type: name: "Model" base_type: MODEL )]}), exec_properties={'train_args': '{\n "num_steps": 12\n}', 'eval_args': '{\n "num_steps": 24\n}', 'custom_config': 'null', 'module_path': 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'}, execution_output_uri='pipelines/TFRS-ranking/Trainer/.system/executor_execution/2/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/Trainer/.system/stateful_working_dir/2022-12-14T13:03:48.504573', tmp_dir='pipelines/TFRS-ranking/Trainer/.system/executor_execution/2/.temp/', pipeline_node=node_info { type { name: "tfx.components.trainer.component.Trainer" base_type: TRAIN } id: "Trainer" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { 
string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Trainer" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } context_queries { type { name: "node" } name { field_value { string_value: "TFRS-ranking.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "model" value { artifact_spec { type { name: "Model" base_type: MODEL } } } } outputs { key: "model_run" value { artifact_spec { type { name: "ModelRun" } } } } } parameters { parameters { key: "custom_config" value { field_value { string_value: "null" } } } parameters { key: "eval_args" value { field_value { string_value: "{\n \"num_steps\": 24\n}" } } } parameters { key: "module_path" value { field_value { string_value: "tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl" } } } parameters { key: "train_args" value { field_value { string_value: "{\n \"num_steps\": 12\n}" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "Pusher" execution_options { caching_options { } } , pipeline_info=id: "TFRS-ranking" , pipeline_run_id='2022-12-14T13:03:48.504573') INFO:absl:Train on the 'train' split when train_args.splits is not set. INFO:absl:Evaluate on the 'eval' split when eval_args.splits is not set. INFO:absl:udf_utils.get_fn {'train_args': '{\n "num_steps": 12\n}', 'eval_args': '{\n "num_steps": 24\n}', 'custom_config': 'null', 'module_path': 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'} 'run_fn' INFO:absl:Installing 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl' to a temporary directory. INFO:absl:Executing: ['/tmpfs/src/tf_docs_env/bin/python', '-m', 'pip', 'install', '--target', '/tmpfs/tmp/tmpkwsj4cmv', 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'] Processing ./pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl INFO:absl:Successfully installed 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'. INFO:absl:Training model. INFO:absl:Feature movieId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature rating has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature userId has a shape dim { size: 1 } . Setting to DenseTensor. Installing collected packages: tfx-user-code-Trainer Successfully installed tfx-user-code-Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c INFO:absl:Feature movieId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature rating has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature userId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature movieId has a shape dim { size: 1 } . 
Setting to DenseTensor. INFO:absl:Feature rating has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature userId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature movieId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature rating has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature userId has a shape dim { size: 1 } . Setting to DenseTensor. Epoch 1/3 12/12 [==============================] - 4s 247ms/step - root_mean_squared_error: 1.9485 - loss: 3.6361 - regularization_loss: 0.0000e+00 - total_loss: 3.6361 - val_root_mean_squared_error: 1.2136 - val_loss: 1.4467 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.4467 Epoch 2/3 12/12 [==============================] - 2s 198ms/step - root_mean_squared_error: 1.1657 - loss: 1.3525 - regularization_loss: 0.0000e+00 - total_loss: 1.3525 - val_root_mean_squared_error: 1.1173 - val_loss: 1.2615 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.2615 Epoch 3/3 12/12 [==============================] - 2s 212ms/step - root_mean_squared_error: 1.1121 - loss: 1.2330 - regularization_loss: 0.0000e+00 - total_loss: 1.2330 - val_root_mean_squared_error: 1.0983 - val_loss: 1.2303 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.2303 WARNING:absl:Function `_wrapped_model` contains input name(s) movieId, userId with unsupported characters which will be renamed to movieid, userid in the SavedModel. WARNING:absl:Found untraced functions such as ranking_layer_call_fn, ranking_layer_call_and_return_conditional_losses while saving (showing 2 of 2). These functions will not be directly callable after loading. INFO:tensorflow:Assets written to: pipelines/TFRS-ranking/Trainer/model/2/Format-Serving/assets INFO:tensorflow:Assets written to: pipelines/TFRS-ranking/Trainer/model/2/Format-Serving/assets INFO:absl:Training complete. Model written to pipelines/TFRS-ranking/Trainer/model/2/Format-Serving. ModelRun written to pipelines/TFRS-ranking/Trainer/model_run/2 INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 2 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model_run/2" , artifact_type: name: "ModelRun" )], 'model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model/2" , artifact_type: name: "Model" base_type: MODEL )]}) for execution 2 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component Trainer is finished. INFO:absl:Component Pusher is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.pusher.component.Pusher" base_type: DEPLOY } id: "Pusher" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Pusher" } } } } inputs { inputs { key: "model" value { channels { producer_node_query { id: "Trainer" } context_queries { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } context_queries { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Trainer" } } } artifact_query { type { name: "Model" base_type: MODEL } } output_key: "model" } } } } outputs { outputs { key: "pushed_model" value { artifact_spec { type { name: "PushedModel" base_type: MODEL } } } } } parameters { parameters { key: "custom_config" value { field_value { string_value: "null" } } } parameters { key: "push_destination" value { field_value { string_value: "{\n \"filesystem\": {\n \"base_directory\": \"serving_model/TFRS-ranking\"\n }\n}" } } } } upstream_nodes: "Trainer" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. INFO:absl:[Pusher] Resolved inputs: ({'model': [Artifact(artifact: id: 3 type_id: 18 uri: "pipelines/TFRS-ranking/Trainer/model/2" custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "state" value { string_value: "published" } } custom_properties { key: "tfx_version" value { string_value: "1.11.0" } } state: LIVE create_time_since_epoch: 1671023059840 last_update_time_since_epoch: 1671023059840 , artifact_type: id: 18 name: "Model" base_type: MODEL )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 3 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'model': [Artifact(artifact: id: 3 type_id: 18 uri: "pipelines/TFRS-ranking/Trainer/model/2" custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "state" value { string_value: "published" } } custom_properties { key: "tfx_version" value { string_value: "1.11.0" } } state: LIVE create_time_since_epoch: 1671023059840 last_update_time_since_epoch: 1671023059840 , artifact_type: id: 18 name: "Model" base_type: MODEL )]}, output_dict=defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Pusher/pushed_model/3" , artifact_type: name: "PushedModel" base_type: MODEL )]}), exec_properties={'push_destination': '{\n "filesystem": {\n "base_directory": "serving_model/TFRS-ranking"\n }\n}', 'custom_config': 'null'}, execution_output_uri='pipelines/TFRS-ranking/Pusher/.system/executor_execution/3/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/Pusher/.system/stateful_working_dir/2022-12-14T13:03:48.504573', tmp_dir='pipelines/TFRS-ranking/Pusher/.system/executor_execution/3/.temp/', pipeline_node=node_info { type { name: "tfx.components.pusher.component.Pusher" base_type: DEPLOY } id: "Pusher" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: 
"2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Pusher" } } } } inputs { inputs { key: "model" value { channels { producer_node_query { id: "Trainer" } context_queries { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } context_queries { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Trainer" } } } artifact_query { type { name: "Model" base_type: MODEL } } output_key: "model" } } } } outputs { outputs { key: "pushed_model" value { artifact_spec { type { name: "PushedModel" base_type: MODEL } } } } } parameters { parameters { key: "custom_config" value { field_value { string_value: "null" } } } parameters { key: "push_destination" value { field_value { string_value: "{\n \"filesystem\": {\n \"base_directory\": \"serving_model/TFRS-ranking\"\n }\n}" } } } } upstream_nodes: "Trainer" execution_options { caching_options { } } , pipeline_info=id: "TFRS-ranking" , pipeline_run_id='2022-12-14T13:03:48.504573') WARNING:absl:Pusher is going to push the model without validation. Consider using Evaluator or InfraValidator in your pipeline. INFO:absl:Model version: 1671023059 INFO:absl:Model written to serving path serving_model/TFRS-ranking/1671023059. INFO:absl:Model pushed to pipelines/TFRS-ranking/Pusher/pushed_model/3. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 3 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Pusher/pushed_model/3" , artifact_type: name: "PushedModel" base_type: MODEL )]}) for execution 3 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component Pusher is finished.
You should see "INFO:absl:Component Pusher is finished." at the end of the logs if the pipeline finished successfully, because the Pusher component is the last component of the pipeline.
The Pusher component pushes the trained model to SERVING_MODEL_DIR, which is the serving_model/TFRS-ranking directory if you did not change the variables in the previous steps. You can see the result from the file browser in the left-side panel of Colab, or by using the following command:
# List files in created model directory.
ls -R {SERVING_MODEL_DIR}
serving_model/TFRS-ranking: 1671023059 serving_model/TFRS-ranking/1671023059: assets keras_metadata.pb saved_model.pb variables serving_model/TFRS-ranking/1671023059/assets: serving_model/TFRS-ranking/1671023059/variables: variables.data-00000-of-00001 variables.index
Now we can test the ranking model by computing a predicted rating for a user and a movie:
import glob
# Load the latest model for testing
loaded = tf.saved_model.load(max(glob.glob(os.path.join(SERVING_MODEL_DIR, '*/')), key=os.path.getmtime))
print(loaded({'userId': [[42]], 'movieId': [[15]]}).numpy())
[[[3.6946948]]]
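Building on the same calling convention, here is a small sketch of how one might rank a few candidate movies for a user; the movie IDs below are arbitrary examples, not part of the tutorial:
# Hypothetical example: score a few candidate movies for user 42 and sort
# them by predicted rating, highest first.
candidate_movie_ids = [15, 50, 100]
scores = {
    movie_id: loaded({'userId': [[42]], 'movieId': [[movie_id]]}).numpy().item()
    for movie_id in candidate_movie_ids
}
print(sorted(scores.items(), key=lambda item: item[1], reverse=True))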
This concludes the TensorFlow Recommenders + TFX tutorial.