This is @robble's code; I just added a usage example:
import time
from datetime import datetime, timezone
from queue import Queue
from threading import Thread
def _log(msg: str):
    """Print *msg* to stdout, prefixed with the current UTC timestamp.

    Args:
        msg: The text to print after the timestamp.
    """
    # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive
    # datetime; use an explicitly timezone-aware UTC timestamp instead.
    print(f"{datetime.now(timezone.utc)} {msg}")
import threading
from contextlib import contextmanager
from typing import TypeVar
class TimeoutLock(object):
    """A ``threading.Lock`` wrapper whose acquisition can time out.

    Args:
        timeout_sec: Seconds to wait for the lock in ``acquire_timeout``.
            The default of ``-1`` is passed straight to
            ``threading.Lock.acquire`` and means "wait indefinitely".
    """

    def __init__(self, timeout_sec=-1):
        self._lock = threading.Lock()
        self.timeout_sec = timeout_sec

    @contextmanager
    def acquire_timeout(self):
        """Try to take the lock for the duration of a ``with`` block.

        Yields:
            ``True`` if the lock was acquired within ``timeout_sec``,
            ``False`` if the wait timed out. The caller must check this
            value; the ``with`` body runs in both cases.
        """
        acquired = self._lock.acquire(timeout=self.timeout_sec)
        try:
            yield acquired
        finally:
            # Release even when the with-body raises; otherwise the lock
            # would stay held forever and every later acquirer would block
            # or time out.
            if acquired:
                self._lock.release()
def producer(name, q, delay_sec):
    """Put an ever-increasing integer onto *q* forever.

    Each iteration enqueues the current counter value, logs it together
    with *name*, and then sleeps for *delay_sec* seconds.

    Args:
        name: Label used as a prefix in log lines.
        q: Object with a ``put`` method that receives each value.
        delay_sec: Seconds to sleep between successive puts.

    Raises:
        Exception: Any exception raised inside the loop is logged and
            then re-raised unchanged.
    """
    counter = 0
    try:
        while True:
            q.put(counter)
            _log(f"{name} {counter}")
            time.sleep(delay_sec)
            counter += 1
    except Exception as exc:
        _log(f"{name} error: {exc}")
        raise
def consumer(name, q, lock, delay_sec):
    """Take items from *q* forever, one at a time, while holding *lock*.

    Each iteration tries to acquire *lock* via ``lock.acquire_timeout()``.
    On success it takes one item from the queue, logs it, sleeps for
    *delay_sec* seconds with the lock still held, and marks the item as
    done. On timeout it only logs that the wait timed out and tries again.

    Args:
        name: Label used as a prefix in log lines.
        q: Queue to take items from (``get`` / ``task_done``).
        lock: Object whose ``acquire_timeout()`` context manager yields
            ``True`` when the lock was obtained and ``False`` on timeout.
        delay_sec: Seconds to sleep after taking each item.
    """
    while True:
        with lock.acquire_timeout() as acquired:
            if not acquired:
                _log(f"{name} wait timeout'ed")
                continue
            # NOTE: q.get() blocks while the lock is held, so other
            # consumers sharing this lock wait (and may time out) until an
            # item arrives and this iteration finishes.
            item = q.get()
            try:
                _log(f'{name} {item}')
                time.sleep(delay_sec)
            finally:
                # Without task_done() the queue's unfinished-task count
                # never drops, so a q.join() elsewhere could never return.
                q.task_done()
# Demo: one producer enqueues an item every second while two consumers share
# a TimeoutLock with a 3-second acquire timeout. Each consumer holds the lock
# for 5 seconds per item, so the other consumer can time out while waiting.
try:
    q = Queue()
    lock = TimeoutLock(timeout_sec=3)

    consumer1_thread = Thread(target=consumer, args=('consumer1', q, lock, 5))
    consumer2_thread = Thread(target=consumer, args=('consumer2', q, lock, 5))
    producer1_thread = Thread(target=producer, args=('producer1', q, 1))

    producer1_thread.start()
    consumer1_thread.start()
    # The second consumer starts 5 seconds after the first one.
    time.sleep(5)
    consumer2_thread.start()

    # Block until every queued item has been marked as done.
    q.join()
except Exception as e:
    err_msg = f"main thread error: {e}"
    _log(err_msg)
finally:
    _log('main thread done!')