TensorFlow 2 TPUEmbeddingLayer: Quick Start


Overview

This Colab gives a brief introduction to the TPUEmbeddingLayer of TensorFlow 2.

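When you have many large embedding tables, the TPUEmbeddingLayer can use the embedding accelerator on Cloud TPU to speed up embedding lookups. This is particularly useful when building recommendation models, which often have very large embedding tables.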
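
As a reminder of what an embedding lookup does, here is a minimal plain-Python sketch. It is only a conceptual stand-in and does not use the TPU API; the table values and the `lookup` helper are made up for illustration.

```python
# Conceptual stand-in for an embedding lookup (plain Python, not the TPU API).
# The table has 4 rows (vocabulary size) and 3 columns (embedding dimension).
table = [
    [0.0, 0.1, 0.2],
    [1.0, 1.1, 1.2],
    [2.0, 2.1, 2.2],
    [3.0, 3.1, 3.2],
]


def lookup(table, ids):
  """Returns one embedding row per id, in the order the ids were given."""
  return [table[i] for i in ids]


batch_ids = [2, 0, 2]  # hypothetical feature ids for a batch of three examples
embeddings = lookup(table, batch_ids)
print(embeddings)
```

The embedding accelerator performs this kind of row gathering (and the matching gradient updates) for tables that are far larger than this toy one.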

Follow the Google Cloud TPU quickstart guide to learn how to create a GCP account and a GCS bucket. You have $300 in free credit to get started with any GCP product. You can learn more about Cloud TPU at https://cloud.google.com/tpu/docs.

Setup

Install TensorFlow 2.0 and TensorFlow Recommenders

!pip install -U tensorflow-recommenders
import numpy as np
import tensorflow as tf
import tensorflow_recommenders as tfrs

Connect to the TPU node or a local TPU and initialize the TPU system.

resolver = tf.distribute.cluster_resolver.TPUClusterResolver.connect('')

Create a TPU strategy. Models that need to run on the TPU should be created under TPUStrategy.

strategy = tf.distribute.TPUStrategy(resolver)

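You can also check the TPU hardware features through the TPUStrategy object.

For example, you can check which version of the embedding feature is supported on this TPU. See tf.tpu.experimental.HardwareFeature for detailed documentation.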

embedding_feature = strategy.extended.tpu_hardware_feature.embedding_feature
assert embedding_feature == tf.tpu.experimental.HardwareFeature.EmbeddingFeature.V1, 'Make sure that you have the right TPU Hardware'

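TPUEmbedding API breakdown

Feature and table configuration

When creating an instance of this layer, you must specify:

  1. The complete set of embedding tables,
  2. The features you expect to look up in those tables, and
  3. The optimizer(s) you wish to use on the tables.

See the documentation of tf.tpu.experimental.embedding.TableConfig and tf.tpu.experimental.embedding.FeatureConfig for more details on the complete set of options. We cover the basic usage here.

Multiple FeatureConfig objects can use the same TableConfig object, which allows different features to share the same table.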

table_config_one = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=8, dim=8)
table_config_two = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=16, dim=4)
feature_config = {
    'feature_one':
        tf.tpu.experimental.embedding.FeatureConfig(table=table_config_one),
    'feature_two':
        tf.tpu.experimental.embedding.FeatureConfig(table=table_config_one),
    'feature_three':
        tf.tpu.experimental.embedding.FeatureConfig(table=table_config_two)
}
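
To make the sharing explicit, the configuration above can be mirrored with plain dictionaries. This is only an illustrative sketch: the names `tables` and `feature_to_table` are hypothetical stand-ins, not part of the TPUEmbedding API.

```python
# Hypothetical plain-Python mirror of the table and feature configuration.
tables = {
    "table_one": {"vocabulary_size": 8, "dim": 8},
    "table_two": {"vocabulary_size": 16, "dim": 4},
}
feature_to_table = {
    "feature_one": "table_one",
    "feature_two": "table_one",  # shares table_one with feature_one
    "feature_three": "table_two",
}

# Group the features by the table they read from.
features_by_table = {}
for feature, table in feature_to_table.items():
  features_by_table.setdefault(table, []).append(feature)

# Each feature's embedding width equals the dim of its table, so
# concatenating all three embeddings along axis 1 gives 8 + 8 + 4 columns.
output_dims = {f: tables[t]["dim"] for f, t in feature_to_table.items()}
concat_width = sum(output_dims.values())
print(features_by_table, concat_width)
```

Two features backed by one table read from, and contribute gradients to, the same set of rows.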

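Optimizers

An optimizer can be specified globally by passing one of the following types of input to the optimizer argument:

  1. A string, one of 'sgd', 'adagrad' or 'adam', which uses the given optimizer with its default parameters.
  2. An instance of a Keras optimizer.
  3. An instance of an optimizer class from the tf.tpu.experimental.embedding module.

You can also specify an optimizer at the table level through the optimizer argument of tf.tpu.experimental.embedding.TableConfig. This completely overrides the global optimizer for that table. For performance reasons, we recommend that you minimize the total number of distinct optimizers.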

optimizer=tf.tpu.experimental.embedding.SGD(0.1)
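
The override rule described above can be sketched in plain Python. This is an assumption-labeled illustration only: `effective_optimizer` and the table names are hypothetical, and strings stand in for real optimizer objects.

```python
def effective_optimizer(table_optimizer, global_optimizer):
  """A table-level optimizer, when set, replaces the global one entirely."""
  return global_optimizer if table_optimizer is None else table_optimizer


global_optimizer = "sgd"
# Hypothetical tables; None means "no table-level optimizer was given".
table_optimizers = {
    "table_one": None,
    "table_two": "adagrad",
    "table_three": None,
}

resolved = {
    name: effective_optimizer(opt, global_optimizer)
    for name, opt in table_optimizers.items()
}
# Fewer distinct optimizers is better for performance.
distinct_optimizers = set(resolved.values())
print(resolved, len(distinct_optimizers))
```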

Model creation

Here are two examples of creating a Keras model with the TPU embedding layer in it.

For a functional-style Keras model:

with strategy.scope():
  embedding_inputs = {
      'feature_one':
          tf.keras.Input(batch_size=1024, shape=(1,), dtype=tf.int32),
      'feature_two':
          tf.keras.Input(
              batch_size=1024, shape=(1,), dtype=tf.int32, ragged=True),
      'feature_three':
          tf.keras.Input(batch_size=1024, shape=(1,), dtype=tf.int32)
  }
  # embedding, feature_config and embedding_inputs all have the same nested
  # structure.
  embedding = tfrs.layers.embedding.TPUEmbedding(
      feature_config=feature_config, optimizer=optimizer)(
          embedding_inputs)
  logits = tf.keras.layers.Dense(1)(
      tf.concat(tf.nest.flatten(embedding), axis=1))
  model = tf.keras.Model(embedding_inputs, logits)

For a subclass-style model:

class ModelWithEmbeddings(tf.keras.Model):

  def __init__(self):
    super(ModelWithEmbeddings, self).__init__()
    self.embedding_layer = tfrs.layers.embedding.TPUEmbedding(
        feature_config=feature_config, optimizer=optimizer)
    self.dense = tf.keras.layers.Dense(1)

  def call(self, inputs):
    embedding = self.embedding_layer(inputs)
    logits = self.dense(tf.concat(tf.nest.flatten(embedding), axis=1))
    return logits


# Make sure that the TPU is reinitialized when you try to create another model.
tf.tpu.experimental.initialize_tpu_system(resolver)
with strategy.scope():
  model = ModelWithEmbeddings()
WARNING:tensorflow:TPU system grpc://10.3.32.50:8470 has already been initialized. Reinitializing the TPU can cause previously created variables on TPU to be lost.
tf.tpu.experimental.initialize_tpu_system(resolver)
WARNING:tensorflow:TPU system grpc://10.3.32.50:8470 has already been initialized. Reinitializing the TPU can cause previously created variables on TPU to be lost.
<tensorflow.python.tpu.topology.Topology at 0x7f2085f74400>

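Simple TPUEmbeddingLayer example

In this tutorial, we build a simple ranking model using the MovieLens 100K dataset and the TPUEmbeddingLayer. We can use this model to predict ratings based on user_id and movie_id.

Install and import tensorflow datasets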

!pip install -q --upgrade tensorflow-datasets
import tensorflow_datasets as tfds

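Read the data

To make the dataset accessible to the Cloud TPU worker, you need to create a GCS bucket and download the dataset to it. Follow these instructions to create your GCS bucket.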

gcs_bucket = 'gs://YOUR-BUCKET-NAME'
from google.colab import auth
auth.authenticate_user()

First, we fetch the data using tensorflow_datasets. The data we need are movie_id, user_id, and user_rating.

Then we preprocess the data and convert the ids to integers.

# Ratings data.
ratings = tfds.load(
    "movielens/100k-ratings", data_dir=gcs_bucket, split="train")

# Select the basic features.
ratings = ratings.map(
    lambda x: {
        "movie_id": tf.strings.to_number(x["movie_id"], out_type=tf.int32),
        "user_id": tf.strings.to_number(x["user_id"], out_type=tf.int32),
        "user_rating": x["user_rating"],
    })

Prepare the dataset and model

Here we define some hyperparameters for the model.

per_replica_batch_size = 16
movie_vocabulary_size = 2048
movie_embedding_size = 64
user_vocabulary_size = 2048
user_embedding_size = 64

We split the data, putting 80% of the ratings in the training set and 20% in the test set.

shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)

train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)
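
The split relies on a single, fixed shuffle: because reshuffle_each_iteration=False, take and skip always see the same order, so the two subsets cannot overlap. A rough plain-Python analogue, using integer indices in place of rating records (an illustrative assumption), looks like this:

```python
import random

num_ratings = 100_000
indices = list(range(num_ratings))

# One fixed shuffle, analogous to seed=42 with no reshuffling per iteration.
random.Random(42).shuffle(indices)

train_indices = indices[:80_000]         # like shuffled.take(80_000)
test_indices = indices[80_000:100_000]   # like shuffled.skip(80_000).take(20_000)

overlap = set(train_indices) & set(test_indices)
print(len(train_indices), len(test_indices), len(overlap))
```

If the order changed between iterations, the same record could land in both subsets.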

Batch the dataset and convert it to a distributed dataset.

train_dataset = train.batch(
    per_replica_batch_size * strategy.num_replicas_in_sync,
    drop_remainder=True).cache()
test_dataset = test.batch(
    per_replica_batch_size * strategy.num_replicas_in_sync,
    drop_remainder=True).cache()
distribute_train_dataset = strategy.experimental_distribute_dataset(
    train_dataset,
    options=tf.distribute.InputOptions(experimental_fetch_to_device=False))
distribute_test_dataset = strategy.experimental_distribute_dataset(
    test_dataset,
    options=tf.distribute.InputOptions(experimental_fetch_to_device=False))
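
The batch arithmetic can be checked by hand. The sketch below assumes 8 replicas, which is a hypothetical value; in the notebook the real number comes from strategy.num_replicas_in_sync.

```python
per_replica_batch_size = 16
num_replicas_in_sync = 8  # assumption for illustration only

global_batch_size = per_replica_batch_size * num_replicas_in_sync

# With drop_remainder=True, only full batches are kept.
train_batches = 80_000 // global_batch_size
test_batches = 20_000 // global_batch_size
dropped_test_examples = 20_000 - test_batches * global_batch_size

print(global_batch_size, train_batches, test_batches, dropped_test_examples)
```

Under that assumption the training set divides evenly, while the last partial test batch is discarded.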

Here we create the optimizer and specify the feature and table configuration. Then we create the model with the embedding layer.

optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.1)

user_table = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=user_vocabulary_size, dim=user_embedding_size)
movie_table = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=movie_vocabulary_size, dim=movie_embedding_size)
feature_config = {
    "movie_id": tf.tpu.experimental.embedding.FeatureConfig(table=movie_table),
    "user_id": tf.tpu.experimental.embedding.FeatureConfig(table=user_table)
}


# Define a ranking model with embedding layer.
class EmbeddingModel(tfrs.models.Model):

  def __init__(self):
    super().__init__()

    self.embedding_layer = tfrs.layers.embedding.TPUEmbedding(
        feature_config=feature_config, optimizer=optimizer)
    self.ratings = tf.keras.Sequential([
        # Learn multiple dense layers.
        tf.keras.layers.Dense(256, activation="relu"),
        tf.keras.layers.Dense(64, activation="relu"),
        # Make rating predictions in the final layer.
        tf.keras.layers.Dense(1)
    ])
    self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
        loss=tf.keras.losses.MeanSquaredError(
            reduction=tf.keras.losses.Reduction.NONE),
        metrics=[tf.keras.metrics.RootMeanSquaredError()])

  def compute_loss(self, features, training=False):
    embedding = self.embedding_layer({
        "user_id": features["user_id"],
        "movie_id": features["movie_id"]
    })
    rating_predictions = self.ratings(
        tf.concat([embedding["user_id"], embedding["movie_id"]], axis=1))

    return tf.reduce_sum(
        self.task(
            labels=features["user_rating"], predictions=rating_predictions)) * (
                1 / (per_replica_batch_size * strategy.num_replicas_in_sync))

  def call(self, features, serving_config=None):
    embedding = self.embedding_layer(
        {
            "user_id": features["user_id"],
            "movie_id": features["movie_id"]
        },
        serving_config=serving_config)
    return self.ratings(
        tf.concat([embedding["user_id"], embedding["movie_id"]], axis=1))
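
A note on the scaling in compute_loss: the loss uses Reduction.NONE, so each replica sums its own per-example losses and multiplies by 1 / global batch size. Assuming the per-replica results are then summed across replicas, the total equals the mean over the global batch. A small plain-Python check with made-up loss values:

```python
import math

per_replica_batch_size = 2
num_replicas = 2  # hypothetical value for illustration
global_batch_size = per_replica_batch_size * num_replicas

# Made-up per-example squared errors, one inner list per replica.
per_example_losses = [[1.0, 3.0], [2.0, 6.0]]

# What each replica computes: sum of its losses scaled by the global batch.
replica_losses = [
    sum(losses) * (1 / global_batch_size) for losses in per_example_losses
]

# Summing the replica contributions gives the mean over all examples.
summed_across_replicas = sum(replica_losses)
global_mean = sum(sum(losses) for losses in per_example_losses) / global_batch_size
print(summed_across_replicas, global_mean)
```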

Make sure that you initialize the model under TPUStrategy.

with strategy.scope():
  model = EmbeddingModel()
  model.compile(optimizer=optimizer)

Train and evaluate the model

import os

Train the model

model.fit(distribute_train_dataset, steps_per_epoch=10, epochs=10)
Epoch 1/10
10/10 [==============================] - 7s 32ms/step - root_mean_squared_error: 2.7897 - loss: 0.0564 - regularization_loss: 0.0000e+00 - total_loss: 0.0564
Epoch 2/10
10/10 [==============================] - 0s 26ms/step - root_mean_squared_error: 1.1963 - loss: 0.0088 - regularization_loss: 0.0000e+00 - total_loss: 0.0088
Epoch 3/10
10/10 [==============================] - 0s 25ms/step - root_mean_squared_error: 1.1261 - loss: 0.0089 - regularization_loss: 0.0000e+00 - total_loss: 0.0089
Epoch 4/10
10/10 [==============================] - 0s 35ms/step - root_mean_squared_error: 1.1403 - loss: 0.0094 - regularization_loss: 0.0000e+00 - total_loss: 0.0094
Epoch 5/10
10/10 [==============================] - 0s 40ms/step - root_mean_squared_error: 1.1269 - loss: 0.0103 - regularization_loss: 0.0000e+00 - total_loss: 0.0103
Epoch 6/10
10/10 [==============================] - 0s 36ms/step - root_mean_squared_error: 1.1162 - loss: 0.0100 - regularization_loss: 0.0000e+00 - total_loss: 0.0100
Epoch 7/10
10/10 [==============================] - 0s 36ms/step - root_mean_squared_error: 1.1365 - loss: 0.0097 - regularization_loss: 0.0000e+00 - total_loss: 0.0097
Epoch 8/10
10/10 [==============================] - 0s 47ms/step - root_mean_squared_error: 1.1171 - loss: 0.0110 - regularization_loss: 0.0000e+00 - total_loss: 0.0110
Epoch 9/10
10/10 [==============================] - 0s 48ms/step - root_mean_squared_error: 1.1037 - loss: 0.0100 - regularization_loss: 0.0000e+00 - total_loss: 0.0100
Epoch 10/10
10/10 [==============================] - 0s 51ms/step - root_mean_squared_error: 1.0953 - loss: 0.0092 - regularization_loss: 0.0000e+00 - total_loss: 0.0092
<keras.callbacks.History at 0x7f2084d7ddf0>

Evaluate the model on the test dataset

model.evaluate(distribute_test_dataset, steps=10)
10/10 [==============================] - 4s 27ms/step - root_mean_squared_error: 1.1339 - loss: 0.0090 - regularization_loss: 0.0000e+00 - total_loss: 0.0090
[1.1338995695114136, 0.009662957862019539, 0, 0.009662957862019539]

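Save and restore the checkpoint

You can use a GCS bucket to store your checkpoint.

Follow the instructions to make sure that you grant the TPU worker access to the bucket.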

model_dir = os.path.join(gcs_bucket, "saved_model")

Create a checkpoint for the TPU model and save the model to the bucket.

checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
saved_tpu_model_path = checkpoint.save(os.path.join(model_dir, "ckpt"))

You can list the variables stored in that path.

tf.train.list_variables(saved_tpu_model_path)
[('_CHECKPOINTABLE_OBJECT_GRAPH', []),
 ('model/embedding_layer/_tpu_embedding/.ATTRIBUTES/TPUEmbedding_saveable',
  []),
 ('model/embedding_layer/_tpu_embedding/table_0/.ATTRIBUTES/VARIABLE_VALUE',
  [2048, 64]),
 ('model/embedding_layer/_tpu_embedding/table_0/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [2048, 64]),
 ('model/embedding_layer/_tpu_embedding/table_1/.ATTRIBUTES/VARIABLE_VALUE',
  [2048, 64]),
 ('model/embedding_layer/_tpu_embedding/table_1/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [2048, 64]),
 ('model/ratings/layer_with_weights-0/bias/.ATTRIBUTES/VARIABLE_VALUE', [256]),
 ('model/ratings/layer_with_weights-0/bias/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [256]),
 ('model/ratings/layer_with_weights-0/kernel/.ATTRIBUTES/VARIABLE_VALUE',
  [128, 256]),
 ('model/ratings/layer_with_weights-0/kernel/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [128, 256]),
 ('model/ratings/layer_with_weights-1/bias/.ATTRIBUTES/VARIABLE_VALUE', [64]),
 ('model/ratings/layer_with_weights-1/bias/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [64]),
 ('model/ratings/layer_with_weights-1/kernel/.ATTRIBUTES/VARIABLE_VALUE',
  [256, 64]),
 ('model/ratings/layer_with_weights-1/kernel/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [256, 64]),
 ('model/ratings/layer_with_weights-2/bias/.ATTRIBUTES/VARIABLE_VALUE', [1]),
 ('model/ratings/layer_with_weights-2/bias/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [1]),
 ('model/ratings/layer_with_weights-2/kernel/.ATTRIBUTES/VARIABLE_VALUE',
  [64, 1]),
 ('model/ratings/layer_with_weights-2/kernel/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [64, 1]),
 ('model/task/_ranking_metrics/0/count/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('model/task/_ranking_metrics/0/total/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('optimizer/decay/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('optimizer/iter/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('optimizer/learning_rate/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('save_counter/.ATTRIBUTES/VARIABLE_VALUE', [])]
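
The shapes in this listing can be cross-checked against the hyperparameters defined earlier. The following plain-Python sketch recomputes the expected table and Dense kernel shapes; the variable names are chosen for illustration.

```python
vocabulary_size = 2048
user_embedding_size = 64
movie_embedding_size = 64
dense_units = [256, 64, 1]  # the three Dense layers in self.ratings

# Both tables use the same vocabulary size and embedding size here.
table_shape = [vocabulary_size, movie_embedding_size]

# The first Dense layer receives the concatenated user and movie embeddings.
input_width = user_embedding_size + movie_embedding_size
kernel_shapes = []
for units in dense_units:
  kernel_shapes.append([input_width, units])
  input_width = units  # the next layer consumes this layer's output

print(table_shape, kernel_shapes)
```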

You can restore the checkpoint later. It is common practice to checkpoint the model every epoch and restore from it afterwards.

with strategy.scope():
  checkpoint.restore(saved_tpu_model_path)

In addition, you can create a CPU model and restore the weights trained on the TPU.

cpu_model = EmbeddingModel()

# Create the cpu checkpoint and restore the tpu checkpoint.
cpu_checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=cpu_model)
cpu_checkpoint.restore(saved_tpu_model_path)
<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f20830fe5b0>

You can also restore only the embedding weights.

embedding_checkpoint = tf.train.Checkpoint(embedding=model.embedding_layer)
saved_embedding_path = embedding_checkpoint.save(
    os.path.join(model_dir, 'tpu-embedding'))
# Restore the embedding parameters on cpu model.
cpu_embedding_checkpoint = tf.train.Checkpoint(
    embedding=cpu_model.embedding_layer)
cpu_embedding_checkpoint.restore(saved_embedding_path)
<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f20831bbeb0>
# Save the embedding parameters on cpu model and restore it to the tpu model.
saved_cpu_embedding_path = cpu_embedding_checkpoint.save(
    os.path.join(model_dir, 'cpu-embedding'))
with strategy.scope():
  embedding_checkpoint.restore(saved_cpu_embedding_path)

Serving

Finally, you can use the exported CPU model for serving. Serving is done through the tf.saved_model API.

@tf.function
def serve_tensors(features):
  return cpu_model(features)


signatures = {
    'serving':
        serve_tensors.get_concrete_function(
            features={
                'movie_id':
                    tf.TensorSpec(shape=(1,), dtype=tf.int32, name='movie_id'),
                'user_id':
                    tf.TensorSpec(shape=(1,), dtype=tf.int32, name='user_id'),
            }),
}
tf.saved_model.save(
    cpu_model,
    export_dir=os.path.join(model_dir, 'exported_model'),
    signatures=signatures)
WARNING:tensorflow:Skipping full serialization of Keras layer <tensorflow_recommenders.tasks.ranking.Ranking object at 0x7f20831ead00>, because it is not built.

The exported model can now be loaded (in Python or C) and used for serving.

imported = tf.saved_model.load(os.path.join(model_dir, 'exported_model'))
predict_fn = imported.signatures['serving']

# Dummy serving data.
input_batch = {
    'movie_id': tf.constant(np.array([100]), dtype=tf.int32),
    'user_id': tf.constant(np.array([30]), dtype=tf.int32)
}
# The prediction it generates.
prediction = predict_fn(**input_batch)['output_0']
WARNING:tensorflow:Detecting that an object or model or tf.train.Checkpoint is being deleted with unrestored values. See the following logs for the specific values in question. To silence these warnings, use `status.expect_partial()`. See https://tensorflow.dev.org.tw/api_docs/python/tf/train/Checkpoint#restorefor details about the status object returned by the restore function.
WARNING:tensorflow:An attribute in the restored object could not be found in the checkpoint. Object: (root).embedding_layer._tpu_embedding, attribute: ['TPUEmbedding_saveable']

Additionally, you can pass a serving config to do serving.

Note: You can use a serving config to serve with only a subset of the trained embedding tables.

serving_config = {
    'movie_id': tf.tpu.experimental.embedding.FeatureConfig(table=movie_table),
    'user_id': tf.tpu.experimental.embedding.FeatureConfig(table=user_table)
}
prediction = cpu_model(input_batch, serving_config=serving_config)