Python subprocess: callback when cmd exits

I'm currently launching a program using subprocess.Popen(cmd, shell=True)

I'm fairly new to Python, but it 'feels' like there ought to be some api that lets me do something similar to:

subprocess.Popen(cmd, shell=True, postexec_fn=function_to_call_on_exit)

I am doing this so that function_to_call_on_exit can do something based on knowing that the cmd has exited (for example, keeping count of the number of external processes currently running).

I assume that I could fairly trivially wrap subprocess in a class that combined threading with the Popen.wait() method, but as I've not done threading in Python yet and it seems like this might be common enough for an API to exist, I thought I'd try and find one first.

Thanks in advance :)

Sexdecillion answered 5/4, 2010 at 23:45 Comment(2)
Could someone please comment on or look into the asyncio subprocess module? I would expect that to be the correct tool for this, but I have never used it myself.Dayan
@BenMares It's been a while since the question, but asyncio can indeed be used much like the examples below, though its API differs in some nuances. It needs an asyncio event loop, generally with at least one top-level synchronous call to start the loop or obtain a running one. IPython and the Spyder IDE provide examples of asyncio in GUI and networking applications, including popen-like calls. The stdlib docs cover the asyncio API and its parallels to the rest of the stdlib. There's also Trio.Committeewoman
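A minimal sketch of that asyncio approach (an illustration, not from the original comments; assumes Python 3.7+ and a POSIX sleep command):

import asyncio

async def run_and_call(on_exit, *cmd):
    # the subprocess runs without blocking the event loop
    proc = await asyncio.create_subprocess_exec(*cmd)
    await proc.wait()  # suspends only this coroutine, not the program
    on_exit(proc.returncode)

async def main():
    # run two commands concurrently; each callback fires as its command exits
    await asyncio.gather(
        run_and_call(lambda rc: print("sleep 1 done:", rc), "sleep", "1"),
        run_and_call(lambda rc: print("sleep 2 done:", rc), "sleep", "2"),
    )

asyncio.run(main())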

You're right - there is no nice API for this. You're also right on your second point - it's trivially easy to design a function that does this for you using threading.

import threading
import subprocess

def popen_and_call(on_exit, popen_args):
    """
    Runs the given args in a subprocess.Popen, and then calls the function
    on_exit when the subprocess completes.
    on_exit is a callable object, and popen_args is a list/tuple of args that
    you would give to subprocess.Popen.
    """
    def run_in_thread(on_exit, popen_args):
        proc = subprocess.Popen(*popen_args)
        proc.wait()
        on_exit()
        return
    thread = threading.Thread(target=run_in_thread, args=(on_exit, popen_args))
    thread.start()
    # returns immediately after the thread starts
    return thread

Threading is pretty easy in Python, but note that if on_exit() is computationally expensive, you'll want to put it in a separate process instead, using multiprocessing (so that the GIL doesn't slow your program down). It's actually very simple - you can basically just replace all calls to threading.Thread with multiprocessing.Process, since they follow (almost) the same API.
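A sketch of that swap, with one caveat the answer doesn't mention: on spawn-based platforms (Windows, and macOS by default) the target function and its arguments must be picklable, so the worker is moved to module level here:

import multiprocessing
import subprocess

def _run_in_process(on_exit, popen_args):
    # module-level so it stays picklable on spawn-based platforms;
    # on_exit must be picklable too (a plain function, not a lambda)
    proc = subprocess.Popen(*popen_args)
    proc.wait()
    on_exit()

def popen_and_call(on_exit, popen_args):
    proc = multiprocessing.Process(target=_run_in_process, args=(on_exit, popen_args))
    proc.start()
    return proc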

Parable answered 6/4, 2010 at 0:27 Comment(3)
Thanks. This is what I was going to do. Unfortunately there's a problem that I can't replicate in a simple scenario but can in my actual program :( If I use threading, not multiprocessing, proc.wait() doesn't return until I do something else with subprocess. If I use multiprocessing it works perfectly. However, using multiprocessing I have to fuss with shared memory. I've done that now, but I'm not sure I'm happy with the overhead. Any ideas why subprocess might behave differently in a thread than in a process (changing which one I use, and nothing else, causes/solves the issue)?Sexdecillion
@Sexdecillion I'm sorry - I don't know why threading wouldn't work, or why it would behave any differently than multiprocessing in this case. That seems pretty strange. Is the overhead of shared memory a performance bottleneck, or is it just ugly?Parable
@DanielG You might consider adopting the change from Phil's answer, so that the Popen interface is maintained.Tlemcen

There is the concurrent.futures module in Python 3.2 (available via pip install futures for Python < 3.2):

pool = Pool(max_workers=1)
f = pool.submit(subprocess.call, "sleep 2; echo done", shell=True)
f.add_done_callback(callback)

The callback will be called in the same process that called f.add_done_callback().

Full program

import logging
import subprocess
# to install run `pip install futures` on Python <3.2
from concurrent.futures import ThreadPoolExecutor as Pool

info = logging.getLogger(__name__).info

def callback(future):
    if future.exception() is not None:
        info("got exception: %s" % future.exception())
    else:
        info("process returned %d" % future.result())

def main():
    logging.basicConfig(
        level=logging.INFO,
        format=("%(relativeCreated)04d %(process)05d %(threadName)-10s "
                "%(levelname)-5s %(msg)s"))

    # wait for the process completion asynchronously
    info("begin waiting")
    pool = Pool(max_workers=1)
    f = pool.submit(subprocess.call, "sleep 2; echo done", shell=True)
    f.add_done_callback(callback)
    pool.shutdown(wait=False) # no .submit() calls after that point
    info("continue waiting asynchronously")

if __name__=="__main__":
    main()

Output

$ python . && python3 .
0013 05382 MainThread INFO  begin waiting
0021 05382 MainThread INFO  continue waiting asynchronously
done
2025 05382 Thread-1   INFO  process returned 0
0007 05402 MainThread INFO  begin waiting
0014 05402 MainThread INFO  continue waiting asynchronously
done
2018 05402 Thread-1   INFO  process returned 0
Labrecque answered 6/3, 2011 at 9:43 Comment(2)
Jeez, this is goldDebar
@skamsie: using a pool might be overkill compared to a single thread, unless you use the pool already e.g., to limit the number of concurrent subprocesses or to collect their output. Here's a verbose low-level approach that does not use threads and uses the SIGCHLD signal instead. QProcess.finished() shows an example of how it can be done efficiently while providing a simple portable API.Labrecque

I modified Daniel G's answer to pass the subprocess.Popen args and kwargs directly, instead of as a separate tuple/list, since I wanted to use keyword arguments with subprocess.Popen.

In my case, I had a method postExec() that I wanted to run after subprocess.Popen('exe', cwd=WORKING_DIR).

With the code below, it simply becomes popenAndCall(postExec, 'exe', cwd=WORKING_DIR).

import threading
import subprocess

def popenAndCall(onExit, *popenArgs, **popenKWArgs):
    """
    Runs a subprocess.Popen, and then calls the function onExit when the
    subprocess completes.

    Use it exactly the way you'd normally use subprocess.Popen, except include a
    callable to execute as the first argument. onExit is a callable object, and
    *popenArgs and **popenKWArgs are simply passed up to subprocess.Popen.
    """
    def runInThread(onExit, popenArgs, popenKWArgs):
        proc = subprocess.Popen(*popenArgs, **popenKWArgs)
        proc.wait()
        onExit()
        return

    thread = threading.Thread(target=runInThread,
                              args=(onExit, popenArgs, popenKWArgs))
    thread.start()

    return thread # returns immediately after the thread starts
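A hypothetical usage example (the command and callback here are placeholders, not from the original answer):

def on_done():
    print("external process finished")

thread = popenAndCall(on_done, ["sleep", "2"])
# the main thread keeps running; join() only if you need to block later
thread.join()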
Rudolf answered 30/5, 2012 at 20:39 Comment(0)

I had the same problem, and solved it using multiprocessing.Pool. There are two hacky tricks involved:

  1. make the size of the pool 1
  2. pass the arguments inside an iterable of length 1

The result is one function executed with a callback on completion:

import multiprocessing

def sub(arg):
    print(arg)             # prints [1, 2, 3, 4, 5]
    return "hello"

def cb(arg):
    print(arg)             # prints ['hello'] (map_async hands the callback the result list)

if __name__ == "__main__":
    pool = multiprocessing.Pool(1)
    rval = pool.map_async(sub, [[1, 2, 3, 4, 5]], callback=cb)
    # ... do other work here ...
    pool.close()
    pool.join()

In my case, I wanted the invocation to be non-blocking as well. It works beautifully.
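As an aside, multiprocessing.Pool.apply_async schedules a single call directly, which avoids the length-1-iterable trick; a sketch under the same assumptions:

import multiprocessing

def sub(arg):
    print(arg)
    return "hello"

def cb(arg):
    print(arg)  # apply_async passes the bare return value, so this prints "hello"

if __name__ == "__main__":
    pool = multiprocessing.Pool(1)
    rval = pool.apply_async(sub, ([1, 2, 3, 4, 5],), callback=cb)
    pool.close()
    pool.join()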

Adamsite answered 8/1, 2011 at 23:3 Comment(1)
I needed pool.starmap(...) or .starmap_async(...).Consistent

On POSIX systems, the parent process receives a SIGCHLD signal when a child process exits. To run a callback when a subprocess command exits, handle the SIGCHLD signal in the parent. Something like this:

import signal
import subprocess

def sigchld_handler(signum, frame):
    # This is run when the child exits.
    # Do something here ...
    pass

signal.signal(signal.SIGCHLD, sigchld_handler)

process = subprocess.Popen('mycmd', shell=True)

Note that this will not work on Windows.
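One caveat not in the original answer: with several children, SIGCHLD deliveries can coalesce, so a handler usually checks every outstanding process rather than assuming exactly one exit. A sketch, where processes is an assumed list of tracked Popen objects:

import signal
import subprocess

processes = []  # Popen objects being tracked

def sigchld_handler(signum, frame):
    # one signal delivery may cover several exited children,
    # so poll every tracked process; poll() also reaps the child
    for proc in list(processes):
        if proc.poll() is not None:
            processes.remove(proc)
            print("pid %d exited with code %d" % (proc.pid, proc.returncode))

signal.signal(signal.SIGCHLD, sigchld_handler)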

Zephaniah answered 13/11, 2021 at 5:21 Comment(0)

I was inspired by Daniel G's answer and implemented a very simple use case: in my work I often need to make repeated calls to the same (external) process with different arguments. I had hacked together a way to determine when each specific call was done, but now I have a much cleaner way to issue callbacks.

I like this implementation because it is very simple, yet it allows me to issue asynchronous calls to multiple processors (notice I use multiprocessing instead of threading) and receive notification upon completion.

I tested the sample program and it works great. Please edit at will and provide feedback.

import multiprocessing
import subprocess

class Process(object):
    """This class spawns a subprocess asynchronously and calls a
    `callback` upon completion; it is not meant to be instantiated
    directly (instances of derived classes are used instead)"""
    def __call__(self, *args):
        # store the arguments for later retrieval
        self.args = args
        # define the target function to be called by
        # `multiprocessing.Process`
        def target():
            cmd = [self.command] + [str(arg) for arg in self.args]
            process = subprocess.Popen(cmd)
            # the `multiprocessing.Process` process will wait until
            # the call to the `subprocess.Popen` object is completed
            process.wait()
            # upon completion, call `callback`
            return self.callback()
        mp_process = multiprocessing.Process(target=target)
        # this call issues the call to `target`, but returns immediately
        mp_process.start()
        return mp_process

if __name__ == "__main__":

    def squeal(who):
        """this serves as the callback function; its argument is the
        instance of a subclass of Process making the call"""
        print("finished %s calling %s with arguments %s" % (
            who.__class__.__name__, who.command, who.args))

    class Sleeper(Process):
        """Sample implementation of an asynchronous process - define
        the command name (available in the system path) and a callback
        function (previously defined)"""
        command = "./sleeper"
        callback = squeal

    # create an instance of Sleeper - this is the Process object that
    # can be called repeatedly in an asynchronous manner
    sleeper_run = Sleeper()

    # spawn three sleeper runs with different arguments
    sleeper_run(5)
    sleeper_run(2)
    sleeper_run(1)

    # the user should see the following message immediately (even
    # though the Sleeper calls are not done yet)
    print("program continued")

Sample output:

program continued
finished Sleeper calling ./sleeper with arguments (1,)
finished Sleeper calling ./sleeper with arguments (2,)
finished Sleeper calling ./sleeper with arguments (5,)

Below is the source code of sleeper.c, my sample "time-consuming" external process:

#include <stdlib.h>
#include <unistd.h>

int main(int argc, char *argv[]) {
    unsigned int t = atoi(argv[1]);
    sleep(t);
    return EXIT_SUCCESS;
}

compile as:

gcc -o sleeper sleeper.c
Spoilt answered 6/3, 2011 at 7:14 Comment(1)
Thanks! There are some problems with the indentation, but after that, this is just what I need to spin off asynchronous processes in an XMLRPC server when a client issues a "run" command.Guan

There is also ProcessPoolExecutor, available since Python 3.2 in concurrent.futures (https://docs.python.org/3/library/concurrent.futures.html). The usage is the same as that of the ThreadPoolExecutor mentioned above, with the on-exit callback attached via future.add_done_callback().
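A minimal sketch mirroring the ThreadPoolExecutor program above (the sleep command is a placeholder; note the callback runs in the submitting process):

import subprocess
from concurrent.futures import ProcessPoolExecutor

def callback(future):
    print("process returned %d" % future.result())

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as pool:
        f = pool.submit(subprocess.call, ["sleep", "2"])
        f.add_done_callback(callback)
    # leaving the with-block waits for the worker to finish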

Jhansi answered 5/5, 2020 at 11:49 Comment(0)

Thanks guys, for pointing me in the right direction. I made a class from what I found here and added a stop() function to kill the process:

from subprocess import Popen
from threading import Thread

class popenplus():
    def __init__(self, onExit, *popenArgs, **popenKWArgs):
        self.proc = None  # set before the thread starts so stop() is always safe
        thread = Thread(target=self.runInThread, args=(onExit, popenArgs, popenKWArgs))
        thread.start()

    def runInThread(self, onExit, popenArgs, popenKWArgs):
        self.proc = Popen(*popenArgs, **popenKWArgs)
        self.proc.wait()
        self.proc = None
        onExit()

    def stop(self):
        if self.proc:
            self.proc.kill()
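A hypothetical usage example (the command and callback are placeholders):

p = popenplus(lambda: print("done"), ["sleep", "10"])
# ... later, if the command must be aborted early:
p.stop()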
Discommode answered 20/3, 2022 at 13:56 Comment(0)

Most of the current answers to this question suggest spinning up one thread per process just to wait for it and fire the callback. That strikes me as needlessly wasteful: a single thread should suffice for all callbacks from all processes created this way.

Another answer suggests using signals, but that creates a race condition where the signal handler might be called again before the previous call has finished. On Linux, signalfd(2) could help with that, but it is not supported by Python (although it's easy enough to add via ctypes).

The alternative used by asyncio in Python is signal.set_wakeup_fd. However, there is another solution, based on the fact that the OS closes all of a process's open file descriptors when the process exits:

import os
import select
import subprocess
import threading
import weakref


def _close_and_join(fd, thread):
    os.close(fd)
    thread.join()


def _run_poll_callbacks(quitfd, poll, callbacks):
    poll.register(quitfd, select.POLLHUP)
    while True:
        # the 1-second timeout lets fds registered after poll() started
        # be picked up on the next iteration
        for fd, event in poll.poll(1000.0):
            poll.unregister(fd)
            if fd == quitfd:
                return
            os.close(fd)  # close our read end of the exit-notification pipe
            callback = callbacks.pop(fd)
            if callback is not None:
                callback()


class PollProcs:
    def __init__(self):
        self.poll = select.poll()
        self.callbacks = {}
        self.closed = False

        r, w = os.pipe()
        self.thread = threading.Thread(
            target=_run_poll_callbacks, args=(r, self.poll, self.callbacks)
        )
        self.thread.start()
        self.finalizer = weakref.finalize(self, _close_and_join, w, self.thread)

    def run(self, cmd, callback=None):
        if self.closed:
            return

        r, w = os.pipe()
        self.callbacks[r] = callback
        self.poll.register(r, select.POLLHUP)
        popen = subprocess.Popen(cmd, pass_fds=(w,))
        os.close(w)
        print("running", " ".join(cmd), "as", popen.pid)
        return popen


def main():
    procs = PollProcs()

    for i in range(3, 0, -1):
        procs.run(["sleep", str(i)], callback=lambda i=i: print(f"sleep {i} done?"))

    import time

    print("Waiting...")
    time.sleep(3)


if __name__ == "__main__":
    main()

If supporting macOS isn't a requirement, select.epoll is likely a better choice, since it allows updating an ongoing poll.

Mediatorial answered 19/9, 2023 at 22:29 Comment(0)

AFAIK there is no such API, at least not in the subprocess module. You need to roll your own, possibly using threads.

Psychosocial answered 5/4, 2010 at 23:52 Comment(0)
