I thought I'd post my solution to a somewhat simpler problem as well, since whenever I'm looking for my problem this page comes up.
I loosely based this on @MikeHunter's solution above. The reason I needed something slightly different is that the arrays I want to write at the end of each process are fairly large, so putting them on a queue, getting them off the queue, and writing them from a different process means a lot of pickling and unpickling of extremely large arrays. This does not handle the problem of checking many sub-problems and sub-sub-problems as requested by the OP, but it does handle the "title" of the question!
So what do I do?
I pass a lock that all processes have access to, and write to the file inside a Lock.acquire() / Lock.release() wrapper. That way none of the processes can write while any other one is writing. All of this is to handle writing to HDF5 files serially, without the MPI compilation requirement.
from multiprocessing import Process, Lock
import h5py
import numpy as np
from time import sleep, time


def func(i, l, filename, subfilename):
    # Read the array from this worker's subfile
    with h5py.File(subfilename, 'r') as ds:
        array = ds['array'][:]
    # Simulate a workload of 1-5 seconds
    sleeptime = np.random.rand(1)*4 + 1
    sleep(sleeptime[0])
    # Print the array that was loaded, to compare against the summary file
    print(i, f'{sleeptime[0]:.3f}', array)
    # Lock out any other process from writing to the summary file
    l.acquire()
    with h5py.File(filename, 'r+') as ds:
        ds['array'][i, :] = array
    # Release the lock
    l.release()


if __name__ == '__main__':
    N = 10
    Nsample = 5

    # Create the subfiles, each holding one small random array
    subfilenames = [f'sub_{i:>02d}.h5' for i in range(N)]
    for i in range(N):
        with h5py.File(subfilenames[i], 'w') as ds:
            ds.create_dataset(
                'array', data=np.random.randint(0, 5, size=(5,)), dtype='f')

    # Create the summary file that all workers write into
    filename = 'test.h5'
    with h5py.File(filename, 'w') as ds:
        ds.create_dataset('array', (N, Nsample), dtype='f')

    # Create a lock that is communicated to the workers
    l = Lock()

    # Start the timer
    t0 = time()

    # Distribute the work to the workers
    processes = []
    print(" T sleeptime array", flush=True)
    print("-----------------------", flush=True)
    for i in range(N):
        p = Process(target=func, args=(i, l, filename, subfilenames[i]))
        p.start()
        processes.append(p)

    # Wait for the workers to finish
    for p in processes:
        p.join()

    # Print time taken
    print(f'Total time taken: {time()-t0:.2f} s')
If you save the script as hello.py, you can run it and sort the output like so:
python hello.py | sort
which should generate something like this:
T sleeptime array
-----------------------
0 4.336 [4. 1. 1. 0. 2.]
1 2.363 [2. 1. 1. 1. 3.]
2 2.741 [1. 2. 2. 4. 3.]
3 1.078 [1. 4. 4. 3. 0.]
4 1.327 [4. 4. 4. 4. 1.]
5 4.174 [1. 3. 1. 0. 4.]
6 2.095 [4. 1. 0. 3. 0.]
7 1.091 [3. 4. 4. 0. 4.]
8 1.601 [4. 3. 3. 1. 4.]
9 4.550 [3. 3. 3. 4. 0.]
Total time taken: 4.94 s
Check against the written HDF5 file:
h5dump test.h5
which should result in something like this:
HDF5 "test.h5" {
GROUP "/" {
DATASET "array" {
DATATYPE H5T_IEEE_F32LE
DATASPACE SIMPLE { ( 10, 5 ) / ( 10, 5 ) }
DATA {
(0,0): 4, 1, 1, 0, 2,
(1,0): 2, 1, 1, 1, 3,
(2,0): 1, 2, 2, 4, 3,
(3,0): 1, 4, 4, 3, 0,
(4,0): 4, 4, 4, 4, 1,
(5,0): 1, 3, 1, 0, 4,
(6,0): 4, 1, 0, 3, 0,
(7,0): 3, 4, 4, 0, 4,
(8,0): 4, 3, 3, 1, 4,
(9,0): 3, 3, 3, 4, 0
}
}
}
}
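As a small aside, multiprocessing.Lock also works as a context manager, so the explicit acquire()/release() pair in func could equivalently be written as below. This is just a sketch of the alternative spelling (the sleep/print bookkeeping is omitted), the behaviour is the same:

def func(i, l, filename, subfilename):
    with h5py.File(subfilename, 'r') as ds:
        array = ds['array'][:]
    # 'with l:' acquires the lock on entry and releases it on exit,
    # even if the write raises an exception
    with l:
        with h5py.File(filename, 'r+') as ds:
            ds['array'][i, :] = array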
Note on the update
I first used a queue for my use case, but I realized that a simple multiprocessing.Lock would do the trick, with no need for the Queue.put / Queue.get plumbing.
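For reference, the queue-based variant I started with looked roughly like the sketch below: each worker puts (index, array) on a Queue and a single writer process drains it and does all the HDF5 writing, so the large arrays get pickled on their way through the queue. The worker/writer split and names here are illustrative, not my exact code, and it assumes the sub_*.h5 and test.h5 files created by the script above already exist:

from multiprocessing import Process, Queue
import h5py


def worker(i, q, subfilename):
    # Load the array and ship it to the writer; the whole array is
    # pickled when it goes through the queue
    with h5py.File(subfilename, 'r') as ds:
        q.put((i, ds['array'][:]))


def writer(q, filename, n_tasks):
    # A single process owns the summary file, so no lock is needed
    with h5py.File(filename, 'r+') as ds:
        for _ in range(n_tasks):
            i, array = q.get()
            ds['array'][i, :] = array


if __name__ == '__main__':
    N = 10
    q = Queue()
    w = Process(target=writer, args=(q, 'test.h5', N))
    w.start()
    workers = [Process(target=worker, args=(i, q, f'sub_{i:>02d}.h5'))
               for i in range(N)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    w.join()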
Note there are better ways of doing this using mpi4py, but I needed the user not to worry about MPI.
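For completeness, the mpi4py route looks roughly like the sketch below. It requires h5py built against parallel HDF5 with MPI support, which is exactly the build requirement I wanted to spare the user; the file and dataset names here are just for illustration:

from mpi4py import MPI
import h5py
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.rank
Nsample = 5

# Every rank opens the same file collectively through the MPI-IO driver
with h5py.File('test_mpi.h5', 'w', driver='mpio', comm=comm) as ds:
    # Dataset creation is collective: all ranks call it with the same arguments
    dset = ds.create_dataset('array', (comm.size, Nsample), dtype='f')
    # Each rank writes only its own row, so no locking is needed
    dset[rank, :] = np.random.randint(0, 5, size=(Nsample,))

You would run it with something like mpiexec -n 10 python hello_mpi.py.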