Python Celery - How to call Celery tasks inside another task
D

3

11

I'm calling a task from within another task in Django-Celery.

Here are my tasks.

import json
import requests
from celery import shared_task

@shared_task
def post_notification(data, url):
    url = "http://posttestserver.com/data/?dir=praful"  # test override; remove this line in production
    headers = {'content-type': 'application/json'}
    requests.post(url, data=json.dumps(data), headers=headers)


@shared_task
def shipment_server(data,notification_type):
    notification_obj = Notification.objects.get(name = notification_type)
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj)

    for server in server_list:
        task = post_notification.delay(data,server.server_id.url)
        print task.status # it prints 'Nonetype' has no attribute id

How can I call a task within a task? I read somewhere it can be done using group, but I'm not able to form the correct syntax. How do I do it?

I tried this

for server in server_list:
    task = group(post_notification.s(data, server.server_id.url))().get()
    print task.status

Throws a warning saying

TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration.
  'Polling results with transaction isolation level '

I don't know what this warning means.

How do I solve my problem?

Dishearten answered 17/2, 2014 at 13:4 Comment(2)
result = task.delay(...) or task.apply_async(...) returns an AsyncResult object. Its .status attribute polls: each time it is accessed, it checks the current state of the task. It makes little sense to read .status right after sending the task, because the worker has most likely not started executing it yet. In your later example, task = .....get().status cannot work because you are reading status from the task's return value rather than from the result object (result.status vs. result.get().status). Jocularity
Finally, you should not wait for the result of a subtask, because that may lead to deadlocks. Use a callback task instead: (post_notification.s() | do_something_after_posted.s()).delay(). See docs.celeryproject.org/en/latest/userguide/… and docs.celeryproject.org/en/latest/userguide/canvas.html Jocularity
G
9

This should work:

celery.current_app.send_task('mymodel.tasks.mytask', args=[arg1, arg2, arg3])
Gripping answered 17/2, 2014 at 13:17 Comment(8)
What are mymodel and current_app? Dishearten
current_app is an attribute of the celery module. mymodel.tasks is the path to your tasks.py. Change it if necessary. Gripping
So I should do something like this: task = celery.current_app.send_task('mymodel.tasks.mytask', args=[arg1, arg2, arg3]) Dishearten
Since the task to be called is in the same module, I'm doing it like this: task = celery.current_app.send_task('post_notification', args=[data, url]) followed by print task.status Dishearten
celery.current_app.send_task will return an AsyncResult instance. The task will be executed. Gripping
Also check the path to the tasks file. Gripping
Note that you don't have to use send_task; you can use task.delay from within a task without problems. Your problem is polling the result object that is returned. Jocularity
Worked fine here! Thanks. For version 4.2.x I use from celery._state import current_app. Sansen
B
4

You are right: each iteration of your for loop overwrites the task variable.

You can try celery.group, like this:

from celery import group

and

@shared_task
def shipment_server(data, notification_type):
    notification_obj = Notification.objects.get(name=notification_type)
    server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj)

    # Build one signature per server and dispatch them together as a group.
    tasks = [post_notification.s(data, server.server_id.url) for server in server_list]
    results = group(tasks)()
    # results is a GroupResult; .get() blocks until every subtask has finished.
    print(results.get())  # or results.ready() / results.completed_count()
Booby answered 17/2, 2014 at 13:31 Comment(1)
Calling .get() inside a task is bad practice that can lead to deadlocks. You need to be very careful about it. Gongorism
F
0

You can call a task from within another task using delay or apply_async:

from app.tasks import celery_add_task

celery_add_task.apply_async(args=[task_name])

This will work.

Fatherhood answered 17/2, 2014 at 13:10 Comment(0)
