Creating a message bus with TPL Dataflow

I was looking for a lightweight, in process, async message bus and came across TPL Dataflow.

My current implementation is below (full example at https://gist.github.com/4416655).

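using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Bus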
{
    private readonly BroadcastBlock<object> broadcast =
        new BroadcastBlock<object>(message => message);

    private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
        = new ConcurrentDictionary<Guid, IDisposable>();

    public Task SendAsync<TMessage>(TMessage message)
    {
        return SendAsync<TMessage>(message, CancellationToken.None);
    }

    public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
    {
        return broadcast.SendAsync(message, cancellationToken);
    }

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));

        var subscription = broadcast.LinkTo(handler, 
            new DataflowLinkOptions { PropagateCompletion = true }, 
            message => message is TMessage);

        return AddSubscription(subscription);
    }

    public void Unsubscribe(Guid subscriptionId)
    {
        IDisposable subscription;
        if (subscriptions.TryRemove(subscriptionId, out subscription))
        {
            subscription.Dispose();
        }
    }

    private Guid AddSubscription(IDisposable subscription)
    {
        var subscriptionId = Guid.NewGuid();
        subscriptions.TryAdd(subscriptionId, subscription);
        return subscriptionId;
    }
}

I have a few general questions about using TPL Dataflow in a messaging scenario.

  • Is BroadcastBlock<T> the recommended source for sending messages to multiple handlers concurrently? This was the conclusion I came to based on this post.
  • In my implementation I'm using a single BroadcastBlock<T> instance for all message types. Could this cause problems when handling large numbers of messages? Should I create a separate instance per message type?
  • BroadcastBlock<T> always stores the last item that was sent. This means that any new subscriptions (links) will automatically be passed this message. Is it possible to change this behaviour (new subscriptions should only receive new messages)?
  • In my test application, I introduced a delay in the first handler:

        // Subscribe to Message type
        var subscription1 = bus.Subscribe<Message>(async m => { 
            await Task.Delay(2000);
            Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);
        });
    

    When sending a message I expected to see each message output to the console one by one, with 2s increments. Instead, after 2s all the messages were output at once. I'm assuming this is due to the parallelism performed by the underlying scheduler but am curious how I can alter these settings (setting MaxDegreeOfParallelism = 1 made no difference).

  • Finally, whilst SendAsync allows me to await the sending of a message, it doesn't allow me to await on the completion of the target (the ActionBlock<T>). I thought that this is what PropagateCompletion would do but that does not seem to be the case. Ideally I'd like to know when all handlers for the message have executed.

Update

The reason I was not getting the expected behaviour with Task.Delay is that this was delaying the execution of each handler, not the processing of all handlers. Thread.Sleep was what I needed.

Apps answered 31/12, 2012 at 1:49 Comment(0)

After answering your questions (see below), I realized that modelling your design with TPL Dataflow blocks is probably not a good idea. TPL Dataflow (TDF) works well when messages are processed by largely independent blocks, but it has no built-in way of tracking a single message. Yet that's what you seem to want: to run the handlers for each message in sequence and to track when each message has been fully processed.

Because of that, I think you shouldn't create a whole dataflow network, but instead use a single ActionBlock as an asynchronous message processor:

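using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Bus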
{
    class Subscription
    {
        public Guid Id { get; private set; }
        public Func<object, Task> HandlerAction { get; private set; }

        public Subscription(Guid id, Func<object, Task> handlerAction)
        {
            Id = id;
            HandlerAction = handlerAction;
        }
    }

    private readonly ConcurrentQueue<Subscription> m_handlersToSubscribe = new ConcurrentQueue<Subscription>();
    private readonly ConcurrentQueue<Guid> m_idsToUnsubscribe = new ConcurrentQueue<Guid>();

    private readonly ActionBlock<Tuple<object, Action>> m_messageProcessor;

    public Bus()
    {
        // subscriptions is accessed only from the (single-threaded) ActionBlock, so it is thread-safe
        var subscriptions = new List<Subscription>();

        m_messageProcessor = new ActionBlock<Tuple<object, Action>>(
            async tuple =>
            {
                var message = tuple.Item1;
                var completedAction = tuple.Item2;

                // could be made more efficient, probably doesn't matter
                Guid idToUnsubscribe;
                while (m_idsToUnsubscribe.TryDequeue(out idToUnsubscribe))
                {
                    subscriptions.RemoveAll(s => s.Id == idToUnsubscribe);
                }

                Subscription handlerToSubscribe;
                while (m_handlersToSubscribe.TryDequeue(out handlerToSubscribe))
                {
                    subscriptions.Add(handlerToSubscribe);
                }

                foreach (var subscription in subscriptions)
                {
                    await subscription.HandlerAction(message);
                }

                completedAction();
            });
    }

    public Task SendAsync<TMessage>(TMessage message)
    {
        var tcs = new TaskCompletionSource<bool>();
        Action completedAction = () => tcs.SetResult(true);

        m_messageProcessor.Post(new Tuple<object, Action>(message, completedAction));

        return tcs.Task;
    }

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        return Subscribe<TMessage>(
            message =>
            {
                handlerAction(message);
                // we need a completed non-generic Task; this is a simple, efficient way to get it
                // another option would be to use async lambda with no await,
                // but that's less efficient and produces a warning
                return Task.FromResult(false);
            });
    }

    public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
    {
        Func<object, Task> actionWithCheck = async message =>
        {
            if (message is TMessage)
                await handlerAction((TMessage)message);
        };

        var id = Guid.NewGuid();
        m_handlersToSubscribe.Enqueue(new Subscription(id, actionWithCheck));
        return id;
    }

    public void Unsubscribe(Guid subscriptionId)
    {
        m_idsToUnsubscribe.Enqueue(subscriptionId);
    }
}

(I decided to use queues for subscribing and unsubscribing, so that the list of handlers doesn't change while a message is being processed.)

Answers to your questions

Is BroadcastBlock<T> the recommended source for sending messages to multiple handlers concurrently?

Yeah, at first sight, it sounds like BroadcastBlock<T> is what you want. There certainly isn't any similar block directly in TPL Dataflow.

In my implementation I'm using a single BroadcastBlock instance for all message types. Could this cause problems when handling large numbers of messages? Should I create a separate instance per message type?

With a single block for all message types, you do more work (sending to all handlers) on a single thread. With one block for each message type, you would do less work (sending only to the correct handlers) which can be executed on multiple threads. Because of that, I think it's reasonable to assume that the latter will be faster.

But don't forget the rules of performance optimization of applications: First, write code that is simple and readable. Only if it turns out that it is actually slow, try to optimize it. And when comparing two alternatives, always use profiling to figure out which one is actually faster, don't just guess which one should be faster.
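For comparison, the one-block-per-message-type alternative could look roughly like the sketch below. The `TypedBus` class and its `BlockFor<T>` helper are hypothetical names, not part of the question's code. Note one assumption that changes behaviour: dispatch is by the static type `T`, so a handler subscribed for a base type would not see messages sent as a derived type. Each `BroadcastBlock<T>` also still replays its last message to new links.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical variant of the bus: one BroadcastBlock<T> per message type.
public sealed class TypedBus
{
    // Values are BroadcastBlock<T> instances, keyed by typeof(T).
    private readonly ConcurrentDictionary<Type, object> blocks =
        new ConcurrentDictionary<Type, object>();

    // Returns the block for T, creating it on first use.
    private BroadcastBlock<T> BlockFor<T>()
    {
        return (BroadcastBlock<T>)blocks.GetOrAdd(
            typeof(T), _ => new BroadcastBlock<T>(message => message));
    }

    public Task<bool> SendAsync<T>(T message)
    {
        return BlockFor<T>().SendAsync(message);
    }

    // Disposing the returned link unsubscribes the handler.
    public IDisposable Subscribe<T>(Action<T> handler)
    {
        return BlockFor<T>().LinkTo(new ActionBlock<T>(handler));
    }
}
```

Whether this is actually faster than a single block with filtered links is something to measure rather than assume.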

BroadcastBlock<T> always stores the last item that was sent. This means that any new subscriptions (links) will automatically be passed this message. Is it possible to change this behaviour (new subscriptions should only receive new messages)?

No, there is no way to configure BroadcastBlock<T> to do that. If you don't need all the features of BroadcastBlock<T> (sending to blocks with bounded capacity that might be temporarily full, or supporting non-greedy blocks as targets), you might want to write a custom version of BroadcastBlock<T> to do this.
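As a rough idea of what such a stripped-down replacement might look like, here is a minimal sketch. `SimpleBroadcaster<T>` is a hypothetical class, not a real dataflow block: it does not implement `ISourceBlock<T>`, keeps no "last message", and simply drops a message for any target that declines it (for example a bounded block that is full).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical, simplified broadcaster: new subscribers only see later messages.
public sealed class SimpleBroadcaster<T>
{
    private readonly object gate = new object();
    private readonly List<ITargetBlock<T>> targets = new List<ITargetBlock<T>>();

    // Registers a target; disposing the result removes it again.
    public IDisposable Subscribe(ITargetBlock<T> target)
    {
        lock (gate) { targets.Add(target); }
        return new Subscription(this, target);
    }

    // Offers the message to every target subscribed at this moment.
    public void Post(T message)
    {
        ITargetBlock<T>[] snapshot;
        lock (gate) { snapshot = targets.ToArray(); }

        foreach (var target in snapshot)
        {
            // Post returns false if the target declines; the message is
            // then lost for that target (no retry, unlike BroadcastBlock<T>).
            target.Post(message);
        }
    }

    private sealed class Subscription : IDisposable
    {
        private readonly SimpleBroadcaster<T> owner;
        private readonly ITargetBlock<T> target;

        public Subscription(SimpleBroadcaster<T> owner, ITargetBlock<T> target)
        {
            this.owner = owner;
            this.target = target;
        }

        public void Dispose()
        {
            lock (owner.gate) { owner.targets.Remove(target); }
        }
    }
}
```

The handlers from the question would be subscribed as `ActionBlock<T>` targets, just as they are linked to the `BroadcastBlock<T>` now.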

When sending a message I expected to see each message output to the console one by one, with 2s increments. Instead, after 2s all the messages were output at once. I'm assuming this is due to the parallelism performed by the underlying scheduler but am curious how I can alter these settings (setting MaxDegreeOfParallelism = 1 made no difference).

One of the points of TDF is that each block is independent, so multiple blocks can execute on multiple threads. If that's not what you want, then using a separate ActionBlock<T> for each handler might not be the best solution. In fact, TDF might not be the best solution at all.

Also, Subscribe() accepts Action<TMessage>, which means your lambda will be compiled as an async void method. Those should be used only in specific (and relatively rare) cases, where you don't have other options. If you want to support async handlers, you should accept async Task methods, i.e. Func<TMessage, Task>.
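To make the difference concrete, here is a small sketch (the `AsyncHandlerDemo` class and its method names are made up for illustration). With a `Func<int, Task>` handler the block waits for the returned task before it considers an item processed. With an `Action<int>` wrapping an async lambda, the delegate returns at the first `await`, so the block believes the item is done immediately.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AsyncHandlerDemo
{
    // Func<int, Task>: the block awaits each handler, one item at a time.
    public static async Task<int[]> RunWithTaskHandlerAsync()
    {
        var processed = new ConcurrentQueue<int>();
        Func<int, Task> handler = async m =>
        {
            await Task.Delay(50);
            processed.Enqueue(m);
        };

        var block = new ActionBlock<int>(handler);
        for (var i = 0; i < 3; i++) block.Post(i);
        block.Complete();
        await block.Completion;      // waits for all three handlers
        return processed.ToArray();
    }

    // Action<int> + async lambda = async void: the block cannot await it.
    public static async Task<int> RunWithAsyncVoidHandlerAsync()
    {
        var processed = new ConcurrentQueue<int>();
        Action<int> handler = async m =>
        {
            await Task.Delay(500);
            processed.Enqueue(m);
        };

        var block = new ActionBlock<int>(handler);
        for (var i = 0; i < 3; i++) block.Post(i);
        block.Complete();
        await block.Completion;      // completes before the delays elapse
        return processed.Count;      // handlers are still "in flight" here
    }
}
```

This is also why the delayed handlers in the question all printed at about the same time: every handler invocation started its own 2s delay straight away instead of the block waiting for the previous one.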

The reason I was not getting the expected behaviour with Task.Delay is that this was delaying the execution of each handler, not the processing of all handlers. Thread.Sleep was what I needed.

Using Thread.Sleep() goes against the whole idea of async, so you should avoid it where possible. Also, I don't think it actually worked the way you wanted: it introduced a delay on each thread, but TPL Dataflow will use more than one thread, so this won't behave as you intended.

Finally, whilst SendAsync allows me to await the sending of a message, it doesn't allow me to await on the completion of the target (the ActionBlock<T>). I thought that this is what PropagateCompletion would do but that does not seem to be the case. Ideally I'd like to know when all handlers for the message have executed.

PropagateCompletion, together with Complete() and Completion, is for handling completion of whole blocks, not the processing of a single message. One reason for that is that with more complicated dataflow networks, it might not be clear when exactly a message has been processed. For example, if a message was already sent to all current targets of a BroadcastBlock<T>, but will also be sent to all newly added targets, should it be considered complete?
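A minimal sketch of what block-level completion does give you (the `CompletionDemo` class is a made-up name): after calling `Complete()` on the source, awaiting the target's `Completion` tells you that everything sent so far has been handled, but nothing about any individual message.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    public static async Task<int> RunAsync()
    {
        var handled = 0;

        var broadcast = new BroadcastBlock<string>(message => message);
        var handler = new ActionBlock<string>(async message =>
        {
            await Task.Delay(20);
            handled++; // fine here: the block processes one message at a time by default
        });

        // Completion of the source is forwarded to the target once
        // the source has nothing left to offer.
        broadcast.LinkTo(handler, new DataflowLinkOptions { PropagateCompletion = true });

        await broadcast.SendAsync("first");
        await broadcast.SendAsync("second");

        broadcast.Complete();       // no more messages will be accepted
        await handler.Completion;   // all accepted messages have been handled

        return handled;
    }
}
```

Once `Complete()` has been called the block accepts no further messages, so this only suits shutdown, not per-message notification on a long-lived bus.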

If you want to do this, you will have to somehow do it manually, possibly by using TaskCompletionSource.

Chairborne answered 31/12, 2012 at 16:44 Comment(5)
Great solution. I particularly like the idea behind the subscribe/unsubscribe queues. One question: how would you extend this to support passing a cancellation token to halt the execution of the handlers? - Apps
If you want to cancel processing of a single message, I would add cancellationToken and canceledAction to the Tuple (which means a custom class would probably be better than a Tuple at that point). You would set those in SendAsync() and use them in the subscriptions loop. Ideally, handlerAction should also accept the token (at least optionally). - Chairborne
Thanks. I'm curious, why not use a TransformBlock that returns a Task? We could then just return Task.WhenAll(subscriptions) rather than using TaskCompletionSource. - Apps
I'm not sure I understand. TransformBlock still won't directly return the output for some input; you would need to somehow receive its results. You certainly could do what you want using TransformBlock, but I think it would actually be more complicated than using ActionBlock and TaskCompletionSource. - Chairborne
Okay, thanks again. FYI, I've pushed this code up to GitHub. github.com/benfoster/Fabrik.SimpleBus - Apps
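The cancellation idea from the comments could be sketched as follows. This is only an illustration under assumptions: `Envelope` and `CancellableProcessor` are hypothetical names, the handler list is fixed at construction to keep the example short, and the envelope carries a `TaskCompletionSource` directly instead of separate completed/canceled actions. It is not the code from the linked repository.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical replacement for Tuple<object, Action>.
public sealed class Envelope
{
    public Envelope(object message, CancellationToken token)
    {
        Message = message;
        Token = token;
        Completion = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
    }

    public object Message { get; private set; }
    public CancellationToken Token { get; private set; }
    public TaskCompletionSource<bool> Completion { get; private set; }
}

public sealed class CancellableProcessor
{
    private readonly ActionBlock<Envelope> processor;

    public CancellableProcessor(
        IReadOnlyList<Func<object, CancellationToken, Task>> handlers)
    {
        processor = new ActionBlock<Envelope>(async envelope =>
        {
            try
            {
                foreach (var handler in handlers)
                {
                    // Stop between handlers once cancellation is requested.
                    envelope.Token.ThrowIfCancellationRequested();
                    await handler(envelope.Message, envelope.Token);
                }
                envelope.Completion.TrySetResult(true);
            }
            catch (OperationCanceledException)
            {
                envelope.Completion.TrySetCanceled();
            }
            catch (Exception ex)
            {
                // Report the failure to the sender instead of faulting the block.
                envelope.Completion.TrySetException(ex);
            }
        });
    }

    public Task SendAsync(object message, CancellationToken token)
    {
        var envelope = new Envelope(message, token);
        if (!processor.Post(envelope))
        {
            envelope.Completion.TrySetException(
                new InvalidOperationException("The processor no longer accepts messages."));
        }
        return envelope.Completion.Task;
    }
}
```

Catching exceptions inside the delegate also means one failing handler does not fault the `ActionBlock` and leave later senders waiting on tasks that never complete.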
