How can I use Pika to send and receive RabbitMQ messages?
I'm having trouble getting Pika to work with routing keys or exchanges in a way that's consistent with the AMQP or RabbitMQ documentation. I understand that the RabbitMQ documentation uses an older version of Pika, so I have disregarded their example code.

What I'm trying to do is define a queue, "order", and have two consumers: one that handles the exchange or routing_key "production" and one that handles "test". From looking at the RabbitMQ documentation, that should be easy enough to do by using either a direct exchange and routing keys or a topic exchange.

Pika, however, doesn't appear to know what to do with the exchanges and routing keys. Inspecting the queues with the RabbitMQ management tool makes it pretty obvious that either Pika didn't publish the message correctly or RabbitMQ just threw it away.

On the consumer side, it isn't really clear how I should bind a consumer to an exchange or handle routing keys, and the documentation isn't really helping.

If I drop all ideas of exchanges and routing keys, messages queue up nicely and are easily handled by my consumer.

Any pointers or example code people have would be nice.

Escalator answered 27/3, 2011 at 11:30 Comment(2)
Invalid question: I have never seen any implementation of any standard being complete. They always lack certain details in the implementation. - Forayer
"documentation uses an older version of Pika, so I have disregarded their example code." This is a real question affecting many people who try to get started (with Pika) from the official RabbitMQ website. I have even cloned their sample code and tried to run it (with the RabbitMQ server installed and no issues with SELinux) without getting the expected results. - Heroics

As it turns out, my understanding of AMQP was incomplete.

The idea is as follows:

Client:

Once it has a connection, the client should not care about anything but the name of the exchange and the routing key. That is, the publisher doesn't know which queue the message will end up in.

import pickle
import pika

channel.basic_publish(exchange='order',
                      routing_key="order.test.customer",
                      body=pickle.dumps(data),
                      properties=pika.BasicProperties(
                          content_type="text/plain",
                          delivery_mode=2))  # delivery_mode=2 marks the message as persistent
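This also explains why messages seemed to vanish in the question: an exchange only copies a message into queues that are already bound with a matching key, and a message that matches no binding is discarded (unless it is published as mandatory or the exchange has an alternate exchange configured). Here is a rough in-memory sketch of that behaviour for a direct exchange. It is a toy model and not Pika code; the class name `ToyDirectExchange` and the queue names `order_test` and `order_production` are made up for illustration.

```python
from collections import defaultdict


class ToyDirectExchange:
    """In-memory stand-in for a direct exchange (illustration only, not Pika)."""

    def __init__(self):
        # routing key -> names of the queues bound with that key
        self._bindings = defaultdict(set)
        # queue name -> messages delivered to that queue so far
        self.queues = defaultdict(list)

    def bind(self, queue, routing_key):
        self._bindings[routing_key].add(queue)

    def publish(self, routing_key, body):
        """Deliver body to every matching queue and return the queues reached."""
        targets = sorted(self._bindings.get(routing_key, ()))
        for queue in targets:
            self.queues[queue].append(body)
        return targets  # an empty list means the message was dropped


exchange = ToyDirectExchange()
dropped = exchange.publish("test", "order #1")  # nothing is bound yet
exchange.bind("order_test", "test")
exchange.bind("order_production", "production")
routed = exchange.publish("test", "order #2")
```

In this model the first publish reaches no queue at all, while the second one lands only in the queue bound with the "test" key. So the consumer side has to declare and bind its queue before anything is published.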

Consumer:

When the channel is open, we declare the exchange and the queue:

# Note: Pika 1.0+ spells the "type" parameter exchange_type.
channel.exchange_declare(exchange='order',
                         type="topic",
                         durable=True,
                         auto_delete=False)

channel.queue_declare(queue="test",
                      durable=True,
                      exclusive=False,
                      auto_delete=False,
                      callback=on_queue_declared)

Once the queue is ready (the "on_queue_declared" callback is a good place for this), we can bind the queue to the exchange using the desired routing key.

channel.queue_bind(queue='test', 
                   exchange='order', 
                   routing_key='order.test.customer')

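# handle_delivery is the callback that will actually pick up and handle messages
# from the "test" queue. This is the argument order of older Pika releases;
# Pika 1.0+ expects basic_consume(queue='test', on_message_callback=handle_delivery).
channel.basic_consume(handle_delivery, queue='test')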

Messages sent to the "order" exchange with the routing key "order.test.customer" will now be routed to the "test" queue, where the consumer can pick them up.
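Because "order" is a topic exchange, the binding key does not have to be the exact routing key: it can also be a pattern, where `*` stands for exactly one dot-separated word and `#` for zero or more words. The following is a small pure-Python sketch of that matching rule, meant only to show which keys a binding would catch. The function `topic_matches` is a hypothetical helper, not part of Pika or RabbitMQ.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key."""
    return _match(pattern.split("."), routing_key.split("."))


def _match(pattern_words, key_words):
    if not pattern_words:
        # The pattern is used up: it matches only if the key is used up too.
        return not key_words
    head, rest = pattern_words[0], pattern_words[1:]
    if head == "#":
        # "#" may absorb zero or more words, so try every possible split.
        return any(_match(rest, key_words[i:]) for i in range(len(key_words) + 1))
    if not key_words:
        return False
    if head == "*" or head == key_words[0]:
        # "*" consumes exactly one word; a literal must match the word exactly.
        return _match(rest, key_words[1:])
    return False
```

For example, a binding of `order.test.*` would catch `order.test.customer` but not `order.production.customer`, while `order.#` would catch both.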

Escalator answered 27/3, 2011 at 14:12 Comment(0)

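While Simon's answer seems right in general, you might need to swap the parameters for consuming. In Pika 1.0 and later, basic_consume takes the queue first and the callback as on_message_callback: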

channel.basic_consume(queue='test', on_message_callback=handle_delivery) 

The basic setup is something like:

import pika

credentials = pika.PlainCredentials("some_user", "some_password")
parameters = pika.ConnectionParameters(
    "some_host.domain.tld", 5672, "some_vhost", credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

To start consuming:

channel.start_consuming()
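
Putting the consumer pieces together for Pika 1.x might look roughly like the sketch below. It assumes a `channel` obtained from a `BlockingConnection` as in the setup above, and reuses the "order" exchange, "test" queue and routing key from the first answer. The function name `consume_test_orders` and the body of `handle_delivery` are assumptions for illustration.

```python
def handle_delivery(channel, method, properties, body):
    """Called by Pika for each message delivered from the "test" queue."""
    print(f"received {method.routing_key}: {body!r}")
    # Acknowledge the message so that RabbitMQ can remove it from the queue.
    channel.basic_ack(delivery_tag=method.delivery_tag)


def consume_test_orders(channel):
    """Declare the exchange and queue, bind them, then consume until stopped."""
    channel.exchange_declare(exchange="order", exchange_type="topic", durable=True)
    channel.queue_declare(queue="test", durable=True)
    channel.queue_bind(queue="test", exchange="order",
                       routing_key="order.test.customer")
    channel.basic_consume(queue="test", on_message_callback=handle_delivery)
    # Blocks and dispatches deliveries until the consumer is cancelled
    # or the connection closes.
    channel.start_consuming()
```

Calling `consume_test_orders(channel)` with the channel created above should then print and acknowledge every message routed to the "test" queue.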
Upmost answered 21/12, 2022 at 19:1 Comment(0)
