Solution to slow consumer(eventProcessor) issue in LMAX Disruptor pattern
Asked Answered
B

3

7

While using the disruptor, there may be a consumer(s) that is lagging behind, and because of that slow consumer, the whole application is affected.

Keeping in mind that every producer(Publisher) and consumer(EventProcessor) is running on a single thread each, what can be the solution to the slow consumer problem?

Can we use multiple threads on a single consumer? If not, what is a better alternative?

Bremser answered 23/7, 2012 at 3:25 Comment(0)
H
7

Generally speaking use a WorkerPool to allow multiple pooled worker threads to work on a single consumer, which is good if you have tasks that are independent and of a potentially variable duration (eg: some short tasks, some longer).

The other option is to have multiple independent workers parallel process over the events, but each worker only handle modulo N workers (eg 2 threads, and one thread processes odd, one thread processes even event IDs). This works great if you have consistent duration processing tasks, and allows batching to work very efficiently too.

Another thing to consider is that the consumer can do "batching", which is especially useful for example in auditing. If your consumer has 10 events waiting, rather than write 10 events to an audit log independently, you can collect all 10 events and write them at the same time. In my experience this more than covers the need to run multiple threads.

Horehound answered 23/7, 2012 at 6:24 Comment(4)
with respect to the option of having a set of (N) parallel consumers (EventProcessor) doing batching. one thing you must watch out for (at least with the 2.10.x version -- I've not dug through 3.x yet) is that when you split the work across N consumers you cannot rely on endOfBatch being processed by each consumer; it will only be noticed by the ONE consumer set to process the sequence where endOfBatch happens to be true (unless you cheat and look for endOfBatch on all sequences). if there is any lull in events, the other consumers may not see an endOfBatch=true for quite some time.Epicycloid
In current codeline, WorkHandlers do not have access to an endOfBatch flag (ie: it's not even available to the pool consumers).Horehound
But what about if the events must be processed sequentially? I may have missed something here but it looks like what you suggest apply only if you can process the events in parallel.Trotyl
sequential processing is all about "when" and "relations". if you can split the problem into "work", which has no dependent "relation" to any other work, and "commit" to make that work permanent, then you can arrange your EventProcessors so that you do the mod-N processing via several concurrent event processors, and then you follow those up with a single "commit" Event processor. the single commit will still be a bottle neck, so this only works if the "work" is more expensive than the "commit". assuming batching in the "commit" eventprocessor this is fairly easy to ensure a lot of the time.Epicycloid
H
4

Try to separate slow part to other thread (I/O, not O(1) or O(log) calculations, etc.), or to apply some kind of back pressure when the consumer is overloaded (by yielding or temporary parking producers, replying with 503 or 429 status codes, etc.): http://mechanical-sympathy.blogspot.com/2012/05/apply-back-pressure-when-overloaded.html

Heteropolar answered 25/7, 2012 at 13:44 Comment(0)
S
2

Use a set of identical eventHandlers. To avoid more than 1 eventHandler acting upon a single event, I use the following approach.

Create a thread pool of size Number of cores in the system

Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // a thread pool to which we can assign tasks

Then create a handler array

 HttpEventHandler [] handlers = new HttpEventHandler[Runtime.getRuntime().availableProcessors()];

for(int i = 0; i<Runtime.getRuntime().availableProcessors();i++){
    handlers[i] = new HttpEventHandler(i);
}

disruptor.handleEventsWith(handlers);

In the EventHandler

public void onEvent(HttpEvent event, long sequence, boolean endOfBatch) throws InterruptedException
{
    if( sequence % Runtime.getRuntime().availableProcessors()==id){

        System.out.println("-----On event Triggered on thread "+Thread.currentThread().getName()+" on sequence "+sequence+" -----");
        //your event handler logic
}
Subacute answered 5/5, 2019 at 12:26 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.