将numpy数组传递给tensorflow队列

4
我有一个NumPy数组,想要在TensorFlow的代码中使用Queue读取它。我希望队列返回整个数据集并进行洗牌,指定一定数量的epochs并在此之后抛出错误。最好不需要硬编码示例的大小或示例的数量。 我认为shuffle batch就是为此目的而设计的。我已经尝试使用以下方式:
data = tf.constant(train_np) # train_np is my numpy array of shape (num_examples, example_size)
batch = tf.train.shuffle_batch([data], batch_size=5, capacity=52200, min_after_dequeue=10, num_threads=1, seed=None, enqueue_many=True)

sess.run(tf.initialize_all_variables())
tf.train.start_queue_runners(sess=sess)
batch.eval()

那种方法的问题在于它会连续读取所有数据,我无法指定在某些轮数后停止。我知道我可以使用RandomShuffleQueue并将数据少量地插入其中几次,但是: a)我不想浪费 epoch*data 的内存,并且 b)这将允许队列在 epochs 之间进行洗牌。
是否有一种好的方法在 Tensorflow 中按 epochs 读取已打乱的数据而不编写自己的队列?
1个回答

6
您可以创建另一个队列,将数据排入其中num_epoch次,关闭它,然后将其连接到您的batch。为了节省内存,您可以将此队列设得很小,并并行排队项目。在epochs之间会有一些混合。为了完全防止混合,您可以使用下面的代码,调用num_epochsnum_epochs=1
tf.reset_default_graph()
data = np.array([1, 2, 3, 4])
num_epochs = 5
queue1_input = tf.placeholder(tf.int32)
queue1 = tf.FIFOQueue(capacity=10, dtypes=[tf.int32], shapes=[()])

def create_session():
    config = tf.ConfigProto()
    config.operation_timeout_in_ms=20000
    return tf.InteractiveSession(config=config)

enqueue_op = queue1.enqueue_many(queue1_input)
close_op = queue1.close()
dequeue_op = queue1.dequeue()
batch = tf.train.shuffle_batch([dequeue_op], batch_size=4, capacity=5, min_after_dequeue=4)

sess = create_session()

def fill_queue():
    for i in range(num_epochs):
        sess.run(enqueue_op, feed_dict={queue1_input: data})
    sess.run(close_op)

fill_thread = threading.Thread(target=fill_queue, args=())
fill_thread.start()

# read the data from queue shuffled
tf.train.start_queue_runners()
try:
    while True:
        print batch.eval()
except tf.errors.OutOfRangeError:
    print "Done"

顺便提一下,如果队列的大小不足以将整个numpy数据集加载到其中,则上述enqueue_many模式会挂起。您可以通过按以下方式分块加载数据来使自己具有更小的队列灵活性。

tf.reset_default_graph()
data = np.array([1, 2, 3, 4])
queue1_capacity = 2
num_epochs = 2
queue1_input = tf.placeholder(tf.int32)
queue1 = tf.FIFOQueue(capacity=queue1_capacity, dtypes=[tf.int32], shapes=[()])

enqueue_op = queue1.enqueue_many(queue1_input)
close_op = queue1.close()
dequeue_op = queue1.dequeue()

def dequeue():
    try:
        while True:
            print sess.run(dequeue_op)
    except:
        return 

def enqueue():
    for i in range(num_epochs):
        start_pos = 0
        while start_pos < len(data):
            end_pos = start_pos+queue1_capacity
            data_chunk = data[start_pos: end_pos]
            sess.run(enqueue_op, feed_dict={queue1_input: data_chunk})
            start_pos += queue1_capacity
    sess.run(close_op)

sess = create_session()

enqueue_thread = threading.Thread(target=enqueue, args=())
enqueue_thread.start()

dequeue_thread = threading.Thread(target=dequeue, args=())
dequeue_thread.start()

你能详细解释一下它是如何工作的吗(可能会指向文档)? 特别是:为什么创建一个单独的线程并将所有元素“num_epochs”多次排队不会使用太多内存? 还有:如果将数据多次排队到FIFO队列中,为什么会混合元素?只有当一个时期结束并另一个开始时才会出现吗? 此外,调用整个代码“num_epochs”次远非“读取数据的好方法”:P - sygi
1
如果您的队列容量为10,则任何时候它只需要10个示例的空间,异步队列加载将在消耗前一个示例时添加更多示例。由(shuffle_batch)创建的第二个队列不知道时代何时结束,因此来自时代的最后一批可能也会有来自下一个时代的条目。 - Yaroslav Bulatov
队列的文档在这里,此外,这里是我关于它们的演讲幻灯片。 - Yaroslav Bulatov
你知道为什么只有在指定操作超时时才会出现以下错误吗? CancelledError: Dequeue operation was cancelled [[Node: shuffle_batch = QueueDequeueMany[_class=["loc:@shuffle_batch/random_shuffle_queue"], component_types=[DT_INT32], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](shuffle_batch/random_shuffle_queue, shuffle_batch/n)]] - sygi
我喜欢指定超时时间,因为默认的超时时间是无限的,谁有时间等这么长时间呢?在实践中,你可以使用更大的值,比如120秒(第一次加载cuda东西可能需要60秒)。当存在滞留者时,例如可能没有足够的项目来填充最后一批,它想要永远等待,并且超时终止等待时,你会收到以上类似的消息。相反,你可以使用dequeue_up_to - Yaroslav Bulatov
谢谢!我尝试增加超时时间,但没想到可能需要将其设置为>10秒。 - sygi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接