使用SageMaker管道模式处理存储在S3目录中的tfrecords。

3

当我使用Pipe而不是File作为input_mode时,我的对的调用会无限期地挂起,但没有错误消息。我相应地将TensorFlow Dataset替换为Pipemodedataset。以File模式进行的训练成功完成。

我的数据包括两个s3存储桶,每个存储桶中有多个tfrecord文件。尽管我已经广泛查看了文档,但我对如何在这种情况下使用Pipemodedataset并不太自信 - 具体来说,如何设置channel

这是我的Sagemaker笔记本设置:

hyperparameters = {
    "batch-size": 1,
    "pipe_mode": 1,
}

estimator_config = {
    "entry_point": "tensorflow_train.py",
    "source_dir": "source",
    "framework_version": "2.3",
    "py_version": "py37",
    "instance_type": "ml.p3.2xlarge",
    "instance_count": 1,
    "role": sagemaker.get_execution_role(),
    "hyperparameters": hyperparameters,
    "output_path": f"s3://{bucket_name}",
    "input_mode": "Pipe",
}

tf_estimator = TensorFlow(**estimator_config)

s3_data_channels = {
    "training": f"s3://{bucket_name}/data/training",
    "validation": f"s3://{bucket_name}/data/validation",
}

tf_estimator.fit(s3_data_channels)

如果我在`s3_data_channels`上运行`aws s3 ls`命令,我会得到一组tfrecord文件列表。
以下是我设置数据集的方法(根据是否选择了`pipe_mode`而有不同的if/else语句:
import tensorflow as tf

if __name__ == "__main__":

    arg_parser = argparse.ArgumentParser()
    ...
    arg_parser.add_argument("--pipe_mode", type=int, default=0)

    arg_parser.add_argument("--train_dir", type=str, default=os.environ.get("SM_CHANNEL_TRAINING"))
    arg_parser.add_argument(
        "--validation_dir", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION")
    )
    arg_parser.add_argument("--model_dir", type=str)
    args, _ = arg_parser.parse_known_args()

    AUTOTUNE = tf.data.experimental.AUTOTUNE

    if args.pipe_mode == 1:
        from sagemaker_tensorflow import PipeModeDataset
        train_ds = PipeModeDataset(channel="training", record_format='TFRecord')
        val_ds = PipeModeDataset(channel="validation", record_format='TFRecord')

    else:
        train_files = tf.data.Dataset.list_files(args.train_dir + '/*tfrecord')
        val_files = tf.data.Dataset.list_files(args.validation_dir + '/*tfrecord')
        train_ds = tf.data.TFRecordDataset(filenames=train_files, num_parallel_reads=AUTOTUNE)
        val_ds = tf.data.TFRecordDataset(filenames=val_files, num_parallel_reads=AUTOTUNE)

    train_ds = (
        train_ds.map(tfrecord_parser, num_parallel_calls=AUTOTUNE)
        .batch(args.batch_size)
        .prefetch(AUTOTUNE)
    )

    val_ds = (
        val_ds.map(tfrecord_parser, num_parallel_calls=AUTOTUNE)
        .batch(args.batch_size)
        .prefetch(AUTOTUNE)
    )
    ...

笔记本输出为:2021-11-04 01:17:11 开始 - 启动所请求的 ML 实例...... 2021-11-04 01:18:33 开始 - 准备实例进行训练......... 2021-11-04 01:19:54 下载 - 正在下载输入数据... 2021-11-04 01:20:10 训练 - 正在下载训练镜像............... 2021-11-04 01:23:02 训练 - 训练镜像下载完成。正在进行训练..``` 它打印环境变量,然后停留在 ```hook.py:425] 监控集合:sm_metrics、metrics、losses``` - FFT
2个回答

2

我曾经遇到过同样的问题,当使用管道模式时,model.fit()会无限期地卡住。经过一些研究和尝试许多更改后,通过在拟合模型时定义steps_per_epoch来解决了这个问题。

我猜想,当使用文件模式时,它已经知道每个epoch将有多少步骤,但是对于管道模式,您需要手动指定它。


这对我也适用!你是如何计算这个值的?有没有其他方法而不是本地运行? - Aspir

0
我知道已经过去了两年。但是这个问题在Sagemaker或Keras中从未得到解决。 问题的根本原因是keras.engine.data_adapter.DataHandler.enumerate_epochsPipeModeDataset的工作方式。
should_recreate_iterator为真时,在enumerate_epochs中会调用iter(dataset)两次,第一个iterator会立即被丢弃。 由于PipeModeDataset的实现,每次创建迭代器时,它会将索引增加到下一个PIPE,这种情况下会使用PIPE training_1进行第一个epoch。但由于PIPE training_0没有被消耗,后台获取进程不会写入training_1。因此我们会发现训练会卡住。
用户还可以使用PipeModeDataset轻松生成它。只需调用即可。
from sagemaker_tensorflow import PipeModeDataset
train_ds = PipeModeDataset(channel="training", record_format='TFRecord')
iter(ds)
next(iter(ds)) # it will wait forever 

这是一个猴子补丁:
from tensorflow.python.distribute.input_lib import (
    DistributedDataset,
)
def enumerate_epochs(self):
    """Yields `(epoch, tf.data.Iterator)`."""
    with self._truncate_execution_to_epoch():
        if not self._adapter.should_recreate_iterator():
            data_iterator = iter(self._dataset)
        for epoch in range(self._initial_epoch, self._epochs):
            if self._insufficient_data:  # Set by `catch_stop_iteration`.
                break
            if self._adapter.should_recreate_iterator():
                data_iterator = iter(self._dataset)
                if not isinstance(self._dataset, DistributedDataset):
                    steps = self._infer_steps(
                        self._steps_per_epoch, self._dataset
                    )
                    if steps is not None:
                        self._inferred_steps = steps
            yield epoch, data_iterator
            self._adapter.on_epoch_end()
import keras
keras.engine.data_adapter.DataHandler.enumerate_epochs = enumerate_epochs

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接