PyQt:QThreadPool使用QRunnables需要时间退出

7

我有一个类,它创建QRunnables并在QThreadPool实例中启动它们。

我的线程工作正常,但是如果用户想退出应用程序,应用程序需要很长时间才能停止。显然是由于启动的请求花费了一定的时间。

以下是我如何使用QThreadPool、QRunnables的代码片段:

import sys
from PyQt5.Qt import QThreadPool, QApplication, QWidget, QVBoxLayout
from PyQt5.Qt import QTimer, QObject, QPushButton, QLabel
from PyQt5.Qt import QRunnable


class BackendQRunnable(QRunnable):
    """
        Class who create a QThread to trigger requests
    """

    def __init__(self, task):
        super(BackendQRunnable, self).__init__()
        self.task = task

    def run(self):
        """
        Run the QRunnable. Trigger actions depending on the selected task

        """

        # Here I make long requests
        if 'user' in self.task:
            self.query_user_data()
        elif 'host' in self.task:
            self.query_hosts_data()
        elif 'service' in self.task:
            self.query_services_data()
        elif 'alignakdaemon' in self.task:
            self.query_daemons_data()
        elif 'livesynthesis' in self.task:
            self.query_livesynthesis_data()
        elif 'history' in self.task:
            self.query_history_data()
        elif 'notifications' in self.task:
            self.query_notifications_data()
        else:
            pass

    @staticmethod
    def query_user_data():
        """
        Launch request for "user" endpoint

        """

        print('Query user data')

    @staticmethod
    def query_hosts_data():
        """
        Launch request for "host" endpoint

        """

        print('Query hosts')

    @staticmethod
    def query_services_data():
        """
        Launch request for "service" endpoint

        """

        print("Query services")

    @staticmethod
    def query_daemons_data():
        """
        Launch request for "alignakdaemon" endpoint

        """

        print('Query daemons')

    @staticmethod
    def query_livesynthesis_data():
        """
        Launch request for "livesynthesis" endpoint

        """

        print('query livesynthesis')

    @staticmethod
    def query_history_data():
        """
        Launch request for "history" endpoint but only for hosts in "data_manager"

        """

        print('Query history')

    @staticmethod
    def query_notifications_data():
        """
        Launch request for "history" endpoint but only for notifications of current user

        """

        print('Query notifications')


class ThreadManager(QObject):
    """
        Class who create BackendQRunnable to periodically request on a Backend
    """

    def __init__(self, parent=None):
        super(ThreadManager, self).__init__(parent)
        self.backend_thread = BackendQRunnable(self)
        self.pool = QThreadPool.globalInstance()
        self.tasks = self.get_tasks()

    def start(self):
        """
        Start ThreadManager

        """

        print("Start backend Manager...")

        # Make a first request
        self.create_tasks()

        # Then request periodically
        timer = QTimer(self)
        timer.setInterval(10000)
        timer.start()
        timer.timeout.connect(self.create_tasks)

    @staticmethod
    def get_tasks():
        """
        Return the tasks to run in BackendQRunnable

        :return: tasks to run
        :rtype: list
        """

        return [
            'notifications', 'livesynthesis', 'alignakdaemon', 'history', 'service', 'host', 'user',
        ]

    def create_tasks(self):
        """
        Create tasks to run

        """

        for cur_task in self.tasks:
            backend_thread = BackendQRunnable(cur_task)

            # Add task to QThreadPool
            self.pool.start(backend_thread)

    def exit_pool(self):
        """
        Exit all BackendQRunnables and delete QThreadPool

        """

        # When trying to quit, the application takes a long time to stop
        self.pool.globalInstance().waitForDone()
        self.pool.deleteLater()

        sys.exit(0)


if __name__ == '__main__':
    app = QApplication(sys.argv)

    thread_manager = ThreadManager()
    thread_manager.start()

    layout = QVBoxLayout()

    label = QLabel("Start")
    button = QPushButton("DANGER!")
    button.pressed.connect(thread_manager.exit_pool)

    layout.addWidget(label)
    layout.addWidget(button)

    w = QWidget()

    w.setLayout(layout)
    w.show()

    sys.exit(app.exec_())

在函数exit_pool中,我等待线程完成并删除QThreadPool实例...
有没有办法不等待每个线程直接停止所有东西?
编辑解决方案:
所以我用不同的方法处理了这个问题。我用QThread替换了我的QRunnable。我去掉了QThreadPool,并在列表中自己管理线程。我还添加了一个pyqtSignal,以便通过quit()函数停止QTimer并关闭正在运行的线程。
这样,我的所有线程都能顺利退出。
import sys
from PyQt5.Qt import QThread, QApplication, QWidget, QVBoxLayout
from PyQt5.Qt import QTimer, QObject, QPushButton, QLabel, pyqtSignal


class BackendQThread(QThread):
    """
        Class who create a QThread to trigger requests
    """

    quit_thread = pyqtSignal(name='close_thread')

    def __init__(self, task):
        super(BackendQThread, self).__init__()
        self.task = task

    def run(self):
        """
        Run the actions depending on the selected task

        """

        # Here I make long requests
        if 'user' in self.task:
            self.query_user_data()
        elif 'host' in self.task:
            self.query_hosts_data()
        elif 'service' in self.task:
            self.query_services_data()
        elif 'alignakdaemon' in self.task:
            self.query_daemons_data()
        elif 'livesynthesis' in self.task:
            self.query_livesynthesis_data()
        elif 'history' in self.task:
            self.query_history_data()
        elif 'notifications' in self.task:
            self.query_notifications_data()
        else:
            pass

    @staticmethod
    def query_user_data():
        """
        Launch request for "user" endpoint

        """

        print('Query user data')

    @staticmethod
    def query_hosts_data():
        """
        Launch request for "host" endpoint

        """

        print('Query hosts')

    @staticmethod
    def query_services_data():
        """
        Launch request for "service" endpoint

        """

        print("Query services")

    @staticmethod
    def query_daemons_data():
        """
        Launch request for "alignakdaemon" endpoint

        """

        print('Query daemons')

    @staticmethod
    def query_livesynthesis_data():
        """
        Launch request for "livesynthesis" endpoint

        """

        print('query livesynthesis')

    @staticmethod
    def query_history_data():
        """
        Launch request for "history" endpoint but only for hosts in "data_manager"

        """

        print('Query history')

    @staticmethod
    def query_notifications_data():
        """
        Launch request for "history" endpoint but only for notifications of current user

        """

        print('Query notifications')


class ThreadManager(QObject):
    """
        Class who create BackendQThread to periodically request on a Backend
    """

    def __init__(self, parent=None):
        super(ThreadManager, self).__init__(parent)
        self.tasks = self.get_tasks()
        self.timer = QTimer()
        self.threads = []

    def start(self):
        """
        Start ThreadManager

        """

        print("Start backend Manager...")

        # Make a first request
        self.create_tasks()

        # Then request periodically
        self.timer.setInterval(10000)
        self.timer.start()
        self.timer.timeout.connect(self.create_tasks)

    @staticmethod
    def get_tasks():
        """
        Return the available tasks to run

        :return: tasks to run
        :rtype: list
        """

        return [
            'notifications', 'livesynthesis', 'alignakdaemon', 'history', 'service', 'host', 'user',
        ]

    def create_tasks(self):
        """
        Create tasks to run

        """

        # Here I reset the list of threads
        self.threads = []
        for cur_task in self.tasks:
            backend_thread = BackendQThread(cur_task)

            # Add task to QThreadPool
            backend_thread.start()
            self.threads.append(backend_thread)

    def stop(self):
        """
        Stop the manager and close all QThreads

        """

        print("Stop tasks")
        self.timer.stop()
        for task in self.threads:
            task.quit_thread.emit()

        print("Tasks finished")


if __name__ == '__main__':
    app = QApplication(sys.argv)

    layout = QVBoxLayout()
    widget = QWidget()
    widget.setLayout(layout)

    thread_manager = ThreadManager()

    start_btn = QPushButton("Start")
    start_btn.clicked.connect(thread_manager.start)
    layout.addWidget(start_btn)

    stop_btn = QPushButton("Stop")
    stop_btn.clicked.connect(thread_manager.stop)
    layout.addWidget(stop_btn)

    widget.show()

    sys.exit(app.exec_())
1个回答

6
你无法在启动QRunnable后停止它。然而,在你的示例中,有一些简单的方法可以减少等待时间。
首先,你可以停止计时器,这样它就不会再添加任何任务了。其次,你可以清除线程池,这样它就会删除所有待处理的任务。第三,你可以尝试设置较小的最大线程数,以查看是否仍能获得可接受的性能。默认情况下,线程池将使用QThread.idealThreadCount()来设置最大线程数,通常表示系统上每个处理器核心一个线程。
最后一个选项是提供一种方法来中断在运行中的可运行代码。只要该代码运行循环可以定期检查标志以查看是否应继续,这一点就很容易实现。在你的示例中,由于所有任务都调用静态方法,因此看起来可以使用单个共享类属性作为标志。但是,如果代码不能以这种方式中断,则没有其他办法 - 你只能等待当前正在运行的任务完成。

好的@ekhumoro,我已经发布了我的解决方案来管理Qthread并且没有问题地停止它们。感谢指出QRunnable无法停止。 - Algorys
2
你绝对可以停止已启动的 QRunnable。正如此解决方案本身所指出的那样,只需“定期检查标志以查看是否应该继续”。这是在所有框架、语言和平台上优雅地停止线程问题的规范解决方案。所有其他解决方案都会导致数据丢失或损坏。此外,此解决方案不需要迭代——尽管迭代确实简化了事情。非迭代过程线程仍可通过定期检查标志并根据该标志立即返回来优雅地停止。 - Cecil Curry
2
@CecilCurry。说一个 QRunnable 对象可以被停止是毫无意义的 - 毕竟没有这样的公共 API 可以这样做。人们经常犯这种概念上的错误。他们寻找强制终止 QThreadQRunnable 的方法,而实际上他们真正需要做的是停止正在运行的代码。他们似乎没有意识到,该代码会以与 GUI 相同的方式阻塞与工作线程的交互。我认为 Qt 在某种程度上应对这种混淆负有责任:QThread 类应该被称为 QThreadManager - ekhumoro

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接