Is it safe to use async/await inside akka.net actor

In the following code I am using async/await, the syntactic sugar provided by .NET. However, I have read that this is not a good way to handle asynchronous operations in Akka.NET, and that I should use PipeTo() instead.

public class AggregatorActor : ActorBase, IWithUnboundedStash
{
    #region Constructor
    public AggregatorActor(IActorSystemSettings settings, IAccountComponent component, LogSettings logSettings) : base(settings, logSettings)
    {
        _accountComponent = component;
        _settings = settings;
    }
    #endregion

    #region Public Methods

    public override void Listening()
    {

        ReceiveAsync<ProfilerMessages.ProfilerBase>(async x => await HandleMessage(x));
        ReceiveAsync<ProfilerMessages.TimeElasped>(async x => await HandleMessage(x));
    }
    public override async Task HandleMessage(object msg)
    {
        msg.Match().With<ProfilerMessages.GetSummary>(async x =>
        {
            _sender = Context.Sender;
            //Become busy. Stash
            Become(Busy);

            //Handle different request
            await HandleSummaryRequest(x.UserId, x.CasinoId, x.GamingServerId, x.AccountNumber, x.GroupName);
        });
        msg.Match().With<ProfilerMessages.RecurringCheck>(x =>
        {
            _logger.Info("Recurring Message");
            if (IsAllResponsesReceived())
            {
                BeginAggregate();
            }
        });
        msg.Match().With<ProfilerMessages.TimeElasped>(x =>
        {
            _logger.Info("Time Elapsed");
            BeginAggregate();
        });
    }
    private async Task HandleSummaryRequest(int userId, int casinoId, int gsid, string accountNumber, string groupName)
    {
        try
        {
            var accountMsg = new AccountMessages.GetAggregatedData(userId, accountNumber, casinoId, gsid);
            //AskPattern.AskAsync<AccountMessages.AccountResponseAll>(Context.Self, _accountActor, accountMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
            _accountActor.Tell(accountMsg);

            var contactMsg = new ContactMessages.GetAggregatedContactDetails(userId);
            //AskPattern.AskAsync<Messages.ContactMessages.ContactResponse>(Context.Self, _contactActor, contactMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
            _contactActor.Tell(contactMsg);

            var analyticMsg = new AnalyticsMessages.GetAggregatedAnalytics(userId, casinoId, gsid);
            //AskPattern.AskAsync<Messages.AnalyticsMessages.AnalyticsResponse>(Context.Self, _analyticsActor, analyticMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
            _analyticsActor.Tell(analyticMsg);

            var financialMsg = new FinancialMessages.GetAggregatedFinancialDetails(userId.ToString());
            //AskPattern.AskAsync<Messages.FinancialMessages.FinancialResponse>(Context.Self, _financialActor, financialMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
            _financialActor.Tell(financialMsg);

            var verificationMsg = VerificationMessages.GetAggregatedVerification.Instance(groupName, casinoId.ToString(), userId.ToString(), gsid);
            _verificationActor.Tell(verificationMsg);

            var riskMessage = RiskMessages.GeAggregatedRiskDetails.Instance(userId, accountNumber, groupName, casinoId, gsid);
            _riskActor.Tell(riskMessage);

            _cancelable = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromMilliseconds(_settings.AggregatorTimeOut), Self, Messages.ProfilerMessages.TimeElasped.Instance(), Self);
            _cancelRecurring = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_settings.RecurringResponseCheck, _settings.RecurringResponseCheck, Self, Messages.ProfilerMessages.RecurringCheck.Instance(), Self);
        }
        catch (Exception ex)
        {
            ExceptionHandler(ex);
        }
    }
    #endregion
}

As you can see in the example code, I am using async/await together with the ReceiveAsync() method provided by Akka.NET.

What is the purpose of ReceiveAsync() if we cannot use async/await within an actor?

Dismiss answered 8/12, 2017 at 7:10 Comment(1)
I've never used this library, but the documentation seems to suggest that the ReceiveAsync(...) method is designed to wait for the async method to complete. If you want to run two tasks simultaneously, you will have to create an async method that awaits both with await Task.WhenAll(...), and then call that async method with ReceiveAsync(...). - Schram

You can use async/await within an actor. However, this requires some orchestration to suspend and resume the actor's mailbox until the asynchronous task completes. This makes the actor non-reentrant, which means that it will not pick up any new messages until the current task has finished. To use async/await within an actor, you can either:

  1. Use ReceiveAsync, which accepts async handlers.
  2. Wrap your async method call in ActorTaskScheduler.RunTask. This is usually useful in actor lifecycle methods (such as PreStart/PostStop).
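
As a minimal sketch of both options (not taken from the question's code): the message type FetchSummary and the helper methods LoadSummaryAsync and WarmUpAsync are hypothetical placeholders, and the actor derives directly from Akka.NET's ReceiveActor rather than the asker's ActorBase.

```csharp
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Dispatch;

// Hypothetical message type, used only for this illustration.
public sealed class FetchSummary
{
    public FetchSummary(int userId) { UserId = userId; }
    public int UserId { get; }
}

public class SummaryActor : ReceiveActor
{
    public SummaryActor()
    {
        // Option 1: ReceiveAsync takes a Func<T, Task>. The mailbox stays
        // suspended until the returned task completes, so no other message
        // is handled in between.
        ReceiveAsync<FetchSummary>(async msg =>
        {
            var summary = await LoadSummaryAsync(msg.UserId);
            // The actor context is restored after the await,
            // so Sender still refers to the original requester.
            Sender.Tell(summary);
        });
    }

    protected override void PreStart()
    {
        // Option 2: lifecycle methods are synchronous, so the async work
        // is wrapped in ActorTaskScheduler.RunTask.
        ActorTaskScheduler.RunTask(async () =>
        {
            await WarmUpAsync();
        });
    }

    // Hypothetical stand-ins for real I/O.
    private static async Task<string> LoadSummaryAsync(int userId)
    {
        await Task.Delay(100);
        return $"summary for user {userId}";
    }

    private static Task WarmUpAsync()
    {
        return Task.Delay(100);
    }
}
```

Note that the handler passed to ReceiveAsync must return the Task it awaits. An async lambda passed to an API that expects an Action (an "async void" lambda) is not awaited by the actor, so the mailbox would resume before the work has finished.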

Keep in mind that this works when the default actor message dispatcher is used, but it is not guaranteed to work if the actor is configured to use a different type of dispatcher.

There is also a performance cost to using async/await inside actors, which comes from the suspend/resume mechanics and the lack of reentrancy. In many business cases this is not really a problem, but it can be an issue in high-performance or low-latency workflows.
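
If reentrancy matters, the PipeTo() approach mentioned in the question is the usual alternative: the actor starts the task, returns immediately, and receives the result later as an ordinary message. The following is only a sketch under assumptions; the message types LoadProfile, ProfileLoaded and ProfileFailed and the helper LoadProfileAsync are hypothetical names introduced for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;

// Hypothetical message types, used only for this illustration.
public sealed class LoadProfile
{
    public LoadProfile(int userId) { UserId = userId; }
    public int UserId { get; }
}

public sealed class ProfileLoaded
{
    public ProfileLoaded(string profile, IActorRef replyTo)
    {
        Profile = profile;
        ReplyTo = replyTo;
    }
    public string Profile { get; }
    public IActorRef ReplyTo { get; }
}

public sealed class ProfileFailed
{
    public ProfileFailed(Exception cause, IActorRef replyTo)
    {
        Cause = cause;
        ReplyTo = replyTo;
    }
    public Exception Cause { get; }
    public IActorRef ReplyTo { get; }
}

public class ProfileActor : ReceiveActor
{
    public ProfileActor()
    {
        Receive<LoadProfile>(msg =>
        {
            // Capture Sender first: the success/failure callbacks run when
            // the task completes, outside the actor's message context.
            var replyTo = Sender;

            // Start the task and return immediately. The actor keeps
            // processing other messages while the task is running.
            LoadProfileAsync(msg.UserId).PipeTo(
                Self,
                success: profile => new ProfileLoaded(profile, replyTo),
                failure: ex => new ProfileFailed(ex, replyTo));
        });

        // The result comes back through the mailbox like any other message.
        Receive<ProfileLoaded>(m => m.ReplyTo.Tell(m.Profile));
        Receive<ProfileFailed>(m => m.ReplyTo.Tell(new Status.Failure(m.Cause)));
    }

    // Hypothetical stand-in for real I/O.
    private static async Task<string> LoadProfileAsync(int userId)
    {
        await Task.Delay(100);
        return $"profile for user {userId}";
    }
}
```

The trade-off is that other messages can be processed between the request and its result, so any state the result handler depends on has to tolerate that interleaving.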

Hafner answered 8/12, 2017 at 9:28 Comment(0)
