RabbitMQ pika.exceptions.ConnectionClosed (-1, "error(104, 'Connection reset by peer')")

I have a task queue in RabbitMQ with multiple producers (12) and one consumer for heavy tasks in a webapp. When I run the consumer it starts dequeuing some of the messages before crashing with this error:

Traceback (most recent call last):
  File "jobs.py", line 42, in <module>
    jobs[job](config)
  File "/home/ec2-user/project/queue.py", line 100, in init_queue
    channel.start_consuming()
  File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming
    self.connection.process_data_events(time_limit=None)
  File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events
    self._flush_output(common_terminator)
  File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output
    result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "error(104, 'Connection reset by peer')")

The producers' code is:

message = {'image_url': image_url, 'image_name': image_name, 'notes': notes}

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='tasks_queue')
channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(message))

connection.close()

And the code of the only consumer (the one that is crashing):

def callback(self, ch, method, properties, body):
    """Callback when receive a message."""
    message = json.loads(body)
    try:
        image = _get_image(message['image_url'])
    except:
        sys.stderr.write('Error getting image in note %s' % note['id'])
    # Crop image with PIL. Not so expensive
    box_path = _crop(image, message['image_name'], box)

    # API call. Long time function
    result = long_api_call(box_path)

    if result is None:
        sys.stderr.write('Error in note %s' % note['id'])
        return
    # update the db
    db.update_record(result)


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks_queue')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback_obj.callback, queue='tasks_queue', no_ack=True)
channel.start_consuming()

As you can see, there are 3 expensive functions per message: one crop task, one API call and one database update. Without the API call, the consumer runs smoothly.

Thanks in advance

Hypogeous answered 24/10, 2018 at 15:52 Comment(3)
Please provide information about your environment: what versions of software you're using, whether you're using Docker or a load balancer, and whether anything is logged by RabbitMQ. Connection reset by peer means that something interrupted your TCP connection unexpectedly. I expect to see a similar message logged by RabbitMQ. - Petepetechia
Hello. I have RabbitMQ 3.7.0 running on an Amazon Linux EC2 instance. No Docker or load balancer. Also, the call result = long_api_call(box_path) is inside a try/except block, so it is supposed to be fault tolerant. This long_api_call points to an external service over a currently unstable internet connection, so it is not rare that some of the callback calls just don't work. But that shouldn't bring down the consumer with this weird error. My RabbitMQ log file: - Hypogeous
2018-10-25 06:04:54.854 [info] <0.7436.0> closing AMQP connection <0.7436.0> (127.0.0.1:42882 -> 127.0.0.1:5672, vhost: '/', user: 'guest')
2018-10-25 06:05:14.740 [warning] <0.5202.0> closing AMQP connection <0.5202.0> (127.0.0.1:32816 -> 127.0.0.1:5672): missed heartbeats from client, timeout: 60s
2018-10-25 06:06:59.367 [info] <0.7460.0> accepting AMQP connection <0.7460.0> (127.0.0.1:43332 -> 127.0.0.1:5672)
2018-10-25 06:06:59.370 [info] <0.7460.0> connection <0.7460.0> (127.0.0.1:43332 -> 127.0.0.1:5672): user 'guest' authenticated and granted
- Hypogeous

Your RabbitMQ log shows a message that I thought we might see:

missed heartbeats from client, timeout: 60s

What's happening is that your long_api_call blocks Pika's I/O loop. Pika is a very lightweight library and does not start threads in the background for you, so you must write your code so that it never blocks Pika's I/O loop for longer than the heartbeat interval. While the loop is blocked, no heartbeats are sent, so RabbitMQ thinks your client has died or is unresponsive and forcibly closes the connection.

Please see my answer here which links to this example code showing how to properly execute a long-running task in a separate thread. You can still use no_ack=True; you will just skip the ack_message call.
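
For readers without the linked example at hand, here is a minimal sketch of the threaded approach described above. It is not the linked code itself. It assumes Python 3 and a pika release that provides BlockingConnection.add_callback_threadsafe (0.12 or later); main() uses the pika 1.x keyword names for basic_consume. The long_api_call body is a hypothetical stand-in for the slow call in the question, and on_message, do_work and the threads list are illustrative names. Unlike the question's code, it uses manual acks.

```python
import functools
import threading
import time


def long_api_call(body):
    """Hypothetical stand-in for the slow external call in the question."""
    time.sleep(0.1)
    return body.upper()


def ack_message(channel, delivery_tag):
    """Ack the message. Must run on the thread that owns the connection."""
    if channel.is_open:
        channel.basic_ack(delivery_tag)


def do_work(connection, channel, delivery_tag, body):
    """Runs in a worker thread, so pika's I/O loop stays free for heartbeats."""
    long_api_call(body)
    # Channels are not thread-safe: ask the connection's own thread to ack.
    connection.add_callback_threadsafe(
        functools.partial(ack_message, channel, delivery_tag))


def on_message(channel, method, properties, body, connection, threads):
    """Consumer callback: start the worker and return immediately."""
    worker = threading.Thread(
        target=do_work,
        args=(connection, channel, method.delivery_tag, body))
    worker.start()
    threads.append(worker)


def main():
    # Imported here so the helpers above can be exercised without pika or a broker.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='tasks_queue')
    # With manual acks, prefetch_count=1 keeps one message in flight at a time.
    channel.basic_qos(prefetch_count=1)
    threads = []
    callback = functools.partial(
        on_message, connection=connection, threads=threads)
    # pika 1.x keyword names (assumption); 0.12 uses basic_consume(callback, queue=...).
    channel.basic_consume(queue='tasks_queue', on_message_callback=callback)
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    for worker in threads:
        worker.join()
    connection.close()
```

Calling main() starts the consumer. If you keep no_ack=True instead, the ack_message hand-off can be dropped, but as far as I know the broker then ignores prefetch_count, so many worker threads may be started at once.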

Petepetechia answered 25/10, 2018 at 14:13 Comment(1)
You're a lifesaver. I was stuck on this for the last 3 days, and now it works. Thanks! - Hypogeous

Starting with RabbitMQ 3.5.5, the broker’s default heartbeat timeout decreased from 580 seconds to 60 seconds.

See pika: Ensuring well-behaved connection with heartbeat and blocked-connection timeouts.

The simplest fix is to increase the heartbeat timeout:

rabbit_url = host + "?heartbeat=360"
conn = pika.BlockingConnection(pika.URLParameters(rabbit_url))

# or

params = pika.ConnectionParameters(host, heartbeat=360)
conn = pika.BlockingConnection(params)
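
One caveat with the URL form: appending "?heartbeat=360" only works when the URL has no query string yet. The sketch below, assuming Python 3 and a full AMQP URL such as amqp://guest:guest@localhost:5672/%2F, sets the parameter whether or not other query parameters are present. with_heartbeat is a hypothetical helper name, not part of pika.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_heartbeat(amqp_url, seconds):
    """Return amqp_url with its heartbeat query parameter set to seconds."""
    parts = urlsplit(amqp_url)
    # Keep any existing parameters and add or overwrite heartbeat.
    query = dict(parse_qsl(parts.query))
    query['heartbeat'] = str(seconds)
    return urlunsplit(parts._replace(query=urlencode(query)))
```

The result can be passed to pika.URLParameters as in the first snippet. Note that a larger heartbeat only hides the problem until a task runs longer than the new timeout.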
Sprit answered 14/10, 2021 at 23:39 Comment(0)
