python lock with-statement and timeout
Asked Answered
S

3

27

I am using a Python 3 sequence like this:

# Manual acquire/release with a timeout: the pattern the question wants to
# express as a with-statement.
lock = threading.Lock()
# Wait up to 10 seconds for the lock; `res` is truthy only if we now hold it.
res = lock.acquire(timeout=10)
if res:
    # do something ....
    # The lock is held on this branch, so it has to be released explicitly.
    lock.release()
else:
    # do something else ...

I would prefer to use a with-statement instead of explicit "acquire" and "release", but I don't know how to get the timeout effect.

Synonym answered 24/5, 2013 at 17:0 Comment(3)
It doesn't look like you can do that.Unlovely
this might helpGavingavini
It does not look like what I want ;-).Synonym
G
25

You can do this pretty easily with a context manager:

import threading
from contextlib import contextmanager

@contextmanager
def acquire_timeout(lock, timeout):
    """Context manager that tries to take *lock* for at most *timeout* seconds.

    The value bound by ``with ... as`` is whatever ``lock.acquire`` returned:
    truthy when the lock is held inside the block, falsy when the wait timed
    out.  The lock is released on exit only if it was actually acquired, and
    that release also happens when the block raises.
    """
    got_it = lock.acquire(timeout=timeout)
    if not got_it:
        # Timed out: nothing is held, so there is nothing to clean up.
        yield got_it
        return
    try:
        yield got_it
    finally:
        lock.release()


# Usage:
lock = threading.Lock()

# Wait at most 2 seconds for the lock; `acquired` tells us which branch to take.
with acquire_timeout(lock, 2) as acquired:
    if not acquired:
        print('timeout: lock not available')
        # do something else ...
    else:
        print('got the lock')
        # do something ....

*Note: This won't work in Python 2.x since there isn't a timeout argument to Lock.acquire

Gwendolyn answered 28/5, 2013 at 1:0 Comment(3)
This is working for me, thanks. But can you explain to me why it is necessary to use yield when using with in this case? – Walczak
@monkjuice Inside a context manager, the yield statement is where control is passed back to the with block provided by the caller. So yield result puts the value of result into the acquired variable and runs the indented block below with acquire_timeout... and continues inside the context manager once it returns.Gwendolyn
The acquire_timeout function in this answer is wrong; it should release the lock in a finally block with the yield result in the try. That way the lock will still be released when an exception is raised out of the code that runs in the context manager.Theological
G
14

Slightly nicer version:

import threading
from contextlib import contextmanager


class TimeoutLock(object):
    """Wrapper around ``threading.Lock`` that adds a timed ``with`` form.

    ``acquire`` and ``release`` forward to the wrapped lock, and
    ``acquire_timeout`` provides a context manager that waits a bounded
    amount of time for the lock.
    """

    def __init__(self):
        self._lock = threading.Lock()

    def acquire(self, blocking=True, timeout=-1):
        """Forward to the wrapped lock's ``acquire`` and return its result."""
        return self._lock.acquire(blocking, timeout)

    @contextmanager
    def acquire_timeout(self, timeout):
        """Try to take the lock for at most *timeout* seconds.

        Yields the result of the acquire attempt: truthy if the lock is held
        inside the ``with`` block, falsy if the wait timed out.  When the
        lock was acquired it is released on exit, including when the block
        raises.
        """
        result = self._lock.acquire(timeout=timeout)
        try:
            yield result
        finally:
            # Without try/finally an exception in the with-body would skip
            # the release and leave the lock held forever.
            if result:
                self._lock.release()

    def release(self):
        """Release the wrapped lock."""
        self._lock.release()

# Usage:
lock = TimeoutLock()

# Give up after 3 seconds; `result` reports whether the lock is actually held.
with lock.acquire_timeout(3) as result:
    if not result:
        print('timeout: lock not available')
        # do something else ...
    else:
        print('got the lock')
        # do something ....

It appears you can't subclass threading.Lock, so I had to make a wrapper class instead.

Gwendolyn answered 28/5, 2013 at 1:17 Comment(0)
M
0

This is @robble's code. I just added usage/example:

from datetime import datetime
import time
from queue import Queue
from threading import Thread
    
def _log(msg : str):
    """Print *msg* to stdout, prefixed with the current naive UTC timestamp."""
    timestamp = datetime.utcnow()
    print(f"{timestamp} {msg}")

import threading
from contextlib import contextmanager
from typing import TypeVar

class TimeoutLock(object):
    """A ``threading.Lock`` wrapper with a fixed, per-instance acquire timeout.

    ``timeout_sec`` is passed as the ``timeout`` argument of every acquire
    attempt made through ``acquire_timeout``.  The default of ``-1`` is the
    value ``threading.Lock.acquire`` treats as "wait without a time limit".
    """

    def __init__(self, timeout_sec = -1):
        self._lock = threading.Lock()
        self.timeout_sec = timeout_sec

    @contextmanager
    def acquire_timeout(self):
        """Try to take the lock, waiting at most ``self.timeout_sec`` seconds.

        Yields the result of the acquire attempt: truthy if the lock is held
        inside the ``with`` block, falsy if the wait timed out.  When the
        lock was acquired it is released on exit, including when the block
        raises.
        """
        result = self._lock.acquire(timeout=self.timeout_sec)
        try:
            yield result
        finally:
            # Without try/finally an exception in the with-body would skip
            # the release and leave the lock held forever.
            if result:
                self._lock.release()

def producer(name, q, delay_sec):
    """Put an ever-increasing integer on *q* forever, pausing between items.

    Each item is logged together with *name* after it is enqueued, then the
    function sleeps for *delay_sec* seconds.  Any ``Exception`` is logged
    and re-raised to the caller.
    """
    i = 0
    try:
        while True:
            q.put(i)
            _log(f"{name} {i}")
            time.sleep(delay_sec)
            i += 1
    except Exception as e:
        _log(f"{name} error: {str(e)}")
        raise

def consumer(name, q, lock, delay_sec):
    """Take items from *q* forever, one at a time, while holding *lock*.

    Each iteration tries to acquire *lock* through its ``acquire_timeout()``
    context manager.  On success one item is taken from *q*, logged together
    with *name*, and "processed" by sleeping for *delay_sec* seconds.  On
    timeout a message is logged and the loop retries.

    Every item taken from the queue is acknowledged with ``q.task_done()``;
    without that acknowledgement a ``q.join()`` in another thread could never
    return.
    """
    while True:
        with lock.acquire_timeout() as acquired:
            if not acquired:
                _log(f"{name} wait timeout'ed")
                continue
            i = q.get()
            try:
                _log(f'{name} {i}')
                time.sleep(delay_sec)
            finally:
                # Acknowledge the item even if processing fails, so the
                # queue's unfinished-task count stays accurate.
                q.task_done()

# Demo driver: one producer feeds a queue while two consumers compete for a
# shared TimeoutLock; the second consumer is started five seconds late.
try:
    q = Queue()
    lock = TimeoutLock(timeout_sec=3)

    consumer1_thread = Thread(target=consumer, args=('consumer1', q, lock, 5))
    consumer2_thread = Thread(target=consumer, args=('consumer2', q, lock, 5))
    producer1_thread = Thread(target=producer, args=('producer1', q, 1))

    producer1_thread.start()
    consumer1_thread.start()
    time.sleep(5)
    consumer2_thread.start()

    # Block the main thread until the queue reports all items as done.
    q.join()
except Exception as e:
    _log(f"main thread error: {str(e)}")
finally:
    _log('main thread done!')
Matthew answered 16/9, 2022 at 5:1 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.