![]() |
![]() |
![]() |
![]() |
總覽
Avro Dataset API 的目標是以原生方式將 Avro 格式化資料載入 TensorFlow 作為TensorFlow 資料集。Avro 是一種資料序列化系統,類似於 Protocol Buffers。它廣泛用於 Apache Hadoop 中,可為持久性資料提供序列化格式,並為 Hadoop 節點之間的通訊提供線路格式。Avro 資料是一種以列為導向的壓縮二進位資料格式。它依賴於架構,架構儲存為個別的 JSON 檔案。如需 Avro 格式和架構宣告的規格,請參閱官方手冊。
設定套件
安裝必要的 tensorflow-io 套件
pip install tensorflow-io
匯入套件
import tensorflow as tf
import tensorflow_io as tfio
驗證 tf 和 tfio 匯入
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0 tensorflow version: 2.5.0
用法
探索資料集
為了本教學課程的目的,讓我們下載範例 Avro 資料集。
下載範例 Avro 檔案
curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 151 100 151 0 0 1268 0 --:--:-- --:--:-- --:--:-- 1268 100 369 100 369 0 0 1255 0 --:--:-- --:--:-- --:--:-- 1255 -rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro
下載範例 Avro 檔案的對應架構檔案
curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 151 100 151 0 0 1247 0 --:--:-- --:--:-- --:--:-- 1247 100 271 100 271 0 0 780 0 --:--:-- --:--:-- --:--:-- 780 -rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc
在上述範例中,已根據 mnist 資料集建立測試 Avro 資料集。TFRecord 格式的原始 mnist 資料集是從TF 命名資料集產生。但是,mnist 資料集作為示範資料集來說太大了。為了簡化起見,大部分內容都經過修剪,並且只保留了前幾個記錄。此外,還對原始 mnist 資料集中的 image
欄位進行了額外修剪,並將其對應到 Avro 中的 features
欄位。因此,avro 檔案 train.avro
有 4 個記錄,每個記錄有 3 個欄位:features
(整數陣列)、label
(整數或空值)和 dataType
(列舉)。若要檢視已解碼的 train.avro
(請注意,原始 avro 資料檔案並非人類可讀,因為 avro 是一種壓縮格式)
安裝必要的套件以讀取 Avro 檔案
pip install avro
若要以人類可讀格式讀取和列印 Avro 檔案
from avro.io import DatumReader
from avro.datafile import DataFileReader
import json
def print_avro(avro_file, max_record_num=None):
if max_record_num is not None and max_record_num <= 0:
return
with open(avro_file, 'rb') as avro_handler:
reader = DataFileReader(avro_handler, DatumReader())
record_count = 0
for record in reader:
record_count = record_count+1
print(record)
if max_record_num is not None and record_count == max_record_num:
break
print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'} {'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'} {'features': [0], 'label': 3, 'dataType': 'VALIDATION'} {'features': [1], 'label': 4, 'dataType': 'VALIDATION'}
而 train.avro
的架構(由 train.avsc
表示)是一個 JSON 格式的檔案。若要檢視 train.avsc
def print_schema(avro_schema_file):
with open(avro_schema_file, 'r') as handle:
parsed = json.load(handle)
print(json.dumps(parsed, indent=4, sort_keys=True))
print_schema('train.avsc')
{ "fields": [ { "name": "features", "type": { "items": "int", "type": "array" } }, { "name": "label", "type": [ "int", "null" ] }, { "name": "dataType", "type": { "name": "dataTypes", "symbols": [ "TRAINING", "VALIDATION" ], "type": "enum" } } ], "name": "ImageDataset", "type": "record" }
準備資料集
使用 Avro Dataset API 將 train.avro
載入為 TensorFlow 資料集
features = {
'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}
schema = tf.io.gfile.GFile('train.avsc').read()
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
reader_schema=schema,
features=features,
shuffle=False,
batch_size=3,
num_epochs=1)
for record in dataset:
print(record['features[*]'])
print(record['label'])
print(record['dataType'])
print("--------------------")
SparseTensor(indices=tf.Tensor( [[0 0] [0 1] [0 2] [0 3] [0 4] [1 0] [1 1] [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64)) tf.Tensor([-100 2 3], shape=(3,), dtype=int32) tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string) -------------------- SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64)) tf.Tensor([4], shape=(1,), dtype=int32) tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string) --------------------
上述範例會將 train.avro
轉換為 tensorflow 資料集。資料集的每個元素都是一個字典,其鍵是特徵名稱,值是轉換後的稀疏或密集張量。例如,它會將 features
、label
、dataType
欄位分別轉換為 VarLenFeature(SparseTensor)、FixedLenFeature(DenseTensor) 和 FixedLenFeature(DenseTensor)。由於 batch_size 為 3,因此它會強制將 train.avro
中的 3 個記錄轉換為結果資料集中的一個元素。對於 train.avro
中標籤為空值的第一個記錄,avro 讀取器會將其替換為指定的預設值 (-100)。在本範例中,train.avro
中總共有 4 個記錄。由於批次大小為 3,因此結果資料集包含 3 個元素,最後一個元素的批次大小為 1。但是,如果使用者啟用 drop_final_batch
,則也可以捨棄最後一個批次(如果大小小於批次大小)。例如
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
reader_schema=schema,
features=features,
shuffle=False,
batch_size=3,
drop_final_batch=True,
num_epochs=1)
for record in dataset:
print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100, 2, 3], dtype=int32)>}
也可以增加 num_parallel_reads 以透過增加 avro 剖析/讀取平行處理來加快 Avro 資料處理速度。
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
reader_schema=schema,
features=features,
shuffle=False,
num_parallel_reads=16,
batch_size=3,
drop_final_batch=True,
num_epochs=1)
for record in dataset:
print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100, 2, 3], dtype=int32)>}
如需 make_avro_record_dataset
的詳細用法,請參閱 API 文件。
使用 Avro 資料集訓練 tf.keras 模型
現在,讓我們逐步了解使用 Avro 資料集(根據 mnist 資料集)進行 tf.keras 模型訓練的端對端範例。
使用 Avro Dataset API 將 train.avro
載入為 TensorFlow 資料集
features = {
'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}
schema = tf.io.gfile.GFile('train.avsc').read()
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
reader_schema=schema,
features=features,
shuffle=False,
batch_size=1,
num_epochs=1)
定義簡單的 keras 模型
def build_and_compile_cnn_model():
model = tf.keras.Sequential()
model.compile(optimizer='sgd', loss='mse')
return model
model = build_and_compile_cnn_model()
使用 Avro 資料集訓練 keras 模型
model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>} Consider rewriting this model with the Functional API. WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>} Consider rewriting this model with the Functional API. 1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00 <tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>
avro 資料集可以剖析並強制轉換任何 avro 資料為 TensorFlow 張量,包括記錄中的記錄、地圖、陣列、分支和列舉。剖析資訊會以地圖的形式傳遞到 avro 資料集實作中,其中索引鍵會編碼如何剖析資料值,而值會編碼如何強制轉換資料為 TensorFlow 張量 – 決定原始類型 (例如 bool、int、long、float、double、string) 以及張量類型 (例如稀疏或密集)。隨附 TensorFlow 剖析器類型 (請參閱表 1) 和原始類型強制轉換 (表 2) 的清單。
表 1 支援的 TensorFlow 剖析器類型
TensorFlow 剖析器類型 | TensorFlow 張量 | 說明 |
---|---|---|
tf.FixedLenFeature([], tf.int32) | 密集張量 | 剖析固定長度特徵;也就是說,所有列都具有相同的常數元素數量,例如,每個列只有一個元素或一個陣列,該陣列始終具有相同數量的元素 |
tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) | 稀疏張量 | 剖析稀疏特徵,其中每列都有可變長度的索引和值清單。「index_key」識別索引。「value_key」識別值。「dtype」是資料類型。「size」是每個索引項目的預期最大索引值 |
tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) | 稀疏張量 | 剖析可變長度特徵;這表示每個資料列可以有可變數量的元素,例如,第 1 列有 5 個元素,第 2 列有 7 個元素 |
表 2 支援從 Avro 類型到 TensorFlow 類型的轉換
Avro 原始類型 | TensorFlow 原始類型 |
---|---|
boolean:二進位值 | tf.bool |
bytes:8 位元無號位元組序列 | tf.string |
double:倍精確度 64 位元 IEEE 浮點數 | tf.float64 |
enum:列舉類型 | tf.string,使用符號名稱 |
float:單精確度 32 位元 IEEE 浮點數 | tf.float32 |
int:32 位元帶正負號整數 | tf.int32 |
long:64 位元帶正負號整數 | tf.int64 |
null:無值 | 使用預設值 |
string:Unicode 字元序列 | tf.string |
Avro Dataset API 的完整範例集在測試中提供。