The workers in ThreadPoolExecutor are not really daemon threads

What I cannot figure out is this: although ThreadPoolExecutor uses daemon workers, they keep running even after the main thread exits.

Here is a minimal example in Python 3.6.4:

import concurrent.futures
import time


def fn():
    while True:
        time.sleep(5)
        print("Hello")


thread_pool = concurrent.futures.ThreadPoolExecutor()
thread_pool.submit(fn)
while True:
    time.sleep(1)
    print("Wow")

Both the main thread and the worker thread run infinite loops. So if I use KeyboardInterrupt (Ctrl+C) to terminate the main thread, I expect the whole program to terminate too. But the worker thread actually keeps running, even though it is a daemon thread.

The source code of ThreadPoolExecutor confirms that the worker threads are daemon threads:

t = threading.Thread(target=_worker,
                     args=(weakref.ref(self, weakref_cb),
                           self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)

Further, if I manually create a daemon thread, it works like a charm:

from threading import Thread
import time


def fn():
    while True:
        time.sleep(5)
        print("Hello")


thread = Thread(target=fn)
thread.daemon = True
thread.start()
while True:
    time.sleep(1)
    print("Wow")

So I really cannot figure out this strange behavior.
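To reproduce the difference without pressing Ctrl+C, one option is to run each variant in a child interpreter and check whether it exits before a timeout. This is a sketch: `exits_within`, `POOL_SCRIPT` and `DAEMON_SCRIPT` are illustrative names, and the timeouts are arbitrary. In the child scripts the main thread simply returns instead of looping, which should go through the same interpreter-exit path as an unhandled KeyboardInterrupt.

```python
import subprocess
import sys

# Child program 1: a pool worker runs an endless task; the main thread
# returns right after submit().
POOL_SCRIPT = """
import concurrent.futures, time
def fn():
    while True:
        time.sleep(0.1)
pool = concurrent.futures.ThreadPoolExecutor()
pool.submit(fn)
"""

# Child program 2: the same endless task on a hand-made daemon thread.
DAEMON_SCRIPT = """
import threading, time
def fn():
    while True:
        time.sleep(0.1)
t = threading.Thread(target=fn, daemon=True)
t.start()
"""


def exits_within(script, seconds):
    """Return True if a child interpreter running `script` exits in time."""
    try:
        subprocess.run([sys.executable, "-c", script],
                       timeout=seconds,
                       stdout=subprocess.DEVNULL,
                       stderr=subprocess.DEVNULL)
    except subprocess.TimeoutExpired:
        # run() kills the still-running child before re-raising.
        return False
    return True


if __name__ == "__main__":
    print("daemon thread exits:", exits_within(DAEMON_SCRIPT, 10))
    print("pool worker exits:", exits_within(POOL_SCRIPT, 2))
```

The daemon-thread child should exit almost immediately, while the pool child should still be running when the timeout fires and get killed.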

Contemptuous answered 24/4, 2018 at 1:54
Reviving this because my test suites hung in CI/CD because of this (long-running I/O such as HTTP server sockets). It pushed me to reimplement the pool without the atexit handler: gist.github.com/BinarSkugga/edc52b9f3fad44f0a4de8739125e3d3f – Lachman
Indeed, the documentation doesn't say that the threads of a ThreadPoolExecutor will terminate along with the main thread, unless they are idle and the shutdown method is called with the right argument values. – Floatation

Suddenly... I found out why. Looking further into the ThreadPoolExecutor source code:

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()

atexit.register(_python_exit)

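There is an exit handler which joins all unfinished workers. It puts a None sentinel on each work queue and then joins every worker thread, but a worker only reaches the sentinel after its current work item returns. Since fn() never returns, the join never completes and the interpreter cannot exit, even though the worker is a daemon thread.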
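A simplified model of the protocol in the quoted source may make the ordering clearer. This is not CPython's code: `worker`, `exit_handler` and `slow_task` are illustrative names, and a plain dict stands in for the `WeakKeyDictionary`. It shows that the sentinel is only seen after the running item returns, which is why a never-returning task blocks `join()` forever.

```python
import queue
import threading
import time

log = []


def worker(work_queue):
    # Pull callables from the queue; None is the "please exit" sentinel.
    while True:
        item = work_queue.get()
        if item is None:
            log.append("worker saw sentinel")
            return
        item()  # must return before the sentinel can be read


def exit_handler(threads_queues):
    # Same shape as _python_exit: signal every queue, then join every thread.
    items = list(threads_queues.items())
    for _, q in items:
        q.put(None)
    for t, _ in items:
        t.join()  # would block forever if item() never returned


def slow_task():
    time.sleep(0.2)
    log.append("task finished")


q = queue.Queue()
t = threading.Thread(target=worker, args=(q,), daemon=True)
t.start()
q.put(slow_task)
exit_handler({t: q})
print(log)  # ['task finished', 'worker saw sentinel']
```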

Contemptuous answered 24/4, 2018 at 2:10
Then, what is the benefit of daemon=True if it doesn't fulfil its main purpose? – Obsolete
Indeed, a ThreadPoolExecutor doesn't come with that guarantee, and even the cancel_futures argument of its shutdown method does not say that the pool will be forcibly shut down when shutdown is called.
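A sketch of what cancel_futures does in practice, assuming Python 3.9+ (where the parameter was added): it cancels work items still waiting in the queue, but does not interrupt one that is already running. `blocking_task`, `started` and `release` are illustrative names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocking_task():
    started.set()
    release.wait()  # stays "running" until the main thread releases it
    return "done"


executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(blocking_task)
started.wait()                            # the only worker is now busy
pending = executor.submit(blocking_task)  # queued behind it, not started

# Drains and cancels queued futures; the running one is left alone.
executor.shutdown(wait=False, cancel_futures=True)
print(pending.cancelled())  # True
print(running.running())    # True
release.set()
print(running.result())     # done
executor.shutdown(wait=True)  # now the worker can be joined
```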

Here's a way to avoid this problem. Bad design can be beaten by another bad design. People write daemon=True only if they really know that the worker won't damage any objects or files.

In my case, I created a ThreadPoolExecutor with a single worker, and after a single submit() I deleted the newly created thread from concurrent.futures.thread._threads_queues, so the interpreter won't wait for this thread to stop on its own. Note that worker threads are created by submit(), not when the ThreadPoolExecutor is initialized.

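import concurrent.futures.thread
from concurrent.futures import ThreadPoolExecutor

...

executor = ThreadPoolExecutor(max_workers=1)
# submit() creates the worker lazily, so executor._threads is non-empty only now
future = executor.submit(lambda: self._exec_file(args))
# Drop the worker from the private module-level registry that the exit
# handler joins, so the exit handler won't wait for this thread
del concurrent.futures.thread._threads_queues[list(executor._threads)[0]]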

It works in Python 3.8 but may not work in 3.9+, since this code accesses private variables.

See the working piece of code on GitHub.
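The remark that worker threads are created by submit() rather than at construction can be checked with a short sketch. Like the workaround, it peeks at the private `_threads` attribute, so treat it as an implementation detail that may change between Python versions.

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
# _threads is private; inspected here only to see when the worker appears.
threads_before = len(executor._threads)
future = executor.submit(sum, [1, 2, 3])
threads_after = len(executor._threads)
print(threads_before, threads_after)  # 0 1
print(future.result())                # 6
executor.shutdown()
```

This is why the workaround has to run after submit(): before that call there is no worker thread to remove.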

Receptionist answered 4/7, 2020 at 15:51
Thanks so much for posting the workaround! I'm using concurrent.futures.thread._threads_queues.clear() after every submit() now. – Dilan

I've seen this behavior with long-running threads created under a ThreadPoolExecutor. In one test, it completely blocked the Python process from exiting while the atexit handler waited for a thread to return from time.sleep(float("inf")). Otherwise, this blocking thread could consistently be discarded at exit.

While searching for other approaches that might avoid this blocking-in-atexit behavior, I looked at green threads as one alternative, for instance futurist with the eventlet backend enabled. Of course, for concurrent load balancing there's also the process pool model, assuming a data model compatible with communicating with the process pool's workers.

Here is another possible approach, albeit not substantially different from those above.

This defines a subclass of ThreadPoolExecutor that removes each of the pool's threads from concurrent.futures.thread._threads_queues during executor.shutdown():

import concurrent.futures as cofutures
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import weakref

class Executor(cofutures.ThreadPoolExecutor):
    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
        rs = super().shutdown(wait, cancel_futures=cancel_futures)
        if hasattr(cofutures.thread, "_threads_queues"):
            # prevent blocking for thread.join during atexit handlers
            queues: weakref.WeakKeyDictionary = cofutures.thread._threads_queues
            for thr in self._threads:
                queues.pop(thr, None)
        return rs

The difference is that this version applies the additional cleanup in the executor.shutdown() method.

Of course, this aspect of the ThreadPoolExecutor design is noted in the documentation:

Note that the exit handler which does this is executed before any exit handlers added using atexit. This means exceptions in the main thread must be caught and handled in order to signal threads to exit gracefully. For this reason, it is recommended that ThreadPoolExecutor not be used for long-running tasks.
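The documentation's advice (catch exceptions in the main thread and signal the workers to exit) can be sketched with a `threading.Event` as the signal. This is one possible technique, not something the documentation prescribes; `long_running` and `main` are hypothetical names, and `time.sleep(run_seconds)` stands in for the main thread's real work.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def long_running(stop, ticks):
    # Hypothetical long-running task: loops until the stop event is set,
    # instead of looping forever like fn() in the question.
    while not stop.is_set():
        ticks.append(time.monotonic())
        stop.wait(0.05)  # sleeps, but wakes early as soon as stop is set
    return len(ticks)


def main(run_seconds=0.3):
    stop = threading.Event()
    ticks = []
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(long_running, stop, ticks)
        try:
            time.sleep(run_seconds)  # stand-in for the main thread's work
        finally:
            # Runs on normal completion and on KeyboardInterrupt alike, so
            # the worker returns and the implicit shutdown(wait=True) joins it.
            stop.set()
    return future


if __name__ == "__main__":
    fut = main()
    print("worker finished:", fut.done(), "iterations:", fut.result())
```

The key design choice is that the stop signal is set in `finally` before the `with` block exits, so the join performed by `shutdown()` (and by the exit handler) always has a task that can return.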

Perhaps the implementation could still be used for long-running tasks, though? If atexit does not block waiting to rejoin the executor's worker threads, those long-running tasks would not prevent the process from exiting. Design goals may vary.

Mitre answered 14/12, 2023 at 11:52
