如何提高数据输入流水线的性能?

26
我尝试优化我的数据输入管道。这个数据集是一组450个大小约为70MB的TFRecord文件,存储在GCS上。该作业使用GCP ML Engine执行。没有GPU。
以下是管道:
def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        buffer_size=2048
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).map(
        map_func=_parse_example_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).prefetch(
        buffer_size=1
    )

使用映射函数:

def _bit_to_float(string_batch: tf.Tensor):
    return tf.reshape(tf.math.floormod(tf.dtypes.cast(tf.bitwise.right_shift(
        tf.expand_dims(tf.io.decode_raw(string_batch, tf.uint8), 2),
        tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8))
    ), tf.float32), 2), (tf.shape(string_batch)[0], -1))


def _parse_example_batch(example_batch):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(example_batch, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = _bit_to_float(samples["booleanFeatures"])
    return (
        tf.concat([dense_float, bits_to_float], 1),
        tf.reshape(samples["label"], (-1, 1))
    )

我尝试遵循数据管道教程的最佳实践,并向量化我的映射函数(正如mrry建议的那样)。

在这种设置下,虽然数据下载速度很快(带宽约为200MB/s),但CPU利用率很低(仅14%),训练非常缓慢(一个时代需要超过1小时)。

我尝试了一些参数配置,改变了interleave()参数,例如num_parallel_callscycle_length,或者使用TFRecordDataset参数,例如num_parallel_calls

最快的配置使用以下参数集:

  • interleave.num_parallel_calls: 1
  • interleave.cycle_length: 8
  • TFRecordDataset.num_parallel_calls: 8

使用此配置,一个时代只需大约20分钟即可运行。 但是,CPU使用率仅为50%,而带宽消耗约为55MB/s

问题:

  1. 如何优化管道以达到100%的CPU利用率(以及大约100MB/s的带宽消耗)?
  2. 为什么tf.data.experimental.AUTOTUNE无法找到加速训练的最佳值?

亲切的问候, Alexis。


编辑

经过更多实验,我得出了以下解决方案。

  1. 删除已由TFRecordDataset处理的interleave步骤,如果num_parallel_calls大于0,则可以处理。
  2. 更新映射函数,仅执行parse_exampledecode_raw,返回元组`((,), ())`
  3. map之后进行cache
  4. _bit_to_float函数作为模型的组成部分移动

最后,这是数据管道代码:

def build_dataset(file_pattern):
    return tf.data.TFRecordDataset(
        tf.data.Dataset.list_files(file_pattern),
        num_parallel_reads=multiprocessing.cpu_count(),
        buffer_size=70*1000*1000
    ).shuffle(
        buffer_size=2048
    ).map(
        map_func=split,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).prefetch(
        buffer_size=32
    )


def split(example):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_single_example(example, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (dense_float, bits_to_float),
        tf.reshape(samples["label"], (1,))
    )


def build_model(input_shape):
    feature = keras.Input(shape=(N,))
    bool_feature = keras.Input(shape=(M,), dtype="uint8")
    one_hot = dataset._bit_to_float(bool_feature)
    dense_input = tf.reshape(
        keras.backend.concatenate([feature, one_hot], 1),
        input_shape)
    output = actual_model(dense_input)

    model = keras.Model([feature, bool_feature], output)
    return model

def _bit_to_float(string_batch: tf.Tensor):
    return tf.dtypes.cast(tf.reshape(
        tf.bitwise.bitwise_and(
            tf.bitwise.right_shift(
                tf.expand_dims(string_batch, 2),
                tf.reshape(
                    tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8),
                    (1, 1, 8)
                ),
            ),
            tf.constant(0x01, dtype=tf.uint8)
        ),
        (tf.shape(string_batch)[0], -1)
    ), tf.float32)

感谢所有这些优化:

  • 带宽消耗约为90MB/s
  • CPU使用率约为20%
  • 第一个epoch花费20分钟
  • 后续epochs每个花费5分钟

因此,这似乎是一个不错的首次设置。但CPU和带宽仍未过度使用,因此仍然欢迎任何建议!


编辑Bis

因此,在一些基准测试之后,我发现了我认为是我们最好的输入管道:

def build_dataset(file_pattern):
    tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        TFRecordDataset,
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        2048
    ).batch(
        batch_size=64,
        drop_remainder=True,
    ).map(
        map_func=parse_examples_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).cache(
    ).prefetch(
        tf.data.experimental.AUTOTUNE
    )

def parse_examples_batch(examples):
    preprocessed_sample_columns = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(examples, preprocessed_sample_columns)
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (samples['features'], bits_to_float),
        tf.expand_dims(samples["label"], 1)
    )

那么,有什么新内容:

  • 根据这个GitHub问题TFRecordDataset的交错是一个遗留问题,因此interleave函数更好。
  • map之前的batch是一个好习惯(向量化您的函数),可以减少调用映射函数的次数。
  • 不再需要repeat。自从TF2.0以来,Keras模型API支持数据集API并可以使用缓存(请参见SO post
  • VarLenFeature切换到FixedLenSequenceFeature,删除对tf.sparse.to_dense的无用调用。

希望这能有所帮助。欢迎提出建议。


谢谢你不仅提出了正确的问题,还提供了答案。如果可以的话,我会加两个赞的。:)编辑:实际上,我刚刚做到了 - 我已经点赞了你提到这个问题的另一个答案。 :) - Super-intelligent Shade
@InnocentBystander 不客气 ^^ 感谢您的投票,它们也给了我一些徽章! - AlexisBRENON
2个回答

15

为了社区的利益,在答案部分提到@AlexisBRENON的解决方案和重要观察。

下面列出了重要观察:

  1. 根据这个GitHub问题TFRecordDataset interleaving 是一个遗留的函数,因此interleave函数更好。
  2. map之前的batch是一个好习惯(向量化您的函数),可以减少映射函数被调用的次数。
  3. 不再需要repeat。自从TF2.0以来,Keras模型API支持数据集API并且可以使用缓存(请参见SO post
  4. VarLenFeature切换到FixedLenSequenceFeature,删除了对tf.sparse.to_dense的无用调用。

以下是改进性能的管道代码,符合上述观察:

def build_dataset(file_pattern):
    tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        TFRecordDataset,
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        2048
    ).batch(
        batch_size=64,
        drop_remainder=True,
    ).map(
        map_func=parse_examples_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).cache(
    ).prefetch(
        tf.data.experimental.AUTOTUNE
    )

def parse_examples_batch(examples):
    preprocessed_sample_columns = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(examples, preprocessed_sample_columns)
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (samples['features'], bits_to_float),
        tf.expand_dims(samples["label"], 1)
    )

0

我有一个进一步的建议:

根据interleave()的文档,你可以使用一个映射函数作为第一个参数。

这意味着,你可以这样写:

 dataset = tf.data.Dataset.list_files(file_pattern)
 dataset = dataset.interleave(lambda x:
    tf.data.TFRecordDataset(x).map(parse_fn, num_parallel_calls=AUTOTUNE),
    cycle_length=tf.data.experimental.AUTOTUNE,
    num_parallel_calls=tf.data.experimental.AUTOTUNE
    )

据我所知,这将为每个分片映射一个解析函数,然后交错结果。这样就可以消除以后使用 dataset.map(...) 的需要。

我最近没有进行太多实验。但我认为你的解决方案并没有带来太多改进。我认为interleave处理IO阻塞行为(不需要CPU),而map大部分时间都是CPU密集型的(因此不能太多地并行化)。 因此,我认为你的解决方案与interleave().map()大致相当。 为了确保,请随意尝试这个这个 - AlexisBRENON

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接