Flask Celery task locking

I am using Flask with Celery and I am trying to lock a specific task so that only one instance of it can run at a time. The Celery docs give an example of doing this: Celery docs, Ensuring a task is only executed one at a time. That example was written for Django, but I am using Flask. I have done my best to convert it to work with Flask, however I still see that myTask1, which has the lock, can be run multiple times.

One thing that is not clear to me is whether I am using the cache correctly; I have never used it before, so all of this is new to me. One thing from the docs that is mentioned but not explained is this:

Doc Notes:

In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.

I'm not entirely sure what that means. Should I be using the cache in conjunction with a database, and if so, how would I do that? I am using MongoDB. In my code I just have this setup for the cache, cache = Cache(app, config={'CACHE_TYPE': 'simple'}), as that is what is mentioned in the Flask-Cache docs (Flask-Cache Docs).

Another thing that is not clear to me is whether there is anything different I need to do, since I am calling myTask1 from within my Flask route task1.

Here is an example of my code that I am using.

from flask import (Flask, render_template, flash, redirect,
                   url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time


app = Flask(__name__)

cache = Cache(app, config={'CACHE_TYPE': 'simple'})
app.config['SECRET_KEY']= 'super secret key for me123456789987654321'

######################
# MONGODB SETUP
#####################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'


mongo = PyMongo(app)


##############################
# CELERY ARGUMENTS
##############################


app.config['CELERY_BROKER_URL'] = 'amqp://localhost//'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'

app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
    "host": "localhost",
    "port": 27017,
    "database": "celery-test-db", 
    "taskmeta_collection": "celery_jobs",
}

app.config['CELERY_TASK_SERIALIZER'] = 'json'


celery = Celery('task',broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)


LOCK_EXPIRE = 60 * 2  # Lock expires in 2 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)



@celery.task(bind=True, name='app.myTask1')
def myTask1(self):

    self.update_state(state='IN TASK')

    lock_id = self.name

    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            self.update_state(state='DOING WORK')
            time.sleep(90)
            return 'result'

    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later



@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
    print('you are in task2')
    self.update_state(state='STARTING')
    time.sleep(120)
    print('task2 done')


@app.route('/', methods=['GET', 'POST'])
def index():

    return render_template('index.html')

@app.route('/task1', methods=['GET', 'POST'])
def task1():

    print('running task1')
    result = myTask1.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)


    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task1'})

    return render_template('task1.html')


@app.route('/task2', methods=['GET', 'POST'])
def task2():

    print('running task2')
    result = myTask2.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)

    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})

    return render_template('task2.html') 


@app.route('/status', methods=['GET', 'POST'])
def status():

    taskid_list = []
    task_state_list = []
    TaskName_list = []

    allAsyncData = mongo.db.job_task_id.find()

    for doc in allAsyncData:
        try:
            taskid_list.append(doc['taskid'])
        except:
            print('error with db connection in asyncJobStatus')

        TaskName_list.append(doc['TaskName'])

    # PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
    for item in taskid_list:
        try:
            task_state_list.append(myTask1.AsyncResult(item).state)
        except:
            task_state_list.append('UNKNOWN')

    return render_template('status.html', data_list=zip(task_state_list, TaskName_list))

Final Working Code

from flask import (Flask, render_template, flash, redirect,
                   url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time
import redis
from flask_redis import FlaskRedis


app = Flask(__name__)

# ADDING REDIS
redis_store = FlaskRedis(app)

# POINTING CACHE_TYPE TO REDIS
cache = Cache(app, config={'CACHE_TYPE': 'redis'})
app.config['SECRET_KEY']= 'super secret key for me123456789987654321'

######################
# MONGODB SETUP
#####################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'


mongo = PyMongo(app)


##############################
# CELERY ARGUMENTS
##############################

# CELERY USING REDIS
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'

app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
    "host": "localhost",
    "port": 27017,
    "database": "celery-test-db", 
    "taskmeta_collection": "celery_jobs",
}

app.config['CELERY_TASK_SERIALIZER'] = 'json'


celery = Celery('task',broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)


LOCK_EXPIRE = 60 * 2  # Lock expires in 2 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    print('in memcache_lock and timeout_at is {}'.format(timeout_at))
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
        print('memcache_lock and status is {}'.format(status))
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)



@celery.task(bind=True, name='app.myTask1')
def myTask1(self):

    self.update_state(state='IN TASK')
    print('dir is {} '.format(dir(self)))

    lock_id = self.name
    print('lock_id is {}'.format(lock_id))

    with memcache_lock(lock_id, self.app.oid) as acquired:
        print('in memcache_lock and lock_id is {} self.app.oid is {} and acquired is {}'.format(lock_id, self.app.oid, acquired))
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            self.update_state(state='DOING WORK')
            time.sleep(90)
            return 'result'

    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later



@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
    print('you are in task2')
    self.update_state(state='STARTING')
    time.sleep(120)
    print('task2 done')


@app.route('/', methods=['GET', 'POST'])
def index():

    return render_template('index.html')

@app.route('/task1', methods=['GET', 'POST'])
def task1():

    print('running task1')
    result = myTask1.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)


    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'myTask1'})

    return render_template('task1.html')


@app.route('/task2', methods=['GET', 'POST'])
def task2():

    print('running task2')
    result = myTask2.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)

    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})

    return render_template('task2.html')

@app.route('/status', methods=['GET', 'POST'])
def status():

    taskid_list = []
    task_state_list = []
    TaskName_list = []

    allAsyncData = mongo.db.job_task_id.find()

    for doc in allAsyncData:
        try:
            taskid_list.append(doc['taskid'])
        except:
            print('error with db connection in asyncJobStatus')

        TaskName_list.append(doc['TaskName'])

    # PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
    for item in taskid_list:
        try:
            task_state_list.append(myTask1.AsyncResult(item).state)
        except:
            task_state_list.append('UNKNOWN')

    return render_template('status.html', data_list=zip(task_state_list, TaskName_list))


if __name__ == '__main__':
    app.secret_key = 'super secret key for me123456789987654321'
    app.run(port=1234, host='localhost')

Here is also a screenshot. You can see that I ran myTask1 twice and myTask2 once, and I now have the expected behavior for myTask1: it is run by a single worker, and if another worker attempts to pick it up, it just keeps retrying based on whatever countdown I define.

Flower Dashboard

Melosa answered 27/12, 2018 at 20:30 Comment(6)
self is a string, and self.cache doesn't exist. Just a guess, but maybe Cache.add should be called on an instance, so something like Cache().add? Because when add gets called, the first argument might be self, as in def add(self, lock_id, oid, lock_expire):, so self is the lock_id with how you have it?Melli
Thanks for your suggestion. I gave it a try with status = Cache().add(lock_id, oid, LOCK_EXPIRE), however that gave me a new traceback.Melosa
with memcache_lock(lock_id, self.app.oid) as acquired: File "/auto/pysw/cel63/python/3.4.1/lib/python3.4/contextlib.py", line 59, in __enter__ return next(self.gen) File "app.py", line 63, in memcache_lock status = Cache().add(lock_id, oid, LOCK_EXPIRE) File "/pyats2/lib/python3.4/site-packages/flask_cache/__init__.py", line 204, in add self.cache.add(*args, **kwargs) File "/ws/mastarke-sjc/pyats2/lib/python3.4/site-packages/flask_cache/__init__.py", line 192, in cache return app.extensions['cache'][self] KeyError: 'cache'Melosa
What is the behavior you want when the task is called multiple times? Do you want the task to be queued or to be ignored entirely?Sternmost
Ideally, what I would like is that when myTask1 is called it is simply queued and not run until the lock is released.Melosa
Let me re-clarify in case my first comment was not clear: if myTask1 is not in use, then I would like it to run. If another worker is using myTask1, ideally I would like the task to be queued but not run until the lock is removed. Hopefully that is more clear.Melosa

I also found this to be a surprisingly hard problem. Inspired mainly by Sebastian's work on implementing a distributed locking algorithm in Redis, I wrote up a decorator function.

A key point to bear in mind about this approach is that we lock tasks at the level of the task's argument space, e.g. we allow multiple game update/process order tasks to run concurrently, but only one per game. That's what argument_signature achieves in the code below. You can see documentation on how we use this in our stack at this gist:

import base64
from contextlib import contextmanager
import json
import pickle as pkl
import uuid

from backend.config import Config
from redis import StrictRedis
from redis_cache import RedisCache
from redlock import Redlock

rds = StrictRedis(Config.REDIS_HOST, decode_responses=True, charset="utf-8")
rds_cache = StrictRedis(Config.REDIS_HOST, decode_responses=False, charset="utf-8")
redis_cache = RedisCache(redis_client=rds_cache, prefix="rc", serializer=pkl.dumps, deserializer=pkl.loads)
dlm = Redlock([{"host": Config.REDIS_HOST}])

TASK_LOCK_MSG = "Task execution skipped -- another task already has the lock"
DEFAULT_ASSET_EXPIRATION = 8 * 24 * 60 * 60  # by default keep cached values around for 8 days
DEFAULT_CACHE_EXPIRATION = 1 * 24 * 60 * 60  # we can keep cached values around for a shorter period of time

REMOVE_ONLY_IF_OWNER_SCRIPT = """
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
"""


@contextmanager
def redis_lock(lock_name, expires=60):
    # https://breadcrumbscollector.tech/what-is-celery-beat-and-how-to-use-it-part-2-patterns-and-caveats/
    random_value = str(uuid.uuid4())
    lock_acquired = bool(
        rds.set(lock_name, random_value, ex=expires, nx=True)
    )
    yield lock_acquired
    if lock_acquired:
        rds.eval(REMOVE_ONLY_IF_OWNER_SCRIPT, 1, lock_name, random_value)


def argument_signature(*args, **kwargs):
    arg_list = [str(x) for x in args]
    kwarg_list = [f"{str(k)}:{str(v)}" for k, v in kwargs.items()]
    return base64.b64encode(f"{'_'.join(arg_list)}-{'_'.join(kwarg_list)}".encode()).decode()


def task_lock(func=None, main_key="", timeout=None):
    def _dec(run_func):
        def _caller(*args, **kwargs):
            with redis_lock(f"{main_key}_{argument_signature(*args, **kwargs)}", timeout) as acquired:
                if not acquired:
                    return TASK_LOCK_MSG
                return run_func(*args, **kwargs)
        return _caller
    return _dec(func) if func is not None else _dec

Implementation in our task definitions file:

@celery.task(name="async_test_task_lock")
@task_lock(main_key="async_test_task_lock", timeout=UPDATE_GAME_DATA_TIMEOUT)
def async_test_task_lock(game_id):
    print(f"processing game_id {game_id}")
    time.sleep(TASK_LOCK_TEST_SLEEP)

How we test against a local celery cluster:

from backend.tasks.definitions import async_test_task_lock, TASK_LOCK_TEST_SLEEP
from backend.tasks.redis_handlers import rds, TASK_LOCK_MSG
class TestTaskLocking(TestCase):
    def test_task_locking(self):
        rds.flushall()
        res1 = async_test_task_lock.delay(3)
        res2 = async_test_task_lock.delay(5)
        self.assertFalse(res1.ready())
        self.assertFalse(res2.ready())
        res3 = async_test_task_lock.delay(5)
        res4 = async_test_task_lock.delay(5)
        self.assertEqual(res3.get(), TASK_LOCK_MSG)
        self.assertEqual(res4.get(), TASK_LOCK_MSG)
        time.sleep(TASK_LOCK_TEST_SLEEP)
        res5 = async_test_task_lock.delay(3)
        self.assertFalse(res5.ready())

(as a goodie there's also a quick example of how to setup a redis_cache)

Mertiemerton answered 9/8, 2020 at 2:17 Comment(0)

In your question, you point out this warning from the Celery example you used:

In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.

And you mention that you don't really understand what this means. Indeed, the code you show demonstrates that you've not heeded that warning, because your code uses an inappropriate backend.

Consider this code:

with memcache_lock(lock_id, self.app.oid) as acquired:
    if acquired:
        # do some work

What you want here is for acquired to be true only for one thread at a time. If two threads enter the with block at the same time, only one should "win" and have acquired be true. This thread that has acquired true can then proceed with its work, and the other thread has to skip doing the work and try again later to acquire the lock. In order to ensure that only one thread can have acquired true, .add must be atomic.

Here's some pseudo code of what .add(key, value) does:

1. if <key> is already in the cache:
2.   return False    
3. else:
4.   set the cache so that <key> has the value <value>
5.   return True

If the execution of .add is not atomic, the following could happen when two threads, A and B, execute .add("foo", "bar"). Assume the cache is empty at the start.

  1. Thread A executes line 1 (if "foo" is already in the cache), finds that "foo" is not in the cache, and jumps to line 3, but then the thread scheduler switches control to thread B.
  2. Thread B also executes line 1, also finds that "foo" is not in the cache, jumps to line 3, and then executes lines 4 and 5, which set the key "foo" to the value "bar" and return True.
  3. Eventually, the scheduler gives control back to thread A, which continues executing lines 3, 4, and 5, also sets the key "foo" to the value "bar", and also returns True.

What you have here is two .add calls that both return True. If these .add calls are made within memcache_lock, then two threads can both have acquired be true, so two threads could do the work at the same time, and your memcache_lock is not doing what it should be doing, which is to allow only one thread to work at a time.
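
To make "atomic" concrete, here is a minimal sketch (mine, not from the original answer) of what an atomic check-and-set looks like with plain redis-py: SET ... NX performs the check and the write as a single server-side operation, so only one caller can ever get True for a given key.

import redis

r = redis.Redis()  # assumes a local Redis instance on the default port

# True only for the one caller that created the key; everyone else gets False.
# ex=120 makes the key expire after 120 seconds, like a lock timeout.
acquired = bool(r.set('my-lock-key', 'some-owner-id', nx=True, ex=120))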

You are not using a cache that ensures that .add is atomic. You initialize it like this:

cache = Cache(app, config={'CACHE_TYPE': 'simple'})

The simple backend is scoped to a single process, has no thread-safety, and has an .add operation which is not atomic. (This does not involve Mongo at all, by the way. If you wanted your cache to be backed by Mongo, you'd have to specify a backend specifically made to send data to a Mongo database.)

So you have to switch to another backend, one that guarantees that .add is atomic. You could follow the lead of the Celery example and use the memcached backend, which does have an atomic .add operation. I don't use Flask, but I've done essentially what you are doing with Django and Celery, and used the Redis backend successfully to provide the kind of locking you're using here.
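
For reference, a minimal sketch of what that backend switch might look like with Flask-Caching (the exact CACHE_TYPE string depends on your Flask-Caching version, and the Redis URL here is just an assumed local instance):

from flask import Flask
from flask_caching import Cache

app = Flask(__name__)

# Point Flask-Caching at Redis so cache.add() maps onto an atomic Redis operation.
cache = Cache(app, config={
    'CACHE_TYPE': 'redis',                          # 'RedisCache' on newer Flask-Caching releases
    'CACHE_REDIS_URL': 'redis://localhost:6379/0',  # assumed local Redis server
})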

Thetes answered 29/12, 2018 at 23:54 Comment(4)
Ok, so rather than using the simple cache like cache = Cache(app, config={'CACHE_TYPE': 'simple'}), I should switch to using a Redis cache like cache = Cache(app, config={'CACHE_TYPE': 'redis'})? Right now I use RabbitMQ, but I could switch to Redis if that is what is required for this to work.Melosa
Yes, you could use cache = Cache(app, config={'CACHE_TYPE': 'redis'}) if you want to use Redis. I know RabbitMQ by name but I've never actually used it, so I don't know if it would be appropriate as a backend for locking.Thetes
Ok thanks. I don't see RabbitMQ mentioned in the official Flask-Cache docs under built-in cache backends, but I do see Redis. Let me set up Redis in my environment and see how that goes. If that works I'll mark your answer as accepted. Thanks again for your detailed explanation.Melosa
Thank you, I was finally able to get this to work. Basically I needed to use Redis; I changed to cache = Cache(app, config={'CACHE_TYPE': 'redis'}) and now this works.Melosa

With this setup, you should still expect to see workers receiving the task, since the lock is checked inside the task itself. The only difference is that the work won't be performed if the lock is held by another worker.
In the example given in the docs, this is the desired behavior: if a lock already exists, the task simply does nothing and finishes as successful. What you want is slightly different: you want the work to be queued up instead of ignored.

In order to get the desired effect, you would need to make sure that the task will be picked up by a worker and performed some time in the future. One way to accomplish this would be with retrying.

@task(bind=True, name='my-task')
def my_task(self):
    lock_id = self.name

    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            return 'result'

    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later
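
As a hedged aside (mine, not part of the original answer): if you are worried about a blocked task re-queueing itself forever, Celery also lets you cap the number of retries on the task decorator. A minimal sketch along the lines of the code above:

@task(bind=True, name='my-task', max_retries=10)  # assumed cap of 10 retries; Celery's default is 3
def my_task(self):
    lock_id = self.name

    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return 'result'

    # once max_retries is exceeded, Celery raises MaxRetriesExceededError instead of re-queueing
    raise self.retry(countdown=60)
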
Sternmost answered 28/12, 2018 at 20:15 Comment(7)
Thanks I see your point and appreciate your help. However I still see I am able to call this multiple times. When I look at the worker logs you can see both worker 3 and 4 picked up myTask1 at around the same time. 140> tail -f worker-3.log [2018-12-28 12:51:55,849: WARNING/ForkPoolWorker-3] acquired is True [2018-12-28 12:53:25,943: INFO/ForkPoolWorker-3] Task app.myTask1[5f492c3f-9684-493e-8873-1190f71527a5] succeeded in 90.10438371099985s: 'result'Melosa
Here is worker 4 logs 141> tail -f worker-4.log [2018-12-28 12:51:59,381: WARNING/ForkPoolWorker-4] acquired is True [2018-12-28 12:53:29,476: INFO/ForkPoolWorker-4] Task app.myTask1[be05682f-1ff4-452b-9dff-c4593bd3c452] succeeded in 90.11584289799998s: 'result'Melosa
you can see they both pick up myTask1 around '12:53'Melosa
I updated my post to show myTask1 with your solution.Melosa
That would indicate to me that the memcache_lock is not functioning as expected. I'm not familiar with flask-cache, but I would wonder what value is returned when you .add an existing key. Additionally, it looks like flask-cache is bound to the request context? Also, according to the docs, the 'simple' cache is just a Python dictionary. Maybe you could try to configure an actual memcached cache instead.Sternmost
Thanks, I was thinking along the same lines. However, I have never used Flask-Cache or any type of memcache, so I'm not really sure what to do there. But I think your idea is in the right direction. I'll look into Flask-Cache and memcache.Melosa
I added my route code to my original post. Originally I was not sure if it was relevant or not. I think the cache issue may have to do with how I am calling the task. As you can see in my added code, the task gets executed by going to a particular route; in this example I have a route called task1 that executes myTask1. I think maybe the cache issue has to do with this.Melosa