Below are two implementations of a PeriodicSequentialExecution method that creates an observable by executing an asynchronous method periodically, enforcing a no-overlapping-execution policy. The interval between subsequent executions can be extended to prevent overlapping, in which case the period is time-shifted accordingly.
The first implementation is purely functional, while the second is mostly imperative. Both implementations are functionally identical. The first one can be supplied with a custom IScheduler. The second one may be slightly more efficient.
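To make the time-shifting concrete, here is a minimal sketch that computes when each execution would start. It is not part of either implementation; PeriodShiftDemo and StartTimes are hypothetical names, and the sketch assumes the next execution starts as soon as both the period has elapsed and the previous execution has completed, whichever happens last.

```csharp
using System;
using System.Collections.Generic;

public static class PeriodShiftDemo
{
    // Computes the start time of each execution, given how long each one runs.
    // Assumption: the gap between two consecutive starts is the larger of
    // the period and the duration of the earlier execution.
    public static List<TimeSpan> StartTimes(
        TimeSpan dueTime, TimeSpan period, IReadOnlyList<TimeSpan> durations)
    {
        var starts = new List<TimeSpan>();
        TimeSpan start = dueTime;
        foreach (TimeSpan duration in durations)
        {
            starts.Add(start);
            // A slow execution pushes the next start, and every later one, forward.
            TimeSpan gap = duration > period ? duration : period;
            start += gap;
        }
        return starts;
    }
}
```

With a zero dueTime, a period of 3 seconds, and executions lasting 1, 5, 2 and 1 seconds, the computed starts are at 0, 3, 8 and 11 seconds: the 5-second execution delays the third start from 6 to 8, and the fourth start follows 3 seconds after that instead of returning to the original 9-second mark.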
The functional implementation:
/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// action that is invoked periodically and sequentially (without overlapping).
/// </summary>
public static IObservable<T> PeriodicSequentialExecution<T>(
    Func<CancellationToken, Task<T>> action,
    TimeSpan dueTime, TimeSpan period,
    CancellationToken cancellationToken = default,
    IScheduler scheduler = null)
{
    // Arguments validation omitted
    scheduler ??= DefaultScheduler.Instance;
    return Delay(dueTime) // Initial delay
        .Concat(Observable.Using(() => CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken), linkedCTS =>
            // Execution loop
            Observable.Publish( // Start a hot delay timer before each operation
                Delay(period), hotTimer => Observable
                    .StartAsync(() => action(linkedCTS.Token)) // Start the operation
                    .Concat(hotTimer) // Await the delay timer
            )
            .Repeat()
            .Finally(() => linkedCTS.Cancel()) // Unsubscription: cancel the operation
        ));

    IObservable<T> Delay(TimeSpan delay)
        => Observable
            .Timer(delay, scheduler)
            .IgnoreElements()
            .Select(_ => default(T))
            .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
                o.OnError(new OperationCanceledException(cancellationToken)))));
}
The imperative implementation:
public static IObservable<T> PeriodicSequentialExecution2<T>(
    Func<CancellationToken, Task<T>> action,
    TimeSpan dueTime, TimeSpan period,
    CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    return Observable.Create<T>(async (observer, ct) =>
    {
        using (var linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(
            ct, cancellationToken))
        {
            try
            {
                await Task.Delay(dueTime, linkedCTS.Token);
                while (true)
                {
                    var delayTask = Task.Delay(period, linkedCTS.Token);
                    var result = await action(linkedCTS.Token);
                    observer.OnNext(result);
                    await delayTask;
                }
            }
            catch (Exception ex) { observer.OnError(ex); }
        }
    });
}
The cancellationToken parameter can be used for the graceful termination of the resulting observable sequence. This means that the sequence waits for the currently running operation to complete before terminating. If you prefer it to terminate instantaneously, potentially leaving work running unobserved in a fire-and-forget fashion, you can simply dispose the subscription to the observable sequence as usual. Canceling the cancellationToken results in the observable sequence completing in a faulted state (OperationCanceledException).
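As a usage sketch of the two termination options, the following console program subscribes to the sequence and then cancels the token. It assumes the System.Reactive package is referenced and that the methods above are declared in a static class, here given the hypothetical name ObservableEx; the simulated work and the timings are illustrative only.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();

        // ObservableEx is a hypothetical name for the class holding the methods above.
        IObservable<int> source = ObservableEx.PeriodicSequentialExecution2(
            async ct =>
            {
                await Task.Delay(TimeSpan.FromMilliseconds(300), ct); // simulated work
                return Environment.TickCount;
            },
            dueTime: TimeSpan.Zero,
            period: TimeSpan.FromSeconds(1),
            cancellationToken: cts.Token);

        IDisposable subscription = source.Subscribe(
            value => Console.WriteLine($"Result: {value}"),
            error => Console.WriteLine($"Faulted: {error.GetType().Name}"));

        await Task.Delay(TimeSpan.FromSeconds(3.5));

        // Option 1: cancel the token. The error callback should be invoked with
        // an OperationCanceledException or a subclass such as TaskCanceledException.
        cts.Cancel();

        // Option 2 (instead of canceling): unsubscribe, with no error notification.
        // subscription.Dispose();

        await Task.Delay(TimeSpan.FromMilliseconds(500)); // let the notification arrive
    }
}
```

Handling the error callback matters when the token is used: without it, the OperationCanceledException would be rethrown by the default Subscribe overload.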