TPL Dataflow and Rx Combined example [closed]

I want to learn both and how to use them together. I understand that they can complement each other, but I could not find an example of someone actually doing it.

Moneylender answered 8/7, 2012 at 14:41 Comment(11)
SO is best for specific questions about how to do something. “Give me an example of X” doesn't fit that very well. - Uplift
OK, you are right about the form of the question, but I think the issue is still valid. Maybe "How to use them in combination to effectively exploit the features from both?" is more appropriate. - Moneylender
@Moneylender Would it be sufficient to look at a combined C#-async + Rx example? - Transfigure
@Transfigure Actually I want to use F#, but async is not related to TPL Dataflow blocks, at least not explicitly. - Moneylender
@Moneylender Would it then be OK for me to add an F# tag to your question? - Transfigure
@Moneylender I can see how quite a few SO users would vote to have this question closed. Is there a specific problem that you need to solve that involves both technologies? - Transfigure
@Transfigure It's not an F#-specific problem. And no, there is no specific problem; I just want to combine them if possible so I can enjoy the features of both. - Moneylender
What exactly do you expect from that example, apart from showing you that the methods AsObservable() and AsObserver() exist? - Uplift
@Uplift The problems involving parallelism and asynchrony. What happens if I mix them together, and things like that. I'm starting to think that the question is too trivial, too hard, or pointless, and that I'm not getting something very basic :) - Moneylender
I believe there is nothing special about combining the two in that regard. Just write your TDF code like you would if you were using TDF alone and write your Rx code like you would if you were using Rx alone. - Uplift
This seems a very open question. Which version of .NET: 3.5, 4, or 4.5? Which version of Rx? What are you hoping to do? Continuations, streaming? Are you looking to leverage futures or single-value sequences, the TaskPool vs. schedulers...? The only valid answer would be pages long. Please be more specific. - Woundwort

Let me start with a bit of background.

The .NET Framework has a number of special generic types (classes, structs, or interfaces) such as Task<T>, IObservable<T>, Nullable<T>, IEnumerable<T>, Lazy<T>, etc. that wrap an underlying type T and give it extra capabilities.

The TPL uses Task<T> to represent asynchronous computation of a single value of T.

Rx uses IObservable<T> to represent asynchronous computation of zero or more values of T.

It's the "asynchronous computation" aspect of both of these that brings the TPL and Rx together.

Now, the TPL also uses the non-generic type Task to represent the asynchronous execution of an Action lambda, but this can be considered a special case of Task<T> where T is void, much like a standard C# method that returns void:

public void MyMethod() { }

Rx also allows for the same special case through a special type called Unit.
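
As a rough illustration of that parallel, here is a minimal sketch that runs the same Action once as a non-generic Task and once as an IObservable<Unit>. The class and method names (UnitSketch, RunAsTask, RunAsObservable) are made up for this example, and it assumes the System.Reactive package (Rx 2.0 or later, for the blocking Wait() operator) is referenced.

```csharp
using System;
using System.Reactive;            // Unit
using System.Reactive.Linq;       // Observable.Start, Wait
using System.Threading.Tasks;

// Hypothetical names, for illustration only.
public static class UnitSketch
{
    // Runs an Action as a non-generic Task; there is no result value to read.
    public static Task RunAsTask(Action work) => Task.Factory.StartNew(work);

    // Runs the same Action through Rx; the sequence yields one Unit value, then completes.
    public static IObservable<Unit> RunAsObservable(Action work) => Observable.Start(work);

    public static void Main()
    {
        RunAsTask(() => Console.WriteLine("Hello from a Task")).Wait();

        // Wait() blocks until the sequence completes and returns its last value.
        Unit signal = RunAsObservable(() => Console.WriteLine("Hello from Rx")).Wait();
        Console.WriteLine(signal.Equals(Unit.Default));
    }
}
```

The Unit value carries no information; it only signals that the action has finished, which is the role the completion of a plain Task plays in the TPL.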

The difference between the TPL and Rx is in the number of values returned: a task produces exactly one value, whereas an observable produces zero or more.

So, if you restrict yourself to observable sequences that return a single value, you can do some computations in much the same way as with the TPL.

For example, in the TPL I could write:

Task.Factory
    .StartNew(() => "Hello")
    .ContinueWith(t => Console.WriteLine(t.Result));

And in Rx the equivalent would be:

Observable
    .Start(() => "Hello")
    .Subscribe(x => Console.WriteLine(x));

I could go one step further in Rx by specifying that the TPL should be used to execute the computation like so:

Observable
    .Start(() => "Hello", Scheduler.TaskPool)
    .Subscribe(x => Console.WriteLine(x));

(By default the thread pool is used. In Rx 2.0 and later the task pool scheduler is exposed as TaskPoolScheduler.Default.)

Now I could do some "mixing and matching". If I add a using directive for the System.Reactive.Threading.Tasks namespace, I can move between tasks and observables quite easily.

Task.Factory
    .StartNew(() => "Hello")
    .ToObservable()
    .Subscribe(x => Console.WriteLine(x));

Observable
    .Start(() => "Hello")
    .ToTask()
    .ContinueWith(t => Console.WriteLine(t.Result));

Notice the .ToObservable() and .ToTask() calls and the resulting flips from one library to the other.

If I have an observable that returns more than one value, I can use the observable .ToArray() extension method to turn multiple sequence values into a single array value that can then be turned into a task, like so:

Observable
    .Interval(TimeSpan.FromSeconds(1.0))
    .Take(5) // is IObservable<long>
    .ToArray()
    .ToTask() // is Task<long[]>
    .ContinueWith(t => Console.WriteLine(t.Result.Length));
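
In a console program the snippet above would normally exit before the continuation runs, so here is a self-contained sketch of the same pipeline that blocks on the task and prints the collected values. The names ToArraySketch and CollectAsync are invented for the example, the interval is shortened to 100 ms purely to keep the run short, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Reactive.Linq;             // Interval, Take, ToArray
using System.Reactive.Threading.Tasks;  // ToTask
using System.Threading.Tasks;

// Hypothetical names, for illustration only.
public static class ToArraySketch
{
    // Collects the first `count` ticks of an interval into one array,
    // exposed as a Task<long[]> that completes when the sequence ends.
    public static Task<long[]> CollectAsync(int count)
    {
        return Observable
            .Interval(TimeSpan.FromMilliseconds(100)) // IObservable<long>
            .Take(count)
            .ToArray()                                // IObservable<long[]>
            .ToTask();                                // Task<long[]>
    }

    public static void Main()
    {
        // Blocking on Result is acceptable in a console sketch;
        // in real code, awaiting the task is usually preferable.
        long[] values = CollectAsync(5).Result;
        Console.WriteLine(string.Join(", ", values));
    }
}
```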

I think this is a fairly basic answer to your question. Is it what you were expecting?

Pelagia answered 10/7, 2012 at 12:56 Comment(6)
TPL Dataflow is a very different library from the TPL, so I don't feel the answer precisely addresses the question. The discussion was, however, noteworthy, so +1. - Transfigure
Too bad the .NET type system doesn't actually allow Task<void>. - Uplift
Sorry, but as GregC stated before, I need an example involving TPL Dataflow, not "just" the TPL. What I want is to combine TPL Dataflow blocks with Rx. - Moneylender
My apologies - I did not know about Dataflow. Sorry I haven't answered your question. - Pelagia
@Uplift - The lack of Task<void> is what IObservable<Unit> is meant to represent. Could you not use Task<Unit> for the same thing as Task<void>? - Pelagia
@Pelagia Mostly, yeah, except it's not as nice. For example, you still have to explicitly return Unit.Default; at the end of each Unit-returning function. - Uplift
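
Since the thread never got as far as an actual TPL Dataflow example, here is a minimal sketch of the kind of combination the question asks about, built on the AsObservable() and AsObserver() methods mentioned in the comments. It is only one possible arrangement, not a recommended design: an Rx sequence feeds a TransformBlock, and the block's output is consumed again as an observable. The names DataflowRxSketch and Run are made up, and the System.Reactive and System.Threading.Tasks.Dataflow packages are assumed to be referenced.

```csharp
using System;
using System.Reactive.Linq;             // Range, Where, ToArray
using System.Reactive.Threading.Tasks;  // ToTask
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;  // TransformBlock, AsObservable, AsObserver

// Hypothetical names, for illustration only.
public static class DataflowRxSketch
{
    public static int[] Run()
    {
        // TPL Dataflow side: a block that squares each input it receives.
        var square = new TransformBlock<int, int>(n => n * n);

        // Rx side, downstream: view the block's output as an observable,
        // keep the even squares, and collect them once the block completes.
        Task<int[]> collected = square
            .AsObservable()
            .Where(n => n % 2 == 0)
            .ToArray()
            .ToTask();

        // Rx side, upstream: AsObserver() wraps the block's input, so OnNext
        // sends a value to the block and OnCompleted marks the block complete.
        Observable.Range(1, 6).Subscribe(square.AsObserver());

        return collected.Result; // blocks until the pipeline has drained
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

This matches the advice given in the comments: the Dataflow block is written as it would be on its own, the Rx operators are written as they would be on their own, and the two adapter methods are the only glue between them.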
