Python 异步(非阻塞)事件处理程序与 while 循环

8
import queue

qq = queue.Queue()
qq.put('hi')

class MyApp():

    def __init__(self, q):
        self._queue = q

    def _process_item(self, item):
        print(f'Processing this item: {item}')

    def get_item(self):
        try:
            item = self._queue.get_nowait()
            self._process_item(item)
        except queue.Empty:
            pass

    async def listen_for_orders(self):  
        '''
        Asynchronously check the orders queue for new incoming orders
        '''
        while True:
            self.get_item()
            await asyncio.sleep(0)      

a = MyApp(qq)

loop = asyncio.get_event_loop()

loop.run_until_complete(a.listen_for_orders())

使用Python 3.6。

我正在尝试编写一个事件处理程序,它可以不断地监听queue中的消息,并处理这些消息(在本例中打印)。但它必须是异步的 - 我需要能够在终端(IPython)中运行它,并手动向queue发送内容(至少最初是为了测试)。

以下代码无法正常工作- 它会一直阻塞。

我该如何让它永久运行,但在每次while循环迭代后返回控制权?

谢谢。

附注: 为了使事件循环与IPython(版本7.2)配合使用,我正在使用代码来自于ib_insync库, 在上面的实际问题中,我正在使用这个库。


通常情况下,使用while True是不好的。相反,当您向队列中添加内容(使用方法)时,应调用另一个方法从队列中取出元素,执行其操作,并在最后再次检查队列以获取更多元素。如果没有任何元素,则终止。对于此类任务,使用线程也更容易。 - RnD
我不太清楚你想做什么。虽然loop.run_until_complete()会阻塞,但你的方法似乎是同步的。除了在顶部放置消息之外,我没有看到消息被放入队列的地方。你提到手动输入消息...这是什么意思? - Nikolas Stevenson-Molnar
@NikolasStevenson-Molnar 我有另一个线程,用于轮询外部网络资源以获取数据(I/O 密集型),并将传入的消息转储到此线程中。这是接收传入数据的处理程序。 - Josh D
在这种情况下,看起来你在错误的地方使用了asyncio。它非常适用于网络I/O,但从你的代码示例中显示的内容来看,在这里似乎没有什么作用;你基本上构建了一个异步例程,其实质上像一个同步例程。 - Nikolas Stevenson-Molnar
2个回答

7

您需要将队列改为asyncio.Queue,并以线程安全的方式向队列中添加元素。例如:

qq = asyncio.Queue()

class MyApp():
    def __init__(self, q):
        self._queue = q

    def _process_item(self, item):
        print(f'Processing this item: {item}')

    async def get_item(self):
        item = await self._queue.get()
        self._process_item(item)

    async def listen_for_orders(self):  
        '''
        Asynchronously check the orders queue for new incoming orders
        '''
        while True:
            await self.get_item()

a = MyApp(qq)

loop = asyncio.get_event_loop()

loop.run_until_complete(a.listen_for_orders())

您的其他线程必须按照以下方式将内容放入队列中:

loop.call_soon_threadsafe(qq.put_nowait, <item>)

call_soon_threadsafe 会确保正确的锁定,并在新的队列项准备就绪时唤醒事件循环。


这很好,但是当我调用loop.run_until_complete(a.listen_for_orders())时,它会阻止我在Python控制台中输入更多命令。我需要它不要阻塞。 - Josh D
@JoshD 然后在后台线程中运行 run_until_complete,并在主线程中从控制台读取。只要使用 call_soon_threadsafe 将内容添加到队列中,代码就可以在任何线程中运行事件循环。 - user4815162342
谢谢。虽然它并没有完全解决我的问题,但我怀疑如果我将线程和异步结合起来,我的整体架构可能存在问题。从您的其他答案中我看到您对异步编程非常了解,不知道能否私下联系您寻求帮助?您有时间吗?谢谢。 - Josh D

2
这不是一个异步队列。您需要使用 asyncio.Queue
qq = queue.Queue()

异步是一个事件循环。您调用循环将控制权转移给它,它会循环直到您的函数完成,但这永远不会发生:
loop.run_until_complete(a.listen_for_orders())

你评论道:

我有另一个线程,它轮询外部网络资源以获取数据(I/O 密集型),并将传入的消息转储到此线程中。

把这段代码写成异步的 - 这样你会有:
async def run():
    while 1:
        item = await get_item_from_network()
        process_item(item)

loop = asyncio.get_event_loop()
loop.run_until_complete( run() )

如果你不想这样做,你可以通过循环步进,尽管你不想这样做。
import asyncio


def run_once(loop):
    loop.call_soon(loop.stop)
    loop.run_forever()


loop = asyncio.get_event_loop()

for x in range(100):
    print(x)
    run_once(loop)

然后您只需调用异步函数,每次调用 run_once 时,它都会检查您的 (asyncio 队列),如果队列中有项目,则将控制权传递给您的 listen for orders 函数。

谢谢,但我的get_item_from_network()不是可等待的——它是调用Azure Storage Queue Python SDK从队列获取消息的操作,并且我相信它是建立在阻塞式的requests之上的。因此,我将获取这些消息的代码放在一个单独的线程中,并转移到了一个queue.Queue中。 - Josh D

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接