I'd like to implement a RabbitMQ topology similar to Option 3 here, except for some differences:
The new topology should handle a few 1000 messages per day. And it shall have two exchanges: one to deal with the main queues (about 30), the other to deal with retry and error queues (about 60). I've been following this tutorial and the usual RMQ tutorials, plus many SO posts. The RMQ server is fired up in a Docker container.
The problem I am facing is that the not all messages are being picked up by the consumer, and the sequence of receiving the messages is unexpected. I'm also seeing the same message being rejected twice. Here's my code:
exchanges.py
def callback(self, channel, method, properties, body):
print("delivery_tag: {0}".format(method.delivery_tag))
data = json.loads(body)
routingKey = data.get('routing-key')
routingKey_dl_error = queues_dict[routingKey]['error']
print(" [X] Got {0}".format(body))
print(" [X] Received {0} (try: {1})".format(data.get('keyword'), int(properties.priority)+1))
# redirect faulty messages to *.error queues
if data.get('keyword') == 'FAIL':
channel.basic_publish(exchange='exchange.retry',
routing_key=routingKey_dl_error,
body=json.dumps(data),
properties=pika.BasicProperties(
delivery_mode=2,
priority=int(properties.priority),
timestamp=int(time.time()),
headers=properties.headers))
print(" [*] Sent to error queue: {0}".format(routingKey_dl_error))
time.sleep(5)
channel.basic_ack(delivery_tag=method.delivery_tag) #leaving this in creates 1000s of iterations(?!)
# check number of sent counts
else:
# redirect messages that exceed MAX_RETRIES to *.error queues
if properties.priority >= MAX_RETRIES - 1:
print(" [!] {0} Rejected after {1} retries".format(data.get('keyword'), int(properties.priority) + 1))
channel.basic_publish(exchange='exchange.retry',
routing_key=routingKey_dl_error,
body=json.dumps(data),
properties=pika.BasicProperties(
delivery_mode=2,
priority=int(properties.priority),
timestamp=int(time.time()),
headers=properties.headers))
print(" [*] Sent to error queue: {0}".format(routingKey_dl_error))
#channel.basic_ack(delivery_tag=method.delivery_tag)
else:
timestamp = time.time()
now = datetime.datetime.now()
expire = 1000 * int((now.replace(hour=23, minute=59, second=59, microsecond=999999) - now).total_seconds())
# to reject job we create new one with other priority and expiration
channel.basic_publish(exchange='exchange_main',
routing_key=routingKey,
body=json.dumps(data),
properties=pika.BasicProperties(
delivery_mode=2,
priority=int(properties.priority) + 1,
timestamp=int(timestamp),
expiration=str(expire),
headers=properties.headers))
# send back acknowledgement about job
channel.basic_ack(delivery_tag=method.delivery_tag) # nack or reject???
print("[!] Rejected. Going to sleep for a while...")
time.sleep(5)
def exchange(self):
# 1 - connect and channel setup
parameters = "..."
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 2 - declare exchanges
# declares the main exchange to be used by all producers to send messages. External facing
channel.exchange_declare(exchange='exchange_main',
exchange_type='direct',
durable=True,
auto_delete=False)
# declares the dead letter exchange. Routes messages to *error and *retry queues. Internal use only
channel.exchange_declare(exchange='exchange.retry',
exchange_type='direct',
durable=True,
auto_delete=False)
# 3- bind the external facing exchange to the internal exchange
#channel.exchange_bind(destination='exchange.retry', source='exchange_main')
# 4 - declare queues
# Create durable queues bound to the exchange_main exchange
for queue_name in self.queueName_list:
queueArgs = {
"x-message-ttl": 5000,
"x-dead-letter-exchange": 'exchange.retry',
#"x-dead-letter-routing-key": queue_name + '.retry'
}
channel.queue_declare(queue=queue_name, durable=True, arguments=queueArgs)
# Create durable queues bound to the exchange.retry exchange
'''
for queue_dl_name in self.queueName_dl_list:
if queue_dl_name[-5:] == 'retry':
queueArgs_retry = {
"x-message-ttl": 5000,
"x-dead-letter-exchange": 'exchange_main',
"x-dead-letter-routing-key": queue_dl_name[:-6]
}
channel.queue_declare(queue=queue_dl_name, durable=True, arguments=queueArgs_retry)
else:
channel.queue_declare(queue=queue_dl_name, durable=True)
'''
for queue_dl_name in self.queueName_dl_list:
channel.queue_declare(queue=queue_dl_name, durable=True)
# 5 - bind retry and main queues to exchanges
# bind queues to exchanges. Allows for messages to be saved when no consumer is present
for queue_name in self.queueName_list:
channel.queue_bind(queue=queue_name, exchange='exchange_main')
for queue_dl_name in self.queueName_dl_list:
channel.queue_bind(queue=queue_dl_name, exchange='exchange.retry')
# 6 - don't dispatch a new message to worker until processed and acknowledged the previous one, dispatch to next worker instead
channel.basic_qos(prefetch_count=1)
# 7 - consume the message
all_queues = self.queueName_list + self.queueName_dl_list
for queue in all_queues:
channel.basic_consume(queue=queue,
on_message_callback=self.callback,
auto_ack=False)
print ('[*] Waiting for data for:')
for queue in all_queues:
print(' ' + queue)
print ('[*] To exit press CTRL+C')
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
channel.close()
connection.close()
producer.py
# 1 - connect and channel setup
parameters = "..."
try:
connection = pika.BlockingConnection(parameters)
except pika.exceptions.AMQPConnectionError as err:
print("AMQP connection failure. Ensure RMQ server is running.")
raise err
channel = connection.channel() # create a channel in TCP connection
# 2 - Turn on delivery confirmations (either a basic.ack or basic.nack)
channel.confirm_delivery()
# 3 - send message to rmq
print(" [*] Sending message to create a queue")
# set header parameters
count = 3
for i in range(1, count + 1):
if self.keyword is None:
message = "data {0}".format(i)
else:
message = self.keyword
timestamp = time.time()
now = datetime.datetime.now()
expire = 1000 * int((now.replace(hour=23, minute=59, second=59, microsecond=999999) - now).total_seconds())
headers = dict()
headers['RMQ_Header_Key'] = self.queueName
headers['x-retry-count'] = 0
headers['x-death'] = None
data = {
'routing-key': self.queueName,
'keyword': message,
'domain': message,
'created': int(timestamp),
'expire': expire
}
# properties are often uses for bits of data that your code needs to have, but aren't part of the actual message body.
channel.basic_publish(
exchange='exchange_main',
routing_key=self.queueName,
body=json.dumps(data),
properties=pika.BasicProperties(
delivery_mode=2, # makes persistent job
priority=0, # default priority
timestamp=int(timestamp), # timestamp of job creation
expiration=str(expire), # job expiration
headers=headers
))
print(" [*] Sent message: {0} via routing key: {1}".format(message, self.queueName))
# 4 - close channel and connection
channel.close()
connection.close()
After firing up the exchange.py
, I then send from my command line in another Terminal window: python3 producer.py queue1
And then get:
delivery_tag: 1
[X] Got b'{"routing-key": "queue1", "keyword": "data 1", "domain": "data 1", "created": 1567068725, "expire": 47274000}'
[X] Received data 1 (try: 1)
[!] Rejected. Going to sleep for a while...
delivery_tag: 2
[X] Got b'{"routing-key": "queue1", "keyword": "data 1", "domain": "data 1", "created": 1567068725, "expire": 47274000}'
[X] Received data 1 (try: 2)
[!] Rejected. Going to sleep for a while...
delivery_tag: 3
[X] Got b'{"routing-key": "queue1", "keyword": "data 3", "domain": "data 3", "created": 1567068725, "expire": 47274000}'
[X] Received data 3 (try: 1)
[!] Rejected. Going to sleep for a while...
delivery_tag: 4
[X] Got b'{"routing-key": "queue1", "keyword": "data 3", "domain": "data 3", "created": 1567068725, "expire": 47274000}'
[X] Received data 3 (try: 2)
[!] Rejected. Going to sleep for a while...
delivery_tag: 5
[X] Got b'{"routing-key": "queue1", "keyword": "data 3", "domain": "data 3", "created": 1567068725, "expire": 47274000}'
[X] Received data 3 (try: 3)
[!] data 3 Rejected after 3 retries
[*] Sent to error queue: queue1.error
delivery_tag: 6
[X] Got b'{"routing-key": "queue1", "keyword": "data 3", "domain": "data 3", "created": 1567068725, "expire": 47274000}'
[X] Received data 3 (try: 3)
[!] data 3 Rejected after 3 retries
[*] Sent to error queue: queue1.error
Questions:
- Is my code current implementation corresponding to my desired topology?
- Direct vs topic: is a direct exchange the most optimal/efficient solution in this case?
- One exchange vs two: is it advised to stick to 2 exchanges, or can this be simplified to just one exchange for everything?
- How do I test for messages which are not normal i.e. sent to the retry loop section of my callback function? The callback currently doesn't handle "normal" messages (i.e. without retries been triggered or simply failed messages).
- Is binding the two exchanges necessary? Commenting out this code made no difference.
- Do I need to implement arguments (
channel.queue_declare
) to both dead letter and non-dead letter queues? I know that the non-dead letter queues are to have the arguments declared, whereby thex-dead-letter-exchange
is set, but I'm not sure whether thex-dead-letter-routing-key
should also be set too. - Do I need
ack
/nack
every time a message is published, because I notice differing behaviour when this is and isn't implemented (i.e. w/o ack the FAIL message is not sent 3 times but only two, with ack it sent more than 3) - In the output above, "data 1" is only consumed twice, "data 2" doesn't appear at all, and "data 3" reaches the
MAX_RETRY
limit of 3 times but then gets sent to the *.error queue twice (not once), which I find strange. What is RMQ doing here?