I have a task queue in RabbitMQ with multiple producers (12) and one consumer for heavy tasks in a webapp. When I run the consumer it starts dequeuing some of the messages before crashing with this error:
Traceback (most recent call last):
File "jobs.py", line 42, in <module> jobs[job](config)
File "/home/ec2-user/project/queue.py", line 100, in init_queue
channel.start_consuming()
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming
self.connection.process_data_events(time_limit=None)
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events
self._flush_output(common_terminator)
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output
result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "error(104, 'Connection reset by peer')")
The producers code is:
message = {'image_url': image_url, 'image_name': image_name, 'notes': notes}
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks_queue')
channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(message))
connection.close()
And the only consumer's code (the one is clashing):
def callback(self, ch, method, properties, body):
"""Callback when receive a message."""
message = json.loads(body)
try:
image = _get_image(message['image_url'])
except:
sys.stderr.write('Error getting image in note %s' % note['id'])
# Crop image with PIL. Not so expensive
box_path = _crop(image, message['image_name'], box)
# API call. Long time function
result = long_api_call(box_path)
if result is None:
sys.stderr.write('Error in note %s' % note['id'])
return
# update the db
db.update_record(result)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks_queue')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback_obj.callback, queue='tasks_queue', no_ack=True)
channel.start_consuming()
As you can see, there are 3 expensive functions for message. One crop task, one API call and one database update. Without the API call, que consumer runs smoothly.
Thanks in advance
Connection reset by peer
means that something interrupted your TCP connection unexpectedly. I expect to see a similar message logged by RabbitMQ. – Petepetechia