Python: multiprocessing - terminate other processes after one process finished
Asked Answered
P

2

6

I have some programm in which multiple processes try to finish some function. My aim now is to stop all the other processes after one process has successfully finished the function.

The python program shown below unfortunately waits until all the processes successfully solved the question given in find function. How can I fix my problem?

import multiprocessing
import random

FIND = 50
MAX_COUNT = 100000
INTERVAL = range(10)

def find(process, initial, return_dict):
    succ = False
    while succ == False:
        start=initial
        while(start <= MAX_COUNT):
            if(FIND == start):
                return_dict[process] = f"Found: {process}, start: {initial}"
                succ = True
                break;
            i = random.choice(INTERVAL)
            start = start + i
            print(start)

processes = []
manager = multiprocessing.Manager()
return_code = manager.dict()
for i in range(5):
    process = multiprocessing.Process(target=find, args=(f'computer_{i}', i, return_code))
    processes.append(process)
    process.start()

for process in processes:
    process.join()


print(return_code.values())

output can be for example:

['Found: computer_0, start: 0', 'Found: computer_4, start: 4', 'Found: computer_2, start: 2', 'Found: computer_1, start: 1', 'Found: computer_3, start: 3']

But this output shows me the program is waiting until all processes are finished ...

Periodical answered 20/2, 2022 at 9:8 Comment(1)
You could use a "multiprocessing event" that signals a solution has been found in another process to make the other processes know they should exit cleanly pymotw.com/3/multiprocessing/…Gramercy
H
6

Use an Event to govern if the processes should keep running.

Basically, it replaces succ with something that works over all processes.

import multiprocessing
import random

FIND = 50
MAX_COUNT = 1000


def find(process, initial, return_dict, run):
    while run.is_set():
        start = initial
        while start <= MAX_COUNT:
            if FIND == start:
                return_dict[process] = f"Found: {process}, start: {initial}"
                run.clear() # Stop running.
                break
            start += random.randrange(0, 10)
            print(start)


if __name__ == "__main__":
    processes = []
    manager = multiprocessing.Manager()
    return_code = manager.dict()
    run = manager.Event()
    run.set()  # We should keep running.
    for i in range(5):
        process = multiprocessing.Process(
            target=find, args=(f"computer_{i}", i, return_code, run)
        )
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

    print(return_code.values())

Note that using __name__ is mandatory for multiprocessing to work properly when the process-creation method is set to 'spawn' which is the default on ms-windows and macOS but also available on linux. On those systems, the main module is imported into newly created Python processes. This needs to happen without side effects such as starting a process, and the __name__ mechanism ensures that.

Hillier answered 20/2, 2022 at 11:0 Comment(0)
S
2

You can do this using multiprocessing.Queue and multiprocessing.Queue.get. How this works is that get by default blocks until there's something in the queue. So it will return the first result that gets appended to the queue, i.e. one of the processes finishing the search. After that, we can iterate over the processes and terminate each one (note that terminating a process doesn't kill child processes spawned by the process unless daemon is set to True).

import multiprocessing
import random
import time 

FIND = 50
MAX_COUNT = 100000
INTERVAL = range(10)

queue = multiprocessing.Queue(maxsize=1)

def find(process, initial):
    succ = False
    while succ == False:
        start=initial
        while(start <= MAX_COUNT):
            if(FIND == start):
                queue.put(f"Found: {process}, start: {initial}")
                break;
            i = random.choice(INTERVAL)
            start = start + i
            print(process, start)

processes = []
manager = multiprocessing.Manager()
for i in range(5):
    process = multiprocessing.Process(target=find, args=(f'computer_{i}', i))
    processes.append(process)
    process.start()

ret = queue.get()
for i in range(5):
    process = processes[i]
    process.terminate()
    print(f'terminated {i}')

print(ret)

You might also want to look into setting the daemon, which kills the processes after the main process exits.

Springfield answered 20/2, 2022 at 10:6 Comment(2)
FYI, it just hangs in python3 and up.Dismay
Fixed it. You still need an Event, kind of like in the example above, to properly exit the processes. Just queue doesn't seem to work.Dismay

© 2022 - 2024 — McMap. All rights reserved.