Job queue with job affinity

I am currently facing a problem for which I am pretty sure there is an official name, but I don't know what to search the web for. I hope that if I describe the problem and the solution I have in mind, somebody is able to tell me the name of the design pattern (if there is one that matches what I am going to describe).

Basically, what I want to have is a job queue: I have multiple clients that create jobs (publishers), and a number of workers that process these jobs (consumers). Now I want to distribute the jobs created by the publishers to the various consumers, which is basically doable using almost any message queue with load balancing across a queue, e.g. using RabbitMQ or even MQTT 5.

However, now things get complicated... every job refers to an external entity, let's say a user. What I want is that the jobs for a single user get processed in order, but for multiple users in parallel. I do not have the requirement that the jobs for user X always go to worker Y, since they should be processed sequentially anyway.

Now I could solve this using RabbitMQ and its consistent hashing exchange, but then I have a data race when new workers enter the cluster, because RabbitMQ does not support re-locating the jobs that are already in a queue.
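To illustrate why consistent hashing races when a worker joins, here is a minimal hash-ring sketch. It is not RabbitMQ's implementation; the MD5-based hash, the number of virtual nodes and the worker names are assumptions for illustration. Adding a worker moves a subset of users onto the new worker, while older jobs for those users may still be queued at, or running on, their previous worker.

```python
import bisect
import hashlib


def _hash(value: str) -> int:
    # Stable 64-bit hash taken from MD5 (an illustrative choice only).
    return int.from_bytes(hashlib.md5(value.encode()).digest()[:8], "big")


class HashRing:
    """Minimal consistent-hash ring with virtual nodes."""

    def __init__(self, workers, vnodes=100):
        self.vnodes = vnodes
        self._points = []  # sorted list of (hash, worker)
        for worker in workers:
            self.add(worker)

    def add(self, worker):
        for i in range(self.vnodes):
            bisect.insort(self._points, (_hash(f"{worker}#{i}"), worker))

    def worker_for(self, key):
        # First ring point clockwise from the key's hash, wrapping around.
        idx = bisect.bisect(self._points, (_hash(key), ""))
        return self._points[idx % len(self._points)][1]


ring = HashRing(["worker-1", "worker-2"])
users = [f"user-{n}" for n in range(1000)]
before = {u: ring.worker_for(u) for u in users}
ring.add("worker-3")
# Users whose jobs now go to a different worker than their older, possibly
# still pending, jobs: this is exactly where ordering can break.
moved = [u for u in users if ring.worker_for(u) != before[u]]
```

Every moved user lands on the new worker, which is why consistent hashing keeps reshuffling small; it does not, however, do anything about the jobs already sitting with the old owner.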

MQTT 5 does not support this either: there the idea is known as "sticky shared subscriptions", but it is not part of the official specification. It may be part of MQTT 6, or it may not. Who knows.

I have also taken a look at NSQ, NATS, and some other brokers. Most of them don't support this very specific scenario at all, and those that do use consistent hashing, which has the data race problem mentioned above.

Now, the problem would be gone if the broker did not sort the jobs into queues as soon as they arrive, but instead tracked whether a job for a specific user is already being processed: if so, it should delay all other jobs for this user, while jobs for other users keep being processed. This is, AFAICS, not possible using RabbitMQ et al.
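The broker behaviour described above (hold back further jobs for a user while one of that user's jobs is in flight, but keep dispatching jobs for other users) can be sketched as a small in-memory dispatcher. This is a single-process illustration with hypothetical names (KeyedDispatcher, submit, next_job, complete), not a distributed broker: it ignores persistence, acknowledgements and worker failure.

```python
from collections import deque


class KeyedDispatcher:
    """Dispatch jobs FIFO per key, with at most one in-flight job per key."""

    def __init__(self):
        self._pending = {}     # key -> deque of jobs waiting for that key
        self._ready = deque()  # keys with pending jobs and nothing in flight
        self._busy = set()     # keys with a job currently being processed

    def submit(self, key, job):
        queue = self._pending.setdefault(key, deque())
        queue.append(job)
        # A key becomes ready with its first pending job, unless it is busy.
        if len(queue) == 1 and key not in self._busy:
            self._ready.append(key)

    def next_job(self):
        """Return (key, job) for some idle key, or None if nothing can run."""
        if not self._ready:
            return None
        key = self._ready.popleft()
        queue = self._pending[key]
        job = queue.popleft()
        if not queue:
            del self._pending[key]
        self._busy.add(key)
        return key, job

    def complete(self, key):
        """Mark the in-flight job for key as done and release the key."""
        if key not in self._busy:
            return
        self._busy.remove(key)
        if key in self._pending:
            self._ready.append(key)


d = KeyedDispatcher()
d.submit("alice", "a1")
d.submit("alice", "a2")
d.submit("bob", "b1")
first = d.next_job()   # ("alice", "a1")
second = d.next_job()  # ("bob", "b1"): a2 waits while a1 is in flight
third = d.next_job()   # None
d.complete("alice")
fourth = d.next_job()  # ("alice", "a2")
```

Keeping a queue of ready users, rather than scanning all pending jobs, means a user with a long backlog never blocks dispatch for anyone else.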

I am pretty sure that I am not the only person with a use case for that. I could e.g. think of users uploading videos to a video platform, and although uploaded videos are processed in parallel, all the videos uploaded by a single user are processed sequentially.

So, to cut a long story short: Is what I describe known under a common name? Something such as distributed job queue? Task dispatcher with task affinity? Or anything else? I have tried lots of terms, but didn't succeed. This may mean that there is no solution for this, but as I said, it's hard to imagine that I'm the only person on the planet with this problem.

Any ideas what I could look for? And: Are there any tools that implement this? Any protocols?

PS: Just using a predefined routing key is not an option, since the user IDs (which I just used as a made-up example here) are basically UUIDs, so there can be billions of them, and I need something more dynamic. Hence, consistent hashing is basically the correct approach, but as I said, the distribution has to happen piece by piece, not upfront, to avoid data races.

Crater answered 22/5, 2019 at 19:0 Comment(2)
I would probably solve this issue by using a database rather than a message queue. You already have a single point of contention by having a pre-requisite for the message to be processed, so might as well employ the right tool for the job at that point.Diverticulitis
Any success in finding the solution for this problem using some modern message brokers? I can't find a way to do this in Kafka or RabbitMQ. It seems unavailable without implementing additional coordination. I just found this feature in older, classic brokers (ActiveMQ, Weblogic JMS, Apache Qpid) under name of "message groups" or "unit of order". It means messages of given "unit" or "group" are pushed to only one of currently free workers in a dynamic way (each time a different worker may be selected).Greatly

what I want to have is a job queue: I have multiple clients that create jobs (publishers), and a number of workers that process these jobs (consumers). Now I want to distribute the jobs created by the publishers to the various consumers, which is basically doable using almost any message queue with load balancing across a queue, e.g. using RabbitMQ or even MQTT 5.

However, now things get complicated... every job refers to an external entity, let's say a user. What I want is that the jobs for a single user get processed in order, but for multiple users in parallel. I do not have the requirement that the jobs for user X always go to worker Y, since they should be processed sequentially anyway.

Although it was not for this particular use case, a couple of months ago I did a survey of (dynamic) task scheduling [0] [1], and nothing like this surfaced.

Every scheduling algorithm I read about has some properties that are common to all tasks, such as priority, age, enqueue time, and task name (and by extension average processing time). If your tasks were all linked to a user, you could build a scheduler that takes user_id into account when picking a task from the queue.

But I guess you don't want to build your own scheduler. It would be a waste anyway because, from my experience with such needs, existing message queues allow you to implement your requirement.

To summarize your requirements, you need:

A scheduler that runs only one task per user at a time.

The solution is to use a distributed lock, something like a Redis distlock: acquire the lock before the task starts and refresh it regularly during the task execution. If a new task for the same user comes in and tries to execute, it will fail to acquire the lock and will be re-enqueued.

Here is some pseudocode:

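def my_task(user_id, *args, **kwargs):
    # Try to take the per-user lock without waiting.
    if app.distlock(user_id, blocking=False):
        exec_my_task(user_id, *args, **kwargs)
    else:
        # Another task for this user is running: let the framework re-enqueue this one.
        raise RetryTask()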

Don't forget to refresh and release the lock.
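The lock discipline above (non-blocking acquire, periodic refresh, release, retry on failure) can be sketched in memory. The class below imitates the semantics usually built on Redis with SET key token NX PX ttl plus a token-checked refresh and release (typically done in a small Lua script). It is a single-process stand-in with an injectable clock, not a Redis client, and it provides none of Redlock's multi-node guarantees; TTLLockTable, RetryTask and the work/heartbeat callback are hypothetical names.

```python
import time
import uuid


class TTLLockTable:
    """In-memory stand-in for per-user locks with expiry (like Redis SET NX PX)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._locks = {}  # key -> (token, expires_at)

    def _live(self, key):
        """Return the (token, expires_at) entry if the lock is still held."""
        entry = self._locks.get(key)
        if entry is not None and entry[1] > self._clock():
            return entry
        self._locks.pop(key, None)  # forget expired entries
        return None

    def acquire(self, key, ttl):
        """Non-blocking acquire: a fresh token on success, None if already held."""
        if self._live(key) is not None:
            return None
        token = uuid.uuid4().hex
        self._locks[key] = (token, self._clock() + ttl)
        return token

    def refresh(self, key, token, ttl):
        """Extend the lock, but only if this token still owns it."""
        entry = self._live(key)
        if entry is None or entry[0] != token:
            return False
        self._locks[key] = (token, self._clock() + ttl)
        return True

    def release(self, key, token):
        """Delete the lock only if this token still owns it."""
        entry = self._live(key)
        if entry is not None and entry[0] == token:
            del self._locks[key]


class RetryTask(Exception):
    """Tells the (hypothetical) task framework to re-enqueue the job."""


def my_task(locks, user_id, work, ttl=30.0):
    """Run work(heartbeat) under the user's lock, or raise RetryTask."""
    token = locks.acquire(user_id, ttl)
    if token is None:
        raise RetryTask(user_id)
    try:
        # work() is expected to call heartbeat() periodically to refresh the lock.
        return work(lambda: locks.refresh(user_id, token, ttl))
    finally:
        locks.release(user_id, token)


# Usage with a fake clock so that expiry is deterministic.
now = [0.0]
locks = TTLLockTable(clock=lambda: now[0])
t1 = locks.acquire("alice", ttl=10)
blocked = locks.acquire("alice", ttl=10)            # None: alice is locked
other = locks.acquire("bob", ttl=10)                # bob is independent
now[0] = 11.0                                       # t1 expired, never refreshed
stale_refresh = locks.refresh("alice", t1, ttl=10)  # False: ownership was lost
```

The token check matters: a worker whose lock silently expired must not release or extend a lock that another worker has since acquired.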

A similar approach is used in crawlers to enforce the robots.txt delay between consecutive requests.

Panthea answered 31/5, 2019 at 9:37 Comment(1)
Using some kind of distributed locking system looks like good advice, but I would agree with playsted below that checking this inside the consumer after getting the task from the queue is too late. If it were used to choose which queue to poll a task from, that would be a completely different story.Greatly

Temporal Workflow is capable of supporting your use case with minimal effort.

Here is a strawman design that satisfies your requirements:

  • Send signalWithStart request to a user workflow using userID as the workflow ID. It either delivers the signal to the workflow or first starts the workflow and delivers the signal to it.
  • All requests to that workflow are buffered by it. Temporal provides a hard guarantee that only one workflow with a given ID can exist in an open state. So all signals (events) are guaranteed to be buffered in the workflow that belongs to the user. Temporal preserves all data in the workflow (including stack traces and local variables) in the presence of any process or infra failures. So there is no need to persist the taskQueue variable explicitly.
  • An internal workflow event loop dispatches these requests one by one.
  • When the buffer is empty, the workflow can complete.

Here is the workflow code that implements it in Java (Go, TypeScript, and PHP SDKs are also supported, Python is in alpha):

@WorkflowInterface
public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

@ActivityInterface
public interface TaskProcessorActivity {
    void process(Task task);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcessorActivity processor = Workflow.newActivityStub(TaskProcessorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

And then the code that enqueues that task to the workflow through the signal method:

private void addTask(WorkflowClient temporalClient, Task task) {
    // Set workflowId to userId
    WorkflowOptions options = WorkflowOptions.newBuilder()
       .setTaskQueue(TASK_QUEUE)
       .setWorkflowId(task.getUserId())
       .build();
    // Use workflow interface stub to start/signal workflow instance
    SerializedExecutionWorkflow workflow = temporalClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
    BatchRequest request = temporalClient.newSignalWithStartRequest();
    request.add(workflow::execute);
    request.add(workflow::addTask, task);
    temporalClient.signalWithStart(request);
}

Temporal offers a lot of other advantages over using queues for task processing.

  • Built-in exponential retries with an unlimited expiration interval
  • Failure handling. For example, it allows executing a task that notifies another service if both updates couldn't succeed during a configured interval.
  • Support for long running heartbeating operations
  • Ability to implement complex task dependencies. For example to implement chaining of calls or compensation logic in case of unrecoverable failures (SAGA)
  • Gives complete visibility into the current state of the update. For example, when using queues, all you know is whether there are some messages in a queue, and you need an additional DB to track the overall progress. With Temporal every event is recorded.
  • Ability to cancel an update in-flight.
  • Distributed CRON support

See the presentation that goes over the Temporal programming model.

Skewbald answered 16/6, 2019 at 0:13 Comment(4)
Won't Queue<Task> taskQueue be lost in case of a worker crash? If I understand correctly, signals will not be redelivered to a new worker because the current worker has already received them and placed them into its internal state. As far as I know, Cadence does not store the workflow state; it recreates it from the history. Are signals part of that history?Propagandism
I think you are confusing the implementation details of how Cadence provides fault tolerance (through event sourcing) with the programming model. Queue<Task> taskQueue is not going to be lost in the case of a worker crash. This is exactly why Cadence greatly simplifies development of distributed systems. I call this model fault-oblivious code, as the workflow code is not even aware of worker failures. edit: I read your comment more carefully. Yes, signals are absolutely part of the workflow history. Without this it would be impossible to restore the state.Skewbald
Thanks! I suggest you update the documentation to explicitly specify that signals are part of the history (it's not obvious now). Also it would be good to add a section with some internals like "How it works". For instance, I'd like to know: when a signal comes into the workflow, in what thread is the signal's handler method called (in the main workflow thread and only at special places like WorkflowThread.sleep()?). Also it is very interesting what happens to the main workflow thread when an activity or sleep() is called (I mean, Java doesn't support coroutines after all). Etc. Thanks!Propagandism
Agree, I'm planning to write a separate section on how multithreading is done in the Java and Go clients. The gist of it is that it is cooperative and real threads are used. So when the sleep is called the real thread is blocked. But if workflow is pushed out of cache that thread is released back to the process. This severely limits the number of cached Java workflows as limit is always in threads, not memory. But it doesn't affect the user experience as from his point of view workflow is just blocked for the whole duration of the sleep.Skewbald

What amirouche is describing would be a simple solution as long as the lock collision doesn't occur very often. If it does you'll be wasting a lot of time on your workers grabbing messages that they have to reject and have the message broker re-queue.

An alternative that solves this sort of problem very well is the Actor model / Actor frameworks. Some examples include Akka, Orleans, Proto.Actor, and Cadence (mentioned above, although Cadence is much more than just an actor framework). These frameworks can get very complex, but at their core they can ensure that messages for a single actor are processed one at a time while allowing many actors to be processing at once (there would be an actor per user ID in your scenario). The frameworks abstract all of the message routing and concurrency away from you, greatly simplifying the implementation, and should be more robust and scalable in the long term.
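As a rough, single-process illustration of the actor idea (not Akka, Orleans or Proto.Actor, which add distribution, supervision and persistence), here is an asyncio sketch with one mailbox and one consumer coroutine per user ID. Messages for one user are handled strictly in order, while different users' actors run concurrently. UserActors, tell and drain are hypothetical names.

```python
import asyncio


class UserActors:
    """One mailbox and one consumer coroutine per user ID (single process)."""

    def __init__(self, handler):
        self._handler = handler  # async callable: handler(user_id, message)
        self._mailboxes = {}     # user_id -> asyncio.Queue
        self._workers = {}       # user_id -> asyncio.Task draining that mailbox
        self.errors = []         # (user_id, message, exception) from failed handlers

    def tell(self, user_id, message):
        """Enqueue a message; must be called from inside the running event loop."""
        mailbox = self._mailboxes.get(user_id)
        if mailbox is None:
            mailbox = self._mailboxes[user_id] = asyncio.Queue()
            self._workers[user_id] = asyncio.create_task(self._run(user_id, mailbox))
        mailbox.put_nowait(message)

    async def _run(self, user_id, mailbox):
        while True:
            message = await mailbox.get()
            try:
                await self._handler(user_id, message)  # one message at a time
            except Exception as exc:
                self.errors.append((user_id, message, exc))
            finally:
                mailbox.task_done()

    async def drain(self):
        """Wait until all mailboxes are empty, then stop the actors."""
        for mailbox in list(self._mailboxes.values()):
            await mailbox.join()
        for worker in self._workers.values():
            worker.cancel()
        await asyncio.gather(*self._workers.values(), return_exceptions=True)


async def main():
    log = []

    async def handle(user_id, message):
        await asyncio.sleep(0.01)  # simulated work
        log.append((user_id, message))

    actors = UserActors(handle)
    for i in range(3):
        actors.tell("alice", i)
        actors.tell("bob", i)
    await actors.drain()
    return log


log = asyncio.run(main())
```

Real actor frameworks also passivate idle actors and place them on cluster nodes; this sketch keeps every actor alive until drain() is called.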

Calandra answered 3/9, 2019 at 19:36 Comment(0)

Having a hard requirement of processing order per entity is challenging.

How long-running is each published task? If they are always very short, you could distribute tasks by hash and simply drain the worker pool of running jobs every time it changes shape without losing much productivity.

If they are longer-running, maybe that would be too slow. In that case you could have the workers take out atomic advisory locks from a fast central service (like Redis or something similar) for the user_id of each task they consume, held for the duration of its execution. This service could also be scaled separately, partitioned by user ID ranges or what-have-you. If there is enough of a gap between receiving the task and the first side effects of its execution, the worker wouldn't even need to block on the success of taking the lock until it was about to commit, and thereby might not see significantly increased latency. Contention* could be rare: if you are already using some consistent hashing scheme on user_id to distribute work, collisions would be rare indeed, and would only occur when the worker-pool topology changes. You should at least be using hashing distribution to guarantee that there are only two workers competing for the lock: the old one and the new one.**

If granting the lock was serviced in first-come-first-serve order and locks are requested faster than worker-pool topology changes (that is, workers queue up for the locks as soon as they receive the job from the publisher), this could even give you pretty good guarantees about ordering even when topology changes quite rapidly.
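A first-come-first-served lock table of the kind described above might look like the sketch below: a worker requests the lock for a task's user as soon as it receives the task, and waiters for the same user are granted the lock strictly in request order. FifoLockService is a hypothetical single-process stand-in; a real service would also need to survive crashes and expire locks held by dead workers. Requester IDs are assumed to be unique per task.

```python
from collections import deque


class FifoLockService:
    """Per-key advisory locks granted strictly in request order."""

    def __init__(self):
        self._queues = {}  # key -> deque of requester ids; the head holds the lock

    def request(self, key, requester):
        """Enqueue a request; return True if the requester now holds the lock."""
        queue = self._queues.setdefault(key, deque())
        queue.append(requester)
        return queue[0] == requester

    def holder(self, key):
        queue = self._queues.get(key)
        return queue[0] if queue else None

    def release(self, key, requester):
        """Release the lock and return the next holder, or None."""
        queue = self._queues.get(key)
        if not queue or queue[0] != requester:
            raise ValueError(f"{requester!r} does not hold the lock for {key!r}")
        queue.popleft()
        if not queue:
            del self._queues[key]
            return None
        return queue[0]


locks = FifoLockService()
# During a topology change, the old and the new owner of "alice" may collide.
granted_old = locks.request("alice", "task-1@old-worker")  # True
granted_new = locks.request("alice", "task-2@new-worker")  # False: queued behind task-1
next_holder = locks.release("alice", "task-1@old-worker")  # "task-2@new-worker"
```

Because requests are queued at receive time rather than at execution time, the grant order follows the order in which workers received the tasks.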

Edits:

*I originally wrote "Failures"; not quite what I meant. The idea is that this lock service would pretty much never experience any locking contention unless the topology changed, since tasks for a given user would always be sent to the same worker normally.

**Another possibility: you could also give good guarantees with only a partial worker-pool drain. Without user-level advisory locks, if you are using a consistent hashing scheme to distribute tasks and you can maintain a low water mark for the completion of dispatched tasks, you can defer starting tasks whose target worker is different from what it would have been when the oldest currently executing task started (i.e., drain running tasks only for users whose assigned worker changed). It's a fair amount of extra complexity; if you can efficiently track the low water mark and you don't have a long tail of long-running tasks, it might be a good option that allows you to elide the lock service. However, at the time of writing it's not clear to me whether this would ever be cheaper than locks; low water marks aren't usually cheap to implement reliably, and the death of a worker at the wrong time could delay processing for the entire 1/N cohort that changed workers instead of just the users whose tasks were in flight on the worker at the time it died.
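The low-water-mark idea can be sketched as follows, assuming (hypothetically) that the dispatcher records each topology change with a timestamp and knows when the oldest still-executing task started. A new task is dispatched only if its user maps to the same worker under every topology in effect since that low water mark, which reduces to a single old-versus-new comparison when there has been one change; otherwise it is deferred. Simple modulo hashing stands in for the real consistent-hash scheme, and all names are illustrative.

```python
import bisect
import hashlib


def assign(user_id, workers):
    """Illustrative stand-in for the real hashing scheme."""
    digest = hashlib.sha256(user_id.encode()).digest()
    ordered = sorted(workers)
    return ordered[int.from_bytes(digest[:8], "big") % len(ordered)]


class TopologyHistory:
    """Worker sets together with the time each one took effect."""

    def __init__(self, start_time, workers):
        self._times = [start_time]
        self._topologies = [frozenset(workers)]

    def change(self, time, workers):
        self._times.append(time)
        self._topologies.append(frozenset(workers))

    def since(self, low_water_mark):
        """Topologies in effect at any point from low_water_mark until now."""
        first = max(bisect.bisect_right(self._times, low_water_mark) - 1, 0)
        return self._topologies[first:]


def may_dispatch(user_id, history, low_water_mark):
    """True if the user's worker is unchanged since the oldest running task started.

    With nothing running, pass the current time as low_water_mark.
    """
    targets = {assign(user_id, t) for t in history.since(low_water_mark)}
    return len(targets) == 1


history = TopologyHistory(0, ["w1", "w2"])
history.change(100, ["w1", "w2", "w3"])
users = [f"user-{n}" for n in range(50)]
# The oldest running task started at t=90, before the change: defer only movers.
deferred = [u for u in users if not may_dispatch(u, history, 90)]
# Once everything started before t=100 has finished, nothing is deferred.
after_drain = [u for u in users if not may_dispatch(u, history, 150)]
```

Only the users whose assignment changed wait for the drain; everyone else keeps flowing to the same worker as before.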

Harve answered 31/5, 2019 at 17:23 Comment(0)

Apache Qpid broker supports a feature called message groups, where the relation between routing key and worker is dynamic and based on current traffic.

Consumption ordering means that the broker will not allow outstanding unacknowledged messages to more than one consumer for a given group.

This means that only one consumer can be processing messages from a particular group at a given time. When the consumer acknowledges all of its acquired messages, then the broker may pass the next pending message from that group to a different consumer.

This may give better utilization of workers:

Note well that distinct message groups would not block each other from delivery. For example, assume a queue contains messages from two different message groups - say group "A" and group "B" - and they are enqueued such that "A"'s messages are in front of "B". If the first message of group "A" is in the process of being consumed by a client, then the remaining "A" messages are blocked, but the messages of the "B" group are available for consumption by other consumers - even though it is "behind" group "A" in the queue.
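The consumption-ordering rule quoted above can be modelled as follows: the broker scans a single FIFO queue, skips messages whose group has unacknowledged messages at a different consumer, and frees the group once its consumer has acknowledged everything it acquired. This is an illustrative model with hypothetical names, not Qpid's code or API.

```python
from collections import Counter


class GroupedQueue:
    """Single FIFO queue with Qpid-style message-group consumption ordering."""

    def __init__(self):
        self._messages = []        # FIFO list of (group, body)
        self._owner = {}           # group -> consumer holding unacked messages
        self._unacked = Counter()  # group -> number of unacked messages

    def publish(self, group, body):
        self._messages.append((group, body))

    def acquire(self, consumer):
        """Return the first message this consumer may take, or None."""
        for i, (group, body) in enumerate(self._messages):
            owner = self._owner.get(group)
            if owner is None or owner == consumer:
                del self._messages[i]
                self._owner[group] = consumer
                self._unacked[group] += 1
                return group, body
        return None  # every remaining message belongs to a busy group

    def ack(self, consumer, group):
        if self._owner.get(group) != consumer:
            raise ValueError(f"{consumer!r} holds no messages of group {group!r}")
        self._unacked[group] -= 1
        if self._unacked[group] == 0:
            del self._unacked[group]
            del self._owner[group]  # the group may now go to any consumer


q = GroupedQueue()
for body in ("A1", "A2", "B1"):
    q.publish(body[0], body)
c1 = q.acquire("consumer-1")  # ("A", "A1")
c2 = q.acquire("consumer-2")  # ("B", "B1"): A2 is blocked, B is not
c3 = q.acquire("consumer-3")  # None
q.ack("consumer-1", "A")
c4 = q.acquire("consumer-3")  # ("A", "A2"): group A moves to another consumer
```

The linear scan past blocked groups is one plausible reason for the performance cost mentioned below.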

Still, this feature probably comes at a significant performance price when compared to other brokers. And there is not much interest in Qpid these days [4] [5].

EDIT: There are other brokers that also provide this feature: ActiveMQ and ActiveMQ Artemis. EDIT2: It turns out "message groups" in ActiveMQ and Artemis work differently - assignment of group to worker is static (sticky) not dynamic.

Greatly answered 23/9, 2020 at 12:48 Comment(0)

Kafka supports exactly what you require. You need to configure a key, and Kafka will ensure that all messages with the same key will be processed sequentially.

Carduaceous answered 17/10, 2020 at 17:19 Comment(1)
Not sequentially, it would ensure all data with the same key would go to the same nodeBarbital

I was able to find this discussion of the kind of behavior you're describing by searching for "job queue with category ordering".

Unfortunately it doesn't look like they have a solution to your problem.

There's an answer to a prior question, which suggests against using a message-broker service of any kind for order-sensitive or business-logic-sensitive tasks of any kind, for reasons which may or may not apply to what you're doing. It also points out a technique that seems like it could do what you're trying to do, but which might not scale well for the task at hand.

If you had the option of stickiness, it would solve your problem neatly and with minimal extra inefficiency. Of course stickiness has its own failure modes; there's no reason to think you'll find an implementation that's made the exact trade-offs you'd have made.

I assume, because you've asked the question here, that the per-user sequential-ness is important. In the example you give, of a video platform processing uploads, a sequential-ness violation would be no big deal. More broadly, most people who need massive-throughput load-balanced job queues don't need strong guarantees about the order things get processed in.

If you end up needing to build the thing yourself, you'll have a lot of options. I'm getting the impression you're expecting a huge throughput, a highly parallelized architecture, and a low rate of user-id collisions. In this case you might consider maintaining a list of prerequisites:

  • When a new task comes in, the balancer searches all in-process, assigned, and not-yet-assigned jobs for any that match the job key (user_id).
  • If there's an existing match, the new job is added to the not-yet-assigned list, with the most recent job sharing its key as its prerequisite, so that jobs for one user form a chain in submission order.
  • Every time a job finishes, the worker needs to check the not-yet-assigned list to see if it just finished anyone's prerequisite. If it did, the worker could either flag that child job for assignment or just handle the child job itself.

Of course, this has its own failure modes; you'll have to make trade-offs.
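The prerequisite scheme might be sketched like this, assuming a single balancer process (all names hypothetical). Each new job is chained to the most recent unfinished job for the same user, so every job has at most one dependent and a user's jobs run in submission order; chaining to the oldest one instead would let two waiting jobs start together. Rather than scanning all jobs on each submission, the sketch keeps a per-user pointer to the latest unfinished job.

```python
from collections import deque


class PrerequisiteBalancer:
    """Chains each user's jobs so that only one of them is assignable at a time."""

    def __init__(self):
        self._last = {}            # user_id -> most recent unfinished job id
        self._dependent = {}       # job id -> job id waiting on it
        self._user_of = {}         # job id -> user_id
        self.assignable = deque()  # jobs without an unfinished prerequisite

    def submit(self, job_id, user_id):
        self._user_of[job_id] = user_id
        previous = self._last.get(user_id)
        self._last[user_id] = job_id
        if previous is None:
            self.assignable.append(job_id)
        else:
            self._dependent[previous] = job_id  # wait for the previous job

    def finish(self, job_id):
        """Mark a job finished; return the dependent job it unblocked, if any."""
        user_id = self._user_of.pop(job_id)
        child = self._dependent.pop(job_id, None)
        if child is not None:
            self.assignable.append(child)
        elif self._last.get(user_id) == job_id:
            del self._last[user_id]  # no more work queued for this user
        return child


b = PrerequisiteBalancer()
b.submit("j1", "alice")
b.submit("j2", "alice")
b.submit("j3", "bob")
b.submit("j4", "alice")
first = b.assignable.popleft()   # "j1"
second = b.assignable.popleft()  # "j3": bob is not blocked by alice
unblocked = b.finish("j1")       # "j2"; j4 still waits for j2
third = b.assignable.popleft()   # "j2"
```

In a distributed setting the per-user pointer and dependency map would live in shared storage, which is where the failure modes mentioned above come in.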

Tonitonia answered 25/5, 2019 at 1:23 Comment(0)

Kafka could help, as it stores messages for a while, so you can poll them again.

Bosh answered 29/5, 2019 at 15:48 Comment(0)

If I'm understanding your scenario correctly, I believe the feature you are describing is pretty similar to how Message Sessions work in Azure Service Bus.

You'd basically set the SessionId property of the message to the UserId before pushing them into a queue.

Each consumer will lock onto a session processing messages one after the other and these messages would belong to the same user. Once done, the consumer could just move on to the next available session.

Also, Azure Functions has recently released Service Bus Sessions support which is in preview but allows you to achieve all this with very little effort.

Unfortunately, I'm not familiar enough with the open-source alternatives to know whether this feature exists in one of them, but I hope this helps.

Celaeno answered 31/5, 2019 at 18:24 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.