如何在不阻塞事件循环的情况下迭代一个大型列表

5
我有一个带有正在运行的asyncio事件循环的Python脚本,我想知道如何在不阻塞事件循环的情况下遍历一个大列表。因此保持循环运行。
我尝试过使用__aiter__和__anext__创建自定义类,但没有成功,我还尝试了创建一个异步函数来产生结果,但仍然会阻塞。
当前:
for index, item in enumerate(list_with_thousands_of_items):
    # do something

我尝试过的自定义类:
class Aiter:
    def __init__(self, iterable):
        self.iter_ = iter(iterable)

    async def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            object = next(self.iter_)
        except StopIteration:
            raise StopAsyncIteration
        return object

但这总是导致

的结果。
TypeError: 'async for' received an object from __aiter__ that does not implement __anext__: coroutine

我制作的 async function 能够工作,但仍会阻塞事件循环。
async def async_enumerate(iterable, start:int=0):
    for idx, i in enumerate(iterable, start):
        yield idx, i

你在列表中对每个项目做什么? - rdas
1
你是否考虑过使用类似于ProcessPoolExecutor的东西?它可能非常适合你的问题。 - Rafaël Dera
@rdas 我正在将每个项目添加到一个字符串中。 - Dylee
@RafaëlDera 很遗憾,我从未听说过ProcessPoolExecutor。 - Dylee
1
你的 __aiter__ 应该使用 def,而不是 async def - user4815162342
在执行了@user4815162342的修复之后,您可以尝试将await asyncio.sleep(0)添加到__anext__中;每次迭代或每10次迭代或其他以产生yield。 - Max
2个回答

6
正如@deceze所指出的那样,您可以使用await asyncio.sleep(0)来明确地将控制权传递给事件循环。然而,这种方法存在问题。
可以想象,列表非常大,这就是为什么需要特殊措施来解除阻塞事件循环的原因。但是,如果列表这么大,强制每个循环迭代都让出控制权以便于事件循环会 相当慢。当然,您可以通过添加计数器并仅在i%10 == 0i%100 == 0时等待来减轻这一点,等等。但是,您必须进行任意决策(猜测)关于何时放弃控制权。如果您放弃太频繁,则会减缓函数的速度。如果你放弃的不够频繁,你会使事件循环停滞不前。
这可以通过使用run_in_executor来避免,就像RafaëlDera建议的那样。 run_in_executor接受一个阻塞函数并将其执行卸载线程池中。它立即返回一个future,可以在asyncio中await,并且一旦可用,其结果将是阻塞函数的返回值。(如果阻塞函数引发,异常将被传播。)这样的await将暂停协程,直到函数在其线程中返回或引发异常,同时允许事件循环保持完全功能。由于阻塞函数和事件循环在单独的线程中运行,因此函数不需要执行任何操作即可允许事件工作运行 - 它们独立运行。即使是GIL在这里也不是问题,因为GIL确保控制权在线程之间传递。
使用run_in_executor,您的代码可能如下所示:
def process_the_list():
    for index, item in enumerate(list_with_thousands_of_items):
        # do something

loop = asyncio.get_event_loop()
await loop.run_in_executor(None, process_the_list)

3

asyncio 是一种 协同 多任务处理技术。这里的“协同”是指你的函数必须通过 yield 将执行权交回给事件循环,以便让其他任务运行。如果你没有 await 任何东西(或结束函数),那么你将会独占事件循环。

你可以简单地 await 一些无操作事件,最适合的可能是 await asyncio.sleep(0)。这可以确保你的任务尽快恢复,并允许其他任务也被调度。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接