Celery: correct way to run lengthy initialization function (per process)

TL;DR

To run an initialization function for each process spawned by celery, you can use the worker_process_init signal. As you can read in the docs, handlers for that signal should not block for more than 4 seconds. But what are the options if I have to run an init function that takes more than 4 seconds to execute?

Problem

I use a C extension module to run certain operations within celery tasks. This module requires an initialization step that might take several seconds (maybe 4-10). Since I would prefer not to run this init function for every task, but rather once for every process that is spawned, I made use of the worker_process_init signal:

# lib.py
import isclient  # C extension module

client = None

def init():
    global client
    client = isclient.Client()  # this might take a while

def create_ne_list(text):
    return client.ne_receiventities4datachunk(text)

# celery.py
from celery import Celery
from celery.signals import worker_process_init
from lib import init

celery = Celery(include=[
    'isc.ne.tasks'
])

celery.config_from_object('celeryconfig')

@worker_process_init.connect
def process_init(sender=None, conf=None, **kwargs):
    init()

if __name__ == '__main__':
    celery.start()

# tasks.py
from celery import celery  # the app instance defined in celery.py above
from lib import create_ne_list as cnl

@celery.task(time_limit=1200)
def create_ne_list(text):
    return cnl(text)

What happens when I run this code is what I described in my earlier question (Celery: stuck in infinitly repeating timeouts (Timed out waiting for UP message)). In short: since my init function takes longer than 4 seconds, it sometimes happens that a worker gets killed and restarted, and during the restart gets killed again, because that's what automatically happens after 4 seconds of unresponsiveness. This eventually results in an infinitely repeating kill-and-restart cycle.

Another option would be to run my init function only once per worker, using the worker_init signal. If I do that, I get a different problem: now the queued-up tasks get stuck for some reason. When I start the worker with a concurrency of 3 and then send a couple of tasks, the first three get finished, but the remaining ones are never touched. (I assume it has something to do with the fact that the client object needs to be shared between multiple processes and that the C extension, for some reason, doesn't support that. But to be honest, I'm relatively new to multiprocessing, so I can only guess.)
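
For reference, this is roughly what that worker_init variant looks like (a sketch reusing lib.init from above; the handler name is mine). worker_init fires once in the parent worker process, before the pool children are forked, so the client object created there is inherited, i.e. shared, by every child:

# celery.py (worker_init variant, sketch)
from celery import Celery
from celery.signals import worker_init
from lib import init

celery = Celery(include=['isc.ne.tasks'])
celery.config_from_object('celeryconfig')

@worker_init.connect
def parent_init(**kwargs):
    # runs once, in the parent worker process, before forking
    init()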

Question

So, the question remains: how can I run an init function per process that takes longer than 4 seconds? Is there a correct way to do that, and if so, what is it?

Instate answered 13/6, 2014 at 10:6 Comment(5)
How is this not a duplicate of your previous question? – Brocket
@Brocket it's not at all? How should it be a duplicate? It's more of a follow-up question. – Instate
I think on later reading I get the difference. The problem really is that neither post has a clear problem statement, so it's not obvious what answers are wanted. – Brocket
@Brocket good point. I edited this question and hope it is now a bit clearer what I'm asking. – Instate
I'm pretty sure that a subclassed task class will run its init function once per process. You should be able to put your long init function in there. You'll have to refactor a bit and add base=YourTaskSubclass to the task decorator. – Yonina
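
A rough sketch of that last comment's idea (illustrative names, not from the question; it mirrors the question's import style): let the task class create the client lazily on first use, once per pool process, so that no signal handler has to block at all:

# tasks.py (base-class sketch)
import isclient  # C extension module
from celery import Task
from celery import celery  # the app instance from celery.py

class ClientTask(Task):
    _client = None

    @property
    def client(self):
        # created lazily on first use in each pool process, so the slow
        # initialization never races the worker's 4-second UP check
        if self._client is None:
            self._client = isclient.Client()
        return self._client

@celery.task(base=ClientTask, bind=True, time_limit=1200)
def create_ne_list(self, text):
    return self.client.ne_receiventities4datachunk(text)

The trade-off is that the first task executed in each process pays the initialization cost, which counts against that task's time_limit.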

Celery limits the process init timeout to 4.0 seconds; check the source code.

To work around this limit, you can change it before you create the celery app:

from celery.concurrency import asynpool
asynpool.PROC_ALIVE_TIMEOUT = 10.0  # set this long enough

Note that there is no configuration or setting to change this value.
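
Applied to the question's layout, the override goes at the top of celery.py, before the app is created (a sketch):

# celery.py (with raised process-init timeout)
from celery.concurrency import asynpool
asynpool.PROC_ALIVE_TIMEOUT = 10.0  # must be set before the app is created

from celery import Celery
from celery.signals import worker_process_init
from lib import init

celery = Celery(include=['isc.ne.tasks'])
celery.config_from_object('celeryconfig')

@worker_process_init.connect
def process_init(sender=None, conf=None, **kwargs):
    init()  # may now block for up to 10 seconds per child process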

Pitiless answered 17/7, 2015 at 5:46 Comment(0)

The answer above is no longer the only method as of celery 4.4.0; a pull request added a config option for this feature.

Use the config option

With celery ^4.4.0, this value is configurable. Use the celery application config option worker_proc_alive_timeout. From the stable version docs:

worker_proc_alive_timeout

Default: 4.0.

The timeout in seconds (int/float) when waiting for a new worker process to start up.

Example:

from celery import Celery
from celery.signals import worker_process_init

app = Celery('app')
app.conf.worker_proc_alive_timeout = 10

@worker_process_init.connect
def long_init_function(*args, **kwargs):
    import time
    time.sleep(8)
Beery answered 21/6, 2021 at 19:23 Comment(0)
