celery: daemonic processes are not allowed to have children

In Python 2.7 I am trying to create processes (with multiprocessing) inside a Celery task (Celery 3.1.17), but it raises the error:

daemonic processes are not allowed to have children

Googling it, I found that recent versions of billiard fix the "bug", but I have the most recent version (3.3.0.20) and the error is still happening. I also tried to implement this workaround in my Celery task, but it gives the same error.

Does anybody know how to do it? Any help is appreciated, Patrick

EDIT: snippets of code

Task:

from __future__ import absolute_import
from celery import shared_task
from embedder.models import Embedder

@shared_task
def embedder_update_task(embedder_id):
    embedder = Embedder.objects.get(pk=embedder_id)
    embedder.test()

Artificial test function (from here):

import time
import multiprocessing as mp
from random import randint

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t    

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = mp.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test(self):
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)
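
MyPool is not defined in the snippet above; presumably it is the non-daemonic Pool subclass from the linked workaround. A minimal sketch of that recipe (for Python 2.7), purely as an assumption about what MyPool looks like:

import multiprocessing
import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    # Report the daemon flag as always False and ignore attempts to set it,
    # so these workers are allowed to spawn children of their own.
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class MyPool(multiprocessing.pool.Pool):
    # A Pool whose workers are the non-daemonic processes defined above.
    Process = NoDaemonProcess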

My real function:

import multiprocessing as mp

def test(self):
    self.init()
    for saveindex in range(self.start_index,self.start_index+self.nsaves):
        self.create_storage(saveindex)
        # process creation:
        procs = [mp.Process(name="Process-" + str(i),
                            target=getattr(self, self.training_method),
                            args=(saveindex,))
                 for i in range(self.nproc)]
        for p in procs: p.start()
        for p in procs: p.join()
    print "End of task"

The init function defines a multiprocessing array and a NumPy view that share the same memory, so that all my processes can update this same array at the same time:

mp_arr = mp.Array(c.c_double, np.random.rand(1000000))  # example (c is ctypes, np is numpy)
self.V = np.frombuffer(mp_arr.get_obj())  # all the processes can update V
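
For context, a minimal, self-contained sketch of this shared-array pattern (the function, sizes, and names here are illustrative, not taken from the original code):

import ctypes
import multiprocessing as mp
import numpy as np

def fill_slice(shared_arr, start, stop, value):
    # Each worker reinterprets the shared buffer as a NumPy array and
    # writes into its own disjoint slice, so no extra locking is needed here.
    v = np.frombuffer(shared_arr.get_obj())
    v[start:stop] = value

if __name__ == "__main__":
    mp_arr = mp.Array(ctypes.c_double, 10)
    procs = [mp.Process(target=fill_slice, args=(mp_arr, i * 5, (i + 1) * 5, float(i)))
             for i in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(np.frombuffer(mp_arr.get_obj()))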

Error generated when task is called:

[2015-06-04 09:47:46,659: INFO/MainProcess] Received task: embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda]
[2015-06-04 09:47:47,674: WARNING/Worker-5] Creating 5 (non-daemon) workers and jobs in main process.
[2015-06-04 09:47:47,789: ERROR/MainProcess] Task embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda] raised unexpected: AssertionError('daemonic processes are not allowed to have children',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/tasks.py", line 21, in embedder_update_task
    embedder.test()
  File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/models.py", line 475, in test
    pool = MyPool(5)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 159, in __init__
    self._repopulate_pool()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 223, in _repopulate_pool
    w.start()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 124, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
Tantalize answered 3/6, 2015 at 15:21 Comment(6)
please update your question with the snippet of code that causes the exception and the full exception. – Frogfish
Added my real code (as opposed to the artificial one). Thank you scytale for your help, it's very appreciated. – Tantalize
hm.... there's a lot of OO in there (are you a Java person? :-) OO and distributed processing can lead to excessive complication. Plus you're missing the class definition (test() is a method of a class, right?). Plus you should try to show training_method() (or a representative sample of it). Is it possible to move the functionality of training_method() into a function? That would make it easier to integrate it with celery (or multiprocessing for that matter). – Frogfish
oh wait... test() is a method of the Embedder class, right? If so then they should be in the same code block for clarity. And Embedder is a Django model, right? You should make all this clear - it's very relevant. – Frogfish
Yes, Embedder is a Django model and test() is one of its methods. The only uses of the multiprocessing module are in Embedder's init() method (creation of the multiprocessing array) and in the test() method (creation of processes, start and join). self.training_method references the function used for learning (test() is actually learn()). – Tantalize
oh wait, "object that share the same memory so that all my processes can update this same array at the same time" - I only just noticed that.... sigh – Frogfish

billiard and multiprocessing are different libraries: billiard is the Celery project's own fork of multiprocessing. You will need to import billiard and use it instead of multiprocessing.

However the better answer is probably that you should refactor your code so that you spawn more Celery tasks instead of using two different ways of distributing your work.

You can do this using Celery canvas:

import time
from random import randint

from celery import group

@app.task
def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t    

def work(num_procs):
    return group(sleepawhile.s(randint(1, 5)) for x in range(num_procs))

def test(self):
    my_group = group(work(randint(1, 5)) for x in range(5))
    result = my_group.apply_async()
    result.get()

I've attempted to make a working version of your code that uses canvas primitives instead of multiprocessing. However, since your example was quite artificial, it's not easy to come up with something that makes sense.

Update:

Here is a translation of your real code that uses Celery canvas:

tasks.py:

@shared_task
def run_training_method(saveindex, embedder_id):
    embedder = Embedder.objects.get(pk=embedder_id)
    embedder.training_method(saveindex)

models.py:

from tasks import run_training_method
from celery import group

class Embedder(Model):

    def embedder_update_task(self):
        my_group = []

        for saveindex in range(self.start_index, self.start_index + self.nsaves):
            self.create_storage(saveindex)
            # Add to list
            my_group.extend([run_training_method.subtask((saveindex, self.id)) 
                         for i in range(self.nproc)])

        result = group(my_group).apply_async()
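
As written, the method fires the group and returns without waiting. If the caller needs to block until every subtask has finished, a hypothetical variant could end with "return result", and the caller could then wait on the GroupResult (outside of any other task):

result = embedder.embedder_update_task()  # assumes the method is changed to return result
result.get()  # blocks until all run_training_method subtasks complete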
Frogfish answered 4/6, 2015 at 14:1 Comment(2)
Thank you scytale, I'll try it! Do you know if there is an equivalent of multiprocessing.Array in billiard (cf. my init() method) so that all tasks can share the same memory variable? – Tantalize
No, there is not - Celery workers can make no assumptions about being able to share anything, since they may be running on different hosts. If you're going to run on only one machine ever, then maybe using only multiprocessing is a better approach, since you have the convenience of shared memory. If you need to run on multiple machines then maybe use only Celery and do map-reduce style processing on your data. – Frogfish

I got a similar error trying to call a multiprocessing method from a Celery task in Django. I solved it by using billiard instead of multiprocessing:

import billiard as multiprocessing

Hope it helps.
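
For illustration, a minimal sketch of what that swap might look like inside a task (the task and helper names below are made up for this example, not taken from the answer):

import billiard as multiprocessing
from celery import shared_task

def crunch(n):
    # A plain module-level function so it can be pickled for the pool workers.
    return n * n

@shared_task
def crunch_all(numbers):
    # Use billiard's Pool in place of the stdlib one, as this answer suggests.
    pool = multiprocessing.Pool(processes=4)
    try:
        return pool.map(crunch, numbers)
    finally:
        pool.close()
        pool.join()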

Joyner answered 28/2, 2019 at 2:44 Comment(2)
For Windows this is giving another strange error! Not sure how to fix this. – Detector
Sorry @SauravKumar, I don't use a Windows machine; if I knew what the error was I might be able to help. – Joyner

If you are using a submodule/library with multiprocessing already baked in, it may make more sense to set the -P threads argument of the worker:

celery worker -P threads

https://github.com/celery/celery/issues/4525#issuecomment-566503932

Update: there was a bug in command-line parsing in Celery < v5.1.1 that did not allow -P threads even though it was supported; it is fixed in >= v5.1.1. The threads pool has been officially supported since v4.4.
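
For example, with the Celery 5 command-line interface this could look like the following ("proj" is a placeholder for your own application module):

celery -A proj worker --pool threads --concurrency 8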

Mcclain answered 9/3, 2020 at 18:0 Comment(3)
Thanks! I think this should be considered the accepted answer now. – Hartford
Thank you. The best answer. – Reece
Thanks! I think this should be the accepted answer. Often it is a library which makes use of threads/sub-processes. – Cyna

I got this when using multiprocessing with Celery 4.2.0 and Python 3.6. I solved it by using billiard.

I changed my source code from

from multiprocessing import Process

to

from billiard.context import Process

and that solved the error.

Note that the import source is billiard.context, not billiard.process.

Felony answered 8/7, 2018 at 14:7 Comment(2)
Update: from billiard import Process seems to work now! – Walz
What is the right billiard package version for Python 3.9 and Airflow 2.2.4? – Gossipmonger