adapt multiprocessing Pool to mpi4py
Asked Answered
E

3

9

I'm using multiprocessing Pool to run a parallelized simulation in Python and it works well in a computer with multiple cores. Now I want to execute the program on a cluster using several nodes. I suppose multiprocessing cannot apply on distributed memory. But mpi4py seems a good option. So what is the simplest mpi4py equivalence to these codes:

from multiprocessing import Pool

pool = Pool(processes=16)

pool.map(functionName,parameters_list)
Erick answered 10/7, 2014 at 7:1 Comment(0)
R
5

There's an old package of mine that is built on mpi4py which enables a functional parallel map for MPI jobs. It's not built for speed -- it was built to enable aMPI parallel map from the interpreter onto a compute cluster (i.e. without the need to run from the mpiexec from the command line). Essentially:

>>> from pyina.launchers import MpiPool, MpiScatter
>>> pool = MpiPool()
>>> jobs = MpiScatter()
>>> def squared(x):
...   return x**2
... 
>>> pool.map(squared, range(4))
[0, 1, 4, 9]
>>> jobs.map(sqaured, range(4))
[0, 1, 4, 9]

Showing off the "worker pool" strategy and the "scatter-gather" strategy of distributing jobs to the workers. Of course, I wouldn't use it for such a small job like squared because the overhead of spawning the MPI world is really quite high (much higher than setting up a multiprocessing Pool). However, if you have a big job to run, like you would normally run on a cluster using MPI, then pyina can be a big benefit for you.

However, the real advantage of using pyina is that it can not only spawn jobs with MPI, but it can spawn jobs to a scheduler. pyina understands and abstracts the launch syntax for several schedulers.

A typical call to a pyina map using a scheduler goes like this:

>>> # instantiate and configure a scheduler
>>> from pyina.schedulers import Torque
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> torque = Torque(**config)
>>> 
>>> # instantiate and configure a worker pool
>>> from pyina.launchers import Mpi
>>> pool = Mpi(scheduler=torque)
>>>
>>> # do a blocking map on the chosen function
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]

Several common configurations are available as pre-configured maps. The following is identical to the above example:

>>> # instantiate and configure a pre-configured worker pool
>>> from pyina.launchers import TorqueMpiPool
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> pool = TorqueMpiPool(**config)
>>>
>>> # do a blocking map on the chosen function
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]

pyina needs some TLC, in that it's still python2.7 and that it hasn't had a release in several years… but it's been kept up to date (on github) otherwise and is able to "get the job done" for me running jobs on large-scale computing clusters over the past 10 years -- especially when coupled with pathos (which provides ssh tunneling and a unified interface for multiprocessing and ParallelPython maps). pyina doesn't yet utilize shared memory, but does do embarrassingly functional parallel computing pretty well. The interactions with the scheduler are pretty good in general, but can be a bit rough around the edges for several failure cases -- and the non-blocking maps need a lot of work. That having been said, it provides a pretty useful interface to run embarrassingly parallel jobs on a cluster with MPI.

Get pyina (and pathos) here: https://github.com/uqfoundation

Ritchey answered 5/1, 2016 at 18:33 Comment(0)
M
4

There is an MPIPool class implemented here.

For an example of how I use this, check out this gist on GitHub.

Mathis answered 19/8, 2014 at 15:38 Comment(1)
Can you include a summary of how to use it, or a short example in your answer as well? From the Help Center How to Write a Good Answer: "Links to external resources are encouraged, but please add context around the link so your fellow users will have some idea what it is and why it’s there. Always quote the most relevant part of an important link, in case the target site is unreachable or goes permanently offline."Vexation
O
0

I use the following code to be equivalent to multiprocessing.Pool. It has not yet been tested extensively, but it seems to work just fine:

from functools import partial
function = partial(...)  # Store all fixed parameters this way if needed

if use_MPI:
    arguments = range(num_runs)
    run_data = None

    # mpi4py
    comm = MPI.COMM_SELF.Spawn(sys.executable, args=['MPI_slave.py'], maxprocs=num_runs)  # Init
    comm.bcast(function, root=MPI.ROOT)     # Equal for all processes
    comm.scatter(arguments, root=MPI.ROOT)  # Different for each process
    comm.Barrier()                          # Wait for everything to finish...
    run_data = comm.gather(run_data, root=MPI.ROOT)  # And gather everything up
else:        
    # multiprocessing
    p = Pool(multiprocessing.cpu_count())
    run_data = p.map(function, range(num_runs))

This then uses a separate file 'MPI_slave.py':

from mpi4py import MPI
# import the function you actually pass to this file here!!!
comm = MPI.COMM_SELF.Get_parent()
size = comm.Get_size()
rank = comm.Get_rank()

def runSlaveRun():
    function = None
    options = None
    # print("Process {}/{} reporting for duty!".format(rank, size))

    function = comm.bcast(function, root=0)
    arguments = comm.scatter(options, root=0)
    results = function(arguments)
    comm.Barrier()
    comm.gather(results, root=0)
    comm.Disconnect()

if __name__ == '__main__':
    runSlaveRun()
Offspring answered 5/1, 2016 at 10:7 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.