多线程:检查队列成员并停止线程

3
我想用两个线程遍历一个列表。一个从头部开始,另一个从尾部开始,并在每次迭代时将元素放入队列中。但是,在将值放入队列之前,我需要检查值是否已经存在于队列中(即当其中一个线程将该值放入队列时)。因此,当这种情况发生时,我需要停止线程并返回每个线程遍历的值列表。
以下是我迄今为止尝试过的内容:
from Queue import Queue
from threading import Thread, Event

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

main_path = Queue()

def is_in_queue(x, q):
   with q.mutex:
      return x in q.queue

def a(main_path,g,l=[]):
  for i in g:
    l.append(i)
    print 'a'
    if is_in_queue(i,main_path):
      return l
    main_path.put(i)

def b(main_path,g,l=[]):
  for i in g:
    l.append(i)
    print 'b'
    if is_in_queue(i,main_path):
      return l
    main_path.put(i)

g=['a','b','c','d','e','f','g','h','i','j','k','l']

t1 = ThreadWithReturnValue(target=a, args=(main_path,g))
t2 = ThreadWithReturnValue(target=b, args=(main_path,g[::-1]))
t2.start()
t1.start()
# Wait for all produced items to be consumed
print main_path.join()

我使用了ThreadWithReturnValue,它会创建一个自定义线程并返回值。

而对于会员资格检查,我使用了以下函数:

def is_in_queue(x, q):
   with q.mutex:
      return x in q.queue

现在,如果我先启动 t1,然后再启动 t2,我将得到12个 a,然后一个 b,然后不会执行任何操作,我需要手动终止 Python!但是,如果我先运行 t2,然后再运行 t1,我将得到以下结果:
b
b
b
b
 ab

ab
b

b
b
 b
a
a

所以我的问题是为什么Python在这些情况下处理线程不同?如何终止线程并使它们相互通信?


请看这里http://pymotw.com/2/multiprocessing/communication.html...如果您更感兴趣于管理共享状态。 - OWADVL
@OWADVL 听起来很有用,我会试一下的!谢谢! - Mazdak
你是否有实际需求需要从列表两端迭代,还是只是将其作为分割任务的一种方式? - 101
2个回答

3

在我们深入研究更大的问题之前,你没有正确使用Queue.join

这个函数的整个意义在于,生产者向队列添加一堆项目后可以等待消费者或消费者完成所有这些项目的工作。这是通过消费者在使用get获取每个项目并完成工作后调用task_done来实现的。一旦task_done调用次数与put调用次数相同,队列就完成了。你没有使用get,更不用说task_done了,因此队列永远无法结束。这就是为什么在两个线程完成后你会永远阻塞的原因。


这里的第一个问题是,除了实际同步之外,您的线程几乎没有执行任何工作。 如果它们唯一要做的就是争夺队列,那么一次只有一个线程能够运行。
当然,在玩具问题中,这是常见的,但您必须仔细思考您的真正问题:
- 如果您需要执行大量I/O操作(侦听套接字,等待用户输入等),则线程非常适合。 - 如果您执行大量CPU操作(计算质数),则Python中的GIL会导致线程不起作用,但进程可以。 - 如果您实际上主要处理同步不同任务,则二者都不能很好地工作(并且进程将更糟)。 仍可能更简单地考虑线程,但这将是执行任务最慢的方法。 您可能需要研究协程; Greg Ewing演示了如何使用yield from来使用协程构建调度程序或多个角色模拟。
接下来,正如我在您之前的问题中所提到的,使线程(或进程)有效地与共享状态配合需要尽可能短地持有锁。
因此,如果您必须在锁定状态下搜索整个队列,最好进行常数时间搜索,而不是线性时间搜索。这就是为什么我建议使用类似于“OrderedSet”配方而不是像stdlib中的“Queue.Queue”内部的“list”的原因。然后这个函数:
def is_in_queue(x, q):
   with q.mutex:
      return x in q.queue

...只会阻塞队列一小部分时间——仅足够在表中查找哈希值,而不是足够比较队列中每个元素和x


最后,我尝试在你的另一个问题中解释竞态条件,但让我再试一次。

在你的代码中,需要在每个完整的“事务”周围放置锁,而不仅仅是在单个操作周围放置锁。

例如,如果你这样做:

with queue locked:
    see if x is in the queue
if x was not in the queue:
    with queue locked:
        add x to the queue

如果在你检查时x不在队列中,但在你解锁和重新锁定之间的时间内有人添加了它,则始终可能出现此情况。这正是为什么两个线程都可能提前停止的原因。

要解决此问题,您需要在整个过程周围放置一个锁:

with queue locked:
    if x is not in the queue:
        add x to the queue

当然,这直接违背了我之前关于尽可能短暂地锁定队列的说法。实际上,这就是多线程编程难点所在。编写安全代码很容易,只需锁定所有可能需要的时间,但这样你的代码最终只会使用一个核心,而其他线程则被阻塞等待锁定。编写快速代码也很容易,只需尽可能短暂地锁定一切,但这样不安全,你会得到垃圾值甚至到处崩溃的结果。确定哪些部分需要作为事务处理,如何将这些事务内的工作最小化,并如何处理可能需要的多重锁定以避免死锁…这并不容易。

非常感谢@abarnert抽出时间并提供详细的解释!您澄清了我的一些误解,实际上我在多进程方面很糟糕,我认为我需要更多的学习!! :) - Mazdak
2
@Kasra:“共享内存的多线程很困难”已经是陈词滥调了。每个人都很糟糕,因为我们的直觉是错误的(至少在从语言级别到CPU微码级别设计为优化单处理的系统上是如此,而且无法更改)。尽可能使用更高级别的抽象(消息传递而不是共享内存、STM等),当不可行时……好吧,您将逐渐感到何时忽略自己的直觉并严格地工作(或测试)可以发生什么,但仍会犯难以调试的错误…… - abarnert
是的,我的问题有很多情景!我正在慢慢地解决它并想深入学习!正如你所说,“共享内存多线程很难”是一个陈词滥调,对我来说也是真的,但我喜欢挑战困难的事情,我会克服它的! - Mazdak

2
一些可以改进的事情:
  1. 由于GIL的原因,您可能希望使用multiprocessing(而不是threading)模块。通常,CPython线程不会加速CPU密集型工作。(根据您问题的具体上下文,multiprocessing可能也无法做到这一点,但threading几乎肯定不行。)
  2. is_inqueue这样的函数很可能导致高争用。

锁定时间在需要遍历的项目数量上呈线性增长:

def is_in_queue(x, q):
    with q.mutex:
        return x in q.queue

因此,你可以尝试以下方法。

使用具有共享字典的multiprocessing

 from multiprocessing import Process, Manager

 manager = Manager()
 d = manager.dict()

 # Fn definitions and such

 p1 = Process(target=p1, args=(d,))
 p2 = Process(target=p2, args=(d,))

在每个函数中,检查此类项:
def p1(d):

    # Stuff

    if 'foo' in d:
        return 

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接