Tensorflow多个队列链接时的竞态条件问题

4

我希望以多线程的方式计算一组图像的RGB通道的平均值。

我的想法是创建一个string_input_producer,填充一个filename_queue,然后使用num_threads个线程填充第二个FIFOQueue,这些线程从filename_queue加载图像,对其进行一些操作,然后将结果入队。

接下来,由单个线程(主线程)访问这个第二个队列,将队列中所有值相加。

以下是我的代码:

# variables for storing the mean and some intermediate results
mean = tf.Variable([0.0, 0.0, 0.0])
total = tf.Variable(0.0)

# the filename queue and the ops to read from it
filename_queue = tf.train.string_input_producer(filenames, num_epochs=1)
reader = tf.WholeFileReader()
_, value = reader.read(filename_queue)
image = tf.image.decode_jpeg(value, channels=3)
image = tf.cast(image, tf.float32)

sum = tf.reduce_sum(image, [0, 1])
num = tf.mul(tf.shape(image)[0], tf.shape(image)[1])
num = tf.cast(num, tf.float32)

# the second queue and its enqueue op
queue = tf.FIFOQueue(1000, dtypes=[tf.float32, tf.float32], shapes=[[3], []])
enqueue_op = queue.enqueue([sum, num])

# the ops performed by the main thread
img_sum, img_num = queue.dequeue()
mean_op = tf.add(mean, img_sum)
total_op = tf.add(total, img_num)

# adding new queue runner that performs enqueue_op on num_threads threads
qr = tf.train.QueueRunner(queue, [enqueue_op] * num_threads)
tf.train.add_queue_runner(qr)

init_op = tf.initialize_all_variables()

sess = tf.Session()
sess.run(init_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

# the main loop being executed until the OutOfRangeError 
# (when filename_queue does not yield elements anymore)
try:
    while not coord.should_stop():
        mean, total = sess.run([mean_op, total_op])

except tf.errors.OutOfRangeError:
    print 'All images processed.'
finally:
    coord.request_stop()

coord.join(threads)

# some additional computations to get the mean
total_3channel = tf.pack([total, total, total])
mean = tf.div(mean, total_3channel)
mean = sess.run(mean)
print mean

这个函数每次运行结果都不一样,比如:
[ 99.35347748  58.35261154  44.56705856]
[ 95.91153717  92.54192352  87.48269653]
[ 124.991745    121.83417511  121.1891861 ]

我把这归咎于竞态条件。但是这些竞态条件是从哪里来的呢?有人能帮帮我吗?

1个回答

2
你的QueueRunner将启动num_threads个线程,这些线程将竞争访问你的reader并将结果推送到队列中。在队列上的图像顺序将根据哪个线程先完成而有所不同。 更新于2月12日 链接两个队列的简单示例,并从第二个队列中总结值。当使用num_threads>1时,中间值存在一些不确定性,但最终值始终为30。当num_threads=1时,所有内容都是确定性的。
tf.reset_default_graph()

queue_dtype = np.int32

# values_queue is a queue that will be filled with 0,1,2,3,4
# range_input_producer creates the queue and registers its queue_runner
value_queue = tf.range_input_producer(limit=5, num_epochs=1, shuffle=False)
value = value_queue.dequeue()

# value_squared_queue will be filled with 0,1,4,9,16
value_squared_queue = tf.FIFOQueue(capacity=50, dtypes=queue_dtype)
value_squared_enqueue = value_squared_queue.enqueue(tf.square(value))
value_squared = value_squared_queue.dequeue()

# value_squared_sum keeps running sum of squares of values 
value_squared_sum = tf.Variable(0)
value_squared_sum_update = value_squared_sum.assign_add(value_squared)

# register queue_runner in the global queue runners collection
num_threads = 2
qr = tf.train.QueueRunner(value_squared_queue, [value_squared_enqueue] * num_threads)
tf.train.queue_runner.add_queue_runner(qr)

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
tf.start_queue_runners()

for i in range(5):
  sess.run([value_squared_sum_update])
  print sess.run([value_squared_sum])

你应该看到:
[0]
[1]
[5]
[14]
[30]

有时候(当前两个值的顺序被颠倒时),也会出现这种情况。
[1]
[1]
[5]
[14]
[30]

是的,没错。但由于我只对“queue”中的值执行加法操作,所以结果应该与“queue”中图像的顺序无关。 我刚刚进行了交叉检查,当我初始化“qr = tf.train.QueueRunner(queue, [enqueue_op])”时,也会得到不同的结果。 - mackcmillion
我仔细阅读了你的问题,发现另一个问题,你正在使用add而不是assign_add,因此你正在查看最后一张图像,这取决于线程如何被调度。更新了一个简单的例子,包含2个队列,以确保事情是确定性的。 - Yaroslav Bulatov
太棒了!这对我很有用。我不知道assign_add操作。非常感谢你。(只有一个小的修正:在您的第二个打印示例中,第二行需要是[0])。 - mackcmillion
不,第二行是[1],因为它是累加和。 - Yaroslav Bulatov
哦,我明白了。谢谢你的澄清。 - mackcmillion

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接