如何将Tensorflow数据集保存到文件?

14

在SO上至少还有两个类似的问题,但没有一个得到答案。

我有一个数据集,其格式如下:

<TensorSliceDataset shapes: ((512,), (512,), (512,), ()), types: (tf.int32, tf.int32, tf.int32, tf.int32)>

另一个的格式为:

<BatchDataset shapes: ((None, 512), (None, 512), (None, 512), (None,)), types: (tf.int32, tf.int32, tf.int32, tf.int32)>
我已经仔细寻找,但是无法找到保存这些数据集并能够在后来加载的代码。我找到了最接近的 TensorFlow 文档页,其中建议使用tf.io.serialize_tensor序列化张量,然后使用tf.data.experimental.TFRecordWriter将它们写入文件中。
但是,当我尝试使用以下代码时:
dataset.map(tf.io.serialize_tensor)
writer = tf.data.experimental.TFRecordWriter('mydata.tfrecord')
writer.write(dataset)

我在第一行遇到了一个错误:

TypeError: serialize_tensor() 接受1到2个位置参数,但实际给出了4个

我该如何修改上述代码(或者进行其他操作)以实现我的目标呢?

6个回答

10

TFRecordWriter 似乎是最方便的选项,但不幸的是它只能写入每个元素一个张量的数据集。以下是您可以使用的几种解决方法。首先,由于所有张量具有相同的类型和类似的形状,因此您可以将它们全部连接成一个张量,然后在加载时再拆分回来:

import tensorflow as tf

# Write
a = tf.zeros((100, 512), tf.int32)
ds = tf.data.Dataset.from_tensor_slices((a, a, a, a[:, 0]))
print(ds)
# <TensorSliceDataset shapes: ((512,), (512,), (512,), ()), types: (tf.int32, tf.int32, tf.int32, tf.int32)>
def write_map_fn(x1, x2, x3, x4):
    return tf.io.serialize_tensor(tf.concat([x1, x2, x3, tf.expand_dims(x4, -1)], -1))
ds = ds.map(write_map_fn)
writer = tf.data.experimental.TFRecordWriter('mydata.tfrecord')
writer.write(ds)

# Read
def read_map_fn(x):
    xp = tf.io.parse_tensor(x, tf.int32)
    # Optionally set shape
    xp.set_shape([1537])  # Do `xp.set_shape([None, 1537])` if using batches
    # Use `x[:, :512], ...` if using batches
    return xp[:512], xp[512:1024], xp[1024:1536], xp[-1]
ds = tf.data.TFRecordDataset('mydata.tfrecord').map(read_map_fn)
print(ds)
# <MapDataset shapes: ((512,), (512,), (512,), ()), types: (tf.int32, tf.int32, tf.int32, tf.int32)>

但更普遍的情况是,您可以为每个张量(即Tensor)单独创建一个文件,然后读取它们:

import tensorflow as tf

# Write
a = tf.zeros((100, 512), tf.int32)
ds = tf.data.Dataset.from_tensor_slices((a, a, a, a[:, 0]))
for i, _ in enumerate(ds.element_spec):
    ds_i = ds.map(lambda *args: args[i]).map(tf.io.serialize_tensor)
    writer = tf.data.experimental.TFRecordWriter(f'mydata.{i}.tfrecord')
    writer.write(ds_i)

# Read
NUM_PARTS = 4
parts = []
def read_map_fn(x):
    return tf.io.parse_tensor(x, tf.int32)
for i in range(NUM_PARTS):
    parts.append(tf.data.TFRecordDataset(f'mydata.{i}.tfrecord').map(read_map_fn))
ds = tf.data.Dataset.zip(tuple(parts))
print(ds)
# <ZipDataset shapes: (<unknown>, <unknown>, <unknown>, <unknown>), types: (tf.int32, tf.int32, tf.int32, tf.int32)>

可以将整个数据集保存在单个文件中,每个元素有多个独立的张量,即包含tf.train.Example的TFRecords文件,但我不知道是否有一种方法可以在TensorFlow内部创建这些文件,而无需将数据从数据集中提取到Python中,再将其写入记录文件。


我很快就会尝试这个,并会回复您。非常感谢您的帮助!最近一直在努力寻找有关TF的问题的答案。 - Vivek Subramanian
非常感谢!你不知道那有多有帮助! - Vivek Subramanian

10

1
我正在进行实验。有一些笨重的事情,但很容易解决:1. 如果您指定了GZIP压缩,但没有明确说明它是gzipped,当您尝试加载它时,如果您没有指定compression ='GZIP',它将在不发出警告的情况下加载数据,但当您尝试使用它时,它会说“数据损坏”。不明显为什么会损坏。2. 您需要指定tf.TypeSpec。如果tf.data.experimental.save为您创建所需的protobuf,那就太好了,这样您就不必担心它。 - rodrigo-silveira
1
从上面的评论中更正:您需要指定一个tf.TensorSpec(略有不同)。但是,从我简短的实验中不幸得知:文件的大小非常大。我有一个带有7M行大多数为uint16的Parquet文件(经过gzip压缩),大小为277MB。tf.dataset.experimental.save工件(几个目录和“shards”)比Parquet文件少一些“列”,但也经过gzip压缩,大小超过600MB。 - rodrigo-silveira
TensorFlow 2.9文档表示此已被弃用 https://www.tensorflow.org/api_docs/python/tf/data/experimental/save - gary69
在这里,有关如何使用它的代码。它太长了,不适合在评论中呈现 XD - J Agustin Barrachina

6

补充Yoan的回答:

tf.experimental.save()和load()API很好用。您还需要手动将ds.element_spec保存到磁盘上,以便稍后/在不同上下文中进行load()。

对我来说,pickling也很好用:

1- 保存:

tf.data.experimental.save(
    ds, tf_data_path, compression='GZIP'
)
with open(tf_data_path + '/element_spec', 'wb') as out_:  # also save the element_spec to disk for future loading
    pickle.dump(ds.element_spec, out_)

2- 加载时,需要使用包含tf shards的文件夹路径和我们手动pickled的element_spec。

with open(tf_data_path + '/element_spec', 'rb') as in_:
    es = pickle.load(in_)

loaded = tf.data.experimental.load(
    tf_data_path, es, compression='GZIP'
)

2
使用 TF 2.5+,您可以使用 tf.data.experimental.save(...) 进行保存并在不指定元素规范的情况下加载。但是,在旧版本的 TF(如 2.4-)中,似乎需要采用这种方法。 - greedybuddha

5
Tensorflow 2.10将save方法从tf.data.experimental移动到tf.data.Dataset。与load方法一起使用,这是保存和加载模型最简单的方法。
Tensorflow 2.6引入了snapshot方法(以前是“实验性”功能)。 Tensorflow RFC-193详细介绍了该功能的动机和细节。
从文档中可以看出:
快照 API 允许用户透明地将其预处理流程的输出持久化到磁盘,并在不同的训练运行中使预处理数据具体化。该 API 使得可重复的预处理步骤得以合并,允许重复使用已经处理过的数据,以换取释放更有价值的 CPU 资源和加速器计算时间的磁盘存储和网络带宽。

这应该是截至2022年11月的被接受的答案。 - Angelo Cardellicchio
似乎某些继承类型,如<PrefetchedDataset>可能没有这个功能?如何在它们上使用此功能? - Torben Nordtorp

2

我也一直在处理这个问题,目前为止我已经写了以下实用工具(可以在我的存储库中找到)。

def cache_with_tf_record(filename: Union[str, pathlib.Path]) -> Callable[[tf.data.Dataset], tf.data.TFRecordDataset]:
    """
    Similar to tf.data.Dataset.cache but writes a tf record file instead. Compared to base .cache method, it also insures that the whole
    dataset is cached
    """

    def _cache(dataset):
        if not isinstance(dataset.element_spec, dict):
            raise ValueError(f"dataset.element_spec should be a dict but is {type(dataset.element_spec)} instead")
        Path(filename).parent.mkdir(parents=True, exist_ok=True)
        with tf.io.TFRecordWriter(str(filename)) as writer:
            for sample in dataset.map(transform(**{name: tf.io.serialize_tensor for name in dataset.element_spec.keys()})):
                writer.write(
                    tf.train.Example(
                        features=tf.train.Features(
                            feature={
                                key: tf.train.Feature(bytes_list=tf.train.BytesList(value=[value.numpy()]))
                                for key, value in sample.items()
                            }
                        )
                    ).SerializeToString()
                )
        return (
            tf.data.TFRecordDataset(str(filename), num_parallel_reads=tf.data.experimental.AUTOTUNE)
            .map(
                partial(
                    tf.io.parse_single_example,
                    features={name: tf.io.FixedLenFeature((), tf.string) for name in dataset.element_spec.keys()},
                ),
                num_parallel_calls=tf.data.experimental.AUTOTUNE,
            )
            .map(
                transform(
                    **{name: partial(tf.io.parse_tensor, out_type=spec.dtype) for name, spec in dataset.element_spec.items()}
                )
            )
            .map(
                transform(**{name: partial(tf.ensure_shape, shape=spec.shape) for name, spec in dataset.element_spec.items()})
            )
        )

    return _cache

有了这个工具,我可以做到:

dataset.apply(cache_with_tf_record("filename")).map(...)

并且可以使用util的第二部分直接加载数据集以供以后使用。

我仍在努力,所以它可能会在以后发生变化,特别是为了序列化正确类型而不是所有字节以节省空间(我猜测)。


1
您可以像这样使用tf.data.experimental.savetf.data.experimental.load
保存代码:
tf_dataset = get_dataset()    # returns a tf.data.Dataset() file
tf.data.experimental.save(dataset=tf_dataset, path="path/to/desired/save/file_name")
with open("path/to/desired/save/file_name" + ".pickle")), 'wb') as file:
    pickle.dump(tf_dataset.element_spec, file)   # I need this for opening it later

打开代码:

element_spec = pickle.load("path/to/desired/save/file_name" + ".pickle", 'rb'))
tensor_data = tf.data.experimental.load("path/to/desired/save/file_name", element_spec=element_spec)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接