Python sharing a lock between processes
I am attempting to use a partial function so that pool.map() can target a function that has more than one parameter (in this case a Lock() object).

Here is example code (taken from an answer to a previous question of mine):

import multiprocessing
from functools import partial

def target(lock, iterable_item):
    for item in iterable_item:
        # Do cool stuff
        if (... some condition here ...):
            lock.acquire()
            # Write to stdout or logfile, etc.
            lock.release()

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    l = multiprocessing.Lock()
    func = partial(target, l)
    pool.map(func, iterable)
    pool.close()
    pool.join()

However, when I run this code, I get the error:

RuntimeError: Lock objects should only be shared between processes through inheritance

What am I missing here? How can I share the lock between my subprocesses?

Fash answered 28/8, 2014 at 20:43 Comment(1)
There's another question about this same issue, though their particular error is different: Trouble using a lock with multiprocessing.Pool: pickling error — Cuprite
You can't pass normal multiprocessing.Lock objects to Pool methods, because they can't be pickled. There are two ways around this. One is to create a Manager() and pass a Manager.Lock():

def main():
    iterable = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool()
    m = multiprocessing.Manager()
    l = m.Lock()
    func = partial(target, l)
    pool.map(func, iterable)
    pool.close()
    pool.join()

This is a little bit heavyweight, though; using a Manager requires spawning another process to host the Manager server. And all calls to acquire/release the lock have to be sent to that server via IPC.

The other option is to pass the regular multiprocessing.Lock() at Pool creation time, using the initializer kwarg. This will make your lock instance global in all the child workers:

def target(iterable_item):
    for item in iterable_item:
        # Do cool stuff
        if (... some condition here ...):
            lock.acquire()
            # Write to stdout or logfile, etc.
            lock.release()

def init(l):
    global lock
    lock = l

def main():
    iterable = [1, 2, 3, 4, 5]
    l = multiprocessing.Lock()
    pool = multiprocessing.Pool(initializer=init, initargs=(l,))
    pool.map(target, iterable)
    pool.close()
    pool.join()

The second solution has the side-effect of no longer requiring partial.

Trentontrepan answered 28/8, 2014 at 21:29 Comment(13)
Well, thank you again, sir. This looks like exactly what I need. Really appreciate the continued help! Other options looked super involved. I will go with the initializer function to share the global Lock. — Fash
This worked great. I also put a Queue into the init to save passing that in each call. — Particolored
@Trentontrepan thanks a lot for your answer, I too had the same query and this one solves it perfectly. However, I have another query: why is this approach not used more frequently to share state between processes, rather than doing so via a Manager object, which has its own overhead of running a server process and proxied access? — Antione
@Antione multiprocessing.Lock is a process-safe object, so you can pass it directly to child processes and safely use it across all of them. However, most mutable Python objects (like list, dict, most user-created classes) are not process-safe, so passing them between processes leads to completely distinct copies of the objects being created in each process. In those cases, you need to use multiprocessing.Manager. — Trentontrepan
@Trentontrepan thanks, so I have a list of locks and a list of browser objects that I need to share as global variables among processes; how would you suggest doing that? I tried using a Manager object, but that gives an error about non-serializability of thread.lock while returning Manager.Lock() objects from the list of locks. Sample code here: codeshare.io/dnhhw — Antione
Could you explain why the Lock needs to be created as a global variable when using Pool, but can be passed as an argument when using Process? — Coben
@neilxdims With a Process, the Lock is inherited by the child process as it is forked, and passed directly into whatever method you passed as the target of the Process. With a Pool, when you pass arguments to methods like map and apply, the processes are already forked, so inheritance won't work. However, the arguments to the Pool's initializer function are inherited, so the Lock can be passed. But for the methods the pool processes execute to have access to the Lock, it needs to be global; otherwise it will be inaccessible outside the initializer function. — Trentontrepan
Why can't you simply declare lock as global and access it from within each pool function? — Cathe
@Cathe On Linux platforms, you can do that. On Windows, you can't, because Windows doesn't support forking. You'll end up with a different lock object in each child process. — Trentontrepan
For anyone requiring related sample code for joblib's Parallel, see this link: github.com/davidheryanto/etc/blob/master/python-recipes/… — Retrochoir
Should you pass the logger to your processes via Pool's initializer as well? — Photolithography
When using the second approach, it might be worth explicitly calling multiprocessing.set_start_method('fork') to make it clear that sharing locks/barriers/etc. without a Manager process relies on 'nix systems defaulting to fork, especially as the macOS default start method changed in Python 3.8: docs.python.org/3/library/… — Piddle
An alternative that forces forking (or an error if it isn't available) without a global side effect: mp_ctx = multiprocessing.get_context('fork'). Then use the returned multiprocessing context to create the Pool and Lock/Barrier objects rather than the top-level multiprocessing module functions. — Piddle
Here's a version (using a Barrier instead of a Lock, but you get the idea) that also works on Windows, where the lack of fork causes additional trouble:

import multiprocessing as mp

def procs(uid_barrier):
    uid, barrier = uid_barrier
    print(uid, 'waiting')
    barrier.wait()
    print(uid, 'past barrier')

def main():
    N_PROCS = 10
    with mp.Manager() as man:
        barrier = man.Barrier(N_PROCS)
        with mp.Pool(N_PROCS) as p:
            p.map(procs, ((uid, barrier) for uid in range(N_PROCS)))

if __name__ == '__main__':
    mp.freeze_support()
    main()
Hendricks answered 4/9, 2018 at 10:9 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.