Redis - Better way of cleaning the processing queue (reliable) while using BRPOPLPUSH

Our Current Design

Env Redis 2.8.17

We have implemented our reliable queue using a pattern similar to the one described in the Redis documentation under RPOPLPUSH.

However, we are using BRPOPLPUSH because of its blocking nature, and LPUSH to ensure FIFO order.

Producers: multiple threads (from multiple servers) using LPUSH to push the items.

Consumers: multiple threads (from multiple servers) using BRPOPLPUSH to process the items.

BRPOPLPUSH q processing-q

As documented, Redis pops the item from queue 'q' and pushes it onto 'processing-q'.
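
For illustration, a minimal sketch of this pattern with the redis-py client (the client choice and connection details are just placeholders, not our actual code; the key names are the ones above):

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Producer: LPUSH onto the head of q; consumers pop from the tail, so FIFO order is kept
def produce(item):
    r.lpush("q", item)

# Consumer: block until an item is available, atomically moving it onto processing-q
def consume():
    return r.brpoplpush("q", "processing-q", timeout=0)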

Problem

Owing to the multi-threaded (async) nature of our application, we have no control over when the consumers will complete their processing.

So if we use LREM (as per the documentation) to remove the processed element from processing-q, it will only remove the top element of processing-q, with no guarantee that it removed the actual element processed by the respective consumer.

So if we do nothing, processing-q keeps growing (eating up memory), which is very bad IMHO.

Any suggestions or ideas?

Vauban answered 16/1, 2015 at 14:54 Comment(1)
Here's a good presentation titled Redis as a Reliable Work Queue and the presenter's blog post on the same subject. It explains a more advanced queue design in Redis than the one in the RPOPLPUSH docs. Failure recovery is also covered. – Tanguy

The approach I would take is to use a per-consumer processing-q (e.g. processing-q:consumer-id). That would solve your current problem but you'd still need to handle crashed consumers somehow. For that, I suggest you also keep the last time that each consumer popped a task and periodically check for timeouts. If a consumer has reached the timeout, move its task back to the main queue and delete its queue.
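
A rough sketch of this idea with redis-py (the heartbeat key, the 60-second timeout, and the helper names are illustrative assumptions on my part, not something prescribed above):

import os
import socket
import time
import redis

r = redis.Redis(decode_responses=True)

CONSUMER_ID = f"{socket.gethostname()}:{os.getpid()}"
PROCESSING_Q = f"processing-q:{CONSUMER_ID}"
TIMEOUT = 60  # seconds a consumer may sit on a task before it is presumed dead

def process(task):
    pass  # placeholder for the real work

def consume_one():
    task = r.brpoplpush("q", PROCESSING_Q, timeout=0)
    # Record when this consumer last popped a task
    r.set(f"last-pop:{CONSUMER_ID}", time.time())
    process(task)
    r.lrem(PROCESSING_Q, 1, task)  # done, drop it from this consumer's queue

def recover_timed_out_consumers():
    # Run periodically by a monitor process
    for queue in r.scan_iter("processing-q:*"):
        consumer = queue.split("processing-q:", 1)[1]
        last_pop = r.get(f"last-pop:{consumer}")
        if last_pop is None or time.time() - float(last_pop) > TIMEOUT:
            # Move its tasks back to the main queue; the empty list key
            # disappears on its own once the last element is gone
            while r.rpoplpush(queue, "q") is not None:
                pass
            r.delete(f"last-pop:{consumer}")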

Tessi answered 16/1, 2015 at 15:38 Comment(0)

You just need to include the job you want to delete in your call to LREM.

LREM takes the form:

LREM queue count "object"

It will remove count items equal to "object" from queue. So to remove the specific job your consumer thread is working on you'd do something like this.

LREM processing-q 1 "job_identifier"

For more see the documentation here: http://redis.io/commands/lrem
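
From a client such as redis-py that would look roughly like this (a sketch; the key and identifier are the ones used above):

import redis

r = redis.Redis(decode_responses=True)
# Remove (at most) one occurrence of this job from the processing queue
r.lrem("processing-q", 1, "job_identifier")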

Then to handle crashed consumers and abandoned jobs you can use SETEX to create locks with an expiration and periodically check for jobs without locks.

So the whole process looks like this (a rough code sketch follows these steps):

Producer

  1. RPUSH q "job_identifier"

Consumer

  1. SETEX lock:processing-q:job_identifier 60 "locked" (set the lock first to avoid a race condition)
  2. BRPOPLPUSH q processing-queue
  3. Process job
  4. LREM processing-queue 1 "job_identifier"

Expired Jobs Monitor

  1. jobs = LRANGE processing-queue 0 -1
  2. foreach job in jobs : lock = GET lock:processing-q:job_identifier
  3. if the lock is null, this job timed out, so remove it from the processing queue: LREM processing-queue 1 "job_identifier"
  4. and retry with RPUSH q "job_identifier"
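
Putting these steps together, here is a rough redis-py sketch. The "locked" placeholder value and the helper names are my own assumptions. Note that, as discussed in the comments below, this version sets the lock immediately after the pop rather than before it (the job identifier is only known once the job has been popped), which leaves a small window the monitor has to tolerate:

import redis

r = redis.Redis(decode_responses=True)
LOCK_TTL = 60  # seconds a consumer may hold a job before the monitor requeues it

def process(job_id):
    pass  # placeholder for the real work

# Producer
def enqueue(job_id):
    r.rpush("q", job_id)

# Consumer
def work_once():
    job_id = r.brpoplpush("q", "processing-queue", timeout=0)
    # Lock the job so the monitor knows somebody is working on it
    r.setex(f"lock:processing-q:{job_id}", LOCK_TTL, "locked")
    process(job_id)
    r.lrem("processing-queue", 1, job_id)

# Expired-jobs monitor
def requeue_expired():
    for job_id in r.lrange("processing-queue", 0, -1):
        if r.get(f"lock:processing-q:{job_id}") is None:
            # No lock: the consumer presumably died, so put the job back
            r.lrem("processing-queue", 1, job_id)
            r.rpush("q", job_id)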

@NotAUser has published an open-source Java implementation here: https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq

Stoops answered 12/1, 2016 at 22:5 Comment(14)
This looks easier and better to implement than the suggested accepted answer. – Serajevo
In case this helps someone, I made a quick and dirty implementation of this technique in Go: gist.github.com/brunocassol/ee97631ce2126e7b0f36d771a028d104 – Gettogether
Isn't a fourth step missing in Expired Jobs Monitor for removing the task from the processing-queue, much like the fourth step found in Consumer? – Execration
This doesn't work, unfortunately. There's a race condition between BRPOPLPUSH and SETEX, so the expired-jobs monitor might retry jobs even if they have only just started. – Gaelan
@Gaelan - If you SETEX before BRPOPLPUSH, does that clear up your race condition? – Stoops
Yes! That's what I ended up doing, thanks for the follow-up. Btw, I ended up implementing this: github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/… – Gaelan
I guess I'm missing something obvious, but how can you set a lock using the job ID before getting the job ID? – Allyson
@Allyson "job_identifier" is some unique ID your system uses for the job. In our case it's frequently an orderId, a generated random number, or sometimes even a primary key from our database. – Stoops
In Expired Jobs Monitor, shouldn't step 4 happen before step 3? It's better for a task to be received by a consumer more than once than never. – Piled
I am facing the same trouble as @Allyson: I can't set the lock in Consumer step 1, as I don't have the job_identifier at this stage yet. – Piled
@NeverEndingQueue you have to generate it yourself. – Allyson
@Allyson How can I generate something that relates to something I've never seen? – Piled
@NeverEndingQueue that's not the point; I thought the same way as you initially. RPUSH essentially just pushes a value onto the list, and this value should represent your job. Basically you can put something like {jobId: GENERATED_ID, data: {}} there, then use the same GENERATED_ID to call SETEX. And when you want to check that a job is alive, you iterate over processing-queue, extract the ID from each value, and check that the lock exists. Redis doesn't know anything about a 'job'; you encode your 'job' object as a string or similar, so you decide how to identify the job later. – Allyson
I am not sure if I am being dumb, but I still can't get the idea. The producer uses RPUSH to add a string with the GENERATED_ID and the job content. Then the consumer runs SETEX using either the full job string or just the GENERATED_ID, but how can the consumer issue that command before polling q first? Is there any other communication between producer and consumer outside of this queue mechanism? – Piled

In a similar project, I'm using the hostname and the process id of the worker for the backup queues. Each worker has its own backup queue, and if the worker dies, the item is not lost.

Check the README and the implementation for more details.
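
A minimal sketch of the idea in redis-py (my own illustration, not the linked implementation; the queue names are assumptions):

import os
import socket
import redis

r = redis.Redis(decode_responses=True)

# One backup queue per worker, named after the host and the process id
BACKUP_Q = f"q:backup:{socket.gethostname()}:{os.getpid()}"

def process(item):
    pass  # placeholder for the real work

def work_forever():
    # If this process dies, its items stay in BACKUP_Q; since a restarted worker
    # gets a new pid, a separate sweep over q:backup:* is still needed to requeue
    # them (see the comment below).
    while True:
        item = r.brpoplpush("q", BACKUP_Q, timeout=0)
        process(item)
        r.lrem(BACKUP_Q, 1, item)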

Gracielagracile answered 16/1, 2015 at 15:43 Comment(1)
@soveran, so what happens when a worker dies in the middle of processing and is restarted with a different process ID? How does the new worker process look up its predecessor's backup queue? – Shortage

In addition to the proposed solutions, you could also LTRIM the processing queue to a length that makes sense for your service. This would ensure the processing queue never grows out of proportion.

But you'll start losing items if the trim limit is hit. This may or may not be acceptable for your use case.

http://redis.io/commands/ltrim
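
A sketch of what that periodic trim might look like with redis-py (the cap of 10,000 is an arbitrary example):

import redis

r = redis.Redis(decode_responses=True)

MAX_PROCESSING = 10000  # arbitrary cap for the processing queue

# Run periodically. BRPOPLPUSH pushes onto the head of processing-q, so keeping
# indexes 0..MAX_PROCESSING-1 keeps the newest entries and drops the oldest,
# which may silently discard unfinished jobs.
r.ltrim("processing-q", 0, MAX_PROCESSING - 1)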

Humanism answered 14/12, 2015 at 20:12 Comment(1)
You are right; in my use case it's not acceptable to have items lost. – Vauban
