Celery worker connection with RabbitMQ hits broken pipe error in gevent or eventlet mode

I ran into an issue where the Celery worker's connection to RabbitMQ hits a broken pipe error in gevent mode. There is no problem when the Celery worker runs in process pool mode (without gevent, without monkey patching).

After that, the Celery workers no longer receive any task messages from RabbitMQ until they are restarted.

The issue always happens when the Celery workers consume task messages more slowly than the Django application produces them, and about 3,000 messages have piled up in RabbitMQ.

Gevent version 1.1.0

Celery version 3.1.22

====== Celery log ======

[2016-08-08 13:52:06,913: CRITICAL/MainProcess] Couldn't ack 293, reason:error(32, 'Broken pipe')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 93, in ack_log_error
    self.ack()
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 88, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "/usr/local/lib/python2.7/site-packages/amqp/channel.py", line 1584, in basic_ack
    self._send_method((60, 80), args)
  File "/usr/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "/usr/local/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
    write_frame(1, channel, payload)
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
    frame_type, channel, size, payload, 0xce,
  File "/usr/local/lib/python2.7/site-packages/gevent/_socket2.py", line 412, in sendall
    timeleft = self.__send_chunk(chunk, flags, timeleft, end)
  File "/usr/local/lib/python2.7/site-packages/gevent/_socket2.py", line 351, in __send_chunk
    data_sent += self.send(chunk, flags)
  File "/usr/local/lib/python2.7/site-packages/gevent/_socket2.py", line 320, in send
    return sock.send(data, flags)
error: [Errno 32] Broken pipe

====== Rabbitmq log ======

=ERROR REPORT==== 8-Aug-2016::14:28:33 ===
closing AMQP connection <0.15928.4> (10.26.39.183:60732 -> 10.26.39.183:5672):
{writer,send_failed,{error,enotconn}}

=ERROR REPORT==== 8-Aug-2016::14:29:03 ===
closing AMQP connection <0.15981.4> (10.26.39.183:60736 -> 10.26.39.183:5672):
{writer,send_failed,{error,enotconn}}

=ERROR REPORT==== 8-Aug-2016::14:29:03 ===
closing AMQP connection <0.15955.4> (10.26.39.183:60734 -> 10.26.39.183:5672):
{writer,send_failed,{error,enotconn}}

A similar issue appears when the Celery worker uses eventlet.

[2016-08-09 17:41:37,952: CRITICAL/MainProcess] Couldn't ack 583, reason:error(32, 'Broken pipe')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 93, in ack_log_error
    self.ack()
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 88, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "/usr/local/lib/python2.7/site-packages/amqp/channel.py", line 1584, in basic_ack
    self._send_method((60, 80), args)
  File "/usr/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "/usr/local/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
    write_frame(1, channel, payload)
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
    frame_type, channel, size, payload, 0xce,
  File "/usr/local/lib/python2.7/site-packages/eventlet/greenio/base.py", line 385, in sendall
    tail = self.send(data, flags)
  File "/usr/local/lib/python2.7/site-packages/eventlet/greenio/base.py", line 379, in send
    return self._send_loop(self.fd.send, data, flags)
  File "/usr/local/lib/python2.7/site-packages/eventlet/greenio/base.py", line 366, in _send_loop
    return send_method(data, *args)
error: [Errno 32] Broken pipe

Update: setup and load test info

We use supervisor to launch Celery with the following options:

celery worker -A celerytasks.celery_worker_init -Q default -P gevent -c 1000 --loglevel=info

Celery uses RabbitMQ as the broker.

We run 4 Celery worker processes by specifying "numprocs=4" in the supervisor configuration.
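
For reference, a minimal supervisord program section matching this setup might look as follows (the program name and the extra directives are assumptions, not our exact configuration):

[program:celeryworker]
; supervisor starts 4 copies of this command (numprocs=4); each copy
; runs a gevent pool with up to 1000 greenlets (-c 1000)
command=celery worker -A celerytasks.celery_worker_init -Q default -P gevent -c 1000 --loglevel=info
numprocs=4
; with numprocs > 1, process_name must include %(process_num)s
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true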

We use JMeter to emulate web access load; the Django application produces tasks for the Celery workers to consume. Those tasks basically access a MySQL DB to get/update some data.

From the RabbitMQ web admin page, the task-producing rate is about 50 per second while the consuming rate is about 20 per second, so the backlog grows by roughly 30 messages per second. After about 1 minute of load testing, the log file shows that many connections between RabbitMQ and Celery hit the broken pipe error.

Technician answered 9/8, 2016 at 1:25 · Comments (11)
Can you give a bit more detail about your setup and the workload you are trying to test? From the error it seems like just a network issue, but it might be related to other things. - Elegiac
@lesingerouge, I added that info at the bottom of my post. I don't think it's a random network issue, because it can be reproduced every time we run the test. While the Celery workers are in process-pool mode, everything goes well. - Technician
Ask, the author of Celery, doubts it is a bug. I'm curious that nobody has hit this before with such a widely used tool; I did not find a similar post after googling. - Technician
What operating system are you using to run these tests? - Elegiac
Sorry, no solution, but I highly recommend getting rid of RabbitMQ in favour of Redis storage for Celery. Also, be aware of a similar broken pipe issue when talking to MySQL: github.com/celery/django-celery/commit/… - Rousseau
@lesingerouge, it runs on CentOS 6.5 x64. - Technician
@temoto, switching the broker to Redis is a big, risky move for us, since we have used RabbitMQ for about a year, and it works well when the Celery workers run in process-pool mode. We only hit this issue when we tried to change to gevent mode, having learned of the many benefits of using greenlets. And it looks more like an issue in Celery than in RabbitMQ. - Technician
Have you tried changing the number of green threads when you start the Celery worker? For example, testing with 100, 200, or 500 threads to see if there's any difference? I am asking because this sounds like a TCP port issue. I have run into this on Windows, but never on Linux. - Elegiac
Broken pipe is an exception that Python throws when a TCP connection was established (so not a port issue), then closed, then a read/write was attempted. Indeed this is clearly a Celery issue: it fails to handle the exception with a silent retry. I was not implying this is a RabbitMQ issue, but very probably a switch to Redis task storage would also fix this problem, because the Celery-Redis driver is different from the Celery-RabbitMQ one. Of course, the very best course of action is to fix the Celery AMQP driver (kombu) to retry requests on EPIPE (a rough sketch of the idea follows this comment thread). - Rousseau
@temoto, I got it, thank you. I will try Redis if it cannot be fixed. - Technician
Hi guys, maybe I found a clue to reproducing this. The issue always happens when the Celery workers consume task messages more slowly than the Django application produces them, and about 3,000 messages have piled up in RabbitMQ. - Technician
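
As a rough illustration of the EPIPE-handling idea from the comments above: after a broken pipe the channel is dead, and the old delivery tag cannot be re-acked on a new channel, so at the consumer level the safe behaviour is to leave the message unacknowledged and let RabbitMQ redeliver it once the connection is revived. A minimal sketch (a hypothetical helper, not actual kombu or Celery API):

import socket

def ack_or_let_redeliver(message, logger):
    # Hypothetical helper: try to ack a kombu message, but treat a dead
    # connection (EPIPE surfaces as socket.error on Python 2) as
    # "leave it unacked"; RabbitMQ redelivers unacked messages after
    # the connection is re-established.
    try:
        message.ack()
    except socket.error as exc:
        logger.warning('ack failed with %r; message will be redelivered', exc)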

We noticed that this issue is also caused by a combination of a high prefetch count and high concurrency.

We had concurrency set to 500 and prefetch to 100, which means the ultimate prefetch is 500*100=50,000 per worker. We had around 100k tasks piled up, and because of this configuration one worker reserved all the tasks for itself while the other workers weren't even used. This one worker kept getting the broken pipe error and never acknowledged any task, which led to tasks never being cleared from the queue.

We then changed the prefetch to 3 and restarted all the workers, which fixed the issue. Since lowering the prefetch we have seen 0 instances of the broken pipe error, whereas we used to see it quite often before that.
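
For reference, the effective prefetch per worker is concurrency multiplied by the prefetch multiplier, so the fix is a one-line settings change. A minimal sketch, assuming a Celery config module (the module name is illustrative; the uppercase setting name is the Celery 3.x spelling, the lowercase one the Celery 4+ spelling):

# celeryconfig.py (illustrative module name)
# Effective prefetch = concurrency * prefetch multiplier,
# e.g. 500 greenlets * 100 = 50,000 reserved messages per worker.
CELERYD_PREFETCH_MULTIPLIER = 3      # Celery 3.x setting name
# worker_prefetch_multiplier = 3    # equivalent setting on Celery 4+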

Oviparous answered 9/8, 2022 at 7:57 · Comments (0)
