Python3 RabbitMQ Pika ConnectionResetError on publisher only
Asked Answered
A

0

6

i have an issue that happens only on my publisher side of pika.

the architecture of the project:

- Kubernetes cluster:
    - Flask Server Pod:
        - publisher running
    - Consumer Pod:
        - Consumer Running

the publisher is running a BlockingConnection class for a single thread use. the consumer is running a SelectConnection class for a multi threaded use.

everytime i am getting a request to my flask server, i am doing some data handling and then posting a message to my relevant queue

for example, this is how i am publishing to the first queue:

def publish_queue_1_message(self, message):
    with self.QUEUES[self.QUEUE_1]["lock"]:
        self._publish_message(self.QUEUE_1, message)

@pika_publish_decorator
def _publish_message(self, queue, message):
    if self.QUEUES[queue]["channel"] is None or not self.QUEUES[queue]["channel"].is_open:
        raise RabbitNoChannelException

    self.QUEUES[queue]["channel"].basic_publish(
        self.EXCHANGE,
        self.QUEUES[queue]["routing_key"],
        message
    )

Note: self.QUEUES is a dictionary holding the channel that i assign once creating the channel and a threading lock. i am using the lock in order to lock the channel to post only one message at a time in case i'm getting a lot of requests from the flask server

@pika_publish_decorator is a decorator that handles the case that if there is any error, then he tried to recreate the connection and the channel and tries to run the function again.

for some reason i'm getting this error a lot of times from pika.adapters.utils.io_services_utils logger.

_AsyncBaseTransport._produce() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=17, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.240.0.165', 47600)>; Caller's stack:
Traceback (most recent call last):

  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/utils/io_services_utils.py", line 1103, in _on_socket_writable
    self._produce()

  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/utils/io_services_utils.py", line 819, in _produce
    num_bytes_sent = self._sigint_safe_send(self._sock,

  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
    return func(*args, **kwargs)

  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/utils/io_services_utils.py", line 861, in _sigint_safe_send
    return sock.send(data)

ConnectionResetError: [Errno 104] Connection reset by peer

if you could help me with this, this would be amazing! thank you very much!

Arnoldoarnon answered 15/12, 2021 at 11:29 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.