Why can't I use the Python module concurrent.futures in a class method?
I want to make my class method run in parallel, but it raises an error that I cannot solve. My code is:

import concurrent.futures as futures

samples = ['asfd', 'zxcv', 'asf', 'qwer']

class test:
    def __init__(self, samples):
        maturedb = {}
        with futures.ProcessPoolExecutor() as exe:
            for samplename, dResult in exe.map(self.make_readdb, samples):
                maturedb[samplename] = dResult
        print(maturedb)

    def make_readdb(self, samplename):
        return samplename, 1

test(samples)

If I run this code on an Ubuntu machine, an error like the one below occurs:

Traceback (most recent call last):
  File "/usr/lib/python3.2/multiprocessing/queues.py", line 272, in _feed
    send(obj)
_pickle.PicklingError: Can't pickle <class 'method'>: attribute lookup builtins.method failed

The method make_readdb is simplified here to make an example, but it is a bottleneck in the real code and I need to make it parallel.

Bouldin answered 2/7, 2013 at 7:29

Comments (2):
- Your code works on Python 3.3 as is. (Eyeleteer)
- I tested my code on Python 3.5.2, and it worked well. Many thanks for all your answers. (Bouldin)
From the docs:

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
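A quick way to see the picklability constraint in isolation (a minimal illustration, not part of the original answer): a module-level function pickles by reference to its qualified name, while an object without an importable name, such as a lambda, does not.

```python
import pickle

def top_level(x):
    # Module-level functions are pickled by reference to their
    # qualified name, so they round-trip cleanly.
    return x

restored = pickle.loads(pickle.dumps(top_level))
assert restored is top_level

# A lambda has no importable name, so pickling it fails.
anonymous = lambda x: x
try:
    pickle.dumps(anonymous)
except pickle.PicklingError as exc:
    print("not picklable:", exc)
```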

Try a ThreadPoolExecutor instead.

I looked over your code again; the problem is that the function make_readdb is a method of the class test, so the executor cannot pickle it. Can you refactor and pull this function out to module level?
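A minimal sketch of that refactor (the placement of make_readdb and the __main__ guard are assumptions, not the asker's real code):

```python
import concurrent.futures as futures

def make_readdb(samplename):
    # A module-level function is picklable, so ProcessPoolExecutor
    # can send it to worker processes.
    return samplename, 1

class test:
    def __init__(self, samples):
        self.maturedb = {}
        with futures.ProcessPoolExecutor() as exe:
            for samplename, dResult in exe.map(make_readdb, samples):
                self.maturedb[samplename] = dResult

if __name__ == '__main__':
    print(test(['asfd', 'zxcv', 'asf', 'qwer']).maturedb)
```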

Propertied answered 2/7, 2013 at 7:36

Comments (2):
- Multithreading is not helping at all. Since CPython runs only one thread at a time, ThreadPoolExecutor makes it even slower: in this case the single-threaded design took ~600 s while ThreadPoolExecutor took ~1300 s. (Bouldin)
- Is your job CPU-bound or IO-bound? What value did you use for max_workers? If you don't pass one, ProcessPoolExecutor defaults to the number of processors on your machine. (Propertied)
self must be passed as an explicit argument, even across multiple processes, like this:

class test:
    def __init__(self, samples):
        maturedb = {}
        with futures.ProcessPoolExecutor() as exe:
            # Pass self explicitly, once per sample, alongside the samples.
            for samplename, dResult in exe.map(test.make_readdb, [self] * len(samples), samples):
                maturedb[samplename] = dResult
        print(maturedb)

    def make_readdb(self, samplename):
        return samplename, 1

But serializing self for every call adds pickling overhead, so a better way is to not pass self to ProcessPoolExecutor at all:

class test:
    def __init__(self, samples):
        maturedb = {}
        with futures.ProcessPoolExecutor() as exe:
            for samplename, dResult in exe.map(test.make_readdb, samples):
                maturedb[samplename] = dResult
        print(maturedb)

    @staticmethod
    def make_readdb(samplename):
        return samplename, 1
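As the comments on the question note, the original bound-method call also works on newer interpreters, because modern Python 3 pickles a bound method as a reference to the instance plus the method name. A quick check (the Job class here is an illustrative stand-in, not the asker's code):

```python
import pickle

class Job:
    def work(self, x):
        return x * 2

j = Job()
# On modern Python 3 a bound method survives a pickle round trip:
# the instance and the method name are serialized, not the code object.
restored = pickle.loads(pickle.dumps(j.work))
print(restored(21))
```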
Saffian answered 20/12, 2021 at 1:11
