When using Python's asyncio, I often create and complete asyncio.Future objects from threads that are not the thread running the event loop.
Unless I complete those futures in the thread that is running the event loop or via a function that notifies that loop of the completion, the event loop often does not "notice" that the futures are completed.
Is there a way to "notify" an event loop that it should check a Future for completion if that future was readied (via set_result) externally?
Why I am asking this
The threads which ready futures need to a) have very low latency, and b) check whether the future has been readied, synchronously, later on (e.g. via future.done()).
The event loop awaiting the Futures does not need to have low latency in being notified that they're ready--it can be notified a few milliseconds late.
Ideally there would be a performant way to notify the event loop that a Future had been readied after readying it synchronously in a thread.
Even if that's not possible, the event loop could poll readiness on an interval, so long as the futures were synchronously readied as quickly as possible in threads.
What I have tried
The "correct" way to solve this problem is with call_soon_threadsafe, e.g.:
    def do_in_thread(future):
        future.get_loop().call_soon_threadsafe(future.set_result, "the result")
That notifies the event loop of Future readiness reliably, but does not work for two reasons:
- It has significant (8-10x) overhead versus calling future.set_result in my benchmarks.
- It doesn't ready the Future until the event loop runs, which means I can't reliably check if the Future is done, which I need to do. For example, this won't work:
    def do_in_thread(future):
        future.get_loop().call_soon_threadsafe(future.set_result, "the result")
        assert future.done()  # Fails
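To make that failure mode concrete, here is a minimal, self-contained sketch. The `observed` list and the `main` coroutine are hypothetical names added for illustration, and the `worker.join()` call deliberately blocks the loop thread so the timing is deterministic; real code would not do that.

```python
import asyncio
import threading


def do_in_thread(future, observed):
    # Schedule set_result on the loop thread, then check done() right away.
    future.get_loop().call_soon_threadsafe(future.set_result, "the result")
    observed.append(future.done())


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    observed = []  # hypothetical helper: records what the worker thread saw
    worker = threading.Thread(target=do_in_thread, args=(future, observed))
    worker.start()
    # Block the loop thread on purpose so the scheduled callback cannot run
    # before the worker has checked done().
    worker.join()
    result = await future  # the loop now runs the queued set_result
    return observed[0], result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this sketch the worker sees the future as not done even though the awaiting coroutine later receives the result, which is exactly the gap described above.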
One thing that does seem to work is to notify the event loop by intentionally failing a second call to set_result via call_soon_threadsafe, and swallowing the InvalidStateError, like this:
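    from asyncio import Future, InvalidStateError

    def ensure_result(f, res):
        # Runs on the event loop thread; the result is normally already set.
        try:
            f.set_result(res)
        except InvalidStateError:
            pass

    def in_thread(fut: Future):
        fut.set_result("the result")
        fut.get_loop().call_soon_threadsafe(ensure_result, fut, "the result")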
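A runnable sketch of that workaround might look like the following. It assumes asyncio debug mode is off (hence `debug=False`), since a direct `set_result` from a foreign thread is not a supported use of the API. The `observed` list, the `main` coroutine, and the short `time.sleep` are hypothetical additions, there only so the coroutine is already awaiting when the worker fires.

```python
import asyncio
import threading
import time


def ensure_result(f, res):
    # Runs on the loop thread; the result is normally already set by then.
    try:
        f.set_result(res)
    except asyncio.InvalidStateError:
        pass


def in_thread(fut, observed):
    time.sleep(0.05)  # give the coroutine time to start awaiting
    fut.set_result("the result")  # direct, non-thread-safe completion
    observed.append(fut.done())  # synchronous check in the worker thread
    # Wake the loop so it notices the completion.
    fut.get_loop().call_soon_threadsafe(ensure_result, fut, "the result")


async def main():
    fut = asyncio.get_running_loop().create_future()
    observed = []  # hypothetical helper: records what the worker thread saw
    worker = threading.Thread(target=in_thread, args=(fut, observed))
    worker.start()
    result = await fut
    worker.join()
    return observed[0], result


if __name__ == "__main__":
    print(asyncio.run(main(), debug=False))
```

Here the worker observes `done()` as true immediately, and the awaiting coroutine resumes once the loop is woken. Whether that wake-up is guaranteed is the open question; the sketch only reproduces the behaviour, it does not prove it is supported.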
That still has overhead, but I could remove the overhead of calling call_soon_threadsafe by tracking Futures in a thread-shared data structure and polling calls to ensure_result occasionally. However, I'm still not sure:
- Does that reliably work? Is set_result failing with InvalidStateError guaranteed to notify the event loop that a given Future can return out of await, or is that an undocumented implementation detail I'm relying on?
- Is there a better way to achieve that periodic wakeup that doesn't involve me keeping track of/polling such Futures myself?
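For reference, the polling variant could be sketched roughly as below. Everything here is an assumption made for illustration: the `completed` queue, the `poll_completed` coroutine, and the 5 ms interval are hypothetical, and the approach leans on the same possibly undocumented behaviour as the workaround above, with asyncio debug mode assumed off.

```python
import asyncio
import queue
import threading
import time

completed = queue.SimpleQueue()  # hypothetical thread-shared structure


def ensure_result(f, res):
    # Runs on the loop thread; the result is normally already set by then.
    try:
        f.set_result(res)
    except asyncio.InvalidStateError:
        pass


def in_thread(fut):
    time.sleep(0.05)  # give the coroutine time to start awaiting
    fut.set_result("the result")  # direct, non-thread-safe completion
    completed.put((fut, "the result"))  # hand-off without call_soon_threadsafe


async def poll_completed(interval=0.005):
    # Wake the loop every `interval` seconds and re-assert queued results.
    while True:
        await asyncio.sleep(interval)
        while True:
            try:
                fut, res = completed.get_nowait()
            except queue.Empty:
                break
            ensure_result(fut, res)


async def main():
    fut = asyncio.get_running_loop().create_future()
    poller = asyncio.create_task(poll_completed())
    worker = threading.Thread(target=in_thread, args=(fut,))
    worker.start()
    try:
        return await fut
    finally:
        poller.cancel()
        worker.join()


if __name__ == "__main__":
    print(asyncio.run(main(), debug=False))
```

The trade-off is that the worker only pays for a queue put, while the awaiting side may be notified up to one polling interval late.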
In a perfect world, there would be a loop.poll_all_pending_futures() or loop.update_future_state(fut) method which would achieve this efficiently, but I don't know of one.
asyncio with threads in the first place. Why do you need to do that? – Roxannaroxanne
Future.set_result is not thread-safe. (In fact, they're documented not to be...) – Roxannaroxanne
set_result is indeed not thread safe, for the exact reason (among others) listed in the question: it doesn't thread-safely notify the event loop that a future is ready. I'm asking if there are ways to mitigate that thread-unsafety without resorting to manually tracking all futures or using call_soon_threadsafe. – Natala
set_result, so unless the programmer is very certain a future is only set by a single thread, it has a race condition too. – Roxannaroxanne
call_soon_threadsafe, and both will set the same result. – Natala