使用asyncio进行进程间通信?

7
我有一组CPU密集型进程,偶尔需要彼此依赖才能继续。因此类似这样的:
def run():
  while True:
    do stuff
    wake up some other process
    wait for some other process to wake me up
    do stuff

在每个进程中,我想使用异步操作,这样我就可以始终拥有一个正在运行的run实例,而其他实例则在等待唤醒。查看asyncio文档,我在“高级API”部分中看到的唯一IPC选项是使用套接字。我更愿意使用管道,看起来我可以通过低级API实现,但是那份文档充满了警告,如果你只是编写一个应用程序,那么使用它是错误的。有人能给出在这里做的习惯用法吗?(还有,速度是一个重要因素,所以如果有一些不太习惯但更高效的方法,我也想知道那个选项。)


1
使用asyncio,您最好的选择可能是使用套接字,因为它得到了最好的支持。您可以使用域套接字来避免(本地)TCP/IP的开销。管道的唯一公共异步接口是通过生成子进程来实现的。但是,虽然您可以与子进程异步通信,在子进程内部却没有异步通信与父进程的设施。 - user4815162342
真遗憾。我特别惊讶于multiprocessing中没有异步版本的数据结构。 - danben
1
我认为有一个 aiomultiprocessing,但据我所知它只是在幕后使用线程池来运行真正的多进程,并在其上提供“异步”API。由于多进程本身启动了一些线程,因此您需要支付两者的资源。 - user4815162342
在每个进程中,我想使用互斥锁、锁、信号量或其他同步工具来代替 xxx,这样我就可以在其他进程等待唤醒时始终保持 run 实例运行。但肯定不会使用 asyncio。你认为 asyncio 是正确的工具吗? - Serge Ballesta
@SergeBallesta 每个线程定期阻塞等待来自另一个进程的数据。我希望该线程仅在有数据可用时才会唤醒。threading 不是使用抢占式线程吗?这似乎会导致大量浪费的周期(更不用说代码必须更加小心地编写以显式管理锁)。为什么您说 asyncio 在这里不适合使用? - danben
当你说“我希望该线程仅在有数据可用时才唤醒”时,我认为这是暗示asyncio可能不是解决方案。你应该提供一些关于你的进程或线程的更多上下文信息,它们如何找到它们的输入和产生输出的位置/方式。Asyncio可能是最佳解决方案,但你所说的并没有清楚地证明它。 - Serge Ballesta
1个回答

1
我想提到aioprocessing库,因为我在其中一个项目中成功地使用了它。它提供了一个异步接口,包括IPC的multiprocessing原语,例如ProcessPipeLockQueue等等。它使用线程池来实现这一点:
    ...
    @staticmethod
    def coro_maker(func):
        def coro_func(self, *args, loop=None, **kwargs):
            return self.run_in_executor(
                getattr(self, func), *args, loop=loop, **kwargs
            )

        return coro_func

说实话,很多取决于问题的解决方式,以及同时执行的任务,因为异步方法中的密集IPC本身由于事件循环、线程池等开销而比同步方法不那么有效。有时最好将所有IPC操作变成同步的,并将它们放在一个单独的线程中。再次强调,这完全取决于问题和环境。下面是一个远非全面的基准测试,但可以大致了解正在解决的问题(缓冲区的密集交换)。
注意:我写了一篇关于Queue和SimpleQueue之间差异的文章在这里
Sync SimpleQueue:  1.4309470653533936
AioSimpleQueue:  12.32670259475708
AioQueue:  14.342737436294556
AioPipe:  11.747064590454102
subprocess pipe stream:  7.344956159591675
socket stream:  4.360717058181763

# main.py
import sys
import time
import asyncio
import aioprocessing as ap
import multiprocessing as mp
import proc

count = 5*10**4
data = b'*'*100


async def sync_simple_queue_func():
    out_ = mp.SimpleQueue()
    in_ = mp.SimpleQueue()
    p = ap.AioProcess(target=proc.start_sync_queue_func, args=(out_, in_))
    p.start()

    begin_ts = time.time()
    for i in range(count):
        out_.put(data)
        res = in_.get()
    print('Sync SimpleQueue: ', time.time() - begin_ts)
    out_.put(None)


async def simple_queue_func():
    out_ = ap.AioSimpleQueue()
    in_ = ap.AioSimpleQueue()
    p = ap.AioProcess(target=proc.start_queue_func, args=(out_, in_))
    p.start()

    begin_ts = time.time()
    for i in range(count):
        await out_.coro_put(data)
        res = await in_.coro_get()
    print('AioSimpleQueue: ', time.time() - begin_ts)
    await out_.coro_put(None)


async def queue_func():
    out_ = ap.AioQueue()
    in_ = ap.AioQueue()
    p = ap.AioProcess(target=proc.start_queue_func, args=(out_, in_))
    p.start()
    begin_ts = time.time()
    for i in range(count):
        await out_.coro_put(data)
        res = await in_.coro_get()
    print('AioQueue: ', time.time() - begin_ts)
    await out_.coro_put(None)


async def pipe_func():
    main_, child_ = ap.AioPipe()
    p = ap.AioProcess(target=proc.start_pipe_func, args=(child_,))
    p.start()
    begin_ts = time.time()
    for i in range(count):
        await main_.coro_send(data)
        res = await main_.coro_recv()
    print('AioPipe: ', time.time() - begin_ts)
    await main_.coro_send(None)
    await p.coro_join()


server = None
async def handle_child(reader, writer):
    begin_ts = time.time()
    for i in range(count):
        writer.write(data)
        res = await reader.read(len(data))
    print('socket stream: ', time.time() - begin_ts)
    writer.close()


async def socket_func():
    global server
    addr = ('127.0.0.1', 8888)
    server = await asyncio.start_server(handle_child, *addr)
    p = ap.AioProcess(target=proc.start_socket_func, args=(addr,))
    p.start()
    async with server:
        await server.serve_forever()

async def subprocess_func():
    prog = await asyncio.create_subprocess_shell(
        'python proc.py',
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE)

    begin_ts = time.time()
    for i in range(count):
        prog.stdin.write(data)
        res = await prog.stdout.read(len(data))
    print('subprocess pipe stream: ', time.time() - begin_ts)
    prog.stdin.close()


async def main():
    await sync_simple_queue_func()
    await simple_queue_func()
    await queue_func()
    await pipe_func()
    await subprocess_func()
    await socket_func()


asyncio.run(main())

# proc.py

import asyncio
import sys

import aioprocessing as ap


async def sync_queue_func(in_, out_):
    while True:
        n = in_.get()
        if n is None:
            return
        out_.put(n)


async def queue_func(in_, out_):
    while True:
        n = await in_.coro_get()
        if n is None:
            return
        await out_.coro_put(n)

async def pipe_func(child):
    while True:
        n = await child.coro_recv()
        if n is None:
            return
        await child.coro_send(n)

data = b'*' * 100

async def socket_func(addr):

    reader, writer = await asyncio.open_connection(*addr)
    while True:
        n = await reader.read(len(data))
        if not n:
            break
        writer.write(n)


def start_sync_queue_func(in_, out_):
    asyncio.run(sync_queue_func(in_, out_))

def start_queue_func(in_, out_):
    asyncio.run(queue_func(in_, out_))


def start_pipe_func(child):
    asyncio.run(pipe_func(child))


def start_socket_func(addr):
    asyncio.run(socket_func(addr))


async def connect_stdin_stdout():
    loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    dummy = asyncio.Protocol()
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)  # sets read_transport
    w_transport, _ = await loop.connect_write_pipe(lambda: dummy, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, protocol, reader, loop)
    return reader, writer


async def main():
    reader, writer = await connect_stdin_stdout()
    while True:
        res = await reader.read(len(data))
        if not res:
            break
        writer.write(res)


if __name__ == "__main__":
    asyncio.run(main())



网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接