Python的asyncio等待线程

12
我有一个情况,我有一个“服务器”线程,应该监听来自其他服务器线程的调用/事件,同时执行一些其他代码。最近我已经大量使用Node.js,所以我想使用async/await创建一个事件循环,在其中等待其他线程加入事件循环,并在它们最终加入时处理它们的响应。
为了测试这个想法,我在Python 3.5中编写了以下测试脚本:
# http://stackabuse.com/python-async-await-tutorial/
# Testing out Python's asynchronous features
import asyncio
from time import sleep
import threading
from threading import Thread
import random

class MyThread(Thread):

    def __init__(self, message):
        Thread.__init__(self)
        self._message = message

    def run(self):
        self._return = self._message + " oli viesti"
        a = random.randint(1, 5)
        print("Sleep for ", a)
        sleep(a)
        print("Thread exiting...")


    def join(self):
        Thread.join(self)
        return self._return



async def send(message):
    t = MyThread(message)  # daemon = True
    t.start()
    print("asd")
    return t.join()

async def sendmsg(msg):
    response = await send(msg)
    print("response is ", response)


if __name__ == "__main__":
    # Initiate a new thread and pass in keyword argument dictionary as parameters
    loop = asyncio.get_event_loop()
    tasks = [
        asyncio.ensure_future(sendmsg("hippa1"), loop=loop),
        asyncio.ensure_future(sendmsg("hippa2"), loop=loop),
        asyncio.ensure_future(sendmsg("hippa3"), loop=loop)
    ]

    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

在这个例子中,我想启动三个不同字符串的工作线程,并等待它们完成。工作线程会随机休眠一段时间,因此当多次运行脚本时,它们应该以随机顺序完成。但实际上它们似乎是按顺序执行的,第二个线程在第一个线程之后开始。
我的错误在哪里?难道睡眠只会阻塞当前线程吗?我的事件循环设置正确吗?我能否使用async/await来加入线程?
最终,我想向其他线程发送消息并等待它们的响应,然后使用返回值运行回调函数。
编辑:为了澄清,最终我想在主线程中使用async/await等待条件变量,直到有些条件变量允许执行通过。在这个示例代码中,我试图使用工作线程的join做同样的事情。

time.sleep 不是异步的,请尝试使用 await asyncio.sleep - jonrsharpe
但它是在一个单独的线程中触发的,所以它不应该阻塞事件循环所在的主线程,而应该只阻塞单独的线程。我的理解是,sleep是线程阻塞,而不是进程阻塞。那么为什么即使使用async/await结构,join仍会阻塞我的主线程呢? - Tumetsu
1个回答

5
最终,由于这段代码的存在,它是按顺序运行的:
async def send(message):
    t = MyThread(message)  # daemon = True
    t.start()
    print("asd")
    return t.join()

你启动一个线程,然后立即等待该线程完成,然后再继续。这就是它们按顺序执行的原因。
Node.js和asyncio不一定会创建新线程来执行它们的操作。例如,Node.js仅使用单个线程,但它使用内核级函数(例如'epoll')在新网络活动发生时调用您指定的回调函数。这使得单个线程可以管理数百个网络连接。
这就是为什么当你没有使用Thread实例来执行此操作时,你会在当前运行的线程上调用sleep,这与主线程相同。当你使用带有网络功能的asyncio时,你可以使用“yield from”结构,这允许其他代码块在其他任务正在处理其他远程服务时执行。
主要结构是正确的。你需要这段代码块:
loop.run_until_complete(asyncio.wait(tasks))

但是不要仅凭“睡眠”测试该函数,你需要进行网络调用或使用:

yield from asyncio.sleep(1)

在这种情况下,没有必要启动单独的线程。


1
线程启动的观点很好。然而,在我的情况下,我确实需要单独的线程,但如果可能的话,我希望异步地等待它们的条件。因此,我想要一个函数,在事件循环中检查加入,但如果仍然在循环中执行其他任意代码,直到加入在循环中解除阻塞。因此,我的情况并不完全类似于Node.js,我希望通过使用异步事件循环来等待其他线程完成/设置条件变量,而不会阻塞我的主线程。 - Tumetsu
简而言之,我无法想出如何编写类似以下代码的内容: yield t.join() - Tumetsu
1
在这种情况下,我建议您根本不要使用asyncio,而是在中间使用队列进行工作。您可以使用task_done()等待队列,并使用get()来获取工作项。然后,您可以使用另一个队列将结果传递回主线程。文档页面上有一个显示它的代码示例:https://docs.python.org/3/library/queue.html - radialmind
很好了解。当我无法让asyncio工作时,我最终以那种方式实现了它。架构足够清晰,可以使用它。 - Tumetsu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接