有些資料集過於龐大,無法在單一機器上處理。tfds
支援使用 Apache Beam 在多部機器上產生資料。
這份文件包含兩個章節
- 適用於想要產生現有 Beam 資料集的使用者
- 適用於想要建立新的 Beam 資料集的開發人員
產生 Beam 資料集
以下是一些產生 Beam 資料集的範例,無論是在雲端或本機上。
在 Google Cloud Dataflow 上
若要使用 Google Cloud Dataflow 執行管線並充分利用分散式運算,請先按照快速入門操作說明進行。
環境設定完成後,您可以使用 tfds build
CLI,方法是使用 GCS 上的資料目錄,並指定 必要選項 作為 --beam_pipeline_options
旗標。
為了更輕鬆地啟動指令碼,建議使用 GCP/GCS 設定和您想要產生的資料集的實際值來定義下列變數
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
接著,您需要建立一個檔案,告知 Dataflow 在工作站上安裝 tfds
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
如果您使用的是 tfds-nightly
,請務必從 tfds-nightly
執行 echo,以防資料集自上次發行以來已更新。
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
如果您使用的是 TFDS 程式庫中未包含的其他依附元件,請依照管理 Python 管線依附元件的操作說明進行。
最後,您可以使用以下指令啟動工作
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
在本機上
若要使用 預設 Apache Beam 執行器在本機執行指令碼 (必須讓所有資料都符合記憶體),則指令與其他資料集相同
tfds build my_dataset
搭配 Apache Flink
若要使用 Apache Flink 執行管線,您可以閱讀官方文件。請確認您的 Beam 符合Flink 版本相容性
為了更輕鬆地啟動指令碼,建議使用 Flink 設定和您想要產生的資料集的實際值來定義下列變數
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
若要在內嵌 Flink 叢集上執行,您可以使用以下指令啟動工作
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
使用自訂指令碼
若要在 Beam 上產生資料集,API 與其他資料集相同。您可以使用 DownloadConfig
的 beam_options
(和 beam_runner
) 引數自訂 beam.Pipeline
。
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
實作 Beam 資料集
先決條件
為了撰寫 Apache Beam 資料集,您應該熟悉下列概念
- 熟悉
tfds
資料集建立指南,因為大部分內容仍然適用於 Beam 資料集。 - 透過 Beam 程式設計指南 取得 Apache Beam 簡介。
- 如果您想要使用 Cloud Dataflow 產生資料集,請閱讀 Google Cloud 文件 和 Apache Beam 依附元件指南。
操作說明
如果您熟悉資料集建立指南,則新增 Beam 資料集只需要修改 _generate_examples
函式。此函式應傳回 Beam 物件,而不是產生器
非 Beam 資料集
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
Beam 資料集
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
其餘部分可以 100% 相同,包括測試。
其他考量
- 使用
tfds.core.lazy_imports
匯入 Apache Beam。透過使用延遲依附元件,使用者仍然可以在產生資料集後讀取資料集,而無需安裝 Beam。 - 請注意 Python 閉包。執行管線時,
beam.Map
和beam.DoFn
函式會使用pickle
序列化並傳送至所有工作站。如果狀態必須在工作站之間共用,請勿在beam.PTransform
內使用可變物件。 - 由於使用 pickle 序列化
tfds.core.DatasetBuilder
的方式,在資料建立期間變動tfds.core.DatasetBuilder
將會在工作站上遭到忽略 (例如,無法在_split_generators
中設定self.info.metadata['offset'] = 123
,並從工作站存取,例如beam.Map(lambda x: x + self.info.metadata['offset'])
) - 如果您需要在分割之間共用一些管線步驟,您可以將額外的
pipeline: beam.Pipeline
kwarg 新增至_split_generator
,並控制完整的產生管線。請參閱tfds.core.GeneratorBasedBuilder
的_generate_examples
文件。
範例
以下是 Beam 資料集的範例。
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
執行您的管線
若要執行管線,請參閱以上章節。
tfds build my_dataset --register_checksums
使用 TFDS 作為輸入的管線
如果您想要建立以 TFDS 資料集作為來源的 Beam 管線,可以使用 tfds.beam.ReadFromTFDS
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
它會平行處理資料集的每個分片。