Python RQ: pattern for callback

I now have a large number of documents to process and am using Python RQ to parallelize the task.

I would like a pipeline of work to be done as different operations are performed on each document. For example: A -> B -> C means pass the document to function A; after A is done, proceed to B, and finally C.

However, Python RQ does not seem to support pipelines very nicely.

Here is a simple but somewhat dirty way of doing this. In short, each function along the pipeline enqueues the next one, in a nested way.

For example, take the pipeline A -> B -> C.

At the top level, some code is written like this:

q.enqueue(A, the_doc)

where q is the Queue instance, and in function A there is code like:

q.enqueue(B, the_doc)

And in B, there is something like this:

q.enqueue(C, the_doc)
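
Put together, the nested pattern looks something like this (a minimal sketch; the stage bodies are placeholders, and in a real setup A, B, and C must live in a module the worker can import):

from redis import Redis
from rq import Queue

q = Queue(connection=Redis())

def C(the_doc):
    pass  # last stage of the pipeline, nothing left to enqueue

def B(the_doc):
    # ... do B's processing on the_doc ...
    q.enqueue(C, the_doc)  # B hands the document on to C

def A(the_doc):
    # ... do A's processing on the_doc ...
    q.enqueue(B, the_doc)  # A hands the document on to B

q.enqueue(A, the_doc)  # kick off the pipeline for one document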

Is there a more elegant way of doing this? For example, some code in ONE function:

q.enqueue(A, the_doc)
q.enqueue(B, the_doc, after=A)
q.enqueue(C, the_doc, after=B)

The depends_on parameter is the closest to my requirement. However, running something like:

A_job = q.enqueue(A, the_doc)
q.enqueue(B, depends_on=A_job)

won't work, since q.enqueue(B, depends_on=A_job) is executed immediately after A_job = q.enqueue(A, the_doc). By the time B is enqueued, the result from A might not be ready, as it takes time to process.

PS:

If Python RQ is not really good at this, what other tool in Python can I use to achieve the same purpose:

  1. round-robin parallelization
  2. pipeline processing support
Lashley answered 18/6, 2014 at 21:12
What about using 3 queues? One for job A, another for job B, and the last one for job C. The only thing is that when job A ends, the doc is queued on job B's queue, and so on. – Jamilajamill
Even with q.enqueue(B, depends_on=A_job), job B will be processed after A is finished. Isn't what matters when the job is processed, rather than when it is enqueued? – Talapoin
@Jamilajamill How is using 3 queues different from using a single queue? How would it solve this? – Talapoin
Did you find an answer to this? – Heptachord

By the time B is enqueued, the result from A might not be ready as it takes time to process.

I'm not sure if this was actually true when you originally posted the question, but in any case, it is not true now. In fact, the depends_on feature is made for exactly the workflow you described.

It is true that these two enqueue calls are executed immediately in succession:

A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job)

But the worker will not execute B until A is finished. Until A_job has executed successfully, B_job's status is 'deferred'. Once A_job's status is 'finished', B will start to run.
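
A quick way to observe this (a small sketch; get_status() reads the job's current state from Redis):

A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job)

print(B_job.get_status())  # 'deferred' as long as A_job has not finished
# ... once A_job finishes successfully ...
print(B_job.get_status())  # 'queued', then 'started', then 'finished'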

This means that B and C can access and operate on the result of their dependencies like this:

import time
from rq import Queue, get_current_job
from redis import StrictRedis

conn = StrictRedis()
q = Queue('high', connection=conn)

def A():
    time.sleep(100)  # simulate a long-running task
    return 'result A'

def B():
    time.sleep(100)
    current_job = get_current_job(conn)
    # look up the job this one depends on and fetch its stored return value
    a_job_id = current_job.dependencies[0].id
    a_job_result = q.fetch_job(a_job_id).result
    assert a_job_result == 'result A'
    return a_job_result + ' result B'


def C():
    time.sleep(100)
    current_job = get_current_job(conn)
    # same pattern as in B: read the dependency's (B's) result
    b_job_id = current_job.dependencies[0].id
    b_job_result = q.fetch_job(b_job_id).result
    assert b_job_result == 'result A result B'
    return b_job_result + ' result C'
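
For completeness, a minimal sketch of enqueueing the whole chain for the functions above (they take no arguments, so only the dependencies are passed):

A_job = q.enqueue(A)
B_job = q.enqueue(B, depends_on=A_job)
C_job = q.enqueue(C, depends_on=B_job)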

The worker will eventually print 'result A result B result C'.

Also, if you have many jobs in the queue and B might wait a while before being executed, you may want to significantly increase result_ttl or make it indefinite with result_ttl=-1. Otherwise, the result of A will be purged after however many seconds result_ttl is set to, in which case B will no longer be able to access it and return the desired result.

Setting result_ttl=-1 has important memory implications, however. It means that the results of your jobs will never be automatically purged, and memory will grow proportionately until you manually remove those results from redis.
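
For example (a sketch; Job.delete() removes a job, including its stored result, from redis):

A_job = q.enqueue(A, result_ttl=-1)  # keep A's result until it is removed manually
B_job = q.enqueue(B, depends_on=A_job)

# later, once B has consumed A's result:
A_job.delete()  # free A's job data and result from redis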

Alviani answered 9/6, 2016 at 15:26
What's the purpose of adding sleep in each job? – Waddell
@Waddell To simulate a long-running process. – Alviani

The depends_on parameter is the closest to my requirement. However, running something like:

A_job = q.enqueue(A, the_doc)
q.enqueue(B, depends_on=A_job)

won't work, since q.enqueue(B, depends_on=A_job) is executed immediately after A_job = q.enqueue(A, the_doc). By the time B is enqueued, the result from A might not be ready, as it takes time to process.

For this case, the job enqueued by q.enqueue(B, depends_on=A_job) will only run once A_job finishes. If A's result is not ready yet, B simply waits in a deferred state until it is.


Python RQ does not support this out of the box, but combining it with other technologies makes it possible.

In my case, I used caching to keep track of the previous job in the chain, so that when we want to enqueue a new function (to run right after it), we can properly set its depends_on parameter when calling enqueue().

Note the use of the additional parameters to enqueue: timeout, result_ttl, and ttl. I used these because I was running long jobs on RQ. You can reference their use in the Python RQ docs.

I used django_rq.enqueue(), which is built on top of Python RQ.

    # main.py
    def process_job():
        ...

        # Create a cache key for every chain of methods you want to call.
        # NOTE: I used this for web development, in your case you may want
        # to use a variable or a database, not caching

        # Number of seconds to cache the job id and keep the results in rq
        TWO_HRS = 60 * 60 * 2

        cache_key = 'update-data-key-%s' % obj.id
        previous_job_id = cache.get(cache_key)
        job = django_rq.enqueue(update_metadata,
                                campaign=campaign,
                                list=chosen_list,
                                depends_on=previous_job_id,
                                timeout=TWO_HRS,
                                result_ttl=TWO_HRS,
                                ttl=TWO_HRS)

        # Store this job's id under the cache key, so the next function
        # in the chain can set the proper value for 'depends_on'
        cache.set(cache_key, job.id, TWO_HRS)

    # utils.py
    def update_metadata(campaign, list):
        # Your code goes here to update the campaign object with the list object
        pass
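
With this pattern, each call reads the previous job's id from the cache and passes it as depends_on, then stores its own job id under the same key. Successive calls therefore chain one after another, giving the A -> B -> C behavior from the question without any nested enqueueing.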

'depends_on' - from the rq docs:

depends_on - specifies another job (or job id) that must complete before this job will be queued

Phonon answered 17/11, 2015 at 22:22
