Python concurrent.futures: how to make it cancelable?

Python concurrent.futures and ProcessPoolExecutor provide a neat interface to schedule and monitor tasks. Futures even provide a .cancel() method:

cancel(): Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

Unfortunately, in a similar question (concerning asyncio) the answer claims that running tasks are uncancelable, citing this snippet of the documentation. But the docs don't say that; they say cancellation fails only if the call is running AND cannot be cancelled.

Submitting multiprocessing.Events to the processes is also not trivially possible (passing them as parameters, as one would with multiprocessing.Process, raises a RuntimeError).

What am I trying to do? I would like to partition a search space and run a task for every partition. But ONE solution is enough, and the process is CPU intensive. So is there a convenient way to accomplish this that does not offset the gains from using ProcessPoolExecutor in the first place?

Example:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    for d in done:
        print(d.result())

    print("---")
    for d in not_done:
        # prints False for both Cancel and Result for every remaining future
        print("Cancel: "+str(d.cancel()))
        print("Result: "+str(d.result()))
Quinquereme answered 14/3, 2017 at 10:6 Comment(3)
You can try making the Event a global variable instead of passing it as a parameter, see #1676266Effusive
@Effusive thank you for the tip. I'll probably try this as a workaround, though it doesn't feel well designed with calls to different modules.Quinquereme
Maybe this is all linked to the fact that there is no immediate cancel POSIX API either: #2085330Soapsuds
S
18

Unfortunately, running Futures cannot be cancelled. I believe the core reason is to ensure the same API over different implementations (it's not possible to interrupt running threads or coroutines).
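The difference between a pending and a running call can be seen in a small experiment. The sketch below uses a ThreadPoolExecutor with a single worker only because that makes the timing deterministic; ProcessPoolExecutor hands out the same concurrent.futures.Future type. The helper name demo_cancel is made up for this illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_cancel():
    """Return (running_cancelled, pending_cancelled, running_result)."""
    started = threading.Event()
    release = threading.Event()

    def blocker():
        started.set()   # signal that this call is now executing
        release.wait()  # stay "running" until the demo lets go
        return "done"

    with ThreadPoolExecutor(max_workers=1) as pool:
        running = pool.submit(blocker)
        pending = pool.submit(lambda: "never runs")
        started.wait()  # the first call is definitely executing now
        running_cancelled = running.cancel()  # call already started
        pending_cancelled = pending.cancel()  # call still waiting in the queue
        release.set()   # let the running call finish
    return running_cancelled, pending_cancelled, running.result()


if __name__ == "__main__":
    print(demo_cancel())
```

Only the queued call is cancelled; the call that has already started runs to completion and still delivers its result.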

The Pebble library was designed to overcome this and other limitations.

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # If the task is already running, its worker process is terminated
    # and a new process is started to consume the next task.
    future.cancel()
Scalise answered 8/8, 2017 at 10:6 Comment(14)
I found it handy to know that the pebble futures inherit from the concurrent.futures futures. Therefore, many functions provided by concurrent.futures can also be applied to pebble futures even if pebble does not implement them itself. This applies, for example, to the as_completed function of concurrent.futures. As a result, switching to pebble may be as easy as adding an import and changing the names of ProcessPoolExecutor and pool.submit.Rascality
This may be obvious, but I just wanted to note that if you are using a ProcessPool you're no longer using multiple threads, but rather multiple processes. Many people will not care about the distinction but it's worth at least knowing what you're doing.Lahomalahore
OP uses ProcessPoolExecutor so it's fair to use ProcessPool. pebble also has ThreadPool if you want to replace ThreadPoolExecutor.Trump
@Trump If I read the docs right, threads cannot be interrupted with pebble either. ThreadPool.stop(): "The ongoing jobs will be performed. [...]". pebble.readthedocs.io/en/latest/#pebble.ThreadPool.stopWidmer
@Widmer you don't use that, you use future.cancel(). While it's just a standard concurrent.futures.Future object, from my first hand experience, it works flawlessly (for stopping child threads immediately.) when you're using pebble.Trump
@Trump are you sure this works when threading as opposed to multiprocessing? According to the pebble docs for .cancel() it doesn’t kill a running thread.Widmer
A Python thread can only exit on its own; it cannot be terminated by another thread. Hence, only ProcessPool can stop a running execution. The cancel() method of ThreadPool futures behaves exactly like that of concurrent.futures.ThreadPoolExecutor futures.Scalise
@Widmer @Scalise there is absolutely a difference, it's very easy to try it yourself. I made a quick script: gist.github.com/fireattack/ab00953be7e31e8cfa72d18ca695c1ef try to time python a.py pebble and time python a.py concurrent and see the difference. I honestly don't know how they accomplish it, but it's real.Trump
@Trump This approach only makes it possible for Python to quit while threads are still running but it doesn't cancel threads. Add sleep(10) to the end of your script and print lines to your slow function to see for yourself.Widmer
@Joooeey: could you elaborate? Why does it work with pebble but not built-in concurrent.futures's ThreadPoolExecutor?Trump
What works with a pebble ThreadPool but not with a standard library ThreadPoolExecutor? In any case I don't think I know enough to explain why. I can only observe what does and does not work.Widmer
@Widmer Using pebble I'm able to terminate the Python process before the child threads finish. With the standard library I can't. This is often needed to force-stop the work immediately.Trump
Exactly, when you terminate the Python process, you can terminate running threads when using pebble. But you can't terminate running threads without shutting down the Python process.Widmer
@Widmer the question is why you can't do so with concurrent.futures.Trump
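Since a thread can only exit on its own, the usual workaround for thread pools is cooperative: the task checks a threading.Event between units of work. A minimal sketch, with slow_task and run_and_stop as hypothetical names and time.sleep standing in for real work:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(stop_event, steps=1000, delay=0.01):
    """Work in small steps and give up as soon as stop_event is set."""
    for step in range(steps):
        if stop_event.is_set():
            return f"stopped at step {step}"
        time.sleep(delay)  # placeholder for one unit of real work
    return "finished"


def run_and_stop():
    """Show that cancel() has no effect on a running thread, but the Event does."""
    stop_event = threading.Event()
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_task, stop_event)
        while not future.running():  # wait until the worker picked it up
            time.sleep(0.001)
        cancelled = future.cancel()  # the thread keeps running regardless
        stop_event.set()             # the task itself notices and returns
        return cancelled, future.result()


if __name__ == "__main__":
    print(run_and_stop())
```

This only works for tasks that can be broken into steps; a thread blocked inside a single long call will not see the flag until that call returns.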
V
15

I don't know why concurrent.futures.Future does not have a .kill() method, but you can accomplish what you want by shutting down the process pool with pool.shutdown(wait=False), and killing the remaining child processes by hand.

Create a function for killing child processes:

import signal

import psutil  # third-party package (pip install psutil)

def kill_child_processes(parent_pid, sig=signal.SIGTERM):
    try:
        parent = psutil.Process(parent_pid)
    except psutil.NoSuchProcess:
        return
    children = parent.children(recursive=True)
    for process in children:
        process.send_signal(sig)

Run your code until you get the first result, then kill all remaining child processes:

import os
from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
pool = ProcessPoolExecutor(max_workers=4)
for i in range(4):
    # run 4 tasks with a partition, but only *one* solution is needed
    partition = range(i*steps,(i+1)*steps)
    futures.append(pool.submit(m_run, partition))

done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)

# Shut down pool
pool.shutdown(wait=False)

# Kill remaining child processes
kill_child_processes(os.getpid())
Vulture answered 4/8, 2017 at 20:32 Comment(0)
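If psutil is not available, roughly the same effect may be achievable with the standard library alone. The sketch below assumes Python 3.9+ (for shutdown(cancel_futures=True)) and assumes that the pool's workers are the only child processes started through multiprocessing, since multiprocessing.active_children() returns all of them. terminate_children and first_result are illustrative names. Terminating workers leaves the executor broken, so the pool should be treated as disposable afterwards.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed


def terminate_children():
    """Send SIGTERM to every live child process started via multiprocessing.

    Returns the number of processes that were signalled.
    """
    children = multiprocessing.active_children()
    for child in children:
        child.terminate()
    return len(children)


def m_run(partition, target):
    """Return target if it occurs in partition, otherwise False."""
    for elem in partition:
        if elem == target:
            return elem
    return False


def first_result(partitions, target):
    """Return the first hit, then stop queued and running calls."""
    pool = ProcessPoolExecutor(max_workers=len(partitions))
    futures = [pool.submit(m_run, p, target) for p in partitions]
    result = False
    for future in as_completed(futures):
        result = future.result()
        if result is not False:
            break
    # cancel_futures (Python 3.9+) only drops calls that have not started yet
    pool.shutdown(wait=False, cancel_futures=True)
    # Calls that are already running have to be stopped by killing the workers
    terminate_children()
    return result
```

It would be called from under an if __name__ == "__main__": guard, for example as first_result([range(i * steps, (i + 1) * steps) for i in range(4)], 135135515). As with the psutil version, work in progress is lost without any cleanup in the workers.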
H
1

I found your question interesting so here's my finding.

The .cancel() method behaves as stated in the Python documentation. Your running functions, unfortunately, could not be cancelled even after they were told to stop. If my finding is correct, then Python needs a more effective .cancel() method.

Run the code below to check my finding.

from concurrent.futures import ProcessPoolExecutor, as_completed
from time import time 

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 3351355150:
            return elem  # returning also ends the loop, no break needed
    return False

start = time()
futures = []
# used to create the partitions
steps = 1000000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    ### New Code: Start ### 
    for f in as_completed(futures):
        print(f.result())
        if f.result():
            print('break')
            break

    for f in futures:
        print(f, 'running?',f.running())
        if f.running():
            f.cancel()
            print('Cancelled? ',f.cancelled())

    print('New Instruction Ended at = ', time()-start )
print('Total Compute Time = ', time()-start )

Update: It is possible to forcefully terminate the concurrent processes via bash, but the consequence is that the main Python program will terminate too. If that isn't a problem for you, try the code below.

Add the code below between the last two print statements to see this for yourself. Note: this code works only if you aren't running any other python3 program.

import subprocess, os, signal 
result = subprocess.run(['ps', '-C', 'python3', '-o', 'pid='],
                        stdout=subprocess.PIPE).stdout.decode('utf-8').split()
print ('result =', result)
for i in result:
    print('PID = ', i)
    if i != result[0]:
        os.kill(int(i), signal.SIGKILL)
        try: 
           os.kill(int(i), 0)
           raise Exception("""wasn't able to kill the process
                              HINT: use signal.SIGKILL or signal.SIGABRT""")
        except OSError as ex:
           continue
Hatchel answered 15/3, 2017 at 16:28 Comment(0)
M
0

For one of my programs, I needed to kill all running processes of my executor when calling the .shutdown() method. As this was not possible with the standard ProcessPoolExecutor, I implemented it myself. I also fixed the wrong running state reported by future objects:

import sys
import functools
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures._base import RUNNING


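def _callable_wrapper(is_running, fn, *args, **kwargs):
    is_running.value = True
    try:
        # Pass fn's return value through so future.result() can see it
        return fn(*args, **kwargs)
    finally:
        # Reset the flag even if fn raises
        is_running.value = False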


def _future_running_override(future, is_running):
    return future._state == RUNNING and is_running.value


class StoppableProcessPoolExecutor(ProcessPoolExecutor):
    """A concurrent.futures.ProcessPoolExecutor that kills running processes on
    shutdown.
    This also fixes the wrong running state of futures. See
    https://bugs.python.org/issue37276
    """

    def __init__(self, *args, **kwargs):
        self._state_manager = multiprocessing.Manager()
        ProcessPoolExecutor.__init__(self, *args, **kwargs)

    def shutdown(self, *args, **kwargs):
        processes = self._processes

        # Python < 3.9: we must wait, otherwise we get an OSError:
        # https://bugs.python.org/issue36281
        if sys.version_info < (3, 9):
            kwargs["wait"] = True

        for pid, process in processes.items():
            process.kill()
        ProcessPoolExecutor.shutdown(self, *args, **kwargs)
        self._state_manager.shutdown()

    shutdown.__doc__ = ProcessPoolExecutor.shutdown.__doc__

    def submit(self, fn, *args, **kwargs):
        is_running = self._state_manager.Value(bool, False)
        future = ProcessPoolExecutor.submit(
            self,
            functools.partial(_callable_wrapper, is_running, fn),
            *args,
            **kwargs,
        )
        # Monkey patch future.running to return the real running state
        future.running = functools.partial(_future_running_override, future, is_running)
        return future

    submit.__doc__ = ProcessPoolExecutor.submit.__doc__

Original source: https://github.com/flozz/yoga-image-optimizer/blob/master/yoga_image_optimizer/stoppable_process_pool_executor.py

If it can help someone... :)

Monaural answered 28/2, 2023 at 13:51 Comment(0)
