Submit code for execution to all processes in a concurrent.futures.ProcessPoolExecutor

Context:

  • A Python application server that uses a concurrent.futures.process.ProcessPoolExecutor to execute code
  • We sometimes want to hot reload imported code without restarting the entire server process

(Yes, I know importlib.reload has caveats.)

To get this to work, I imagine I would have to call importlib.reload in every worker process managed by the process pool.

Is there a way to submit something to all processes in a process pool?

Jeremiahjeremias answered 16/10, 2020 at 16:27 Comment(0)

I don't know how this will play out with the hot reloading attempt you mentioned, but the general question you really asked is answerable.

Is there a way to submit something to all processes in a process pool?

The challenge here lies in ensuring that every process gets this something exactly once, and that no further execution takes place until every process has received it.

You can get this kind of synchronization with the help of a multiprocessing.Barrier(parties[, action[, timeout]]). The barrier blocks each party that calls barrier.wait() until all parties have done so, and then releases them all at once.
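To see the blocking behaviour in isolation, here is a minimal sketch. It uses threading.Barrier instead of multiprocessing.Barrier only to keep the demo small; the multiprocessing class is documented as a clone of the threading one. The function name demo_barrier and the event list are made up for illustration.

```python
import threading


def demo_barrier(parties=3):
    """Show that no party gets past wait() before all parties have arrived."""
    barrier = threading.Barrier(parties)
    events = []
    lock = threading.Lock()

    def worker():
        with lock:
            events.append("arrived")
        # Blocks until `parties` threads have called wait().
        barrier.wait()
        with lock:
            events.append("released")

    threads = [threading.Thread(target=worker) for _ in range(parties)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return events


if __name__ == "__main__":
    print(demo_barrier())
```

Every "arrived" entry is recorded before the first "released" entry, because a thread can only leave wait() once all parties are inside it. Applied to a process pool, the same idea looks like this: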

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def foo(x):
    for _ in range(int(42e4)):
        pass
    return x


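def reload(something):
    # Stand-in for the real per-worker work, e.g. a call to importlib.reload().
    print(f"{mp.current_process().name} --- reloading {something} and waiting.")
    # Block until all MAX_WORKERS workers have picked up one reload task each.
    barrier.wait()
    print(f"{mp.current_process().name} --- released.")


def init_barrier(barrier):
    # Runs once in every worker at startup and makes the shared barrier a global there.
    globals()['barrier'] = barrier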


if __name__ == '__main__':

    MAX_WORKERS = 4
    barrier = mp.Barrier(MAX_WORKERS)

    with ProcessPoolExecutor(
            MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
    ) as executor:
        print(list(executor.map(foo, range(10))))
        # then submit one task per worker; the barrier keeps any worker from taking two
        futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
        for f in futures:
            f.result()

        print(list(executor.map(foo, range(10))))

Example Output:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ForkProcess-3 --- reloading something and waiting.
ForkProcess-2 --- reloading something and waiting.
ForkProcess-1 --- reloading something and waiting.
ForkProcess-4 --- reloading something and waiting.
ForkProcess-1 --- released.
ForkProcess-4 --- released.
ForkProcess-3 --- released.
ForkProcess-2 --- released.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Process finished with exit code 0

If you are okay with keeping barrier a global and multiprocessing.get_start_method() returns "fork", you don't need the initializer: forked workers inherit the parent's globals, so the barrier is accessible in them as long as it was created before the pool started its workers.
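For the hot-reload case from the question, the task body could look roughly like the sketch below. This is untested against a real application server. The names init_worker and reload_and_wait and the 30-second timeout are my own assumptions; the timeout is there so that a worker which never shows up produces a BrokenBarrierError instead of a silent hang.

```python
import importlib
import os
import sys

_barrier = None  # set in each worker by init_worker (hypothetical name)


def init_worker(barrier):
    """Pool initializer: keep a reference to the shared barrier in this worker."""
    global _barrier
    _barrier = barrier


def reload_and_wait(module_name, timeout=30):
    """Reload (or first import) one module here, then wait for the other workers."""
    module = sys.modules.get(module_name)
    if module is None:
        importlib.import_module(module_name)
    else:
        importlib.reload(module)
    # Raises BrokenBarrierError if the other parties do not arrive within `timeout`.
    _barrier.wait(timeout)
    return os.getpid()
```

You would pass init_worker and the barrier as initializer and initargs, as in the example above, and then submit reload_and_wait once per worker. Returning the PID lets the parent check afterwards that it got MAX_WORKERS distinct values.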

Agenda answered 25/10, 2020 at 12:29 Comment(4)
In your example you have already waited for all outstanding tasks (submitted to foo) to complete, which you should do before submitting to reload, and now you are waiting for all tasks submitted to reload to complete, which you should also do before submitting further tasks (to foo). So it's not clear to me what purpose the barrier serves here. - Bullfighter
@Bullfighter The barrier ensures that every process really gets the reload, by preventing any process from getting the same task more than once. Without the barrier you don't know how many processes actually end up executing the task (it depends on OS scheduling and on how long the task takes). - Agenda
So I think the point I was missing is that Barrier(MAX_WORKERS) requires MAX_WORKERS separate processes to call wait before wait returns, thus guaranteeing that each process does the reload. I thought wait returned immediately with a count. Pretty silly. - Bullfighter
@Bullfighter Well yeah, a process/worker is always separate. OP is essentially asking how to run an initializer multiple times after the processes have already started. Comment out the barrier.wait() and run it a couple of times. You'll eventually see an output where one worker doesn't reload because another did the task twice. - Agenda
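The experiment from the last comment can be sketched by having each task return its PID and counting how often each worker shows up. The helper names (task, every_worker_ran_once, run_experiment) and the 30-second timeout are assumptions for illustration only, and the result without the barrier will vary from run to run.

```python
import multiprocessing as mp
import os
from collections import Counter
from concurrent.futures import ProcessPoolExecutor

_barrier = None  # set in each worker by _init (hypothetical name)


def _init(barrier):
    global _barrier
    _barrier = barrier


def task(use_barrier):
    """Optionally wait at the barrier, then report which worker ran this task."""
    if use_barrier:
        _barrier.wait(timeout=30)
    return os.getpid()


def every_worker_ran_once(pids, n_workers):
    """True if exactly n_workers distinct PIDs appear, each exactly once."""
    counts = Counter(pids)
    return len(counts) == n_workers and all(c == 1 for c in counts.values())


def run_experiment(use_barrier, n_workers=4):
    """Submit one task per worker and check how they were distributed."""
    barrier = mp.Barrier(n_workers)
    with ProcessPoolExecutor(
            n_workers, initializer=_init, initargs=(barrier,)
    ) as executor:
        futures = [executor.submit(task, use_barrier) for _ in range(n_workers)]
        pids = [f.result() for f in futures]
    return every_worker_ran_once(pids, n_workers)
```

Call run_experiment(True) and run_experiment(False) from under an if __name__ == '__main__': guard. With the barrier the check should hold every time; without it you should occasionally get False, because one worker finished a task quickly and picked up a second one.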
