在GUI应用程序中使用多进程进行异步调用

5
我有一个GUI应用程序,需要从网络中获取和解析各种资源,除了GUI主循环。 我搜索了使用Python多进程模块的选项,因为这些获取操作不仅包含阻塞IO,还包括重量级解析,所以在这里使用多进程可能是更好的选择,而不是Python线程。 使用Twisted会很容易,但这次Twisted不是一个选项。 我在这里找到了一个简单的解决方案: Python subprocess: callback when cmd exits 问题是回调神奇地没有在MainThread内被调用。 所以我想出了以下解决方案:

delegate.py

import os
import multiprocessing as mp
import signal
from collections import namedtuple
import uuid
import logging


_CALLBACKS = {}
_QUEUE = mp.Queue()

info = logging.getLogger(__name__).info


class Call(namedtuple('Call', 'id finished result error')):

    def attach(self, func):
        if not self.finished:
            _CALLBACKS.setdefault(self.id, []).append(func)
        else:
            func(self.result or self.error)

        return self

    def callback(self):
        assert self.finished, 'Call not finished yet'
        r = self.result or self.error
        for func in _CALLBACKS.pop(self.id, []):
            func(r)

    def done(self, result=None, error=None):
        assert not self.finished, 'Call already finished'
        return self._replace(finished=(-1 if error else 1),
            result=result, error=error)

    @classmethod
    def create(clss):
        call = clss(uuid.uuid4().hex, 0, None, None) # uuid ???
        return call

def run(q, cb, func, args=None, kwargs=None):
    info('run: try running %s' % func)
    try:
        cb = cb.done(result=func(*(args or ()), **(kwargs or {})))
    except Exception, err:
        cb = cb.done(error=err)
    q.put(cb)
    os.kill(os.getppid(), signal.SIGUSR2) # SIGUSR2 ???
    info('run: leaving')

def on_callback(sig, frame):
    info('on_callback: checking queue ...')
    c = _QUEUE.get(True, 2)
    info('on_callback: got call - %s' % repr(c))
    c.callback()

signal.signal(signal.SIGUSR2, on_callback) # SIGUSR2 ???

def delegate(func, *args, **kwargs):
    info('delegate: %s %s' % (func, args,))
    cb = Call.create()
    mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start()
    return cb


__all__ = ['delegate']

使用

from delegate import delegate

def sleeper(secs):
    assert secs >= 1, 'I need my Augenpflege'
    info('sleeper: will go to sleep for %s secs' % secs)
    sleep(secs)
    info('sleeper: woke up - returning result')
    return ['sleeper', 'result']

def on_sleeper_result(r):
    if isinstance(r, Exception):
        info('on_sleeper_result: got error: %s' % r)
    else:
        info('on_sleeper_result: got result: %s' % r)

from delegate import delegate
delegate(sleeper, 3).attach(on_sleeper_result)
delegate(sleeper, -3).attach(on_sleeper_result)
while 1:
    info('main: loop')
    sleep(1)

输出

0122 08432 MainThread INFO  delegate: <function sleeper at 0x163e320> (3,)
0123 08432 MainThread INFO  delegate: <function sleeper at 0x163e320> (-3,)
0124 08437 MainThread INFO  run: try running <function sleeper at 0x163e320>
0124 08437 MainThread INFO  sleeper: will go to sleep for 3 secs
0124 08432 MainThread INFO  main: loop
0125 08438 MainThread INFO  run: try running <function sleeper at 0x163e320>
0126 08438 MainThread INFO  run: leaving
0126 08432 MainThread INFO  on_callback: checking queue ...
0126 08432 MainThread INFO  on_callback: got call - Call(id='057649cba7d840e3825aa5ac73248f78', finished=-1, result=None, error=AssertionError('I need my Augenpflege',))
0127 08432 MainThread INFO  on_sleeper_result: got error: I need my Augenpflege
0127 08432 MainThread INFO  main: loop
1128 08432 MainThread INFO  main: loop
2129 08432 MainThread INFO  main: loop
3127 08437 MainThread INFO  sleeper: woke up - returning result
3128 08437 MainThread INFO  run: leaving
3128 08432 MainThread INFO  on_callback: checking queue ...
3129 08432 MainThread INFO  on_callback: got call - Call(id='041420c6c83a489aa5c7409c662d4917', finished=1, result=['sleeper', 'result'], error=None)
3129 08432 MainThread INFO  on_sleeper_result: got result: ['sleeper', 'result']
3129 08432 MainThread INFO  main: loop
4130 08432 MainThread INFO  main: loop
5132 08432 MainThread INFO  main: loop
...

到目前为止,这个方案运行得相当不错,但是我对多进程模块的经验有限,我有些不确定它是否会带来影响。我的问题是,在使用多进程时,我应该特别注意哪些事项...或者说,使用Python标准库进行异步回调机制的'更正确'的模式是什么?


Eli Bendersky写了一些关于这个的内容:链接 - JBernardo
我之前读过那篇博客文章,但正如我所提到的,Twisted对于这个项目不是一个选项 - 无论如何还是谢谢。 - hooblei
2个回答

2

您没有使用Python多进程和在主循环中忙等待的理由(低级API)。

您需要在QThread中运行(修改后的)事件循环,这可以直接调用Qt代码,或者使用QApplication.postEvent(或pyqtSignal)在主线程中执行它。

# this should be in the delegate module
while 1:
    c = _QUEUE.get(True) # no timeout
    c.callback() # or post event to main thread

您还可以查看此页面,以了解在Qt中线程间通信的讨论。


啊,就是这个!我把所有的委托代码都塞进了一个 QThread(或者更好的 QRunnable + QThreadPool),现在它像魔术一样正常工作了 - 谢谢。 - hooblei

1

你的代码能够工作,但是它并不够简单易懂。让我们逐步分析一下代码。

这将在主进程中创建一个Call实例:

def delegate(func, *args, **kwargs):
    cb = Call.create()

但是当您将cb传递给工作进程时,

mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start()

Call 实例在 os.fork 期间被复制,从而创建第二个独立的实例。然后调用 cb.done,它调用 cb._replace,返回第三个 Call 实例:

def done(self, result=None, error=None):
    assert not self.finished, 'Call already finished'
    return self._replace(finished=(-1 if error else 1),
        result=result, error=error)

上述代码调用了私有的namedtuple方法_replace。它本可以是一个简单的Python语句,比如:
self.finished = -1 if error else 1

如果Callobject的子类而不是namedtuple的子类。虽然在__init__中,通过继承namedtuple可以节省一些输入,但后来变得非常笨拙,因为我们需要修改namedtuple的属性...。
同时,在主进程中由delegate(...)返回的原始Call实例调用attach
delegate(...).attach(on_sleeper_result)

这会修改全局的_CALLBACKS字典。工作进程无法知道_CALLBACKS的变化;在工作进程中,_CALLBACKS仍然是空字典。因此,您必须通过mp.Queue将工作进程的Call实例传回主进程,该实例使用cb.id引用_CALLBACKS中的正确函数。

所以一切都可以正常工作,但是每次调用delegate都会创建三个Call实例,并且代码可能会误导未经培训的人认为这三个Call实例都是同一个对象... 一切都能正常工作,但有点复杂。

您是否考虑过使用mp.Pool.apply_asynccallback参数呢?

import multiprocessing as mp
import logging
import time
import collections

_CALLBACKS=collections.defaultdict(list)

logger=mp.log_to_stderr(logging.DEBUG)

def attach(name,func):
    _CALLBACKS[name].append(func)

def delegate(func, *args, **kwargs):
    id=kwargs.pop('id')
    try:
        result=func(*args,**kwargs)
    except Exception, err:
        result=err
    return (id,result)

def sleeper(secs):
    assert secs >= 1, 'I need my Augenpflege'
    logger.info('sleeper: will go to sleep for %s secs' % secs)
    time.sleep(secs)
    logger.info('sleeper: woke up - returning result')
    return ['sleeper', 'result']

def callback(r):
    id,result=r
    for func in _CALLBACKS[id]:
        func(result)

def on_sleeper_result(r):
    if isinstance(r, Exception):
        logger.error('on_sleeper_result: got error: %s' % r)
    else:
        logger.info('on_sleeper_result: got result: %s' % r)

if __name__=='__main__':
    pool=mp.Pool()
    pool.apply_async(delegate,args=(sleeper, -3),kwds={'id':1},
                     callback=callback)
    attach(1,on_sleeper_result)
    pool.apply_async(delegate,args=(sleeper, 3),kwds={'id':2},
                     callback=callback)
    attach(2,on_sleeper_result)    
    while 1:
        logger.info('main: loop')
        time.sleep(1)

你好,我完全意识到在两个进程之间传递时存在多个Call实例和_replace调用,但出于可读性的考虑,我选择这样做,因为这些委托操作在应用程序中相对较少。 是的,apply_async是我的第一次尝试,但正如我所提到的 - 回调处理程序未在当前主线程中调用。您也可以在此处看到相同的行为: https://dev59.com/gnE85IYBdhLWcg3w2HXs#5209746 但感谢您详细的澄清。 - hooblei

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接