PyQt: QThreadPool with QRunnables taking time to quit
Asked Answered
B

1

7

I've a class who create QRunnables and start them in a QThreadPool instance.

My threads are working well, but in case user want to quit application, the application takes a long time to stop. Certainly due to the fact that the requests launched take time.

Here is a snippet code of how I use QThreadPool, QRunnables:

import sys
from PyQt5.Qt import QThreadPool, QApplication, QWidget, QVBoxLayout
from PyQt5.Qt import QTimer, QObject, QPushButton, QLabel
from PyQt5.Qt import QRunnable


class BackendQRunnable(QRunnable):
    """
        Class who create a QThread to trigger requests
    """

    def __init__(self, task):
        super(BackendQRunnable, self).__init__()
        self.task = task

    def run(self):
        """
        Run the QRunnable. Trigger actions depending on the selected task

        """

        # Here I make long requests
        if 'user' in self.task:
            self.query_user_data()
        elif 'host' in self.task:
            self.query_hosts_data()
        elif 'service' in self.task:
            self.query_services_data()
        elif 'alignakdaemon' in self.task:
            self.query_daemons_data()
        elif 'livesynthesis' in self.task:
            self.query_livesynthesis_data()
        elif 'history' in self.task:
            self.query_history_data()
        elif 'notifications' in self.task:
            self.query_notifications_data()
        else:
            pass

    @staticmethod
    def query_user_data():
        """
        Launch request for "user" endpoint

        """

        print('Query user data')

    @staticmethod
    def query_hosts_data():
        """
        Launch request for "host" endpoint

        """

        print('Query hosts')

    @staticmethod
    def query_services_data():
        """
        Launch request for "service" endpoint

        """

        print("Query services")

    @staticmethod
    def query_daemons_data():
        """
        Launch request for "alignakdaemon" endpoint

        """

        print('Query daemons')

    @staticmethod
    def query_livesynthesis_data():
        """
        Launch request for "livesynthesis" endpoint

        """

        print('query livesynthesis')

    @staticmethod
    def query_history_data():
        """
        Launch request for "history" endpoint but only for hosts in "data_manager"

        """

        print('Query history')

    @staticmethod
    def query_notifications_data():
        """
        Launch request for "history" endpoint but only for notifications of current user

        """

        print('Query notifications')


class ThreadManager(QObject):
    """
        Class who create BackendQRunnable to periodically request on a Backend
    """

    def __init__(self, parent=None):
        super(ThreadManager, self).__init__(parent)
        self.backend_thread = BackendQRunnable(self)
        self.pool = QThreadPool.globalInstance()
        self.tasks = self.get_tasks()

    def start(self):
        """
        Start ThreadManager

        """

        print("Start backend Manager...")

        # Make a first request
        self.create_tasks()

        # Then request periodically
        timer = QTimer(self)
        timer.setInterval(10000)
        timer.start()
        timer.timeout.connect(self.create_tasks)

    @staticmethod
    def get_tasks():
        """
        Return the tasks to run in BackendQRunnable

        :return: tasks to run
        :rtype: list
        """

        return [
            'notifications', 'livesynthesis', 'alignakdaemon', 'history', 'service', 'host', 'user',
        ]

    def create_tasks(self):
        """
        Create tasks to run

        """

        for cur_task in self.tasks:
            backend_thread = BackendQRunnable(cur_task)

            # Add task to QThreadPool
            self.pool.start(backend_thread)

    def exit_pool(self):
        """
        Exit all BackendQRunnables and delete QThreadPool

        """

        # When trying to quit, the application takes a long time to stop
        self.pool.globalInstance().waitForDone()
        self.pool.deleteLater()

        sys.exit(0)


if __name__ == '__main__':
    app = QApplication(sys.argv)

    thread_manager = ThreadManager()
    thread_manager.start()

    layout = QVBoxLayout()

    label = QLabel("Start")
    button = QPushButton("DANGER!")
    button.pressed.connect(thread_manager.exit_pool)

    layout.addWidget(label)
    layout.addWidget(button)

    w = QWidget()

    w.setLayout(layout)
    w.show()

    sys.exit(app.exec_())

In function exit_pool, I wait until threads are finished and delete the QThreadPool instance...

Is there a way not to wait for each thread and stop everything directly ?

EDIT Solution:

So I have approached the subject differently. I replaced my QRunnable with a QThread. I removed QThreadPool and I manage threads myself in a list. I also added a pyqtSignal in order to stop the QTimer and close the running threads by quit() function.

Like that all my thread quit without problem.

import sys
from PyQt5.Qt import QThread, QApplication, QWidget, QVBoxLayout
from PyQt5.Qt import QTimer, QObject, QPushButton, QLabel, pyqtSignal


class BackendQThread(QThread):
    """
        Class who create a QThread to trigger requests
    """

    quit_thread = pyqtSignal(name='close_thread')

    def __init__(self, task):
        super(BackendQThread, self).__init__()
        self.task = task

    def run(self):
        """
        Run the actions depending on the selected task

        """

        # Here I make long requests
        if 'user' in self.task:
            self.query_user_data()
        elif 'host' in self.task:
            self.query_hosts_data()
        elif 'service' in self.task:
            self.query_services_data()
        elif 'alignakdaemon' in self.task:
            self.query_daemons_data()
        elif 'livesynthesis' in self.task:
            self.query_livesynthesis_data()
        elif 'history' in self.task:
            self.query_history_data()
        elif 'notifications' in self.task:
            self.query_notifications_data()
        else:
            pass

    @staticmethod
    def query_user_data():
        """
        Launch request for "user" endpoint

        """

        print('Query user data')

    @staticmethod
    def query_hosts_data():
        """
        Launch request for "host" endpoint

        """

        print('Query hosts')

    @staticmethod
    def query_services_data():
        """
        Launch request for "service" endpoint

        """

        print("Query services")

    @staticmethod
    def query_daemons_data():
        """
        Launch request for "alignakdaemon" endpoint

        """

        print('Query daemons')

    @staticmethod
    def query_livesynthesis_data():
        """
        Launch request for "livesynthesis" endpoint

        """

        print('query livesynthesis')

    @staticmethod
    def query_history_data():
        """
        Launch request for "history" endpoint but only for hosts in "data_manager"

        """

        print('Query history')

    @staticmethod
    def query_notifications_data():
        """
        Launch request for "history" endpoint but only for notifications of current user

        """

        print('Query notifications')


class ThreadManager(QObject):
    """
        Class who create BackendQThread to periodically request on a Backend
    """

    def __init__(self, parent=None):
        super(ThreadManager, self).__init__(parent)
        self.tasks = self.get_tasks()
        self.timer = QTimer()
        self.threads = []

    def start(self):
        """
        Start ThreadManager

        """

        print("Start backend Manager...")

        # Make a first request
        self.create_tasks()

        # Then request periodically
        self.timer.setInterval(10000)
        self.timer.start()
        self.timer.timeout.connect(self.create_tasks)

    @staticmethod
    def get_tasks():
        """
        Return the available tasks to run

        :return: tasks to run
        :rtype: list
        """

        return [
            'notifications', 'livesynthesis', 'alignakdaemon', 'history', 'service', 'host', 'user',
        ]

    def create_tasks(self):
        """
        Create tasks to run

        """

        # Here I reset the list of threads
        self.threads = []
        for cur_task in self.tasks:
            backend_thread = BackendQThread(cur_task)

            # Add task to QThreadPool
            backend_thread.start()
            self.threads.append(backend_thread)

    def stop(self):
        """
        Stop the manager and close all QThreads

        """

        print("Stop tasks")
        self.timer.stop()
        for task in self.threads:
            task.quit_thread.emit()

        print("Tasks finished")


if __name__ == '__main__':
    app = QApplication(sys.argv)

    layout = QVBoxLayout()
    widget = QWidget()
    widget.setLayout(layout)

    thread_manager = ThreadManager()

    start_btn = QPushButton("Start")
    start_btn.clicked.connect(thread_manager.start)
    layout.addWidget(start_btn)

    stop_btn = QPushButton("Stop")
    stop_btn.clicked.connect(thread_manager.stop)
    layout.addWidget(stop_btn)

    widget.show()

    sys.exit(app.exec_())
Birdwell answered 1/10, 2017 at 9:57 Comment(0)
C
7

You cannot stop a QRunnable once it's started. However, there are a few simple things you can do to reduce the wait time in your example.

Firstly, you can stop the timer, so that it doesn't add any more tasks. Secondly, you can clear the thread-pool so that it removes any pending tasks. Thirdly, you could try setting a smaller maximum thread count to see whether it still achieves acceptable performance. By default, the thread-pool will use QThread.idealThreadCount() to set the maximum number of threads - which usually means one for each processor core on the system.

A final option is to provide a way to interrupt the code that executes in your runnables. This will only really be possible if the code runs a loop which can periodically check a flag to see if it should continue. In your example, it looks like you could use a single shared class attribute for the flag, since all the tasks call static methods. But if the code is not interruptable in this way, there is nothing else you can do - you will just have to wait for the currently running tasks to finish.

Checkered answered 1/10, 2017 at 16:49 Comment(4)
Ok @Checkered I've post my solution to manage Qthread and stop them without problem. Thanks to pointing me that QRunnable can't be stopped.Birdwell
You absolutely can stop a started QRunnable. As this solution itself notes, simply "...periodically check a flag to see if it should continue." This is the canonical solution across all frameworks, languages, and platforms to the problem of gracefully halting a thread. All other solutions invite data loss or corruption. Moreover, this solution does not require iteration – though iteration does admittedly streamline matters. Non-iterative procedural threads may still gracefully halt by periodically checking a flag and returning immediately as indicated by said flag.Arise
@CecilCurry. It is nonsensical to say that a QRunnable object can be stopped - there is, after all, no such public API for doing so. People regularly make this conceptual mistake. They look for ways to forcibly terminate a QThread or QRunnable, when what they really need to do is stop the code that is being run. They don't seem to to realise that the code is blocking interaction with the worker thread in exactly the same way that it would do with the gui. I think Qt is partly to blame for some of this confusion: the QThread class should have been called QThreadManager.Checkered
This worked great for me, I used a combo of these suggestions. In my worker I create a threading.Event() and a slot to set the event, then in my run() method, an infinite loop set to break if the event is_set. This didn't work unitill I added killTimer and clear method calls to my mainwindow's closeEvent.Unifoliate

© 2022 - 2024 — McMap. All rights reserved.