Python: execute cat subprocess in parallel
I am running several cat | zgrep commands on a remote server and gathering their output individually for further processing:

import multiprocessing as mp
import subprocess

class MainProcessor(mp.Process):
    def __init__(self, peaks_array):
        super(MainProcessor, self).__init__()
        self.peaks_array = peaks_array

    def run(self):
        for peak_arr in self.peaks_array:
            peak_processor = PeakProcessor(peak_arr)
            peak_processor.start()

class PeakProcessor(mp.Process):
    def __init__(self, peak_arr):
        super(PeakProcessor, self).__init__()
        self.peak_arr = peak_arr

    def run(self):
        command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" '
        log_lines = (subprocess.check_output(command, shell=True)).split('\n')
        process_data(log_lines)

This, however, results in sequential execution of the subprocess('ssh ... cat ...') commands: the second peak waits for the first to finish, and so on.

How can I modify this code so that the subprocess calls run in parallel, while still being able to collect the output for each individually?

Karlenekarlens answered 12/5, 2014 at 14:13 Comment(1)
--mmap is useless when reading from a pipe...Wideangle

Another approach (rather than the other suggestion of putting shell processes in the background) is to use multithreading.

The run method that you have would then do something like this:

thread.start_new_thread(myFuncThatDoesZGrep, ())

To collect results, you can do something like this:

import threading

class MyThread(threading.Thread):
    def run(self):
        self.finished = False
        # Your code to run the command here.
        self.results = blahBlah()
        # Set the flag only after the results are stored, so a reader
        # never sees finished == True with no results.
        self.finished = True

Start the thread in the usual way with myThread.start(). When your thread object has myThread.finished == True, you can collect the results via myThread.results.
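A runnable sketch of this pattern, using join() instead of a finished flag (the echo commands here are placeholders for the real ssh/zgrep commands):

```python
import subprocess
import threading

class CommandThread(threading.Thread):
    """Runs one shell command and stores its output lines."""

    def __init__(self, command):
        super().__init__()
        self.command = command
        self.results = None

    def run(self):
        # check_output blocks only this thread; other threads keep running
        output = subprocess.check_output(self.command, shell=True, text=True)
        self.results = output.splitlines()

threads = [CommandThread(cmd) for cmd in ("echo a", "echo b")]  # placeholder commands
for t in threads:
    t.start()
for t in threads:
    t.join()  # after join() returns, t.results is guaranteed to be set
print([t.results for t in threads])
```

Since join() only returns after run() has completed, checking a separate finished flag is unnecessary with this structure.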

Faline answered 12/5, 2014 at 14:22 Comment(7)
With this approach, how can I get the output of each once the threads finish running? And I'm already using a process, why would a thread work but not a process?Karlenekarlens
A process will work - the other stated answer suggested that you do the multi-process work in the actual shell, by using &. In that approach, you only have one python process, but it spawns many shell processes. In the multi-threaded approach, you have multiple python processes, but one shell process per python process. To collect the results from multiple threads, you'd create classes that subclass Thread. Then put results from one thread as object data in that class.Faline
But isn't that what the code above is doing? I'm starting a new process for each peak, then run the subprocess and process_data from its run method.Karlenekarlens
No, when you run subprocess, your code blocks (stops running) until the command is complete. So each time you do run(), you're running each command in serial. If you want to do it in parallel, that's where threading comes in -- you run multiple threads in parallel, and each one runs a single command serially.Faline
Great, is the self.finished variable really necessary or a call to thread.join() would be enough?Karlenekarlens
self.finished isn't really necessary.Faline
don't use low-level thread module; use threading module instead.Lissotrichous

You don't need multiprocessing or threading to run subprocesses in parallel. For example:

#!/usr/bin/env python
from subprocess import Popen

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True)
             for i in range(5)]
# collect statuses
exitcodes = [p.wait() for p in processes]

This runs 5 shell commands simultaneously. Note: neither threads nor the multiprocessing module are used here. There is no point in adding an ampersand & to the shell commands: Popen doesn't wait for the command to complete; you need to call .wait() explicitly.

Using threads to collect the output from the subprocesses is convenient, but not necessary:

#!/usr/bin/env python
from multiprocessing.dummy import Pool # thread pool
from subprocess import Popen, PIPE, STDOUT

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True,
                   stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
             for i in range(5)]

# collect output in parallel
def get_lines(process):
    return process.communicate()[0].splitlines()

outputs = Pool(len(processes)).map(get_lines, processes)

Related: Python threading multiple bash subprocesses?
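An equivalent sketch using the standard-library concurrent.futures thread pool (Python 3.2+), which collects each command's output individually; the echo commands are placeholders for the question's ssh/zgrep commands:

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor

commands = ["echo one", "echo two", "echo three"]  # placeholder commands

def run_command(cmd):
    # check_output blocks this worker thread only; the other
    # commands keep running in their own workers
    return subprocess.check_output(cmd, shell=True, text=True).splitlines()

# one worker per command, so all commands run at the same time
with ThreadPoolExecutor(max_workers=len(commands)) as pool:
    outputs = list(pool.map(run_command, commands))

print(outputs)  # one list of lines per command, in submission order
```

pool.map() returns results in the same order as the input commands, so each command's output can be matched back to the command that produced it.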

Here's a code example that gets the output from several subprocesses concurrently in the same thread (Python 3.8+):

#!/usr/bin/env python3
import asyncio
import sys
from subprocess import PIPE, STDOUT


async def get_lines(shell_command):
    p = await asyncio.create_subprocess_shell(
        shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT
    )
    return (await p.communicate())[0].splitlines()


async def main():
    # get commands output in parallel
    coros = [
        get_lines(
            f'"{sys.executable}" -c "print({i:d}); import time; time.sleep({i:d})"'
        )
        for i in range(5)
    ]
    print(await asyncio.gather(*coros))


if __name__ == "__main__":
    asyncio.run(main())

Old (2014) answer (Python 3.4?):

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

@asyncio.coroutine
def get_lines(shell_command):
    p = yield from asyncio.create_subprocess_shell(shell_command,
            stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (yield from p.communicate())[0].splitlines()

if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

# get commands output in parallel
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                    .format(i=i, e=sys.executable)) for i in range(5)]
print(loop.run_until_complete(asyncio.gather(*coros)))
loop.close()
Lissotrichous answered 12/5, 2014 at 18:29 Comment(5)
@j-f-sebastian Umm... I am confused as to what is the difference between code snippets #2 and #3 in your answer. Can you please point out to some resource or explain what "gets output... in the same thread" mean? BTW, thanks a lot for #2 :)Colonialism
@SaheelGodhane: multiprocessing.dummy.Pool()-based solution uses multiple (several/more than one) threads. asyncio-based solution uses a single thread here. To understand how to do several things at once in the same thread, see Python Concurrency From the Ground Up: LIVE!Lissotrichous
Excellent example! I tried to implement snippets #1 with the new subprocess.run() functionality but it looks like that won't work because that function always waits for the process to finish. I had to switch back to using Popen instead.Cusec
@Lissotrichous you rock! I'd been looking for a way to execute commands in parallel on Windows without using if __name__ == __main__: for 2 days this was the breakthrough I needed, I ended up making a Q & A specifically for it since I had so much trouble figuring out how to do it after looking at over 20 similar Q&As. #53983647Alliber
How to do this with subprocess.run() with shell=False? Say a shell command like clip-set "$password" && sleep 15 && clip-set ""Gan