TL;DR
To run an initialization function in each process spawned by Celery, you can use the worker_process_init signal. As the docs state, handlers for that signal must not block for more than 4 seconds. But what are my options if I have to run an init function that takes longer than 4 seconds to execute?
Problem
I use a C extension module to run certain operations within Celery tasks. This module requires an initialization that might take several seconds (maybe 4 - 10). Since I would prefer not to run this init function for every task, but rather once per spawned process, I made use of the worker_process_init signal:
# lib.py
import isclient  # C extension module

client = None

def init():
    global client
    client = isclient.Client()  # this might take a while

def create_ne_list(text):
    return client.ne_receiventities4datachunk(text)
# celery.py
from celery import Celery
from celery.signals import worker_process_init
from lib import init

celery = Celery(include=[
    'isc.ne.tasks'
])
celery.config_from_object('celeryconfig')

@worker_process_init.connect
def process_init(sender=None, conf=None, **kwargs):
    init()

if __name__ == '__main__':
    celery.start()
# tasks.py
from celery import celery
from lib import create_ne_list as cnl

@celery.task(time_limit=1200)
def create_ne_list(text):
    return cnl(text)
When I run this code, what happens is what I described in my earlier question (Celery: stuck in infinitly repeating timeouts (Timed out waiting for UP message)). In short: because my init function takes longer than 4 seconds, a worker sometimes gets killed and restarted, and during the restart it gets killed again, because that is what automatically happens after 4 seconds of unresponsiveness. This eventually results in an endless kill-and-restart loop.
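If the only obstacle is the 4-second limit itself, newer Celery versions (4.x and later) expose a worker_proc_alive_timeout setting that controls how long the parent waits for the child's UP message before killing it. Whether this setting is available depends on the Celery release in use, so treat this as an assumption to verify against the configuration docs for your version:

```python
# celeryconfig.py (sketch, assumes Celery 4.x+)
# Raise the time the master waits for a forked child's UP message.
# 15.0 is an illustrative value, not a recommendation; the default is 4.0.
worker_proc_alive_timeout = 15.0
```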
Another option would be to run my init function only once per worker, using the worker_init signal. If I do that, I get a different problem: now the queued-up tasks get stuck for some reason.
When I start the worker with a concurrency of 3 and then send a couple of tasks, the first three get finished, but the remaining ones are never touched. (I assume it might have something to do with the fact that the client object needs to be shared between multiple processes and that the C extension, for some reason, doesn't support that. But to be honest, I'm relatively new to multiprocessing, so I can only guess.)
Question
So, the question remains: How can I run an init function per process that takes longer than 4 seconds? Is there a correct way to do that and what way would that be?