使用 Apache Beam 產生大型資料集

有些資料集過於龐大,無法在單一機器上處理。tfds 支援使用 Apache Beam 在多部機器上產生資料。

這份文件包含兩個章節

  • 適用於想要產生現有 Beam 資料集的使用者
  • 適用於想要建立新的 Beam 資料集的開發人員

產生 Beam 資料集

以下是一些產生 Beam 資料集的範例,無論是在雲端或本機上。

在 Google Cloud Dataflow 上

若要使用 Google Cloud Dataflow 執行管線並充分利用分散式運算,請先按照快速入門操作說明進行。

環境設定完成後,您可以使用 tfds build CLI,方法是使用 GCS 上的資料目錄,並指定 必要選項 作為 --beam_pipeline_options 旗標。

為了更輕鬆地啟動指令碼,建議使用 GCP/GCS 設定和您想要產生的資料集的實際值來定義下列變數

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

接著,您需要建立一個檔案,告知 Dataflow 在工作站上安裝 tfds

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

如果您使用的是 tfds-nightly,請務必從 tfds-nightly 執行 echo,以防資料集自上次發行以來已更新。

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

如果您使用的是 TFDS 程式庫中未包含的其他依附元件,請依照管理 Python 管線依附元件的操作說明進行。

最後,您可以使用以下指令啟動工作

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

在本機上

若要使用 預設 Apache Beam 執行器在本機執行指令碼 (必須讓所有資料都符合記憶體),則指令與其他資料集相同

tfds build my_dataset

若要使用 Apache Flink 執行管線,您可以閱讀官方文件。請確認您的 Beam 符合Flink 版本相容性

為了更輕鬆地啟動指令碼,建議使用 Flink 設定和您想要產生的資料集的實際值來定義下列變數

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

若要在內嵌 Flink 叢集上執行,您可以使用以下指令啟動工作

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

使用自訂指令碼

若要在 Beam 上產生資料集,API 與其他資料集相同。您可以使用 DownloadConfigbeam_options (和 beam_runner) 引數自訂 beam.Pipeline

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

實作 Beam 資料集

先決條件

為了撰寫 Apache Beam 資料集,您應該熟悉下列概念

操作說明

如果您熟悉資料集建立指南,則新增 Beam 資料集只需要修改 _generate_examples 函式。此函式應傳回 Beam 物件,而不是產生器

非 Beam 資料集

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Beam 資料集

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

其餘部分可以 100% 相同,包括測試。

其他考量

  • 使用 tfds.core.lazy_imports 匯入 Apache Beam。透過使用延遲依附元件,使用者仍然可以在產生資料集後讀取資料集,而無需安裝 Beam。
  • 請注意 Python 閉包。執行管線時,beam.Mapbeam.DoFn 函式會使用 pickle 序列化並傳送至所有工作站。如果狀態必須在工作站之間共用,請勿在 beam.PTransform 內使用可變物件。
  • 由於使用 pickle 序列化 tfds.core.DatasetBuilder 的方式,在資料建立期間變動 tfds.core.DatasetBuilder 將會在工作站上遭到忽略 (例如,無法在 _split_generators 中設定 self.info.metadata['offset'] = 123,並從工作站存取,例如 beam.Map(lambda x: x + self.info.metadata['offset']))
  • 如果您需要在分割之間共用一些管線步驟,您可以將額外的 pipeline: beam.Pipeline kwarg 新增至 _split_generator,並控制完整的產生管線。請參閱 tfds.core.GeneratorBasedBuilder_generate_examples 文件。

範例

以下是 Beam 資料集的範例。

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

執行您的管線

若要執行管線,請參閱以上章節。

tfds build my_dataset --register_checksums

使用 TFDS 作為輸入的管線

如果您想要建立以 TFDS 資料集作為來源的 Beam 管線,可以使用 tfds.beam.ReadFromTFDS

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

它會平行處理資料集的每個分片。