Python Multiprocessing concurrency using Manager, Pool and a shared list not working

I am learning Python multiprocessing, and I am trying to use this feature to populate a list with all the files present in the filesystem. However, the code I wrote executes only sequentially.

#!/usr/bin/python
import os
import multiprocessing
tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]  # Gets the top-level directory names inside "/"
manager = multiprocessing.Manager()
files = manager.list()


def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

mp = [multiprocessing.Process(target=get_files, args=(tld[x],))
      for x in range(len(tld))]

for i in mp:
    i.start()
    i.join()
print len(files)

When I checked the process tree, I could see only a single child process spawned. (man pstree says {} denotes the child process spawned by the parent.)

---bash(10949)---python(12729)-+-python(12730)---{python}(12752)
                               `-python(12750)

What I was looking for was to spawn a process for each tld directory and populate the shared list files, which would be around 10-15 processes depending on the number of directories. What am I doing wrong?

EDIT::

I used multiprocessing.Pool to create worker processes, and this time the processes are spawned, but it gives errors when I try to use multiprocessing.Pool.map(). I was referring to the following example from the Python docs:

from multiprocessing import Pool
def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3])) 

Following that example, I rewrote the code as

import os
import multiprocessing
tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()
pool = multiprocessing.Pool(processes=len(tld))
print pool
files = manager.list()
def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))
pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)

and it is forking multiple processes.

---bash(10949)---python(12890)-+-python(12967)
                               |-python(12968)
                               |-python(12970)
                               |-python(12971)
                               |-python(12972)
                               ---snip---

But every pool worker fails with the same error:

Process PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
AttributeError: 'module' object has no attribute 'get_files'

What am I doing wrong here, and why does the get_files() function error out?

Gig asked 8/10, 2015 at 10:5 Comment(0)

It's simply because you instantiate your pool before defining the function get_files:

import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()

files = manager.list()
def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

pool = multiprocessing.Pool(processes=len(tld)) # Instantiate the pool here

pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)

The overall idea of a process is that at the instant you start it, you fork the memory of the main process, so any definition made in the main process after the fork will not be available in the subprocess.
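
A small sketch of that point (hypothetical names show and value, assuming the default fork start method on Linux, as in the question): the workers are forked the moment Pool() is created, so anything the parent changes afterwards is invisible to them.

import multiprocessing

value = "before fork"


def show(_):
    return value  # each worker reads its own forked copy of `value`


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=2)   # workers are forked here
    value = "after fork"                       # only the parent sees this change
    print(pool.map(show, range(2)))            # ['before fork', 'before fork']
    pool.close()
    pool.join()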

If you want shared memory, you can use the threading library instead, but you will have some issues with it (cf. the global interpreter lock).
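
For completeness, a rough sketch of that threading alternative (hypothetical code, not taken from the answer): threads share the parent's memory, so a plain list works without a Manager, although CPU-bound work is still serialized by the GIL (walking the filesystem is mostly I/O, so threads can still overlap usefully).

import os
import threading

files = []                   # ordinary list, shared by all threads
lock = threading.Lock()      # guards the shared list


def get_files(top):
    found = []
    for root, dirs, names in os.walk(top):
        for name in names:
            found.append(os.path.join(root, name))
    with lock:
        files.extend(found)  # merge this thread's results


if __name__ == '__main__':
    tld = [os.path.join("/", d) for d in next(os.walk("/"))[1]]
    threads = [threading.Thread(target=get_files, args=(d,)) for d in tld]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(len(files))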

Monostome answered 8/10, 2015 at 10:54 Comment(5)
Thank you, that worked. However, I was wondering: since tld was already defined, why did defining pool before the function matter? There was no reference to the function when defining the pool.Gig
There is one :) in your pool.map. By using pool.map you ask your process to use the function get_files.Monostome
Agreed. But pool.map was called after the function, even though pool was defined before; while defining pool, it should just set the number of worker processes to be spawned, according to the Python docs: `pool = Pool(processes=4)  # start 4 worker processes`. Please correct me where I am misinterpreting. Once again, thanks a lot @MonostomeGig
I'm not sure to have understand all the meaning of your sentence (my english need to be improved :)). As far as I understood, you think that Pool only set the number of processes, and do not start the processes. That's not true (and thanks god :D). The overall idea is to start the pool in order to use it after (with, for example, a pool.map). Initialization process is quite long (believe me, I worked on this a lot), and it should be run only once.Monostome
That makes perfect sense. Whatever you lack in English, you make up in Python :) Brilliant. Appreciate all the help. Thank you @Monostome :)Gig
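
A quick, hypothetical way to check the point made in the comments above, namely that the worker processes already exist the moment Pool() returns, before any map() call:

import multiprocessing

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    print(multiprocessing.active_children())   # already lists 4 live pool workers
    pool.close()
    pool.join()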

I ran across this and tried the accepted answer on Python 3.x; it doesn't work there for a couple of reasons. Here, then, is a modified version that does work (as of this writing, on Python 3.10.1):

import multiprocessing
import os


def get_files(x, files_):
    proc = multiprocessing.current_process()  # the pool worker executing this call
    for root, dirs, names in os.walk(x):
        for name in names:
            full_path = os.path.join(root, name)
            # print(f"worker:{proc.name} path:{full_path}")
            files_.append(full_path)


if __name__ == '__main__':
    # See https://docs.python.org/3/library/multiprocessing.html
    with multiprocessing.Manager() as manager:
        # The code will count the number of files under the specified root:
        root = '/'

        # Create the top-level list of folders which will be walked (and whose files will be counted)
        tld = [os.path.join(root, filename) for filename in next(os.walk(root))[1]]

        # Creates result list object in the manager, which is passed to the workers to collect results into.
        result_files = manager.list()

        # Create a pool of workers, with the size being equal to the number of top level folders:
        pool = multiprocessing.Pool(processes=len(tld))

        # Use starmap() instead of map() to allow passing multiple arguments (e.g. the folder and the result_files list).
        pool.starmap(get_files, [(folder, result_files) for folder in tld])

        pool.close()
        pool.join()

        # The result: the total number of files found.
        print(len(result_files))
Whiten answered 24/4, 2022 at 17:56 Comment(0)
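
As a side note (an alternative sketch, not part of either answer, using a hypothetical helper list_files): if the goal is only to collect results in the parent, the Manager list can be skipped entirely by having each worker return its own list and letting Pool.map() gather the return values (Python 3):

import os
import multiprocessing


def list_files(top):
    # Each worker builds and returns a plain local list; no shared state needed.
    return [os.path.join(root, name)
            for root, dirs, names in os.walk(top)
            for name in names]


if __name__ == '__main__':
    root = '/'
    tld = [os.path.join(root, d) for d in next(os.walk(root))[1]]
    with multiprocessing.Pool(processes=len(tld)) as pool:
        results = pool.map(list_files, tld)    # one list per top-level folder
    print(sum(len(chunk) for chunk in results))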
