Using multiprocessing pool from celery task raises exception

FOR THOSE READING THIS: I have decided to use RQ instead, which doesn't fail when running code that uses the multiprocessing module. I suggest you use that.
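
For reference, a minimal sketch of the RQ equivalent, assuming a plain (undecorated) test_pool function in tasks.py and a local Redis instance; the names here are just illustrative:

from redis import Redis
from rq import Queue

from tasks import test_pool  # the same function, minus the @app.task decorator

q = Queue(connection=Redis())  # defaults to localhost:6379, database 0
job = q.enqueue(test_pool)     # start an RQ worker in another shell to execute it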

I am trying to use a multiprocessing pool from within a celery task using Python 3 and redis as the broker (running it on a Mac). However, I don't seem to be able to even create a multiprocessing Pool object from within the Celery task! Instead, I get a strange exception that I really don't know what to do with.

Can anyone tell me how to accomplish this?

The task:

from celery import Celery
from multiprocessing.pool import Pool

app = Celery('tasks', backend='redis', broker='redis://localhost:6379/0')

@app.task
def test_pool():
    with Pool() as pool:
        # perform some task using the pool
        pool.close()
    return 'Done!'

I start the Celery worker with:

celery -A tasks worker --loglevel=info

and then trigger the task from the following Python script:

import tasks

tasks.test_pool.delay()

which produces the following Celery output:

[2015-01-12 15:08:57,571: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-01-12 15:08:57,583: INFO/MainProcess] mingle: searching for neighbors
[2015-01-12 15:08:58,588: INFO/MainProcess] mingle: all alone
[2015-01-12 15:08:58,598: WARNING/MainProcess] [email protected] ready.
[2015-01-12 15:09:02,425: INFO/MainProcess] Received task: tasks.test_pool[38cab553-3a01-4512-8f94-174743b05369]
[2015-01-12 15:09:02,436: ERROR/MainProcess] Task tasks.test_pool[38cab553-3a01-4512-8f94-174743b05369] raised unexpected: AttributeError("'Worker' object has no attribute '_config'",)
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python3.4/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/Users/simongray/Code/etilbudsavis/offer-sniffer/tasks.py", line 17, in test_pool
    with Pool() as pool:
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 150, in __init__
    self._setup_queues()
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 243, in _setup_queues
    self._inqueue = self._ctx.SimpleQueue()
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 111, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 336, in __init__
    self._rlock = ctx.Lock()
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 66, in Lock
    return Lock(ctx=self.get_context())
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 163, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 59, in __init__
    kind, value, maxvalue, self._make_name(),
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 117, in _make_name
    return '%s-%s' % (process.current_process()._config['semprefix'],
AttributeError: 'Worker' object has no attribute '_config'
Crust asked 12/1, 2015 at 14:23
I believe you are unable to spawn threads from Celery, which in a sense defeats the purpose of distributed processing. – Silkweed
Well, for one, I don't see how it defeats the purpose of distributed processing that the code I'm trying to run in Celery tasks uses Python's built-in multiprocessing... so that's not really a helpful answer. I happen to have a large code base that uses multiprocessing, and if I have to rewrite all of that code to be able to use Celery, then Celery seems like an incredible straitjacket of a tool? – Crust
Hi, we were using the Celery + multiprocessing approach and it caused problems when Celery serialized shared objects from the multiprocessing module. We changed it and decided to continue with an RQ-based implementation. – Commissure
That RQ thing looks so much easier and less problematic than Celery + RabbitMQ, each of which has given me numerous headaches. – Awakening

This is a known issue with Celery. It stems from an issue introduced in the billiard dependency: the worker processes it spawns lack the _config attribute that the standard library's multiprocessing code expects on the current process, which is exactly what the AttributeError in the traceback complains about. A work-around is to set the attribute manually. Thanks to user @martinth for the work-around below.

from celery.signals import worker_process_init
from multiprocessing import current_process

@worker_process_init.connect
def fix_multiprocessing(**kwargs):
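    # billiard's Worker processes lack the _config attribute that multiprocessing's
    # semaphore-naming code reads ('semprefix'), so give them a minimal stand-in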
    try:
        current_process()._config
    except AttributeError:
        current_process()._config = {'semprefix': '/mp'}

The worker_process_init signal runs this code when each worker process starts. We simply check whether _config exists and set it if it does not. Note that the handler has to live in a module the worker actually imports (e.g. the tasks.py that defines the Celery app) for the signal to be connected.

Eddaeddana answered 20/8, 2015 at 18:37

Thanks to a useful comment in the Celery issue report linked from Davy's comment, I was able to solve this by importing the Pool class from the billiard module instead.

Replace

from multiprocessing import Pool

with

from billiard.pool import Pool
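
With that change, the task from the question can keep using a pool; here is a minimal sketch of what that looks like (square is just an illustrative helper, and I use explicit close/join instead of a with block in case the installed billiard version predates context-manager support on Pool):

from celery import Celery
from billiard.pool import Pool

app = Celery('tasks', backend='redis', broker='redis://localhost:6379/0')

def square(x):
    return x * x

@app.task
def test_pool():
    pool = Pool(4)
    try:
        results = pool.map(square, range(10))
    finally:
        pool.close()
        pool.join()
    return results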
Economize answered 19/4, 2022 at 18:10

A quick solution is to use the thread-based "dummy" multiprocessing implementation. Change

from multiprocessing import Pool  # or whatever you're using

to

from multiprocessing.dummy import Pool

However, since this parallelism is thread-based, the usual caveats apply: the GIL means only one thread executes Python bytecode at a time, so this mainly helps with I/O-bound or GIL-releasing work, not CPU-bound pure-Python code.
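
For example, the swap looks like this in a plain script; slow_io here is just a stand-in for whatever I/O-bound work is being parallelized:

import time
from multiprocessing.dummy import Pool  # thread pool exposing the multiprocessing Pool API

def slow_io(x):
    time.sleep(0.1)  # placeholder for a network or disk call that releases the GIL
    return x * 2

with Pool(4) as pool:
    results = pool.map(slow_io, range(8))

print(results)  # [0, 2, 4, 6, 8, 10, 12, 14]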

Monobasic answered 9/4, 2016 at 0:35
