I use pika's basic_get method to fetch messages from RabbitMQ and the basic_ack method to acknowledge them.
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host="rabbitmq",
        heartbeat=0
    )
)
channel = connection.channel()

def mq_record():
    method_frame, _, body = channel.basic_get("test")
    if not body:
        return False, None
    if method_frame:
        channel.basic_ack(delivery_tag=method_frame.delivery_tag, multiple=True)
    rev_mq = eval(body.decode())
    task_id = rev_mq.get("task_id")
    level = rev_mq.get("level")
    return task_id, level

task_id, level = mq_record()
This raises the following error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2159, in basic_get
    self._basic_getempty_result.is_ready)
  File "/usr/local/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 1335, in _flush_output
    self._connection._flush_output(lambda: self.is_closed, *waiters)
  File "/usr/local/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
From an answer to a similar question, I learned the following:
Your consume process is probably taking too much time to complete and send Ack/Nack to the server. Therefore, server does not receive heartbeat from your client, and thereby stops from serving. Then, on the client side you receive the error.
However, in my case, acknowledging a message does not take long. Why does this error still occur? Does anyone know? Thanks in advance.
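
For reference, here is a minimal sketch of the workaround I am considering while the cause is unknown: reopen the channel and retry when the connection is reported lost. It does not explain the reset, it only survives it. The class name ReconnectingChannel and its parameters are hypothetical, not part of pika. The helper is written against a generic "open a channel" callable so that it can be run without a broker, and the pika wiring in the trailing comment is an assumption about how it might be used.

```python
import time


class ReconnectingChannel:
    """Hypothetical helper: cache a channel and reopen it after a lost connection."""

    def __init__(self, open_channel, retry_on, attempts=3, delay=1.0):
        self._open_channel = open_channel  # zero-argument callable returning a channel
        self._retry_on = retry_on          # exception class (or tuple) meaning "connection lost"
        self._attempts = attempts
        self._delay = delay
        self._channel = None

    def call(self, operation):
        """Run operation(channel), reopening the channel if the connection drops."""
        last_error = None
        for attempt in range(self._attempts):
            try:
                if self._channel is None:
                    self._channel = self._open_channel()
                return operation(self._channel)
            except self._retry_on as exc:
                # Discard the dead channel so the next attempt opens a new one.
                self._channel = None
                last_error = exc
                if attempt < self._attempts - 1:
                    time.sleep(self._delay)
        raise last_error


# Assumed wiring with pika (not executed here):
#
#   import pika
#
#   def open_channel():
#       params = pika.ConnectionParameters(host="rabbitmq", heartbeat=0)
#       return pika.BlockingConnection(params).channel()
#
#   mq = ReconnectingChannel(open_channel, pika.exceptions.AMQPConnectionError)
#   method_frame, _, body = mq.call(lambda ch: ch.basic_get("test"))
```

Since delivery tags are scoped to a channel, the basic_get and the matching basic_ack would have to run inside the same operation passed to call. A message fetched before the connection dropped cannot be acknowledged on the new channel, and the broker is expected to requeue it.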