Why is asyncio queue await get() blocking?
Why is await queue.get() blocking?

import asyncio

async def producer(queue, item):
    await queue.put(item)

async def consumer(queue):
    val = await queue.get()
    print("val = %d" % val)

async def main():
    queue = asyncio.Queue()
    await consumer(queue)
    await producer(queue, 1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

If I call producer() before consumer(), it works fine. That is, the following works:

async def main():
    queue = asyncio.Queue()
    await producer(queue, 1)
    await consumer(queue)

Why isn't await queue.get() yielding control back to the event loop, so that the producer coroutine can run and populate the queue, letting queue.get() return?

Hippocrates answered 30/5, 2019 at 11:25 Comment(0)
P
11

You need to run the consumer and the producer concurrently, e.g. by defining main like this:

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(consumer(queue), producer(queue, 1))

If for some reason you can't use gather, then you can do (the equivalent of) this:

async def main():
    queue = asyncio.Queue()
    asyncio.create_task(consumer(queue))
    asyncio.create_task(producer(queue, 1))
    await asyncio.sleep(100)  # what your program actually does

"Why isn't await queue.get() yielding control back to the event loop so that the producer coroutine can run, which will populate the queue so that queue.get() can return?"

await queue.get() is yielding control back to the event loop. But await means wait, so when your main coroutine says await consumer(queue), that means "resume me once consumer(queue) has completed." Since consumer(queue) is itself waiting for someone to produce something, you have a classic case of deadlock.

Reversing the order works only because your producer is one-shot, so it immediately returns to the caller. If your producer happened to await an external source (such as a socket), you would have a deadlock there as well. Running them concurrently avoids the deadlock regardless of how producer and consumer are written.
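To make the difference visible, here is a minimal sketch that compares the two orderings. The timeout is not part of the original code; it is added only so the stuck case ends instead of hanging, and the names sequential and concurrent are made up for this illustration.

```python
import asyncio


async def producer(queue, item):
    await queue.put(item)


async def consumer(queue):
    return await queue.get()


async def sequential():
    # Awaiting the consumer first: nothing else is scheduled that could
    # fill the queue, so without the timeout this would wait forever.
    queue = asyncio.Queue()
    try:
        await asyncio.wait_for(consumer(queue), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"


async def concurrent():
    # gather() wraps both coroutines in tasks, so the producer gets a turn
    # while the consumer is suspended inside queue.get().
    queue = asyncio.Queue()
    val, _ = await asyncio.gather(consumer(queue), producer(queue, 1))
    return val


sequential_result = asyncio.run(sequential())
concurrent_result = asyncio.run(concurrent())
print(sequential_result, concurrent_result)
```

The first call times out because main is the only task and it is waiting on the consumer; the second returns the produced value.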

Pulmotor answered 30/5, 2019 at 13:4 Comment(4)
The producer and consumer are in different parts of the project, so I can't use asyncio.gather(consumer, producer). Is there a way to notify the consumer when the queue is not empty anymore? - Hippocrates
Also, I didn't get this part - "when your main coroutine says await consumer(queue), that means 'resume me once consumer(queue) has completed'." If that is true, then what is the difference between await consumer(queue) and consumer(queue)? Can you please elaborate on that? - Hippocrates
@AkshayTakkar Then start the consumer as a background task, using asyncio.create_task(consumer(queue)). That will also eliminate the deadlock. - Pulmotor
@AkshayTakkar await consumer(queue) means "block the current coroutine (allowing other coroutines to run) until consumer returns". asyncio.create_task(consumer(queue)) means "schedule consumer to execute in the event loop, but don't wait for it". Evaluating just consumer(queue) without awaiting it or passing it to a function almost never makes sense. Please refer to an asyncio tutorial for details. - Pulmotor
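The difference between awaiting and scheduling can be traced with a small sketch. The events list and the asyncio.sleep(0) call are additions for illustration only: the sleep gives the scheduled consumer a chance to start and suspend in queue.get() before main puts an item.

```python
import asyncio

events = []


async def consumer(queue):
    events.append("consumer waiting")
    val = await queue.get()
    events.append("consumer got %d" % val)


async def main():
    queue = asyncio.Queue()
    # create_task() schedules the consumer and returns immediately.
    # Keep the reference so the task is not garbage-collected early.
    task = asyncio.create_task(consumer(queue))
    events.append("main continues")
    # Yield once so the consumer starts and suspends on the empty queue.
    await asyncio.sleep(0)
    await queue.put(1)
    # Awaiting the task is safe now: an item is already in the queue.
    await task
    events.append("main done")


asyncio.run(main())
print(events)
```

Here main keeps running after create_task(), which is what lets it reach queue.put(); with a plain await consumer(queue) in that position it would never get there.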
P
1

It's because you call await consumer(queue), which means the next line (the producer) will not be called until consumer returns, which of course it never does, because nobody has produced anything yet.

Check out the example in the docs to see how it is used there: https://docs.python.org/3/library/asyncio-queue.html#examples

Another simple example:

import asyncio
import random


async def produce(queue, n):
    for x in range(1, n + 1):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

    # indicate the producer is done
    await queue.put(None)


async def consume(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done
            break

        # process the item
        print('consuming item {}...'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())


async def main():
    # Queue() no longer accepts a loop argument (removed in Python 3.10);
    # creating it inside the running coroutine binds it to the right loop.
    queue = asyncio.Queue()
    await asyncio.gather(produce(queue, 10), consume(queue))


asyncio.run(main())
Phosphaturia answered 30/5, 2019 at 11:43 Comment(1)
My producer_coro and consumer_coro are in different parts of my project. I'm using the queue as a way to communicate between the producer and the consumer. Hence, I can't do asyncio.gather(producer_coro, consumer_coro) since producer_coro and consumer_coro are in different parts of the project.Hippocrates
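When the two sides live in different parts of a project, one possible arrangement is to share only the queue and let the consuming side start its own background task. The following is a rough sketch under that assumption; the Consumer and Producer classes, their method names, and the received list are all hypothetical and stand in for whatever the real components look like.

```python
import asyncio

received = []


class Consumer:
    """Hypothetical component that owns a background consumer task."""

    def __init__(self, queue):
        self.queue = queue
        self.task = None

    def start(self):
        # Must be called while an event loop is running.
        self.task = asyncio.create_task(self._run())

    async def _run(self):
        while True:
            item = await self.queue.get()  # suspends until an item arrives
            received.append(item)
            self.queue.task_done()

    async def stop(self):
        self.task.cancel()
        try:
            await self.task
        except asyncio.CancelledError:
            pass


class Producer:
    """Hypothetical component elsewhere that only knows about the queue."""

    def __init__(self, queue):
        self.queue = queue

    async def send(self, item):
        await self.queue.put(item)


async def main():
    queue = asyncio.Queue()
    consumer = Consumer(queue)
    consumer.start()
    producer = Producer(queue)
    for i in range(3):
        await producer.send(i)
    await queue.join()  # wait until every item has been processed
    await consumer.stop()


asyncio.run(main())
print(received)
```

No explicit notification is needed: queue.get() itself wakes the consumer as soon as an item is put, and queue.join() with task_done() gives the producing side a way to wait for processing to finish.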
P
0

You should use .run_until_complete() with .gather().

Here is your updated code:

import asyncio

async def producer(queue, item):
    await queue.put(item)

async def consumer(queue):
    val = await queue.get()
    print("val = %d" % val)

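# Create the loop explicitly; relying on get_event_loop() to create one
# implicitly is deprecated.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
queue = asyncio.Queue()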
loop.run_until_complete(
    asyncio.gather(consumer(queue), producer(queue, 1))
)
loop.close()

Out:

val = 1

You could also use .run_forever() with .create_task().

Your code snippet then becomes:

import asyncio

async def producer(queue, item):
    await queue.put(item)

async def consumer(queue):
    val = await queue.get()
    print("val = %d" % val)

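# Create the loop explicitly; relying on get_event_loop() to create one
# implicitly is deprecated.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
queue = asyncio.Queue()
loop.create_task(consumer(queue))
loop.create_task(producer(queue, 1))
try:
    loop.run_forever()  # runs until interrupted, e.g. with Ctrl+C
except KeyboardInterrupt:
    pass
finally:
    loop.close()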

Out:

val = 1
Prudie answered 30/5, 2019 at 12:20 Comment(0)
