Calling condition.wait() inside thread causes retrieval of any future to block on main thread

I have tasks, executed inside a thread pool, that share a reentrant read-write lock. Submitting these tasks returns futures, which complete once their execution finishes. When the lock experiences contention, it waits on a condition.

The library I am using exposes a wait_for_any method that retrieves one or more finished futures from a collection of tasks. However, even when one or more futures have finished, wait_for_any does not return until all futures are finished. Furthermore, wait_for_any accepts a timeout parameter, which is ignored when set.
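For comparison, the standard library's concurrent.futures.wait with return_when=FIRST_COMPLETED behaves the way I expect wait_for_any to behave: it returns as soon as one future is done and honors its timeout even if another future never finishes. A minimal sketch, assuming native threads (not green threads) and using a threading.Event as a stand-in for a task stuck on the lock:

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def quick_task():
    # Finishes immediately.
    return True

def stuck_task(release):
    # Blocks until the main thread sets the event (stand-in for a contended lock).
    release.wait()
    return False

release = threading.Event()
executor = ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(quick_task), executor.submit(stuck_task, release)]

# Returns once at least one future is done; the stuck one stays in not_done.
results = wait(futures, timeout=5, return_when=FIRST_COMPLETED)
print(len(results.done), len(results.not_done))

# With only the stuck future, wait() gives up after the timeout instead of blocking.
timed_out = wait([futures[1]], timeout=0.1)
print(len(timed_out.done), len(timed_out.not_done))

# Unblock the stuck task so the pool can shut down cleanly.
release.set()
executor.shutdown(wait=True)
```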

My question is: what am I doing wrong that could cause wait_for_any to block like this? Am I misunderstanding Python's implementation of condition wait and notify, and do these constructs block every thread in Python entirely?
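As far as I understand it, a thread blocked in Condition.wait() releases the condition's underlying lock and suspends only itself, while other OS threads keep running. A minimal sketch with native threads only (it does not reproduce the GreenThreadPoolExecutor setup; the function names are illustrative):

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

cond = threading.Condition()  # note the parentheses: an instance, not the class
state = {"ready": False}

def waiter():
    with cond:
        # wait() releases cond's lock while sleeping and re-acquires it on wake-up.
        while not state["ready"]:
            cond.wait()
    return "woken"

def other_work():
    return "done without waiting"

with ThreadPoolExecutor(max_workers=2) as executor:
    blocked = executor.submit(waiter)
    free = executor.submit(other_work)

    # The main thread is not blocked by the worker sitting in cond.wait().
    first = wait([blocked, free], timeout=5, return_when=FIRST_COMPLETED)
    print(free in first.done, blocked.done())

    # Wake the waiting worker so the executor can exit cleanly.
    with cond:
        state["ready"] = True
        cond.notify_all()

print(blocked.result())
```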

The library I am using is Futurist, maintained by the OpenStack Foundation. The relevant class and method I use are GreenThreadPoolExecutor and waiters.wait_for_any.

Here is the ReentrantReadWriteLock:

import logging
from threading import Condition, RLock

LOG = logging.getLogger(__name__)

class ReentrantReadWriteLock(object):
    def __init__(self):

        self._read_lock = RLock()
        self._write_lock = RLock()
        self._condition = Condition
        self._num_readers = 0
        self._wants_write = False

    def read_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._read_lock.acquire(blocking):
                int_lock = True
                LOG.warning("read internal lock acquired")
                while self._wants_write:
                    LOG.warning("read wants write true")
                    if not blocking:
                        LOG.warning("read non blocking")
                        return False
                    LOG.warning("read wait")
                    with self._condition:
                        self._condition.wait()
                LOG.warning("read acquired lock")
                self._num_readers += 1
                return True
            LOG.warning("read internal lock failed")
            return False
        finally:
            if int_lock:
                self._read_lock.release()

    def write_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._write_lock.acquire(blocking):
                int_lock = True
                LOG.warning("write internal lock acquired")
                while self._num_readers > 0 or self._wants_write:
                    LOG.warning("write wants write true or num read")
                    if not blocking:
                        LOG.warning("write non blocking")
                        return False
                    LOG.warning("write wait")
                    with self._condition:
                        self._condition.wait()
                LOG.warning("write acquired lock")
                self._wants_write = True
                return True
            LOG.warning("write internal lock failed")
            return False
        finally:
            if int_lock:
                self._write_lock.release()
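For reference, here is a minimal sketch of a (non-reentrant) read-write lock in which a single Condition instance both guards the counters and is used for waiting, and in which the release methods call notify_all(). The class and method names are hypothetical; this is not the futurist or readerwriterlock implementation, and it drops the reentrancy of the class above (a thread that re-acquires a read lock while a writer waits will deadlock).

```python
import threading

class SimpleReadWriteLock(object):
    """Hypothetical writer-preferring read-write lock (not reentrant)."""

    def __init__(self):
        self._cond = threading.Condition()  # one shared condition instance
        self._readers = 0
        self._writer = False
        self._waiting_writers = 0

    def read_acquire(self, blocking=True):
        with self._cond:
            # Readers yield to an active or waiting writer.
            while self._writer or self._waiting_writers:
                if not blocking:
                    return False
                self._cond.wait()
            self._readers += 1
            return True

    def read_release(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()  # wake a writer waiting for readers to drain

    def write_acquire(self, blocking=True):
        with self._cond:
            if not blocking and (self._writer or self._readers):
                return False
            self._waiting_writers += 1
            try:
                while self._writer or self._readers:
                    self._cond.wait()
            finally:
                self._waiting_writers -= 1
            self._writer = True
            return True

    def write_release(self):
        with self._cond:
            self._writer = False
            self._cond.notify_all()  # wake waiting readers and writers
```

Because every wait() and notify_all() goes through the same Condition that protects the counters, a writer blocked in write_acquire() is woken as soon as the last reader calls read_release().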

To test the lock and show that it blocks indefinitely, I do the following:

import futurist
from futurist import waiters

# These are methods of a test case class (the class itself is omitted).
def get_read(self, rrwlock):
    return rrwlock.read_acquire()

def get_write(self, rrwlock):
    return rrwlock.write_acquire()

def test(self):
    self._threadpool = futurist.GreenThreadPoolExecutor(max_workers=4)
    rrwlock = ReentrantReadWriteLock()
    futures = []
    futures.append(self._threadpool.submit(self.get_read, rrwlock))
    futures.append(self._threadpool.submit(self.get_write, rrwlock))

    # Get the results and verify only one of the calls succeeded;
    # assert that the other call is still pending.
    results = waiters.wait_for_any(futures)
    self.assertTrue(results[0].pop().result())
    self.assertEqual(1, len(results[1]))

In this example, results = waiters.wait_for_any(futures) blocks indefinitely. This thoroughly confuses me, and I hope someone can explain this behavior.

Update 2019-10-16 18:55:00 UTC: The blocking of the main thread is not limited to this ReentrantReadWriteLock implementation; it also happens with libraries such as readerwriterlock.

Update 2019-10-17 08:15:00 UTC: I have submitted this as a bug report to the futurist maintainers on Launchpad, as I believe this behavior is incorrect: launchpad bug report

Update 2019-10-20 09:02:00 UTC: I have since identified the call inside the futurist library where progress blocks: waiter.event.wait(timeout). A similar issue appears to have been reported against Python 3.3 and 3.4 and has since been closed: closed issue

Update 2019-10-21 09:06:00 UTC: A patch to the futurist library has been submitted to try to resolve this issue.

Update 2019-10-22 08:03:00 UTC: The submitted patch did not resolve the issue. Tracing waiter.event.wait(timeout) further, the call blocks in the wait function of Python's threading.py, at the call to waiter.acquire().
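To make the waiter.acquire() observation concrete: in CPython, Condition.wait() allocates a fresh lock, acquires it once, releases the condition's own lock, and then blocks by acquiring the fresh lock a second time (passing the timeout, if any, to that acquire); notify() wakes the waiter by releasing that lock. The class below is a heavily simplified, hypothetical re-implementation of that idea for illustration only (no timeouts, no RLock state saving), not the actual threading.py source.

```python
import threading
from collections import deque

class ToyCondition(object):
    """Hypothetical simplified condition variable built from plain locks."""

    def __init__(self):
        self._lock = threading.Lock()
        self._waiters = deque()

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *exc):
        self._lock.release()

    def wait(self):
        waiter = threading.Lock()
        waiter.acquire()            # first acquire succeeds immediately
        self._waiters.append(waiter)
        self._lock.release()        # let other threads use the condition
        try:
            waiter.acquire()        # blocks here until notify() releases waiter
        finally:
            self._lock.acquire()    # re-acquire before returning to the caller

    def notify(self):
        # Caller must hold self._lock, as with threading.Condition.
        if self._waiters:
            self._waiters.popleft().release()
```

In this model, the second waiter.acquire() only returns once some other thread gets to run and calls notify().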

Update 2019-10-23 07:17:00 UTC: I created a small repository demonstrating that the same blocking is possible with the native ThreadPoolExecutor and futures. I am starting to suspect this is a limitation in CPython caused by the GIL. The following code shows the demo, using the same lock as above:

from rrwlock import ReentrantReadWriteLock
from concurrent.futures import ThreadPoolExecutor

def read_lock(lock):
    lock.read_acquire()

def write_lock(lock):
    lock.write_acquire()

def main():
    local_lock = ReentrantReadWriteLock()
    with ThreadPoolExecutor(max_workers=2) as executor:
        # First task will submit fine
        future = executor.submit(read_lock, local_lock)
        # Second one will block indefinitely
        future2 = executor.submit(write_lock, local_lock)

Update 2019-10-31 07:36:00 UTC: The reentrant read-write lock has been updated so that it works with Python 2.7 and matches the version in the demo repository on GitHub.

Additionally, I discovered that the native thread pool demo from 2019-10-23 is flawed. Right after the last statement

future2 = executor.submit(write_lock, local_lock)

the thread pool's __exit__ method is called. This method tries to shut down all running threads cleanly (it calls shutdown(wait=True)), which is impossible while one of them is stuck waiting on the held lock. The example has been updated with a spin_for_any example:
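The shutdown problem can be shown in isolation: leaving a with ThreadPoolExecutor(...) block calls shutdown(wait=True), which joins the workers, so a worker stuck on a lock keeps the main thread inside __exit__. A minimal sketch that instead calls shutdown(wait=False) explicitly, using a threading.Event as a stand-in for the held lock:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()  # stand-in for a lock that is not yet released

def stuck_task():
    release.wait()
    return "released"

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(stuck_task)

start = time.monotonic()
executor.shutdown(wait=False)   # returns immediately; the worker keeps running
elapsed = time.monotonic() - start
print(elapsed < 1.0, future.done())

# Python still joins pool threads at interpreter exit, so the task must finish eventually.
release.set()
print(future.result(timeout=5))
```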

futures = []
futures.append(executor.submit(read_lock, local_lock))
futures.append(executor.submit(write_lock, local_lock))

# This will loop indefinitely as one future will
# never be done but it shouldn't block.
# although similar to waiters.wait_for_any this
# would rather be 'spin_for_any' since it does
# not use wait().
while len(futures) > 0:
    # Iterate over a copy so removing an item doesn't skip the next future.
    for f in list(futures):
        if f.done():
            futures.remove(f)
            f.result()
            print("Future done")

This native Python concurrency spin_for_any example works entirely as expected.
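The spin loop above can be wrapped in a small helper that also honors a timeout and sleeps briefly between polls, so it doesn't burn a full CPU core. A sketch; spin_for_any and its poll_interval parameter are hypothetical names, not part of futurist or concurrent.futures:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def spin_for_any(futures, timeout=None, poll_interval=0.01):
    """Poll futures with done() until at least one finishes or timeout expires.

    Returns a (done, not_done) pair of sets, similar in shape to wait_for_any.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        done = set(f for f in futures if f.done())
        if done or (deadline is not None and time.monotonic() >= deadline):
            return done, set(futures) - done
        time.sleep(poll_interval)  # yield instead of busy-waiting

release = threading.Event()
executor = ThreadPoolExecutor(max_workers=2)
fast = executor.submit(lambda: 42)
slow = executor.submit(release.wait)

done, not_done = spin_for_any([fast, slow], timeout=5)
print(fast in done, slow in not_done)

release.set()
executor.shutdown(wait=True)
```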

Disconnection asked 16/10, 2019 at 9:58
Disclaimer: I have no knowledge of Python etc. The name seems semantically ambiguous: "wait for any" could also mean that it waits for all (any) futures you have. What do the function's docs say? – Jessjessa
@TarickWelling See docs.openstack.org/futurist/latest/reference/index.html#waiters for the method definition (it's a bit lengthy for a comment). I know for a fact that it should return a tuple of done and not-done futures; there is also a wait_for_all method, which only returns once all futures are done. – Disconnection
It sounds like you have this figured out now. I do have some experience with multithreading in Python, so if you still need anything on this question or another, I would be happy to help. – Session

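In your ReentrantReadWriteLock class, try changing

self._condition = Condition

to

self._condition = Condition()

Without the parentheses, self._condition refers to Condition itself (a class in Python 3, a factory function in Python 2.7) rather than to a condition-variable instance.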
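To illustrate why the parentheses matter: in the sketch below, storing the Condition class instead of an instance makes `with self._condition:` fail inside the worker thread, and the exception is stored on the future instead of surfacing in the main thread. BrokenHolder and FixedHolder are hypothetical names, and the exact exception type depends on the Python version (AttributeError on older versions, TypeError on newer ones).

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class BrokenHolder(object):
    def __init__(self):
        self._condition = threading.Condition    # the class, not an instance

    def use(self):
        with self._condition:    # fails: the class object is not a context manager
            self._condition.wait()

class FixedHolder(object):
    def __init__(self):
        self._condition = threading.Condition()  # a real condition variable

    def use(self):
        with self._condition:
            self._condition.wait(timeout=0.05)   # nobody notifies, so it times out
        return "ok"

with ThreadPoolExecutor(max_workers=2) as executor:
    broken = executor.submit(BrokenHolder().use)
    fixed = executor.submit(FixedHolder().use)
    print(type(broken.exception()).__name__)
    print(fixed.result())
```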
Variolous answered 15/6, 2020 at 16:11
