Scheduling asynchronous jobs/tasks (and ignoring exceptions) using Observable.Timer

I have several asynchronous tasks/jobs that I need to run on a schedule, and it seems that I could do this nicely using Observables. An exception could occur when a job fetches the data (e.g. a 404), and an error could also occur when the resulting data is processed.

I have seen this answer by Enigmativity which seems like the perfect solution to wrap the IObservable<> so that if an error occurs (when I fetch the data) I can trap it and continue (ultimately skipping the processing for that particular fetch).

I understand that when an Observable errors it is meant to terminate, but given the answer I mentioned above, it seems that there are ways around this, which would make for a decent job scheduling system. Alternative approaches are welcome, but I would like to understand how to do this with Observables.

I would also like to provide some feedback/logging about the state of the job.


Currently, I have the below method, which won't compile!

job is the object that contains information about the job (e.g. a list of job runs and their outcomes/success/failure, run frequency, status, errors, a boolean flag indicating whether the job should proceed, etc.)

interval(job) returns the frequency in milliseconds that the job should run at

runSelect(job) is a boolean method that signals if a job should proceed (I think this would be better replaced with an observable? And of course there is the option of using a CancellationToken, but again I'm not sure how to integrate that!)

select(job) is the method that fetches the data

subscribe(job) is the method that processes the data

    public static IDisposable BuildObservable<TJob, TSelect>(TJob job, Func<TJob, int> interval, Func<TJob, bool> runSelect, Func<TJob, Task<TSelect>> select,
        Func<TSelect, Task> subscribe)
        where TJob : Job
        where TSelect : class
    {
        return Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(interval(job)))
            .SelectMany(x => Observable.FromAsync(async () =>
            {
                JobRunDetail jobRunDetail = job.StartNewRun();

                if (runSelect(job))
                {
                    jobRunDetail.SetRunningSelect();
                    return new { Result = await select(job), JobRunDetail = jobRunDetail };
                }
                else
                {
                    jobRunDetail.SetAbandonedSelect();
                    return new { Result = (TSelect)null!, JobRunDetail = jobRunDetail };
                }
            }).ToExceptional())
            .Subscribe(async (resultAndJobRunDetail) =>
            {

                // none of the resultAndJobRunDetail.Value.JobRunDetail or resultAndJobRunDetail.Value.Result statements will compile

                resultAndJobRunDetail.Value.JobRunDetail.SetRunningSubscribe();
                try
                {
                    
                    if (resultAndJobRunDetail.Value.Result != null)
                        await subscribe(resultAndJobRunDetail.Value.Result);

                    resultAndJobRunDetail.Value.JobRunDetail.SetCompleted();
                }
                catch (Exception ee)
                {
                    resultAndJobRunDetail.Value.JobRunDetail.SetErrorSubscribe(ee);
                }
            });
    }

As noted, none of the resultAndJobRunDetail.Value.JobRunDetail or resultAndJobRunDetail.Value.Result statements will compile because resultAndJobRunDetail.Value is still an Observable<>, but when I remove the .ToExceptional() call, the value returned is no longer an Observable. Clearly I'm missing something.

I have seen different answers on SO that use Do() rather than Subscribe() so I'm not sure which is appropriate. I have also seen answers that suggest using Retry() or one of the "observable error handling methods" but I'm not sure how these would work if I just want my job to keep repeating ad infinitum?

Ultimately, I'm still learning how the whole Observable infrastructure fits together, so I could well be completely off track!

It's worth noting that searching Google for "Schedule Job using Observable" is pretty fruitless, as Observables use schedulers!

Renault answered 23/11, 2022 at 23:2

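I'm not sure if this helps, but your .ToExceptional() call was in the wrong place. Apply it to each value returned inside the async lambda rather than to the observable returned by Observable.FromAsync: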

    public static IDisposable BuildObservable<TJob, TSelect>(TJob job, Func<TJob, int> interval, Func<TJob, bool> runSelect, Func<TJob, Task<TSelect>> select,
        Func<TSelect, Task> subscribe)
        where TJob : Job
        where TSelect : class
    {
        return Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(interval(job)))
            .SelectMany(x => Observable.FromAsync(async () =>
            {
                JobRunDetail jobRunDetail = job.StartNewRun();

                if (runSelect(job))
                {
                    jobRunDetail.SetRunningSelect();
                    return new { Result = await select(job), JobRunDetail = jobRunDetail }.ToExceptional();
                }
                else
                {
                    jobRunDetail.SetAbandonedSelect();
                    return new { Result = (TSelect)null!, JobRunDetail = jobRunDetail }.ToExceptional();
                }
            }))
            .Subscribe(async (resultAndJobRunDetail) =>
            {
                resultAndJobRunDetail.Value.JobRunDetail.SetRunningSubscribe();
                try
                {
                    if (resultAndJobRunDetail.Value.Result != null)
                        await subscribe(resultAndJobRunDetail.Value.Result);

                    resultAndJobRunDetail.Value.JobRunDetail.SetCompleted();
                }
                catch (Exception ee)
                {
                    resultAndJobRunDetail.Value.JobRunDetail.SetErrorSubscribe(ee);
                }
            });
    }
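
One caveat with the code above: if select(job) throws, the exception still escapes the async lambda, so the observable produced by Observable.FromAsync errors and the timer sequence ends. A possible way around that is to trap the exception inside the lambda and hand it on as a value. The following is only a minimal sketch: the Exceptional<T> class here is a simplified, hypothetical stand-in (not the exact type from the linked answer), and ExceptionalTask.TryAsync is a helper name invented for illustration.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// Hypothetical, simplified wrapper: holds either a value or the exception
// that prevented the value from being produced.
public sealed class Exceptional<T> where T : class
{
    public T? Value { get; }
    public Exception? Exception { get; }
    public bool HasException => Exception != null;

    private Exceptional(T? value, Exception? exception)
    {
        Value = value;
        Exception = exception;
    }

    public static Exceptional<T> FromValue(T value) => new Exceptional<T>(value, null);
    public static Exceptional<T> FromException(Exception ex) => new Exceptional<T>(null, ex);
}

public static class ExceptionalTask
{
    // Awaits the fetch and turns any thrown exception into a value, so the
    // returned task itself never faults because of the fetch.
    public static async Task<Exceptional<T>> TryAsync<T>(Func<Task<T>> fetch) where T : class
    {
        try
        {
            return Exceptional<T>.FromValue(await fetch());
        }
        catch (Exception ex)
        {
            return Exceptional<T>.FromException(ex);
        }
    }
}
```

Inside the pipeline, the fetch would then be wrapped as something like ExceptionalTask.TryAsync(() => select(job)) within the lambda passed to Observable.FromAsync, and the subscriber would check HasException before reading Value, recording the failure on the JobRunDetail instead of processing the result.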
Deoxyribonuclease answered 29/11, 2022 at 5:54