我能否以某种方式与子进程共享异步队列?

20
我想使用队列将数据从父进程传递到通过 multiprocessing.Process 启动的子进程,但由于父进程使用 Python 的新 asyncio 库,所以队列方法需要是非阻塞的。据我所知,asyncio.Queue 用于任务间通信,不能用于进程间通信。此外,我知道 multiprocessing.Queueput_nowait()get_nowait() 方法,但我实际上需要协程,它仍然会阻塞当前任务(但不会阻塞整个进程)。是否有一些方法可以创建包装 put_nowait()/get_nowait() 的协程?另外,multiprocessing.Queue 内部使用的线程是否与在同一进程中运行的事件循环兼容?
如果不兼容,我还有哪些选择?我知道我可以通过利用异步套接字自己实现这样的队列,但我希望能够避免这种情况... 编辑: 我也考虑过使用管道而不是套接字,但似乎asynciomultiprocessing.Pipe()不兼容。更准确地说,Pipe()返回一个Connection对象的元组,这些对象不是文件对象。然而,asyncio.BaseEventLoop的方法add_reader()/add_writer()connect_read_pipe()/connect_write_pipe()都期望文件对象,因此无法异步读取/写入这样的Connection。相比之下,subprocess包使用的通常文件对象作为管道没有任何问题,并且可以轻松地与asyncio结合使用

更新: 我决定进一步探索管道方法:通过检索fileno()获取文件描述符,并将其传递给os.fdopen(),将由multiprocessing.Pipe()返回的Connection对象转换为类似文件的对象。最后,我将结果文件对象传递给事件循环的connect_read_pipe()/connect_write_pipe()。(如果有人对确切的代码感兴趣,可以参考邮件列表讨论中的相关问题。)然而,从流中read()时出现了OSError: [Errno 9] Bad file descriptor错误,我无法解决这个问题。另外,考虑到Windows缺乏支持, 我不会进一步追求这个方法。


子进程是如何启动的? - dano
子进程是通过 multiprocessing.Process 创建的。 - balu
3个回答

23
这是一个实现了 multiprocessing.Queue 对象,可用于 asyncio 的示例。它提供了整个 multiprocessing.Queue 接口,并增加了 coro_getcoro_put 方法,它们是可用于从队列中异步获取/放置的 asyncio.coroutine。实现细节与我的另一个答案的第二个示例基本相同:使用 ThreadPoolExecutor 使得获取/放置变为异步,使用 multiprocessing.managers.SyncManager.Queue 在进程间共享队列。唯一的额外技巧是实现 __getstate__ 来保持对象可用于 pickle(即序列化),尽管实例变量使用了不可 pickle 的 ThreadPoolExecutor
from multiprocessing import Manager, cpu_count
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def AsyncProcessQueue(maxsize=0):
    m = Manager()
    q = m.Queue(maxsize=maxsize)
    return _ProcQueue(q)   

class _ProcQueue(object):
    def __init__(self, q):
        self._queue = q
        self._real_executor = None
        self._cancelled_join = False

    @property
    def _executor(self):
        if not self._real_executor:
            self._real_executor = ThreadPoolExecutor(max_workers=cpu_count())
        return self._real_executor

    def __getstate__(self):
        self_dict = self.__dict__
        self_dict['_real_executor'] = None
        return self_dict

    def __getattr__(self, name):
        if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
                    'get', 'get_nowait', 'close']:
            return getattr(self._queue, name)
        else:
            raise AttributeError("'%s' object has no attribute '%s'" % 
                                    (self.__class__.__name__, name))

    @asyncio.coroutine
    def coro_put(self, item):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.put, item))

    @asyncio.coroutine    
    def coro_get(self):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.get))

    def cancel_join_thread(self):
        self._cancelled_join = True
        self._queue.cancel_join_thread()

    def join_thread(self):
        self._queue.join_thread()
        if self._real_executor and not self._cancelled_join:
            self._real_executor.shutdown()

@asyncio.coroutine
def _do_coro_proc_work(q, stuff, stuff2):
    ok = stuff + stuff2
    print("Passing %s to parent" % ok)
    yield from q.coro_put(ok)  # Non-blocking
    item = q.get() # Can be used with the normal blocking API, too
    print("got %s back from parent" % item)

def do_coro_proc_work(q, stuff, stuff2):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(_do_coro_proc_work(q, stuff, stuff2))

@asyncio.coroutine
def do_work(q):
    loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                         do_coro_proc_work, q, 1, 2)
    item = yield from q.coro_get()
    print("Got %s from worker" % item)
    item = item + 25
    q.put(item)

if __name__  == "__main__":
    q = AsyncProcessQueue()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))

输出:

Passing 3 to parent
Got 3 from worker
got 28 back from parent

正如您所看到的,您可以在父进程或子进程中同步或异步地使用AsyncProcessQueue。它不需要任何全局状态,并通过将大部分复杂性封装在一个类中,比我的原始回答更加优雅易用。

直接使用套接字可能能够获得更好的性能,但是以跨平台的方式使其工作似乎相当棘手。这种方法的优点是可用于多个工作进程,不需要对自己进行pickle / unpickle等操作。


接受。 :) 非常感谢您的时间和努力! - balu
@balu 没问题。非常有趣的问题!我仍然希望我们能看到一些更好的 asynciomultiprocessing 集成(类似于 asyncio.subprocess),或者至少是一个进程安全版本的 asyncio.Queue,在标准库中的某个时刻,但现在这似乎是一个不错的权宜之计。 - dano
2
说句实话,我最终采纳了这个想法并构建了一个完整的库,名为aioprocessing,为所有multiprocessing类提供类似的功能。 - dano

7

不幸的是,multiprocessing库并不特别适用于与asyncio一起使用。但是,根据您计划如何使用multiprocessing/multiprocessing.Queue,您可能完全可以使用concurrent.futures.ProcessPoolExecutor来替换它:

import asyncio
from concurrent.futures import ProcessPoolExecutor


def do_proc_work(stuff, stuff2):  # This runs in a separate process
    return stuff + stuff2

@asyncio.coroutine
def do_work():
    out = yield from loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                                          do_proc_work, 1, 2)
    print(out)

if __name__  == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work())

输出:

3

如果您绝对需要使用 multiprocessing.Queue,似乎在与 ProcessPoolExecutor 结合使用时它会表现得不错:

import asyncio
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def do_proc_work(q, stuff, stuff2):
    ok = stuff + stuff2
    time.sleep(5) # Artificial delay to show that it's running asynchronously
    print("putting output in queue")
    q.put(ok)

@asyncio.coroutine
def async_get(q):
    """ Calls q.get() in a separate Thread. 

    q.get is an I/O call, so it should release the GIL.
    Ideally there would be a real non-blocking I/O-based 
    Queue.get call that could be used as a coroutine instead 
    of this, but I don't think one exists.

    """
    return (yield from loop.run_in_executor(ThreadPoolExecutor(max_workers=1), 
                                           q.get))

@asyncio.coroutine
def do_work(q):
    loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                         do_proc_work, q, 1, 2)
    coro = async_get(q) # You could do yield from here; I'm not just to show that it's asynchronous
    print("Getting queue result asynchronously")
    print((yield from coro))

if __name__  == "__main__":
    m = multiprocessing.Manager()
    q = m.Queue() # The queue must be inherited by our worker, it can't be explicitly passed in
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))

输出:

Getting queue result asynchronously
putting output in queue
3

我真的需要一个队列,因为两个进程都无限运行,我需要一直从一个进程传递数据到另一个进程。更具体地说,子进程通过队列转发到达SQLite数据库的查询。这背后的原因是,具有讽刺意味的是,通过将它们放入队列并在不同的进程中执行它们(因为SQLite调用是阻塞的),可以异步运行这些查询。无论如何,你的第二个建议看起来很有趣。虽然我觉得使用套接字更有效率,而且考虑到全局状态,更优雅。 - balu
顺便问一下:你能解释一下为什么无法显式地将队列传递给工作进程吗? - balu
@balu 尝试直接传递 multprocessing.Queue 会引发 RuntimeError: Queue objects should only be shared between processes through inheritance。在示例代码中让 Queue 被继承可以在 Linux 上工作,但我发现它实际上会在 Windows 上挂起。使用 multprocessing.manager.Queue(可以在进程之间显式传递)似乎在所有平台上都可以工作。我已经更新了我的答案以反映这一点。 - dano
谢谢您的回答。我现在有点困惑了,因为Python文档中的代码示例明确将multiprocessing.Queue传递给multiprocessing.Process - balu
1
@balu 是的,我不确定实现的哪个方面使其合法,但是你可以将一个 Queue 传递给 ProcessPool 的构造函数(使用 initializer/initargs 关键字参数)。然而,如果您尝试将 Queue 传递给 pool.apply 调用,则会引发 RuntimeError。看起来,如果在子进程实际启动之前将队列传递给它是允许的。在 pool.apply 的情况下,并将其传递给 ProcessPoolExecutor,子进程已经启动,因此无法传递 Queue - dano
@balu 我的假设是当您将队列传递给构造函数时,它避免了在将其传递给子进程时进行 pickling。不过我还没有查看代码来确认这一点。 - dano

1

这很好,但让我感到遗憾的是它要求使用自己的系统来启动子进程。 - user48956

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接