I've also struggled with this. I had functions stored as data members of a class. A simplified example:
from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add):
        self.f = my_add
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f, list1, list2)
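I needed to call self.f through Pool.map() from within the same class, but self.f takes two separate arguments rather than a single tuple. Pool.map() hands the function exactly one item from a single iterable on each call (its third positional parameter is chunksize, not a second iterable). Since the function was stored on a class instance, it was not clear to me how to write the kind of wrapper other answers suggested.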
I solved this with a different wrapper, eval_func_tuple(f_args), which takes a tuple/list whose first element is the function and whose remaining elements are the arguments to that function. With it, the problematic line becomes return pool.map(eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2)). This is Python 2; in Python 3 the built-in zip replaces itertools.izip. Here is the full code:
File: util.py
def add(a, b): return a+b
def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])
File: main.py
from multiprocessing import Pool
import itertools
import util
pool = Pool()
class Example(object):
    def __init__(self, my_add):
        self.f = my_add
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple,
                        itertools.izip(itertools.repeat(self.f), list1, list2))
if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)
Running main.py prints [11, 22, 33]. Feel free to improve this; for example, eval_func_tuple could be modified to accept keyword arguments as well.
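As one possible take on the keyword-argument idea, here is a minimal sketch written for Python 3 (built-in zip in place of itertools.izip). The name eval_func_args_kwargs, the (function, args, kwargs) task layout, and the scale parameter on add are my own assumptions for illustration, not part of the code above.

```python
from itertools import repeat
from multiprocessing import Pool


def add(a, b, scale=1):
    """Example worker (hypothetical): add two numbers, then scale the sum."""
    return (a + b) * scale


def eval_func_args_kwargs(task):
    """Unpack a (function, args, kwargs) triple and call the function.

    Hypothetical variant of eval_func_tuple that also forwards keyword
    arguments; the triple layout is an assumption of this sketch.
    """
    func, args, kwargs = task
    return func(*args, **kwargs)


def add_lists(list1, list2, **kwargs):
    """Map add() over paired items, passing the same kwargs to every call."""
    # Each task looks like (add, (x, y), {'scale': ...}).
    tasks = zip(repeat(add), zip(list1, list2), repeat(kwargs))
    with Pool() as pool:
        return pool.map(eval_func_args_kwargs, tasks)
```

Called from under an if __name__ == '__main__': guard, add_lists([1, 2, 3], [10, 20, 30], scale=2) should give [22, 44, 66]. As with the original wrapper, the function in each task must be picklable, so it has to live at module level.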
On another note, the function "parmap" from another answer can be made more efficient when there are more Processes than available CPUs, by starting at most multiprocessing.cpu_count() of them at a time. I'm copying an edited version below. This is my first post and I wasn't sure whether I should edit the original answer directly. I also renamed some variables.
import multiprocessing  # needed for multiprocessing.cpu_count() below
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun
def parmap(f, X):
    pipe = [Pipe() for x in X]
    processes = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    numProcesses = len(processes)
    processNum = 0
    outputList = []
    # Run the processes in batches of at most cpu_count() at a time
    while processNum < numProcesses:
        endProcessNum = min(processNum + multiprocessing.cpu_count(), numProcesses)
        for proc in processes[processNum:endProcessNum]:
            proc.start()
        # Receive before joining, so a child blocked on a full pipe can finish
        for parent, child in pipe[processNum:endProcessNum]:
            outputList.append(parent.recv())
        for proc in processes[processNum:endProcessNum]:
            proc.join()
        processNum = endProcessNum
    return outputList
if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
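The batching done by the while loop in parmap can be seen in isolation with a small Python 3 sketch. The helper name batch_ranges and its batch_size parameter are hypothetical, introduced only for illustration; parmap itself uses multiprocessing.cpu_count() as the batch size.

```python
def batch_ranges(num_items, batch_size):
    """Yield (start, end) index pairs covering range(num_items) in batches.

    Mirrors the processNum / endProcessNum bookkeeping in parmap: each batch
    holds at most batch_size items, and the last one may be shorter.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    start = 0
    while start < num_items:
        end = min(start + batch_size, num_items)
        yield start, end
        start = end


# 10 processes on a hypothetical 4-CPU machine run as three batches.
print(list(batch_ranges(10, 4)))  # [(0, 4), (4, 8), (8, 10)]
```

Each batch has to finish before the next one starts, so one slow process holds up its whole batch. That is the trade-off of this simple scheme.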
[...] IPython.Parallel, but there you could get around the problem by pushing the objects to the nodes. It seems pretty annoying to get around this problem with multiprocessing. – Monobasic
[...] calculate is picklable, so it seems like this can be solved by 1) creating a function object with a constructor that copies over a calculate instance and then 2) passing an instance of this function object to Pool's map method. No? – Puissant
[...] multiprocessing module are due to its goal of being a cross-platform implementation, and the lack of a fork(2)-like system call in Windows. If you don't care about Win32 support, there may be a simpler process-based workaround. Or if you're prepared to use threads instead of processes, you can substitute "from multiprocessing import Pool" with "from multiprocessing.pool import ThreadPool as Pool". – Norvan
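To illustrate the ThreadPool substitution mentioned in the last comment, here is a minimal Python 3 sketch. Threads share the parent's memory, so the function is never pickled and a lambda or bound method can be passed to map directly. The Example class mirrors the one in the answer, and the pool size of 2 is an arbitrary choice for the sketch.

```python
from multiprocessing.pool import ThreadPool as Pool


class Example(object):
    def __init__(self, my_add):
        self.f = my_add

    def add_lists(self, list1, list2):
        # No pickling happens with threads, so a lambda wrapping self.f works.
        with Pool(2) as pool:
            return pool.map(lambda pair: self.f(*pair), zip(list1, list2))


example = Example(lambda a, b: a + b)
print(example.add_lists([1, 2, 3], [10, 20, 30]))  # [11, 22, 33]
```

Whether this is faster than a plain loop depends on the workload: CPU-bound pure-Python functions are limited by the GIL, so threads mainly help when the work is I/O-bound or releases the GIL.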