Python multiprocessing module: join processes with timeout
I'm optimizing the parameters of a complex simulation, using the multiprocessing module to improve the performance of the optimization algorithm. I learned the basics of multiprocessing at http://pymotw.com/2/multiprocessing/basics.html. Depending on the parameters chosen by the optimization algorithm, a simulation takes around 1 to 5 minutes. If the parameters are chosen very badly, a simulation can take 30 minutes or more and the results are not useful. So I was thinking about building a timeout into the multiprocessing that terminates all simulations lasting longer than a defined time. Here is an abstracted version of the problem:

import numpy as np
import time
import multiprocessing

def worker(num):
    
    time.sleep(np.random.random()*20)

def main():
    
    pnum = 10    
    
    procs = []
    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print('starting', p.name)
        
    for p in procs:
        p.join(5)
        print('stopping', p.name)
     
if __name__ == "__main__":
    main()

The line p.join(5) defines a timeout of 5 seconds. Because of the for loop for p in procs:, the program waits 5 seconds until the first process is finished, then another 5 seconds until the second process is finished, and so on. However, I want the program to terminate all processes that run longer than 5 seconds. Additionally, if none of the processes runs longer than 5 seconds, the program should not wait the full 5 seconds.

Lexicon answered 26/9, 2014 at 16:12 Comment(2)
Take a look here: https://mcmap.net/q/25529/-using-module-39-subprocess-39-with-timeout/2615940. It may be a duplicate, but I'm not sure enough to flag it for you. If the proposed solution in that answer doesn't work for you, please let us know why. – Weinshienk
This is an interesting article, but as I see it, it's a solution for consecutively started processes, not simultaneously started ones. My program should start the processes at the same time and kill those that exceed a 'global' timeout. – Lexicon

You can do this by creating a loop that waits up to some timeout number of seconds, frequently checking whether all processes have finished. If they don't all finish within the allotted time, terminate all of them:

TIMEOUT = 5 
start = time.time()
while time.time() - start <= TIMEOUT:
    if not any(p.is_alive() for p in procs):
        # All the processes are done, break now.
        break

    time.sleep(.1)  # Just to avoid hogging the CPU
else:
    # We only enter this if we didn't 'break' above.
    print("timed out, killing all processes")
    for p in procs:
        p.terminate()
        p.join()
Slickenside answered 26/9, 2014 at 16:34 Comment(6)
Thank you, this seems like an appropriate solution. Unfortunately, this code does not break if the processes finish before the timeout is reached. I tried it by setting the worker function to time.sleep(1), and after 1 second all p.is_alive() return False. The code should hit the break statement now, but it's still waiting for the timeout... – Lexicon
I found the problem: print(p.is_alive() for p in procs) returns <generator object <genexpr> at 0x05712B20>, but it should be a list of True/False elements for any() to understand it. – Lexicon
@Lexicon use any([p.is_alive() for p in procs]). That way it becomes a list comprehension instead of a generator expression. – Slickenside
@Lexicon oh, I just noticed you're using np.any instead of the built-in any. That's why the generator expression didn't work. np.any only works with array-like objects. – Slickenside
The built-in any with a list comprehension is the key! Thank you! – Lexicon
Is it necessary to call join() after terminating the process? – Unbelieving
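[Editor's note] The np.any vs. built-in any pitfall discussed in these comments can be demonstrated with a short snippet (not part of the original thread's code):

```python
import numpy as np

flags = [False, False, False]

# The built-in any() consumes generator expressions lazily:
print(any(f for f in flags))        # False
print(any([f for f in flags]))      # False

# np.any() works with array-like objects such as lists:
print(np.any([f for f in flags]))   # False

# But np.any() does not iterate a generator expression: it wraps the
# generator object itself in a 0-d object array, which is always truthy,
# so the loop in the question never saw "all processes finished".
print(np.any(f for f in flags))     # True - the generator was never consumed
```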

If you want to kill all the processes, you could use the Pool from multiprocessing. You'll need to define a general timeout for the whole execution, as opposed to individual timeouts.

import numpy as np
import time
from multiprocessing import Pool

def worker(num):
    xtime = np.random.random()*20
    time.sleep(xtime)
    return xtime

def main():

    pnum = 10
    pool = Pool()
    args = range(pnum)
    pool_result = pool.map_async(worker, args)

    # wait 5 minutes for every worker to finish
    pool_result.wait(timeout=300)

    # once the timeout has finished we can try to get the results
    if pool_result.ready():
        print(pool_result.get(timeout=1))

if __name__ == "__main__":
    main()

This will get you a list with the return values for all your workers in order.
More information here: https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool

Bennet answered 31/3, 2015 at 21:8 Comment(2)
I don't think this actually terminates the worker processes in the pool -- it just returns execution to the main thread even if they're not done. – Annmarie
I do not understand why we do pool_result.get(timeout=1), i.e. if pool_result is ready already, shouldn't the results also be ready, with no timeout needed? – Fur
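[Editor's note] As the first comment points out, AsyncResult.wait(timeout=...) only stops waiting; the pool workers keep running. A hedged sketch (not from the original answer) of one way to actually stop them when the timeout expires, using Pool.terminate():

```python
import time
from multiprocessing import Pool

def worker(num):
    # stand-in for the simulation: sleep for `num` seconds, return it
    time.sleep(num)
    return num

def run_pool_with_timeout(args, timeout):
    pool = Pool()
    result = pool.map_async(worker, args)
    result.wait(timeout=timeout)     # block at most `timeout` seconds
    if result.ready():
        values = result.get(timeout=1)
        pool.close()                 # no more work; let workers exit
    else:
        values = None
        pool.terminate()             # actually kill workers still running
    pool.join()
    return values

if __name__ == "__main__":
    print(run_pool_with_timeout([0.1, 0.2], timeout=5))  # -> [0.1, 0.2]
    print(run_pool_with_timeout([0.1, 30], timeout=1))   # -> None (timed out)
```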

Thanks to dano's help, I found a solution:

import numpy as np
import time
import multiprocessing

def worker(num):

    time.sleep(np.random.random()*20)

def main():

    pnum = 10    
    TIMEOUT = 5 
    procs = []
    bool_list = [True]*pnum

    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print('starting', p.name)

    start = time.time()
    while time.time() - start <= TIMEOUT:
        for i in range(pnum):
            bool_list[i] = procs[i].is_alive()
            
        print(bool_list)
            
        if np.any(bool_list):  
            time.sleep(.1)  
        else:
            break
    else:
        print("timed out, killing all processes")
        for p in procs:
            p.terminate()
            
    for p in procs:
        print('stopping', p.name,'=', p.is_alive())
        p.join()

if __name__ == "__main__":
    main()

It's not the most elegant way; I'm sure there is a better approach than using bool_list. Processes that are still alive after the timeout of 5 seconds are killed. If you set shorter times in the worker function than the timeout, you will see that the program stops before the 5-second timeout is reached. I'm still open to more elegant solutions if there are any :)
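[Editor's note] A more compact variant of the same idea, without the bool_list, is possible by using the built-in any() with a generator expression directly in the loop condition, as the comments on the accepted answer suggest. A hedged sketch (the function name and return value are illustrative, not from the thread):

```python
import time
import multiprocessing

def worker(secs):
    # stand-in for the simulation: sleep for `secs` seconds
    time.sleep(secs)

def run_with_timeout(durations, timeout):
    """Start one process per duration; terminate any still alive at the
    deadline. Returns True for each process that finished on its own."""
    procs = [multiprocessing.Process(target=worker, args=(d,)) for d in durations]
    for p in procs:
        p.start()

    deadline = time.time() + timeout
    # built-in any() with a generator expression - no bool_list needed
    while time.time() < deadline and any(p.is_alive() for p in procs):
        time.sleep(0.05)  # avoid hogging the CPU

    finished = []
    for p in procs:
        timed_out = p.is_alive()
        if timed_out:
            p.terminate()
        p.join()  # always join, so no zombie processes are left behind
        finished.append(not timed_out)
    return finished

if __name__ == "__main__":
    print(run_with_timeout([0.1, 0.2, 30], timeout=2))  # -> [True, True, False]
```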

Lexicon answered 27/9, 2014 at 11:31 Comment(0)

I am working on a similar problem: managing long-running computations, which may take a very long time, on a limited queue of workers. Rather than timing out the entire set of workers when the timeout is reached, I want each process to run for at most timeout seconds from the time it is started. In addition, I built in the ability to automatically retry each task, up to n_attempts times.

from queue import Empty  # multiprocessing.Queue.get raises queue.Empty, not TimeoutError

sim_specs_remaining = [(s, 0) for s in sim_specs]
queue = Queue()
processes = [
    start_new_process(*sim_specs_remaining.pop(0))
    for i in range(min(num_workers, len(sim_specs_remaining)))
]
results = []

while sim_specs_remaining or len(processes) > 0:
    if len(processes) < num_workers and len(sim_specs_remaining) > 0:
        spec, n_attempts = sim_specs_remaining.pop(0)
        if n_attempts < 3:
            processes.append(start_new_process(spec, n_attempts))

    # Iterate over a copy, since we remove finished entries from `processes`.
    for p in list(processes):
        start_time, process, spec, n_attempts = p
        if process.is_alive():
            if timeout is not None and time.time() - start_time > timeout:
                print(f"{spec.uid} exceeded the time limit and was terminated.")
                process.terminate()
                process.join()
                processes.remove(p)
                sim_specs_remaining.append((spec, n_attempts + 1))
            else:
                process.join(0.1)  # This reduces the cadence of the `while` loop
        else:
            try:
                result = queue.get(timeout=1)
                results.append(result)
            except Empty:
                print("A task exceeded the time limit and was terminated.")
            finally:
                process.join()
                processes.remove(p)
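
[Editor's note] start_new_process is not shown in the answer above. A hypothetical sketch of what it might look like, assuming each worker runs the simulation and puts its result on the shared queue (the names run_simulation and the "done" payload are illustrative, not from the answer):

```python
import time
from multiprocessing import Process, Queue

queue = Queue()  # shared with the main loop in the answer above

def run_simulation(spec, queue):
    # placeholder for the real simulation; report the result to the parent
    queue.put((spec, "done"))

def start_new_process(spec, n_attempts):
    process = Process(target=run_simulation, args=(spec, queue))
    process.start()
    # matches the tuple unpacked in the loop: start_time, process, spec, n_attempts
    return (time.time(), process, spec, n_attempts)

if __name__ == "__main__":
    start_time, process, spec, n_attempts = start_new_process("sim_1", 0)
    process.join()
    print(queue.get(timeout=2))  # -> ('sim_1', 'done')
```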
Amusement answered 20/1, 2024 at 22:27 Comment(0)
