In the following code I am using the async/await syntactic sugar provided by .NET, but I have read that this is not a good way of handling asynchronous operations within Akka.NET, and that I should use PipeTo() instead.
/// <summary>
/// Actor that, on a <c>GetSummary</c> request, sends one aggregation request to each of the
/// account, contact, analytics, financial, verification and risk actors, and starts
/// aggregating either when a recurring check finds that all responses have been received
/// or when the time-out message arrives.
/// </summary>
public class AggregatorActor : ActorBase, IWithUnboundedStash
{
    #region Constructor
    /// <summary>
    /// Creates the actor and stores the account component and the actor-system settings.
    /// </summary>
    /// <param name="settings">Actor-system settings; also forwarded to the base class.</param>
    /// <param name="component">Account component kept for later use.</param>
    /// <param name="logSettings">Logging settings forwarded to the base class.</param>
    public AggregatorActor(IActorSystemSettings settings, IAccountComponent component, LogSettings logSettings) : base(settings, logSettings)
    {
        _accountComponent = component;
        _settings = settings;
    }
    #endregion

    #region Public Methods
    /// <summary>
    /// Registers the asynchronous receive handlers; both message types are routed to
    /// <see cref="HandleMessage"/>.
    /// </summary>
    public override void Listening()
    {
        ReceiveAsync<ProfilerMessages.ProfilerBase>(async x => await HandleMessage(x));
        ReceiveAsync<ProfilerMessages.TimeElasped>(async x => await HandleMessage(x));
    }

    /// <summary>
    /// Dispatches an incoming message according to its runtime type.
    /// </summary>
    /// <remarks>
    /// The summary request is awaited directly in this method. Passing an <c>async</c> lambda
    /// to <c>Match().With&lt;T&gt;(...)</c> would compile it to <c>async void</c>, so the task
    /// returned here would complete before the handler finished and any exception escaping
    /// the lambda would not be observed by the caller.
    /// </remarks>
    /// <param name="msg">The message to handle; unrecognised types are ignored.</param>
    /// <returns>A task that completes once the message has been fully handled.</returns>
    public override async Task HandleMessage(object msg)
    {
        // The three checks are deliberately independent (not else-if) so that a message
        // matching more than one of the types is handled by every matching branch.
        var summaryRequest = msg as ProfilerMessages.GetSummary;
        if (summaryRequest != null)
        {
            // Capture the requester before any asynchronous work happens.
            _sender = Context.Sender;

            // Switch to the Busy behaviour (declared outside this excerpt) while the
            // request is in flight.
            Become(Busy);

            await HandleSummaryRequest(
                summaryRequest.UserId,
                summaryRequest.CasinoId,
                summaryRequest.GamingServerId,
                summaryRequest.AccountNumber,
                summaryRequest.GroupName);
        }

        if (msg is ProfilerMessages.RecurringCheck)
        {
            _logger.Info("Recurring Message");
            if (IsAllResponsesReceived())
            {
                BeginAggregate();
            }
        }

        if (msg is ProfilerMessages.TimeElasped)
        {
            _logger.Info("Time Elapsed");
            BeginAggregate();
        }
    }

    /// <summary>
    /// Sends the aggregation requests to the downstream actors and schedules the one-off
    /// time-out message and the recurring response check, both addressed to this actor.
    /// </summary>
    /// <remarks>
    /// All work here is synchronous fire-and-forget (<c>Tell</c> and scheduler calls), so the
    /// method is not <c>async</c>; it returns an already-completed task so the call site can
    /// stay awaitable. Any exception is passed to <c>ExceptionHandler</c>.
    /// </remarks>
    /// <param name="userId">User whose data is requested.</param>
    /// <param name="casinoId">Casino identifier passed to the downstream requests.</param>
    /// <param name="gsid">Gaming-server identifier passed to the downstream requests.</param>
    /// <param name="accountNumber">Account number passed to the account and risk requests.</param>
    /// <param name="groupName">Group name passed to the verification and risk requests.</param>
    /// <returns>A completed task.</returns>
    private Task HandleSummaryRequest(int userId, int casinoId, int gsid, string accountNumber, string groupName)
    {
        try
        {
            var accountMsg = new AccountMessages.GetAggregatedData(userId, accountNumber, casinoId, gsid);
            _accountActor.Tell(accountMsg);

            var contactMsg = new ContactMessages.GetAggregatedContactDetails(userId);
            _contactActor.Tell(contactMsg);

            var analyticMsg = new AnalyticsMessages.GetAggregatedAnalytics(userId, casinoId, gsid);
            _analyticsActor.Tell(analyticMsg);

            var financialMsg = new FinancialMessages.GetAggregatedFinancialDetails(userId.ToString());
            _financialActor.Tell(financialMsg);

            var verificationMsg = VerificationMessages.GetAggregatedVerification.Instance(groupName, casinoId.ToString(), userId.ToString(), gsid);
            _verificationActor.Tell(verificationMsg);

            var riskMessage = RiskMessages.GeAggregatedRiskDetails.Instance(userId, accountNumber, groupName, casinoId, gsid);
            _riskActor.Tell(riskMessage);

            // One-off time-out message sent to Self after AggregatorTimeOut milliseconds.
            _cancelable = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromMilliseconds(_settings.AggregatorTimeOut), Self, Messages.ProfilerMessages.TimeElasped.Instance(), Self);

            // Recurring check message sent to Self at the RecurringResponseCheck interval.
            _cancelRecurring = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_settings.RecurringResponseCheck, _settings.RecurringResponseCheck, Self, Messages.ProfilerMessages.RecurringCheck.Instance(), Self);
        }
        catch (Exception ex)
        {
            ExceptionHandler(ex);
        }

        // Task.FromResult is used because it is available on every framework version
        // that supports async/await.
        return Task.FromResult(0);
    }
    #endregion
}
As you can see in the example code, I am making use of async/await, and using the ReceiveAsync() method provided by Akka.NET.
What is the purpose of ReceiveAsync(), if we cannot use async/await within an actor?
The `ReceiveAsync(...)` method is designed to wait for the async method to complete. If you want to run two tasks simultaneously, you will have to create an async method for them that uses `await Task.WhenAll(...)`, and then call that async method with `ReceiveAsync(...)`. – Schram