Disruptor example with 1 publisher and 4 parallel consumers

In this example https://mcmap.net/q/428408/-disruptor-net-example and in "Why is my disruptor example so slow?" (at the end of the question) there is 1 publisher, which publishes items, and 1 consumer.

But in my case the consumer's work is much more complicated and takes some time, so I want 4 consumers that process data in parallel.

For example, if the producer produces the numbers 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ...

I want consumer1 to catch 1, 5, 9, ..., consumer2 to catch 2, 6, 10, ..., consumer3 to catch 3, 7, 11, ..., and consumer4 to catch 4, 8, 12, ... (not necessarily these exact numbers; the idea is that data should be processed in parallel, and I don't care which number is processed by which consumer).

Remember that this needs to be done in parallel, because in the real application the consumer's work is pretty expensive. I expect the consumers to run on different threads to use the power of multicore systems.

Of course I can just create 4 ring buffers and attach 1 consumer to each ring buffer. That way I can use the original example. But I feel that wouldn't be correct. It would likely be more correct to create 1 publisher (1 ring buffer) and 4 consumers, as this is what I need.

Adding a link to a very similar question in Google Groups: https://groups.google.com/forum/#!msg/lmax-disruptor/-CLapWuwWLU/GHEP4UkxrAEJ

So we have two options:

  • one ring, many consumers (each consumer will "wake up" on every addition, and all consumers must use the same WaitStrategy)
  • many "one ring, one consumer" pairs (each consumer will wake up only on data that it should process, and each consumer can have its own WaitStrategy).
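
The second option can be sketched without the Disruptor library at all. The following is only an illustration of the round-robin idea, assuming `BlockingCollection<long>` as a stand-in for each ring buffer; `RoundRobinSketch` and `Run` are hypothetical names, and a real Disruptor setup would differ in its API and wait strategies.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class RoundRobinSketch
{
    // Hypothetical helper: publishes 0..itemCount-1 round-robin over `workers`
    // queues and returns, per worker, the items that worker consumed.
    public static long[][] Run(int workers, int itemCount)
    {
        // One bounded queue per consumer (stand-in for "one ring, one consumer").
        var queues = Enumerable.Range(0, workers)
            .Select(_ => new BlockingCollection<long>(boundedCapacity: 16))
            .ToArray();

        // One dedicated consumer task per queue; each drains only its own queue.
        var consumers = queues
            .Select(q => Task.Factory.StartNew<long[]>(
                () => q.GetConsumingEnumerable().ToArray(),
                TaskCreationOptions.LongRunning))
            .ToArray();

        // Single publisher: item i goes to queue i % workers.
        for (long i = 0; i < itemCount; i++)
            queues[i % workers].Add(i);

        // Signal end of input so the consumers finish.
        foreach (var q in queues)
            q.CompleteAdding();

        return consumers.Select(t => t.Result).ToArray();
    }

    static void Main()
    {
        var result = Run(4, 12);
        for (int w = 0; w < result.Length; w++)
            Console.WriteLine("consumer {0}: {1}", w, string.Join(",", result[w]));
    }
}
```

With 4 workers and 12 items, consumer 0 receives 0, 4, 8 and consumer 1 receives 1, 5, 9, and so on, since each queue is FIFO and has a single reader.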
Carine answered 12/11, 2012 at 5:28 Comment(0)

EDIT: I forgot to mention the code is partially taken from the FAQ. I have no idea if this approach is better or worse than Frank's suggestion.

The project is severely underdocumented, which is a shame as it looks nice.
Anyway, try the following snippet (based on your first link). It was tested on Mono and seems to be OK:

using System;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public long Value { get; set; }
    }

    public class MyHandler : IEventHandler<ValueEntry>
    {
        private static int _consumers = 0;
        private readonly int _ordinal;

        public MyHandler()
        {
            this._ordinal = _consumers++;
        }

        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            // Every handler sees every event; each one only acts on the
            // sequences that map to its own ordinal.
            if ((sequence % _consumers) == _ordinal)
                Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal);
            else
                Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);
        }
    }

    class Program
    {
        private static readonly Random _random = new Random();
        private const int SIZE = 16;  // Must be a power of 2
        private const int WORKERS = 4; 

        static void Main()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default);
            for (int i=0; i < WORKERS; i++)
                disruptor.HandleEventsWith(new MyHandler());
            var ringBuffer = disruptor.Start();

            while (true)
            {
                long sequenceNo = ringBuffer.Next();
                ringBuffer[sequenceNo].Value = _random.Next();
                ringBuffer.Publish(sequenceNo);
                Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value);
                Console.ReadKey();
            }
        }
    }
}
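
As a side note on the ring size: Disruptor ring buffers are sized as a power of two, which lets a sequence number be mapped to a slot with a bit mask instead of a modulo. The sketch below only illustrates that arithmetic; `RingIndexSketch`, `IsPowerOfTwo` and `SlotFor` are hypothetical names and not part of the Disruptor API.

```csharp
using System;

static class RingIndexSketch
{
    // True when size is a positive power of two (exactly one bit set).
    public static bool IsPowerOfTwo(int size)
    {
        return size > 0 && (size & (size - 1)) == 0;
    }

    // Maps an ever-increasing sequence onto a slot index.
    // Only valid when size is a power of two.
    public static int SlotFor(long sequence, int size)
    {
        return (int)(sequence & (size - 1));
    }

    static void Main()
    {
        Console.WriteLine(IsPowerOfTwo(16)); // True
        Console.WriteLine(IsPowerOfTwo(12)); // False
        Console.WriteLine(SlotFor(17, 16));  // 1
    }
}
```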
Very answered 12/11, 2012 at 6:57 Comment(4)
Do you know if calls to the ring buffer's Next() and Publish() methods are thread safe? Can I call them in parallel? Can I call the ring buffer's Next method from two different threads? - Carine
Also, how does the disruptor do this internally? How many threads are created? Does the disruptor create a separate thread for each consumer, or is some sort of thread pool used? - Carine
The code indicates both are designed to be thread safe, but I don't see what you'll gain by that. It relies on the TPL (tasks); just read the code. - Very
Thanks, I've created a separate question about "locking": #13350842 - Carine

From the specs of the ring buffer you will see that every consumer will try to process your ValueEvent. In your case you don't need that.

I solved it like this:

Add a field processed to your ValueEvent; when a consumer takes the event, it tests that field, and if the event has already been processed it moves on to the next one.

Not the prettiest way, but it's how the buffer works.
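
For the C# side, one way such a "processed" check could be made safe across threads is an atomic compare-and-swap on the flag. This is a minimal sketch under that assumption, not the code used in this answer; `ClaimableEntry`, `TryClaim`, `Reset` and `ClaimDemo` are hypothetical names.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical entry type; the int flag stands in for the "processed" field.
public sealed class ClaimableEntry
{
    private int _claimed; // 0 = free, 1 = taken

    public long Value { get; set; }

    // Atomically flips the flag from 0 to 1; only one caller gets true.
    public bool TryClaim()
    {
        return Interlocked.CompareExchange(ref _claimed, 1, 0) == 0;
    }

    // The publisher would have to reset the flag before reusing the slot.
    public void Reset()
    {
        Interlocked.Exchange(ref _claimed, 0);
    }
}

static class ClaimDemo
{
    // Lets `contenders` tasks race for one entry and returns how many won.
    public static int CountWinners(int contenders)
    {
        var entry = new ClaimableEntry { Value = 42 };
        int winners = 0;
        var tasks = Enumerable.Range(0, contenders)
            .Select(_ => Task.Run(() =>
            {
                if (entry.TryClaim())
                    Interlocked.Increment(ref winners);
            }))
            .ToArray();
        Task.WaitAll(tasks);
        return winners;
    }

    static void Main()
    {
        Console.WriteLine("winners: {0}", CountWinners(4)); // winners: 1
    }
}
```

Unlike a fixed modulo split, a claim like this lets whichever consumer is free take the next event, at the cost of every consumer touching every slot.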

Armyworm answered 12/11, 2012 at 5:36 Comment(5)
Do you need synchronization on that field? Do you declare it volatile, or use the Interlocked class to update the bool field? Also, how do I attach more than one consumer to the ring buffer? I can pass only one consumer to the HandleEventsWith method. So far it sounds to me like it would be easier to create 4 ring buffers and cycle over them, using the next ring buffer for each publish :) - Carine
If you create 4 ring buffers, you will lose the "load balancing" feature of the ring buffer, and I suppose that is why you are using it. As for your other questions, I am using the Java buffer so I can't show you code, but just follow the examples, they are pretty clear. - Armyworm
In this particular case I do not need "load balancing", as my tasks are almost "equal" and just splitting them between 4 consumers is OK. It's an interesting feature, though. Should I follow the Java examples? I can hardly find any C# examples. - Carine
@Frank, did you use a WorkerPool? - Dagny
@AlexAverbuch I used: ExecutorService executor = Executors.newFixedThreadPool(NUM_EVENT_PROCESSORS); - Armyworm