I have several asynchronous tasks/jobs that I need to run on a schedule and it seems that I could do this nicely using Observable
s. When a job fetches the data, an exception could occur (eg 404), and when the resultant data is processed, an error could also occur.
I have seen this answer by Enigmativity which seems like the perfect solution to wrap the IObservable<>
so that if an error occurs (when I fetch the data) I can trap it and continue (ultimately skipping the processing for that particular fetch).
I understand that when an Observable errors it is meant to terminate, but given the answer I mentioned above, it seems that there are ways around this, which would make for a decent job scheduling system. Alternative approaches are welcome, but I would like to understand how to do this with Observables.
I would also like to provide some feedback/logging about the state of the job.
Currently, I have the below method, which won't compile!
job
is the object that contains information about the job (eg a list of job runs and their outcomes/success/failure, run frequency, status, errors, boolean flag indicating if the job should proceed, etc)
interval(job)
returns the frequency in milliseconds that the job should run at
runSelect(job)
is a boolean method that signals if a job should proceed (I think this would be better replaced with an observable? And of course there is the option of using a CancellationToken, but again I'm not sure how to integrate that!)
select(job)
is the method that fetches the data
subscribe(job)
is the method that processes the data
public static IDisposable BuildObservable<TJob, TSelect>(TJob job, Func<TJob, int> interval, Func<TJob, bool> runSelect, Func<TJob, Task<TSelect>> select,
Func<TSelect, Task> subscribe)
where TJob : Job
where TSelect : class
{
return Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(interval(job)))
.SelectMany(x => Observable.FromAsync(async () =>
{
JobRunDetail jobRunDetail = job.StartNewRun();
if (runSelect(job))
{
jobRunDetail.SetRunningSelect();
return new { Result = await select(job), JobRunDetail = jobRunDetail };
}
else
{
jobRunDetail.SetAbandonedSelect();
return new { Result = (TSelect)null!, JobRunDetail = jobRunDetail };
}
}).ToExceptional())
.Subscribe(async (resultAndJobRunDetail) =>
{
//none of the resultAndJobRunDetail.Value.JobDetail or resultAndJobRunDetail.Value.Result statements will compile
resultAndJobRunDetail.Value.JobRunDetail.SetRunningSubscribe();
try
{
if (resultAndJobRunDetail.Value.Result!= null)
await subscribe(resultAndJobRunDetail.Value.Result);
resultAndJobRunDetail.Value.JobRunDetail.SetCompleted();
}
catch (Exception ee)
{
resultAndJobRunDetail.Value.JobRunDetail.SetErrorSubscribe(ee);
}
});
}
As noted, none of the resultAndJobRunDetail.Value.JobDetail or resultAndJobRunDetail.Value.Result statements will compile because resultAndJobRunDetail.Value is still an Observable<>
, but when I remove the .ToExceptional()
call, the value returned is no longer an Observable
.
Clearly I'm missing something.
I have seen different answers on SO that use Do()
rather than Subscribe()
so I'm not sure which is appropriate. I have also seen answers that suggest using Retry()
or one of the "observable error handling methods" but I'm not sure how these would work if I just want my job to keep repeating ad infinitum?
Ultimately, I'm still learning how the whole Observable infrastructure fits together, so I could well be completely off track!
It's worth nothing that searching Google for "Schedule Job using Observable" it pretty fruitless as Observables use schedulers!