Python - RabbitMQ Pika consumer - How to use async function as callback
Asked Answered
S

2

6

I have the following code where I initialize a consumer listening to a queue.

consumer = MyConsumer()
consumer.declare_queue(queue_name="my-jobs")
consumer.declare_exchange(exchange_name="my-jobs")
consumer.bind_queue(
    exchange_name="my-jobs", queue_name="my-jobs", routing_key="jobs"
)
consumer.consume_messages(queue="my-jobs", callback=consumer.consume)

The problem is that the consume method is defined as follows:

async def consume(self, channel, method, properties, body):

Inside the consume method, we need to await async functions, but this produces an error "coroutine is not awaited" for the consume function. Is there a way to use async function as a callback in pika?

Sinusoid answered 7/7, 2022 at 12:36 Comment(0)
S
9

I annotated my callback with @sync where sync is:

def sync(f):
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
    return wrapper

(found it here for celery, but it worked with pika too)

Sinusoid answered 7/7, 2022 at 18:34 Comment(4)
Hello, I maintain Pika. That is a very interesting discovery. If you have time, would you mind documenting this if you think it's likely another user may need to know? github.com/pika/pika/tree/main/docsCaveman
I should mention that this solution does not have a true async behavior, it's just a workaround to solve the coroutine exception when using an async function as callback. For actual async behavior, I found aio-pika (aio-pika.readthedocs.io/en/latest/quick-start.html). In the docs, where exactly would you like me to mention this workaround? @LukeBakkenSinusoid
Yes, aio-pika or aiorabbit are better choices. Hm, the docs don't seem like a good place for this. Is it too much trouble for a small example program here? github.com/pika/pika/tree/main/examplesCaveman
aiormq is also a simple and nice option for async (which aio-pika is now using). Basically aio-pika mostly provide auto-reconnection which many had issue with, including me.Copperhead
I
0

I had a similar doubt, I ended up using AsyncioConnection adapter.

class Consumer:
  
  def __init__(self, loop, ...):
    self._loop = loop
    self._in_flight_tasks = set()

  def connect(self):
    return AsyncioConnection(
      parameters=...
      custom_ioloop=self._loop,
    )

  ...

  
  async def _handle_message(...):
    ...
  
  def on_message(self, _unused_channel, basic_deliver, properties, body):
      task = self._loop.create_task(self._handle_message(tag, properties, body))
      self._in_flight_tasks.add(task)
      task.add_done_callback(self._in_flight_tasks.discard)

  ...

Note that I'm passing the event loop to the consumer. I create it with asyncio.new_event_loop() on the top of my app. I am not sure if that's required but may be as it seems like Pika us using some custom event loop implementation by default.

Most of the consumer code is taken from Pika examples.

For the explanation on why the tasks are added to a set and then discarded see here.

Irradiate answered 1/3, 2023 at 12:7 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.