How to use threading.Lock in async function while object can be accessed from multiple thread
I want to use threading.Lock() in an async function; asyncio.Lock() is not thread-safe, so I cannot do with await asyncio.Lock():. The reason I need threading.Lock() is that this object may be accessed by multiple threads, since it is used in a web app and the server running it can spin up many threads. What is an effective way of doing so? So far I've tried a simple function that uses the lock:

Attempt 1:

async def main():
    with await threading.Lock():
        a = 6
    return a

TypeError: object _thread.lock can't be used in 'await' expression
Attempt 2:

async def main():
    async with threading.Lock():
            a = 1564666546
    return a

AttributeError: __aexit__
Footboy answered 14/8, 2020 at 21:49 Comment(1)
with await is syntactically valid, but is not how with is normally used in async context; you'd use async with.Perlite
You can't pass a threading.Lock to async with because it is not designed for async usage; it is a blocking primitive. More importantly, async with threading.Lock() wouldn't make sense even if it did work, because you would be acquiring a brand-new lock, which always succeeds immediately. For locking to work, you must share a lock between multiple threads, e.g. by storing it in an object's attribute. The rest of this answer will assume that you have a threading.Lock shared between threads.
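For example, a minimal sketch of sharing one lock via an object attribute (the class and method names here are illustrative, not part of the question):

```python
import threading

class SharedCounter:
    """A value guarded by a lock shared by all users of this object."""
    def __init__(self):
        self._lock = threading.Lock()  # one lock, shared via the attribute
        self.value = 0

    def increment(self):
        # Every thread calling this method contends for the same lock.
        with self._lock:
            self.value += 1

counter = SharedCounter()
threads = [threading.Thread(target=counter.increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)  # 10
```

The key point is that all ten threads go through the single lock stored on the object, rather than each creating its own.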

Since threading.Lock always blocks, the only way you can use it from asyncio is to wait to acquire it in a separate thread, and suspend the execution of the current coroutine until the lock is successfully acquired. The functionality of running a blocking function in a different thread is already covered by the run_in_executor event loop method, which you can apply:

import asyncio
import concurrent.futures

_pool = concurrent.futures.ThreadPoolExecutor()

async def work(lock, *other_args):
    # lock is a threading.Lock shared between threads

    loop = asyncio.get_running_loop()
    # Acquire the lock in a worker thread, suspending us while waiting.
    await loop.run_in_executor(_pool, lock.acquire)

    ...  # access the object with the lock held

    # Can release directly because release() doesn't block and a
    # threading.Lock can be released from any thread.
    lock.release()

You can make this more elegant to use (and exception-safe) by creating an async context manager:

import asyncio
import concurrent.futures
import contextlib

_pool = concurrent.futures.ThreadPoolExecutor()

@contextlib.asynccontextmanager
async def async_lock(lock):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(_pool, lock.acquire)
    try:
        yield  # the lock is held
    finally:
        lock.release()

Then you can use it as follows:

# lock is a threading.Lock shared between threads
async with async_lock(lock):
    ... access the object with the lock held ...

Of course, in code not running in the asyncio event loop you'd just acquire the lock directly:

# lock is a threading.Lock shared between threads
with lock:
    ... access the object ...

Note that we use a separate thread pool instead of passing None to run_in_executor(), which would reuse the default pool. This avoids deadlock in situations where the function that holds the lock itself needs the thread pool for other uses of run_in_executor(). By keeping the thread pool private, we rule out deadlocking through the use of the same pool by others.
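Putting the pieces together, here is a self-contained sketch (the worker and variable names are illustrative, not from the original answer) in which a plain thread and an asyncio coroutine contend for the same threading.Lock:

```python
import asyncio
import concurrent.futures
import contextlib
import threading

# Private pool used only for lock acquisition, to avoid deadlocks
# with asyncio's default executor.
_pool = concurrent.futures.ThreadPoolExecutor()

@contextlib.asynccontextmanager
async def async_lock(lock):
    loop = asyncio.get_running_loop()
    # Wait for the lock in a worker thread so the event loop stays free.
    await loop.run_in_executor(_pool, lock.acquire)
    try:
        yield
    finally:
        lock.release()  # release() never blocks, safe to call directly

shared_lock = threading.Lock()
counter = 0

def sync_worker():
    """Runs in a plain thread; takes the lock synchronously."""
    global counter
    for _ in range(100):
        with shared_lock:
            counter += 1

async def async_worker():
    """Runs in the event loop; takes the same lock via async_lock()."""
    global counter
    for _ in range(100):
        async with async_lock(shared_lock):
            counter += 1

async def main():
    thread = threading.Thread(target=sync_worker)
    thread.start()
    await async_worker()
    thread.join()

asyncio.run(main())
print(counter)  # 200
```

Both workers increment the shared counter under the same lock, so no increments are lost regardless of which side holds the lock at any moment.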

Perlite answered 15/8, 2020 at 10:50 Comment(18)
Hi thank you for your kind answer, just wanted to know if I acquire the lock directly (as if sync code) without running it in a separated thread, would it make any difference ?Footboy
@Footboy Yes, it would block the entire event loop (all coroutines and callbacks) until the lock is acquired. Such an approach might make sense if the lock is guaranteed to only be held for a very short time, but I wouldn't recommend it.Perlite
The lock is for accessing a rate-limiting value: check if it is allowed to make a request, and do the increment properly. If I acquire the lock in another thread, do all the operations between acquiring and releasing the lock run in another thread too? Or is the only thing that runs in another thread just acquiring and releasing it?Footboy
@Footboy The only thing run in another thread is acquiring the lock, everything else runs in the asyncio thread as usual. (I've now edited the answer to release the lock in the current thread, which is ok because release doesn't block.) Just look at the first code snippet - only lock.acquire is passed to run_in_executor, nothing else. The second snippet offers a nicer API for use with async with, but is functionally equivalent.Perlite
I am getting the context of it now: if I don't acquire the lock in another thread, the lock could already be held, and waiting for it to be released so it can be acquired would block the entire loop. Am I correct?Footboy
@Footboy Yes. The version doing it in another thread actually suspends the current coroutine, allowing the event loop to proceed while waiting for the lock.Perlite
This approach is correct but it's still prone to a deadlock when using the default ThreadPoolExecutor. If there is another call to run_in_executor() inside the blocking part, it will deadlock. For example, getaddrinfo() in asyncio.open_connection() uses the default ThreadPoolExecutor, so getaddrinfo() will schedule its task on a blocked thread and the blocking task will never finish. A solution to this is to not use the default ThreadPoolExecutor, like so: LOCKS_THREAD_POOL_EXECUTOR = ThreadPoolExecutor(1, thread_name_prefix="async_lock")Sauer
@Sauer I think I understand what you're getting at in general, but I'm not sure I see how a deadlock could occur in this code. Why would another call to run_in_executor() happen "inside the blocking part"? If by "blocking part" you're referring to the blocking function like lock.acquire(), that executes legacy blocking or CPU-bound code which runs no async code. If you're referring to the part inside async with, that part is normal async code that doesn't take up a slot in the thread pool (because the lock has already been acquired by the time it starts running). Can you clarify?Perlite
Sorry, horrible formatting :(. Think of this scenario: assume we have 2 threads, with async tasks running on thread 1. 1. async_task1 acquires the lock (success, not blocking). 2. It hits an await (open_connection() for instance). 3. async_task2 tries to acquire the lock, blocking on a new thread 2 provided by the executor. 4. async_task2 hits an await. 5. async_task1 reaches another run_in_executor() and schedules a task on thread 2. 6. Deadlock: async_task1 is blocked by the other task and won't finish, while async_task2 is waiting for the lock.Sauer
@Sauer I now see what you mean: a task waiting for the lock takes up a slot in the thread pool. If the task that holds the lock needs the slot for its own work, we have a deadlock. Of course, that particular scenario assumes that there is only one available slot, but even with multiple slots, the deadlock could still occur with multiple tasks. I've now amended the answer accordingly.Perlite
If 50 async operations attempt to access a locked resource, I think that a ThreadPoolExecutor should create 50 threads, with each one waiting for access to the resource. However, if the max_workers parameter is set to 50, the event loop will hang when the 51st request for the resource arrives. In that case, either an infinite slot pool is needed or a new pool should be created each time. Am I correct?Emancipation
@Emancipation While ThreadPoolExecutor doesn't support "infinite" operation directly, it does create threads lazily, so I suppose you could give it a very large number of threads as maximum. (Though once created, it will keep them in the pool, which might not be what you want.) But I don't see a reason for that. There is no reason to create a huge number of threads that will just wait on the mutex, it's perfectly fine for them to wait in a queue instead. The important part is to avoid a deadlock, and that's achieved by using a pool separate from the one used by asyncio itself.Perlite
In the code above, _pool has a limitation of max_workers = number of cores * 5, which means that with two cores, _pool will not be able to create the 11th thread and will hang. Additionally, in the case of 10 resource requests, 10 threads will be created, but they will be kept in the pool. Therefore, it makes more sense to create a new pool each time, doesn't it? with concurrent.futures.ThreadPoolExecutor() as pool:Emancipation
@Emancipation It won't "hang" (wait indefinitely), it will just wait for its turn to get a chance to acquire the mutex. Or am I misunderstanding you somehow?Perlite
You are correct, it will wait but not hang indefinitely. However, if I want to avoid waiting altogether, would the solution be to use a new pool every time, right? Or is there another solution?Emancipation
@Emancipation There is no "avoiding waiting" here. Out of N (where N==5*num_cores) workers in the pool, only 1 is actually working, and the remaining N-1 are waiting for the mutex. Everyone else is waiting to get a chance to wait for the mutex, so they're no worse off. Maybe you have some other use scenario where there is no mutex, but then it is different from the situation in this answer.Perlite
In my case, the resource is occupied (write) by another thread unrelated to the main one, while N threads are waiting (for read), allowing the main thread to perform other tasks. However, if there is an N+1 waiting, it will pause the main thread, and it won't be able to perform its primary work for some time.Emancipation
@Emancipation What you describe sounds like great material for a separate question because it is a different scenario than the one in this answer.Perlite
You can use aiologic.Lock (I'm the creator of aiologic), which supports both asynchronous and synchronous interfaces. It can be owned by both threads and tasks, and its implementation doesn't use an executor, so it doesn't have the disadvantages described in the other answer.

Asynchronous interface:

async with lock:
    ...

# or, equivalently:
await lock.async_acquire()
try:
    ...
finally:
    lock.async_release()

Synchronous interface:

with lock:
    ...

# or, equivalently:
lock.green_acquire()
try:
    ...
finally:
    lock.green_release()
Exacerbate answered 29/9 at 20:23 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.