How to do a simple Pika SelectConnection to send a message, in python?

I am trying to convert my code to send RabbitMQ messages via Pika instead. I am having a lot of trouble understanding how to send a simple message using an asynchronous connection (such as SelectConnection).

In my old code, which uses the amqp library, I simply declare a class like this:

import amqp as amqp

class MQ():

    mqConn = None
    channel = None

    def __init__(self):
        self.connect()

    def connect(self):
        if self.mqConn is None:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

        elif not self.mqConn.connected:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

    def sendMQ(self, message):
        self.connect()
        lMessage = amqp.Message(message)
        self.channel.basic_publish(lMessage, exchange="DevMatrixE", routing_key="dev_matrix_q") 

And then elsewhere in my code I call sendMQ("this is my message"), and then the code continues. I do not need to listen for acknowledgements etc.

Could someone please write a simple class utilizing pika and SelectConnection that would also work to just send a message using sendMQ("this is my message")? I've looked at the pika examples but I don't know how to get around the ioloop and KeyboardInterrupt. I guess I'm just not sure how to make my code continue to run without all these try/excepts... Also, not exactly sure how I can pass my message on through all the callbacks...

Any help is appreciated!

Thanks.

Hubris answered 19/5, 2015 at 17:30 Comment(0)
B
8

The whole thing is callback-driven, because that is the asynchronous way of doing things. An async consumer is easy to understand: we get each message by providing a callback function. The publisher side, however, is a bit harder to understand, at least for a beginner.

Usually we need a Queue to do the communication, and the publisher gets data from it periodically.

The key to using SelectConnection is to register your publish function with the event loop, which can be done with connection.add_timeout. After each publish, register the next round of publishing.

The next question is where to put the initial registration. It can be done in the channel-open callback.

Below is a code snippet for better understanding. Be aware that it is not production ready: it publishes at most 10 messages per second. You need to adjust the publish interval and publish more than one message per callback.

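from queue import Empty

import pika

class MQ(object):
    def __init__(self, queue, host='localhost'):
        self.queue = queue  # queue.Queue filled by the rest of the application
        self.host = host    # address of the RabbitMQ broker
    def on_channel_open(self, chn):
        # the channel is ready: register the first publish round
        self.channel = chn
        self.connection.add_timeout(0.1, self.schedule_next_message)
    def schedule_next_message(self):
        try:
            # wait at most 10 ms so the ioloop is never blocked for long
            msg = self.queue.get(True, 0.01)
            self.channel.basic_publish('YOUR EXCHANGE','YOUR ROUTING KEY',msg)
        except Empty:
            pass
        # register the next round; newer pika versions use
        # self.connection.ioloop.call_later instead of add_timeout
        self.connection.add_timeout(0.1, self.schedule_next_message)
    def on_open(self, conn):
        self.connection = conn
        self.connection.channel(on_open_callback=self.on_channel_open)
    def run(self):
        # create a connection
        self.connection = pika.SelectConnection(pika.ConnectionParameters(heartbeat=600,host=self.host),self.on_open)
        try:
            self.connection.ioloop.start()
        except Exception:
            print("exception in publisher")
            self.connection.close()
            # run the ioloop again so the close can complete
            self.connection.ioloop.start()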
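
The one-message-per-callback limit mentioned above could be eased by draining several queued messages on each timer callback. A minimal sketch, assuming a hypothetical helper named drain_queue and a publish callable that wraps channel.basic_publish; neither name is part of pika:

```python
import queue


def drain_queue(q, publish, max_batch=100):
    """Publish up to max_batch pending messages without blocking.

    q is a queue.Queue; publish is any callable taking one message
    (a hypothetical stand-in for a wrapper around channel.basic_publish).
    Returns the number of messages handed to publish.
    """
    sent = 0
    while sent < max_batch:
        try:
            msg = q.get_nowait()  # never block the ioloop thread
        except queue.Empty:
            break
        publish(msg)
        sent += 1
    return sent
```

Inside schedule_next_message this would take the place of the single queue.get call, for example drain_queue(self.queue, lambda m: self.channel.basic_publish('YOUR EXCHANGE', 'YOUR ROUTING KEY', m)), followed by the usual re-registration of the timer.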

Run MQ(queue).run() in a separate thread, and whenever you want to send a message to the broker, just put it into the queue object.
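
To get back the fire-and-forget sendMQ("this is my message") call from the question, the queue and the thread can be hidden behind a small wrapper. This is only a sketch: QueuedSender and its worker argument are hypothetical names, and the worker is assumed to be something like lambda q: MQ(q).run().

```python
import queue
import threading


class QueuedSender:
    """Hypothetical wrapper: sendMQ() only enqueues, a worker thread publishes."""

    def __init__(self, worker):
        # worker is a callable that receives the queue and runs the publisher,
        # e.g. lambda q: MQ(q).run() with the MQ class above (an assumption)
        self.queue = queue.Queue()
        self._thread = threading.Thread(target=worker, args=(self.queue,),
                                        daemon=True)

    def start(self):
        self._thread.start()

    def sendMQ(self, message):
        # returns immediately; the caller never touches the ioloop
        self.queue.put(message)
```

Usage would then look like sender = QueuedSender(lambda q: MQ(q).run()), then sender.start(), and later sender.sendMQ("this is my message") from anywhere in the main thread.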

Biopsy answered 15/8, 2019 at 16:56 Comment(1)
I think add_timeout is renamed to call_later in the newer pika versions. Maybe you could update your answer. - Cyclo
A
1

I updated the code from TerrenceSun to work with the latest version of pika (currently v1.3.0) and added a thread so that everything works in a self-contained class. (Note: I had to use call_later, as Andrew suggested.)

# async_messenger.py : simple asynchronous rabbitmq message producer
# based on https://mcmap.net/q/1758423/-how-to-do-a-simple-pika-selectconnection-to-send-a-message-in-python
import os
import sys
import time
import traceback
import logging
import json
from optparse import OptionParser
import pika
import queue
import threading

'''
USAGE:
python async_messenger.py --debuglevel=1
cat ./async_messenger.log
'''

logger = logging.getLogger(__name__)

class AsyncMessenger:
    def __init__(self, debuglevel=0, msg_queue=None):
        self.debuglevel = debuglevel

        if self.debuglevel > 0:
            print('AsyncMessenger: init debuglevel:',debuglevel)

        self.credentials = pika.PlainCredentials('guest','guest')
        self.parameters = pika.ConnectionParameters(host='localhost',
                                               port=5672,
                                               virtual_host='/',
                                               credentials=self.credentials,
                                               heartbeat=600)

        # create the default queue per instance; a queue.Queue() default in
        # the signature would be shared by every AsyncMessenger
        self.queue = msg_queue if msg_queue is not None else queue.Queue()
        self.exchange = 'YOUR EXCHANGE'
        self.routing_key = 'YOUR ROUTING KEY'
        self.msgThread = None

    # self.start -> (creates thread) -> self.run
    def run(self):
        print('AsyncMessenger: run')
        self.connection = pika.SelectConnection(parameters=self.parameters,
                                                on_open_callback=self.on_open)
        try:
            print('AsyncMessenger: run: connection.ioloop.start')
            self.connection.ioloop.start()
        except Exception as e:
            print("exception in publisher:",format(e))
            # traceback.print_exc(file=sys.stdout)
            self.connection.close()
            self.connection.ioloop.start()

    # run -> on_open
    def on_open(self, conn):
        print('AsyncMessenger: on_open')
        self.connection = conn
        self.connection.channel(on_open_callback=self.on_channel_open)

    # run -> on_open -> on_channel_open
    def on_channel_open(self, chn):
        print('AsyncMessenger: on_channel_open')
        self.channel = chn
        self.connection.ioloop.call_later(0.1, self.schedule_next_message)

    # run -> on_open -> on_channel_open -> schedule_next_message
    def schedule_next_message(self):
        if (self.debuglevel > 1): print('AsyncMessenger: schedule_next_message')
        try:
            msg = self.queue.get(True, 0.01)
            print('AsyncMessenger: queue msg:',msg)
            self.channel.basic_publish(self.exchange,self.routing_key,msg)
        except queue.Empty:
            pass
        self.connection.ioloop.call_later(0.1, self.schedule_next_message)

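    def close(self):
        print('AsyncMessenger: close')

        # close() is called from another thread, and pika connections are not
        # thread-safe, so hand the shutdown over to the ioloop thread
        def shutdown():
            # stop the ioloop only once the connection has actually closed
            self.connection.add_on_close_callback(
                lambda conn, exc: conn.ioloop.stop())
            self.connection.close()

        self.connection.ioloop.add_callback_threadsafe(shutdown)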

    # start our own self contained thread in class
    def start(self):
        print('AsyncMessenger: start')

        # function for worker thread
        def message_worker():
            self.run()

        # Turn-on the worker thread.
        self.msgThread = threading.Thread(target=message_worker, daemon=True)

        # start the threads
        self.msgThread.start()

def main():
    parser = OptionParser()

    parser.add_option("--debuglevel", action="store", type="int", \
                      nargs=1, dest="debuglevel", default=0)

    (options, args) = parser.parse_args()

    debuglevel = options.debuglevel

    log_file = './async_messenger.log'
    logging.basicConfig(filename=log_file, level=logging.INFO, format= \
               '%(name)s : %(asctime)s : Line: %(lineno)d - %(levelname)s  ::  %(message)s', \
               datefmt='%m/%d/%Y %I:%M:%S %p')
    logger = logging.getLogger(__name__)

    q = queue.Queue()

    asyncMessenger = AsyncMessenger(debuglevel, q)

    # Send task requests to the worker.
    for item in range(10):
        print('adding queue item:',item)
        # put a str so each item has len
        q.put(str(item))

    asyncMessenger.start()

    # keep checking queue, exit when empty
    while (q.qsize() > 0):
        time.sleep(1)

    asyncMessenger.close()

    # blocking wait for the threads to complete
    # Note: thread will wait forever unless we use: connection.ioloop.stop()
    asyncMessenger.msgThread.join()

    print('All work completed')

if __name__ == '__main__':
    main()

If all goes well, your output should look like this:

python async_messenger.py --debuglevel=1
AsyncMessenger: init debuglevel: 1
adding queue item: 0
adding queue item: 1
adding queue item: 2
adding queue item: 3
adding queue item: 4
adding queue item: 5
adding queue item: 6
adding queue item: 7
adding queue item: 8
adding queue item: 9
AsyncMessenger: start
AsyncMessenger: run
AsyncMessenger: run: connection.ioloop.start
AsyncMessenger: on_open
AsyncMessenger: on_channel_open
AsyncMessenger: queue msg: 0
AsyncMessenger: queue msg: 1
AsyncMessenger: queue msg: 2
AsyncMessenger: queue msg: 3
AsyncMessenger: queue msg: 4
AsyncMessenger: queue msg: 5
AsyncMessenger: queue msg: 6
AsyncMessenger: queue msg: 7
AsyncMessenger: queue msg: 8
AsyncMessenger: queue msg: 9
AsyncMessenger: close
All work completed
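
The main loop above polls q.qsize(), which reaches zero as soon as the last item is taken from the queue, possibly before that item has been published. The standard library offers Queue.task_done() and Queue.join() for waiting until every item has actually been handled. A minimal sketch with no pika involved; worker, send_all and the handle callable are hypothetical names, with handle standing in for the basic_publish call:

```python
import queue
import threading


def worker(q, handle):
    """Take items until the None sentinel arrives, marking each one done."""
    while True:
        item = q.get()
        try:
            if item is None:
                return
            handle(item)      # stand-in for channel.basic_publish(...)
        finally:
            q.task_done()     # lets q.join() know this item is finished


def send_all(items, handle):
    """Queue all items, then block until the worker has handled each one."""
    q = queue.Queue()
    t = threading.Thread(target=worker, args=(q, handle), daemon=True)
    t.start()
    for item in items:
        q.put(item)
    q.join()                  # returns only after task_done() for every item
    q.put(None)               # tell the worker to exit
    t.join()
```

In AsyncMessenger the same idea would probably mean calling self.queue.task_done() right after basic_publish in schedule_next_message and replacing the qsize() loop in main() with q.join().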
Armillary answered 30/9, 2022 at 18:9 Comment(0)
P
-4

As a first approach, I recommend starting with the pub/sub examples at the end of this post. Once you understand that simple example, follow the tutorial linked right before the code blocks. The tutorial has 6 different use cases, each with Python examples; the first 5 steps are enough to understand how it works. You should be clear on these concepts: exchange (the entity that routes messages to each queue), binding key (the key used to connect an exchange and a queue), routing key (the key sent along with the message by the publisher, which the exchange uses to route the message to one queue or another), and queue (a buffer that stores messages, can have one or more subscribers, and can receive messages from more than one exchange based on different binding keys). There is also more than one type of exchange (fanout, topic, ...); topic is probably the one you need.

If this all sounds new, please follow the tutorial provided by RabbitMQ:

https://www.rabbitmq.com/tutorials/tutorial-one-python.html
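
To make the difference between binding key and routing key concrete, here is a simplified pure-Python illustration of how a topic exchange compares the two: words are separated by dots, * stands for exactly one word and # for zero or more words. The function name topic_matches is made up for this sketch, and it is not RabbitMQ's actual implementation.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches binding_key under topic rules."""
    pattern = binding_key.split('.')
    words = routing_key.split('.')

    def match(p, w):
        # p indexes the binding pattern, w indexes the routing key words
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == '#':
            # '#' may absorb zero or more words
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if pattern[p] == '*' or pattern[p] == words[w]:
            # '*' absorbs exactly one word; a literal must match exactly
            return match(p + 1, w + 1)
        return False

    return match(0, 0)
```

With a queue bound using the key dev.*, a message published with routing key dev.matrix would be delivered to it, while one published with dev.matrix.q would not; binding with dev.# would accept both.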

pub.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                  routing_key='hello',
                  body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

sub.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

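print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))

# pika >= 1.0 signature; older releases used
# basic_consume(callback, queue='hello', no_ack=True)
channel.basic_consume(queue='hello',
                  on_message_callback=callback,
                  auto_ack=True)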

channel.start_consuming()
Pantaloon answered 26/5, 2015 at 0:20 Comment(2)
Great that you try to help, but he was asking about the SelectConnection adapter. The examples you mentioned are using the BlockingConnection. - Tunnel
Example code with a SelectConnection would be more illustrative. - Spital
