Async API call inside an actor and exceptions

I know about PipeTo, but some things, like synchronously waiting on a nested task inside a continuation, seem to go against the async & await way.

So, my first question [1] would be: is there any 'magic' here, so that we can just synchronously wait for nested tasks in a continuation and it's still async in the end?

While we're at async & await differences, how are failures handled?

Let's create a simple example:

public static class AsyncOperations
{
    public async static Task<int> CalculateAnswerAsync()
    {
        await Task.Delay(1000).ConfigureAwait(false);
        throw new InvalidOperationException("Testing!");
        //return 42;
    }

    public async static Task<string> ConvertAsync(int number)
    {
        await Task.Delay(600).ConfigureAwait(false);
        return number + " :)";
    }
}

In a 'regular', async & await way:

var answer = await AsyncOperations.CalculateAnswerAsync();
var converted = await AsyncOperations.ConvertAsync(answer);

the exception will bubble up from the first operation, just as you'd expect.

Now, let's create an actor that's going to work with those async operations. For the sake of an argument, let's say that CalculateAnswerAsync and ConvertAsync should be used one after another as one, full operation (similar to, for example, StreamWriter.WriteLineAsync and StreamWriter.FlushAsync if you just want to write one line to a stream).

public sealed class AsyncTestActor : ReceiveActor
{
    public sealed class Start
    {
    }

    public sealed class OperationResult
    {
        private readonly string message;

        public OperationResult(string message)
        {
            this.message = message;
        }

        public string Message
        {
            get { return message; }
        }
    }

    public AsyncTestActor()
    {
        Receive<Start>(msg =>
        {
            AsyncOperations.CalculateAnswerAsync()
                .ContinueWith(result =>
                {
                    var number = result.Result;
                    var conversionTask = AsyncOperations.ConvertAsync(number);
                    conversionTask.Wait(1500);
                    return new OperationResult(conversionTask.Result);
                })
                .PipeTo(Self);
        });
        Receive<OperationResult>(msg => Console.WriteLine("Got " + msg.Message));
    }
}

If there are no exceptions, I still get Got 42 :) without any issues, which brings me back to the 'magic' point above [1]. Also, are the AttachedToParent and ExecuteSynchronously flags provided in an example optional, or are they pretty much required to have everything working as intended? They don't seem to have any effect on exception handling...

Now, if CalculateAnswerAsync throws an exception, which means that result.Result throws an AggregateException, it's pretty much swallowed without a trace.

What should I do here, if it's even possible, to make the exception inside an asynchronous operation crash the actor as a 'regular' exception would?

Unbent answered 16/2, 2015 at 21:2 Comment(0)
A
10

The joys of error-handling in the TPL :)

Once a Task starts running on its own thread, everything that happens inside it is already asynchronous from the caller - including error-handling

  1. When you kick off your first Task inside of an actor, that task runs on the ThreadPool, independently of your actor. This means that anything you do inside that Task is already asynchronous from your actor's point of view, because it's running on a different thread. This is why I made a Task.Wait call inside the PipeTo sample you linked to at the top of your post. It makes no difference to the actor; it just looks like a long-running task.
  2. Exceptions - if your inner task failed, the conversionTask.Result property will throw an AggregateException wrapping the exception captured during its run, so you'll want to add some error-handling inside your Task to ensure that your actor gets notified that something went wrong. Notice I did just that here: https://github.com/petabridge/akkadotnet-code-samples/blob/master/PipeTo/src/PipeTo.App/Actors/HttpDownloaderActor.cs#L117 - if you turn your Exceptions into messages your actor can handle: birds start singing, rainbows shine, and TPL errors stop being a source of pain and agony.
  3. As for what happens when an exception gets thrown...

Now, if CalculateAnswerAsync throws an exception, which means that result.Result throws an AggregateException, it's pretty much swallowed without a trace.

The AggregateException will contain the list of inner exceptions wrapped inside of it. The reason the TPL has this concept of aggregate errors is to cover the cases where (a) you have one task that is the continuation of multiple tasks in aggregate, e.g. Task.WhenAll, or (b) you have errors propagated up the ContinueWith chain back to the parent. You can also call AggregateException.Flatten() to make nested exceptions a little easier to manage.
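To make the unwrapping concrete, here is a minimal sketch using only the TPL (no Akka.NET). The class name AggregateDemo and the helper FirstFailure are made up for illustration; CalculateAnswerAsync is a shortened stand-in for the operation from the question.

```csharp
using System;
using System.Threading.Tasks;

public static class AggregateDemo
{
    // Stand-in for the question's failing operation (shorter delay).
    public static async Task<int> CalculateAnswerAsync()
    {
        await Task.Delay(10).ConfigureAwait(false);
        throw new InvalidOperationException("Testing!");
    }

    // Hypothetical helper: blocks on the task and returns the first
    // underlying exception, or null if the task completed successfully.
    public static Exception FirstFailure(Task task)
    {
        try
        {
            task.Wait(); // like .Result, Wait() wraps failures in AggregateException
            return null;
        }
        catch (AggregateException ex)
        {
            // Flatten() collapses nested AggregateExceptions into one level.
            return ex.Flatten().InnerExceptions[0];
        }
    }

    public static void Main()
    {
        var failure = FirstFailure(CalculateAnswerAsync());
        Console.WriteLine(failure.GetType().Name + ": " + failure.Message);
        // prints "InvalidOperationException: Testing!"
    }
}
```

With await, the same failure would surface directly as the InvalidOperationException; the AggregateException wrapper only shows up when you block with Wait() or Result, or inspect Task.Exception.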

Best Practices for TPL + Akka.NET

Dealing with Exceptions from the TPL is a nuisance, that's true - but the best way to deal with it is to catch exceptions inside your Task (try/catch) and turn them into message classes your actor can handle.
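A rough sketch of that idea, kept free of Akka.NET so it runs on its own. OperationFailed, RunAsMessageAsync and the fail parameter are hypothetical names added for illustration; only OperationResult and the two async operations mirror the question.

```csharp
using System;
using System.Threading.Tasks;

public sealed class OperationResult
{
    public OperationResult(string message) { Message = message; }
    public string Message { get; private set; }
}

// Hypothetical failure message that carries the original exception.
public sealed class OperationFailed
{
    public OperationFailed(Exception cause) { Cause = cause; }
    public Exception Cause { get; private set; }
}

public static class AsyncOperations
{
    // The 'fail' switch is an assumption so both outcomes can be shown.
    public static async Task<int> CalculateAnswerAsync(bool fail)
    {
        await Task.Delay(10).ConfigureAwait(false);
        if (fail) throw new InvalidOperationException("Testing!");
        return 42;
    }

    public static async Task<string> ConvertAsync(int number)
    {
        await Task.Delay(10).ConfigureAwait(false);
        return number + " :)";
    }

    // Runs both steps as one operation. The returned task never faults:
    // it always completes with either OperationResult or OperationFailed.
    public static async Task<object> RunAsMessageAsync(bool fail)
    {
        try
        {
            var number = await CalculateAnswerAsync(fail).ConfigureAwait(false);
            var converted = await ConvertAsync(number).ConfigureAwait(false);
            return new OperationResult(converted);
        }
        catch (Exception ex)
        {
            return new OperationFailed(ex);
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var ok = AsyncOperations.RunAsMessageAsync(false).Result;
        var failed = AsyncOperations.RunAsMessageAsync(true).Result;
        Console.WriteLine(((OperationResult)ok).Message);             // prints "42 :)"
        Console.WriteLine(((OperationFailed)failed).Cause.Message);   // prints "Testing!"
    }
}
```

Inside an actor, the intent would be to pipe this task back with .PipeTo(Self) and add a Receive<OperationFailed> handler; what that handler does (log, reply, or throw so the supervisor gets involved) is then ordinary actor code.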

Also, are the AttachedToParent and ExecuteSynchronously flags provided in an example optional, or are they pretty much required to have everything working as intended?

This is mostly an issue for when you have continuations on continuations - PipeTo automatically uses these flags on itself. It has zero impact on error handling, but ensures that your continuations are executed immediately on the same thread as the original Task.

I recommend using these flags only when you're doing a lot of nested continuations - the TPL starts to take some liberties with how it schedules your tasks once you go deeper than 1 continuation (and in fact, flags like OnlyOnRanToCompletion stop being accepted after more than 1 continuation.)
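For reference, this is roughly how continuation flags are passed to ContinueWith, sketched with plain TPL types. ContinuationFlagsDemo and Run are made-up names; the flags themselves are standard TaskContinuationOptions values, and ExecuteSynchronously is only a hint to the scheduler.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationFlagsDemo
{
    // Stand-in for the question's failing operation (shorter delay).
    public static async Task<int> CalculateAnswerAsync()
    {
        await Task.Delay(10).ConfigureAwait(false);
        throw new InvalidOperationException("Testing!");
    }

    // Returns { text produced by the failure continuation, status of the success continuation }.
    public static string[] Run()
    {
        var antecedent = CalculateAnswerAsync();

        // Runs only if the antecedent succeeded; otherwise this continuation is canceled.
        var onSuccess = antecedent.ContinueWith(
            t => "Got " + t.Result,
            TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);

        // Runs only if the antecedent faulted; t.Exception is the AggregateException.
        var onFailure = antecedent.ContinueWith(
            t => "Failed: " + t.Exception.Flatten().InnerExceptions[0].Message,
            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously);

        var failureText = onFailure.Result;

        // Waiting on a canceled continuation throws; swallow it here just to read the status.
        try { onSuccess.Wait(); } catch (AggregateException) { }

        return new[] { failureText, onSuccess.Status.ToString() };
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

The flags decide when and where a continuation runs; the failure still has to be observed somewhere, here by the OnlyOnFaulted continuation.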

Amaryllis answered 16/2, 2015 at 22:8 Comment(6)
Come to think of it... Turning exceptions into messages would make for a good blog post... - Amaryllis
Thanks! Well, it really seems that having a message that just wraps an exception (e.g. ProcessingFailure) that's created and piped back to the same actor which then just re-throws the exception in the message handler is the easiest way to propagate said exception up the chain. - Brilliancy
@Amaryllis So basically you are saying where possible avoid async as child calls in actors, especially multiple layers of async? - Decelerate
You said it's running on a different thread. Async calls are not executed on different threads as described in "There Is No Thread". But multiple tasks still run at the same time. - Inconceivable
@NoelWidmer I think you misunderstood Stephen's blog post. An async I/O call uses an I/O completion port, true, therefore a thread is not doing work while waiting for the IO to complete. However, when the I/O operation completes the continuation / antecedent tasks have to be executed somewhere, ergo a thread. The difference between await and PipeTo in an Akka.NET actor is "which thread?" In await, the actor uses its own execution thread to process the continuation. In PipeTo, it executes on whichever ThreadPool thread it gets scheduled per the TPL. - Amaryllis
@Amaryllis I am not yet that familiar with Akka.NET so what you are saying is probably true. Although I'd still say that plain C# async/await operations do not spawn new threads. Can we agree on that? - Inconceivable
C
7

Just to add to what Aaron said. As of yesterday, we do support safe async await inside actors when using the Task dispatcher.

public class AsyncAwaitActor : ReceiveActor
{
    public AsyncAwaitActor()
    {
        Receive<string>(async m =>
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
            Sender.Tell("done");
        });
    }
}

public class AskerActor : ReceiveActor
{
    public AskerActor(ActorRef other)
    {
        Receive<string>(async m =>
        {
            var res = await other.Ask(m);
            Sender.Tell(res);
        });
    }
}

public class ActorAsyncAwaitSpec : AkkaSpec
{
    [Fact]
    public async Task Actors_should_be_able_to_async_await_ask_message_loop()
    {
        var actor = Sys.ActorOf(
            Props.Create<AsyncAwaitActor>()
                .WithDispatcher("akka.actor.task-dispatcher"),
            "Worker");
        // IMPORTANT: you must use the akka.actor.task-dispatcher,
        // otherwise async await is not safe

        var asker = Sys.ActorOf(
            Props.Create(() => new AskerActor(actor))
                .WithDispatcher("akka.actor.task-dispatcher"),
            "Asker");

        var res = await asker.Ask("something");
        Assert.Equal("done", res);
    }
}

This is not our default dispatcher, since it comes with a price in performance/throughput. There is also a risk of deadlocks if you trigger tasks that block (e.g. using task.Wait() or task.Result). So the PipeTo pattern is still the preferred approach, since it is more true to the actor model. But the async await support is there for you as an extra tool if you really need to do some TPL integration.

This feature actually uses PipeTo under the covers. It takes every task continuation, wraps it in a special message, passes that message back to the actor, and executes the continuation inside the actor's own concurrency context.

Cardigan answered 17/2, 2015 at 5:52 Comment(9)
Ohh, that's great! Even though piping a message 'to self' might be more true to the actor model, when you have, for example, a 3-step operation where each step is async, it starts getting a bit... unwieldy, in my opinion; and you have to create custom messages just for the sake of wrapping an async API, which is pretty much all IO and database access right now. You mentioned the performance/throughput price, what is it exactly? - Brilliancy
On top of that, does the task dispatcher handle all exceptions the same as regular async & await? Because if it automatically 'propagates' the exception up so the actor can crash, that would take out a lot of manual message wrapping and sending on any kind of asynchronous failure... - Brilliancy
There is a fair amount of overhead to setting the async state. In our ping-pong benchmark, the throughput is pretty much cut in half, which is a lot, but then again in a real-world scenario it will most likely not be noticeable, since real actors will do real work, which is greater than the overhead of just passing messages. - Cardigan
If the continuation crashes, that will be picked up by the supervisor, since the continuation is actually executed inside the actor's message processing, so the normal supervisor handling will kick in. - Cardigan
Well yes, naturally I'd pretty much use it only for actors operating with async IO, so any kind of operation like that will most probably dwarf message passing overhead. Automatic supervision/exception propagation seems like a really sweet deal here. - Brilliancy
Do note that if you want to try it out, this feature is so far only available in our dev branch on GitHub, not in any NuGet package. - Cardigan
Noted. I'll just wait for the next 'official' release, I'm not in a hurry. Generally, it would be nice to have a more extensive example in the documentation of how to handle multi-step asynchronous operations with failure handling, at least showing what the 'recommended way' is. :) - Brilliancy
@RogerAlsing Do you know when this feature will be available via NuGet? - Cloven
@NedStoyanov It has been available since March, I think; it was part of the 1.0 release. - Cardigan
