I took SingleNegationElimination's answer and created a class which can be used in a
with-statement the following way:
global_lock = timeout_lock.lock()
...
with timeout_lock(owner='task_name', lock=global_lock):
    do()
    some.stuff()
This way it will only warn if the timeout expired (default=1s) and show the owner of the lock for investigation.
Use it this way and an exception will be thrown after the timeout:
with timeout_lock(owner='task_name', lock=global_lock, raise_on_timeout=True):
    do()
    some.stuff()
The timeout_lock.lock()
instance has to be created once and can be used across threads.
Here is the class - it works for me but feel free to comment and improve:
class timeout_lock:
    ''' Context manager that acquires a shared `timeout_lock.lock` with a timeout.

    taken from https://mcmap.net/q/534622/-how-to-implement-a-lock-with-a-timeout-in-python-2-7

    Create one `timeout_lock.lock()` instance and pass it to every
    `timeout_lock(...)` guard that should be mutually exclusive.

    owner            -- label stored on the shared lock while it is held and
                        used in the waiting / timeout messages
    lock             -- the shared `timeout_lock.lock` instance
    timeout          -- seconds a single wait attempt may last (default 1)
    raise_on_timeout -- if true, `acquire` raises RuntimeError when one wait
                        attempt expires; otherwise it prints a message and
                        keeps retrying until the lock is obtained
    '''

    class lock:
        ''' Shared state: the mutex, its current owner label and a condition
        used to wake a waiter when the mutex is released.
        '''

        def __init__(self):
            self.owner = None
            self.lock = threading.Lock()
            self.cond = threading.Condition()

        def _release(self):
            ''' Clear the owner, release the mutex and wake one waiter. '''
            self.owner = None
            self.lock.release()
            with self.cond:
                self.cond.notify()

    def __init__(self, owner, lock, timeout=1, raise_on_timeout=False):
        self._owner = owner
        self._lock = lock
        self._timeout = timeout
        self._raise_on_timeout = raise_on_timeout

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, type, value, tb):
        ''' will only be called if __enter__ did not raise '''
        self.release()

    def acquire(self):
        ''' Obtain the shared lock and record this guard's owner label on it.

        Raises RuntimeError if `raise_on_timeout` is set and the lock could
        not be obtained within `timeout` seconds. Otherwise retries forever,
        printing the current owner after every expired attempt.
        '''
        if self._raise_on_timeout:
            if not self._waitLock():
                # %g keeps fractional timeouts visible (%d printed 0.5 as "0")
                raise RuntimeError('"%s" could not aquire lock within %g sec'
                                   % (self._owner, self._timeout))
        else:
            while not self._waitLock():
                print('"%s" is waiting for "%s" and is getting bored...'
                      % (self._owner, self._lock.owner))
        self._lock.owner = self._owner

    def release(self):
        ''' Release the shared lock and wake one waiting guard. '''
        self._lock._release()

    def _waitLock(self):
        ''' Try to take the mutex for at most `timeout` seconds.

        Returns True once the mutex was acquired, False when the deadline
        passed without getting it.
        '''
        # monotonic clock: wall-clock adjustments must not stretch or
        # shorten the wait
        deadline = time.monotonic() + self._timeout
        with self._lock.cond:
            while True:
                # Always try before looking at the clock: this makes a zero
                # timeout succeed on a free lock and ensures a waiter woken
                # right at its deadline still uses the notification it got.
                if self._lock.lock.acquire(False):
                    return True
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                self._lock.cond.wait(remaining)
To be sure the threads really don't interfere and don't wait but get notified as soon as possible, I wrote a small multithreading test which sums up the time needed to run all threads:
def test_lock_guard():
    ''' Run three threads that contend for one shared lock and print the
    measured wall time next to the sum of their individual work durations.
    '''
    import random

    def locking_thread_fn(name, lock, duration, timeout):
        ''' Hold `lock` as `name` for `duration` seconds, reporting start and end. '''
        with timeout_lock(name, lock, timeout=timeout):
            ident = threading.get_ident()
            print('%x: "%s" begins to work..' % (ident, name))
            time.sleep(duration)
            print('%x: "%s" finished' % (ident, name))

    shared_lock = timeout_lock.lock()
    workers = []
    expected = 0
    for index in range(3):
        # work time of up to 3 s, per-attempt lock timeout of up to 2 s
        duration = random.random() * 3
        patience = random.random() * 2
        expected += duration
        worker = threading.Thread(
            target=locking_thread_fn,
            args=('thread%d' % index, shared_lock, duration, patience))
        workers.append(worker)

    started_at = time.time()
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    elapsed = time.time() - started_at

    # the lock serialises the workers, so elapsed should be close to expected
    print('duration: %.2f sec / expected: %.2f (%.1f%%)'
          % (elapsed, expected, 100 / expected * elapsed))
Output is:
7f940fc2d700: "thread0" begins to work..
"thread2" is waiting for "thread0" and is getting bored...
"thread2" is waiting for "thread0" and is getting bored...
"thread2" is waiting for "thread0" and is getting bored...
7f940fc2d700: "thread0" finished
7f940f42c700: "thread1" begins to work..
"thread2" is waiting for "thread1" and is getting bored...
"thread2" is waiting for "thread1" and is getting bored...
7f940f42c700: "thread1" finished
"thread2" is waiting for "None" and is getting bored...
7f940ec2b700: "thread2" begins to work..
7f940ec2b700: "thread2" finished
duration: 5.20 sec / expected: 5.20 (100.1%)
threading.Condition
object (see docs.python.org/library/threading.html#threading.Condition). The Condition object wraps a lock and has a wait
method with a timeout. The wait is interrupted when it times out or when notify
is called on the Condition object. – Obstetric
threading.Condition
just wraps a threading.RLock
object. Before calling wait
you need to acquire
the underlying Lock. Apart from that, it seems that wait
is implemented using a busy loop. Am I missing something here? – Harris