Unexpected multiprocessing behavior with omitted lock
Asked Answered
C

1

0

I have the following code, all pretty simple:

from multiprocessing import Process, Lock
from multiprocessing.managers import BaseManager


class DatabaseConnection:
    def __init__(self, conn_id):
        self.id = conn_id

    def __repr__(self):
        return f'connection(id={self.id})'

class DatabaseConnectionPool:
    def __init__(self):
        self.mutex = Lock()
        self.max_connections = 10
        self.ready_pool = [DatabaseConnection(i) for i in range(self.max_connections)]
        self.leased_pool = []

    def get(self):
        if not self.ready_pool:
            raise Exception('no connections available')
    
        conn = self.ready_pool[-1]
        self.ready_pool.pop()
        self.leased_pool.append(conn)

        return conn


class Manager(BaseManager): pass

Manager.register('DatabaseConnectionPool', DatabaseConnectionPool)

def proc(pool):
    try:
        conn = pool.get()
        print(f'leased {conn}')
    except:
        print(f'failed to lease connection')

if __name__ == '__main__':

    manager = Manager()
    manager.start()
    pool = manager.DatabaseConnectionPool()

    procs = []
    for _ in range(11):
        p = Process(target=proc, args=(pool,))
        procs.append(p)
        p.start()

    for p in procs:
        p.join()

when I run this, I get:

leased connection(id=9)
leased connection(id=8)
leased connection(id=7)
leased connection(id=6)
leased connection(id=5)
leased connection(id=4)
leased connection(id=3)
leased connection(id=2)
leased connection(id=1)
leased connection(id=0)
failed to lease connection

I am essentially creating a DatabaseConnectionPool class shared across a bunch of processes, then having each process retrieve a DatabaseConnection object from the pool, expecting this to cause a race condition but I can't seem to provoke one here.

Specifically, I would expect the line conn = self.ready_pool[-1] to result in a race condition such that some processes return connections with duplicated ids since I don't have a mutex around the DatabaseConnectionPool.Get() method.

Am I understanding something wrong here? Since the DatabaseConnectionPool is shared, each process is pulling from the same list and the following two lines are not atomic (purposefully) as far as I know.

conn = self.ready_pool[-1]
self.ready_pool.pop()
Cory answered 16/9, 2022 at 6:38 Comment(0)
B
2

TLDR; there is indeed a race condition, but the complexity (overhead) of Manager relative to the speed of your example function get makes it extremely difficult to trigger.

"Since the DatabaseConnectionPool is shared": it's not. You only share proxies to this object. When you call manager.start(), a new process is started which will hold all the actual instances that are "managed" by the manager. Calling your registered constructor: manager.DatabaseConnectionPool() will return a proxy to the instance which is created in the manager process.

These proxies are picklable, and therefore can easily be sent to other processes. When you call a method of one of these proxies, the method name and arguments are pickled and sent to the manager process where the call is actually performed, and then the results are pickled and sent back (remote procedure call).

On the server side, a single thread continuously waits on new connections with a multiprocessing.connection.Listener. For each incoming connection (assumed to be a single RPC), a new thread is created to handle and respond to the requested procedure call. In this way, any method that is called on your class instance must be thread safe (though not actually process safe), as any other method (that is called remotely) may be called at the same time in another thread.

Now consider what all would have to happen in order to trigger a race in your example:

  1. New process is created (takes quite a while due to import __main__ and pickle/unpickle process object)
  2. Establish a multiprocessing.connection.Connection in order to perform RPC (this also takes quite a while due to required OS calls, as well as authkey handshake)
  3. pickle / unpickle the RPC details, and spawn a new thread to perform the call.
  4. RPC is performed (.get call will be comparatively very fast to the rest here)
  5. RPC result is pickled and sent back to proxy owner
  • rest of the program....

1, 2, and 3 of two separate processes would all have to complete at very nearly exactly the same time, and each is a fairly long operation with a good amount of variability based on how busy the OS is (the OS must allocate things like pipes, processes, sockets, etc..). Even if the average time to complete is similar, if the variability is significantly greater than the time it takes to perform the get call, it will be very unlikely that two calls will line up. If instead get takes a rather long time, the likelihood that any two get calls may be running concurrently rises. For example, adding a bit of random delay in the get method easily breaks the example:

    ...
        conn = self.ready_pool[-1]
        sleep(random()*.1)
        self.ready_pool.pop()
leased connection(id=9)
leased connection(id=8)
leased connection(id=8)
leased connection(id=7)
leased connection(id=7)
leased connection(id=8)
leased connection(id=9)
leased connection(id=8)
leased connection(id=1)
leased connection(id=1)
failed to lease connection
>>>
Blaney answered 16/9, 2022 at 15:41 Comment(1)
Ahh this makes complete sense. Thanks a lot!Cory

© 2022 - 2024 — McMap. All rights reserved.