How to synchronize between multiple async processes in Python?

I have an async HTTP web service using FastAPI. I am running multiple instances of the same service on the server, each on a different port, with an nginx server in front so I can utilise them all. There is a particular resource I need to protect so that only one client accesses it at a time.

@app.get("/do_something")
async def do_something():
    critical_section_here()

I tried to protect this critical section using a file lock like this:

@app.get("/do_something")
async def do_something():
    with FileLock("dosomething.lock"):
        critical_section()

This prevents multiple processes from entering the critical section at the same time, but what I found is that it actually deadlocks. Consider the following sequence of events:

  1. Client 1 connects to port 8000 and enters the critical section.
  2. While client 1 is still using the resource, client 2 is routed to the same port 8000 and tries to acquire the file lock. It cannot, and because the blocking acquire runs inside the async handler it stalls the event loop of that worker, so client 1 can never finish and release the file lock. Not only is this process stuck; every other server instance waiting on the same lock ends up stuck as well.

Is there a way for me to coordinate these processes so that only one of them accesses the critical section at a time? I thought about adding a timeout to the file lock, but I really don't want to reject users; I just want each request to wait until it is its turn to enter the critical section.

Sludge answered 18/11, 2020 at 18:12 Comment(0)

You can try something like this:

import asyncio
import fcntl
from contextlib import asynccontextmanager

from fastapi import FastAPI

app = FastAPI()


def acquire_lock():
    # Blocking call: waits until the exclusive flock on the file is granted
    f = open("/tmp/test.lock", "w")
    fcntl.flock(f, fcntl.LOCK_EX)
    return f


@asynccontextmanager
async def lock():
    # Run the blocking acquire in a thread pool so the event loop keeps running
    loop = asyncio.get_running_loop()
    f = await loop.run_in_executor(None, acquire_lock)
    try:
        yield
    finally:
        # Closing the file releases the flock
        f.close()


@app.get("/test/")
async def test():
    async with lock():
        print("Enter critical section")
        await asyncio.sleep(5)
        print("End critical section")

This will basically serialize every request that enters the lock, across all worker processes sharing the lock file.
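
Applied to the endpoint from the question, a minimal sketch (using the question's critical_section() placeholder) could look like this:

@app.get("/do_something")
async def do_something():
    # Only one request across all worker processes runs this block at a time
    async with lock():
        critical_section()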

Skepticism answered 18/11, 2020 at 21:37 Comment(1)
This is sadly the only way to do it. It's a shame that it has to involve threads, but there is simply no way to wait for flock() without threads or busy-looping. (The non-blocking flock() can only be used to test whether the lock is free; you cannot associate the lock with a file descriptor in order to integrate it with the event loop.) – Brokerage
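
For reference, a sketch of the non-blocking flock() variant the comment mentions; without threads you would have to call something like this in a polling loop, which is exactly the busy-looping the comment warns against:

import fcntl

def try_acquire(f):
    # Returns True if the exclusive lock was obtained, False if it is already held
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return True
    except BlockingIOError:
        return False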

You could use aioredlock.

It allows you to create distributed locks between workers (processes). For more information about its usage, follow the link above.

Redlock is a distributed locking algorithm for Redis, with implementations in several languages; aioredlock is an asyncio-compatible implementation for Python 3.5+.

Example of usage (the lock_manager setup below assumes a Redis server reachable at the default localhost:6379):

from aioredlock import Aioredlock, LockError

# Create a lock manager; with no arguments it assumes a Redis instance
# reachable at the default localhost:6379
lock_manager = Aioredlock()

# Or you can use the lock as async context manager:
try:
    async with await lock_manager.lock("resource_name") as lock:
        assert lock.valid is True
        # Do your stuff having the lock
        await lock.extend()  # alias for lock_manager.extend(lock)
        # Do more stuff having the lock
    assert lock.valid is False  # lock will be released by the context manager
except LockError:
    print('Lock not acquired')
    raise
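
Applied to the question's setup, a hedged sketch could look like the following; lock_manager, the resource name, and the Redis address are assumptions, and note that by default aioredlock retries a limited number of times and raises LockError instead of waiting indefinitely, so the retry settings may need tuning to get "wait your turn" behaviour:

from aioredlock import Aioredlock, LockError
from fastapi import FastAPI

app = FastAPI()

# Assumes a Redis server on the default localhost:6379, shared by every worker
lock_manager = Aioredlock()


def critical_section():
    ...  # the question's placeholder for the protected work


@app.get("/do_something")
async def do_something():
    try:
        # The lock lives in Redis, so it is shared across all processes
        async with await lock_manager.lock("do_something_resource"):
            critical_section()
        return {"status": "done"}
    except LockError:
        # Could not acquire the lock within the configured retries
        return {"status": "resource busy, please try again"}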
Barrios answered 19/11, 2020 at 12:07 Comment(0)
