Avro Dataset API

在 TensorFlow.org 上檢視 在 Google Colab 中執行 在 GitHub 上檢視原始碼 下載筆記本

總覽

Avro Dataset API 的目標是以原生方式將 Avro 格式化資料載入 TensorFlow 作為TensorFlow 資料集。Avro 是一種資料序列化系統,類似於 Protocol Buffers。它廣泛用於 Apache Hadoop 中,可為持久性資料提供序列化格式,並為 Hadoop 節點之間的通訊提供線路格式。Avro 資料是一種以列為導向的壓縮二進位資料格式。它依賴於架構,架構儲存為個別的 JSON 檔案。如需 Avro 格式和架構宣告的規格,請參閱官方手冊

設定套件

安裝必要的 tensorflow-io 套件

pip install tensorflow-io

匯入套件

import tensorflow as tf
import tensorflow_io as tfio

驗證 tf 和 tfio 匯入

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

用法

探索資料集

為了本教學課程的目的,讓我們下載範例 Avro 資料集。

下載範例 Avro 檔案

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

下載範例 Avro 檔案的對應架構檔案

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

在上述範例中,已根據 mnist 資料集建立測試 Avro 資料集。TFRecord 格式的原始 mnist 資料集是從TF 命名資料集產生。但是,mnist 資料集作為示範資料集來說太大了。為了簡化起見,大部分內容都經過修剪,並且只保留了前幾個記錄。此外,還對原始 mnist 資料集中的 image 欄位進行了額外修剪,並將其對應到 Avro 中的 features 欄位。因此,avro 檔案 train.avro 有 4 個記錄,每個記錄有 3 個欄位:features(整數陣列)、label(整數或空值)和 dataType(列舉)。若要檢視已解碼的 train.avro(請注意,原始 avro 資料檔案並非人類可讀,因為 avro 是一種壓縮格式)

安裝必要的套件以讀取 Avro 檔案

pip install avro

若要以人類可讀格式讀取和列印 Avro 檔案

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

train.avro 的架構(由 train.avsc 表示)是一個 JSON 格式的檔案。若要檢視 train.avsc

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

準備資料集

使用 Avro Dataset API 將 train.avro 載入為 TensorFlow 資料集

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

上述範例會將 train.avro 轉換為 tensorflow 資料集。資料集的每個元素都是一個字典,其鍵是特徵名稱,值是轉換後的稀疏或密集張量。例如,它會將 featureslabeldataType 欄位分別轉換為 VarLenFeature(SparseTensor)、FixedLenFeature(DenseTensor) 和 FixedLenFeature(DenseTensor)。由於 batch_size 為 3,因此它會強制將 train.avro 中的 3 個記錄轉換為結果資料集中的一個元素。對於 train.avro 中標籤為空值的第一個記錄,avro 讀取器會將其替換為指定的預設值 (-100)。在本範例中,train.avro 中總共有 4 個記錄。由於批次大小為 3,因此結果資料集包含 3 個元素,最後一個元素的批次大小為 1。但是,如果使用者啟用 drop_final_batch,則也可以捨棄最後一個批次(如果大小小於批次大小)。例如

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

也可以增加 num_parallel_reads 以透過增加 avro 剖析/讀取平行處理來加快 Avro 資料處理速度。

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

如需 make_avro_record_dataset 的詳細用法,請參閱 API 文件

使用 Avro 資料集訓練 tf.keras 模型

現在,讓我們逐步了解使用 Avro 資料集(根據 mnist 資料集)進行 tf.keras 模型訓練的端對端範例。

使用 Avro Dataset API 將 train.avro 載入為 TensorFlow 資料集

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

定義簡單的 keras 模型

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

使用 Avro 資料集訓練 keras 模型

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

avro 資料集可以剖析並強制轉換任何 avro 資料為 TensorFlow 張量,包括記錄中的記錄、地圖、陣列、分支和列舉。剖析資訊會以地圖的形式傳遞到 avro 資料集實作中,其中索引鍵會編碼如何剖析資料值,而值會編碼如何強制轉換資料為 TensorFlow 張量 – 決定原始類型 (例如 bool、int、long、float、double、string) 以及張量類型 (例如稀疏或密集)。隨附 TensorFlow 剖析器類型 (請參閱表 1) 和原始類型強制轉換 (表 2) 的清單。

表 1 支援的 TensorFlow 剖析器類型

TensorFlow 剖析器類型 TensorFlow 張量 說明
tf.FixedLenFeature([], tf.int32) 密集張量 剖析固定長度特徵;也就是說,所有列都具有相同的常數元素數量,例如,每個列只有一個元素或一個陣列,該陣列始終具有相同數量的元素
tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) 稀疏張量 剖析稀疏特徵,其中每列都有可變長度的索引和值清單。「index_key」識別索引。「value_key」識別值。「dtype」是資料類型。「size」是每個索引項目的預期最大索引值
tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) 稀疏張量 剖析可變長度特徵;這表示每個資料列可以有可變數量的元素,例如,第 1 列有 5 個元素,第 2 列有 7 個元素

表 2 支援從 Avro 類型到 TensorFlow 類型的轉換

Avro 原始類型 TensorFlow 原始類型
boolean:二進位值 tf.bool
bytes:8 位元無號位元組序列 tf.string
double:倍精確度 64 位元 IEEE 浮點數 tf.float64
enum:列舉類型 tf.string,使用符號名稱
float:單精確度 32 位元 IEEE 浮點數 tf.float32
int:32 位元帶正負號整數 tf.int32
long:64 位元帶正負號整數 tf.int64
null:無值 使用預設值
string:Unicode 字元序列 tf.string

Avro Dataset API 的完整範例集在測試中提供。