Pika + RabbitMQ: setting basic_qos to prefetch=1 still appears to consume all messages in the queue

I've got a Python worker client that spins up 10 workers, each of which consumes from a RabbitMQ queue. A bit like this:

#!/usr/bin/python
import multiprocessing

import pika

worker_count = 10
qname = 'work_queue'  # placeholder queue name


def mqworker(qname):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback, queue=qname, no_ack=False)
    channel.basic_qos(prefetch_count=1)  # prefetch limit is set here, after basic_consume
    channel.start_consuming()


def callback(ch, method, properties, body):
    doSomeWork()  # placeholder for the actual processing
    ch.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == '__main__':
    for i in range(worker_count):
        worker = multiprocessing.Process(target=mqworker, args=(qname,))
        worker.start()

The issue I have is that, despite setting basic_qos on the channel, the first worker to start takes all the messages off the queue while the others sit idle. I can see this in the RabbitMQ management interface: even when I set worker_count to 1 and dump 50 messages on the queue, all 50 go into the 'unacknowledged' bucket, whereas I'd expect 1 to become unacknowledged and the other 49 to stay ready.

Why isn't this working?

Fogle answered 14/9, 2012 at 14:51 Comment(0)

I appear to have solved this by moving where basic_qos is called.

Placing it just after channel = connection.channel(), and therefore before basic_consume, gives the behaviour I'd expect.
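For reference, the reordering described above might look like the sketch below. It is only an illustration: the helper names configure_channel, on_message and run_worker and the queue name 'work_queue' are made up for this example, and it uses the pika 1.x keyword form (on_message_callback, auto_ack) rather than the older no_ack form shown in the question. The likely reason the order matters is that RabbitMQ applies a non-global prefetch limit to consumers started after it is set.

```python
def configure_channel(channel, qname, on_message):
    """Set the prefetch limit first, then register the consumer."""
    channel.queue_declare(queue=qname, durable=True)
    # Set QoS before the consumer exists so the limit applies to it.
    channel.basic_qos(prefetch_count=1)
    # pika >= 1.0 keyword form; older releases used
    # basic_consume(callback, queue=..., no_ack=False) instead.
    channel.basic_consume(queue=qname,
                          on_message_callback=on_message,
                          auto_ack=False)


def on_message(ch, method, properties, body):
    # Real work would go here; acknowledge exactly once per delivery.
    ch.basic_ack(delivery_tag=method.delivery_tag)


def run_worker(qname="work_queue", host="mqhost"):
    # Hypothetical entry point; pika is imported here so the helpers above
    # can be exercised without a broker or the library installed.
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    configure_channel(channel, qname, on_message)
    channel.start_consuming()
```

Each worker process would call run_worker(); with manual acks and prefetch_count=1 the broker should hand each consumer one message at a time.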

Fogle answered 14/9, 2012 at 14:55 Comment(7)
Thank you! That did solve the issue. And by the way, this is very hard to debug. - Baker
@Hiagara yeah, just ran into this today myself. Amazing that almost 5 years later this is still not clear or documented in the API. - Azrael
I think we should call basic_qos before basic_consume, because basic_consume uses this setting when it is initialized. - Tosspot
Agreed with @rborodinov. I had basic_qos right after basic_consume and it didn't work. Switched them, and now it works fine. - Devilry
I also had to set auto_ack=False when setting up basic_consume for it to work. Otherwise it still consumed more messages than expected. - Hazel
My .ack() was in a loop inside the callback, so it was called more than once per delivery_tag, resulting in a RabbitMQ 406 PRECONDITION_FAILED - unknown delivery tag. - Philippic
@Hazel You need both basic_qos(prefetch_count=1) and auto_ack=False because the AMQP spec says "The prefetch-count is ignored if the no-ack option is set". Note that pika calls the option auto_ack while the AMQP spec calls it no-ack, which is kind of confusing IMO. - Ewald
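To make the interaction between prefetch and acknowledgements in the comments above concrete, here is a toy model of a single delivery pass to one consumer. It is not how RabbitMQ is implemented; the dispatch function and its return shape are invented purely for illustration. Its no_ack flag corresponds to auto_ack=True in pika.

```python
def dispatch(ready, prefetch_count, no_ack):
    """Toy model of one delivery pass to a single consumer.

    Returns (delivered, unacked, still_ready) message counts.
    """
    if no_ack:
        # Deliveries count as acknowledged as soon as they are sent, so
        # there is nothing unacknowledged for the prefetch limit to cap.
        return ready, 0, 0
    if prefetch_count == 0:
        # 0 means "no limit" in AMQP 0-9-1.
        return ready, ready, 0
    delivered = min(ready, prefetch_count)
    return delivered, delivered, ready - delivered


print(dispatch(50, prefetch_count=1, no_ack=False))  # (1, 1, 49)
print(dispatch(50, prefetch_count=1, no_ack=True))   # (50, 0, 0)
```

The first call matches what the question expected to see (1 unacknowledged, 49 ready); the second shows why the limit has no effect once acknowledgements are automatic.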
