How to use a disruptor with multiple message types

My system has two different types of messages - type A and B. Each message has a different structure - type A contains an int member and type B contains a double member. My system needs to pass both types of messages to numerous business logic threads. Reducing latency is very important, so I am investigating using a Disruptor to pass messages from the main thread to the business logic threads in a mechanically sympathetic manner.

My problem is that the disruptor only accepts one type of object in the ring buffer. This makes sense because the disruptor pre-allocates the objects in the ring buffer. However, it also makes it difficult to pass two different types of messages to my business logic threads via the Disruptor. From what I can tell, I have four options:

  1. Configure the disruptor to use objects containing a fixed size byte array (as recommended by How should one use Disruptor (Disruptor Pattern) to build real-world message systems?). In this case, the main thread must encode the messages into byte arrays before publishing them to the disruptor and each of the business logic threads must decode the byte arrays back into objects upon receipt. The downside of this setup is that the business logic threads are not truly sharing the memory from the disruptor - instead they are creating new objects (and thus creating garbage) from the byte array provided by the disruptor. The upside of this setup is that all business logic threads can read multiple different types of messages from the same disruptor.

  2. Configure the disruptor to use a single type of object but create multiple disruptors, one for each object type. In the case above, there would be two separate disruptors - one for objects of type A and another for objects of type B. The upside of this setup is that the main thread doesn't have to encode the object to a byte array and the business logic threads can share the same objects as used in the disruptor (no garbage created). The downside of this setup is that somehow each business logic thread will have to subscribe to messages from multiple disruptors.

  3. Configure the disruptor to use a single type of "super" object that contains all fields of both message A and B. This goes strongly against OO style, but would allow a compromise between options #1 and #2.

  4. Configure the disruptor to use object references. However, in this case I lose the performance benefits of object preallocation and memory ordering.

What do you recommend for this situation? I feel that option #2 is the cleanest solution, but I don't know whether or how consumers can technically subscribe to messages from multiple disruptors. If anyone can provide an example for how to implement option #2, it would be much appreciated!

Hitherto asked 30/5, 2013 at 21:12. Comments (3):
Michael Barker answered my question in the Disruptor Google Group. Check out his response below: groups.google.com/d/msg/lmax-disruptor/clUkJaFMsZg/54fKplz21MwJ (Hitherto)
If that answers your question, consider converting it into an answer here and marking that answer as accepted. (Junia)
I've had success with approach (2), with chained Disruptors where a handler of the first feeds the second. (Boscage)

> 1. Configure the disruptor to use objects containing a fixed size byte array [...] The upside of this setup is that all business logic threads can read multiple different types of messages from the same disruptor.

This would be my preferred approach, but I'm slightly coloured by our use cases: just about every place that we've used the Disruptor, it's either receiving from or sending to some sort of I/O device, so our basic currency is byte arrays. You can get around the object creation by using a flyweight approach to marshalling. For an example of this, see the example I presented at Devoxx (https://github.com/mikeb01/ticketing), which uses Javolution's Struct and Union classes. If you can completely deal with the object before returning from the event handler's onEvent call, then this approach works well. If the event needs to live beyond that point, then you need to make some sort of copy of the data, e.g. de-serialising it into an object.
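The flyweight idea can be illustrated without Javolution. The following is a minimal sketch in plain Java that assumes a hypothetical layout (one type byte followed by the payload) and hypothetical class names ByteArrayEvent and MessageFlyweight. It is not the code from the ticketing repository. The consumer reuses a single flyweight and reads fields directly from the slot's byte array, so decoding allocates nothing.

```java
// Sketch of a garbage-free flyweight over a preallocated byte-array event.
// The layout is an assumption for illustration, not taken from the ticketing example:
//   byte 0     : message type (TYPE_A or TYPE_B)
//   bytes 1..8 : payload, a big-endian int (type A) or the raw bits of a double (type B)
final class ByteArrayEvent {
    static final int SIZE = 16;            // assumed maximum encoded message size
    final byte[] data = new byte[SIZE];    // allocated once, when the ring buffer is filled
}

final class MessageFlyweight {
    static final byte TYPE_A = 1;
    static final byte TYPE_B = 2;
    private static final int PAYLOAD = 1;

    private byte[] data;                   // borrowed from the current slot, never copied

    // Re-point this flyweight at another event; no allocation takes place.
    MessageFlyweight wrap(ByteArrayEvent event) {
        this.data = event.data;
        return this;
    }

    byte type() { return data[0]; }
    int valueA() { return (int) read(PAYLOAD, 4); }
    double valueB() { return Double.longBitsToDouble(read(PAYLOAD, 8)); }

    void encodeA(int value) { data[0] = TYPE_A; write(PAYLOAD, 4, value); }
    void encodeB(double value) { data[0] = TYPE_B; write(PAYLOAD, 8, Double.doubleToRawLongBits(value)); }

    // Big-endian read of `length` bytes starting at `offset`.
    private long read(int offset, int length) {
        long v = 0;
        for (int i = 0; i < length; i++) v = (v << 8) | (data[offset + i] & 0xFF);
        return v;
    }

    // Big-endian write of the low `length` bytes of `v`.
    private void write(int offset, int length, long v) {
        for (int i = length - 1; i >= 0; i--) {
            data[offset + i] = (byte) v;
            v >>>= 8;
        }
    }
}

class FlyweightDemo {
    public static void main(String[] args) {
        ByteArrayEvent slot = new ByteArrayEvent();        // in practice, one preallocated per ring slot
        MessageFlyweight writer = new MessageFlyweight();  // producer side, reused for every publish
        MessageFlyweight reader = new MessageFlyweight();  // consumer side, one reused per handler
        writer.wrap(slot).encodeB(2.5);
        // Inside onEvent: decode in place and finish with the data before returning.
        if (reader.wrap(slot).type() == MessageFlyweight.TYPE_B) {
            System.out.println(reader.valueB());           // prints 2.5
        }
    }
}
```

The flyweight only borrows the slot's array. Anything that has to outlive onEvent therefore still needs to be copied out before the handler returns.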

> 2. Configure the disruptor to use a single type of object but create multiple disruptors, one for each object type. [...] The downside of this setup is that somehow each business logic thread will have to subscribe to messages from multiple disruptors.

I haven't tried this approach; you'd probably need a custom EventProcessor that can poll from multiple ring buffers.
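The question asks how one consumer could read from two disruptors. The following is a minimal, self-contained sketch of such a custom processor in plain Java. SimpleRing is a hypothetical stand-in for a Disruptor RingBuffer (preallocated slots, one producer, one consumer) and is not the library's API. TwoRingProcessor drains whatever is available on each ring in turn and yields only when both rings are empty. Recent Disruptor releases also provide an EventPoller, created with RingBuffer.newPoller(), which may serve the same purpose. Check this against the version you depend on.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a Disruptor ring buffer: preallocated slots, one producer, one consumer.
final class SimpleRing<T> {
    private final Object[] slots;
    private final int mask;
    private final AtomicLong published = new AtomicLong(-1); // highest sequence readable by the consumer
    private final AtomicLong consumed = new AtomicLong(-1);  // highest sequence the consumer has finished
    private long claimed = -1;                               // touched only by the producer thread

    SimpleRing(int sizePowerOfTwo, Supplier<T> factory) {
        slots = new Object[sizePowerOfTwo];
        mask = sizePowerOfTwo - 1;
        for (int i = 0; i < sizePowerOfTwo; i++) slots[i] = factory.get(); // preallocate every event
    }

    // Producer: claim the next sequence, waiting while the consumer is a full lap behind.
    long next() {
        long seq = ++claimed;
        while (seq - consumed.get() > slots.length) Thread.yield();
        return seq;
    }

    @SuppressWarnings("unchecked")
    T get(long seq) { return (T) slots[(int) (seq & mask)]; }

    // Producer: the volatile write makes the slot's contents visible to the consumer.
    void publish(long seq) { published.set(seq); }

    // Consumer: hand every published event to the handler without blocking; returns the count.
    int poll(Consumer<T> handler) {
        long first = consumed.get() + 1;
        long last = published.get();
        for (long seq = first; seq <= last; seq++) handler.accept(get(seq));
        if (last >= first) consumed.set(last); // frees those slots for the producer
        return (int) Math.max(0, last - first + 1);
    }
}

// Message types from the question: A carries an int, B carries a double.
final class EventA { int value; }
final class EventB { double value; }

// One business logic thread subscribed to two rings, in place of a custom EventProcessor.
final class TwoRingProcessor implements Runnable {
    private final SimpleRing<EventA> ringA;
    private final SimpleRing<EventB> ringB;
    private final Consumer<EventA> onA;
    private final Consumer<EventB> onB;
    private volatile boolean running = true;

    TwoRingProcessor(SimpleRing<EventA> ringA, SimpleRing<EventB> ringB,
                     Consumer<EventA> onA, Consumer<EventB> onB) {
        this.ringA = ringA;
        this.ringB = ringB;
        this.onA = onA;
        this.onB = onB;
    }

    // One pass over both rings; returns how many events were handled.
    int pollOnce() { return ringA.poll(onA) + ringB.poll(onB); }

    @Override
    public void run() {
        while (running) {
            if (pollOnce() == 0) Thread.yield(); // idle strategy when both rings are empty
        }
    }

    void halt() { running = false; }
}
```

Draining two rings on one thread gives no ordering guarantee between A and B messages. If the relative order of the two types matters, a single disruptor is the safer choice.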

> 3. Configure the disruptor to use a single type of "super" object that contains all fields of both message A and B. [...]
> 4. Configure the disruptor to use object references. However, in this case I lose the performance benefits of object preallocation and memory ordering.

We've done this in a couple of cases where the lack of preallocation is tolerable, and it works okay. If you are passing objects, then you need to make sure that you null them out once you are finished with them on the consumer side. We found that using a double dispatch pattern for the "super" object kept the implementation fairly clean. One drawback is that you will get slightly longer GC stalls than with a straight array of objects, as the GC has more live objects to traverse during the mark phase.
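The following is a minimal sketch of double dispatch (the classic visitor pattern) combined with nulling out the reference on the consumer side. All names are hypothetical: MessageVisitor, Message, MessageSlot and DispatchingHandler. The handler's onEvent method only mirrors the shape of a Disruptor event handler and does not implement the library's EventHandler interface.

```java
// Hypothetical visitor: one callback per message type.
interface MessageVisitor {
    void visitA(MessageA message);
    void visitB(MessageB message);
}

abstract class Message {
    // Second dispatch: each concrete message calls the matching visitor method.
    abstract void accept(MessageVisitor visitor);
}

final class MessageA extends Message {
    final int value;
    MessageA(int value) { this.value = value; }
    @Override void accept(MessageVisitor visitor) { visitor.visitA(this); }
}

final class MessageB extends Message {
    final double value;
    MessageB(double value) { this.value = value; }
    @Override void accept(MessageVisitor visitor) { visitor.visitB(this); }
}

// The preallocated ring buffer entry: a mutable holder for the current message reference.
final class MessageSlot {
    Message message;
}

// Consumer-side handler: dispatch on the runtime type, then clear the reference.
// If several handlers read the same slot, only the last handler in the chain should clear it.
final class DispatchingHandler {
    private final MessageVisitor visitor;

    DispatchingHandler(MessageVisitor visitor) { this.visitor = visitor; }

    void onEvent(MessageSlot slot) {
        try {
            slot.message.accept(visitor);
        } finally {
            slot.message = null; // lets the GC reclaim the message instead of the ring keeping it alive
        }
    }
}

class DoubleDispatchDemo {
    public static void main(String[] args) {
        MessageVisitor printer = new MessageVisitor() {
            @Override public void visitA(MessageA m) { System.out.println("A " + m.value); }
            @Override public void visitB(MessageB m) { System.out.println("B " + m.value); }
        };
        MessageSlot slot = new MessageSlot();
        DispatchingHandler handler = new DispatchingHandler(printer);
        slot.message = new MessageA(7);
        handler.onEvent(slot); // prints "A 7"
    }
}
```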

> What do you recommend for this situation? [...] If anyone can provide an example for how to implement option #2, it would be much appreciated!

Another option, if you want complete flexibility with regard to the use of data, is to not use the ring buffer at all, but instead talk directly to the Sequencer and define your object layout as you see fit.
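The following is a minimal, single-threaded sketch in plain Java of one possible custom layout. Messages are stored as parallel primitive arrays (a struct-of-arrays layout) indexed by sequence. SimpleSequencer is a hypothetical stand-in for the Disruptor's Sequencer. It only shows the claim, write, publish cycle and omits the wrap-around gating and wait strategies that the real Sequencer provides.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical struct-of-arrays layout: each message field lives in its own primitive array,
// indexed by sequence, so no per-message objects exist at all.
final class PrimitiveMessageStore {
    static final byte TYPE_A = 1;
    static final byte TYPE_B = 2;

    private final int mask;
    final byte[] types;
    final int[] intValues;       // payload of type A messages
    final double[] doubleValues; // payload of type B messages

    PrimitiveMessageStore(int sizePowerOfTwo) {
        mask = sizePowerOfTwo - 1;
        types = new byte[sizePowerOfTwo];
        intValues = new int[sizePowerOfTwo];
        doubleValues = new double[sizePowerOfTwo];
    }

    int index(long sequence) { return (int) (sequence & mask); }
}

// Stand-in for a Disruptor Sequencer: hands out sequences and tracks the published cursor.
// Unlike the real thing, it does not stop the producer from lapping slow consumers.
final class SimpleSequencer {
    private long claimed = -1;
    private final AtomicLong cursor = new AtomicLong(-1);

    long next() { return ++claimed; }
    void publish(long sequence) { cursor.set(sequence); }
    long highestPublished() { return cursor.get(); }
}

class SequencerDemo {
    public static void main(String[] args) {
        PrimitiveMessageStore store = new PrimitiveMessageStore(8);
        SimpleSequencer sequencer = new SimpleSequencer();

        // Producer: claim a sequence, write the fields at that index, then publish.
        long s = sequencer.next();
        int i = store.index(s);
        store.types[i] = PrimitiveMessageStore.TYPE_A;
        store.intValues[i] = 42;
        sequencer.publish(s);

        s = sequencer.next();
        i = store.index(s);
        store.types[i] = PrimitiveMessageStore.TYPE_B;
        store.doubleValues[i] = 0.25;
        sequencer.publish(s);

        // Consumer: read every published sequence straight out of the arrays.
        for (long seq = 0; seq <= sequencer.highestPublished(); seq++) {
            int idx = store.index(seq);
            if (store.types[idx] == PrimitiveMessageStore.TYPE_A) {
                System.out.println("A " + store.intValues[idx]);
            } else {
                System.out.println("B " + store.doubleValues[idx]);
            }
        }
    }
}
```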

Arthrospore answered 18/5, 2015 at 16:16

Ben Baumgold, I am sure you have found a solution by now. Your #4 (or #3) can be implemented trivially by creating an event holder; think of it as an enum for objects. To speed up look-ups, events should be enriched with an enum type. Notice that I am storing a reference to the original event in the holder. It may be more appropriate to create a copy constructor or clone() and copy events on insertion into the ring buffer.

Illustrating by example:

// this is the enum used in events

public enum MyEventEnum {
    EVENT_TIMER,
    EVENT_MARKETDATA
}

// minimal base class the holder relies on (its definition was not shown in the original answer)

public abstract class EventBase {
    public abstract MyEventEnum getType();
}

// this is the holder. At any time, this instance in the ring buffer holds just one event, indexed by array[type.ordinal()]. Why an array is used should be obvious from the code.

public class RingBufferEventHolder {
    private MyEventEnum type;
    private final EventBase[] array = new EventBase[MyEventEnum.values().length];

    public void setEvent(EventBase event) {
        MyEventEnum newType = event.getType();
        switch (newType) {
            case EVENT_TIMER:
            case EVENT_MARKETDATA:
                break;
            default:
                throw new IllegalArgumentException("Unknown event type " + event);
        }
        // null out the previous event so the ring buffer does not keep it alive
        if (type != null) {
            array[type.ordinal()] = null;
        }
        type = newType;
        array[type.ordinal()] = event;
    }

    public MyEventEnum getType() {
        return type;
    }

    public EventBase getEvent() {
        return type == null ? null : array[type.ordinal()];
    }
}

// publish event

EventBase newEvent = new EventMarketData(....);
// claim the next slot
long nextSequence = ringBuffer.next();
try {
    RingBufferEventHolder holder = ringBuffer.get(nextSequence);
    holder.setEvent(newEvent);
} finally {
    // make the event available to EventProcessors, even if setEvent threw
    ringBuffer.publish(nextSequence);
}
Incommensurate answered 20/10, 2016 at 6:13. Comments (1):
Very close to my implementation for the same question; the main difference is the use of an array. My event class looks like: EventBase { private EventTypeEnum type; /* NEW_ORDER, CANCEL_ORDER, L2MarketData, MarketUpdates */ private OrderEvent orderEvent; private MarketDataEvent marketEvent; .... } (Pandowdy)

This is very similar to Vortex's answer, but differs in keeping separate sub-event fields. It is a mix of #3 and #4. If I could manage the business logic complexity, I would go with #2, multiple disruptors.

My main reason for preferring this over the array-based, enum-indexed event implementation is that several event types share the same object type.

public enum ExchangeEventType{
    PLACE_ORDER,   // -> OrderEvent
    CANCEL_ORDER,  // -> OrderEvent
    MARKET_FEED,   // -> MarketEvent
    MARKET_UPDATE, // -> MarketEvent
    ADD_USER,      // -> AccountEvent
    SUSPEND_USER,  // -> AccountEvent
    RESUME_USER    // -> AccountEvent
}    

public class ExchangeEvent {
  private ExchangeEventType type;
  private EventResultCode resultCode;
  private long timestamp;

  // event type objects
  private OrderEvent orderEvent;
  private MarketEvent marketEvent;
  private AccountEvent accountEvent;
}

In the business logic, multiple processors consume and produce several event types, so not using separate disruptors is a trade-off I chose consciously.

For example:

  • #1 Engine using OrderEvent & AccountEvent
  • #2 Engine using MarketEvent & OrderEvent
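The following sketch shows how one of those engines might consume the shared ExchangeEvent. It switches on the event type and reads only the sub-event it cares about. The enum is taken from the answer. The payload fields (orderId, price, userId), the preallocated sub-events and OrderAccountEngine are assumptions for illustration. Engine #2 would follow the same pattern with the MarketEvent and OrderEvent cases.

```java
// Event types from the answer above.
enum ExchangeEventType {
    PLACE_ORDER, CANCEL_ORDER, MARKET_FEED, MARKET_UPDATE, ADD_USER, SUSPEND_USER, RESUME_USER
}

// Hypothetical minimal payloads; the real sub-event fields are not shown in the answer.
final class OrderEvent { long orderId; }
final class MarketEvent { double price; }
final class AccountEvent { long userId; }

// Trimmed ExchangeEvent. Assumption: the sub-events are preallocated once per ring slot.
final class ExchangeEvent {
    ExchangeEventType type;
    final OrderEvent orderEvent = new OrderEvent();
    final MarketEvent marketEvent = new MarketEvent();
    final AccountEvent accountEvent = new AccountEvent();
}

// Engine #1 from the list: reacts to order and account events and skips everything else.
final class OrderAccountEngine {
    long lastOrderId = -1;
    long lastUserId = -1;
    int skipped;

    void onEvent(ExchangeEvent event) {
        switch (event.type) {
            case PLACE_ORDER:
            case CANCEL_ORDER:
                lastOrderId = event.orderEvent.orderId;
                break;
            case ADD_USER:
            case SUSPEND_USER:
            case RESUME_USER:
                lastUserId = event.accountEvent.userId;
                break;
            default:
                skipped++; // market events belong to another engine
                break;
        }
    }
}

class EngineDemo {
    public static void main(String[] args) {
        ExchangeEvent slot = new ExchangeEvent(); // reused, as a ring buffer slot would be
        OrderAccountEngine engine = new OrderAccountEngine();

        slot.type = ExchangeEventType.PLACE_ORDER;
        slot.orderEvent.orderId = 11;
        engine.onEvent(slot);

        slot.type = ExchangeEventType.MARKET_FEED;
        slot.marketEvent.price = 101.5;
        engine.onEvent(slot);

        System.out.println(engine.lastOrderId + " " + engine.skipped); // prints "11 1"
    }
}
```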
Pandowdy answered 4/1, 2021 at 8:32
