在PyQt中是否有类似于异步的模式?或者更干净的后台调用模式?

14
我想编写一个短小的程序(一个pyqt文件),它具有响应式功能(因此,依赖于Python / lxml / qt之外的库特别是我无法直接将其放入文件中的库,在这种用例中存在一些缺点,但我仍然可能会尝试它们)。 我正在尝试在工作线程上执行可能很长(可取消的)操作(实际上,后台操作周围有一个锁来防止同时进行多个操作(因为它使用的库一次只能使用一个调用),并且设置了超时,因此生成多个线程也可以)。
就我所知,使用Qt的“基本”方法如下。(请注意,代码未经过测试,因此可能不正确)
class MainWindow(QWidget):
    #self.worker moved to background thread
    def initUI(self):
        ...
        self.cmd_button.clicked.connect(self.send)
        ...

    @pyqtslot()
    def send(self):
        ...
        ...#get cmd from gui
        QtCore.QTimer.singleShot(0, lambda : self.worker(cmd))


    @pyqtslot(str)
    def end_send(self, result):
        ...
        ...# set some gui to display result
        ...



class WorkerObject(QObject):    
   def send_cmd(self, cmd):
       ... get result of cmd
       QtCore.QTimer.singleShot(0, lambda: self.main_window.end_send())

(我是否正确使用了QTimer(它在不同的线程上运行对吗)?)

我真的希望有些更简单、更抽象的东西,类似于C#中的async。 (注意,我没有使用过asyncio,所以可能会出现一些错误)

class MainWindow(QWidget):
    ...
    @asyncio.coroutine
    def send(self):
        ...
        ...#get cmd from gui
        result = yield from self.worker(cmd)
        #set gui textbox to result

class WorkerObject(QObject):
   @asyncio.coroutine
   def send_cmd(self, cmd):
       ... get result of cmd
       yield from loop.run_in_executor(None, self.model.send_command, cmd)

我听说Python 3有类似的功能,并且有一个回溯版本,但它能够与Qt正常工作吗?

如果有人知道另一种更合理的模式,那也将是有用的/可以接受的答案。


5
有一件事可以快速回答:只要您的“worker”位于主 GUI 线程中,QTimer 调用的函数不会在不同的线程上运行。 这是一篇关于 Qt 和线程的好文章: http://mayaposch.wordpress.com/2011/11/01/how-to-really-truly-use-qthreads-the-full-explanation/ - sebastian
所以你想要一个 Python 2.X 的解决方案? - Trilarion
是的,我打算使用Python 2.7。 - Roman A. Taycher
你考虑的是取消长时间命令的机制吗?这似乎相当复杂,所以你是否只想要终止它,还是只想要一些机制来通知工作人员已经发出取消长命令的请求,并且你会处理实际意义上的问题? - three_pineapples
最好能够通知我,这样我就可以锁定/解锁用户界面(一次只进行一个操作,不想让用户频繁点击)。 - Roman A. Taycher
1
你可能想要查看Quamash项目,它在PyQt/PySide事件循环的基础上实现了asyncio,尽管它还没有移植到Python 2。如果你愿意花时间,可能可以通过Trollius使其工作。 - aknuds1
3个回答

24

回答你的问题(“是否有一种方法可以在PyQt中使用类似于asyncio的模式?”)的简短答案是肯定的,但这相当复杂,对于小程序来说可能不值得。以下是一些原型代码,允许您使用像您描述的异步模式:

import types
import weakref
from functools import partial

from PyQt4 import QtGui 
from PyQt4 import QtCore
from PyQt4.QtCore import QThread, QTimer

## The following code is borrowed from here: 
# https://dev59.com/RmAf5IYBdhLWcg3wZSBI
# It provides a child->parent thread-communication mechanism.
class ref(object):
    """
    A weak method implementation
    """
    def __init__(self, method):
        try:
            if method.im_self is not None:
                # bound method
                self._obj = weakref.ref(method.im_self)
            else:
                # unbound method
                self._obj = None
            self._func = method.im_func
            self._class = method.im_class
        except AttributeError:
            # not a method
            self._obj = None
            self._func = method
            self._class = None

    def __call__(self):
        """
        Return a new bound-method like the original, or the
        original function if refers just to a function or unbound
        method.
        Returns None if the original object doesn't exist
        """
        if self.is_dead():
            return None
        if self._obj is not None:
            # we have an instance: return a bound method
            return types.MethodType(self._func, self._obj(), self._class)
        else:
            # we don't have an instance: return just the function
            return self._func

    def is_dead(self):
        """
        Returns True if the referenced callable was a bound method and
        the instance no longer exists. Otherwise, return False.
        """
        return self._obj is not None and self._obj() is None

    def __eq__(self, other):
        try:
            return type(self) is type(other) and self() == other()
        except:
            return False

    def __ne__(self, other):
        return not self == other

class proxy(ref):
    """
    Exactly like ref, but calling it will cause the referent method to
    be called with the same arguments. If the referent's object no longer lives,
    ReferenceError is raised.

    If quiet is True, then a ReferenceError is not raise and the callback 
    silently fails if it is no longer valid. 
    """

    def __init__(self, method, quiet=False):
        super(proxy, self).__init__(method)
        self._quiet = quiet

    def __call__(self, *args, **kwargs):
        func = ref.__call__(self)
        if func is None:
            if self._quiet:
                return
            else:
                raise ReferenceError('object is dead')
        else:
            return func(*args, **kwargs)

    def __eq__(self, other):
        try:
            func1 = ref.__call__(self)
            func2 = ref.__call__(other)
            return type(self) == type(other) and func1 == func2
        except:
            return False

class CallbackEvent(QtCore.QEvent):
    """
    A custom QEvent that contains a callback reference

    Also provides class methods for conveniently executing 
    arbitrary callback, to be dispatched to the event loop.
    """
    EVENT_TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())

    def __init__(self, func, *args, **kwargs):
        super(CallbackEvent, self).__init__(self.EVENT_TYPE)
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def callback(self):
        """
        Convenience method to run the callable. 

        Equivalent to:  
            self.func(*self.args, **self.kwargs)
        """
        self.func(*self.args, **self.kwargs)

    @classmethod
    def post_to(cls, receiver, func, *args, **kwargs):
        """
        Post a callable to be delivered to a specific
        receiver as a CallbackEvent. 

        It is the responsibility of this receiver to 
        handle the event and choose to call the callback.
        """
        # We can create a weak proxy reference to the
        # callback so that if the object associated with
        # a bound method is deleted, it won't call a dead method
        if not isinstance(func, proxy):
            reference = proxy(func, quiet=True)
        else:
            reference = func
        event = cls(reference, *args, **kwargs)

        # post the event to the given receiver
        QtGui.QApplication.postEvent(receiver, event)

## End borrowed code

## Begin Coroutine-framework code

class AsyncTask(QtCore.QObject):
    """ Object used to manage asynchronous tasks.

    This object should wrap any function that you want
    to call asynchronously. It will launch the function
    in a new thread, and register a listener so that
    `on_finished` is called when the thread is complete.

    """
    def __init__(self, func, *args, **kwargs):
        super(AsyncTask, self).__init__()
        self.result = None  # Used for the result of the thread.
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.finished = False
        self.finished_cb_ran = False
        self.finished_callback = None
        self.objThread = RunThreadCallback(self, self.func, self.on_finished, 
                                           *self.args, **self.kwargs)
        self.objThread.start()

    def customEvent(self, event):
        event.callback()

    def on_finished(self, result):
        """ Called when the threaded operation is complete.

        Saves the result of the thread, and
        executes finished_callback with the result if one
        exists. Also closes/cleans up the thread.

        """
        self.finished = True
        self.result = result
        if self.finished_callback:
            self.finished_ran = True
            func = partial(self.finished_callback, result)
            QTimer.singleShot(0, func)
        self.objThread.quit()
        self.objThread.wait()

class RunThreadCallback(QtCore.QThread):
    """ Runs a function in a thread, and alerts the parent when done. 

    Uses a custom QEvent to alert the main thread of completion.

    """
    def __init__(self, parent, func, on_finish, *args, **kwargs):
        super(RunThreadCallback, self).__init__(parent)
        self.on_finished = on_finish
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def run(self):
        try:
            result = self.func(*self.args, **self.kwargs)
        except Exception as e:
            print "e is %s" % e
            result = e
        finally:
            CallbackEvent.post_to(self.parent(), self.on_finished, result)


def coroutine(func):
    """ Coroutine decorator, meant for use with AsyncTask.

    This decorator must be used on any function that uses
    the `yield AsyncTask(...)` pattern. It shouldn't be used
    in any other case.

    The decorator will yield AsyncTask objects from the
    decorated generator function, and register itself to
    be called when the task is complete. It will also
    excplicitly call itself if the task is already
    complete when it yields it.

    """
    def wrapper(*args, **kwargs):
        def execute(gen, input=None):
            if isinstance(gen, types.GeneratorType):
                if not input:
                    obj = next(gen)
                else:
                    try:
                        obj = gen.send(input)
                    except StopIteration as e:
                        result = getattr(e, "value", None)
                        return result
                if isinstance(obj, AsyncTask):
                    # Tell the thread to call `execute` when its done
                    # using the current generator object.
                    func = partial(execute, gen)
                    obj.finished_callback = func
                    if obj.finished and not obj.finished_cb_ran:
                        obj.on_finished(obj.result)
                else:
                    raise Exception("Using yield is only supported with AsyncTasks.")
            else:
                print("result is %s" % result)
                return result
        result = func(*args, **kwargs)
        execute(result)
    return wrapper

## End coroutine-framework code

如果您将上述代码放入模块中(比如说qtasync.py),您可以将其导入脚本并像下面这样使用它来获得类似于asyncio的行为:
import sys
import time
from qtasync import AsyncTask, coroutine
from PyQt4 import QtGui
from PyQt4.QtCore import QThread

class MainWindow(QtGui.QMainWindow):
    def __init__(self):
        super(MainWindow, self).__init__()
        self.initUI()

    def initUI(self):
        self.cmd_button = QtGui.QPushButton("Push", self)
        self.cmd_button.clicked.connect(self.send_evt)
        self.statusBar()
        self.show()

    def worker(self, inval):
        print "in worker, received '%s'" % inval
        time.sleep(2)
        return "%s worked" % inval

    @coroutine
    def send_evt(self, arg):
        out = AsyncTask(self.worker, "test string")
        out2 = AsyncTask(self.worker, "another test string")
        QThread.sleep(3)
        print("kicked off async task, waiting for it to be done")
        val = yield out
        val2 = yield out2
        print ("out is %s" % val)
        print ("out2 is %s" % val2)
        out = yield AsyncTask(self.worker, "Some other string")
        print ("out is %s" % out)


if __name__ == "__main__":
    app = QtGui.QApplication(sys.argv)
    m = MainWindow()
    sys.exit(app.exec_())

输出结果(当按钮被按下时):

in worker, received 'test string'
in worker, received 'another test string'
kicked off async task, waiting for it to be done
out is test string worked
out2 is another test string worked
in worker, received 'Some other string'
out is Some other string worked

正如您所看到的,每当通过AsyncTask类调用worker时,它都会在一个线程中异步运行,但是它的返回值可以直接从send_evtyield出来,而无需使用回调函数。
该代码使用了 Python 生成器的协程支持功能 (generator_object.send) 和一个我在 ActiveState 上找到的提供子线程到主线程通信机制的配方,来实现一些非常基本的协程。这些协程相当受限制:您无法从中返回任何内容,也无法将协程调用链接在一起。虽然可能可以实现这两个功能,但除非您确实需要它们,否则可能不值得花费这样的努力。我还没有对此进行过太多的负面测试,因此工作人员和其他地方的异常可能无法正确处理。不过,它确实能够很好地通过AsyncTask类在单独的线程中调用方法,然后在准备好结果时从线程中yield出结果,而不会阻塞Qt事件循环。通常,这种情况需要使用回调函数来完成,这可能难以跟踪,并且通常比将所有代码放在单个函数中更难以阅读。
如果您可以接受我提到的限制,那么欢迎使用这种方法,但这只是一个概念验证;在考虑将其投入生产之前,您需要进行大量测试。
正如您所提到的,Python 3.3 和 3.4 通过引入yield fromasyncio使异步编程变得更加容易。我认为yield from实际上在这里非常有用,可以允许协程链接(即一个协程调用另一个协程并从中获取结果)。asyncio没有 PyQt4 事件循环集成,因此它的实用性相当有限。
另一个选择是完全放弃此协程部分,直接使用基于回调的线程间通信机制
import sys
import time

from qtasync import CallbackEvent  # No need for the coroutine stuff
from PyQt4 import QtGui
from PyQt4.QtCore import QThread

class MyThread(QThread):
    """ Runs a function in a thread, and alerts the parent when done. 

    Uses a custom QEvent to alert the main thread of completion.

    """
    def __init__(self, parent, func, on_finish, *args, **kwargs):
        super(MyThread, self).__init__(parent)
        self.on_finished = on_finish
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.start()

    def run(self):
        try:
            result = self.func(*self.args, **self.kwargs)
        except Exception as e:
            print "e is %s" % e
            result = e
        finally:
            CallbackEvent.post_to(self.parent(), self.on_finished, result)


class MainWindow(QtGui.QMainWindow):
    def __init__(self):
        super(MainWindow, self).__init__()
        self.initUI()

    def initUI(self):
        self.cmd_button = QtGui.QPushButton("Push", self)
        self.cmd_button.clicked.connect(self.send)
        self.statusBar()
        self.show()

    def customEvent(self, event):
        event.callback()

    def worker(self, inval):
        print("in worker, received '%s'" % inval)
        time.sleep(2)
        return "%s worked" % inval

    def end_send(self, cmd):
        print("send returned '%s'" % cmd)

    def send(self, arg):
        t = MyThread(self, self.worker, self.end_send, "some val")
        print("Kicked off thread")


if __name__ == "__main__":
    app = QtGui.QApplication(sys.argv)
    m = MainWindow()
    sys.exit(app.exec_())

输出:

Kicked off thread
in worker, received 'some val'
send returned 'some val worked'

如果你处理的是一个长的回调链,这可能会变得有些笨重,但它并不依赖于更加未经验证的coroutine代码。


Dano,你能否批评一下我的答案? - Roman A. Taycher
这可能是一个有点晚的评论,但是:当将partial(execute, gen)直接与在AsyncTask中实现的一个finished信号连接(例如,仅通过从objThread转发信号)时,您是否看到任何可能的问题?我真的不明白为什么需要所有手动的回调实现... - sebastian
协程代码似乎存在至少一个问题。协程包装器代码引用了 if obj.finished and not obj.finished_cb_ran:,但是 AsyncTask 对象中标记完成的变量名为 self.finished_ran - Isaiah
这是适用于使用多线程的应用程序的良好解决方案。异步编程的一大好处在于保持单线程使UI更新变得更加直观。如果使用Quamash并将asyncio直接与Qt事件循环集成,可能通过发送异步信号来实现一个版本的答案,这将是解决此问题的非常好的解决方案,因为所有操作都发生在主线程上。 - Daniel Farrell

3

如果你想要一种非常简单(代码行数少)的方法来完成这个任务,你可以创建一个 QThread 并使用 pyqtSignal 来通知父对象线程何时执行完毕。这里有两个按钮,一个用来控制可以取消的后台线程。第一次按下按钮启动线程,第二次按下按钮则取消后台线程。另一个按钮在后台线程运行时会自动被禁用,并在后台线程完成后重新启用。

from PyQt4 import QtGui
from PyQt4 import QtCore

class MainWindow(QtGui.QMainWindow):
    def __init__(self):
        super(MainWindow, self).__init__()
        self.initUI()
        self.task = None

    def initUI(self):
        self.cmd_button = QtGui.QPushButton("Push/Cancel", self)
        self.cmd_button2 = QtGui.QPushButton("Push", self)
        self.cmd_button.clicked.connect(self.send_cancellable_evt)
        self.cmd_button2.clicked.connect(self.send_evt)
        self.statusBar()
        self.layout = QtGui.QGridLayout()
        self.layout.addWidget(self.cmd_button, 0, 0)
        self.layout.addWidget(self.cmd_button2, 0, 1)
        widget = QtGui.QWidget()
        widget.setLayout(self.layout)
        self.setCentralWidget(widget)
        self.show()

    def send_evt(self, arg):
        self.t1 = RunThread(self.worker, self.on_send_finished, "test")
        self.t2 = RunThread(self.worker, self.on_send_finished, 55)
        print("kicked off async tasks, waiting for it to be done")

    def worker(self, inval):
        print "in worker, received '%s'" % inval
        time.sleep(2)
        return inval

    def send_cancellable_evt(self, arg):
        if not self.task:
            self.task = RunCancellableThread(None, self.on_csend_finished, "test")
            print("kicked off async task, waiting for it to be done")
        else:
            self.task.cancel()
            print("Cancelled async task.")

    def on_csend_finished(self, result):
        self.task = None  # Allow the worker to be restarted.
        print "got %s" % result

    def on_send_finished(self, result):
        print "got %s. Type is %s" % (result, type(result))


class RunThread(QtCore.QThread):
    """ Runs a function in a thread, and alerts the parent when done. 

    Uses a pyqtSignal to alert the main thread of completion.

    """
    finished = QtCore.pyqtSignal(["QString"], [int])

    def __init__(self, func, on_finish, *args, **kwargs):
        super(RunThread, self).__init__()
        self.args = args
        self.kwargs = kwargs
        self.func = func
        self.finished.connect(on_finish)
        self.finished[int].connect(on_finish)
        self.start()

    def run(self):
        try:
            result = self.func(*self.args, **self.kwargs)
        except Exception as e:
            print "e is %s" % e
            result = e
        finally:
            if isinstance(result, int):
                self.finished[int].emit(result)
            else:
                self.finished.emit(str(result)) # Force it to be a string by default.

class RunCancellableThread(RunThread):
    def __init__(self, *args, **kwargs):
        self.cancelled = False
        super(RunCancellableThread, self).__init__(*args, **kwargs)

    def cancel(self):
        self.cancelled = True  # Use this if you just want to signal your run() function.
        # Use this to ungracefully stop the thread. This isn't recommended,
        # especially if you're doing any kind of work in the thread that could
        # leave things in an inconsistent or corrupted state if suddenly
        # terminated
        #self.terminate() 

    def run(self):
        try:
            start = cur_time = time.time()
            while cur_time - start < 10:
                if self.cancelled:
                    print("cancelled")
                    result = "cancelled"
                    break
                print "doing work in worker..."
                time.sleep(1)
                cur_time = time.time()
        except Exception as e:
            print "e is %s" % e
            result = e
        finally:
            if isinstance(result, int):
                self.finished[int].emit(result)
            else:
                self.finished.emit(str(result)) # Force it to be a string by default.


if __name__ == "__main__":
    app = QtGui.QApplication(sys.argv)
    m = MainWindow()
    sys.exit(app.exec_())

输出(从“Push”按钮推送):

in worker, received 'test'kicked off async tasks, waiting for it to be done

 in worker, received '55'
got 55. Type is <type 'int'>
got test. Type is <class 'PyQt4.QtCore.QString'>
in worker, received 'test'
 in worker, received '55'

输出(从按下“推送/取消”按钮):

kicked off async task, waiting for it to be done
doing work in worker...
doing work in worker...
doing work in worker...
doing work in worker...
doing work in worker...
doing work in worker...
<I pushed the button again here>
Cancelled async task.
cancelled
got cancelled

这里有一些令人烦恼的限制:
  1. finished 信号不易处理任意类型。您必须显式声明和连接每个要返回的类型的处理程序,然后确保在获得结果时向正确的处理程序发出 emit。这称为信号重载。一些 Python 类型具有相同的 C++ 签名使用起来不正确,例如 pyqtSignal([dict], [list])。也许最好创建几个不同的 QThread 子类来处理可以从线程中运行的不同类型。
  2. 您必须保存对创建的 RunThread 对象的引用,否则它会在超出范围时立即被销毁,导致工作线程在完成之前终止。这有点浪费,因为在完成后仍保留对已完成的 QThread 对象的引用(除非您通过 on_finish 处理程序或其他机制清理它)。

我选择将此内容与我的其他答案分开,因为它不依赖于大量自定义类/函数来工作,并且具有完全独立的一组缺点。 - dano

0

如果要运行一个长时间的处理事件,创建依赖于QThreads的事件驱动工作对象就太复杂了。有两种方法可以调用单个处理方法:

第一种方法是使用QtConcurrent()。如果只是运行冗长的函数,这将是一个不错的选择。不确定在pyqt中是否可用。

第二种方法是子类化QThread,并在子类的run()方法中实现处理代码。然后只需调用QThreadSubclass.start()。这应该在PyQt中可用,也可能是最好的方法。复杂性被缩小到一个简单的子类。与线程通信很容易实现,就像与任何其他类通信一样。

当使用分配给QThread的对象时(这可能不是最好的方法),而不是使用QTimer,您应该使用Qt.QueuedConnection发出信号。使用QueuedConnection将确保槽在对象所在的线程中运行。


QTConcurrent 在 PyQt 中似乎不可用。 - Roman A. Taycher
是的,但如果可以避免,我不想添加额外的信号。 - Roman A. Taycher
子类化 QThread 不会导致实现新的信号。start可以从创建线程中调用。运行本身将在不同的线程中进行。使用一个布尔值来检查运行结束或者响应 QThreads 的 finished() 信号即可完成其余操作。 - Sebastian Lange

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接