线程的返回值

71

在Python中,我如何让一个线程返回一个元组或我选择的任何值给父线程?

13个回答

73

我建议在启动线程之前实例化一个Queue.Queue,并将其作为其中的一个参数传递给线程:在线程结束之前,它会将结果作为参数接收到的队列.put。父进程可以随时使用.get.get_nowait获取结果。

在Python中,队列通常是安排线程同步和通信的最佳方式:它们内在地支持线程安全的消息传递机制——是组织多任务处理的最佳方法!-)


2
在线程完成之前,它将结果放入作为参数接收的队列中。你的意思是 Python 会自动完成这个过程吗?如果不是(作为设计提示),那么你能否在回答中明确说明。 - n611x007
3
为特定目的专门化现有功能是不美观的;对于单个结果问题,队列具有许多不必要的开销。更清晰有效的方法是子类化 threading.Thread,并将新的run()方法简单地将结果存储为属性,例如 self.ret = ...。(更舒适的方法是Thread的子类来处理自定义目标函数的返回值/异常。确实,应该扩展 threading.Thread 来提供这个功能 - 因为它会与旧行为“返回None”兼容。) - kxr
1
使用队列是最好的答案,然而上面的帖子很差地说明了如何使用队列。请参考此答案,它提供了一个基本的使用队列和返回值的示例。 - Alex

16
你应该将一个Queue实例作为参数传递,然后将返回的对象使用.put()方法放入队列中。无论你放置了什么对象,都可以通过queue.get()方法来收集返回值。
示例:
queue = Queue.Queue()
thread_ = threading.Thread(
                target=target_method,
                name="Thread1",
                args=[params, queue],
                )
thread_.start()
thread_.join()
queue.get()

def target_method(self, params, queue):
 """
 Some operations right here
 """
 your_return = "Whatever your object is"
 queue.put(your_return)

用于多线程:

#Start all threads in thread pool
    for thread in pool:
        thread.start()
        response = queue.get()
        thread_results.append(response)

#Kill all threads
    for thread in pool:
        thread.join()

我使用了这个实现,对我来说非常好用。我希望你也能这样做。


1
当然我会启动线程,我只是忘记在这里加上那一行了 :) 谢谢提醒。 - fth
如果有多个线程,这会是什么样子?que.get() 对我来说只返回一个线程的结果? - ABros
1
在多线程情况下,如果线程尚未完成,则似乎response = queue.get()会引发“Empty exception”并可能以已处理的异常终止。即使每次都成功,这也意味着每个线程都已完成,并且几乎没有实际的多线程发生。 - martineau
1
我认为只有在线程加入后从队列中获取才有意义,因为那时你才知道结果已经被添加。 - alterfox
我仍然无法理解这里queue将被填充的顺序之间的对应关系。我们能以某种顺序收集返回值吗? - Krishna Oza

14

使用lambda将目标线程函数包装起来,使用一个队列将其返回值传回父线程。(您原始的目标函数保持不变,不需要额外的队列参数。)

示例代码:

import threading
import queue
def dosomething(param):
    return param * 2
que = queue.Queue()
thr = threading.Thread(target = lambda q, arg : q.put(dosomething(arg)), args = (que, 2))
thr.start()
thr.join()
while not que.empty():
    print(que.get())

输出:

4

12
如果您正在调用join()等待线程完成,可以将结果直接附加到Thread实例本身,然后在join()返回后从主线程检索它。另一方面,您没有告诉我们如何发现线程已完成并且结果可用。如果您已经有一种方法来做到这一点,那么它可能会指向最佳的获取结果的方法(如果您告诉我们)。

你可以将结果直接附加到 Thread 实例本身。如何将 Thread 实例传递给它运行的目标,以便目标可以将结果附加到该实例? - Piotr Dobrogost
1
Piotr Dobrogost,如果您没有为实例子类化Thread,您可以在目标可调用函数的末尾使用threading.current_thread()。我认为这有点丑陋,但Alex的方法一直更加优雅。在某些情况下,这种方法只是更加迅速。 - Peter Hansen
9
如果join()方法返回调用方法的返回值,那就太好了...看起来它返回None有点傻。 - ArtOfWarfare

9
我很惊讶没有人提到你可以传递一个可变对象:

我很惊讶没有人提到您可以只传递一个可变对象:

>>> thread_return={'success': False}
>>> from threading import Thread
>>> def task(thread_return):
...  thread_return['success'] = True
... 
>>> Thread(target=task, args=(thread_return,)).start()
>>> thread_return
{'success': True}

也许这有重大问题,我不知道。


1
这个运行完美!如果有任何缺少的地方,我真的很想听听意见。 - f-z-N
这个函数虽然能够工作,但是对于现有的函数进行专门化会让它变得丑陋,并且会带来很多混淆(可读性),请参见第一个答案中的评论。 - kxr

5

另一种方法是将回调函数传递到线程中。这为从新线程向父级返回值提供了一种简单、安全和灵活的方式。

# A sample implementation

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, cb):
        threading.Thread.__init__(self)
        self.callback = cb

    def run(self):
        for i in range(10):
            self.callback(i)
            time.sleep(1)


# test

import sys

def count(x):
    print x
    sys.stdout.flush()

t = MyThread(count)
t.start()

11
问题在于回调仍然在子线程中运行,而不是在原始线程中运行。 - babbageclunk
@wilberforce,您能否解释一下它可能会引起什么问题? - Vijay Mathew
4
好的,举个例子,如果回调函数在子线程中运行时写入到一个日志文件中,而主线程也在同时写入该日志文件。由于存在两个同时写入的风险,可能会导致输出混乱或交错,或者在日志框架执行一些内部簿记操作时导致崩溃。使用线程安全队列并让一个线程执行所有写入操作可以避免这种情况发生。这种问题很麻烦,因为它们不是确定性的,可能只会在生产环境中出现,并且很难复现。 - babbageclunk

3
你可以使用同步队列模块。
假设你需要使用已知的id从数据库中检索用户信息:
def check_infos(user_id, queue):
    result = send_data(user_id)
    queue.put(result)

现在您可以像这样获取数据:
import queue, threading
queued_request = queue.Queue()
check_infos_thread = threading.Thread(target=check_infos, args=(user_id, queued_request))
check_infos_thread.start()
final_result = queued_request.get()

我一直收到这个错误:TypeError: square()接受1个位置参数,但实际给出了2个 - Mehdi RH

3

对于简单的程序来说,以上的答案看起来有点过度设计了。我会采用可变的方法来使代码更加简洁:

class RetVal:
 def __init__(self):
   self.result = None


def threadfunc(retVal):
  retVal.result = "your return value"

retVal = RetVal()
thread = Thread(target = threadfunc, args = (retVal))

thread.start()
thread.join()
print(retVal.result)

2

POC:

import random
import threading

class myThread( threading.Thread ):
    def __init__( self, arr ):
        threading.Thread.__init__( self )
        self.arr = arr
        self.ret = None

    def run( self ):
        self.myJob( self.arr )

    def join( self ):
        threading.Thread.join( self )
        return self.ret

    def myJob( self, arr ):
        self.ret = sorted( self.arr )
        return

#Call the main method if run from the command line.
if __name__ == '__main__':
    N = 100

    arr = [ random.randint( 0, 100 ) for x in range( N ) ]
    th = myThread( arr )
    th.start( )
    sortedArr = th.join( )

    print "arr2: ", sortedArr

1

根据jcomeau_ictx的建议,我找到了最简单的方法。这里的要求是从服务器上运行的三个不同进程中获取退出状态,并在所有三个进程都成功时触发另一个脚本。这似乎运行良好。

  class myThread(threading.Thread):
        def __init__(self,threadID,pipePath,resDict):
            threading.Thread.__init__(self)
            self.threadID=threadID
            self.pipePath=pipePath
            self.resDict=resDict

        def run(self):
            print "Starting thread %s " % (self.threadID)
            if not os.path.exists(self.pipePath):
            os.mkfifo(self.pipePath)
            pipe_fd = os.open(self.pipePath, os.O_RDWR | os.O_NONBLOCK )
           with os.fdopen(pipe_fd) as pipe:
                while True:
                  try:
                     message =  pipe.read()
                     if message:
                        print "Received: '%s'" % message
                        self.resDict['success']=message
                        break
                     except:
                        pass

    tResSer={'success':'0'}
    tResWeb={'success':'0'}
    tResUisvc={'success':'0'}


    threads = []

    pipePathSer='/tmp/path1'
    pipePathWeb='/tmp/path2'
    pipePathUisvc='/tmp/path3'

    th1=myThread(1,pipePathSer,tResSer)
    th2=myThread(2,pipePathWeb,tResWeb)
    th3=myThread(3,pipePathUisvc,tResUisvc)

    th1.start()
    th2.start()
    th3.start()

    threads.append(th1)
    threads.append(th2)
    threads.append(th3)

    for t in threads:
        print t.join()

    print "Res: tResSer %s tResWeb %s tResUisvc %s" % (tResSer,tResWeb,tResUisvc)
    # The above statement prints updated values which can then be further processed

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接