等到所有的线程都完成在Python中。

192
我想要同时运行多个线程,并在所有线程完成之前等待。
import subprocess

# I want these to happen simultaneously:
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

# I want to wait until the above threads are all finished, and then run this:
print("All threads are done.")

我尝试使用像这个例子这里所示的threading
from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

# TODO: Wait for all threads to finish.

print("All threads are done.")

如何在运行最后一行代码之前等待线程完成?
10个回答

258
将线程放入列表中,.start() 每个线程,然后 .join() 每个线程:
threads = [
    Thread(...),
    Thread(...),
    Thread(...),
]

# Start all threads.
for t in threads:
    t.start()

# Wait for all threads to finish.
for t in threads:
    t.join()

1
是的,这样做可以实现,但更难理解。你应该始终努力在紧凑代码和“可读性”之间找到平衡点。记住:代码只写一次,但会被阅读多次。因此,易于理解更为重要。 - Aaron Digulla
2
“工厂模式”不是我可以用一句话解释清楚的。请在Google和stackoverflow.com上搜索相关内容,有许多例子和解释。简而言之:您编写代码来为您构建复杂的东西。就像一个真正的工厂:您下订单并获得成品。 - Aaron Digulla
22
我不喜欢使用列表推导式来产生没有实际用途的结果列表而带来副作用。即使需要占用两行,一个简单的for循环也会更清晰易懂。 - Ioan Alexandru Cucu
3
@Aaron DIgull 我明白你的意思。我的意思是我会使用 for x in threads: x.join() 而不是列表推导式。 - Ioan Alexandru Cucu
2
@IoanAlexandruCucu:我仍在思考是否有更易读和高效的解决方案:https://dev59.com/CXvaa4cB1Zd3GeqPIuDt - Aaron Digulla
显示剩余9条评论

206

在脚本结尾处,您需要使用Thread对象的join方法。

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

因此,主线程将等待t1t2t3执行完毕。


8
嗯——我有点难以理解,这个代码不是会先运行t1,等待其完成,然后再去运行t2,依次类推吗?那么如何才能让它们同时运行呢?我不明白这怎么能同时运行它们? - Inbar Rose
35
join 方法会阻塞,直到线程执行完毕。无论如何,您都必须等待所有线程完成。如果 t1 最先完成,您将开始等待 t2(它可能已经完成了,您将立即继续等待 t3)。如果 t1 执行时间最长,在从中返回后,t1t2 都将立即返回而无需阻塞。 - Maksim Skurydzin
2
好的,我明白了。一开始有点困惑,但现在我想我明白了。join将当前进程附加到线程上并等待其完成。如果t2在t1之前完成,则当t1完成时,它会检查t2是否已完成,如果完成了,那么再去检查t3..等等..只有当所有线程都完成后才会继续进行。太棒了。 - Inbar Rose
4
如果说t1需要最长时间,但是t2有一个例外情况,那么会发生什么?你能否捕获该异常或检查t2是否正常完成? - Ciprian Tomoiagă
1
@datdinhquoc 无论如何,您都打算等待所有线程完成。如果在线程调用join时已经完成,则调用应该非常快且非阻塞。因此,总等待时间应由最慢的线程执行时间主导。 - Maksim Skurydzin
显示剩余4条评论

67

在Python3中,自Python 3.2起,有一种新的方法可以得到相同的结果。个人而言,我更喜欢使用包concurrent.futures而非传统的线程创建/启动/加入方法。https://docs.python.org/3/library/concurrent.futures.html

使用ThreadPoolExecutor,代码如下:

from concurrent.futures.thread import ThreadPoolExecutor
import time
    
def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')
    
args = ['argumentsA', 'argumentsB', 'argumentsC']
    
with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks has been finished')

前面代码的输出结果类似于:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

其中之一的优点是您可以通过设置最大并发工作者来控制吞吐量。

要改用多进程,您可以使用ProcessPoolExecutor


但是你如何知道线程池中的所有线程何时完成? - K-Dawg
5
正如您在示例中所看到的那样,with语句后的代码在所有任务完成后执行。 - Roberto
这个不起作用。尝试在线程中执行一些非常长的操作。你的打印语句会在线程完成之前执行。 - Pranalee
2
@Pranalee,那段代码是有效的,我已经更新了代码以添加输出行。在所有线程完成之前,您无法看到“所有任务…”,这是with语句在这种情况下的设计方式。无论如何,您可以随时在SO上打开一个新问题并发布您的代码,以便我们帮助您找出发生了什么。 - Roberto
3
@PrimeByDesign 您可以使用concurrent.futures.wait函数,您可以在此处查看真实示例。官方文档:https://docs.python.org/zh-cn/3/library/concurrent.futures.html#concurrent.futures.wait - Alexander Fortin
显示剩余2条评论

41

我更喜欢基于输入列表使用列表推导式:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]

2
已经有一个解释得很好的答案了,但这个答案更简短,不需要丑陋的重复。只是一个好的回答。 :) - tleb
1
列表推导式仅用于副作用通常被弃用。但在这种情况下,它似乎是一个好主意。*https://dev59.com/8W025IYBdhLWcg3w3J1H - Vinayak Kaniyarakkal
6
for t in threads:t.start() 这样写不是更好吗? - Smart Manoj

7
您可以创建一个类似下面的类,从中添加您想要并行执行的“n”个函数或console_scripts,并开始执行并等待所有作业完成。
from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for given function deligates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits untill all the functions executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()

这是多进程。问题涉及到https://docs.python.org/3/library/threading.html。 - Rustam A.
我也可以推荐使用 multiprocessing,但不是这种方式。如果您只创建进程而不关心系统资源,它可能会冻结系统。如果您不知道进程的数量,最好使用 .map。 - S__

3

我刚遇到了同样的问题,需要等待使用for循环创建的所有线程。我尝试了以下代码片段。它可能不是完美的解决方案,但我认为这是一个简单的测试解决方案:

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err:
            continue
        else:
            raise

3

来自 threading 模块文档

有一个“主线程”对象;它对应于Python程序中的初始控制线程。它不是守护线程。

有可能创建“虚拟线程对象”。这些线程对象对应于“外部线程”,即从C代码直接启动的控制线程。虚拟线程对象具有有限功能;它们始终被视为活动和守护进程,并且无法进行join()操作。它们永远不会被删除,因为无法检测到外部线程的终止。

因此,当您不想保留所创建线程列表时,请注意这两种情况:

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

在此之后:

>>> print(data)
[0, 4, 12, 40]

2
也许,类似以下内容:
for t in threading.enumerate():
    if t.daemon:
        t.join()

我尝试了这段代码,但不确定它是否有效,因为我的代码的最后一条指令被打印出来了,而这个for循环之后,进程仍未终止。 - Omkar

0

仅使用join可能导致与线程的误报交互。如文档所述:

当timeout参数存在且不为None时,它应该是一个浮点数,指定操作的超时时间(以秒为单位或其分数)。由于join()始终返回None,因此必须在join()后调用isAlive()以决定是否发生超时-如果线程仍然存活,则join()调用已超时。

和说明性的代码片段:

threads = []
for name in some_data:
    new = threading.Thread(
        target=self.some_func,
        args=(name,)
    )
    threads.append(new)
    new.start()
    
over_threads = iter(threads)
curr_th = next(over_threads)
while True:
    curr_th.join()
    if curr_th.is_alive():
        continue
    try:
        curr_th = next(over_threads)
    except StopIteration:
        break

0
创建一个ThreadPoolExecutor(或ProcessPoolExecutor)。 然后,调用您想要的函数func和参数列表xs上的.map
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=len(xs)) as executor:
    results = list(executor.map(func, xs))

.map 返回一个包含每个函数返回值的迭代器,我们将其收集到一个列表中。


在你的情况下:
from concurrent.futures import ThreadPoolExecutor

argss = [
    ["python", "scriptA.py", "a"],
    ["python", "scriptA.py", "b"],
    ["python", "scriptA.py", "c"],
]

with ThreadPoolExecutor(max_workers=len(argss)) as executor:
    results = list(executor.map(call_script, argss))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接