How to terminate Python's `ProcessPoolExecutor` when parent process dies?

Is there a way to make the processes in concurrent.futures.ProcessPoolExecutor terminate if the parent process terminates for any reason?

Some details: I'm using ProcessPoolExecutor in a job that processes a lot of data. Sometimes I need to terminate the parent process with a kill command, but when I do that the processes from ProcessPoolExecutor keep running and I have to manually kill them too. My primary work loop looks like this:

import concurrent.futures

with concurrent.futures.ProcessPoolExecutor(n_workers) as executor:
    result_list = [executor.submit(_do_work, data) for data in data_list]
    for id, future in enumerate(
            concurrent.futures.as_completed(result_list)):
        print(f'{id}: {future.result()}')

Is there anything I can add here or do differently to make the child processes in executor terminate if the parent dies?

Adverse answered 28/2, 2022 at 19:44 Comments (2)
Which kill command? – Wrongdoer
It's a command I execute from a ConEmu window; the key bindings describe it just as "terminate". I'm guessing it's a Windows equivalent of kill -9. – Adverse

You can start a daemon thread in each worker process that terminates the worker when the parent process dies:

import os
import signal
import threading
import time


def start_thread_to_terminate_when_parent_process_dies(ppid):
    pid = os.getpid()

    def f():
        while True:
            try:
                os.kill(ppid, 0)  # signal 0: existence check only, sends nothing
            except OSError:
                os.kill(pid, signal.SIGTERM)  # parent is gone: terminate this worker
            time.sleep(1)

    thread = threading.Thread(target=f, daemon=True)
    thread.start()

Usage: pass initializer and initargs to ProcessPoolExecutor

with concurrent.futures.ProcessPoolExecutor(
        n_workers,
        initializer=start_thread_to_terminate_when_parent_process_dies,  # +
        initargs=(os.getpid(),),                                         # +
) as executor:

This works even if the parent process is killed with SIGKILL (kill -9).
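The trick the watchdog relies on is that, on POSIX, os.kill(pid, 0) sends no signal at all: it only checks whether the target pid still exists, raising OSError once it is gone. A minimal sketch of that existence check (a short-lived child stands in here for the parent process):

```python
import os
import subprocess
import sys

# Start a short-lived child process, then probe it with "signal 0".
child = subprocess.Popen([sys.executable, "-c", "pass"])
os.kill(child.pid, 0)        # process exists: no exception, no signal delivered
child.wait()                 # child exits and is reaped
try:
    os.kill(child.pid, 0)
    still_alive = True
except OSError:              # ProcessLookupError: the pid is gone
    still_alive = False
print(still_alive)           # False
```

In the watchdog above, the same OSError is what triggers the SIGTERM sent to the worker itself.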

Wrongdoer answered 6/3, 2022 at 11:16

I would suggest two changes:

  1. Use a kill -15 command rather than kill -9; the resulting SIGTERM signal can be handled by the Python program, whereas SIGKILL cannot be caught.
  2. Use a multiprocessing pool created with the multiprocessing.pool.Pool class. Its terminate method works quite differently from that of concurrent.futures.ProcessPoolExecutor: it kills all processes in the pool, so any submitted tasks that are still running are terminated immediately as well.

Your equivalent program using the new pool and handling a SIGTERM interrupt would be:

from multiprocessing import Pool
import signal
import sys
import os
...

def handle_sigterm(*args):
    # print('Terminating...', file=sys.stderr, flush=True)
    pool.terminate()  # kills every worker process immediately
    sys.exit(1)


# The process to be "killed", if necessary:
print(os.getpid(), file=sys.stderr)
pool = Pool(n_workers)
# Install the handler only after the pool exists, so handle_sigterm
# can safely reference it:
signal.signal(signal.SIGTERM, handle_sigterm)
results = pool.imap_unordered(_do_work, data_list)
for id, result in enumerate(results):
    print(f'{id}: {result}')

Fulcrum answered 3/3, 2022 at 20:50 Comments (3)
I like this idea; I've used multiprocessing.Pool in the past and it behaves the way I expect. But I like the concurrent.futures.as_completed functionality. Looking at the imap_unordered docs, they only state that the order of the returned results is arbitrary. Does this mean they arrive in completion order, the way as_completed yields futures? – Adverse
If you use the default value for the chunksize argument, which is 1, then that "arbitrary" order will be the completion order; that is, results are returned as soon as they become available. When chunksize is greater than 1, for example 5, it is a different story: tasks are submitted in chunks of 5, an idle pool process pulls the next chunk of 5 tasks from the task queue, completes them, and puts a chunk of 5 results on the result queue. So the first 5 results the main process sees are the first set of 5 that completes. – Fulcrum
For very large iterables you would possibly want to specify a chunksize value to reduce the number of reads and writes to the various queues. When using concurrent.futures.ProcessPoolExecutor.submit, you are essentially using an implicit chunksize of 1. – Fulcrum

You could run the script in its own cgroup. When you need to kill the whole thing, you can do so with the cgroup's kill switch (cgroup.kill in cgroup v2). Even a cpu cgroup will do the trick, since you can enumerate the group's pids.

Check this article on how to use cgexec.
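As a rough sketch of this approach (assuming Linux with cgroup v2 mounted at /sys/fs/cgroup, root privileges, and kernel 5.14+ for cgroup.kill; the group name "pyjob" and both helper names are illustrative, not part of any API):

```python
import os
from pathlib import Path

# Illustrative cgroup v2 path; requires root to create and write.
CGROUP = Path("/sys/fs/cgroup/pyjob")


def enter_cgroup():
    """Create the group and move the current process into it.

    Pool workers forked/spawned afterwards inherit the membership.
    """
    CGROUP.mkdir(exist_ok=True)
    (CGROUP / "cgroup.procs").write_text(str(os.getpid()))


def kill_cgroup():
    """SIGKILL the parent and every pool worker in one operation."""
    (CGROUP / "cgroup.kill").write_text("1")
```

The parent would call enter_cgroup() before creating the ProcessPoolExecutor; any outside process (or a shell one-liner writing to cgroup.kill) can then take down the whole tree at once.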

Pentacle answered 10/3, 2022 at 14:47

© 2022 - 2025 — McMap. All rights reserved.