Mimicking glib.spawn_async with Popen…

glib.spawn_async, together with glib.io_add_watch and glib.child_watch_add, lets you hook up three callbacks: one called when data arrives on stdout, one when data arrives on stderr, and one when the process completes.

How can I mimic the same functionality with subprocess with either threads or asyncio?

I am more interested in the functionality than in the choice between threading and asyncio, but an answer that covers both will earn a bounty.

Here is a toy program that shows what I want to do:

import glib
import logging
import os
import gtk


class MySpawn(object):
    def __init__(self):
        self._logger = logging.getLogger(self.__class__.__name__)

    def execute(self, cmd, on_done, on_stdout, on_stderr):
        self.pid, self.idin, self.idout, self.iderr = \
            glib.spawn_async(cmd,
                             flags=glib.SPAWN_DO_NOT_REAP_CHILD,
                             standard_output=True,
                             standard_error=True)
        fout = os.fdopen(self.idout, "r")
        ferr = os.fdopen(self.iderr, "r")
        glib.child_watch_add(self.pid, on_done)
        glib.io_add_watch(fout, glib.IO_IN, on_stdout)
        glib.io_add_watch(ferr, glib.IO_IN, on_stderr)
        return self.pid


if __name__ == '__main__':
    logging.basicConfig(format='%(thread)d %(levelname)s:  %(message)s',
                        level=logging.DEBUG)
    cmd = '/usr/bin/git ls-remote https://github.com/DiffSK/configobj'.split()

    def on_done(pid, retval, *args):
        logging.info("That's all folks!…")

    def on_stdout(fobj, cond):
        """This blocks which is fine for this toy example…"""
        for line in fobj.readlines():
            logging.info(line.strip())
        return True

    def on_stderr(fobj, cond):
        """This blocks which is fine for this toy example…"""
        for line in fobj.readlines():
            logging.error(line.strip())
        return True

    runner = MySpawn()
    runner.execute(cmd, on_done, on_stdout, on_stderr)
    try:
        gtk.main()
    except KeyboardInterrupt:
        print('')

I should add that since readlines() blocks until end of file, the code above buffers all the output and delivers it at once. If that is not what you want, use readline() instead, and make sure that when the command ends you read any lines you have not yet consumed.

Clothesbasket answered 7/9, 2016 at 7:48 Comment(0)

The asyncio event loop has subprocess_exec, so there is no need to use the subprocess module at all:

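import asyncio

class Handler(asyncio.SubprocessProtocol):
    def __init__(self, done):
        self.done = done               # future resolved once all events below have fired
        self.pending = {1, 2, "exit"}  # stdout EOF, stderr EOF, process exit

    def _finish(self, event):
        self.pending.discard(event)
        if not self.pending and not self.done.done():
            self.done.set_result(None)

    def pipe_data_received(self, fd, data):
        # fd == 1 for stdout, and 2 for stderr
        print("Data from /bin/ls on fd %d: %s" % (fd, data.decode()))

    def pipe_connection_lost(self, fd, exc):
        print("Connection lost to /bin/ls on fd %d" % fd)
        self._finish(fd)

    def process_exited(self):
        print("/bin/ls is finished.")
        self._finish("exit")

async def main():
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    # subprocess_exec() returns as soon as the child starts, so wait for `done`
    transport, _ = await loop.subprocess_exec(
        lambda: Handler(done), "/bin/ls", "/", stdin=None)
    await done
    transport.close()

asyncio.run(main())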
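
If the three callbacks from the question are what you are after, asyncio's higher-level stream API may be a closer fit than a protocol class. The following is a minimal sketch, not a definitive implementation: the names spawn and _pump, the callback signatures, and the child command in the demo are assumptions made for illustration. It relies on asyncio.create_subprocess_exec and needs Python 3.7 or later for asyncio.run.

```python
import asyncio
import sys


async def _pump(stream, callback):
    # Forward each line from the stream to the callback until EOF.
    async for raw in stream:
        callback(raw.decode().rstrip("\r\n"))


async def spawn(cmd, on_stdout, on_stderr, on_done):
    """Run cmd and dispatch output lines and the exit status to callbacks."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Drain both pipes concurrently so neither can fill up and stall the child.
    await asyncio.gather(
        _pump(proc.stdout, on_stdout),
        _pump(proc.stderr, on_stderr),
    )
    returncode = await proc.wait()
    on_done(proc.pid, returncode)
    return returncode


if __name__ == "__main__":
    # Hypothetical child command, chosen only so the demo is portable.
    child = [sys.executable, "-c",
             "import sys; print('out'); print('err', file=sys.stderr)"]
    asyncio.run(spawn(
        child,
        on_stdout=lambda line: print("stdout:", line),
        on_stderr=lambda line: print("stderr:", line),
        on_done=lambda pid, rc: print("exit status:", rc),
    ))
```

The callbacks run on the event loop thread, one line at a time, so they should return quickly.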

With subprocess and threading, it's simple as well. You can just spawn a thread per pipe, and one to wait() for the process:

import subprocess
import threading

class PopenWrapper(object):
    def __init__(self, args):
        self.process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.DEVNULL)

        self.stdout_reader_thread = threading.Thread(target=self._reader, args=(self.process.stdout,))
        self.stderr_reader_thread = threading.Thread(target=self._reader, args=(self.process.stderr,))
        self.exit_watcher = threading.Thread(target=self._exit_watcher)

        self.stdout_reader_thread.start()
        self.stderr_reader_thread.start()
        self.exit_watcher.start()

    def _reader(self, fileobj):
        # Runs in its own thread: blocks until the next line is available
        for line in fileobj:
            self.on_data(fileobj, line)

    def _exit_watcher(self):
        self.process.wait()
        # Let the readers drain the pipes before reporting the exit
        self.stdout_reader_thread.join()
        self.stderr_reader_thread.join()
        self.on_exit()

    def on_data(self, fd, data):
        raise NotImplementedError

    def on_exit(self):
        raise NotImplementedError

    def join(self):
        # Wait for the watcher so that on_exit() has run when join() returns
        self.exit_watcher.join()

class LsWrapper(PopenWrapper):
    def on_data(self, fd, data):
        print("Received on fd %r: %s" % (fd, data))

    def on_exit(self):
        print("Process exited.")


LsWrapper(["/bin/ls", "/"]).join()

However, mind that glib does not use threads to execute your callbacks asynchronously. It uses an event loop, just as asyncio does. The idea is that at the core of your program is a loop that waits until something happens and then synchronously executes the associated callback. In your case, the events are "data becomes available for reading on one of the pipes" and "the subprocess has exited". In general, it's also things like "the X11 server reported mouse movement", "there's incoming network traffic", etc.

You can emulate glib's behaviour by writing your own event loop. Use the select module on the two pipes. If select reports that a pipe is readable but read returns no data, the child has closed that pipe, most likely because it exited. In that case, call the poll() method on the subprocess object to check whether it has completed, and call your exit callback if it has, or an error callback otherwise.
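
The hand-written event loop described above could look roughly like this. It is a sketch under a few assumptions: the function name run_with_callbacks and the callback signatures are made up for illustration, select on pipes only works on POSIX systems, and the callbacks receive raw byte chunks rather than whole lines. Instead of calling poll() after an empty read, it waits until both pipes reach end of file and then calls wait() to collect the exit status, which would block if the child closed its pipes but kept running.

```python
import os
import select
import subprocess
import sys


def run_with_callbacks(cmd, on_stdout, on_stderr, on_done):
    """Single-threaded loop: wait on both pipes, dispatch, then report exit."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            stdin=subprocess.DEVNULL)
    # Map each pipe's file descriptor to the callback that handles it.
    handlers = {
        proc.stdout.fileno(): on_stdout,
        proc.stderr.fileno(): on_stderr,
    }
    while handlers:
        # Block until at least one of the remaining pipes is readable.
        readable, _, _ = select.select(list(handlers), [], [])
        for fd in readable:
            chunk = os.read(fd, 4096)
            if chunk:
                handlers[fd](chunk)
            else:
                # An empty read means EOF: the child closed this pipe.
                del handlers[fd]
    # Both pipes are at EOF; wait() reaps the child and yields its status.
    returncode = proc.wait()
    proc.stdout.close()
    proc.stderr.close()
    on_done(proc.pid, returncode)
    return returncode


if __name__ == "__main__":
    # Hypothetical child command, chosen only so the demo is portable.
    child = [sys.executable, "-c",
             "import sys; print('out'); print('err', file=sys.stderr)"]
    run_with_callbacks(
        child,
        on_stdout=lambda data: print("stdout:", data),
        on_stderr=lambda data: print("stderr:", data),
        on_done=lambda pid, rc: print("exit status:", rc),
    )
```

Because everything runs in one thread, a slow callback delays all other events, which is the same trade-off glib and asyncio make.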

Marder answered 14/9, 2016 at 8:22 Comment(2)
Thank you very much for taking the time to write this answer. – Clothesbasket
Note that the above will buffer the lines in stdout and stderr as readlines() is blocking. If you want an update as it happens, use read() but make sure you empty the buffer when the reader threads finish. – Clothesbasket