Multiprocessing: How to use Pool.map on a function defined in a class?
A

20

212

When I run something like:

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

it works fine. However, putting this as a function of a class:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

Gives me the following error:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I've seen a post from Alex Martelli dealing with the same kind of problem, but it wasn't explicit enough.

Acotyledon answered 20/7, 2010 at 9:25 Comment(5)
"this as a function of a class"? Can you post the code that actually gets the actual error. Without the actual code we can only guess what you're doing wrong.Skedaddle
As a general remark, there exist pickling modules more powerful than Python's standard pickle module (like the picloud module mentioned in this answer).Bombay
I had a similar problem with closures in IPython.Parallel, but there you could get around the problem by pushing the objects to the nodes. It seems pretty annoying to get around this problem with multiprocessing.Monobasic
Here calculate is picklable, so it seems like this can be solved by 1) creating a function object with a constructor that copies over a calculate instance and then 2) passing an instance of this function object to Pool's map method. No?Puissant
@math I don't believe any of Python's "recent changes" are going to be of any help. Some limitations of the multiprocessing module are due to its goal of being a cross-platform implementation, and the lack of a fork(2)-like system call in Windows. If you don't care about Win32 support, there may be a simpler process-based workaround. Or if you're prepared to use threads instead of processes, you can substitute from multiprocessing import Pool with from multiprocessing.pool import ThreadPool as Pool.Norvan
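A minimal sketch of the ThreadPool substitution mentioned in the last comment, applied to the original code (threads share memory, so nothing has to be pickled; it only buys real parallelism if the work is I/O-bound or releases the GIL):

from multiprocessing.pool import ThreadPool as Pool

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1, 2, 3])

cl = calculate()
print(cl.run())   # [1, 4, 9]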
M
74

I also was annoyed by restrictions on what sort of functions pool.map could accept. I wrote the following to circumvent this. It appears to work, even for recursive use of parmap.

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
Manasseh answered 26/4, 2011 at 15:10 Comment(6)
This has worked very well for me, thank you. I have found one weakness: I tried using parmap on some functions that passed around a defaultdict and got the PicklingError again. I did not figure out a solution to this, I just reworked my code to not use the defaultdict.Contrabandist
This doesn't work in Python 2.7.2 (default, Jun 12 2011, 15:08:59) [MSC v.1500 32 bit (Intel)] on win32Sedgewick
This does work on Python 2.7.3 Aug 1,2012, 05:14:39. This does not work on giant iterables -> it causes a OSError: [Errno 24] Too many open files due to the number of pipes it opens.Toga
This solution spawns a process for each work item. The solution of "klaus se" below is more efficient.Kiley
I have a similar questionSag
Is it me or there is no class in this solution? Does it answer the original question then?Easily
B
95

I could not use the code posted so far, because code using "multiprocessing.Pool" does not work with lambda expressions, and code not using "multiprocessing.Pool" spawns as many processes as there are work items.

I adapted the code so that it spawns a predefined number of workers and only iterates through the input list if there is an idle worker. I also enabled "daemon" mode for the workers so that Ctrl-C works as expected.

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
Bombay answered 17/4, 2013 at 22:51 Comment(12)
How would you get a progress bar to properly work with this parmap function?Kehr
A question -- I used this solution but noticed that the python processes I spawned stayed active in memory. Any quick thought on how to kill those when your parmap exits?Whale
@klaus-se I know we are discouraged from just saying thanks in comments, but your answer is just too valuable for me, i couldn't resist. I wish i could give you more than just one reputation...Kelvinkelwen
@klaus-se what is the reason for [q_in.put((None,None)) for _ in range(nprocs)]?Merrymerryandrew
@Merrymerryandrew passing (None, None) as the last item indicates to fun that it has reached the end of the sequence of items for each process.Callean
@deshtop: you can with a bounty, if you have enough reputation yourself :-)Procryptic
@klaus se, it doesn't work for me. I still get an error: Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed send(obj) PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failedMalacostracan
Actually it works for me now! Thank you very much! I had a problem because my function takes multiple arguments but I fixed it.Malacostracan
@klaus se: What do you mean by "strg-c works as expected"?Paraffinic
@martineau: this probably means Ctrl+C halts as expected. Strg is the name for Ctrl on a German keyboardZellazelle
_pickle.PicklingError: Can't pickle <function <lambda> at 0x00000254E1FDE6A8>: attribute lookup <lambda> on __main__ failed, using Python 3.7.0 on Windows 10.Eger
This is a great solution, I've been wrestling with this problem <i>all</i> day. One thing I'd like to do is process chunks of my data at a time, using multiprocessing like this, but yielding a single result at a time by wrapping this in a generator function. But when I try to set that up, even when I pass chunks of data to the param code, it somehow manages to run away and consume all the processing in one go.Unpaidfor
S
78

Multiprocessing and pickling are broken and limited unless you jump outside the standard library.

If you use a fork of multiprocessing called pathos.multiprocessing, you can directly use classes and class methods in multiprocessing's map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in Python.

pathos.multiprocessing also provides an asynchronous map function… and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6]))

See discussions: What can multiprocessing and dill do together?

and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

It even handles the code you wrote initially, without modification, and from the interpreter. Why do anything else that's more fragile and specific to a single case?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
... 
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

Get the code here: https://github.com/uqfoundation/pathos

And, just to show off a little more of what it can do:

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
Sukhum answered 25/1, 2014 at 1:15 Comment(14)
pathos.multiprocessing also has an asynchronous map (amap) that enables the use of a progress bars and other asynchronous programming.Sukhum
I like pathos.multiprocessing, which can serve almost a drop-in replacement of non-parallel map while enjoying the multiprocessing. I have a simple wrapper of pathos.multiprocessing.map, such that it is more memory-efficient when processing a read-only large data structure across multiple cores, see this git repository.Spearman
Seems interesting, but it doesn't install. This is the message pip gives: Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)Eldred
Yes. I have not released in a while as I have been splitting up the functionality into separate packages, and also converting to 2/3 compatible code. Much of the above has been modularized in multiprocess which is 2/3 compatible. See #27873593 and pypi.python.org/pypi/multiprocess.Sukhum
You should probably add a disclaimer if you are the maintainer, by the way.Raven
I still got the pickle error - the other methods worked fine for me though. This could be my own error but there wasn't much in the way of docs on the package site although this was my first choice.Laue
@AlexanderMcFarlane: How did you install it? See: github.com/uqfoundation/pathos/issues/2 for instructions on how to install the most up to date version using pip. Or, if you are looking for pathos.multiprocessing it actually lives in a package called multiprocess, where you can just pip install multiprocess. See the comments above.Sukhum
@xApple: Just as a follow-up, pathos has had a new stable release and is also 2.x and 3.x compatible.Sukhum
@MikeMcKerns How can I run multiprocessing for a nested class?Vivianaviviane
@crazjo: ask this as it's own SO question, or on the multiprocess or pathos GitHub page. The short answer is that (a) I need to see some code, and the comments aren't a great format for that, and (b) in the abstract, it could work by using multiprocess.Pool if your class instance is serializable with dill. I have a feeling that you are producing a class within another class... that's a difficult one to handle.Sukhum
thanks @MikeMcKerns, yes I am talking about a class within another class.. But will create a separate SO question as suggested. ThanksVivianaviviane
@crazjo: please point me to it when you do (mention or whatever).Sukhum
Shoud i add p.join() p.close() in the end of the code?Pretension
@rsujatha: for pathos, one should generally end with close, join, and clear.Sukhum
D
42

There is currently no solution to your problem, as far as I know: the function that you give to map() must be accessible through an import of your module. This is why robert's code works: the function f() can be obtained by importing the following code:

from multiprocessing import Pool

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

I actually added a "main" section, because this follows the recommendations for the Windows platform ("Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects").

I also added an uppercase letter in front of Calculate, so as to follow PEP 8. :)

Delta answered 26/7, 2010 at 15:11 Comment(0)
E
18

The solution by mrule is correct but has a bug: if the child sends back a large amount of data, it can fill the pipe's buffer, blocking on the child's pipe.send() while the parent waits for the child to exit in join(). The solution is to read the child's data before join()ing the child. Furthermore, the child should close the parent's end of the pipe to prevent a deadlock. The code below fixes that.

Also be aware that this parmap creates one process per element in X. A more advanced solution is to use multiprocessing.cpu_count() to divide X into a number of chunks, and then merge the results before returning. I leave that as an exercise to the reader so as not to spoil the conciseness of the nice answer by mrule. ;) (A sketch of such a chunked variant follows the code below.)

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe, x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(p, c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p, c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
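For illustration, here is a sketch of that chunked variant (the names parmap_chunked and worker are invented here, and, like the code above, it assumes a fork-based platform such as Linux or macOS). It keeps the fix of reading each result before joining, but never has more than nprocs pipes and processes open at once:

from multiprocessing import Process, Pipe, cpu_count
from itertools import izip

def worker(f, ppipe, cpipe, x):
    ppipe.close()              # the child closes the parent's end of the pipe
    cpipe.send(f(x))
    cpipe.close()

def parmap_chunked(f, X, nprocs=cpu_count()):
    out = []
    for start in range(0, len(X), nprocs):
        chunk = X[start:start + nprocs]
        pipes = [Pipe() for _ in chunk]
        procs = [Process(target=worker, args=(f, p, c, x))
                 for x, (p, c) in izip(chunk, pipes)]
        [p.start() for p in procs]
        out.extend(parent.recv() for parent, child in pipes)  # read before joining
        [p.join() for p in procs]
    return out

if __name__ == '__main__':
    print parmap_chunked(lambda x: x ** x, range(1, 9))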
Extinctive answered 9/5, 2012 at 23:18 Comment(2)
How do you choose the number of processes?Malacostracan
However it dies pretty quickly because of the error OSError: [Errno 24] Too many open files. I think there need to be some sort of limits on the number of processes for it to work properly...Malacostracan
D
15

I've also struggled with this. I had functions as data members of a class, as a simplified example:

from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f,list1,list2)  

I needed to use the function self.f in a Pool.map() call from within the same class and self.f did not take a tuple as an argument. Since this function was embedded in a class, it was not clear to me how to write the type of wrapper other answers suggested.

I solved this problem by writing a different wrapper, eval_func_tuple(f_args), that takes a tuple/list whose first element is the function and whose remaining elements are the arguments to that function. Using this, the problematic line can be replaced by return pool.map(eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2)). Here is the full code:

File: util.py

def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])  

File: main.py

from multiprocessing import Pool
import itertools
import util  

pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple, 
            itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)  

Running main.py will give [11, 22, 33]. Feel free to improve this; for example, eval_func_tuple could also be modified to take keyword arguments.
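For instance, a hypothetical keyword-argument variant (the name eval_func_kwargs is invented here) could unpack a (function, args, kwargs) triple:

def eval_func_kwargs(f_args_kwargs):
    """Takes (function, args tuple, kwargs dict), evaluates and returns the result"""
    f, args, kwargs = f_args_kwargs
    return f(*args, **kwargs)

# Usage, mirroring the call in main.py:
# pool.map(util.eval_func_kwargs,
#          [(self.f, (a, b), {}) for a, b in itertools.izip(list1, list2)])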

On another note, in another answer, the function "parmap" can be made more efficient for the case where there are more processes than available CPUs. I'm copying an edited version below. This is my first post and I wasn't sure if I should directly edit the original answer. I also renamed some variables.

import multiprocessing
from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):  
    def fun(pipe,x):  
        pipe.send(f(x))  
        pipe.close()  
    return fun  

def parmap(f,X):  
    pipe=[Pipe() for x in X]  
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]  
    numProcesses = len(processes)  
    processNum = 0  
    outputList = []  
    while processNum < numProcesses:  
        endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)  
        for proc in processes[processNum:endProcessNum]:  
            proc.start()  
        for proc in processes[processNum:endProcessNum]:  
            proc.join()  
        for proc,c in pipe[processNum:endProcessNum]:  
            outputList.append(proc.recv())  
        processNum = endProcessNum  
    return outputList    

if __name__ == '__main__':  
    print parmap(lambda x:x**x,range(1,5))         
Donahoe answered 16/5, 2011 at 17:8 Comment(0)
C
14

I know that this question was asked 8 years and 10 months ago but I want to present you my solution:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @staticmethod
    def methodForMultiprocessing(x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

You just need to make your class function into a static method. But it's also possible with a class method:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @classmethod
    def methodForMultiprocessing(cls, x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

Tested in Python 3.7.3

Columbary answered 10/6, 2019 at 21:50 Comment(0)
A
11

I know this was asked over 6 years ago now, but I just wanted to add my solution, as some of the suggestions above seem horribly complicated, while mine was actually very simple.

All I had to do was wrap the pool.map() call in a helper function, passing the class object along with the method's argument as a tuple. It looked a bit like this:

from multiprocessing import Pool

def run_in_parallel(args):
    return args[0].method(args[1])

myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)
Alexandros answered 22/4, 2017 at 18:22 Comment(0)
E
9

I took klaus se's and aganders3's answers and made a documented module that is more readable and fits in one file. You can just add it to your project. It even has an optional progress bar!

"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.

Adapted from https://mcmap.net/q/47345/-multiprocessing-how-to-use-pool-map-on-a-function-defined-in-a-class

Example usage:

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)

Comments:

"It spawns a predefined amount of workers and only iterates through the input list
 if there exists an idle worker. I also enabled the "daemon" mode for the workers so
 that KeyboardInterrupt works as expected."

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess
"""

# Modules #
import multiprocessing
from tqdm import tqdm

################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
        num, obj = queue_in.get()
        queue_out.put((num, func_to_apply(obj)))

################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t,a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
        proc.daemon = True
        proc.start()
    # Display progress bar or not #
    if verbose:
        results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
        results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]

################################################################################
def test():
    def slow_square(x):
        import time
        time.sleep(2)
        return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

EDIT: Added @alexander-mcfarlane suggestion and a test function

Eldred answered 28/5, 2016 at 13:56 Comment(5)
one issue with your progress bar... The bar only measures how inefficiently the workload was split across the processors. If the workload is perfectly split then all the processors will join() at the same time and you will just get a flash of 100% completed in the tqdm display. The only time it will be useful is if each processor has a biased workloadLaue
move tqdm() to wrap the line: result = [q_out.get() for _ in tqdm(sent)] and it works a lot better - great effort though really appreciate this so +1Laue
Thanks for that advice, I will try it and then update the answer !Eldred
The answer is updated, and the progress bar works much better!Eldred
I do not know why, but there was an error when trying this snippet ! _pickle.PicklingError: Can't pickle <function <lambda> at 0x000001717B311E18>: attribute lookup <lambda> on __main__ failedClowers
C
7

Functions defined in classes (even within functions within classes) don't really pickle. However, this works:

from multiprocessing import Pool

def f(x):
    return x*x

class calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()
Catlaina answered 20/7, 2010 at 12:21 Comment(4)
thanks, but i find it a bit dirty to define the function outside the class. The class should bundle all it needs to achieve a given task.Acotyledon
@Memoz: "The class should bundle all it needs" Really? I can't find many examples of this. Most classes depend on other classes or functions. Why call a class dependency "dirty"? What's wrong with a dependency?Skedaddle
Well, the function shouldn't modify existing class data--because it would modify the version in the other process--so it could be a static method. You can sort of pickle a static method: #1914761 Or, for something this trivial, you could use a lambda.Catlaina
@Skedaddle Here is an example. I have a FileParser class with a method parse_one_file(self, filepath: str) and I want to apply it to an entire folder with parse_all_files_in(self, dirpath: str): for filepath in os.listdir(dirpath): self.parse_one_file(filepath), maybe the folder has 1000 files and parsing one file takes time, so I replace my for-loop with a pool.map(self.parse_one_file, os.listdir(dirpath)). It's a simple example, a clean one-liner, yet to make it work I have to put an equivalent function outside the class instead of a neat method? It feels so wrong.Lumberyard
C
3

I modified klaus se's method because while it was working for me with small lists, it would hang when the number of items was ~1000 or greater. Instead of pushing the jobs one at a time with the None stop condition, I load up the input queue all at once and just let the processes munch on it until it's empty.

from multiprocessing import cpu_count, Queue, Process

def apply_func(f, q_in, q_out):
    while not q_in.empty():
        i, x = q_in.get()
        q_out.put((i, f(x)))

# map a function using a pool of processes
def parmap(f, X, nprocs = cpu_count()):
    q_in, q_out   = Queue(), Queue()
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [p.start() for p in proc]
    res = [q_out.get() for _ in sent]
    [p.join() for p in proc]

    return [x for i,x in sorted(res)]

Edit: unfortunately, I am now running into this error on my system: Multiprocessing Queue maxsize limit is 32767; hopefully the workarounds there will help.
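One possible workaround (a sketch only, reusing apply_func from above; I have not verified it against the fixes in the linked question) is to back the queues with a Manager, whose proxied queues do not, as far as I know, rely on the semaphore that imposes the 32767 limit:

from multiprocessing import cpu_count, Process, Manager

def parmap_managed(f, X, nprocs=cpu_count()):
    # Same structure as parmap above, but the queues live in a manager process.
    m = Manager()
    q_in, q_out = m.Queue(), m.Queue()
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [p.start() for p in proc]
    res = [q_out.get() for _ in sent]
    [p.join() for p in proc]
    return [x for i, x in sorted(res)]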

Callean answered 2/9, 2015 at 21:32 Comment(0)
H
3

Here is my solution, which I think is a bit less hackish than most others here. It is similar to nightowl's answer.

from functools import partial
from multiprocessing import Pool

someclasses = [MyClass(), MyClass(), MyClass()]

def method_caller(some_object, some_method='the method'):
    return getattr(some_object, some_method)()

othermethod = partial(method_caller, some_method='othermethod')

with Pool(6) as pool:
    result = pool.map(othermethod, someclasses)
Hairbreadth answered 23/2, 2018 at 12:22 Comment(0)
B
3

You can run your code without any issues if you manually exclude the Pool object from the class's state, because it is not picklable, as the error says. You can do this with the __getstate__ method (look here too) as follows. Pickling will look for the __getstate__ and __setstate__ methods and use them if it finds them when you run map, map_async etc.:

from multiprocessing import Pool

class calculate(object):
    def __init__(self):
        self.p = Pool()
    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['p']
        return self_dict
    def __setstate__(self, state):
        self.__dict__.update(state)

    def f(self, x):
        return x*x
    def run(self):
        return self.p.map(self.f, [1,2,3])

Then do:

cl = calculate()
cl.run()

will give you the output:

[1, 4, 9]

I've tested the above code in Python 3.x and it works.

Barrie answered 10/7, 2019 at 3:6 Comment(1)
very interesting approach and it does work but for some reason it is orders of magnitude slower than just defining the predicate function in the global scope.Impermanent
A
3

This may not be a very good solution, but in my case, I solved it like this.

from multiprocessing import Pool

def foo1(data):
    self = data.get('slf')
    lst = data.get('lst')
    return sum(lst) + self.foo2()

class Foo(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def foo2(self):
        return self.a**self.b   

    def foo(self):
        p = Pool(5)
        lst = [1, 2, 3]
        result = p.map(foo1, (dict(slf=self, lst=lst),))
        return result

if __name__ == '__main__':
    print(Foo(2, 4).foo())

I had to pass self to my function as I needed to access attributes and methods of my class through that function. This works for me. Corrections and suggestions are always welcome.

Arouse answered 28/11, 2019 at 13:7 Comment(0)
B
1

Here is some boilerplate I wrote for using a multiprocessing Pool in Python 3; specifically, Python 3.7.7 was used to run the tests. I got my fastest runs using imap_unordered. Just plug in your scenario and try it out. You can use timeit or just time.time() to figure out which works best for you.

import multiprocessing
import time

NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
MP_FUNCTION = 'starmap'  # 'imap_unordered' or 'starmap' or 'apply_async'

def process_chunk(a_chunk):
    print(f"processig mp chunk {a_chunk}")
    return a_chunk


map_jobs = [1, 2, 3, 4]

result_sum = 0

s = time.time()
if MP_FUNCTION == 'imap_unordered':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    for i in pool.imap_unordered(process_chunk, map_jobs):
        result_sum += i
elif MP_FUNCTION == 'starmap':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    try:
        map_jobs = [(i, ) for i in map_jobs]
        result_sum = pool.starmap(process_chunk, map_jobs)
        result_sum = sum(result_sum)
    finally:
        pool.close()
        pool.join()
elif MP_FUNCTION == 'apply_async':
    with multiprocessing.Pool(processes=NUMBER_OF_PROCESSES) as pool:
        result_sum = [pool.apply_async(process_chunk, [i, ]).get() for i in map_jobs]
    result_sum = sum(result_sum)
print(f"result_sum is {result_sum}, took {time.time() - s}s")

In the above scenario imap_unordered actually seems to perform the worst for me. Try out your case and benchmark it on the machine you plan to run it on. Also read up on Process Pools. Cheers!

Book answered 19/5, 2020 at 7:18 Comment(0)
D
0

I'm not sure if this approach has been taken, but a workaround I'm using is:

from multiprocessing import Pool

t = None

def run(n):
    return t.f(n)

class Test(object):
    def __init__(self, number):
        self.number = number

    def f(self, x):
        print x * self.number

    def pool(self):
        pool = Pool(2)
        pool.map(run, range(10))

if __name__ == '__main__':
    t = Test(9)
    t.pool()
    pool = Pool(2)
    pool.map(run, range(10))

Output should be:

0
9
18
27
36
45
54
63
72
81
0
9
18
27
36
45
54
63
72
81
Disapprobation answered 13/9, 2016 at 15:2 Comment(0)
L
0
from multiprocessing import Pool

class Calculate(object):
  # Your instance method to be executed
  def f(self, x, y):
    return x*y

if __name__ == '__main__':
  inp_list = [1,2,3]
  y = 2
  cal_obj = Calculate()
  pool = Pool(2)
  results = pool.map(lambda x: cal_obj.f(x, y), inp_list)

You might also want to apply this function to each different instance of the class. Here is a solution for that as well:

class Calculate(object):
  # Your instance method to be executed
  def __init__(self, x):
    self.x = x

  def f(self, y):
    return self.x*y

if __name__ == '__main__':
  inp_list = [Calculate(i) for i in range(3)]
  y = 2
  pool = Pool(2)
  results = pool.map(lambda x: x.f(y), inp_list)
Laresa answered 23/2, 2017 at 22:20 Comment(0)
D
0

From http://www.rueckstiess.net/research/snippets/show/ca1d7d90 and http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html

We can make an external function and seed it with the class self object:

from joblib import Parallel, delayed
def unwrap_self(arg, **kwarg):
    return square_class.square_int(*arg, **kwarg)

class square_class:
    def square_int(self, i):
        return i * i

    def run(self, num):
        results = []
        results = Parallel(n_jobs= -1, backend="threading")\
            (delayed(unwrap_self)(i) for i in zip([self]*len(num), num))
        print(results)

OR without joblib:

from multiprocessing import Pool
import time

def unwrap_self_f(arg, **kwarg):
    return C.f(*arg, **kwarg)

class C:
    def f(self, name):
        print 'hello %s,'%name
        time.sleep(5)
        print 'nice to meet you.'

    def run(self):
        pool = Pool(processes=2)
        names = ('frank', 'justin', 'osi', 'thomas')
        pool.map(unwrap_self_f, zip([self]*len(names), names))

if __name__ == '__main__':
    c = C()
    c.run()
Dotard answered 13/8, 2018 at 4:42 Comment(0)
C
0

To implement multiprocessing in AWS Lambda, we have two ways. Note: ThreadPool doesn't work in AWS Lambda.

  1. Use the example solution provided by the AWS team: https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/

  2. Use this package: https://pypi.org/project/lambda-multiprocessing/

I have implemented my Lambda function with both solutions and both work fine. I can't share my code here, but these two links will help you for sure.

I find the second way easier to implement.
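If I remember the linked AWS post correctly, its approach comes down to using Process and Pipe directly, because Pool and Queue depend on shared-memory facilities that the Lambda runtime does not provide. A rough sketch of that pattern (the handler and the per-item work function below are only illustrative):

from multiprocessing import Process, Pipe

def do_work(x, conn):
    conn.send(x * x)          # placeholder for the real per-item work
    conn.close()

def lambda_handler(event, context):
    items = [1, 2, 3, 4]
    pipes, procs = [], []
    for x in items:
        parent_conn, child_conn = Pipe()
        proc = Process(target=do_work, args=(x, child_conn))
        proc.start()
        pipes.append(parent_conn)
        procs.append(proc)
    results = [conn.recv() for conn in pipes]   # read results before joining
    [proc.join() for proc in procs]
    return results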

Charmain answered 23/12, 2022 at 6:6 Comment(0)
I
0

There are also some libraries to make this easier, for example autothread (only for Python 3.6 and up):

import autothread

class calculate(object):
    def run(self):
        @autothread.multiprocessed()
        def f(x: int):
            return x*x

        return f([1,2,3])

cl = calculate()
print(cl.run())

You can also take a look at lox.

Ivonneivor answered 24/12, 2022 at 12:44 Comment(0)
