I updated the code from TerrenceSun to work with the latest version of pika (currently v1.3.0) and also added a thread so that everything works in a self-contained class:
(Note: I had to use call_later, as Andrew suggested.)
# async_messenger.py : simple asynchronous rabbitmq message producer
# based on https://mcmap.net/q/1758423/-how-to-do-a-simple-pika-selectconnection-to-send-a-message-in-python
import os
import sys
import time
import traceback
import logging
import json
from optparse import OptionParser
import pika
import queue
import threading
# Usage notes for running this module as a script. This is a bare string
# literal placed after the imports, so it has no runtime effect.
'''
USAGE:
python async_messenger.py --debuglevel=1
cat ./async_messenger.log
'''
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class AsyncMessenger:
    """Publish messages taken from a thread-safe queue to RabbitMQ.

    The messenger owns a ``pika.SelectConnection`` whose ioloop runs in a
    dedicated daemon thread (see ``start``). Producers put message bodies on
    ``self.queue`` from any thread; the ioloop thread polls that queue every
    0.1 seconds and publishes one message per tick to ``self.exchange`` with
    ``self.routing_key``.

    Call flow: start -> (thread) run -> on_open -> on_channel_open ->
    schedule_next_message (re-arms itself) ... close.

    Attributes:
        debuglevel: verbosity; > 0 prints the init line, > 1 prints every tick.
        credentials / parameters: pika connection settings (guest/guest on
            localhost:5672, virtual host '/', heartbeat 600).
        queue: the ``queue.Queue`` messages are read from.
        exchange / routing_key: publish target (placeholders to be edited).
        msgThread: the worker thread created by ``start`` (None before that).
        connection / channel: pika objects, None until they are opened.
    """

    def __init__(self, debuglevel=0, queue=None):
        """Store settings and build the pika connection parameters.

        Args:
            debuglevel: verbosity level for the console prints.
            queue: queue to read messages from. When omitted, a fresh
                ``queue.Queue`` is created for this instance (a default
                evaluated in the signature would be shared by all instances).
        """
        # The ``queue`` parameter shadows the ``queue`` module inside this
        # method, so the module is imported under a different name here.
        import queue as queue_module

        self.debuglevel = debuglevel
        if self.debuglevel > 0:
            print('AsyncMessenger: init debuglevel:', debuglevel)
        self.credentials = pika.PlainCredentials('guest', 'guest')
        self.parameters = pika.ConnectionParameters(host='localhost',
                                                    port=5672,
                                                    virtual_host='/',
                                                    credentials=self.credentials,
                                                    heartbeat=600)
        self.queue = queue_module.Queue() if queue is None else queue
        self.exchange = 'YOUR EXCHANGE'
        self.routing_key = 'YOUR ROUTING KEY'
        self.msgThread = None
        # Defined up front so close() is safe even before run() has executed.
        self.connection = None
        self.channel = None
        # Set by close(); read by the ioloop thread to stop publishing.
        self._closing = threading.Event()

    # self.start -> (creates thread) -> self.run
    def run(self):
        """Open the connection and run its ioloop until the connection ends.

        Blocks the calling thread; normally invoked through ``start``.
        """
        print('AsyncMessenger: run')
        self.connection = pika.SelectConnection(
            parameters=self.parameters,
            on_open_callback=self.on_open,
            on_open_error_callback=self._on_open_error,
            on_close_callback=self._on_closed)
        try:
            print('AsyncMessenger: run: connection.ioloop.start')
            self.connection.ioloop.start()
        except Exception as e:
            print("exception in publisher:", format(e))
            # traceback.print_exc(file=sys.stdout)
            # Stop any pending publish tick, then close gracefully. close()
            # raises on a connection that is not open, hence the guard.
            self._closing.set()
            if self.connection.is_open:
                self.connection.close()
                # Run the loop again so the close handshake is processed;
                # _on_closed stops it once the connection is closed.
                self.connection.ioloop.start()

    # run -> on_open
    def on_open(self, conn):
        """Connection-opened callback: open a channel on the connection."""
        print('AsyncMessenger: on_open')
        self.connection = conn
        if self._closing.is_set():
            # close() was requested before the connection finished opening.
            self._shutdown()
            return
        self.connection.channel(on_open_callback=self.on_channel_open)

    # run -> on_open -> on_channel_open
    def on_channel_open(self, chn):
        """Channel-opened callback: keep the channel and arm the first tick."""
        print('AsyncMessenger: on_channel_open')
        self.channel = chn
        self.connection.ioloop.call_later(0.1, self.schedule_next_message)

    # run -> on_open -> on_channel_open -> schedule_next_message
    def schedule_next_message(self):
        """Publish at most one queued message, then re-arm the 0.1 s timer.

        Runs on the ioloop thread. Does nothing (and does not re-arm) once
        ``close`` has been requested, so nothing is published on a channel
        that is shutting down.
        """
        if self.debuglevel > 1:
            print('AsyncMessenger: schedule_next_message')
        if self._closing.is_set():
            return
        try:
            # Non-blocking: waiting here would stall the whole ioloop.
            msg = self.queue.get_nowait()
        except queue.Empty:
            pass
        else:
            print('AsyncMessenger: queue msg:', msg)
            self.channel.basic_publish(self.exchange, self.routing_key, msg)
        self.connection.ioloop.call_later(0.1, self.schedule_next_message)

    def close(self):
        """Request shutdown; safe to call from any thread.

        The actual close is handed to the ioloop thread, because pika
        connections must only be used from the thread running their ioloop.
        The ioloop stops (and ``run`` returns) once the connection is closed.
        """
        print('AsyncMessenger: close')
        self._closing.set()
        connection = self.connection
        if connection is not None:
            connection.ioloop.add_callback_threadsafe(self._shutdown)

    def _shutdown(self):
        """Close the connection, or stop the loop if it is already closed.

        Must run on the ioloop thread.
        """
        connection = self.connection
        if connection.is_closed:
            connection.ioloop.stop()
        elif not connection.is_closing:
            # _on_closed (or _on_open_error while still opening) stops the loop.
            connection.close()

    def _on_open_error(self, conn, error):
        """Connection could not be opened (or opening was aborted): stop."""
        print("exception in publisher:", format(error))
        conn.ioloop.stop()

    def _on_closed(self, conn, reason):
        """Connection-closed callback: stop the ioloop so ``run`` returns."""
        conn.ioloop.stop()

    # start our own self contained thread in class
    def start(self):
        """Start ``run`` in a daemon thread stored in ``self.msgThread``."""
        print('AsyncMessenger: start')
        # Daemon thread, so a forgotten close() cannot keep the process alive.
        self.msgThread = threading.Thread(target=self.run, daemon=True)
        self.msgThread.start()
def main():
    """Command-line demo: publish ten numbered messages, then shut down.

    Parses ``--debuglevel``, configures logging to ./async_messenger.log,
    queues the strings '0'..'9', starts an AsyncMessenger on that queue,
    waits until the queue has been drained and closes the messenger.
    """
    option_parser = OptionParser()
    option_parser.add_option("--debuglevel", action="store", type="int",
                             nargs=1, dest="debuglevel", default=0)
    options, args = option_parser.parse_args()
    debuglevel = options.debuglevel

    log_file = './async_messenger.log'
    log_format = ('%(name)s : %(asctime)s : Line: %(lineno)d - '
                  '%(levelname)s :: %(message)s')
    logging.basicConfig(filename=log_file, level=logging.INFO,
                        format=log_format, datefmt='%m/%d/%Y %I:%M:%S %p')
    logger = logging.getLogger(__name__)

    q = queue.Queue()
    asyncMessenger = AsyncMessenger(debuglevel, q)

    # Queue the work before the worker thread starts.
    for item in range(10):
        print('adding queue item:', item)
        # Messages are put as str so each item has a length.
        q.put(str(item))

    asyncMessenger.start()

    # Poll once a second until the worker has taken every message.
    while not q.empty():
        time.sleep(1)

    asyncMessenger.close()
    # Blocking wait for the worker thread; it only ends once the ioloop
    # has been stopped.
    asyncMessenger.msgThread.join()
    print('All work completed')


if __name__ == '__main__':
    main()
If all goes well, your output should look like this:
python async_messenger.py --debuglevel=1
AsyncMessenger: init debuglevel: 1
adding queue item: 0
adding queue item: 1
adding queue item: 2
adding queue item: 3
adding queue item: 4
adding queue item: 5
adding queue item: 6
adding queue item: 7
adding queue item: 8
adding queue item: 9
AsyncMessenger: start
AsyncMessenger: run
AsyncMessenger: run: connection.ioloop.start
AsyncMessenger: on_open
AsyncMessenger: on_channel_open
AsyncMessenger: queue msg: 0
AsyncMessenger: queue msg: 1
AsyncMessenger: queue msg: 2
AsyncMessenger: queue msg: 3
AsyncMessenger: queue msg: 4
AsyncMessenger: queue msg: 5
AsyncMessenger: queue msg: 6
AsyncMessenger: queue msg: 7
AsyncMessenger: queue msg: 8
AsyncMessenger: queue msg: 9
AsyncMessenger: close
All work completed
`add_timeout` is renamed to `call_later` in the newer pika versions. Maybe you could update your answer. – Cyclo