Why is my disruptor example so slow?

I've taken the code example from Stack Overflow question Disruptor.NET example and modified it to "measure" time. Full listing is below:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public long Value { get; set; }

        public ValueEntry()
        {
            Console.WriteLine("New ValueEntry created");
        }
    }

    public class ValueAdditionHandler : IEventHandler<ValueEntry>
    {
        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            Program.sw.Stop();
            long microseconds = Program.sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
            Console.WriteLine("elapsed microseconds = " + microseconds);
            Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
        }
    }

    class Program
    {
        public static Stopwatch sw = Stopwatch.StartNew();

        private static readonly Random _random = new Random();
        private static readonly int _ringSize = 16;  // Must be a power of 2

        static void Main(string[] args)
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

            disruptor.HandleEventsWith(new ValueAdditionHandler());

            var ringBuffer = disruptor.Start();

            while (true)
            {
                var valueToSet = _random.Next();
                long sequenceNo = ringBuffer.Next();

                ValueEntry entry = ringBuffer[sequenceNo];

                entry.Value = valueToSet;

                sw.Restart();
                ringBuffer.Publish(sequenceNo);

                Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value);

                Thread.Sleep(1000);
            }
        }
    }
}

And the output is:

New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
Published entry 0, value 1510145842
elapsed microseconds = 2205
Event handled: Value = 1510145842 (processed event 0
Published entry 1, value 1718075893
elapsed microseconds = 85
Event handled: Value = 1718075893 (processed event 1
Published entry 2, value 1675907645
elapsed microseconds = 32
Event handled: Value = 1675907645 (processed event 2
Published entry 3, value 1563009446
elapsed microseconds = 75
Event handled: Value = 1563009446 (processed event 3
Published entry 4, value 1782914062
elapsed microseconds = 34
Event handled: Value = 1782914062 (processed event 4
Published entry 5, value 1516398244
elapsed microseconds = 50
Event handled: Value = 1516398244 (processed event 5
Published entry 6, value 76829327
elapsed microseconds = 50
Event handled: Value = 76829327 (processed event 6

So it takes about 50 microseconds to pass data from one thread to another. But it is not fast at all! "The current version of the Disruptor can do ~50 ns between threads at a rate of 1 million messages per second." So my results are 1000 times slower than expected.

What's wrong with my example, and how do I achieve 50 ns speed?

I've modified the program above and now get a delay of about 1 microsecond, which is much better. However, I am still waiting for a response from Disruptor pattern experts. I'm looking for an example that proves I can actually pass data in 50 ns.

I also wrote the same test using BlockingCollection and got 14 microseconds on average, which shows that Disruptor is faster in this test (all values are in microseconds; each bucket counts the items whose delay fell in that range):

Using BlockingCollection:

average = 14 minimum = 0 0-5 = 890558, 5-10 = 1773781, 10-30 = 6900128, >30 = 435433

Using Disruptor:

average = 0 minimum = 0 0-5 = 9908469, 5-10 = 64464, 10-30 = 19902, >30 = 7065

BlockingCollection code:

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public int Value { get; set; }

        public ValueEntry()
        {
            //   Console.WriteLine("New ValueEntry created");
        }
    }

    //public class ValueAdditionHandler : IEventHandler<ValueEntry>
    //{
    //    public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
    //    {

    //        long microseconds = Program.sw[data.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
    //        Program.results[data.Value] = microseconds;
    //        //Console.WriteLine("elapsed microseconds = " + microseconds);
    //        //Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
    //    }
    //}

    class Program
    {
        public const int length = 10000000;
        public static Stopwatch[] sw = new Stopwatch[length];
        public static long[] results = new long[length];

        static BlockingCollection<ValueEntry> dataItems = new BlockingCollection<ValueEntry>(150);

        static void Main(string[] args)
        {
            for (int i = 0; i < length; i++)
            {
                sw[i] = Stopwatch.StartNew();
            }

            // A simple blocking consumer with no cancellation.
            Task.Factory.StartNew(() =>
            {
                while (!dataItems.IsCompleted)
                {

                    ValueEntry ve = null;
                    try
                    {
                        ve = dataItems.Take();
                        long microseconds = sw[ve.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
                        results[ve.Value] = microseconds;

                        //Console.WriteLine("elapsed microseconds = " + microseconds);
                        //Console.WriteLine("Event handled: Value = {0} (processed event {1}", ve.Value, ve.Value);
                    }
                    catch (InvalidOperationException) { }
                }
            }, TaskCreationOptions.LongRunning);

            for (int i = 0; i < length; i++)
            {
                var valueToSet = i;

                ValueEntry entry = new ValueEntry();
                entry.Value = valueToSet;

                sw[i].Restart();
                dataItems.Add(entry);

                //Console.WriteLine("Published entry {0}, value {1}", valueToSet, entry.Value);
                //Thread.Sleep(1000);
            }

            // Wait until all events are delivered
            Thread.Sleep(5000);

            long average = 0;
            long minimum = 10000000000;
            int firstFive = 0;
            int fiveToTen = 0;
            int tenToThirty = 0;
            int moreThenThirty = 0;

            // Do not count first 100 items because they could be extremely slow
            for (int i = 100; i < length; i++)
            {
                average += results[i];
                if (results[i] < minimum)
                {
                    minimum = results[i];
                }
                if (results[i] < 5)
                {
                    firstFive++;
                }
                else if (results[i] < 10)
                {
                    fiveToTen++;
                }
                else if (results[i] < 30)
                {
                    tenToThirty++;
                } else
                {
                    moreThenThirty++;
                }
            }
            average /= (length - 100);
            Console.WriteLine("average = {0} minimum = {1} 0-5 = {2}, 5-10 = {3}, 10-30 = {4}, >30 = {5}", average, minimum, firstFive, fiveToTen, tenToThirty, moreThenThirty);
        }
    }
}

Disruptor code:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public int Value { get; set; }

        public ValueEntry()
        {
            //   Console.WriteLine("New ValueEntry created");
        }
    }

    public class ValueAdditionHandler : IEventHandler<ValueEntry>
    {
        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {

            long microseconds = Program.sw[data.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
            Program.results[data.Value] = microseconds;
            //Console.WriteLine("elapsed microseconds = " + microseconds);
            //Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
        }
    }

    class Program
    {
        public const int length = 10000000;
        public static Stopwatch[] sw = new Stopwatch[length];
        public static long[] results = new long[length];

        private static readonly Random _random = new Random();
        private static readonly int _ringSize = 1024;  // Must be a power of 2

        static void Main(string[] args)
        {
            for (int i = 0; i < length; i++)
            {
                sw[i] = Stopwatch.StartNew();
            }

            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

            disruptor.HandleEventsWith(new ValueAdditionHandler());

            var ringBuffer = disruptor.Start();

            for (int i = 0; i < length; i++)
            {
                var valueToSet = i;
                long sequenceNo = ringBuffer.Next();

                ValueEntry entry = ringBuffer[sequenceNo];

                entry.Value = valueToSet;

                sw[i].Restart();
                ringBuffer.Publish(sequenceNo);

                //Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value);

                //Thread.Sleep(1000);
            }

            // wait until all events are delivered
            Thread.Sleep(5000);

            long average = 0;
            long minimum = 10000000000;
            int firstFive = 0;
            int fiveToTen = 0;
            int tenToThirty = 0;
            int moreThenThirty = 0;

            // Do not count first 100 items because they could be extremely slow
            for (int i = 100; i < length; i++)
            {
                average += results[i];
                if (results[i] < minimum)
                {
                    minimum = results[i];
                }
                if (results[i] < 5)
                {
                    firstFive++;
                }
                else if (results[i] < 10)
                {
                    fiveToTen++;
                }
                else if (results[i] < 30)
                {
                    tenToThirty++;
                }
                else
                {
                    moreThenThirty++;
                }
            }
            average /= (length - 100);
            Console.WriteLine("average = {0} minimum = {1} 0-5 = {2}, 5-10 = {3}, 10-30 = {4}, >30 = {5}", average, minimum, firstFive, fiveToTen, tenToThirty, moreThenThirty);
        }
    }
}
Tiercel answered 11/11, 2012 at 19:49 Comment(5)
You cannot time single items; Stopwatch (or any timer mechanism) doesn't have enough precision. You should time thousands or millions of items to get a statistically sound measurement. - Navarro
@Navarro Stopwatch has enough precision to count microseconds. - Tiercel
Nope. Also, changes in the runtime environment, GC, and printing to the console can easily disrupt such tests. Simply fix your tests first before trying to find fault with the code. - Navarro
Your measurement is still wrong. Time the WHOLE run and then divide by the number of items. Timing each item individually is wrong. - Navarro
@Navarro How can I do that? I would then need to wait until the previous run has completed before starting a new one. So I would need some kind of synchronization, but that would defeat the idea of the Disruptor. - Tiercel
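
The disagreement above about timer precision can be checked directly. The following is a minimal sketch, not part of the original posts, that prints the resolution Stopwatch reports on the current machine and roughly estimates what one timestamp read costs. The class name StopwatchResolution and the helper TicksToNanoseconds are made up for illustration. The helper converts with floating-point math, because the integer expression Stopwatch.Frequency / (1000L * 1000L) used in the question truncates, and it becomes zero if the frequency is below one million ticks per second.

```csharp
using System;
using System.Diagnostics;

public static class StopwatchResolution
{
    // Converts raw Stopwatch ticks to nanoseconds using floating-point math,
    // so nothing is lost to integer division.
    public static double TicksToNanoseconds(long ticks)
    {
        return ticks * 1000000000.0 / Stopwatch.Frequency;
    }

    public static void Main()
    {
        Console.WriteLine("High resolution timer: " + Stopwatch.IsHighResolution);
        Console.WriteLine("Ticks per second:      " + Stopwatch.Frequency);
        Console.WriteLine("Nanoseconds per tick:  " + TicksToNanoseconds(1));

        // Rough cost of reading the timer: take many back-to-back timestamps
        // and divide the total distance by the number of reads.
        const int samples = 1000000;
        long start = Stopwatch.GetTimestamp();
        long last = start;
        for (int i = 0; i < samples; i++)
        {
            last = Stopwatch.GetTimestamp();
        }
        Console.WriteLine("Average ns per timestamp read: "
            + TicksToNanoseconds(last - start) / samples);
    }
}
```

If the tick length or the read cost printed here is in the same range as the delay being measured (tens of nanoseconds), per-item timings will mostly reflect the timer rather than the queue.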

Here, I fixed your code:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public int Value { get; set; }

        public ValueEntry()
        {
         //   Console.WriteLine("New ValueEntry created");
        }
    }

    class Program
    {
        public const int length = 1000000;
        public static Stopwatch sw;

        private static readonly Random _random = new Random();
        private static readonly int _ringSize = 1024;  // Must be a power of 2

        static void Main(string[] args)
        {
            sw = Stopwatch.StartNew();

            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

            var ringBuffer = disruptor.Start();

            for (int i = 0; i < length; i++)
            {
                var valueToSet = i;
                long sequenceNo = ringBuffer.Next();

                ValueEntry entry = ringBuffer[sequenceNo];

                entry.Value = valueToSet;

                ringBuffer.Publish(sequenceNo);

                //Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value);

                //Thread.Sleep(1000);
            }

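            // Total time spent publishing all items, in milliseconds
            double elapsed = sw.Elapsed.TotalMilliseconds;
            // wait until all events are delivered
            Thread.Sleep(10000);

            // Average time per item, in milliseconds
            double average = elapsed / (double)length;
            Console.WriteLine("average = " + average);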
        }
    }
}

This should correctly measure how long each item takes.

Navarro answered 11/11, 2012 at 20:32 Comment(1)
This is likely wrong. You test how long it takes to publish each item. I need to test how long it takes to publish an item and receive it in another thread. I think that the ringBuffer.Publish(sequenceNo); call is asynchronous, and that's why your code measures only half of the work. There could be many "subscribers", and I do not think the publisher will wait until all subscribers have processed the data. - Tiercel
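
One way to reconcile the two positions in this thread is to time the whole run but stop the clock only after the consumer has received the last item, so a single synchronization point at the end is enough. The sketch below is not from the original posts. It uses BlockingCollection from the base class library as a stand-in transport so that it is self-contained; the class name WholeRunTiming and the method AverageNanosecondsPerItem are hypothetical. With the Disruptor the same idea would presumably mean signalling from the event handler when the final sequence arrives, which is an assumption and is not shown here. Note that the result is an average cost per delivered item (a throughput figure), not the one-way latency of a single message.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

public static class WholeRunTiming
{
    // Sends `count` items through a bounded queue to one consumer and returns
    // the average wall-clock time per item in nanoseconds. The clock runs from
    // just before the first Add until the consumer has drained the queue.
    public static double AverageNanosecondsPerItem(int count, int capacity)
    {
        using (var queue = new BlockingCollection<int>(capacity))
        {
            int received = 0;

            // Dedicated consumer thread; the loop ends once CompleteAdding
            // has been called and the queue is empty.
            Task consumer = Task.Factory.StartNew(() =>
            {
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    received++;
                }
            }, TaskCreationOptions.LongRunning);

            Stopwatch sw = Stopwatch.StartNew();
            for (int i = 0; i < count; i++)
            {
                queue.Add(i);
            }
            queue.CompleteAdding();
            consumer.Wait();   // the only synchronization point of the run
            sw.Stop();

            if (received != count)
            {
                throw new InvalidOperationException("Not all items were delivered.");
            }
            return sw.Elapsed.TotalMilliseconds * 1000000.0 / count;
        }
    }

    public static void Main()
    {
        // First call warms up the JIT; only the second result is reported.
        AverageNanosecondsPerItem(100000, 1024);
        double ns = AverageNanosecondsPerItem(1000000, 1024);
        Console.WriteLine("average ns per item, end to end: " + ns);
    }
}
```

Because the wait happens once per run rather than once per item, it adds a fixed cost that shrinks toward zero as the item count grows.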

I read the BlockingCollection code. You call Console.WriteLine many times in the Disruptor version but not at all in the BlockingCollection version. Console.WriteLine is slow because it takes a lock internally.

Your ring buffer size is too small, which affects performance. It should be 1024 or larger.

Also, while (!dataItems.IsCompleted) may be a problem: the BlockingCollection isn't always in the adding state, which can cause the consumer thread to end early.

Task.Factory.StartNew(() => {
    while (!dataItems.IsCompleted)
    {
        ValueEntry ve = null;
        try
        {
            ve = dataItems.Take();
            long microseconds = sw[ve.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
            results[ve.Value] = microseconds;

            //Console.WriteLine("elapsed microseconds = " + microseconds);
            //Console.WriteLine("Event handled: Value = {0} (processed event {1}", ve.Value, ve.Value);
        }
        catch (InvalidOperationException) { }
    }
}, TaskCreationOptions.LongRunning);


for (int i = 0; i < length; i++)
{
    var valueToSet = i;

    ValueEntry entry = new ValueEntry();
    entry.Value = valueToSet;

    sw[i].Restart();
    dataItems.Add(entry);

    //Console.WriteLine("Published entry {0}, value {1}", valueToSet, entry.Value);
    //Thread.Sleep(1000);
}
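
For reference when reading the loop condition above, here is a small sketch (not from the original answer) of how the IsCompleted and IsAddingCompleted properties change over the life of a BlockingCollection. The class name CompletionStates and the method Observe are made up for illustration. IsCompleted only becomes true once CompleteAdding has been called and the collection is empty.

```csharp
using System;
using System.Collections.Concurrent;

public static class CompletionStates
{
    // Returns the observed flags in order:
    // [0] IsCompleted while empty and still open for adding
    // [1] IsAddingCompleted after CompleteAdding
    // [2] IsCompleted while one item is still queued
    // [3] IsCompleted after the last item was taken
    public static bool[] Observe()
    {
        using (var items = new BlockingCollection<int>(4))
        {
            bool emptyButOpen = items.IsCompleted;
            items.Add(1);
            items.CompleteAdding();
            bool addingCompleted = items.IsAddingCompleted;
            bool closedButNotDrained = items.IsCompleted;
            items.Take();
            bool closedAndDrained = items.IsCompleted;
            return new[] { emptyButOpen, addingCompleted, closedButNotDrained, closedAndDrained };
        }
    }

    public static void Main()
    {
        // Prints: False, True, False, True
        Console.WriteLine(string.Join(", ", Observe()));
    }
}
```

A consumer loop that relies on IsCompleted therefore needs the producer to call CompleteAdding at some point; otherwise Take simply blocks once the queue is empty.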

I have rewritten your code. Disruptor is 10x faster than BlockingCollection with multiple producers (10 parallel producers) and 2x faster than BlockingCollection with a single producer:

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;
using NUnit.Framework;

namespace DisruptorTest.Ds
{
    public sealed class ValueEntry
    {
        internal int Id { get; set; }
    }

    class MyHandler : IEventHandler<ValueEntry>
    {
        public void OnEvent(ValueEntry data, long sequence, bool endOfBatch)
        {
        }
    }

    [TestFixture]
    public class DisruptorPerformanceTest
    {
        private volatile bool collectionAddEnded;

        private int producerCount = 10;
        private int runCount = 1000000;
        private int RingBufferAndCapacitySize = 1024;

        [TestCase()]
        public async Task TestBoth()
        {
            for (int i = 0; i < 1; i++)
            {
                foreach (var rs in new int[] {64, 512, 1024, 2048 /*,4096,4096*2*/})
                {
                    Console.WriteLine($"RingBufferAndCapacitySize:{rs}, producerCount:{producerCount}, runCount:{runCount} of {i}");
                    RingBufferAndCapacitySize = rs;
                    await DisruptorTest();
                    await BlockingCollectionTest();
                }
            }
        }

        [TestCase()]
        public async Task BlockingCollectionTest()
        {
            var sw = new Stopwatch();
            BlockingCollection<ValueEntry> dataItems = new BlockingCollection<ValueEntry>(RingBufferAndCapacitySize);

            sw.Start();

            collectionAddEnded = false;

            // A simple blocking consumer with no cancellation.
            var task = Task.Factory.StartNew(() =>
            {
                while (!collectionAddEnded && !dataItems.IsCompleted)
                {
                    //if (!dataItems.IsCompleted && dataItems.TryTake(out var ve))
                    if (dataItems.TryTake(out var ve))
                    {
                    }
                }
            }, TaskCreationOptions.LongRunning);


            var tasks = new Task[producerCount];
            for (int t = 0; t < producerCount; t++)
            {
                tasks[t] = Task.Run(() =>
                {
                    for (int i = 0; i < runCount; i++)
                    {
                        ValueEntry entry = new ValueEntry();
                        entry.Id = i;
                        dataItems.Add(entry);
                    }
                });
            }

            await Task.WhenAll(tasks);

            collectionAddEnded = true;
            await task;

            sw.Stop();

            Console.WriteLine($"BlockingCollectionTest Time:{sw.ElapsedMilliseconds/1000d}");
        }


        [TestCase()]
        public async Task DisruptorTest()
        {
            var disruptor =
                new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingBufferAndCapacitySize, TaskScheduler.Default,
                    producerCount > 1 ? ProducerType.Multi : ProducerType.Single, new BlockingWaitStrategy());
            disruptor.HandleEventsWith(new MyHandler());

            var _ringBuffer = disruptor.Start();

            Stopwatch sw = Stopwatch.StartNew();

            sw.Start();


            var tasks = new Task[producerCount];
            for (int t = 0; t < producerCount; t++)
            {
                tasks[t] = Task.Run(() =>
                {
                    for (int i = 0; i < runCount; i++)
                    {
                        long sequenceNo = _ringBuffer.Next();
                        _ringBuffer[sequenceNo].Id = 0;
                        _ringBuffer.Publish(sequenceNo);
                    }
                });
            }


            await Task.WhenAll(tasks);


            disruptor.Shutdown();

            sw.Stop();
            Console.WriteLine($"DisruptorTest Time:{sw.ElapsedMilliseconds/1000d}s");
        }
    }
}

BlockingCollectionTest with a shared ValueEntry instance (no new ValueEntry() in for loop)

  • RingBufferAndCapacitySize:64, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:16.962s

    BlockingCollectionTest Time:18.399

  • RingBufferAndCapacitySize:512, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:6.101s

    BlockingCollectionTest Time:19.526

  • RingBufferAndCapacitySize:1024, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:2.928s

    BlockingCollectionTest Time:20.25

  • RingBufferAndCapacitySize:2048, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:2.448s

    BlockingCollectionTest Time:20.649

BlockingCollectionTest creating a new ValueEntry() in the for loop

  • RingBufferAndCapacitySize:64, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:27.374s

    BlockingCollectionTest Time:21.955

  • RingBufferAndCapacitySize:512, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:5.011s

    BlockingCollectionTest Time:20.127

  • RingBufferAndCapacitySize:1024, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:2.877s

    BlockingCollectionTest Time:22.656

  • RingBufferAndCapacitySize:2048, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:2.384s

    BlockingCollectionTest Time:23.567

https://www.cnblogs.com/darklx/p/11755686.html

Handel answered 28/10, 2019 at 6:36 Comment(0)
