I don't know how this will play out with the hot reloading attempt you mentioned, but the general question you really asked is answerable.
Is there a way to submit something to all processes in a process pool?
The challenge here lies in assuring that really all processes get this something
once and only once and no further execution takes place until every process got it.
You can get this type of necessary synchronization with help of a multiprocessing.Barrier(parties[, action[, timeout]])
. The barrier will hold back parties calling barrier.wait()
until every party has done so and then release them all at once.
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
def foo(x):
for _ in range(int(42e4)):
pass
return x
def reload(something):
print(f"{mp.current_process().name} --- reloading {something} and waiting.")
barrier.wait()
print(f"{mp.current_process().name} --- released.")
def init_barrier(barrier):
globals()['barrier'] = barrier
if __name__ == '__main__':
MAX_WORKERS = 4
barrier = mp.Barrier(MAX_WORKERS)
with ProcessPoolExecutor(
MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
) as executor:
print(list(executor.map(foo, range(10))))
# then something for all processes
futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
for f in futures:
f.result()
print(list(executor.map(foo, range(10))))
Example Output:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ForkProcess-3 --- reloading something and waiting.
ForkProcess-2 --- reloading something and waiting.
ForkProcess-1 --- reloading something and waiting.
ForkProcess-4 --- reloading something and waiting.
ForkProcess-1 --- released.
ForkProcess-4 --- released.
ForkProcess-3 --- released.
ForkProcess-2 --- released.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Process finished with exit code 0
If you are okay with keeping barrier
a global and multiprocessing.get_context()._name
returns "fork"
, you don't need to use the initializer
because globals will be inherited and accessible through forking.
foo
) to complete, which you should do before submitting toreload
, and now you are waiting for all task submitted toreload
to complete, which you should do, before submitting further tasks (tofoo
). So it's not clear to me what purpose here the barrier serves. – Bullfighter