I have tasks that are executed inside a thread pool and that share a reentrant read-write lock. These tasks return futures when their execution finishes. The reentrant read-write lock waits on a condition when it experiences contention.
The library I am using exposes a wait_for_any method to retrieve one or more finished futures from a collection of tasks. However, even when one or more futures have finished, the wait_for_any method fails to return until all futures are finished. Furthermore, the wait_for_any method exposes a timeout parameter that is simply ignored if set.
My question is: what am I doing wrong that could cause such a wait_for_any method to block? Do I misunderstand Python's implementation of condition wait and notify, and do these constructs block every thread entirely in Python?
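For reference, a minimal sketch of how threading.Condition wait and notify behave under plain CPython threads (the names worker, flag, and results are illustrative, not from futurist): wait() suspends only the calling thread and releases the condition's lock while waiting, so other threads keep running and can deliver the notify().

```python
import threading

cond = threading.Condition()
flag = {"ready": False}
results = []

def worker():
    with cond:
        while not flag["ready"]:   # guard against spurious wakeups
            cond.wait()            # releases the condition's lock while waiting
    results.append("woken")

t = threading.Thread(target=worker)
t.start()

# The main thread is NOT blocked by the worker's wait(); it can still
# acquire the condition and deliver the notification.
with cond:
    flag["ready"] = True
    cond.notify()
t.join()
print(results)  # ['woken']
```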
The library I am using is called Futurist and is maintained by the OpenStack Foundation. Here are links to the relevant classes and methods I use: GreenThreadPoolExecutor and waiters.wait_for_any.
Here is the ReentrantReadWriteLock:
import logging
from threading import Condition, RLock

LOG = logging.getLogger(__name__)


class ReentrantReadWriteLock(object):
    def __init__(self):
        self._read_lock = RLock()
        self._write_lock = RLock()
        self._condition = Condition()
        self._num_readers = 0
        self._wants_write = False

    def read_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._read_lock.acquire(blocking):
                int_lock = True
                LOG.warning("read internal lock acquired")
                while self._wants_write:
                    LOG.warning("read wants write true")
                    if not blocking:
                        LOG.warning("read non blocking")
                        return False
                    LOG.warning("read wait")
                    with self._condition:
                        self._condition.wait()
                LOG.warning("read acquired lock")
                self._num_readers += 1
                return True
            LOG.warning("read internal lock failed")
            return False
        finally:
            if int_lock:
                self._read_lock.release()

    def write_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._write_lock.acquire(blocking):
                int_lock = True
                LOG.warning("write internal lock acquired")
                while self._num_readers > 0 or self._wants_write:
                    LOG.warning("write wants write true or num read")
                    if not blocking:
                        LOG.warning("write non blocking")
                        return False
                    LOG.warning("write wait")
                    with self._condition:
                        self._condition.wait()
                LOG.warning("write acquired lock")
                self._wants_write = True
                return True
            LOG.warning("write internal lock failed")
            return False
        finally:
            if int_lock:
                self._write_lock.release()
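For comparison, the canonical condition-variable pattern checks the guarded predicate while holding the condition's lock, so a notify() cannot slip in between the check and the wait(). The SimpleGate class below is a hypothetical illustration of that pattern, not part of the lock above:

```python
import threading

class SimpleGate:
    """Illustrative example of the check-predicate-under-the-lock pattern."""

    def __init__(self):
        self._cond = threading.Condition()
        self._open = False

    def wait_open(self):
        with self._cond:
            while not self._open:      # predicate checked under the lock
                self._cond.wait()      # releases the lock while waiting

    def open(self):
        with self._cond:
            self._open = True
            self._cond.notify_all()    # wake every waiter

gate = SimpleGate()
t = threading.Thread(target=gate.wait_open)
t.start()
gate.open()
t.join(timeout=5)
print(t.is_alive())  # False: the waiter was released
```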
To test the lock and show that it blocks indefinitely, I do the following:
def get_read(self, rrwlock):
    return rrwlock.read_acquire()

def get_write(self, rrwlock):
    return rrwlock.write_acquire()

def test(self):
    self._threadpool = futurist.GreenThreadPoolExecutor(max_workers=4)
    rrwlock = ReentrantReadWriteLock()
    futures = []
    futures.append(self._threadpool.submit(self.get_read, rrwlock))
    futures.append(self._threadpool.submit(self.get_write, rrwlock))
    # Get the results and verify only one of the calls succeeded;
    # assert that the other call is still pending.
    results = waiters.wait_for_any(futures)
    self.assertTrue(results[0].pop().result())
    self.assertEqual(1, len(results[1]))
In this example the execution of results = waiters.wait_for_any(futures) blocks indefinitely. This leaves me thoroughly confused; I hope someone can provide an explanation for this behavior.
Update 2019-10-16 18:55:00 UTC: The blocking of the main thread is not limited to this ReentrantReadWriteLock implementation; it also happens when using libraries such as readerwriterlock.
Update 2019-10-17 08:15:00 UTC: I have submitted this as a bug report to the maintainers of futurist on Launchpad, as I believe this behavior to be incorrect: launchpad bug report
Update 2019-10-20 09:02:00 UTC: I have since observed which call inside the futurist library blocks progress: waiter.event.wait(timeout). A similar issue was submitted against Python 3.3 and 3.4 and has since been closed: closed issue
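As a sanity check, under plain CPython threads an Event.wait(timeout) does honour its timeout even while another thread sits blocked on an unrelated lock, so the GIL alone does not explain an ignored timeout. A standalone sketch (not futurist code; the names blocker and hold_lock are made up):

```python
import threading
import time

blocker = threading.Lock()
blocker.acquire()                # held by the main thread for the whole demo

def hold_lock():
    with blocker:                # blocks; never acquired during the demo
        pass

t = threading.Thread(target=hold_lock, daemon=True)
t.start()

ev = threading.Event()
start = time.monotonic()
ev.wait(timeout=0.5)             # returns False after roughly 0.5 seconds
elapsed = time.monotonic() - start
print(elapsed >= 0.4)            # the timeout was honoured despite the blocked thread
```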
Update 2019-10-21 09:06:00 UTC: A patch to the futurist library has been submitted to try to resolve this issue.
Update 2019-10-22 08:03:00 UTC: The submitted patch did not resolve the issue. When tracing waiter.event.wait(timeout) further, the call blocks in Python's threading.py wait function on the call to waiter.acquire().
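For context, in CPython's threading.py Condition.wait() allocates a fresh lock, acquires it once, releases the condition's outer lock, and then blocks on a second acquire() of that fresh lock; notify() unblocks the waiter by releasing it. The sketch below reproduces that two-acquire mechanism directly (the variable names are mine, not from threading.py):

```python
import threading

waiter = threading.Lock()
waiter.acquire()                 # first acquire always succeeds

def notifier():
    waiter.release()             # essentially what notify() does for a waiter

t = threading.Thread(target=notifier)
t.start()

# The second acquire blocks until another thread releases the lock;
# this corresponds to the waiter.acquire() call seen in the traceback.
ok = waiter.acquire(timeout=5)
t.join()
print(ok)  # True
```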
Update 2019-10-23 07:17:00 UTC: I created a small repository that demonstrates that this also happens with the native ThreadPoolExecutor and futures. I am starting to suspect this is a limitation in CPython caused by the GIL. The following code, using the same lock as shown above, demonstrates the behavior:
from concurrent.futures import ThreadPoolExecutor

from rrwlock import ReentrantReadWriteLock


def read_lock(lock):
    lock.read_acquire()


def write_lock(lock):
    lock.write_acquire()


def main():
    local_lock = ReentrantReadWriteLock()
    with ThreadPoolExecutor(max_workers=2) as executor:
        # First task will submit fine
        future = executor.submit(read_lock, local_lock)
        # Second one will block indefinitely
        future2 = executor.submit(write_lock, local_lock)
Update 2019-10-31 07:36:00 UTC: The reentrant read-write lock has been updated so that it works with Python 2.7 and now matches what is in the demo repository on GitHub.
Additionally, it has been discovered that the native thread pool demo as described on 2019-10-23 does not work: immediately after the last statement, future2 = executor.submit(write_lock, local_lock), the __exit__ method of the thread pool is called. Naturally, this method tries to cleanly shut down all currently running threads, which is impossible due to the held lock. The example has been updated with a spin_for_any example:
futures = []
futures.append(executor.submit(read_lock, local_lock))
futures.append(executor.submit(write_lock, local_lock))

# This will loop indefinitely, as one future will
# never be done, but it shouldn't block. Although
# similar to waiters.wait_for_any, this would
# rather be 'spin_for_any' since it does not
# use wait().
while len(futures) > 0:
    # Iterate over a copy so that removing an item
    # does not skip elements of the list.
    for f in list(futures):
        if f.done():
            futures.remove(f)
            f.result()
            print("Future done")
This native Python concurrency spin_for_any example works entirely as expected.
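For completeness, the spin loop can be replaced with concurrent.futures.wait and return_when=FIRST_COMPLETED, which sleeps instead of busy-waiting and returns as soon as any future finishes. This sketch uses made-up stand-in tasks (fast, slow, gate) rather than the ReentrantReadWriteLock, and releases the blocking lock at the end so that the executor's __exit__ can shut down cleanly:

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

gate = threading.Lock()
gate.acquire()                       # keeps the second task pending

def fast():
    return 1

def slow():
    gate.acquire()                   # blocks until the gate is released
    return 2

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(fast), executor.submit(slow)}
    # Returns as soon as the first future completes, without spinning.
    done, pending = wait(futures, timeout=5, return_when=FIRST_COMPLETED)
    print(len(done), len(pending))   # 1 1
    gate.release()                   # unblock slow() so __exit__ can join it
```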