Tensorflow: 在CPU上多线程加载数据

10

我有一个Python类SceneGenerator,它有多个成员函数用于预处理和生成器函数generate_data()。基本结构如下:

class SceneGenerator(object):
    def __init__(self):
       # some inits

    def generate_data(self):
        """
        Generator. Yield data X and labels y after some preprocessing
        """
        while True:
            # opening files, selecting data
            X,y = self.preprocess(some_params, filenames, ...)            

            yield X, y

我在keras的model.fit_generator()函数中使用类成员函数sceneGenerator.generate_data()从磁盘读取数据,对其进行预处理并输出。在keras中,如果将model.fit_generator()的参数workers设置为大于1的值,则会使用多个CPU线程。

现在我想在tensorflow中使用相同的SceneGenerator类。我的当前方法是:

sceneGenerator = SceneGenerator(some_params)
for X, y in sceneGenerator.generate_data():

    feed_dict = {ops['data']: X,
                 ops['labels']: y,
                 ops['is_training_pl']: True
                 }
    summary, step, _, loss, prediction = sess.run([optimization_op, loss_op, pred_op],
                                                  feed_dict=feed_dict)

然而,这种方法较慢且不使用多个线程。我发现了tf.data.Dataset API以及一些文档,但我无法实现这些方法。

编辑:请注意,我不处理图像,因此具有文件路径等的图像加载机制在此处不起作用。我的SceneGenerator从hdf5文件中加载数据。但不是完整的数据集,而是根据初始化参数只加载部分数据集。我想保留生成器函数的基本形式,并了解如何将其直接用作tensorflow的输入,并在CPU上运行多个线程。将数据从hdf5文件重写为csv不是一个好选择,因为它会复制大量数据。

编辑2:我认为类似于这样的方法可能有所帮助:parallelising tf.data.Dataset.from_generator


1
我更新了我的源代码,使其更加清晰易懂。 - Merlin1896
2个回答

12
假设您正在使用最新版本的Tensorflow(在撰写本文时为1.4),您可以保留生成器并使用tf.data.* API,如下所示(我选择了线程数、预取缓冲区大小、批处理大小和输出数据类型的任意值):
NUM_THREADS = 5
sceneGen = SceneGenerator()
dataset = tf.data.Dataset.from_generator(sceneGen.generate_data, output_types=(tf.float32, tf.int32))
dataset = dataset.map(lambda x,y : (x,y), num_parallel_calls=NUM_THREADS).prefetch(buffer_size=1000)
dataset = dataset.batch(42)
X, y = dataset.make_one_shot_iterator().get_next()

为了展示实际上有多个线程从生成器中提取数据,我对你的类进行了如下修改:
import threading    
class SceneGenerator(object):
  def __init__(self):
    # some inits
    pass

  def generate_data(self):
    """
    Generator. Yield data X and labels y after some preprocessing
    """
    while True:
      # opening files, selecting data
      X,y = threading.get_ident(), 2 #self.preprocess(some_params, filenames, ...)            
      yield X, y

这样,创建一个Tensorflow会话并获取一个批次将显示获取数据的线程ID。在我的电脑上运行:

sess = tf.Session()
print(sess.run([X, y]))

打印
[array([  8460.,   8460.,   8460.,  15912.,  16200.,  16200.,   8460.,
         15912.,  16200.,   8460.,  15912.,  16200.,  16200.,   8460.,
         15912.,  15912.,   8460.,   8460.,   6552.,  15912.,  15912.,
          8460.,   8460.,  15912.,   9956.,  16200.,   9956.,  16200.,
         15912.,  15912.,   9956.,  16200.,  15912.,  16200.,  16200.,
         16200.,   6552.,  16200.,  16200.,   9956.,   6552.,   6552.], dtype=float32),
 array([2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
        2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])]

注意: 你可能想尝试移除map调用(我们仅使用它来拥有多个线程),并检查prefetch的缓冲区是否足以消除输入管道中的瓶颈(即使只有一个线程,通常输入预处理比实际图执行更快,因此缓冲区足以让预处理尽可能快速地进行)。


使用X,y生成器,我是否仍需使用feed_dict? - Merlin1896
你不需要使用生成器中的X和y,而是使用iterator.get_next()中的那些作为模型的输入张量(它们取代了你当前使用的占位符)。这样,当执行sess.run()时,你就不需要再提供任何输入。 - GPhilo
啊,我明白了!所以我只需摆脱占位符,改用X和y。在sess.run()期间,Tensorflow会自动获取下一个元素,对吗?我可以重置迭代器,还是必须为每个epoch创建一个新的迭代器? - Merlin1896
没错,Tensorflow 负责从数据集中获取下一个批次。你编写生成器的方式,循环是无限的,因此迭代器永远不会结束,但如果你想改变这一点,可以使用可初始化的迭代器(请查看Tensorflow网站上的“读取数据”教程,我会在回到电脑后在此处放置链接)。 - GPhilo
1
好的,明白了。我应该在这里更新我的代码,使其与我本地脚本中的状态相同。那里的生成器实际上在一些运行之后就结束了,然后我的时代也就结束了。 - Merlin1896
显示剩余3条评论

3

使用feed_dict运行会话确实非常缓慢:

feed_dict将Python运行时的内容单线程复制到TensorFlow运行时中。

更快的方法是使用tf.train.string_input_producer + *Reader + tf.train.Coordinator来提供数据,这将在多个线程中批处理数据。为此,您可以直接将数据读入张量中,例如,以下是读取和处理csv文件的方法:

def batch_generator(filenames):
  filename_queue = tf.train.string_input_producer(filenames)
  reader = tf.TextLineReader(skip_header_lines=1)
  _, value = reader.read(filename_queue)

  content = tf.decode_csv(value, record_defaults=record_defaults)
  content[4] = tf.cond(tf.equal(content[4], tf.constant('Present')),
                       lambda: tf.constant(1.0),
                       lambda: tf.constant(0.0))

  features = tf.stack(content[:N_FEATURES])
  label = content[-1]

  data_batch, label_batch = tf.train.shuffle_batch([features, label],
                                                   batch_size=BATCH_SIZE,
                                                   capacity=20*BATCH_SIZE,
                                                   min_after_dequeue=10*BATCH_SIZE)
  return data_batch, label_batch

这个函数获取输入文件列表,创建读取器和数据转换,并输出张量,这些张量被评估为这些文件的内容。您的场景生成器可能会进行不同的转换,但思路是相同的。 接下来,您需要启动一个tf.train.Coordinator以并行化此过程:
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for _ in range(10):  # generate 10 batches
        features, labels = sess.run([data_batch, label_batch])
        print(features)
    coord.request_stop()
    coord.join(threads)

根据我的经验,这种方式可以更快地提供数据,并允许利用整个可用的GPU功率。完整的工作示例可以在这里找到。

谢谢您的输入! 我的数据不是csv格式,而是多个hdf5文件,我只读取其中的部分。我不想将数据复制到csv文件中,因为我的SceneGenerator中的不同参数会强制我再次将所有数据写入新的一组csv文件中。因此,我的问题是如何使用Python生成器来创建输入。我敢打赌,有比先将所有训练(和测试)数据写入csv文件更直接的方法。 - Merlin1896
@Merlin1896 我明白了。嗯,TensorFlow中的hdf5支持...还有很多需要改进的地方。你考虑使用第三方库吗?比如这个:https://github.com/ghcollin/tftables - Maxim
谢谢你的提示,我会去了解一下! - Merlin1896

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接