Consume multiple queues in python / pika

I am trying to create a consumer that would subscribe to multiple queues, and then process messages as they arrive.

The problem is that when there is some data already present in the first queue, it consumes the first queue and never goes to consume the second queue. However, when the first queue is empty, it does go to the next queue, and then consumes both queues simultaneously.

I first implemented this with threading, but I would rather avoid that if the pika library can do it for me without much complexity. Below is my code:

import pika

mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)


def callback(ch, method, properties, body):
    print body
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)

mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()
Wildwood answered 1/7, 2014 at 12:28 Comment(9)
I tried your code with the only changes being a logger to prevent exceptions and declarations for the queues. The code works as expected: I published some messages to each queue, and they were routed and echoed on the CLI. - Hesperus
Hey, can you try with pre-populated queues and then start the consumer? Let me know if this also works as expected. - Wildwood
I just tried that and it doesn't work. I only see the messages from the first queue. - Hesperus
That's what I'm talking about. Weird, isn't it? Do you have any ideas? - Wildwood
I don't know much about the Python client; that's why I asked Gavin below to answer. - Hesperus
Does it behave the same with other clients? Can you give it a try in another client? If this is pika-specific, it will have to be raised as an issue. Though Gavin gave a good suggestion, it was already implemented. - Wildwood
I just tried with the php-amqplib client and it works as expected. I pre-published messages to both queues and all of them were consumed. - Hesperus
Good to know. I was about to raise the issue when I found out Gavin is the author of pika. Now it seems it's up to Gavin to help me. - Wildwood
Possible duplicate of Python and RabbitMQ - Best way to listen to consume events from multiple channels? - Sniggle

One possible solution is to use a non-blocking connection (pika.SelectConnection) to consume the messages.

import pika


def callback(channel, method, properties, body):
    print(body)
    channel.basic_ack(delivery_tag=method.delivery_tag)


def on_open(connection):
    connection.channel(on_open_callback=on_channel_open)


def on_channel_open(channel):
    channel.basic_consume(queue='queue1', on_message_callback=callback)
    channel.basic_consume(queue='queue2', on_message_callback=callback)


parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    connection.close()

This subscribes to both queues over a single connection and channel, and consumes messages from each queue as they arrive.
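
The %2F at the end of the URL above is the percent-encoded form of the default virtual host, /. As a small illustration using only the standard library (the guest credentials and localhost are simply the values from the URL above, not a recommendation), the encoded path segment can be built like this:

```python
from urllib.parse import quote, unquote

# The default RabbitMQ virtual host is "/". Inside an AMQP URL it has to be
# percent-encoded, because "/" also separates the parts of the URL path.
vhost = "/"
encoded = quote(vhost, safe="")  # safe="" makes quote() encode "/" as well
url = f"amqp://guest:guest@localhost:5672/{encoded}"

print(encoded)           # %2F
print(url)               # amqp://guest:guest@localhost:5672/%2F
print(unquote(encoded))  # /
```

A virtual host with another name would be encoded the same way before being appended to the URL.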

Zielsdorf answered 20/2, 2017 at 17:59 Comment(5)
Could you tell me the purpose of %2F at the end? - Ralaigh
@RápliAndrás When connecting to RabbitMQ, you need to specify a virtual host. The default virtual host is /, which is URL-encoded as %2F. - Zielsdorf
It is worth noting that this code did not work for me with Pika 1.1.0. The fix is to add on_open_callback= in the on_open method: connection.channel(on_open_callback=on_channel_open), and on_message_callback= in the on_channel_open method: channel.basic_consume(on_message_callback=callback, queue='queue1') channel.basic_consume(on_message_callback=callback, queue='queue2') - Jaundice
Is there a way to define a priority between the queues? - Bader
FYI, you can also do this with the pika blocking connection with one channel. pika has had test code for this since 2015. - Allsun
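
Following up on the last comment, here is a rough sketch of the blocking-connection variant written against the pika 1.x keyword API (queue=, on_message_callback=). The helper name subscribe_all, the host "localhost", and the queue names are assumptions for illustration, and both queues are assumed to exist already. pika is imported inside main() so that the helper can be exercised without a broker.

```python
def subscribe_all(channel, queues, on_message):
    """Register one callback on several queues of a single channel.

    `channel` is expected to behave like a pika 1.x channel.
    Returns the consumer tags reported by basic_consume.
    """
    # Ask the broker for at most one unacknowledged delivery at a time.
    channel.basic_qos(prefetch_count=1)
    return [
        channel.basic_consume(queue=name, on_message_callback=on_message)
        for name in queues
    ]


def on_message(ch, method, properties, body):
    print(body)
    # Acknowledge on the channel that delivered the message.
    ch.basic_ack(delivery_tag=method.delivery_tag)


def main():
    import pika  # third-party dependency: pip install pika

    # "localhost" is an assumed broker address.
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    subscribe_all(channel, ["queue1", "queue2"], on_message)
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    finally:
        connection.close()
```

Calling main() starts the consumer. Whether pre-populated queues are drained evenly may depend on the pika version, so this is worth checking against the version you actually run.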

The issue is most likely that the first basic_consume call issues a Basic.Consume and has already received messages from the pre-populated queue before the second call is issued. You might want to try setting the QoS prefetch count to 1, which stops RabbitMQ from sending you more than one unacknowledged message at a time.

Langill answered 2/7, 2014 at 14:59 Comment(7)
It has already been set to 1, as can be seen in the code. Anything else that you can think of? - Wildwood
One more thing: I don't think the consumer actually starts consuming until it hits start_consuming. I need to verify that, though. - Wildwood
Hey Gavin, I just tried this functionality with the kombu library for Python, and it worked as expected. - Wildwood
I'll try using the async backend and let you know. - Wildwood
I followed pika.readthedocs.org/en/0.9.13/examples/… and things worked as expected. The first queue was fully consumed, and then the second queue was consumed accordingly. - Wildwood
@Wildwood How did you go about doing this? Did you basically take your code above and put it where you declare the queue? - Emolument
I moved from pika to kombu. - Wildwood

In line with the comments on the first answer above, I was able to get similar results with pika 1.1.0 using the following:

import pika

def queue1_callback(ch, method, properties, body):
  print(" [x] Received queue 1: %r" % body)

def queue2_callback(ch, method, properties, body):
  print(" [x] Received queue 2: %r" % body)

def on_open(connection):
  connection.channel(on_open_callback = on_channel_open)


def on_channel_open(channel):
  channel.basic_consume('queue1', queue1_callback, auto_ack = True)
  channel.basic_consume('queue2', queue2_callback, auto_ack = True)

credentials = pika.PlainCredentials('u', 'p')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.SelectConnection(parameters = parameters, on_open_callback = on_open)

try:
  connection.ioloop.start()
except KeyboardInterrupt:
  # Request a graceful close, then restart the loop so the close can complete
  connection.close()
  connection.ioloop.start()
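
The two near-identical callbacks above could also be folded into one by binding the queue name with functools.partial. This is only a sketch under assumptions: handle and make_consumers are hypothetical names, and channel is any object exposing a pika 1.x style basic_consume(queue, on_message_callback, auto_ack=...).

```python
from functools import partial


def handle(queue_name, ch, method, properties, body):
    """Shared handler; queue_name is pre-bound for each queue."""
    message = " [x] Received %s: %r" % (queue_name, body)
    print(message)
    return message


def make_consumers(channel, queue_names):
    """Register `handle` on every queue, with the queue name pre-bound."""
    callbacks = {}
    for name in queue_names:
        # partial() fixes the first argument, so pika still calls the result
        # with the usual (ch, method, properties, body) arguments.
        callbacks[name] = partial(handle, name)
        channel.basic_consume(name, callbacks[name], auto_ack=True)
    return callbacks
```

Inside on_channel_open this would be called as make_consumers(channel, ['queue1', 'queue2']), replacing the two separate basic_consume lines.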
Mirellamirelle answered 4/2, 2021 at 21:22 Comment(0)
