Keyboard Interrupts with Python's multiprocessing Pool

How can I handle KeyboardInterrupt events with Python's multiprocessing Pools? Here is a simple example:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

When running the code above, the KeyboardInterrupt gets raised when I press ^C, but the process simply hangs at that point and I have to kill it externally.

I want to be able to press ^C at any time and cause all of the processes to exit gracefully.

Strade answered 10/9, 2009 at 23:59 Comment(1)
I solved my problem using psutil; you can see the solution here: #32160554 – Unyoke

This is a Python bug. When waiting for a condition in threading.Condition.wait(), KeyboardInterrupt is never sent. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

The KeyboardInterrupt exception won't be delivered until wait() returns, and it never returns, so the interrupt never happens. KeyboardInterrupt should almost certainly interrupt a condition wait.

Note that this doesn't happen if a timeout is specified; cond.wait(1) will receive the interrupt immediately. So, a workaround is to specify a timeout. To do that, replace

    results = pool.map(slowly_square, range(40))

with

    results = pool.map_async(slowly_square, range(40)).get(9999999)

or similar.
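
Applied to the question's example, the whole workaround looks like this (a minimal sketch, keeping the question's Python 2 syntax; the huge timeout is arbitrary and just needs to outlast the job):

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        # .get(timeout) keeps the main process interruptible,
        # unlike a bare pool.map()
        results = pool.map_async(slowly_square, range(40)).get(9999999)
    except KeyboardInterrupt:
        pool.terminate()
        print "You cancelled the program!"
        exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()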

Montes answered 11/9, 2009 at 0:45 Comment(11)
Is this bug in the official Python tracker anywhere? I'm having trouble finding it, but I'm probably just not using the best search terms. – Eulalia
This bug has been filed as Issue 8296: bugs.python.org/issue8296 – Shul
Here's a hack which fixes pool.imap() in the same manner, making Ctrl-C possible when iterating over imap. Catch the exception and call pool.terminate() and your program will exit. gist.github.com/626518 – Rockefeller
This doesn't quite fix things. Sometimes I get the expected behavior when I press Control+C, other times not. I'm not sure why, but it looks like maybe the KeyboardInterrupt is received by one of the processes at random, and I only get the correct behavior if the parent process is the one that catches it. – Lashondalashonde
The trick with .get(999999) slows everything down somehow. See below for the link to bryceboe.com with a solution that works. – Cordwain
I've tried to use this workaround, and the keyboard interrupt does give me back control of the REPL. But the other spawned processes in the background are not properly terminated; they seem to randomly re-run somehow. – Uncap
This doesn't work for me with Python 3.6.1 on Windows. I get tons of stack traces and other garbage when I do Ctrl-C, i.e. same as without such a workaround. In fact, none of the solutions I've tried from this thread seem to work... – Vacuole
Jehejj, it's still not fixed in 2019. Like doing IO in parallel is a novel idea :/ – Salpiglossis
It's tracked here now: bugs.python.org/issue22393 – Thursday
@AkosLukacs did you ever come across a solution that did work? I agree so far with the solutions I've tried :-/ – Leukoderma
@Leukoderma it was a long time ago when I had to do Python. But looking at the issues - now migrated to GitHub - it might not be fixed. – Salpiglossis

From what I have recently found, the best solution is to set up the worker processes to ignore SIGINT altogether, and confine all the cleanup code to the parent process. This fixes the problem for both idle and busy worker processes, and requires no error handling code in your child processes.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main():
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Explanation and full example code can be found at http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ and http://github.com/jreese/multiprocessing-keyboardinterrupt respectively.
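
Filled out into a runnable sketch (my own completion of the outline above, not the blog's exact code; Python 3, and it borrows map_async().get() from Glenn's answer so the parent never blocks uninterruptibly):

import multiprocessing
import signal
import time

def init_worker():
    # Each worker ignores SIGINT; only the parent reacts to Ctrl-C.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def work(i):
    time.sleep(1)
    return i * i

def main():
    pool = multiprocessing.Pool(4, init_worker)
    try:
        results = pool.map_async(work, range(40)).get(9999999)
        print(results)
    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

if __name__ == "__main__":
    main()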

Relinquish answered 31/5, 2011 at 18:39 Comment(8)
Hi John. Your solution doesn't accomplish the same thing as my, yes unfortunately complicated, solution. It hides behind the time.sleep(10) in the main process. If you were to remove that sleep, or if you wait until the process attempts to join on the pool, which you have to do in order to guarantee the jobs are complete, then you still suffer from the same problem: the main process doesn't receive the KeyboardInterrupt while it is waiting on the pool join operation. – Squires
In the case where I used this code in production, the time.sleep() was part of a loop that would check the status of each child process, and then restart certain processes on a delay if necessary. Rather than a join() that would wait on all processes to complete, it would check on them individually, ensuring that the master process stayed responsive. – Relinquish
So it was more a busy wait (maybe with small sleeps between checks) that polled for process completion via another method rather than join? If that's the case, perhaps it would be better to include this code in your blog post, since you can then guarantee that all the workers have completed before attempting to join. – Squires
This doesn't work. Only the children are sent the signal. The parent never receives it, so pool.terminate() never gets executed. Having the children ignore the signal accomplishes nothing. @Glenn's answer solves the problem. – Stalnaker
My version of this is at gist.github.com/admackin/003dd646e5fadee8b8d6 ; it doesn't call .join() except on interrupt - it simply manually checks the result of .apply_async() using AsyncResult.ready() to see if it is ready, meaning we've cleanly finished. – Investigation
@Stalnaker I was trying to confirm that this solution breaks down somewhere and I found win.tue.nl/~aeb/linux/lk/lk-10.html#ss10.2. I believe that if the signal is sent to the process group then the signal will be sent to the leader as well as the children. If so, then ignoring the signal in all but the leader would make for a pretty nice solution. – Barrack
It works for me; you just have to make sure to put the signal-ignoring code only in the children's initialization... – Boggess
Hi from 2020 ... this works nicely for imap_unordered as well. – Ify

For some reason, only exceptions that inherit from the base Exception class are handled normally; KeyboardInterrupt inherits from BaseException instead (since Python 2.5), so the Pool machinery does not propagate it properly. As a workaround, you may re-raise your KeyboardInterrupt as an Exception instance:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normally you would get the following output:

starting the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

So if you hit ^C, you will get:

starting the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
Shul answered 1/4, 2010 at 16:6 Comment(2)
It seems that this is not a complete solution. If a KeyboardInterrupt arrives while multiprocessing is performing its own IPC data exchange, then the try..except will not be activated (obviously). – Shul
You could replace raise KeyboardInterruptError() with a return. You just have to make sure that the child process ends as soon as KeyboardInterrupt is received. The return value seems to be ignored; in main the KeyboardInterrupt is still received. – Arthur

The top-voted answer does not tackle the core issue but a similar side effect.

Jesse Noller, the author of the multiprocessing library, explains how to correctly deal with CTRL+C when using multiprocessing.Pool in an old blog post.

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, downloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
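
Since Python 3.7, concurrent.futures.ProcessPoolExecutor accepts an initializer as well, so the same trick ports over. A sketch (perform_download and downloads are the same placeholders as above; cancel_futures requires Python 3.9+):

import signal
from concurrent.futures import ProcessPoolExecutor

def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)

executor = ProcessPoolExecutor(initializer=initializer)
try:
    results = list(executor.map(perform_download, downloads))
except KeyboardInterrupt:
    # No terminate() equivalent here; cancel whatever has not started yet.
    executor.shutdown(wait=False, cancel_futures=True)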
Eonian answered 2/7, 2017 at 9:43 Comment(4)
I've found that ProcessPoolExecutor also has the same issue. The only fix I was able to find was to call os.setpgrp() from inside the future. – Castera
Sure, the only difference is that ProcessPoolExecutor does not support initializer functions. On Unix, you could leverage the fork strategy by disabling the sighandler on the main process before creating the Pool and re-enabling it afterwards. In pebble, I silence SIGINT on the child processes by default. I am not aware of the reason they don't do the same with the Python Pools. At the end, the user could re-set the SIGINT handler in case he/she wants to hurt himself/herself. – Eonian
This solution seems to prevent Ctrl-C from interrupting the main process as well. – Exceedingly
I just tested on Python 3.5 and it works; what version of Python are you using? What OS? – Eonian

Many of these answers are old and/or they do not seem to work with later versions of Python (I am running 3.8.5) on Windows if you are executing a method such as Pool.map, which blocks until all the submitted tasks have completed. The following is my solution.

  1. Issue a call to signal.signal(signal.SIGINT, signal.SIG_IGN) in the main process to ignore Ctrl-C altogether.
  2. The processing pool will be initialized with a pool initializer that initializes each pool process thusly: global variable ctrl_c_entered will be set to False, and a call to signal.signal(signal.SIGINT, pool_ctrl_c_handler) will be issued to install a handler that merely records that Ctrl-C was entered. The return value from this call will be saved; this is the original, default handler that, when re-established, allows handling of KeyboardInterrupt exceptions.
  3. A decorator, handle_ctrl_c, can be used to decorate multiprocessing functions and methods that should exit immediately on Ctrl-C being entered. This decorator will test whether the global ctrl_c_entered flag is set and, if so, not even bother to run the function/method and instead will return a KeyboardInterrupt exception instance. Otherwise a try/except handler for KeyboardInterrupt will be established and the decorated function/method will be invoked. If Ctrl-C is entered, global ctrl_c_entered will be set to True and a KeyboardInterrupt exception instance will be returned. In any event, before returning, the decorator will re-establish the pool_ctrl_c_handler.

In essence all submitted tasks will be allowed to start but will immediately terminate with a return value of a KeyboardInterrupt exception instance once Ctrl-C has been entered. The main process can test the returned values for the presence of such instances to detect whether Ctrl-C was entered.

from multiprocessing import Pool
import signal
from time import sleep
from functools import wraps

def handle_ctrl_c(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if not ctrl_c_entered:
            signal.signal(signal.SIGINT, default_sigint_handler) # the default
            try:
                return func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                return KeyboardInterrupt()
            finally:
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        else:
            return KeyboardInterrupt()
    return wrapper

@handle_ctrl_c
def slowly_square(i):
    sleep(1)
    return i*i

def pool_ctrl_c_handler(*args, **kwargs):
    global ctrl_c_entered
    ctrl_c_entered = True

def init_pool():
    # set global variable for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)

def main():
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = Pool(initializer=init_pool)
    results = pool.map(slowly_square, range(10))
    if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
        print('Ctrl-C was entered.')
    print(results)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Prints:

Ctrl-C was entered.
[0, 1, 4, 9, 16, 25, 36, 49, KeyboardInterrupt(), KeyboardInterrupt()]
Remission answered 7/8, 2021 at 19:14 Comment(3)
Confirmed this works as expected on Python 3.7.7 on Windows. Thanks for posting! – Beading
As shown above, this returns KeyboardInterrupt() for all unprocessed members of the list, which will evaluate to True and print nothing. I found it helpful to return None instead. – Attainable
@JonathanRys And what if the worker function being used (which is slowly_square in my example) happens to return None? Then it is more useful to be able to distinguish the worker function completing normally vs. not being run at all due to a previous keyboard interrupt, as is done in function main. It doesn't really matter what is returned by handle_ctrl_c when there has been a keyboard interrupt, as long as it can be distinguished from any possible value a worker function might return. Perhaps this should be an instance of a specially defined empty class, e.g. UnProcessed. – Remission
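
Following up on that last comment, a sketch of the sentinel idea (UnProcessed is the hypothetical name suggested above; isinstance is used because the instance is pickled across the process boundary, so an identity check would fail):

class UnProcessed:
    # Returned instead of a KeyboardInterrupt instance, so the sentinel
    # cannot collide with any value a worker might legitimately return.
    pass

# In handle_ctrl_c, replace both occurrences of
#     return KeyboardInterrupt()
# with
#     return UnProcessed()
# and in main(), detect the interrupt with:
#     if any(isinstance(r, UnProcessed) for r in results):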

Usually this simple structure works for Ctrl-C on Pool:

import signal

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

As was stated in a few similar posts:

Capture keyboardinterrupt in Python without try-except

Brambly answered 31/10, 2012 at 13:44 Comment(1)
This would have to be done on each of the worker processes as well, and may still fail if the KeyboardInterrupt is raised while the multiprocessing library is initializing. – Stumpf

It seems there are two issues that make exceptions annoying while multiprocessing. The first (noted by Glenn) is that you need to use map_async with a timeout instead of map in order to get an immediate response (i.e., don't finish processing the entire list). The second (noted by Andrey) is that multiprocessing doesn't catch exceptions that don't inherit from Exception (e.g., SystemExit). So here's my solution that deals with both of these:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    https://mcmap.net/q/144019/-keyboard-interrupts-with-python-39-s-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
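
A hypothetical usage sketch, with slow_square standing in for a real task function:

from time import sleep

def slow_square(x):
    sleep(1)
    return x * x

if __name__ == "__main__":
    # Runs slow_square over 8 inputs on 4 processes via myMap above.
    print(myMap(slow_square, range(8), numProcesses=4))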
Exceedingly answered 15/5, 2014 at 15:23 Comment(2)
I've not noticed any performance penalty, but in my case the function is fairly long-lived (hundreds of seconds). – Exceedingly
This actually isn't the case anymore, at least in my experience. If you catch the keyboard exception in the individual child processes and catch it once more in the main process, then you can continue using map and all is good. @Linux Cli Aik provided a solution below that produces this behavior. Using map_async is not always desired if the main thread depends on the results from the child processes. – Impotent

I'm a newbie in Python. I was looking everywhere for an answer and stumbled upon this and a few other blogs and YouTube videos. I tried to copy and paste the author's code above and reproduce it with Python 2.7.13 on Windows 7 64-bit. It's close to what I want to achieve.

I made my child processes ignore Ctrl-C and made the parent process terminate. Bypassing the child processes seems to avoid the problem for me.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.join()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

The part starting at pool.terminate() never seems to execute.

Incontestable answered 15/5, 2017 at 9:2 Comment(1)
I just figured this out as well! I honestly think this is the best solution for a problem like this. The accepted solution forces map_async onto the user, which I don't particularly like. In many situations, like mine, the main thread needs to wait for the individual processes to finish. This is one of the reasons why map exists! – Impotent

I found that, for the time being, the best solution is not to use the multiprocessing.pool feature but rather to roll your own pool functionality. I provided an example demonstrating the error with apply_async as well as an example showing how to avoid using the pool functionality altogether.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/
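
For reference, a minimal sketch of the general shape of such a hand-rolled pool (my own reconstruction, not the post's exact code): workers pull tasks from a queue, and the parent can terminate them on Ctrl-C.

from multiprocessing import Process, Queue

def worker(task_queue, result_queue):
    # Pull tasks until the None sentinel arrives.
    for item in iter(task_queue.get, None):
        result_queue.put(item * item)

def main():
    task_queue = Queue()
    result_queue = Queue()
    workers = [Process(target=worker, args=(task_queue, result_queue))
               for _ in range(4)]
    for w in workers:
        w.start()
    for i in range(40):
        task_queue.put(i)
    for _ in workers:
        task_queue.put(None)  # one stop sentinel per worker
    try:
        print(sorted(result_queue.get() for _ in range(40)))
    except KeyboardInterrupt:
        for w in workers:
            w.terminate()
        raise
    finally:
        for w in workers:
            w.join()

if __name__ == '__main__':
    main()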

Squires answered 26/8, 2010 at 17:16 Comment(2)
Works like a charm. It's a clean solution and not some kind of hack (/me thinks). BTW, the trick with .get(99999) as proposed by others hurts performance badly. – Cordwain
I've not noticed any performance penalty from using a timeout, though I have been using 9999 instead of 999999. The exception is when an exception that doesn't inherit from the Exception class is raised: then you have to wait until the timeout is hit. The solution to that is to catch all exceptions (see my solution). – Exceedingly

You can try using the apply_async method of a Pool object, like this:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print len(results1)
    print "Multiprocessing run time: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print len(results2)
    print "Non-multiprocessing run time: {}".format(t3 - t2)

Output:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

An advantage of this method is that results processed before interruption will be returned in the results dictionary:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
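
Note that the second argument to apply_async is a sequence of positional arguments, which is why [value] is passed rather than a bare value; a function taking several arguments would receive a tuple instead. A small sketch (add is a hypothetical two-argument task):

import multiprocessing

def add(x, y):
    return x + y

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    job = pool.apply_async(add, (2, 3))  # args tuple becomes add(2, 3)
    print(job.get())                     # 5
    pool.close()
    pool.join()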
Leopardi answered 14/8, 2018 at 5:30 Comment(4)
Glorious and complete example. – Strophanthus
Excellent example. – Braga
Thank you. I'm trying to figure out how this generalizes to multiple arguments. In particular, why do you pass [value] rather than value in jobs[value] = pool.apply_async(input_function, [value])? – Chantalchantalle
Would it be possible to have interrupted processes return an intermediate result instead? – Nevins

Strangely enough it looks like you have to handle the KeyboardInterrupt in the children as well. I would have expected this to work as written... try changing slowly_square to:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

That should work as you expected.

Rideout answered 11/9, 2009 at 0:26 Comment(2)
I tried this, and it doesn't actually terminate the entire set of jobs. It terminates the currently-running jobs, but the script still assigns the remaining jobs in the pool.map call as if everything is normal. – Strade
This is OK, but you may lose track of errors that occur. Returning the error with a stack trace might work so the parent process can tell that an error occurred, but it still doesn't exit immediately when the error occurs. – Earth
