将大型训练和测试文件流式传输到Tensorflow的DNNClassifier

19
我有一个巨大的训练CSV文件(709M)和一个大型测试CSV文件(125M),我想在使用高级Tensorflow API的情况下将它们发送到中。
看起来,由fitevaluate接受的input_fn参数必须在内存中保存所有特征和标签数据,但是我目前希望在本地计算机上运行它,因此如果我将这些文件读入内存然后处理它们,它很快就会耗尽内存。
我浏览了streamed-reading of data的文档,但是读取CSV的示例代码似乎是针对低级Tensorflow API的。
而且 - 如果您原谅一点抱怨 - 它似乎对于将经过充分准备的训练和测试数据文件发送到的微不足道的用例而言过于复杂...尽管,在Tensorflow中训练和测试大量数据可能确实需要这种复杂性?
无论如何,如果可能的话,我真的很希望能够使用高级API使用那种方法的示例,我开始怀疑是否可能。
在四处探索之后,我确实找到了DNNClassifier#partial_fit,并将尝试使用它进行培训。

这个方法的使用示例可以节省我一些时间,但希望在接下来的几个小时内能够找到正确的用法。

然而,似乎没有对应的 DNNClassifier#partial_evaluate ...虽然我怀疑我可以将测试数据分成较小的部分,并在每个批次上连续运行DNNClassifier#evaluate,这实际上可能是一个很好的方法,因为我可以将测试数据分成组,并获得每组的准确性。

==== 更新 ====

简短版本:

  1. DomJack的建议应该被接受。

  2. 然而,我的Mac有16GB的RAM足以容纳整个709Mb的训练数据集,而不会崩溃。因此,虽然我最终部署应用程序时会使用DataSets功能,但我目前还没有在本地开发工作中使用它。

更长版本:

我开始使用上面描述的partial_fit API,但每次使用都会发出警告。

所以,我去看了这个方法的源代码 here,发现它的完整实现如下:

logging.warning('The current implementation of partial_fit is not optimized'
                ' for use in a loop. Consider using fit() instead.')
return self.fit(x=x, y=y, input_fn=input_fn, steps=steps,
                batch_size=batch_size, monitors=monitors)

......这让我想起《银河系漫游指南》中的一幕:

亚瑟·丹特:如果我按下这个按钮会发生什么?

福特·普雷弗特:我不会-

亚瑟·丹特:哦。

福特·普雷弗特:发生了什么事?

亚瑟·丹特:有一个标志亮起来,上面写着“请不要再按此按钮”。

也就是说:partial_fit似乎只存在于告诉你不要使用它的目的。

此外,通过在训练文件块上迭代使用partial_fit生成的模型比在整个训练文件上使用fit生成的模型要小得多,这强烈表明只有最后一个partial_fit训练块实际上“生效”了。

2个回答

36

查看tf.data.Dataset API,有多种创建数据集的方式。我将概述四种方法 - 但你只需要实现其中一种。

我假设你的csv文件的每一行都是n_features浮点值,后跟一个单独的int值。

创建tf.data.Dataset

使用Dataset.from_generator包装Python生成器

开始的最简单方法是包装本地Python生成器。 这可能会导致性能问题,但对于您的目的可能没有问题。

def read_csv(filename):
    with open(filename, 'r') as f:
        for line in f.readlines():
            record = line.rstrip().split(',')
            features = [float(n) for n in record[:-1]]
            label = int(record[-1])
            yield features, label

def get_dataset():
    filename = 'my_train_dataset.csv'
    generator = lambda: read_csv(filename)
    return tf.data.Dataset.from_generator(
        generator, (tf.float32, tf.int32), ((n_features,), ()))

这种方法非常灵活,使您能够在不依赖 TensorFlow 的情况下测试生成器函数 (read_csv)。

使用Tensorflow Datasets API

支持tensorflow版本1.12+,TensorFlow Datasets是我创建数据集的新选择。它可以自动序列化数据,收集统计信息,并通过infobuilder对象向您提供其他元数据。它还可以处理自动下载和提取,从而简化协作。

import tensorflow_datasets as tfds

class MyCsvDatasetBuilder(tfds.core.GeneratorBasedBuilder):
  VERSION = tfds.core.Version("0.0.1")

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        description=(
            "My dataset"),
        features=tfds.features.FeaturesDict({
            "features": tfds.features.Tensor(
              shape=(FEATURE_SIZE,), dtype=tf.float32),
            "label": tfds.features.ClassLabel(
                names=CLASS_NAMES),
            "index": tfds.features.Tensor(shape=(), dtype=tf.float32)
        }),
        supervised_keys=("features", "label"),
    )

  def _split_generators(self, dl_manager):
    paths = dict(
      train='/path/to/train.csv',
      test='/path/to/test.csv',
    )
    # better yet, if the csv files were originally downloaded, use
    # urls = dict(train=train_url, test=test_url)
    # paths = dl_manager.download(urls)
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            num_shards=10,
            gen_kwargs=dict(path=paths['train'])),
        tfds.core.SplitGenerator(
            name=tfds.Split.TEST,
            num_shards=2,
            gen_kwargs=dict(cvs_path=paths['test']))
    ]

  def _generate_examples(self, csv_path):
    with open(csv_path, 'r') as f:
        for i, line in enumerate(f.readlines()):
            record = line.rstrip().split(',')
            features = [float(n) for n in record[:-1]]
            label = int(record[-1])
            yield dict(features=features, label=label, index=i)

使用方法:

builder = MyCsvDatasetBuilder()
builder.download_and_prepare()  # will only take time to run first time
# as_supervised makes output (features, label) - good for model.fit
datasets = builder.as_dataset(as_supervised=True)

train_ds = datasets['train']
test_ds = datasets['test']

将基于索引的Python函数封装

以上方法的一个缺点是,使用大小为n的混洗缓冲区对生成的数据集进行混洗需要加载n个示例。 这将在您的管道中创建定期暂停(大型n)或导致潜在的差错混洗(小型n)。

def get_record(i):
    # load the ith record using standard python, return numpy arrays
    return features, labels

def get_inputs(batch_size, is_training):

    def tf_map_fn(index):
        features, labels = tf.py_func(
            get_record, (index,), (tf.float32, tf.int32), stateful=False)
        features.set_shape((n_features,))
        labels.set_shape(())
        # do data augmentation here
        return features, labels

    epoch_size = get_epoch_size()
    dataset = tf.data.Dataset.from_tensor_slices((tf.range(epoch_size,))
    if is_training:
        dataset = dataset.repeat().shuffle(epoch_size)
    dataset = dataset.map(tf_map_fn, (tf.float32, tf.int32), num_parallel_calls=8)
    dataset = dataset.batch(batch_size)
    # prefetch data to CPU while GPU processes previous batch
    dataset = dataset.prefetch(1)
    # Also possible
    # dataset = dataset.apply(
    #     tf.contrib.data.prefetch_to_device('/gpu:0'))
    features, labels = dataset.make_one_shot_iterator().get_next()
    return features, labels

简而言之,我们只创建一个仅包含记录索引的数据集(或任何可以完全加载到内存中的小型记录ID)。然后在这个最小数据集上执行混洗/重复操作,接着通过 tf.data.Dataset.maptf.py_func 将索引映射到实际数据。请参阅下面的 与Estimators一起使用在隔离环境中进行测试 部分以获取用法说明。请注意,这需要按行访问数据,因此您可能需要从 csv 转换为其他格式。

TextLineDataset

您还可以直接使用 tf.data.TextLineDataset 读取 csv 文件。

def get_record_defaults():
  zf = tf.zeros(shape=(1,), dtype=tf.float32)
  zi = tf.ones(shape=(1,), dtype=tf.int32)
  return [zf]*n_features + [zi]

def parse_row(tf_string):
    data = tf.decode_csv(
        tf.expand_dims(tf_string, axis=0), get_record_defaults())
    features = data[:-1]
    features = tf.stack(features, axis=-1)
    label = data[-1]
    features = tf.squeeze(features, axis=0)
    label = tf.squeeze(label, axis=0)
    return features, label

def get_dataset():
    dataset = tf.data.TextLineDataset(['data.csv'])
    return dataset.map(parse_row, num_parallel_calls=8)

parse_row 函数有些复杂,因为 tf.decode_csv 需要一个批次(batch),如果在解析之前对数据集进行分批(batch)处理可以使其变得稍微简单一些。

def parse_batch(tf_string):
    data = tf.decode_csv(tf_string, get_record_defaults())
    features = data[:-1]
    labels = data[-1]
    features = tf.stack(features, axis=-1)
    return features, labels

def get_batched_dataset(batch_size):
    dataset = tf.data.TextLineDataset(['data.csv'])
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(parse_batch)
    return dataset

TFRecordDataset

你可以将csv文件转换为TFRecord文件,并使用TFRecordDataset。这里有一个详细的教程(链接)

步骤1: 将csv数据转换为TFRecords数据。以下是示例代码(请参见上面的from_generator示例中的read_csv函数)。

with tf.python_io.TFRecordWriter("my_train_dataset.tfrecords") as writer:
    for features, labels in read_csv('my_train_dataset.csv'):
        example = tf.train.Example()
        example.features.feature[
            "features"].float_list.value.extend(features)
        example.features.feature[
            "label"].int64_list.value.append(label)
        writer.write(example.SerializeToString())

这只需要运行一次。

步骤2:编写一个数据集,解码这些记录文件。

def parse_function(example_proto):
    features = {
        'features': tf.FixedLenFeature((n_features,), tf.float32),
        'label': tf.FixedLenFeature((), tf.int64)
    }
    parsed_features = tf.parse_single_example(example_proto, features)
    return parsed_features['features'], parsed_features['label']

def get_dataset():
    dataset = tf.data.TFRecordDataset(['data.tfrecords'])
    dataset = dataset.map(parse_function)
    return dataset

使用估算器处理数据集

def get_inputs(batch_size, shuffle_size):
    dataset = get_dataset()  # one of the above implementations
    dataset = dataset.shuffle(shuffle_size)
    dataset = dataset.repeat()  # repeat indefinitely
    dataset = dataset.batch(batch_size)
            # prefetch data to CPU while GPU processes previous batch
    dataset = dataset.prefetch(1)
    # Also possible
    # dataset = dataset.apply(
    #     tf.contrib.data.prefetch_to_device('/gpu:0'))
    features, label = dataset.make_one_shot_iterator().get_next()

estimator.train(lambda: get_inputs(32, 1000), max_steps=1e7)

隔离测试数据集

我强烈建议你独立地测试你的数据集而不是和估算器一起测试。使用上面的get_inputs,这非常简单。

batch_size = 4
shuffle_size = 100
features, labels = get_inputs(batch_size, shuffle_size)
with tf.Session() as sess:
    f_data, l_data = sess.run([features, labels])
print(f_data, l_data)  # or some better visualization function

性能

假设您正在使用GPU运行网络,除非csv文件的每一行都很大且您的网络很小,否则您可能不会注意到性能差异。这是因为Estimator实现强制要求在CPU上执行数据加载/预处理,而prefetch意味着在当前批次在GPU上训练时,下一批可以在CPU上准备好。唯一的例外是如果您的数据集具有大量数据记录且洗牌大小很大,则需要在运行任何内容通过GPU之前,花费一些时间来载入若干个样本。


谢谢您提供如此丰富的回复,但我仍然怀疑是否可以对流数据源进行洗牌[可能我们无法预先知道整个资源],以及如何正确地执行它??.. 对于资源(在每个时刻都不完全知道)进行洗牌和拆分似乎是一项非常棘手的任务。 - JeeyCi

4
我同意 DomJack 对使用 Dataset API 的看法,但不需要读取整个 csv 文件然后转换为 TfRecord。我在此提议使用 TextLineDataset - Dataset API 的子类,直接将数据加载到 TensorFlow 程序中。可以在这里找到一个直观的教程。

下面的代码用于 MNIST 分类问题的演示,希望能够回答 OP 的问题。csv 文件有 784 列,分类数为 10。我在这个例子中使用了一个具有 16 个 relu 单元的 1 隐藏层神经网络分类器。

首先,加载库并定义一些常量:

# load libraries
import tensorflow as tf
import os

# some constants
n_x = 784
n_h = 16
n_y = 10

# path to the folder containing the train and test csv files
# You only need to change PATH, rest is platform independent
PATH = os.getcwd() + '/' 

# create a list of feature names
feature_names = ['pixel' + str(i) for i in range(n_x)]

其次,我们使用Dataset API创建一个读取文件的输入函数,然后将结果提供给Estimator API。返回值必须是一个由两个元素组成的元组,第一个元素必须是一个字典,其中每个输入特征都是一个键,并且是一个训练批次值列表;第二个元素是一个训练批次的标签列表。
def my_input_fn(file_path, batch_size=32, buffer_size=256,\
                perform_shuffle=False, repeat_count=1):
    '''
    Args:
        - file_path: the path of the input file
        - perform_shuffle: whether the data is shuffled or not
        - repeat_count: The number of times to iterate over the records in the dataset.
                    For example, if we specify 1, then each record is read once.
                    If we specify None, iteration will continue forever.
    Output is two-element tuple organized as follows:
        - The first element must be a dict in which each input feature is a key,
        and then a list of values for the training batch.
        - The second element is a list of labels for the training batch.
    '''
    def decode_csv(line):
        record_defaults = [[0.]]*n_x # n_x features
        record_defaults.insert(0, [0]) # the first element is the label (int)
        parsed_line = tf.decode_csv(records=line,\
                                    record_defaults=record_defaults)
        label = parsed_line[0]  # First element is the label
        del parsed_line[0]  # Delete first element
        features = parsed_line  # Everything but first elements are the features
        d = dict(zip(feature_names, features)), label
        return d

    dataset = (tf.data.TextLineDataset(file_path)  # Read text file
               .skip(1)  # Skip header row
               .map(decode_csv))  # Transform each elem by applying decode_csv fn
    if perform_shuffle:
        # Randomizes input using a window of 256 elements (read into memory)
        dataset = dataset.shuffle(buffer_size=buffer_size)
    dataset = dataset.repeat(repeat_count)  # Repeats dataset this # times
    dataset = dataset.batch(batch_size)  # Batch size to use
    iterator = dataset.make_one_shot_iterator()
    batch_features, batch_labels = iterator.get_next()

    return batch_features, batch_labels

然后,可以计算小批量数据。
next_batch = my_input_fn(file_path=PATH+'train1.csv',\
                         batch_size=batch_size,\
                         perform_shuffle=True) # return 512 random elements

下一步,我们定义特征列是数值型的。
feature_columns = [tf.feature_column.numeric_column(k) for k in feature_names]

第三步,我们创建一个估计器DNNClassifier
classifier = tf.estimator.DNNClassifier(
    feature_columns=feature_columns,  # The input features to our model
    hidden_units=[n_h],  # One layer
    n_classes=n_y,
    model_dir=None)

最终,使用测试CSV文件训练DNN,而评估是在测试文件上执行的。请更改repeat_countsteps以确保训练符合代码中所需的纪元数。
# train the DNN
classifier.train(
    input_fn=lambda: my_input_fn(file_path=PATH+'train1.csv',\
                                 perform_shuffle=True,\
                                 repeat_count=1),\
                                 steps=None)    

# evaluate using the test csv file
evaluate_result = classifier.evaluate(
    input_fn=lambda: my_input_fn(file_path=PATH+'test1.csv',\
                                 perform_shuffle=False))
print("Evaluation results")
for key in evaluate_result:
    print("   {}, was: {}".format(key, evaluate_result[key]))

感谢您的反馈。我已经根据您的评论对我的答案进行了重大修订。希望这不仅能够满足stackoverflow的要求,而且足以帮助其他人。 - Cuong

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接