How to implement a Lock with a timeout in Python 2.7
H

7

24

Is there a way to implement a lock in Python for multithreading purposes whose acquire method can have an arbitrary timeout? The only working solutions I found so far use polling, which

  • I find inelegant and inefficient
  • Doesn't preserve the bounded waiting / progress guarantee of the lock as a solution to the critical section problem

Is there a better way to implement this?

Harris answered 5/12, 2011 at 22:12 Comment(2)
You might want a threading.Condition object (see docs.python.org/library/threading.html#threading.Condition). The Condition object wraps a lock and has a wait method with a timeout. The wait is interrupted when it times out or when notify is called on the Condition object.Obstetric
I'm not sure if that is a solution to my problem. According to the docs and from a quick glance at the source code, I figured that threading.Condition just wraps a threading.RLock object. Before calling wait you need to acquire the underlying Lock. Apart from that, it seems that wait is implemented using a busy loop. Am I missing something here?Harris
V
23

To elaborate on Steven's comment suggestion:

import threading
import time

lock = threading.Lock()
cond = threading.Condition(threading.Lock())

def waitLock(timeout):
    with cond:
        current_time = start_time = time.time()
        while current_time < start_time + timeout:
            if lock.acquire(False):
                return True
            else:
                cond.wait(timeout - current_time + start_time)
                current_time = time.time()
    return False

Things to notice:

  • there are two threading.Lock() objects, one is internal to the threading.Condition().
  • when manipulating cond, its lock is acquired; the wait() operation unlocks it, though, so any number of threads can watch it.
  • the wait is embedded inside a while loop that keeps track of the time. threading.Condition can become notified for reasons other than timeouts, so you still need to track the time if you really want it to expire.
  • even with the condition, you still 'poll' the real lock, because it's possible for more than one thread to wake and race for the lock. If lock.acquire fails, the loop returns to waiting.
  • callers of this waitLock function should follow a lock.release() with a cond.notify() (made while holding cond) so that other threads waiting on it are notified that they should retry acquiring the lock. This is not shown in the example.
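
The release side described in the last point could look roughly like the sketch below. The names wait_lock, release_lock and worker are hypothetical and not part of the answer; wait_lock follows the same logic as waitLock, only restated around a deadline so the block runs on its own.

```python
import threading
import time

lock = threading.Lock()
cond = threading.Condition(threading.Lock())

def wait_lock(timeout):
    """Try the real lock; if it is busy, wait on cond until notified or timed out."""
    with cond:
        deadline = time.time() + timeout
        while True:
            if lock.acquire(False):
                return True
            remaining = deadline - time.time()
            if remaining <= 0:
                return False
            cond.wait(remaining)

def release_lock():
    """Hypothetical helper: release the real lock, then wake one waiter."""
    lock.release()
    with cond:          # notify() must be called with the condition's lock held
        cond.notify()

def worker(results):
    got = wait_lock(5.0)
    results.append(got)
    if got:
        release_lock()

results = []
held = wait_lock(0.1)   # the main thread takes the lock first
t = threading.Thread(target=worker, args=(results,))
t.start()
time.sleep(0.2)         # the worker is now waiting on cond
release_lock()          # wakes the worker well before its 5 s timeout
t.join()
print(held, results)
```

Because a waiter keeps cond held from its failed lock.acquire(False) until wait() releases it, the notify() in release_lock cannot slip in between the two and get lost.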
Vladimir answered 5/12, 2011 at 22:50 Comment(5)
Thanks very much, this is okay as polling seems to be the only option. Should have read the documentation more thoroughly, it seems :)Harris
You could wrap this all up in a class to implement a TimeoutLock, so you don't rely on callers using both the lock and the condition variable correctly. I'm kind of amused though at using a condition variable (which is implemented with a lock) to implement a lock. :)Titustityus
@Ben: as far as I know, this is not strictly necessary from the perspective of what's available from OS provided locking primitives; nearly all OSes provide a regular mutex with a timeout acquire capability. Building one from 'scratch' as it were is mostly an artifact of the threading module itself, which has just one timeout capable primitive. since a condition is not itself anything like a mutex, it instead has to manage access to a mutex.Vladimir
@TokenMacGuy: Oh yes, I didn't mean that as any sort of criticism. It just entertained me.Titustityus
For those who wonder what lock.acquire(False) does? https://mcmap.net/q/540231/-lock-acquire-false-doesParsimonious
M
6

My version uses thread-safe queues (http://docs.python.org/2/library/queue.html) and their put/get methods, which support timeouts.

So far it is working fine, but if someone can do a peer review on it I'll be grateful.

"""
Thread-safe lock mechanism with timeout support module.
"""

from threading import ThreadError, current_thread
from Queue import Queue, Full, Empty


class TimeoutLock(object):
    """
    Thread-safe lock mechanism with timeout support.
    """

    def __init__(self, mutex=True):
        """
        Constructor.
        Mutex parameter specifies if the lock should behave like a Mutex, and
        thus use the concept of thread ownership.
        """
        self._queue = Queue(maxsize=1)
        self._owner = None
        self._mutex = mutex

    def acquire(self, timeout=0):
        """
        Acquire the lock.
        Returns True if the lock was successfully acquired, False otherwise.

        Timeout:
        - < 0 : Wait forever.
        -   0 : No wait.
        - > 0 : Wait x seconds.
        """
        th = current_thread()
        try:
            self._queue.put(
                th, block=(timeout != 0),
                timeout=(None if timeout < 0 else timeout)
            )
        except Full:
            return False

        self._owner = th
        return True

    def release(self):
        """
        Release the lock.
        If the lock is configured as a Mutex, only the owner thread can release
        the lock. If another thread attempts to release the lock, a
        ThreadError is raised.
        """
        th = current_thread()
        if self._mutex and th != self._owner:
            raise ThreadError('This lock isn\'t owned by this thread.')

        self._owner = None
        try:
            self._queue.get(False)
            return True
        except Empty:
            raise ThreadError('This lock was released already.')
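
The core trick in the class above, stripped of ownership tracking, can be tried in isolation with the sketch below. The helper names try_acquire and release are made up for illustration, and the import fallback is only there so the same lines run on Python 2.7 and Python 3.

```python
try:
    from queue import Queue, Full, Empty   # Python 3
except ImportError:
    from Queue import Queue, Full, Empty   # Python 2.7

# A one-slot queue stands in for the lock: a full queue means "held".
slot = Queue(maxsize=1)

def try_acquire(timeout):
    """Hypothetical helper: block for at most `timeout` seconds (>= 0)."""
    try:
        slot.put(True, block=True, timeout=timeout)
        return True
    except Full:
        return False

def release():
    """Hypothetical helper: empty the slot; raises Empty if it was not held."""
    slot.get(block=False)

first = try_acquire(0.1)    # slot is empty, so this succeeds at once
second = try_acquire(0.1)   # slot is full, so this gives up after the timeout
release()
third = try_acquire(0.1)    # slot is free again
release()
print(first, second, third)
```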
Macedoine answered 4/1, 2014 at 0:26 Comment(1)
The solution seems to be correct but Python queue is implemented with two Condition objects (see github.com/python/cpython/blob/master/Lib/queue.py) so @Vladimir answer (https://mcmap.net/q/534622/-how-to-implement-a-lock-with-a-timeout-in-python-2-7) is slightly better.Apices
A
3

If somebody needs the Python >= 3.2 API:

import threading
import time


class Lock(object):
    _lock_class = threading.Lock

    def __init__(self):
        self._lock = self._lock_class()
        self._cond = threading.Condition(threading.Lock())

    def acquire(self, blocking=True, timeout=-1):
        if not blocking or timeout == 0:
            return self._lock.acquire(False)
        cond = self._cond
        lock = self._lock
        if timeout < 0:
            with cond:
                while True:
                    if lock.acquire(False):
                        return True
                    else:
                        cond.wait()
        else:
            with cond:
                current_time = time.time()
                stop_time = current_time + timeout
                while current_time < stop_time:
                    if lock.acquire(False):
                        return True
                    else:
                        cond.wait(stop_time - current_time)
                        current_time = time.time()
                return False

    def release(self):
        with self._cond:
            self._lock.release()
            self._cond.notify()

    __enter__ = acquire

    def __exit__(self, t, v, tb):
        self.release()


class RLock(Lock):
    _lock_class = threading.RLock
Agonistic answered 19/2, 2019 at 19:30 Comment(0)
T
1

I'm doubtful that this can be done.

If you want to implement this without any sort of polling, then you need the OS to know that the thread is blocked, and the OS needs to be aware of the timeout, in order to unblock the thread after a while. For that, support needs to already exist in the OS; you can't implement this at the Python level.

(You could have the thread blocked at either OS-level or app-level, and have a mechanism whereby it can be woken up by a different thread at the appropriate time, but then you need that other thread to be effectively polling)

In general you don't have a truly bounded waiting/progress guarantee of the lock anyway, as your thread will have to wait an unbounded time for a context switch to take place for it to notice that it's been unblocked. So unless you can put an upper bound on the amount of CPU contention going on, you're not going to be able to use the timeout to hit any hard real-time deadlines. But you probably don't need that, otherwise you wouldn't dream of using locks implemented in Python.


Due to the Python GIL (Global Interpreter Lock), those polling-based solutions probably aren't as inefficient or as badly unbounded as you think, depending on how they're implemented and assuming you're using either CPython or PyPy.

There's only ever one thread running at a time, and by definition there's another thread that you want to run (the one that holds the lock you're waiting for). The GIL is held for a while by one thread to execute a bunch of bytecodes, then dropped and reacquired to give someone else a chance at it. So if the blocked-with-timeout thread is just in a loop checking the time and yielding to other threads, it will only wake up every so often when it gets the GIL and then almost immediately drop it back to someone else and block on the GIL again. Because this thread could only ever wake up when it gets a turn at the GIL anyway, it will also do this check as soon after the timeout expires as it would be able to resume execution even if the timeout was magically perfect.

The only time this will cause a lot of inefficiency is if your thread is blocked waiting for the lock-holding thread, which is blocked waiting for something that can't be caused by another Python thread (say, blocked on IO), and there are no other runnable Python threads. Then your polling timeout really will just sit there checking the time repeatedly, which could be bad if you expect this situation to happen for long periods of time.
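
For reference, the kind of polling loop discussed here might look like the sketch below. The function name polling_acquire and the interval parameter are assumptions for illustration; the sleep between attempts is what hands the GIL to other threads in CPython.

```python
import threading
import time

def polling_acquire(lock, timeout, interval=0.01):
    """Hypothetical helper: retry a non-blocking acquire until the deadline passes."""
    deadline = time.time() + timeout
    while True:
        if lock.acquire(False):
            return True
        if time.time() >= deadline:
            return False
        time.sleep(interval)   # yield to other threads between attempts

demo_lock = threading.Lock()
started = time.time()
got_first = polling_acquire(demo_lock, 0.05)    # free lock: succeeds immediately
got_second = polling_acquire(demo_lock, 0.05)   # already held: polls, then gives up
elapsed = time.time() - started
demo_lock.release()
print(got_first, got_second)
```

A shorter interval notices a release sooner at the cost of more wake-ups, which is exactly the trade-off between latency and wasted work described above.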

Titustityus answered 5/12, 2011 at 22:43 Comment(2)
Maybe I need to clarify the progress/bounded waiting thing a bit. I actually meant that if the threads try to acquire the lock non-blocking, it is more or less random which of the threads can execute their critical section next. Your information about the GIL is very useful and makes me confident that a polling-based solution is probably as good as it can get without too much effort. Thanks.Harris
I'm accepting Token's answer because I think it might be most interesting for others, although yours was more directly related to the original question.Harris
C
1

Based on the already accepted answer and this idea for hybrid context manager/decorators, I implemented a timeout lock (works in Python 2.7) that has both a context manager and a decorator interface. Additionally, when used as a context manager it supports named locks, so tasks can wait for a lock of a given name, rather than using a single global lock:

import logging
import threading
import time
from functools import wraps
import sys

logger = logging.getLogger(__name__)
# use a global condition for safe manipulating of the LOCKS and
# LOCK_CONDITIONS dictionary in non-atomic operations
GLOBAL_COND = threading.Condition(threading.Lock())
LOCKS = {}
LOCK_CONDITIONS = {}

class ContextDecorator(object):
    def __enter__(self):
        return self

    def __exit__(self, typ, val, traceback):
        pass

    def __call__(self, f):
        @wraps(f)
        def wrapper(*args, **kw):
            with self as acquired:
                if acquired:
                    return f(*args, **kw)
        return wrapper

class TimeoutLock(ContextDecorator):
    def __init__(self, timeout, name=None):
        self.name = name
        self.timeout = timeout
        # thread that currently holds the named lock through this instance
        self.owner = None

    def __enter__(self):
        with GLOBAL_COND:
            self.cond = LOCK_CONDITIONS.get(self.name, None)
            if self.cond is None:
                self.cond = threading.Condition(threading.Lock())
                LOCK_CONDITIONS[self.name] = self.cond
                LOCKS[self.name] = threading.Lock()
            self.lock = LOCKS[self.name]

        self.cond.acquire()
        current_time = start_time = time.time()
        while current_time < start_time + self.timeout:
            if self.lock.acquire(False):
                self.owner = threading.current_thread()
                self.cond.release()
                return True
            else:
                logger.debug('Waiting')
                self.cond.wait(
                    self.timeout - current_time + start_time)
                logger.debug('Woke up')
                current_time = time.time()
        logger.info('Timed out')
        self.cond.release()
        return False

    def __exit__(self, typ, val, traceback):
        # Release only if this thread acquired the lock: a thread that
        # timed out must not release a lock held by another thread.
        if self.owner is threading.current_thread():
            self.owner = None
            self.lock.release()
            with self.cond:
                self.cond.notify_all()


############################# DEMO ###############################
timeout = 4
sleep_interval = 1

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(
    fmt=('[%(asctime)s] %(name)s '
         '(%(threadName)s): %(message)s'),
    datefmt='%d/%b/%Y %H:%M:%S'))
logger.addHandler(handler)
logger.setLevel(logging.INFO)

def ascontext(i, name):
    with TimeoutLock(timeout, name=name) as acquired:
        if acquired:
            task()

# this will use a single lock, None
@TimeoutLock(timeout)
def asdecorator(i, name):
    task()

def task():
    logger.info('Acquired')
    time.sleep(sleep_interval)
    logger.info('Released')

def run(target):
    threads = []
    for i, name in enumerate(
            ['foo', 'bar', 'foo', 'baz', 'bar', 'foo']):
        thread = threading.Thread(
            target=target,
            name='{}.{}'.format(name, i),
            args=(i, name))
        threads.append(thread)
        thread.start()
    for i, t in enumerate(threads):
        t.join()


print('---- As context manager ----')
# foo, bar and baz can run concurrently
run(ascontext)
print('---- As decorator ----')
run(asdecorator)
Canaletto answered 2/6, 2020 at 22:16 Comment(0)
C
0

I took SingleNegationElimination's answer and created a class which can be used in a with-statement the following way:

global_lock = timeout_lock.lock()
...

with timeout_lock(owner='task_name', lock=global_lock):
    do()
    some.stuff()

This way it will only warn if the timeout expired (default=1s) and show the owner of the lock for investigation.

Use it this way and an exception will be thrown after the timeout:

with timeout_lock(owner='task_name', lock=global_lock, raise_on_timeout=True):
    do()
    some.stuff()

The timeout_lock.lock() instance has to be created once and can be used across threads.

Here is the class - it works for me but feel free to comment and improve:

import threading
import time

class timeout_lock:
    ''' taken from https://mcmap.net/q/534622/-how-to-implement-a-lock-with-a-timeout-in-python-2-7
    '''
    class lock:
        def __init__(self):
            self.owner = None
            self.lock = threading.Lock()
            self.cond = threading.Condition()

        def _release(self):
            self.owner = None
            self.lock.release()
            with self.cond:
                self.cond.notify()

    def __init__(self, owner, lock, timeout=1, raise_on_timeout=False):
        self._owner = owner
        self._lock = lock
        self._timeout = timeout
        self._raise_on_timeout = raise_on_timeout

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, type, value, tb):
        ''' will only be called if __enter__ did not raise '''
        self.release()

    def acquire(self):
        if self._raise_on_timeout:
            if not self._waitLock():
                raise RuntimeError('"%s" could not acquire lock within %d sec'
                                   % (self._owner, self._timeout))
        else:
            while True:
                if self._waitLock():
                    break
                print('"%s" is waiting for "%s" and is getting bored...'
                      % (self._owner, self._lock.owner))
        self._lock.owner = self._owner

    def release(self):
        self._lock._release()

    def _waitLock(self):
        with self._lock.cond:
            _current_t = _start_t = time.time()
            while _current_t < _start_t + self._timeout:
                if self._lock.lock.acquire(False):
                    return True
                else:
                    self._lock.cond.wait(self._timeout - _current_t + _start_t)
                    _current_t = time.time()
        return False

To be sure the threads really don't interfere and get notified as soon as possible, I wrote a small multithreading test which sums up the time needed to run all threads (it uses threading.get_ident(), so it needs Python 3.3 or above):

def test_lock_guard():
    import random

    def locking_thread_fn(name, lock, duration, timeout):
        with timeout_lock(name, lock, timeout=timeout):
            print('%x: "%s" begins to work..' % (threading.get_ident(), name))
            time.sleep(duration)
            print('%x: "%s" finished' % (threading.get_ident(), name))

    _lock = timeout_lock.lock()

    _threads = []
    _total_d = 0
    for i in range(3):
        _d = random.random() * 3
        _to = random.random() * 2
        _threads.append(threading.Thread(
            target=locking_thread_fn, args=('thread%d' % i, _lock, _d, _to)))
        _total_d += _d

    _t = time.time()

    for t in _threads: t.start()
    for t in _threads: t.join()

    _t = time.time() - _t

    print('duration: %.2f sec / expected: %.2f (%.1f%%)'
          % (_t, _total_d, 100 / _total_d * _t))

Output is:

7f940fc2d700: "thread0" begins to work..
"thread2" is waiting for "thread0" and is getting bored...
"thread2" is waiting for "thread0" and is getting bored...
"thread2" is waiting for "thread0" and is getting bored...
7f940fc2d700: "thread0" finished
7f940f42c700: "thread1" begins to work..
"thread2" is waiting for "thread1" and is getting bored...
"thread2" is waiting for "thread1" and is getting bored...
7f940f42c700: "thread1" finished
"thread2" is waiting for "None" and is getting bored...
7f940ec2b700: "thread2" begins to work..
7f940ec2b700: "thread2" finished
duration: 5.20 sec / expected: 5.20 (100.1%)
Chromosphere answered 14/8, 2015 at 12:21 Comment(0)
P
0

Okay, this is already implemented in Python 3.2 or above, where Lock.acquire() accepts a timeout argument: https://docs.python.org/3/library/threading.html (look for threading.TIMEOUT_MAX).
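
As a quick illustration of that built-in support, here is a minimal sketch for Python 3.2 or above; the variable names are arbitrary.

```python
import threading

lock = threading.Lock()

first = lock.acquire(timeout=1.0)    # lock is free, so this returns right away
second = lock.acquire(timeout=0.1)   # Lock is not reentrant, so this times out
lock.release()

# Timeouts larger than threading.TIMEOUT_MAX raise OverflowError.
print(first, second, threading.TIMEOUT_MAX)
```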

But I improved on the test case over frans' version ... though this is already a waste of time if you're on py3.2 or above:

from unittest.mock import patch, Mock
import unittest

import os
import sys
import logging
import traceback
import threading
import time

from Util import ThreadingUtil

class ThreadingUtilTests(unittest.TestCase):

    def setUp(self):
        pass

    def tearDown(self):
        pass

    # https://www.pythoncentral.io/pythons-time-sleep-pause-wait-sleep-stop-your-code/
    def testTimeoutLock(self):

        faulted = [False, False, False]

        def locking_thread_fn(threadId, lock, duration, timeout):
            try:
                threadName = "Thread#" + str(threadId)
                with ThreadingUtil.TimeoutLock(threadName, lock, timeout=timeout, raise_on_timeout=True):
                    print('%x: "%s" begins to work..' % (threading.get_ident(), threadName))
                    time.sleep(duration)
                    print('%x: "%s" finished' % (threading.get_ident(), threadName))
            except RuntimeError:
                faulted[threadId] = True

        _lock = ThreadingUtil.TimeoutLock.lock()

        _sleepDuration = [5, 10, 1]
        _threads = []

        for i in range(3):
            _duration = _sleepDuration[i]
            _timeout = 6
            print("Wait duration (sec): " + str(_duration) + ", Timeout (sec): " + str(_timeout))
            _worker = threading.Thread(
                                        target=locking_thread_fn, 
                                        args=(i, _lock, _duration, _timeout)
                                    )
            _threads.append(_worker)

        for t in _threads: t.start()
        for t in _threads: t.join()

        self.assertEqual(faulted[0], False)
        self.assertEqual(faulted[1], False)
        self.assertEqual(faulted[2], True)

In the "Util" folder, I have "ThreadingUtil.py":

import time
import threading

# https://mcmap.net/q/534622/-how-to-implement-a-lock-with-a-timeout-in-python-2-7
# https://docs.python.org/3.4/library/asyncio-sync.html#asyncio.Condition
# https://mcmap.net/q/508722/-how-to-create-global-lock-semaphore-with-multiprocessing-pool-in-python
# https://hackernoon.com/synchronization-primitives-in-python-564f89fee732

class TimeoutLock(object):
    ''' taken from https://mcmap.net/q/534622/-how-to-implement-a-lock-with-a-timeout-in-python-2-7
    '''
    class lock:
        def __init__(self):
            self.owner = None
            self.lock = threading.Lock()
            self.cond = threading.Condition()

        def _release(self):
            self.owner = None
            self.lock.release()
            with self.cond:
                self.cond.notify()

    def __init__(self, owner, lock, timeout=1, raise_on_timeout=False):
        self._owner = owner
        self._lock = lock
        self._timeout = timeout
        self._raise_on_timeout = raise_on_timeout

    # http://effbot.org/zone/python-with-statement.htm
    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, type, value, tb):
        ''' will only be called if __enter__ did not raise '''
        self.release()

    def acquire(self):
        if self._raise_on_timeout:
            if not self._waitLock():
                raise RuntimeError('"%s" could not acquire lock within %d sec'
                                   % (self._owner, self._timeout))
        else:
            while True:
                if self._waitLock():
                    break
                print('"%s" is waiting for "%s" and is getting bored...'
                      % (self._owner, self._lock.owner))
        self._lock.owner = self._owner

    def release(self):
        self._lock._release()

    def _waitLock(self):
        with self._lock.cond:
            _current_t = _start_t = time.time()
            while _current_t < _start_t + self._timeout:
                if self._lock.lock.acquire(False):
                    return True
                else:
                    self._lock.cond.wait(self._timeout - _current_t + _start_t)
                    _current_t = time.time()
        return False
Parsimonious answered 4/10, 2018 at 5:20 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.