我尝试优化我的数据输入管道。这个数据集是一组450个大小约为70MB的TFRecord文件,存储在GCS上。该作业使用GCP ML Engine执行。没有GPU。
以下是管道:
以下是管道:
def build_dataset(file_pattern):
return tf.data.Dataset.list_files(
file_pattern
).interleave(
tf.data.TFRecordDataset,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).shuffle(
buffer_size=2048
).batch(
batch_size=2048,
drop_remainder=True,
).cache(
).repeat(
).map(
map_func=_parse_example_batch,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).prefetch(
buffer_size=1
)
使用映射函数:
def _bit_to_float(string_batch: tf.Tensor):
return tf.reshape(tf.math.floormod(tf.dtypes.cast(tf.bitwise.right_shift(
tf.expand_dims(tf.io.decode_raw(string_batch, tf.uint8), 2),
tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8))
), tf.float32), 2), (tf.shape(string_batch)[0], -1))
def _parse_example_batch(example_batch):
preprocessed_sample_columns = {
"features": tf.io.VarLenFeature(tf.float32),
"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
"label": tf.io.FixedLenFeature((), tf.float32, -1)
}
samples = tf.io.parse_example(example_batch, preprocessed_sample_columns)
dense_float = tf.sparse.to_dense(samples["features"])
bits_to_float = _bit_to_float(samples["booleanFeatures"])
return (
tf.concat([dense_float, bits_to_float], 1),
tf.reshape(samples["label"], (-1, 1))
)
我尝试遵循数据管道教程的最佳实践,并向量化我的映射函数(正如mrry建议的那样)。
在这种设置下,虽然数据下载速度很快(带宽约为200MB/s),但CPU利用率很低(仅14%),训练非常缓慢(一个时代需要超过1小时)。
我尝试了一些参数配置,改变了interleave()
参数,例如num_parallel_calls
或cycle_length
,或者使用TFRecordDataset
参数,例如num_parallel_calls
。
最快的配置使用以下参数集:
interleave.num_parallel_calls
: 1interleave.cycle_length
: 8TFRecordDataset.num_parallel_calls
: 8
使用此配置,一个时代只需大约20分钟即可运行。 但是,CPU使用率仅为50%,而带宽消耗约为55MB/s
问题:
- 如何优化管道以达到100%的CPU利用率(以及大约100MB/s的带宽消耗)?
- 为什么
tf.data.experimental.AUTOTUNE
无法找到加速训练的最佳值?
亲切的问候, Alexis。
编辑
经过更多实验,我得出了以下解决方案。
- 删除已由
TFRecordDataset
处理的interleave
步骤,如果num_parallel_calls
大于0,则可以处理。 - 更新映射函数,仅执行
parse_example
和decode_raw
,返回元组`((,), ())` map
之后进行cache
- 将
_bit_to_float
函数作为模型的组成部分移动
最后,这是数据管道代码:
def build_dataset(file_pattern):
return tf.data.TFRecordDataset(
tf.data.Dataset.list_files(file_pattern),
num_parallel_reads=multiprocessing.cpu_count(),
buffer_size=70*1000*1000
).shuffle(
buffer_size=2048
).map(
map_func=split,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).batch(
batch_size=2048,
drop_remainder=True,
).cache(
).repeat(
).prefetch(
buffer_size=32
)
def split(example):
preprocessed_sample_columns = {
"features": tf.io.VarLenFeature(tf.float32),
"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
"label": tf.io.FixedLenFeature((), tf.float32, -1)
}
samples = tf.io.parse_single_example(example, preprocessed_sample_columns)
dense_float = tf.sparse.to_dense(samples["features"])
bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
return (
(dense_float, bits_to_float),
tf.reshape(samples["label"], (1,))
)
def build_model(input_shape):
feature = keras.Input(shape=(N,))
bool_feature = keras.Input(shape=(M,), dtype="uint8")
one_hot = dataset._bit_to_float(bool_feature)
dense_input = tf.reshape(
keras.backend.concatenate([feature, one_hot], 1),
input_shape)
output = actual_model(dense_input)
model = keras.Model([feature, bool_feature], output)
return model
def _bit_to_float(string_batch: tf.Tensor):
return tf.dtypes.cast(tf.reshape(
tf.bitwise.bitwise_and(
tf.bitwise.right_shift(
tf.expand_dims(string_batch, 2),
tf.reshape(
tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8),
(1, 1, 8)
),
),
tf.constant(0x01, dtype=tf.uint8)
),
(tf.shape(string_batch)[0], -1)
), tf.float32)
感谢所有这些优化:
- 带宽消耗约为90MB/s
- CPU使用率约为20%
- 第一个epoch花费20分钟
- 后续epochs每个花费5分钟
因此,这似乎是一个不错的首次设置。但CPU和带宽仍未过度使用,因此仍然欢迎任何建议!
编辑Bis
因此,在一些基准测试之后,我发现了我认为是我们最好的输入管道:
def build_dataset(file_pattern):
tf.data.Dataset.list_files(
file_pattern
).interleave(
TFRecordDataset,
cycle_length=tf.data.experimental.AUTOTUNE,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).shuffle(
2048
).batch(
batch_size=64,
drop_remainder=True,
).map(
map_func=parse_examples_batch,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).cache(
).prefetch(
tf.data.experimental.AUTOTUNE
)
def parse_examples_batch(examples):
preprocessed_sample_columns = {
"features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
"label": tf.io.FixedLenFeature((), tf.float32, -1)
}
samples = tf.io.parse_example(examples, preprocessed_sample_columns)
bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
return (
(samples['features'], bits_to_float),
tf.expand_dims(samples["label"], 1)
)
那么,有什么新内容:
- 根据这个GitHub问题,
TFRecordDataset
的交错是一个遗留问题,因此interleave
函数更好。 map
之前的batch
是一个好习惯(向量化您的函数),可以减少调用映射函数的次数。- 不再需要
repeat
。自从TF2.0以来,Keras模型API支持数据集API并可以使用缓存(请参见SO post) - 从
VarLenFeature
切换到FixedLenSequenceFeature
,删除对tf.sparse.to_dense
的无用调用。
希望这能有所帮助。欢迎提出建议。