Force a Task to continue on the current thread?
Asked Answered
V

2

10

I'm making a port of the AKKA framework for .NET (don't take this too serious now, it is a weekend hack of the Actor part of it right now)

I'm having some problems with the "Future" support in it. In Java/Scala Akka, Futures are to be awaited synchronously with an Await call. Much like the .NET Task.Wait()

My goal is to support true async await for this. It works right now, but the continuation is executed on the wrong thread in my current solution.

This is the result when passing a message to one of my actors that contain an await block for a future. As you can see, the actor always executes on the same thread, while the await block executes on a random threadpool thread.

actor thread: 6
await thread 10
actor thread: 6
await thread 12
actor thread: 6
actor thread: 6
await thread 13
...

The actor gets a message using a DataFlow BufferBlock<Message> Or rather, I use RX over the bufferblock to subscribe to messages. It is configured like this:

var messages = new BufferBlock<Message>()
{
        BoundedCapacity = 100,
        TaskScheduler = TaskScheduler.Default,
};
messages.AsObservable().Subscribe(this);

So far so good.

However, when I await on a future result. like so:

protected override void OnReceive(IMessage message)
{
    ....

    var result = await Ask(logger, m);
    // This is not executed on the same thread as the above code
    result.Match()  
       .With<SomeMessage>(t => {
       Console.WriteLine("await thread {0}",
          System.Threading.Thread.CurrentThread.GetHashCode());
        })
       .Default(_ => Console.WriteLine("Unknown message"));
     ...

I know this is normal behavior of async await, but I really must ensure that only one thread has access to my actor.

I don't want the future to run synchronously, I want to to run async just like normal, but I want the continuation to run on the same thread as the message processor/actor does.

My code for the future support looks like this:

public Task<IMessage> Ask(ActorRef actor, IMessage message)
{
    TaskCompletionSource<IMessage> result = 
        new TaskCompletionSource<IMessage>();
    var future = Context.ActorOf<FutureActor>(name : Guid.NewGuid().ToString());

    // once this object gets a response, 
    // we set the result for the task completion source
    var futureActorRef = new FutureActorRef(result);            
    future.Tell(new SetRespondTo(), futureActorRef); 
    actor.Tell(message, future); 
    return result.Task;
}

Any ideas what I can do to force the continuation to run on the same thread that started the above code?

Vest answered 1/1, 2014 at 11:28 Comment(18)
OT: if anyone is interested in helping with this, or just play around, the code is located here github.com/rogeralsing/PigeonVest
Something on that thread has to cooperate. You can't hijack a thread. The thread must somehow call your continuation. Maybe use a custom SynchronizationContext.Eviaevict
Can't the continuation just be scheduled for the active thread somehow?Vest
Imagine a thread doing this: while(true);. How could other code possibly execute on that thread? It is impossible to interrupt code (at arbitrary locations). If your thread looked like this: while(true) ExecuteNextItemForSyncContext(); then someone could submit an item and inject code. Look into how sync contexts work.Eviaevict
I'm pretty sure the case is the latter here, since BufferBlock uses normal thread pool threads, so it must be scheduling messages to a specific thread I guessVest
Threadpool threads do not take specific work items, they take unspecified items from the queue. You won't be able to target any specific thread. You can use a custom TaskScheduler, though, that uses threads you control. This gets tricky now. Maybe it is easier to remove the requirement to end up on the same thread?!Eviaevict
That would be too bad, if an actor are processing a message and at the same time another thread handling an await continuation for the same actor, there will be concurrency issues which the actor is supposed to prevent.Vest
I solved it :) Actors FTW, once the result is received, I wrap the completion source in an Action and pass that as a special message to the owning actor, this way, the SetResult is called from within the Actor itselfVest
I believe this only works by accident. SetResult does not guarantee synchronous execution of registered callbacks. They might randomly end up on the thread pool.Eviaevict
It doesn't matter since SetResult is blocking, it uses a SpinWait untill the task is completed, thus, the actor that receives the message with the Action, is blocked for a short short while when the task completes, and thus, no internal concurrency issues can arise, It does however look like it always do execute on the same thread from the tests I've done right now. not 100% sure though, but as I said, since SetResult is blocking, it doesn't matterVest
What I was trying to say is that SetResult is not guaranteed to block. If you don't find that statement in the documentation, it is not guaranteed and you can't rely on it. You also cannot test for that.Eviaevict
The code inside SetResult throws an exception if the task doesn't complete correctly during the spin wait, since it is throwing on fail, it would be a breaking change if they ever change it to not blockVest
I found this documented on MSDN: msdn.microsoft.com/en-us/library/… "ExecuteSynchronously Specifies that the continuation task should be executed synchronously. With this option specified, the continuation will be run on the same thread that causes the antecedent task to transition into its final state. If the antecedent is already complete when the continuation is created, the continuation will run on the thread creating the continuation. Only very short-running continuations should be executed synchronously". It is reliable.Eviaevict
@usr: Unfortunately, that's not always the case.Pancreas
@StephenCleary good to know! Concurrency is such a sensitive topic. This inaccurate documentation is disappointing.Eviaevict
Anyway, TrySetResult has to be considered reliable since it returns a bool for success/failure, it can't return true unless it actually blocks until the task is completed..Vest
"wrong thread": why do you consider one thread is better than the other? do you also think that a hardware processor can be better than others?Anticipatory
This thread is 6 years old.....Vest
P
6

I'm making a port of the AKKA framework for .NET

Sweet. I went to an Akka talk at CodeMash '13 despite having never touched Java/Scala/Akka. I saw a lot of potential there for a .NET library/framework. Microsoft is working on something similar, which I hope will eventually be made generally available (it's currently in a limited preview).

I suspect that staying in the Dataflow/Rx world as much as possible is the easier approach; async is best when you have asynchronous operations (with a single start and single result for each operation), while Dataflow and Rx work better with streams and subscriptions (with a single start and multiple results). So my first gut reaction is to either link the buffer block to an ActionBlock with a specific scheduler, or use ObserveOn to move the Rx notifications to a specific scheduler, instead of trying to do it on the async side. Of course I'm not really familiar with the Akka API design, so take that with a grain of salt.

Anyway, my async intro describes the only two reliable options for scheduling await continuations: SynchronizationContext.Current and TaskScheduler.Current. If your Akka port is more of a framework (where your code does the hosting, and end-user code is always executed by your code), then a SynchronizationContext may make sense. If your port is more of a library (where end-user code does the hosting and calls your code as necessary), then a TaskScheduler would make more sense.

There aren't many examples of a custom SynchronizationContext, because that's pretty rare. I do have an AsyncContextThread type in my AsyncEx library which defines both a SynchronizationContext and a TaskScheduler for that thread. There are several examples of custom TaskSchedulers, such as the Parallel Extensions Extras which has an STA scheduler and a "current thread" scheduler.

Pancreas answered 1/1, 2014 at 15:55 Comment(1)
As a note that Microsoft is working in public on ActorFx library. I find it somewhat unclear if Orleans and ActorFx should be merged in the future or what's the deal. There has been also some discussion on the F# users group regarding Akka port to F#, and pointers to some implementation code. I guess the main points are in Fakka - F# Akka and the role it could play for f# broader appeal.Acreinch
M
1

Task scheduler decides whether to run a task on a new thread or on the current thread. There is an option to force running it on a new thread, but none forcing it to run on the current thread. But there is a method Task.RunSynchronously() which Runs the Task synchronously on the current TaskScheduler. Also if you are using async/await there is already a similar question on that.

Marxmarxian answered 1/1, 2014 at 12:12 Comment(1)
But this question isn't about running a Task, it's about where execution continues after await.Not

© 2022 - 2024 — McMap. All rights reserved.