如何将Python的asyncio与线程结合使用?

78
我已经成功使用Python asyncio和aiohttp构建了一个RESTful微服务,它监听POST事件以从各种提供者收集实时事件。然后,它构建了一个嵌套的defaultdict/deque结构来缓存最近24小时的事件。
现在,我想定期将该结构检查点到磁盘上,最好使用pickle。由于内存结构可能超过100MB,我想避免在检查点结构时阻塞传入的事件处理所需的时间。我宁愿创建一个快照副本(例如deepcopy)的结构,然后花费时间将其写入磁盘,并按预设时间间隔重复此操作。
我一直在搜索如何将线程(线程是否是这个问题的最佳解决方案?)和asyncio组合使用以实现此目的的示例,但是找不到有帮助的内容。
非常感谢任何开始的指针!

1
我使用了dano的建议,并构建了一个非常简单的多线程设置,每60秒将内存中的事件存储检查点写入磁盘。这是一个包含整个逻辑的git repo文件链接:https://github.com/fxstein/SentientHome/blob/master/engine/event.engine.py - fxstein
3个回答

93

使用BaseEventLoop.run_in_executor方法将一个方法委托给线程或子进程非常简单:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_operation(x):
    time.sleep(x) # This is some operation that is CPU-bound

@asyncio.coroutine
def main():
    # Run cpu_bound_operation in the ProcessPoolExecutor
    # This will make your coroutine block, but won't block
    # the event loop; other coroutines can run in meantime.
    yield from loop.run_in_executor(p, cpu_bound_operation, 5)


loop = asyncio.get_event_loop()
p = ProcessPoolExecutor(2) # Create a ProcessPool with 2 processes
loop.run_until_complete(main())

关于使用ProcessPoolExecutor还是ThreadPoolExecutor,这有点难说;pickle一个大对象肯定会消耗一些CPU周期,最初你可能会认为ProcessPoolExecutor是更好的选择。然而,将100MB对象传递给池中的Process需要在主进程中对实例进行pickle,通过IPC将字节发送到子进程,然后在子进程中进行反pickle,并再次pickle以便将其写入磁盘。鉴于此,我猜测pickle / unpickle开销足够大,因此最好使用ThreadPoolExecutor,尽管由于GIL的存在,性能可能会受到影响。
话虽如此,测试两种方法非常简单,可以确切地了解哪种方式更好,所以不妨这么做。

1
谢谢你,dano!这其实很容易。你是正确的,我采用了ThreadPoolExecutor的方法,现在可以每60秒写入检查点而不会阻塞任何事件处理。 - fxstein
我猜这对我来说有点晚了,但是当我听到“asyncio + paralllelism [sp?]”时,我想起了这篇杰出的文章shorturl.at/aOVZ2。哦,是的,我也方便地想起了这句话:“你不应该评判别人的评论,或者作证你自己的判断力。” ̿̿ ̿̿ ̿̿ ̿’̿’\̵͇̿̿\З= ( ▀ ͜͞ʖ▀) =Ε/̵͇̿̿/’̿’̿ ̿ ̿̿ ̿̿ ̿̿(所以如果这些都不相关-那就难了)。谢谢。 - JB-007

15

我也使用了run_in_executor,但我发现在大多数情况下这个函数有点繁琐,因为它需要使用partial()来处理关键字参数,而我从未使用过除单个执行器和默认事件循环以外的任何内容。所以我创建了一个便捷的包装器,它具有合理的默认值和自动关键字参数处理。

from time import sleep
import asyncio as aio
loop = aio.get_event_loop()

class Executor:
    """In most cases, you can just use the 'execute' instance as a
    function, i.e. y = await execute(f, a, b, k=c) => run f(a, b, k=c) in
    the executor, assign result to y. The defaults can be changed, though,
    with your own instantiation of Executor, i.e. execute =
    Executor(nthreads=4)"""
    def __init__(self, loop=loop, nthreads=1):
        from concurrent.futures import ThreadPoolExecutor
        self._ex = ThreadPoolExecutor(nthreads)
        self._loop = loop
    def __call__(self, f, *args, **kw):
        from functools import partial
        return self._loop.run_in_executor(self._ex, partial(f, *args, **kw))
execute = Executor()

...

def cpu_bound_operation(t, alpha=30):
    sleep(t)
    return 20*alpha

async def main():
    y = await execute(cpu_bound_operation, 5, alpha=-2)

loop.run_until_complete(main())

6

另一个选择是使用 loop.call_soon_threadsafe 以及一个 asyncio.Queue 作为通信的中间渠道。

Python 3 的当前文档还有一节关于使用 asyncio 进行开发 - 并发和多线程的内容:

import asyncio

# This method represents your blocking code
def blocking(loop, queue):
    import time
    while True:
        loop.call_soon_threadsafe(queue.put_nowait, 'Blocking A')
        time.sleep(2)
        loop.call_soon_threadsafe(queue.put_nowait, 'Blocking B')
        time.sleep(2)

# This method represents your async code
async def nonblocking(queue):
    await asyncio.sleep(1)
    while True:
        queue.put_nowait('Non-blocking A')
        await asyncio.sleep(2)
        queue.put_nowait('Non-blocking B')
        await asyncio.sleep(2)

# The main sets up the queue as the communication channel and synchronizes them
async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()

    blocking_fut = loop.run_in_executor(None, blocking, loop, queue)
    nonblocking_task = loop.create_task(nonblocking(queue))

    running = True  # use whatever exit condition
    while running:
        # Get messages from both blocking and non-blocking in parallel
        message = await queue.get()
        # You could send any messages, and do anything you want with them
        print(message)

asyncio.run(main())

如何将 asyncio任务发送到运行在其他线程中的循环 可能也会对您有所帮助。

如果您需要一个更 "强大" 的示例,请查看我的从线程代码启动异步任务的包装器。它会为您处理线程安全部分(大部分)并让您执行像这样的操作:

# See https://gist.github.com/Lonami/3f79ed774d2e0100ded5b171a47f2caf for the full example

async def async_main(queue):
    # your async code can go here
    while True:
        command = await queue.get()
        if command.id == 'print':
            print('Hello from async!')
        elif command.id == 'double':
            await queue.put(command.data * 2)

with LaunchAsync(async_main) as queue:
    # your threaded code can go here
    queue.put(Command('print'))
    queue.put(Command('double', 7))
    response = queue.get(timeout=1)
    print('The result of doubling 7 is', response)

你与queue的唯一交互方式是通过loop.call_soon_threadsafe从另一个线程调用回调函数,这是“必须用于从另一个线程调度回调函数”的。据我所知,这是安全的方法,但我可能误解了文档。 - Lonami
有人能解释一下为什么如果我们注释掉async def nonblocking(queue)中的所有三个await asyncio.sleep()函数执行,代码就不能按预期工作了吗? - ievgenii
如果没有 await,函数实际上会变成阻塞性质。需要使用 await 以便 asyncio 事件循环可以重新获取控制权(在 await 点暂停函数,让它完成其他工作,然后再返回;在那个点它字面意义上“返回”,但稍后又回到了那个点)。 - Lonami
谢谢,我知道这一点。但是,我有两个后续问题:1. queue.put_nowait('Non-blocking A') 不应该是非阻塞的吗,因为它是 put 的“nowait”版本?2. 我尝试使用 await queue.put_nowait('Non-blocking A'),但没有任何区别。 - ievgenii
1
put 是一个协程(需要使用 await)它会阻塞(让出控制权),直到队列中有空闲空间可以放置项目。put_nowait 不是协程(不使用 await),如果队列未满,它将引发错误。如果 nonblocking 永远不休眠或让出控制权,则不会给其他代码运行的机会。您可以使用 await asyncio.sleep(0) 强制进行一次让出控制权而不休眠。 - Lonami

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接