How can I "wake up" an event loop to notify it that a Future was completed from another thread?
Asked Answered
N

1

2

When using python async/asyncio, I often create and complete asyncio.Future objects from threads that are not the thread running the event loop.

Unless I complete those futures in the thread that is running the event loop or via a function that notifies that loop of the completion, the event loop often does not "notice" that the futures are completed.

Is there a way to "notify" an event loop that it should check a Future for completion if that future was readied (via set_result) externally?

Why I am asking this

The threads which ready futures need to a) have very low latency, and b) check whether the future has been readied, synchronously, later on (e.g. via future.done()).

The event loop awaiting the Futures does not need to have low latency in being notified that they're ready--it can be notified a few milliseconds late.

Ideally there would be a performant way to notify the event loop that a Future had been readied after readying it synchronously in a thread.

Even if that's not possible, the event loop could poll readiness on an interval, so long as the futures were synchronously readied as quickly as possible in threads.

What I have tried

The "correct" way to solve this problem is with call_soon_threadsafe, e.g.:

def do_in_thread(future):
    future.get_loop().call_soon_threasafe(future.set_result, "the result")

That notifies the event loop of Future readiness reliably, but does not work for two reasons:

  1. It has significant (8-10x) overhead versus calling future.set_result in my benchmarks.
  2. It doesn't ready the Future until the event loop runs, which means I can't reliably check if the Future is done, which I need to do. For example, this won't work:
def do_in_thread(future):
    future.get_loop().call_soon_threasafe(future.set_result, "the result")
    assert future.done()  # Fails

One thing that does seem to work is to notify the event loop by intentionally failing a second call to set_result via call_soon_threadsafe, and swallowing the InvalidStateError, like this:

def ensure_result(f, res):
    try:
        f.set_result(res)
    except InvalidStateError:
        pass


def in_thread(fut: Future):
    fut.set_result("the result")
    fut.get_loop().call_soon_threadsafe(ensure_result, fut, "the result")

That still has overhead, but I could remove the overhead of calling call_soon_threadsafe by tracking Futures in a thread-shared data structure and polling calls to ensure_result occasionally. However, I'm still not sure:

  1. Does that reliably work? Is set_result failing with InvalidStateError guaranteed to notify the event loop that a await given Future can return out of await, or is that an undocumented implementation detail I'm relying on?
  2. Is there a better way to achieve that periodic-wakeup that doesn't involve me keeping track of/polling such Futures myself?

In a perfect world, there would be a loop.poll_all_pending_futures() or loop.update_future_state(fut) method which would achieve this efficiently, but I don't know of one.

Natala answered 11/4, 2022 at 16:33 Comment(6)
You... probably shouldn't be mixing asyncio with threads in the first place. Why do you need to do that?Roxannaroxanne
Also, by the looks of it, Future.set_result is not thread-safe. (In fact, they're documented not to be...)Roxannaroxanne
That's the motivation for this question. It's less that "mixing asyncio with threads is something you shouldn't do" and more that it's complicated and delicate. set_result is indeed not thread safe, for the exact reason (among others) listed in the question: it doesn't thread-safely notify the event loop that a future is ready. I'm asking if there are ways to mitigate that thread-unsafety without resorting to manually tracking all futures or using call_soon_threadsafe.Natala
It also doesn't guard against multiple threads concurrently calling set_result, so unless the programmer is very certain a future is only set by a single thread, it has a race condition too.Roxannaroxanne
I can enforce that invariant externally. The only calls to set_result will be the one done synchronously in the thread and (if required, though I'd love a solution that doesn't require it) one via call_soon_threadsafe, and both will set the same result.Natala
This is an excellent, valid question, and I strongly second the OP's suggestion that there be an easy function to kick the event loop in the ass. Saying that one should never mix threads and asyncio fails to account for the fact that 3rd party libraries may use either, and that you may have C / C++ libraries and extensions integrated which are multi-threaded and use callbacks into Python. Not everything is within your control, can be readily seen, or can be readily redesigned.Buffet
B
2

Disclaimer: this may not be future proof or handle all event loop contexts.

I ran into the same essential problem, and banged my head on the wall for a few days. I was using callbacks from a C++ extension to set the future result via code running in another system thread (created on the C side). I could set that future result, but the event loop didn't care unless I had other coroutines keeping it "alive" in that Python thread, and even then it was often slow.

I used this conceptual solution (posting against your example code) to solve it:

def in_thread(fut: Future):
    fut.set_result("the result")
    asyncio.run_coroutine_threadsafe( asyncio.sleep(0), fut.get_loop() )

The result was a delay of like 100 microseconds (not milli, micro) between setting the future and getting the event loop to "wake up" and process it.

Buffet answered 19/7, 2023 at 18:38 Comment(3)
Confirm this worked!Clara
This does work, but is highly implementation dependent. Different event loops (e.g. uvloop), future versions of those loops, and different editions of Python now or in the future may cause this trick to stop working. If you add a disclaimer to that effect to your answer, I'm happy to accept it, as it definitely does solve my problem at present!Natala
@ZacB: I added the disclaimer as requested. I'm glad this worked for your use case!Buffet

© 2022 - 2024 — McMap. All rights reserved.