ZeroMQ PUB/SUB Pattern with Multi-Threaded Poller Cancellation

I have two applications: a C++ server and a C# WPF UI. The C++ code takes requests (from anywhere/anyone) via a ZeroMQ [PUB/SUB] messaging service. I use my C# code for back testing: to create "back tests" and execute them. These back tests can be made up of many "unit tests", each of which sends/receives thousands of messages to/from the C++ server.

Currently, individual back tests work well: I can send off N unit tests, each with thousands of requests and captures. My problem is architectural: when I dispatch another back test (following the first), the event subscription is done a second time, because the polling thread has not been cancelled and disposed. This results in erroneous output. This may seem a trivial problem (perhaps it is for some of you), but the cancellation of this polling Task under my current configuration is proving troublesome. Some code...

My message broker class is simple and looks like this:

public class MessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    private Task pollingTask;
    private NetMQContext context;
    private PublisherSocket pubSocket;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    public MessageBroker()
    {
        this.source = new CancellationTokenSource();
        this.token = source.Token;

        StartPolling();
        context = NetMQContext.Create();
        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);
    }

    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    private void StartPolling()
    {
        pollerCancelled = new ManualResetEvent(false);
        pollingTask = Task.Run(() =>
        {
            try
            {
                using (var context = NetMQContext.Create())
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    while (true)
                    {
                        buffer = subSocket.Receive();
                        MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                        if (this.token.IsCancellationRequested)
                            this.token.ThrowIfCancellationRequested();
                    }
                }
            }
            catch (OperationCanceledException)
            {
                pollerCancelled.Set();
            }
        }, this.token);
    }

    private void CancelPolling()
    {
        source.Cancel();
        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }

    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
    public string PublisherAddress { get { return "tcp://127.X.X.X:6500"; } }
    public string SubscriberAddress { get { return "tcp://127.X.X.X:6501"; } }

    private bool disposed = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                if (this.pollingTask != null)
                {
                    CancelPolling();
                    if (this.pollingTask.Status == TaskStatus.RanToCompletion ||
                         this.pollingTask.Status == TaskStatus.Faulted ||
                         this.pollingTask.Status == TaskStatus.Canceled)
                    {
                        this.pollingTask.Dispose();
                        this.pollingTask = null;
                    }
                }
                if (this.context != null)
                {
                    this.context.Dispose();
                    this.context = null;
                }
                if (this.pubSocket != null)
                {
                    this.pubSocket.Dispose();
                    this.pubSocket = null;
                }
                if (this.source != null)
                {
                  this.source.Dispose();
                  this.source = null;
                }
            }
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    ~MessageBroker()
    {
        Dispose(false);
    }
}

The backtesting "engine" use to execute each back test, first constructs a Dictionary containing each Test (unit test) and the messages to dispatch to the C++ application for each test.

Here is the DispatchTests method:

private void DispatchTests(ConcurrentDictionary<Test, List<Taurus.FeedMux>> feedMuxCollection)
{
    broker = new MessageBroker();
    broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
    testCompleted = new ManualResetEvent(false);

    try
    {
        // Loop through the tests. 
        foreach (var kvp in feedMuxCollection)
        {
            testCompleted.Reset();
            Test t = kvp.Key;
            t.Bets = new List<Taurus.Bet>();
            foreach (Taurus.FeedMux mux in kvp.Value)
            {
                token.ThrowIfCancellationRequested();
                broker.Dispatch(mux);
            }
            broker.Dispatch(new Taurus.FeedMux()
            {
                type = Taurus.FeedMux.Type.PING,
                ping = new Taurus.Ping() { event_id = t.EventID }
            });
            testCompleted.WaitOne(); // Wait until all messages are received for this test. 
        }
        testCompleted.Close();
    }
    finally
    {
        broker.Dispose(); // Dispose the broker.
    }
}

The PING message at the end tells the C++ application that we are finished. We then force a wait, so that the next [unit] test is not dispatched before all of the returns are received from the C++ code; we do this using a ManualResetEvent.

When the C++ code receives the PING message, it sends the message straight back. We handle the received messages via OnMessageRecieved, and the PING tells us to call ManualResetEvent.Set() so that we can continue the unit testing; "next please"...

private async void OnMessageRecieved(Taurus.FeedMux mux)
{
    string errorMsg = String.Empty;
    if (mux.type == Taurus.FeedMux.Type.MSG)
    {
        // Do stuff.
    }
    else if (mux.type == Taurus.FeedMux.Type.PING)
    {
        // Do stuff.

        // We are finished receiving messages for this "unit test".
        testCompleted.Set(); 
    }
}

(Struck out:) My problem is that broker.Dispose() in the finally above is never hit. I appreciate that finally blocks that are executed on background threads are not guaranteed to get executed.

The struck-out text above was due to me messing about with the code; I was stopping a parent thread before the child had completed. However, there are still problems...

Now broker.Dispose() is called correctly; in this method I attempt to cancel the poller thread and dispose of the Task correctly, to avoid any multiple subscriptions.

To cancel the polling thread I use the CancelPolling() method:

private void CancelPolling()
{
    source.Cancel();
    pollerCancelled.WaitOne(); // <- Blocks here, waiting for cancellation.
    pollerCancelled.Close();
}

but in the loop inside the StartPolling() method,

while (true)
{
    buffer = subSocket.Receive();
    MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
    if (this.token.IsCancellationRequested)
        this.token.ThrowIfCancellationRequested();
}

ThrowIfCancellationRequested() is never reached, so the task is never cancelled and thus never properly disposed: the polling thread is blocked by the subSocket.Receive() method.
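One way to make such a loop responsive to cancellation would be a receive timeout, so that Receive() returns control periodically. This is only a sketch, assuming the NetMQ API of the time, where setting the socket-level ReceiveTimeout option makes Receive() throw AgainException when no message arrives within the timeout; it is not the code from my project:

subSocket.Options.ReceiveTimeout = TimeSpan.FromMilliseconds(100); // assumed option
while (!this.token.IsCancellationRequested)
{
    try
    {
        // Returns a frame, or throws AgainException after ~100 ms of silence.
        buffer = subSocket.Receive();
        MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
    }
    catch (AgainException)
    {
        // Timed out with no message; loop around and re-check the token.
    }
}
this.token.ThrowIfCancellationRequested(); // surface the cancellation to the Task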

Now, it is not clear to me how to achieve what I want. I need to invoke broker.Dispose()/CancelPolling() on a thread other than the one used to poll for messages, and somehow force the cancellation. Thread abort is not something I want to get into, at any cost.

Essentially, I want to properly dispose of the broker before executing the next back test. How do I correctly handle this? Should I split out the polling and run it in a separate application domain?

I have tried disposing inside the OnMessageRecieved handler, but this is clearly executed on the same thread as the poller and is not the way to do this; without invoking additional threads, it blocks.

What is the best way to achieve what I want, and is there a pattern for this sort of case that I can follow?

Thanks for your time.

Gyratory answered 30/4, 2015 at 20:34 Comment(1)
I don't understand what the problem is. You said broker.Dispose() is never hit, but why? Did you debug and see why? – Pardoes

This is how I eventually got around this [although I am open to a better solution!]

public class FeedMuxMessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    // Vars.
    private NetMQContext context;
    private PublisherSocket pubSocket;
    private Poller poller;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    /// <summary>
    /// Default ctor.
    /// </summary>
    public FeedMuxMessageBroker()
    {
        context = NetMQContext.Create();

        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);

        pollerCancelled = new ManualResetEvent(false);
        source = new CancellationTokenSource();
        token = source.Token;
        StartPolling();
    }

    #region Methods.
    /// <summary>
    /// Send the mux message to listeners.
    /// </summary>
    /// <param name="message">The message to dispatch.</param>
    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    /// <summary>
    /// Start polling for messages.
    /// </summary>
    private void StartPolling()
    {
        Task.Run(() =>
            {
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    subSocket.ReceiveReady += (s, a) =>
                    {
                        buffer = subSocket.Receive();
                        if (MessageRecieved != null)
                            MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                    };

                    // Poll.
                    poller = new Poller();
                    poller.AddSocket(subSocket);
                    poller.PollTillCancelled();
                    token.ThrowIfCancellationRequested();
                }
            }, token).ContinueWith(ant => 
                {
                    pollerCancelled.Set();
                }, TaskContinuationOptions.OnlyOnCanceled);
    }

    /// <summary>
    /// Cancel polling to allow the broker to be disposed.
    /// </summary>
    private void CancelPolling()
    {
        source.Cancel();
        poller.Cancel();

        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }
    #endregion // Methods.

    #region Properties.
    /// <summary>
    /// Event that is raised when a message is received.
    /// </summary>
    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }

    /// <summary>
    /// The address to use for the publisher socket.
    /// </summary>
    public string PublisherAddress { get { return "tcp://127.0.0.1:6500"; } }

    /// <summary>
    /// The address to use for the subscriber socket.
    /// </summary>
    public string SubscriberAddress { get { return "tcp://127.0.0.1:6501"; } }
    #endregion // Properties.

    #region IDisposable Members.
    private bool disposed = false;

    /// <summary>
    /// Dispose managed resources.
    /// </summary>
    /// <param name="disposing">Is desposing.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                CancelPolling();
                if (pubSocket != null)
                {
                    pubSocket.Disconnect(PublisherAddress);
                    pubSocket.Dispose();
                    pubSocket = null;
                }
                if (poller != null)
                {
                    poller.Dispose();
                    poller = null;
                }
                if (context != null)
                {
                    context.Terminate();
                    context.Dispose();
                    context = null;
                }
                if (source != null)
                {
                    source.Dispose();
                    source = null;
                }
            }

            // Shared cleanup logic.
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Finalizer.
    /// </summary>
    ~FeedMuxMessageBroker()
    {
        Dispose(false);
    }
    #endregion // IDisposable Members.
}

So we poll in the same way, but using the Poller class from NetMQ. In the Task continuation we set the ManualResetEvent, so that we are sure both the Poller and the Task are cancelled. We are then safe to dispose...
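For completeness, the per-back-test lifecycle this enables looks like the following (hypothetical driver code; backTests and RunBackTest are illustrative names only, not from my project):

// One broker per back test, fully disposed before the next back test starts.
foreach (var backTest in backTests)
{
    using (var broker = new FeedMuxMessageBroker())
    {
        broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
        RunBackTest(backTest, broker); // dispatches messages, waits on the PINGs
    } // Dispose() cancels the Poller here, so no stale subscriptions survive.
}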

Gyratory answered 5/5, 2015 at 9:22 Comment(3)
Your solution looks good. You could also use the socket option DontWait (there's a bool dontWait overload on Receive too) or set a custom ReceiveTimeout on the socket to do your own polling loop. You may find it useful to use NetMQMonitor to get events for various state changes (such as connection and disconnection). FYI - you should copy the MessageReceived event into a temp variable before calling it, for thread-safety. And you should remove the finalizer unless you're cleaning up some unmanaged resources not seen in your example. – Beutner
"you should copy the MessageReceived event into a temp variable before calling it for thread-safety", but the event handler will be invoked on the same thread as the invoking thread will it not? Also, this is an IProgress<Taurus.FeedMux> object as well, does this make any difference here? What do you mean about the finally block, there is no finally in the above solution - I am forcing the Dispose method to cancel and dispose the thread/Task so the continuation tells the ManualResetEvent that it can continue. Any further insight would be most appreciated - thanks for your time...Gyratory
I misread MessageReceived. I thought it was an event, and it is commented as such. But basically the problem is that after the null check and before the invoke, another thread could set it to null. So the common pattern when calling events is to copy them into a temp variable first. Even if that's not the case here, it's a good habit to get into. I meant the finalizer, which is your ~FeedMuxMessageBroker method. Finalizers are used to clean up unmanaged resources, and you don't have any. – Beutner
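(For reference, the copy-to-a-local pattern described in these comments would look like this inside the ReceiveReady handler above, where buffer is the received frame - a sketch only:)

// Copy the property to a local first, so that another thread cannot set it
// to null between the null check and the invocation.
var reporter = MessageRecieved;
if (reporter != null)
    reporter.Report(buffer.ToObject<Taurus.FeedMux>());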

A higher-level view of the subject

Your focus and the efforts you have dedicated to creating a testing framework signal that you aim at a rigorous and professional-grade approach, which made me first raise my hat in a salute of admiration to such a brave undertaking.

While testing is an important activity for providing reasonable quantitative evidence that a System Under Test is meeting defined expectations, success in this depends on how closely the testing environment matches the real deployment's conditions.

One may agree that testing on a different basis does not prove that the real deployment will run as expected in an environment that is principally different from the tested one(s).


Element-wise control, or just state-wise control: that is the question.

Your efforts (at least at the time the OP was posted) concentrate on a code architecture that tries to keep instances in place and to re-set the internal state of a Poller instance before the next test battery starts.

In my view, testing has a few principles to follow, should you strive for professional testing:

  • Principle of Test Repeatability (re-runs of tests shall yield the same results, thus avoiding quasi-testing that provides just a result "lottery")

  • Principle of Non-Intervening Testing (re-runs of tests shall not be subject to "external" interference that is not controlled by the test scenario)

Having said this, let me bring in a few notes inspired by Harry Markowitz, who was awarded the Nobel Prize for his remarkable quantitative portfolio-optimisation studies.

Rather move one step back to get control over elements' full life-cycle

CACI Simulations, Inc. (one of Harry Markowitz's companies) developed in the early 90s their flagship software framework COMET III, an exceptionally powerful simulation engine for large, complex design prototyping and performance simulation of processes operating in large-scale computing / networking / telco networks.

The greatest impression from COMET III was its capability to generate testing scenarios that include configurable pre-test "warm-up" pre-loads, which make the tested elements get into a state similar to what "fatigue" means in mechanical torture-test experiments, or what hydrogen-diffusion fragility means to nuclear power-plant metallurgists.

Yes, once you go into the low-level details of how algorithms, node buffers, memory allocations, pipelined / load-balanced / grid-processing architecture selections, fault-resilience overheads, garbage-collection policies and limited resource-sharing algorithms work and impact end-to-end performance / latencies (under the "pressure" of real-use workload patterns), this feature is simply indispensable.

This means that simple, state-wise control of an individual instance is not sufficient, as it provides means for neither test repeatability nor test isolation/non-intervening behaviour. Simply put, even if you find a way to "reset" a Poller instance, this will not get you into realistic testing with guaranteed test repeatability and possible pre-test warm-up(s).

A step back, and a higher layer of abstraction and test-scenario control, is needed.

How does this apply to the OP problem?

  • Instead of just state-control
  • Create a multi-layer-ed architecture / control-plane(s) / separate signalling

A ZeroMQ way of supporting this goal

  • Create super-structures as non-trivial patterns
  • Use full life-cycle controls of instances used inside testing scenarios
  • Keep ZeroMQ-maxims: Zero-sharing, Zero-blocking, ...
  • Benefit from Multi-Context()
Intermezzo answered 1/5, 2015 at 10:15 Comment(3)
Thanks for your answer, but I am not sure it answers my question. My architecture for this Test Explorer is non-trivial as it is, and the idea is always to simplify wherever possible, not to over-abstract and complicate unnecessarily. If you could provide an example of the "higher layer of abstraction" or "super-structures" it would be appreciated. Currently, I do not "share" the MessageBroker between back tests; this is why I want to properly dispose of it. Thanks for your time. – Gyratory
As an illustration of "super-structures", take a look at the picture with messaging nodes in >>> https://mcmap.net/q/742486/-python-multithreaded-zeromq-req-rep. Such composite elements can realise messaging policies over several disjunct control planes -- one transport (your SUT / DUT), the next "above" as test-control signalling, the next "above" as performance control for monitoring/logging et al. Keeping the transport Context() instance intentionally separate from the other signalling services' Context() instance(s) helps you keep both key principles -- the Principle of Isolation and the Principle of Repeatability. – Intermezzo
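(As a concrete sketch of that separation - hypothetical addresses and socket roles, using the same NetMQ API as in the answers above - two disjoint Context() instances, one per control plane:)

// Data plane and control plane deliberately live in separate Context instances,
// so tearing one down cannot interfere with the other (Isolation + Repeatability).
using (var transportCtx = NetMQContext.Create())  // the SUT / DUT traffic
using (var controlCtx = NetMQContext.Create())    // test-control signalling
using (var data = transportCtx.CreatePublisherSocket())
using (var control = controlCtx.CreatePublisherSocket())
{
    data.Connect("tcp://127.0.0.1:6500");    // messages under test
    control.Connect("tcp://127.0.0.1:7500"); // start/stop/abort test signals
}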
N.b.: I believe it would serve this Community of Knowledge better to reduce censoring extremisms - @halfer - you deserve my salute for the persistence with which you edit many posts to remove grammar errors (AI/ML could get into shape to perform such a role in the near future, so as to give you more time for some indeed creative content, not just deleting repetitive patterns of (many, I admit) unintentional errors). But why do you still delete pieces of intended content? (cit.) "... remove repeated material about Nobel Prize winner" 1) It had meaning. 2) A Nobel Prize is awarded, not won. – Intermezzo
