Why can I pass an instance method to multiprocessing.Process, but not a multiprocessing.Pool?

I am trying to write an application that applies a function concurrently with a multiprocessing.Pool. I would like this function to be an instance method (so I can define it differently in different subclasses). This doesn't seem to be possible; as I have learned elsewhere, bound methods apparently can't be pickled. So why does starting a multiprocessing.Process with a bound method as its target work? The following code:

import multiprocessing

def test1():
    print "Hello, world 1"

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print answer
        print
        for answer in pool.imap(self.square, range(10)):
            print answer

    def test2(self):
        print "Hello, world 2"

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

produces this output:

Hello, world 1
Hello, world 2
1
2
3
4
5
6
7
8
9
10

Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner
    self.run()
  File "C:\Python27\Lib\threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

Why can Processes handle bound methods, but not Pools?

Valedictorian answered 5/12, 2014 at 14:39 Comment(4)
It's because they can't be serialized by pickle. If you need to stay on Python 2.7, and you need to make your code work as is… you should use a fork of multiprocessing that can pickle instance methods and can pickle a Pool. Look at pathos.multiprocessing, which you can find in the Stack Overflow link you quoted in the post above (see the sketch just below these comments). – Macron
More specifically, this link shows how instance methods in 2.x can be trivially serialized in a Pool: https://mcmap.net/q/83399/-can-39-t-pickle-lt-type-39-instancemethod-39-gt-when-using-multiprocessing-pool-map – Macron
Does it have to be an instance method? Are you able to use a classmethod? I tried it and it worked fine for me. – Tsosie
This is not the case for Python 3, AFAIK. – Cockhorse
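
The pathos route suggested in the comments would look roughly like this (a sketch, assuming pathos is installed; its ProcessingPool serializes with dill instead of pickle, which handles bound methods):

from pathos.multiprocessing import ProcessingPool

class testClass(object):
    def square(self, x):
        return x * x

if __name__ == "__main__":
    c = testClass()
    # dill can serialize the bound method c.square, so this just works
    print(ProcessingPool(1).map(c.square, range(10)))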

The pickle module normally can't pickle instance methods:

>>> import pickle
>>> class A(object):
...  def z(self): print "hi"
... 
>>> a = A()
>>> pickle.dumps(a.z)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle instancemethod objects

However, the multiprocessing module has a custom Pickler that adds some code to enable this feature:

#
# Try making some callable types picklable
#

from pickle import Pickler
class ForkingPickler(Pickler):
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        def dispatcher(self, obj):
            rv = reduce(obj)
            self.save_reduce(obj=obj, *rv)
        cls.dispatch[type] = dispatcher

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)

You can replicate this using the copy_reg module to see it work for yourself:

>>> import copy_reg
>>> def _reduce_method(m):
...     if m.im_self is None:
...         return getattr, (m.im_class, m.im_func.func_name)
...     else:
...         return getattr, (m.im_self, m.im_func.func_name)
... 
>>> copy_reg.pickle(type(a.z), _reduce_method)
>>> pickle.dumps(a.z)
"c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n."

When you use Process.start to spawn a new process on Windows, it pickles the whole Process object (including your bound-method target) for the child, using this custom ForkingPickler:

#
# Windows
#

else:
    # snip...
    from pickle import load, HIGHEST_PROTOCOL

    def dump(obj, file, protocol=None):
        ForkingPickler(file, protocol).dump(obj)

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        _tls = thread._local()

        def __init__(self, process_obj):
            # create pipe for communication with child
            rfd, wfd = os.pipe()

            # get handle for read end of the pipe and make it inheritable
            ...
            # start process
            ...

            # set attributes of self
            ...

            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            to_child = os.fdopen(wfd, 'wb')
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
                to_child.close()

Note the "send information to child" section: it uses the dump function defined above, which pickles the data with ForkingPickler, which means your instance method can be pickled.

When you use the methods of multiprocessing.Pool to send work to a child process, however, the data goes through a multiprocessing.Pipe. In Python 2.7, multiprocessing.Pipe is implemented in C and calls pickle_dumps directly, so it doesn't take advantage of ForkingPickler. That means pickling the instance method fails.
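
You can see the same failure with a bare Pipe (a minimal Python 2.7 sketch; the Pool's task handler hits this code path when it calls put(task)):

import multiprocessing

class A(object):
    def z(self):
        print "hi"

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    try:
        # Connection.send pickles at the C level with plain pickle
        parent_conn.send(A().z)
    except Exception as e:
        # PicklingError: Can't pickle <type 'instancemethod'>
        print "send failed:", e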

However, if you use copy_reg to register the instancemethod type, rather than relying on a custom Pickler, every attempt at pickling is affected. So you can use that to enable pickling instance methods, even via Pool:

import multiprocessing
import copy_reg
import types

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _reduce_method)

def test1():
    print("Hello, world 1")

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print(answer)
        print
        for answer in pool.imap(self.square, range(10)):
            print(answer)

    def test2(self):
        print("Hello, world 2")

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

Output:

Hello, world 1
Hello, world 2
GOT (0, 0, (True, 1))
GOT (0, 1, (True, 2))
GOT (0, 2, (True, 3))
GOT (0, 3, (True, 4))
GOT (0, 4, (True, 5))
 1GOT (0, 5, (True, 6))

GOT (0, 6, (True, 7))
2
GOT (0, 7, (True, 8))
3
 GOT (0, 8, (True, 9))
GOT (0, 9, (True, 10))
4
5
6
7
8
9
10

GOT (1, 0, (True, 0))
0
GOT (1, 1, (True, 1))
1
GOT (1, 2, (True, 4))
4
GOT (1, 3, (True, 9))
9
 GOT (1, 4, (True, 16))
16
GOT (1, 5, (True, 25))
25
 GOT (1, 6, (True, 36))
36
 GOT (1, 7, (True, 49))
49
 GOT (1, 8, (True, 64))
64
GOT (1, 9, (True, 81))
81
GOT None

Also note that in Python 3.x, pickle can pickle instance method types natively, so none of this stuff matters any more. :)
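
A minimal Python 3 sketch of that (nothing is registered with copyreg here; pickle reduces a bound method to its instance plus the method name on its own):

import multiprocessing
import pickle

class Squarer:
    def square(self, x):
        return x * x

if __name__ == "__main__":
    s = Squarer()
    # bound methods pickle out of the box in Python 3
    print(pickle.loads(pickle.dumps(s.square))(3))    # 9
    # ...so Pool accepts them directly, as long as the instance itself is
    # picklable and the class is defined at module level
    with multiprocessing.Pool(1) as pool:
        print(pool.map(s.square, range(5)))           # [0, 1, 4, 9, 16]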

Cardoon answered 5/12, 2014 at 16:19 Comment(1)
Thanks for the suggestion; I'm surprised that the multiprocessing module doesn't implement this already. Your exact solution doesn't work for me because it involves pickling the instance that the method is bound to, which causes other problems, but it pointed me in the right direction. I am instead defining the methods to be run during multiprocessing at the top level of a module, which circumvents both problems and gives me the behavior I want. – Valedictorian
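
That workaround might look something like this (a sketch with invented names, keeping the question's goal of varying the behaviour per subclass): the worker functions live at module level, so they pickle by name, and each subclass merely selects one.

import multiprocessing

def _square(x):
    return x * x

def _cube(x):
    return x ** 3

class Base(object):
    # staticmethod keeps self.worker a plain function rather than a bound method
    worker = staticmethod(_square)

    def run(self):
        pool = multiprocessing.Pool(1)
        try:
            return pool.map(self.worker, range(10))
        finally:
            pool.close()
            pool.join()

class Cuber(Base):
    worker = staticmethod(_cube)

if __name__ == "__main__":
    print(Base().run())   # [0, 1, 4, 9, ...]
    print(Cuber().run())  # [0, 1, 8, 27, ...]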

Here's an alternative that I use sometimes, and it works in Python 2.x:

You can create a top-level "alias" of sorts for the instance method: a module-level function that accepts the object whose instance method you want to run in a pool, and calls that instance method for you:

import functools
import multiprocessing

def _instance_method_alias(obj, arg):
    """
    Alias for instance method that allows the method to be called in a 
    multiprocessing pool
    """
    obj.instance_method(arg)
    return

class MyClass(object):
    """
    Our custom class whose instance methods we want to be able to use in a 
    multiprocessing pool
    """

    def __init__(self):
        self.my_string = "From MyClass: {}"

    def instance_method(self, arg):
        """
        Some arbitrary instance method
        """

        print(self.my_string.format(arg))
        return

# create an object of MyClass
obj = MyClass()

# use functools.partial to create a new method that always has the 
# MyClass object passed as its first argument
_bound_instance_method_alias = functools.partial(_instance_method_alias, obj)

# create our list of things we will use the pool to map
l = [1,2,3]

# create the pool of workers
pool = multiprocessing.Pool()

# call pool.map, passing it the newly created function
pool.map(_bound_instance_method_alias, l)

# cleanup
pool.close()
pool.join()

This code produces this output:

From MyClass: 1
From MyClass: 2
From MyClass: 3

One limitation is that you can't use this for methods that modify the object. Each process gets a copy of the object it is calling the methods on, so changes won't be propagated back to the main process. If you don't need to modify the object from the methods you're calling, though, this can be a simple solution.
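
If you do need something back from the workers, one option is to have the alias return a value and fold the results into the object in the parent process. A rough sketch (the method and attribute names here are invented):

import functools
import multiprocessing

def _compute_alias(obj, arg):
    # runs in a worker on a copy of obj; only the return value travels back
    return obj.compute(arg)

class Accumulator(object):
    def __init__(self):
        self.results = []

    def compute(self, arg):
        return arg * 10

if __name__ == "__main__":
    acc = Accumulator()
    bound = functools.partial(_compute_alias, acc)
    pool = multiprocessing.Pool()
    # the parent's acc is the only copy that gets updated
    acc.results = pool.map(bound, [1, 2, 3])
    pool.close()
    pool.join()
    print(acc.results)   # [10, 20, 30]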

Jannette answered 26/4, 2015 at 4:9 Comment(2)
Thanks for the post, makes sense to me after diving into pickling etc and this works for me. Python 3 will (eventually) bridge this gap. Cheers! – Eiser
Worked, as opposed to the currently accepted answer. – Providential

Here is an easier way that works in Python 2: just wrap the original instance method in a module-level function. It works well on macOS and Linux, but not on Windows (tested with Python 2.7).

from multiprocessing import Pool

class Person(object):
    def __init__(self):
        self.name = 'Weizhong Tu'

    def calc(self, x):
        print self.name
        return x ** 5


def func(x, p=Person()):
    # module-level wrapper: only 'func' itself is pickled (by name);
    # the Person instance held in the default argument never goes through pickle
    return p.calc(x)


pool = Pool()
print pool.map(func, range(10))
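
A likely reason it fails on Windows (an assumption, not something verified here) is the missing __main__ guard: spawn-based workers re-import the module, which would re-run the module-level Pool() call. A guarded variant of the same idea would look roughly like this:

from multiprocessing import Pool

class Person(object):
    def __init__(self):
        self.name = 'Weizhong Tu'

    def calc(self, x):
        return x ** 5

def func(x, p=Person()):
    return p.calc(x)

if __name__ == '__main__':
    pool = Pool()
    print(pool.map(func, range(10)))
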
Salinometer answered 23/2, 2016 at 1:9 Comment(0)
