Is there any graceful way to interrupt a python concurrent future result() call?
Asked Answered
O

2

4

The only mechanism I can find for handling a keyboard interrupt is to poll. Without the while loop below, the signal processing never happens and the process hangs forever.

Is there any graceful mechanism for allowing a keyboard interrupt to function when given a concurrent future object?

Putting polling loops all over my code base seems to defeat the purpose of using futures at all.

More info:

  • Waiting on the future in the main thread in Windows blocks all signal handling, even if it's fully cancellable and even if it has not "started" yet. The word "exiting" doesn't even print. So 'cancellability' is only part (the easy part) of the issue.
  • In my real code, I obtain futures via executors (run coro threadsafe, in this case), this was just a simplified example
import concurrent.futures
import signal
import time
import sys


fut = concurrent.futures.Future()


def handler(signum, frame):
    print("exiting")
    fut.cancel()
    signal.signal(signal.SIGINT, orig)
    sys.exit()

orig = signal.signal(signal.SIGINT, handler)

# a time sleep is fully interruptible with a signal... but a future isnt
# time.sleep(100)

while True:
    try:
        fut.result(.03)
    except concurrent.futures.TimeoutError:
        pass
Obsequious answered 14/6, 2022 at 18:21 Comment(0)
O
1

OK, I wrote a solution to this based on digging in cypython source and some bug reports - but it's not pretty.

If you want to be able to interrupt a future, especially on Windows, the following seems to work:

@contextlib.contextmanager
def interrupt_futures(futures):  # pragma: no cover
    """Allows a list of futures to be interrupted.

    If an interrupt happens, they will all have their exceptions set to KeyboardInterrupt
    """

    # this has to be manually tested for now, because the tests interfere with the test runner

    def do_interr(*_):
        for ent in futures:
            try:
                ent.set_exception(KeyboardInterrupt)
            except:
                # if the future is already resolved or cancelled, ignore it
                pass
        return 1

    if sys.platform == "win32":
        from ctypes import wintypes  # pylint: disable=import-outside-toplevel

        kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

        CTRL_C_EVENT = 0
        CTRL_BREAK_EVENT = 1

        HANDLER_ROUTINE = ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.DWORD)

        @HANDLER_ROUTINE
        def handler(ctrl):
            if ctrl == CTRL_C_EVENT:
                handled = do_interr()
            elif ctrl == CTRL_BREAK_EVENT:
                handled = do_interr()
            else:
                handled = False
            # If not handled, call the next handler.
            return handled

        if not kernel32.SetConsoleCtrlHandler(handler, True):
            raise ctypes.WinError(ctypes.get_last_error())

        was = signal.signal(signal.SIGINT, do_interr)

        yield

        signal.signal(signal.SIGINT, was)

        # restore default handler
        kernel32.SetConsoleCtrlHandler(handler, False)
    else:
        was = signal.signal(signal.SIGINT, do_interr)
        yield
        signal.signal(signal.SIGINT, was)

This allows you to do this:

with interrupt_futures([fut]):
    fut.result()

For the duration of that call, interrupt signals will be intercepted and will result in the future raising a KeyboardInterrupt to the caller requesting the result - instead of simply ignoring all interrupts.

Obsequious answered 11/7, 2022 at 18:17 Comment(0)
V
0

Defeating the purpose or not, it is how futures currently work in Python.

First of all, directly instantiating a Future() should only be done for testing purposes, normally you would obtain an instance by submitting work to an executor.

Furthermore, you cannot really cancel() a future cleanly that is executing in a thread; attempting to do so will make cancel() return False.

Indeed, in the following test I get could cancel: False printed out:

import concurrent.futures
import signal
import time
import sys


def task(delay):
    time.sleep(delay)
    return delay


def handler(signum, frame):
    print("exiting")
    print("could cancel:", fut.cancel())
    raise RuntimeError("if in doubt, use brute force")


signal.signal(signal.SIGINT, handler)


with concurrent.futures.ThreadPoolExecutor() as executor:
    fut = executor.submit(task, 240)

    try:
        print(fut.result())
    except Exception as ex:
        print(f"fut.result() ==> {type(ex).__name__}: {ex}")

If I also raise an exception in my signal handler, that exception is caught when trying to fetch the result, and I'm also seeing fut.result() ==> RuntimeError: if in doubt, use brute force printed out. However, that does not exit the executor loop immediately either, because the task is still running there.

Interestingly, clicking Ctrl-C a couple more times would eventually break even the cleanup loop, and the program would exit, but it's probably not what you're after. You might also be able to kill off futures more freely by employing a ProcessPoolExecutor, but .cancel() would still return False for running futures.

In that light, I think your approach to poll result() is not an unreasonable one. If possible, you could also move your program to asyncio where you would be able to cancel tasks at the yield points or I/O, or somehow make your task itself react to user input by exiting earlier, potentially based on information from a signal.

For instance, here I'm setting a global variable from my interrupt handler, which is then polled from my task:

import concurrent.futures
import signal
import time
import sys

interrupted = False


def task(delay):
    slept = 0

    for _ in range(int(delay)):
        time.sleep(1)
        slept += 1
        if interrupted:
            print("interrupted, wrapping up work prematurely")
            break

    return slept


def handler(signum, frame):
    global interrupted

    print("exiting")
    print("could cancel:", fut.cancel())

    interrupted = True


signal.signal(signal.SIGINT, handler)


with concurrent.futures.ThreadPoolExecutor() as executor:
    fut = executor.submit(task, 40)

    try:
        print(fut.result())
    except Exception as ex:
        print(f"fut.result() ==> {type(ex).__name__}: {ex}")

Now I'm able to interrupt my work in a more fine grained fashion:

^Cexiting
could cancel: False
interrupted, wrapping up work prematurely
5

In addition, you might also be able to split your work into many smaller tasks, then you could cancel any futures that aren't running yet, also improving responsiveness to SIGINT or other types of user input.

Vidal answered 14/6, 2022 at 20:45 Comment(1)
note: 1. control+c a zillion times doesn't do anything, at least on windows and 2. in real life i got the future from an executor, asyncio coro threadsafe, to be exact. 3. the problem is that waiting on the future blocks control-c from working... time sleep works fine.Obsequious

© 2022 - 2024 — McMap. All rights reserved.