如何在tensorflow中使用自定义python函数预取数据

41

我正在尝试预取训练数据以隐藏I/O延迟。我想编写自定义Python代码,从磁盘加载数据并对数据进行预处理(例如添加上下文窗口)。换句话说,一个线程进行数据预处理,另一个线程进行训练。这在TensorFlow中是否可行?

更新:我有一个基于@mrry示例的工作示例。

import numpy as np
import tensorflow as tf
import threading

BATCH_SIZE = 5
TRAINING_ITERS = 4100

feature_input = tf.placeholder(tf.float32, shape=[128])
label_input = tf.placeholder(tf.float32, shape=[128])

q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]])
enqueue_op = q.enqueue([label_input, feature_input])

label_batch, feature_batch = q.dequeue_many(BATCH_SIZE)
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128])

sess = tf.Session()

def load_and_enqueue(sess, enqueue_op, coord):
  with open('dummy_data/features.bin') as feature_file, open('dummy_data/labels.bin') as label_file:
    while not coord.should_stop():
      feature_array = np.fromfile(feature_file, np.float32, 128)
      if feature_array.shape[0] == 0:
        print('reach end of file, reset using seek(0,0)')
        feature_file.seek(0,0)
        label_file.seek(0,0)
        continue
      label_value = np.fromfile(label_file, np.float32, 128)

      sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                      label_input: label_value})

coord = tf.train.Coordinator()
t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord))
t.start()

for i in range(TRAINING_ITERS):
  sum = sess.run(c)
  print('train_iter='+str(i))
  print(sum)

coord.request_stop()
coord.join([t])

5
我刚刚制作了一个有关队列的笔记本,它还解释了一个类似的用例,希望对其他人也有用:https://gist.github.com/akiross/23b6ae42812841bb79af4976a2525cf9 - AkiRoss
@AkiRoss 这个网站无法访问... - Shiqing Fan
2个回答

54

这是一个常见的用例,大多数实现使用TensorFlow的队列来将预处理代码与训练代码解耦。有一个关于如何使用队列的教程,但主要步骤如下:

  1. 定义一个名为q的队列,它将缓存预处理数据。TensorFlow支持简单的tf.FIFOQueue,该队列按照元素入队的顺序产生元素,还支持更高级的tf.RandomShuffleQueue,该队列以随机顺序产生元素。队列元素是一个或多个张量的元组(可以具有不同类型和形状)。所有队列都支持单个元素(enqueuedequeue)和批处理(enqueue_manydequeue_many)操作,但要使用批处理操作,必须在构造队列时指定队列元素中每个张量的形状。

  2. 构建一个子图,将预处理的元素入队到队列中。一种方法是为与单个输入示例对应的张量定义一些tf.placeholder()操作,然后将它们传递给q.enqueue()(如果您的预处理一次生成一个批,则应改用q.enqueue_many())。此子图中也可以包含TensorFlow操作。

  3. 构建一个执行训练的子图。这看起来像一个常规的TensorFlow图,但是通过调用q.dequeue_many(BATCH_SIZE)获取其输入。

  4. 启动会话。

  5. 创建一个或多个线程,执行预处理逻辑,然后执行enqueue操作,提供预处理数据作为输入。您可能会发现tf.train.Coordinatortf.train.QueueRunner实用类对此非常有用。

  6. 按照正常方式运行您的训练图(优化器等)。

编辑:以下是一个简单的load_and_enqueue()函数和代码片段,可供参考:

# Features are length-100 vectors of floats
feature_input = tf.placeholder(tf.float32, shape=[100])
# Labels are scalar integers.
label_input = tf.placeholder(tf.int32, shape=[])

# Alternatively, could do:
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100])
# label_batch_input = tf.placeholder(tf.int32, shape=[None])

q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []])
enqueue_op = q.enqueue([feature_input, label_input])

# For batch input, do:
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input])

feature_batch, label_batch = q.dequeue_many(BATCH_SIZE)
# Build rest of model taking label_batch, feature_batch as input.
# [...]
train_op = ...

sess = tf.Session()

def load_and_enqueue():
  with open(...) as feature_file, open(...) as label_file:
    while True:
      feature_array = numpy.fromfile(feature_file, numpy.float32, 100)
      if not feature_array:
        return
      label_value = numpy.fromfile(feature_file, numpy.int32, 1)[0]

      sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                      label_input: label_value})

# Start a thread to enqueue data asynchronously, and hide I/O latency.
t = threading.Thread(target=load_and_enqueue)
t.start()

for _ in range(TRAINING_EPOCHS):
  sess.run(train_op)

1
谢谢你的建议。我有另一个问题。在我的实验中,训练特征和标签存储在两个单独的二进制文件中。我应该构建两个队列,一个用于特征,一个用于标签吗?如果我们想从这两个队列中获取随机的一对(特征,标签),我如何确保特征对应正确的标签?换句话说,我如何保证一对一映射? - read Read
1
我添加了一个示例以使其更清晰。简而言之,如果您从两个不同的线程调用sess.run(),它们将并行运行。 - mrry
@mrry,你如何向load_and_enqueue函数传递参数?抱歉,这可能很明显,但我是Python的新手... - Dims
@mrry,也许你忘记了在问题作者那里加上 args=(sess,enqueue_op, coord) - Dims
q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []])中的tf.float32和tf.int32应该交换吗? - Greg Cawthorne
显示剩余15条评论

7
换句话说,一个线程进行数据预处理,另一个线程进行训练。在TensorFlow中是否可能实现?
是的,可以。mrry的解决方案可行,但有更简单的方法。
获取数据 tf.py_func包装了一个python函数并将其用作TensorFlow运算符。因此,我们可以在每次sess.run()时加载数据。这种方法的问题是数据通过主线程在sess.run()期间加载。
最简示例:
def get_numpy_tensor():
  return np.array([[1,2],[3,4]], dtype=np.float32)
tensorflow_tensor = tf.py_func(get_numpy_tensor, [], tf.float32)

一个更复杂的例子:
def get_numpy_tensors():
  # Load data from the disk into numpy arrays.
  input = np.array([[1,2],[3,4]], dtype=np.float32)
  target = np.int32(1)
  return input, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])

tensorflow_input, tensorflow_target = 2*tensorflow_input, 2*tensorflow_target

sess = tf.InteractiveSession()
numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target])
assert np.all(numpy_input==np.array([[2,4],[6,8]])) and numpy_target==2

在另一个线程中预取数据

为了将我们的数据排队到另一个线程(这样sess.run()就不必等待数据),我们可以使用tf.train.batch()对来自tf.py_func()的操作进行处理。

一个最简示例:

tensor_shape = get_numpy_tensor().shape
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32, shapes=[tensor_shape])
# Run `tf.train.start_queue_runners()` once session is created.

如果指定了tensorflow_tensor的形状,我们可以省略参数shapes:
tensor_shape = get_numpy_tensor().shape
tensorflow_tensor.set_shape(tensor_shape)
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32)
# Run `tf.train.start_queue_runners()` once session is created.

一个更复杂的示例:
input_shape, target_shape = (2, 2), ()
def get_numpy_tensors():
  input = np.random.rand(*input_shape).astype(np.float32)
  target = np.random.randint(10, dtype=np.int32)
  print('f', end='')
  return input, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])
batch_size = 2
tensorflow_inputs, tensorflow_targets = tf.train.batch([tensorflow_input, tensorflow_target], batch_size, shapes=[input_shape, target_shape], capacity=2)
# Internal queue will contain at most `capasity=2` times `batch_size=2` elements `[tensorflow_input, tensorflow_target]`.

tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets

sess = tf.InteractiveSession()
tf.train.start_queue_runners() # Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it.
for _ in range(10):
  numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets])
  assert numpy_inputs.shape==(batch_size, *input_shape) and numpy_targets.shape==(batch_size, *target_shape)
  print('r', end='')

# Prints `fffffrrffrfrffrffrffrffrffrffrf`.

如果get_numpy_tensor()返回一批张量,那么tf.train.batch(..., enqueue_many=True)将会有所帮助。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接