Disruptor - EventHandlers not invoked
Asked Answered
C

3

6

I'm playing around with the Disruptor framework, and am finding that my event handlers are not being invoked.

Here's my setup code:

private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService  EXECUTOR = Executors.newSingleThreadExecutor();

private void initializeDisruptor() {
    if (disruptor != null)
        return;

    disruptor = 
            new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
                    new SingleThreadedClaimStrategy(BUFFER_SIZE),
                    new SleepingWaitStrategy());
    disruptor.handleEventsWith(searchTermMatchingHandler)
        .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}

Elsewhere, I publish events. I've tried each of the following two approaches:

Event Publishing Approach A:

private void handleStatus(final Status status)
{

    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}

In this scenario, I find the the first EventHandler gets invoked, but never anything beyond that.

Event Publishing Approach B:

private void handleStatus(final Status status)
{
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {

        @Override
        public TwitterStatusReceivedEvent translateTo(
                TwitterStatusReceivedEvent event, long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}

In this scenario, I find that none of the event handlers get invoked at all.

What am I doing wrong?

Update

Here's my EventHandler in it's entirety. How should I be signalling that processing is complete?

public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event, long sequence,
            boolean endOfBatch) throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments())
        {
            if (statusText.contains(instrument.getSearchTerm()))
            {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }

}
Crake answered 29/11, 2011 at 11:10 Comment(0)
T
8

Each event handler needs to run in its own thread which wont exit until you shutdown the disruptor. Since you're using a single threaded executor, only the first event handler that happens to execute will ever run. (The Disruptor class stores each handler in a hashmap so which handler winds up running will vary)

If you switch to a cachedThreadPool you should find it all starts running. You won't need to do any management of the sequence numbers because that's all handled by the EventProcessor that the Disruptor class sets up and manages for you. Just processing each event you get is exactly right.

Townshend answered 29/11, 2011 at 18:21 Comment(2)
May i ask why sequence number is an input to the onEvent() if sequence numbers are managed by Disruptor class? Thanks.Wedekind
The sequence number turns out to be a really useful bit of information. For example, LMAX expose the current sequence number that each consumer is up to via JMX which helps us monitor that the system is running as expected, how far behind consumers are etc. We also use that sequence number as an ID for messages to support reliable messaging and various other functions.Townshend
P
2

You need to make sure your searchTermMatchingHandler is updating its sequence number after it processes the event. The EventHandlers further downstream (appendStatusHandler, updatePriceHandler, persistUpdatesHandler) will be inspecting the searchTermMatchingHandler sequence number to see which events they can pick up off the ring buffer.

Philomel answered 29/11, 2011 at 11:48 Comment(2)
Thanks for the response, Trisha. I'm not sure I understand. I've updated my question to show my EventHandler, however I don't see in the EventHandler interface how I'm supposed to signal to future handlers that work is completed? The EventHandler interface doesn't expose a sequence number?Crake
Marty, Adrian is correct, you don't need to worry about this because the Disruptor class handles this for you, sorry for the confusion.Philomel
H
0

I had the same problem, but it was because I was instantiating the Disruptor using Spring (Java config) and was instantiating the Executor in the same @Bean method as the Disruptor.

I fixed the problem by instantiating the Executor in a separate @Bean method.

Hearne answered 24/3, 2014 at 12:19 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.