RabbitMQ pika.exceptions.ConnectionClosed
I

4

9

I am trying to send and receive messages using RabbitMQ. I don't have a computer science background, so the terms I use may not be very accurate.

I copied the tutorial code. When my HTML form is submitted, my Python script (CGI) publishes the message to the queue:

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = PN
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
connection.close()

My receiver is running:

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received Project %r" % body)
    #ch.basic_ack(delivery_tag = method.delivery_tag) 
    if not (os.path.isfile(js_path)):
        print (' [*] ERROR files missing ')
        #ch.basic_ack(delivery_tag = method.delivery_tag)
        return
    p= subprocess.Popen(run a subprocess here)
    p.wait()

    print (' [*] Temporary Files removed')
    print(" [*] Waiting for messages. To exit press CTRL+C")

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue='task_queue',no_ack=True)
channel.start_consuming()

It works most of the time but randomly crashes with the following error:

Traceback (most recent call last):
  File "Receive5.py", line 139, in <module>
    channel.start_consuming()
  File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 1681, in start_consuming
    self.connection.process_data_events(time_limit=None)
  File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 647, in process_data_events
    self._flush_output(common_terminator)
  File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 426, in _flush_output
    raise exceptions.ConnectionClosed()
pika.exceptions.ConnectionClosed
Inelegance answered 19/5, 2016 at 10:53 Comment(4)
You should check the logs; for some reason the connection is being closed on the consumer. - Mildred
Looks to be a "heartbeat" problem:
=ERROR REPORT==== 20-May-2016::08:36:28 ===
closing AMQP connection ... missed heartbeats from client, timeout: 60s
=INFO REPORT==== 20-May-2016::08:40:22 ===
accepting AMQP connection ...
=INFO REPORT==== 20-May-2016::08:44:51 ===
accepting AMQP connection ...
=INFO REPORT==== 20-May-2016::08:44:51 ===
closing AMQP connection ...
=ERROR REPORT==== 20-May-2016::08:46:22 ===
closing AMQP connection ... missed heartbeats from client, timeout: 60s
=INFO REPORT==== 20-May-2016::09:38:51 ===
accepting AMQP connection ...
- Inelegance
Issue solved by setting the heartbeat to 0 ... - Inelegance
It has been almost two years, and I don't think this issue has been handled or documented any better :( - Maecenas
P
18

This happens because the callback keeps the main thread waiting (p.wait()), so pika cannot process incoming frames; in particular, it cannot respond to heartbeats until the subprocess is done. RabbitMQ then assumes the client is dead and forces a disconnection.

If you want this to work with heartbeats (which is recommended), you need to call connection.process_data_events periodically. This can be done with a loop that checks whether the thread (or subprocess) is done and calls process_data_events every 30s or so until it is.
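A minimal sketch of that polling loop, adapted to the subprocess from the question. The helper name wait_with_heartbeats and the poll_interval parameter are hypothetical, and it assumes that calling process_data_events from inside a consumer callback is acceptable in your pika version:

```python
import subprocess
import sys


def wait_with_heartbeats(connection, proc, poll_interval=1.0):
    """Wait for `proc` to exit while letting pika service the connection.

    `connection` is expected to be a pika.BlockingConnection, or any object
    exposing process_data_events(time_limit=...).
    `proc` is a subprocess.Popen instance.
    """
    # poll() returns None while the child process is still running.
    while proc.poll() is None:
        # Give pika up to roughly `poll_interval` seconds to handle
        # socket I/O, which includes answering heartbeat frames.
        connection.process_data_events(time_limit=poll_interval)
    return proc.returncode


# Hypothetical use inside the consumer callback, replacing p.wait():
#
#     p = subprocess.Popen([sys.executable, "worker.py"])  # placeholder command
#     wait_with_heartbeats(connection, p)
```

The poll interval only needs to be comfortably shorter than the negotiated heartbeat timeout (60s in the broker log above).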

Passant answered 30/5, 2016 at 14:25 Comment(2)
Just curious: is the RabbitMQ tutorial missing this part, or is it a bad use of the tutorial on my side? (rabbitmq.com/tutorials/tutorial-two-python.html) - Inelegance
I would say that this is largely undocumented. I went as far as creating my own AMQP library to avoid this problem. - Passant
P
4

Here is the pika documentation on how to avoid the connection being dropped because of missed heartbeats:

https://pika.readthedocs.io/en/stable/examples/heartbeat_and_blocked_timeouts.html

In pika versions older than 0.11.2, you can pass heartbeat_interval=600 to pika.ConnectionParameters, but it does not help if the server side has a shorter heartbeat value such as 60s. It only works with pika 0.11.2 or later.

Platen answered 4/2, 2021 at 5:53 Comment(0)
C
3

Look at this: https://github.com/mosquito/aio-pika

It's an asyncio wrapper, and if you understand the concepts behind asynchronous programming it is very easy to use :)

Collbaith answered 13/5, 2017 at 18:58 Comment(0)
H
-3

It seems that it is not connecting to RabbitMQ at 127.0.0.1:5672. Are you sure RabbitMQ is running and listening on 127.0.0.1:5672?

For Linux (Debian) users:

You can check whether RabbitMQ is installed by typing this command:

sudo service rabbitmq-server status

If you get back a response, that means it is installed. Check whether it is running or not.

To start the RabbitMQ server, type:

sudo service rabbitmq-server start

To restart RabbitMQ, use this command:

sudo service rabbitmq-server restart

No response means that you don't have RabbitMQ installed. Install it by typing the following commands:

sudo apt-get update

sudo apt-get -y upgrade

sudo apt-get install rabbitmq-server

And then start the server by typing the start command above.

Hooey answered 19/2, 2018 at 13:27 Comment(1)
Downvoting because the answer is completely beside the point. - Sculpturesque
