当使用`thread.join()`时,多线程会冻结。

4

我正在尝试设置3个线程并在队列中执行5个任务。想法是线程首先同时运行前3个任务,然后2个线程完成剩下的2个任务。但程序似乎冻结了。我无法检测出任何问题。

from multiprocessing import Manager
import threading
import time
global exitFlag 
exitFlag = 0


class myThread(threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)


def process_data(threadName, q):
    global exitFlag
    while not exitFlag:
        if not workQueue.empty():
            data = q.get()
            print("%s processing %s" % (threadName, data))
        else:
            pass
        time.sleep(1)
    print('Nothing to Process')


threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Manager().Queue(10)
threads = []
threadID = 1

# create thread
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1

# fill up queue
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# wait queue clear
while not workQueue.empty():
    pass
# notify thread exit
exitFlag = 1
# wait for all threads to finish
for t in threads:
    t.join()
print("Exiting Main Thread")

我不知道具体发生了什么,但是当我删除join()部分后,程序就能够正常运行。我不理解的是,当队列为空时,exitFlag应该已经发出信号。因此,看起来某种方式下,进程并没有检测到该信号。


你可能遇到了Python线程中的死锁问题。deadlock-in-python-threads - stovfl
1个回答

2
您的代码存在多个问题。首先,由于全局解释器锁(GIL),CPython中的线程不能同时运行Python代码。线程必须持有GIL才能执行Python字节码。默认情况下,线程最多持有GIL 5毫秒(Python 3.2+),如果它没有因为进行阻塞I/O而提前释放GIL。要并行执行Python代码,您需要使用multiprocessing
此外,您不必使用Manager.Queue,而应该使用queue.QueueManager.Queue是一个独立的管理进程上的queue.Queue。在这里,您引入了IPC和内存复制的绕路,但没有任何好处。
您死锁的原因是存在竞争条件:
    if not workQueue.empty():
        data = q.get()

这不是一个原子操作。一个线程可以检查workQueue.empty(),然后放弃GIL,让另一个线程清空队列,然后继续执行data = q.get(),如果你不在队列中再次放入数据,它将永远阻塞。 Queue.empty()检查是一种常见的反模式,没有必要使用它。使用毒丸(哨兵值)来打破获取循环,并让工作者知道他们应该退出。您需要与工作线程数量相同的毒丸值。在这里了解更多关于iter(callable, sentinel)的内容here
import time
from queue import Queue
from datetime import datetime
from threading import Thread, current_thread


SENTINEL = 'SENTINEL'


class myThread(Thread):

    def __init__(self, func, inqueue):
        super().__init__()
        self.func = func
        self._inqueue = inqueue

    def run(self):
        print(f"{datetime.now()} {current_thread().name} starting")
        self.func(self._inqueue)
        print(f"{datetime.now()} {current_thread().name} exiting")


def process_data(_inqueue):
    for data in iter(_inqueue.get, SENTINEL):
        print(f"{datetime.now()} {current_thread().name} "
              f"processing {data}")
        time.sleep(1)


if __name__ == '__main__':


    N_WORKERS = 3

    inqueue = Queue()
    input_data = ["One", "Two", "Three", "Four", "Five"]

    sentinels = [SENTINEL] * N_WORKERS # one sentinel value per worker
    # enqueue input and sentinels
    for word in input_data +  sentinels:
        inqueue.put(word)

    threads = [myThread(process_data, inqueue) for _ in range(N_WORKERS)]

    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(f"{datetime.now()} {current_thread().name} exiting")

示例输出:
2019-02-14 17:58:18.265208 Thread-1 starting
2019-02-14 17:58:18.265277 Thread-1 processing One
2019-02-14 17:58:18.265472 Thread-2 starting
2019-02-14 17:58:18.265542 Thread-2 processing Two
2019-02-14 17:58:18.265691 Thread-3 starting
2019-02-14 17:58:18.265793 Thread-3 processing Three
2019-02-14 17:58:19.266417 Thread-1 processing Four
2019-02-14 17:58:19.266632 Thread-2 processing Five
2019-02-14 17:58:19.266767 Thread-3 exiting
2019-02-14 17:58:20.267588 Thread-1 exiting
2019-02-14 17:58:20.267861 Thread-2 exiting
2019-02-14 17:58:20.267994 MainThread exiting

Process finished with exit code 0

如果您不坚持子类化Thread,您也可以使用multiprocessing.pool.ThreadPool,也称为multiprocessing.dummy.Pool,它会在后台为您处理管道。

非常感谢您提供如此详细的解释!真的非常感激。我在阅读有关多线程的内容时遇到了GIL问题。但是由于对机制的理解不够,没有意识到这是问题所在。 - fnosdy
还有一个小问题。如果我想学习多线程/多进程开发,有好的参考资料吗?到目前为止,我一直在关注博客并进行微调。 - fnosdy
1
@fnosdy GIL 并不是真正的问题,而是竞争条件。当您使用 queue.empty() 进行控制流时,使用进程和 multiprocessing.Queue 也可能遇到相同的问题。 - Darkonaut
1
@fnosdy 供参考...多线程和多进程是广泛而复杂的主题,你很难找到一个提供所有信息的参考。你需要仔细阅读mp-docsmt-docs,并搜索像这个例子一样的教程。此外,还需要一些硬件和操作系统的理解(可以参考维基百科和stackoverflow)。 - Darkonaut
1
试图测试条件以确保操作成功,然后假设它一定会成功是一种反模式。这既增加了复杂性,而且如果在测试和操作之间发生任何变化,就会出现问题。相反,尝试执行操作并在发生失败时处理它。 - David Schwartz
@David Schwartz 说得好。 - Darkonaut

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接