Celery execute task with a batch of messages

I want to send messages to Celery and, once it has received, say, 100 messages, have Celery process them as a batch. This is a common scenario when committing to a database in batches.

While googling for this, I found the following documentation on batching with Celery: http://celery.readthedocs.org/en/latest/reference/celery.contrib.batches.html

My problem is that the example there shows no obvious way to get at the data submitted to the task.

For instance, let's say we submit messages one by one with:

task.apply_async((message,), link_error=error_handler.s())

and then we have the following task implementation:

from celery.contrib.batches import Batches

@celery.task(name="process.data", base=Batches, flush_every=100, flush_interval=1)
def process_messages(requests):
    for request in requests:
        # How can I get the message data that was submitted to the task here?
        print(request)

Is there any alternative way to achieve batches with celery? Thank you

Hookworm answered 28/11, 2014 at 14:55 Comment(0)

For anyone who finds this post useful: after much trial and error, I managed to get the data out of the SimpleRequest object in the following way.

When you submit your data like this:

func.delay(data)

the request object has an args attribute, which is a list containing the data:

request.args[0]
request.args[1] 
etc.

If you submit your data like this:

func.apply_async((), {'data': data}, link_error=error_handler.s())

then the data is available as a dictionary in kwargs:

request.kwargs['data']

Finally, as the example shows, we need to loop over all the requests to gather the batch of data:

for r in requests:
    data = r.kwargs['data']
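
The loop above can be sketched end to end as follows. This is only an illustration that runs without Celery: `FakeRequest` is a hypothetical stand-in for the request objects the batch task receives (it models just the `args` and `kwargs` attributes described above), and `collect_batch` is an illustrative helper name, not part of Celery.

```python
from collections import namedtuple

# Hypothetical stand-in for the request objects passed to a batch task;
# only the two attributes discussed above are modelled.
FakeRequest = namedtuple("FakeRequest", ["args", "kwargs"])


def collect_batch(requests):
    """Gather the submitted payloads from a list of request objects."""
    batch = []
    for r in requests:
        if "data" in r.kwargs:
            # Submitted as func.apply_async((), {'data': data})
            batch.append(r.kwargs["data"])
        elif r.args:
            # Submitted as func.delay(data)
            batch.append(r.args[0])
    return batch


requests = [
    FakeRequest(args=("first",), kwargs={}),
    FakeRequest(args=(), kwargs={"data": "second"}),
]
batch = collect_batch(requests)
print(batch)  # ['first', 'second']
```

Once the payloads are in one list, the whole batch can be handled in a single step, for example one database commit per batch.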

It would be nice if the examples on the documentation page were updated with a simpler, clearer example.

Hookworm answered 1/12, 2014 at 9:10 Comment(2)
FYI celery.contrib.batches has been removed in Celery 4 :-(. Ref: docs.celeryproject.org/en/latest/… - Foss
@Foss thanks! Updated documentation URL about the removal of batches -> docs.celeryq.dev/en/stable/history/… - Allethrin

The last version of batches.py, available at https://github.com/celery/celery/blob/3.1/celery/contrib/batches.py before the module was deprecated, doesn't work with Celery 5+ / Python 3.

A working version can be found at https://gist.github.com/robin-vjc/1a4676ccb055162082c5a061ab556f58
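
For readers who only need the batching behaviour itself, here is a minimal, Celery-independent sketch of the `flush_every` / `flush_interval` idea from the question. `BatchBuffer` and its parameters are hypothetical names chosen for illustration; this is not the code from the gist or from Celery. It also makes a simplifying assumption: the time limit is only checked when a new item arrives, so there is no background timer.

```python
import time


class BatchBuffer:
    """Collect items and hand them to a callback in batches.

    Hypothetical helper, not part of Celery. A flush happens when
    `flush_every` items are buffered, or when `flush_interval` seconds
    have passed since the last flush and a new item arrives.
    """

    def __init__(self, on_flush, flush_every=100, flush_interval=1.0,
                 clock=time.monotonic):
        self.on_flush = on_flush
        self.flush_every = flush_every
        self.flush_interval = flush_interval
        self.clock = clock
        self._items = []
        self._last_flush = clock()

    def add(self, item):
        self._items.append(item)
        too_many = len(self._items) >= self.flush_every
        too_old = self.clock() - self._last_flush >= self.flush_interval
        if too_many or too_old:
            self.flush()

    def flush(self):
        """Send whatever is buffered to the callback, e.g. one DB commit."""
        if self._items:
            batch, self._items = self._items, []
            self.on_flush(batch)
        self._last_flush = self.clock()


committed = []
buf = BatchBuffer(committed.append, flush_every=3, flush_interval=60)
for n in range(7):
    buf.add(n)
buf.flush()  # flush the remainder explicitly, e.g. at shutdown
print(committed)  # [[0, 1, 2], [3, 4, 5], [6]]
```

The explicit final `flush()` matters in this sketch: without a timer, a partial batch would otherwise stay in the buffer until the next item arrives.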

Epigraphy answered 18/11, 2020 at 13:15 Comment(0)