i have an issue that happens only on my publisher side of pika.
the architecture of the project:
- Kubernetes cluster:
- Flask Server Pod:
- publisher running
- Consumer Pod:
- Consumer Running
the publisher is running a BlockingConnection class for a single thread use. the consumer is running a SelectConnection class for a multi threaded use.
everytime i am getting a request to my flask server, i am doing some data handling and then posting a message to my relevant queue
for example, this is how i am publishing to the first queue:
def publish_queue_1_message(self, message):
with self.QUEUES[self.QUEUE_1]["lock"]:
self._publish_message(self.QUEUE_1, message)
@pika_publish_decorator
def _publish_message(self, queue, message):
if self.QUEUES[queue]["channel"] is None or not self.QUEUES[queue]["channel"].is_open:
raise RabbitNoChannelException
self.QUEUES[queue]["channel"].basic_publish(
self.EXCHANGE,
self.QUEUES[queue]["routing_key"],
message
)
Note: self.QUEUES is a dictionary holding the channel that i assign once creating the channel and a threading lock. i am using the lock in order to lock the channel to post only one message at a time in case i'm getting a lot of requests from the flask server
@pika_publish_decorator is a decorator that handles the case that if there is any error, then he tried to recreate the connection and the channel and tries to run the function again.
for some reason i'm getting this error a lot of times from pika.adapters.utils.io_services_utils logger.
_AsyncBaseTransport._produce() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=17, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.240.0.165', 47600)>; Caller's stack:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/pika/adapters/utils/io_services_utils.py", line 1103, in _on_socket_writable
self._produce()
File "/usr/local/lib/python3.8/dist-packages/pika/adapters/utils/io_services_utils.py", line 819, in _produce
num_bytes_sent = self._sigint_safe_send(self._sock,
File "/usr/local/lib/python3.8/dist-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
return func(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/pika/adapters/utils/io_services_utils.py", line 861, in _sigint_safe_send
return sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
if you could help me with this, this would be amazing! thank you very much!