Communication between async tasks and synchronous threads in python
Asked Answered
H

2

7

I am looking for the best solution for communication between async tasks and methods/functions that run in a thread pool executor from concurrent.futures. In previous synchronous projects, I would use the queue.Queue class. I assume that any method should be thread safe and therefore the asyncio.queue will not work.

I have seen people extend the queue.Queue class to do something like:

class async_queue(Queue):
  async def aput(self, item):
    self.put_nowait(item)

  async def aget(self):
    resp = await asyncio.get_event_loop().run_in_executor( None, self.get )
    return resp

Is there a better way?

Headstall answered 8/1, 2020 at 16:38 Comment(0)
R
10

I would recommend going the other way around: using the asyncio.Queue class to communicate between the two worlds. This has the advantage of not having to spend a slot in the thread pool on operations that take a long time to complete, such as a get().

Here is an example:

class Queue:
    def __init__(self):
        self._loop = asyncio.get_running_loop()
        self._queue = asyncio.Queue()

    def sync_put_nowait(self, item):
        self._loop.call_soon(self._queue.put_nowait, item)

    def sync_put(self, item):
        asyncio.run_coroutine_threadsafe(self._queue.put(item), self._loop).result()

    def sync_get(self):
        return asyncio.run_coroutine_threadsafe(self._queue.get(item), self._loop).result()

    def async_put_nowait(self, item):
        self._queue.put_nowait(item)

    async def async_put(self, item):
        await self._queue.put(item)

    async def async_get(self):
        return await self._queue.get()

The methods prefixed with sync_ are meant to be invoked by sync code (running outside the event loop thread). The ones prefixed with async_ are to be called by code running in the event loop thread, regardless of whether they are actually coroutines. (put_nowait, for example, is not a coroutine, but it still must be distinguished between a sync and an async version.)

Restaurant answered 8/1, 2020 at 17:7 Comment(2)
How does this work? The asyncio queues are not thread safe. They are meant to be used to communicate between coroutines. Since the coroutines all run in an event loop which runs on a single thread, this is why the lack of thread safety doesn't matter between coroutines. But it would matter with non-event loop code.Enervate
@Enervate The "sync_" prefixed methods use run_coroutine_threadsafe to run the function inside the event loop. That way, when the self._queue.put (and get etc) starts actually executing, that will be in the event loop thread.Restaurant
B
0

The disadvantage of the solution via run_coroutine_threadsafe() given in the other answer is that a worker thread will actually depend on your event loop: the thread will wait for the event loop to process your put() and get(). If your event loop is doing a lot of work, this can lead to very long waits, up to seconds of real time.

I suggest using aiologic.Queue (I'm the creator of aiologic), which doesn't have this disadvantage: wherever you do put() or get(), the thread or task will only wait if it's really necessary (e.g. if you do get() when the queue is empty).

queue = aiologic.Queue(["first", "second"], maxsize=3)

Timer(1, queue.green_put, ["third"]).start()

print(await queue.async_get())  # "first"
print(await queue.async_get())  # "second"
print(await queue.async_get())  # after one second: "third"

And if you don't need to set the upperbound limit on the number of items that can be placed in the queue, try aiologic.SimpleQueue. With it you will get maximum performance when the queue is rarely empty and there are many consumers (and of course, nothing prevents you from using this queue in other cases).

Bartz answered 19/10, 2024 at 21:45 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.