Running "unique" tasks with celery

I use celery to update RSS feeds in my news aggregation site. I use one @task for each feed, and things seem to work nicely.

There's a detail I'm not sure I handle well, though: all feeds are updated once every minute with a @periodic_task, but what if a feed is still updating from the last periodic task when a new one is started? (For example, if the feed is really slow, or offline and the task is held in a retry loop.)

Currently I store task results and check their status like this:

import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
from aggregator.models import Feed


_results = {}


@periodic_task(run_every=timedelta(minutes=1))
def fetch_articles():
    for feed in Feed.objects.all():
        if feed.pk in _results:
            if not _results[feed.pk].ready():
                # The task is not finished yet
                continue
        _results[feed.pk] = update_feed.delay(feed)


@task()
def update_feed(feed):
    try:
        feed.fetch_articles()
    except socket.error as exc:
        update_feed.retry(args=[feed], exc=exc)

Maybe there is a more sophisticated/robust way of achieving the same result using some Celery mechanism that I missed?

Irradiant answered 4/11, 2010 at 10:57 Comment(0)

From the official documentation: Ensuring a task is only executed one at a time.
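
The documented recipe takes a cache-based lock inside the task itself, keyed on the task's input, so two updates of the same feed cannot overlap while different feeds still run in parallel. A minimal sketch of that idea applied to the question's update_feed task, assuming Django's cache backend is shared by all workers (the key format and LOCK_EXPIRE value are illustrative):

from django.core.cache import cache
from celery.decorators import task

LOCK_EXPIRE = 60 * 5  # drop a stale lock after 5 minutes in case a worker dies

@task()
def update_feed(feed):
    # One lock per feed, so different feeds still update in parallel.
    lock_id = "update_feed-lock-%s" % feed.pk
    # cache.add only sets the key if it does not already exist, which makes it
    # usable as an atomic "acquire" on backends such as memcached.
    if cache.add(lock_id, "true", LOCK_EXPIRE):
        try:
            feed.fetch_articles()
        finally:
            cache.delete(lock_id)
    # else: another worker is already updating this feed; skip this run.

The expiry guards against a worker dying while it holds the lock; the trade-off is that a run longer than LOCK_EXPIRE can overlap with the next one.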

Raphaelraphaela answered 4/11, 2010 at 12:25 Comment(12)
I don't see anything superior in this approach, it's way more complex but basically does the same thing (and using the django cache to store locks seems awkward) – Irradiant
Oh, I missed a big detail: it makes the lock process- and thread-safe. – Irradiant
Do you know if this is still valid when writing to a global variable? #7719703 – Lacustrine
@LuperRouch another issue related to your locking mechanism: it only works when there is only one worker running :) – Faiyum
Here is an approach using redis to store the lock: loose-bits.com/2010/10/distributed-task-locking-in-celery.html – Udale
This link from the official documentation is pretty useless when not running celery in a django environment, as it relies on setting a cache key and releasing it once the task has finished. Has anyone tried an approach with multiprocessing.Semaphore to prevent tasks from a single worker being executed concurrently? – Hephzipah
This is a terrible suggestion. Setting up yet-another-server just to regulate tasks going into your Celery server is a lot of extra complexity relative to the utility you're getting out of it. – Leonorleonora
@Leonorleonora - service, not server. When the requirement is to ensure that a task is only executed by one worker in a distributed multi-worker system, then the functionality required is that of a lock service. There are many ways of implementing a lock service. – Raphaelraphaela
Any reason why one wouldn't just create a unique worker for this task with concurrency=1? celery worker --queues=fetch_articles_queue --concurrency=1, and then use CELERYBEAT_SCHEDULE = {'fetch_articles': {'task': 'path.to.tasks.fetch_articles', 'schedule': 60, 'options': {'queue': 'fetch_articles_queue'}}} (comments don't allow multiline code blocks; see this spelled out right after these comments). – Empathy
@luke: firstly, you lose the resilience of having multiple distributed workers capable of performing the task. Secondly, it makes it easier to accidentally run the task concurrently. Newer features of task TTL probably mitigate the possible problems of the job overrunning and beat-spawned requests backing up. – Raphaelraphaela
This example "is using the cache framework to set a lock that's accessible for all workers", but it's not clear a) is it also visible to clients (i.e. can it be checked before the task is published?) and b) what is the scope of this visibility? If there are many workers across many machines, can they all see this cache? – Hydrargyrum
I was wondering why using celery.app.control.inspect().active() to get the list of the currently running tasks has been directly discarded. Wouldn't it be enough to compare the current task against that list to ensure uniqueness? – Remand
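
Spelled out, the dedicated-queue idea from the comment above looks roughly like this (the module path and queue name are taken from the comment and are illustrative):

# settings.py (illustrative): route the periodic task to its own queue
CELERYBEAT_SCHEDULE = {
    'fetch_articles': {
        'task': 'path.to.tasks.fetch_articles',
        'schedule': 60,  # every 60 seconds
        'options': {'queue': 'fetch_articles_queue'},
    },
}

# Then run one dedicated worker for that queue with a single process:
#   celery worker --queues=fetch_articles_queue --concurrency=1

With a single worker process consuming that queue, two fetch_articles runs can never execute at the same time; the trade-off, as the reply notes, is losing the resilience of multiple workers.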

Based on MattH's answer, you could use a decorator like this:

from django.core.cache import cache
import functools

def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    func(*args, **kwargs)
                finally:
                    release_lock()
        return wrapper
    return task_exc

Then, use it like so:

@periodic_task(run_every=timedelta(minutes=1))
@single_instance_task(60*10)
def fetch_articles():
    ...  # yada yada
Cordeliacordelie answered 5/10, 2011 at 22:9 Comment(1)
Thanks; worked for me! Notice however that this does in fact not work with the default django CACHES, because the default is local-memory caching, which means each process has its own cache, so each celery worker (process) will have its own cache... – Reconsider
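
As the comment points out, the lock only works if every worker process sees the same cache, which the default LocMemCache does not provide. A shared backend such as memcached fixes that; a minimal, illustrative settings snippet (backend path as in older Django versions, newer ones ship PyMemcacheCache):

CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
        'LOCATION': '127.0.0.1:11211',
    }
}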

Using https://pypi.python.org/pypi/celery_once seems to do the job really nicely, including reporting errors and testing some task parameters for uniqueness.

You can do things like:

from celery_once import QueueOnce
from myapp.celery import app
from time import sleep

@app.task(base=QueueOnce, once=dict(keys=('customer_id',)))
def start_billing(customer_id, year, month):
    sleep(30)
    return "Done!"

which just needs the following settings in your project:

ONCE_REDIS_URL = 'redis://localhost:6379/0'
ONCE_DEFAULT_TIMEOUT = 60 * 60  # remove lock after 1 hour in case it was stale
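
With that configuration, enqueueing the task again while the first call still holds the Redis lock raises celery_once's AlreadyQueued exception (or returns None if the task is declared with once={'graceful': True}). A small usage sketch:

from celery_once import AlreadyQueued

start_billing.delay(10, 2015, 10)      # first call acquires the lock in Redis
try:
    start_billing.delay(10, 2015, 10)  # same customer_id, so it counts as a duplicate
except AlreadyQueued:
    pass  # the lock is released when the first task finishes (or the timeout expires)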
Jeraldinejeralee answered 30/10, 2015 at 11:55 Comment(0)

If you're looking for an example that doesn't use Django, then try this one (caveat: it uses Redis instead, which I was already using).

The decorator code is as follows (full credit to the author of the article, go read it):

import redis

REDIS_CLIENT = redis.Redis()

def only_one(function=None, key="", timeout=None):
    """Enforce only one celery task at a time."""

    def _dec(run_func):
        """Decorator."""

        def _caller(*args, **kwargs):
            """Caller."""
            ret_value = None
            have_lock = False
            lock = REDIS_CLIENT.lock(key, timeout=timeout)
            try:
                have_lock = lock.acquire(blocking=False)
                if have_lock:
                    ret_value = run_func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()

            return ret_value

        return _caller

    return _dec(function) if function is not None else _dec
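
Applied to a task, the decorator might be used like this (the key and timeout are illustrative; because the key is fixed, all runs of the task are serialised across every worker that shares the Redis instance):

from datetime import timedelta
from celery.decorators import periodic_task

# An explicit task name, since only_one does not preserve the function's __name__.
@periodic_task(run_every=timedelta(minutes=1), name='aggregator.tasks.fetch_articles')
@only_one(key='fetch_articles_lock', timeout=60 * 10)
def fetch_articles():
    for feed in Feed.objects.all():   # Feed and update_feed as in the question
        update_feed.delay(feed)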
Sherburn answered 21/10, 2013 at 16:10 Comment(1)
Is it possible to do this with RabbitMQ? – Anana

I was wondering why nobody mentioned using celery.app.control.inspect().active() to get the list of the currently running tasks. Is it not real time? Because otherwise it would be very easy to implement, for instance:

from functools import wraps


def unique_task(callback):
    """
    Decorator to ensure only one instance of the task is running at once.
    """
    @wraps(callback)
    def _wrapper(celery_task, *args, **kwargs):
        # Ask every worker for the tasks it is currently executing.
        active_queues = celery_task.app.control.inspect().active()
        if active_queues:
            for queue in active_queues:
                for running_task in active_queues[queue]:
                    # Discard the currently running task from the list.
                    if (celery_task.name == running_task['name']
                            and celery_task.request.id != running_task['id']):
                        return f'Task "{callback.__name__}()" cancelled! already running...'

        return callback(celery_task, *args, **kwargs)

    return _wrapper

And then just applying the decorator to the corresponding tasks:

@celery.task(bind=True)
@unique_task
def my_task(self):
    # task executed once at a time.
    pass

Remand answered 24/2, 2021 at 11:53 Comment(0)

This solution is for Celery running on a single host with concurrency greater than 1. Other kinds of locks that avoid extra dependencies (such as Redis), apart from file-based ones, don't work with concurrency greater than 1.

from datetime import datetime, timedelta
from fcntl import flock, LOCK_EX, LOCK_NB
from hashlib import md5
from os.path import join
from time import sleep

from celery.task import PeriodicTask


class Lock(object):
    def __init__(self, filename):
        self.f = open(filename, 'w')

    def __enter__(self):
        try:
            # Non-blocking exclusive lock: fails immediately if another
            # process on this host already holds it.
            flock(self.f.fileno(), LOCK_EX | LOCK_NB)
            return True
        except IOError:
            pass
        return False

    def __exit__(self, *args):
        # Closing the file releases the lock.
        self.f.close()


class SinglePeriodicTask(PeriodicTask):
    abstract = True
    run_every = timedelta(seconds=1)

    def __call__(self, *args, **kwargs):
        lock_filename = join('/tmp',
                             md5(self.name.encode()).hexdigest())
        with Lock(lock_filename) as is_locked:
            if is_locked:
                super(SinglePeriodicTask, self).__call__(*args, **kwargs)
            else:
                print('already working')


class SearchTask(SinglePeriodicTask):
    restart_delay = timedelta(seconds=60)

    def run(self, *args, **kwargs):
        print(self.name, 'start', datetime.now())
        sleep(5)
        print(self.name, 'end', datetime.now())
Lincoln answered 6/1, 2014 at 19:57 Comment(0)
