What's a good way to run periodic tasks using Rx, with a single concurrent execution restriction?

I want to run periodic tasks with the restriction that at most one execution of a method is running at any given time.

I was experimenting with Rx, but I am not sure how to impose an at-most-one concurrency restriction.

var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
timer.Subscribe(tick => DoSomething());

Additionally, if a task is still running, I want the next scheduled tick to be skipped, i.e. I don't want the tasks to queue up and cause problems.

I have two such tasks to execute periodically. The tasks are currently synchronous, but I could make them async if necessary.

Congreve answered 14/7, 2015 at 6:18 Comment(0)

You are on the right track. You can use Select + Concat to flatten the observable and limit the number of in-flight requests to one (note: if your task takes longer than the interval, executions will start to stack up because they can't run fast enough):

var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
          //I assume you are doing async work since you want to limit concurrency
          .Select(_ => Observable.FromAsync(() => DoSomethingAsync()))
          //This is equivalent to calling Merge(1)
          .Concat();

source.Subscribe(/*Handle the result of each operation*/);
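
If queuing is not wanted, one option is to drop ticks that arrive while the previous run is still in progress. The following is a minimal sketch that uses only `Interlocked`; the `SkipIfBusyGate` class and its `TryRun` method are hypothetical names for illustration, not part of Rx.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not part of Rx): runs an action only when no other
// invocation is in flight, and drops the call otherwise.
public sealed class SkipIfBusyGate
{
    private int _busy; // 0 = idle, 1 = running

    // Returns true if the action ran, false if it was skipped.
    public bool TryRun(Action action)
    {
        if (action is null) throw new ArgumentNullException(nameof(action));

        // Atomically flip 0 -> 1; any other caller sees 1 and backs off.
        if (Interlocked.CompareExchange(ref _busy, 1, 0) != 0) return false;

        try
        {
            action();
            return true;
        }
        finally
        {
            // Release the gate even if the action throws.
            Volatile.Write(ref _busy, 0);
        }
    }
}
```

Assuming the `DoSomething` method from the question, this could be wired up as `timer.Subscribe(_ => Task.Run(() => gate.TryRun(DoSomething)))`. The work has to be offloaded (here with `Task.Run`) for the gate to have any effect, because a synchronous handler is already invoked one call at a time.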
Wake answered 14/7, 2015 at 7:14 Comment(1)
Thanks. I made some modifications to the question. How can I ensure that tasks are not getting queued up in cases where the task takes longer than the interval? Currently the tasks are sync, but I am open to async. - Congreve

You should have tested your code as is, because this is exactly what Rx already imposes.

Try this as a test:

void Main()
{
    var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
    using (timer.Do(x => Console.WriteLine("!")).Subscribe(tick => DoSomething()))
    {
        Console.ReadLine();
    }
}

private void DoSomething()
{
    Console.Write("<");
    Console.Write(DateTime.Now.ToString("HH:mm:ss.fff"));
    Thread.Sleep(1000);
    Console.WriteLine(">");
}

When you run this you'll get this kind of output:

!
<16:54:57.111>
!
<16:54:58.112>
!
<16:54:59.113>
!
<16:55:00.113>
!
<16:55:01.114>
!
<16:55:02.115>
!
<16:55:03.116>
!
<16:55:04.117>
!
<16:55:05.118>
!
<16:55:06.119

It is already ensuring that there's no overlap.

Reek answered 14/7, 2015 at 7:25 Comment(6)
This is true as long as DoSomething isn't kicking anything off in the background. - Wake
Got it. I made some modifications to the question. How can I ensure that tasks are not getting queued up in cases where the task takes longer than the interval? - Congreve
@Congreve - Again, Rx takes care of it. That's the reason I put the "!" in the .Do(...) call on the interval and why I set the interval to 100 milliseconds. If it were queuing I would have got 10x as many "!" as the <16:54:57.111>, but I didn't. It doesn't queue. - Reek
@Congreve - @Reek has the correct answer if your tasks are synchronous. Interval uses recursive scheduling, meaning the next interval is only scheduled after your action completes. As a result, every operation is guaranteed to be 100 milliseconds apart. - Wake
@Wake - Strictly speaking, I think it is the gap between them that is guaranteed to be 100ms; the start times won't be every 100ms. - Reek
@Reek - Correct, I meant "relative to the end of the previous action". - Wake

Below are two implementations of a PeriodicSequentialExecution method, that creates an observable by executing an asynchronous method in a periodic fashion, enforcing a no-overlapping-execution policy. The interval between subsequent executions can be extended to prevent overlapping, in which case the period is time-shifted accordingly.

The first implementation is purely functional, while the second implementation is mostly imperative. Both implementations are functionally identical. The first one can be supplied with a custom IScheduler. The second one may be slightly more efficient.

The functional implementation:

/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// action that is invoked periodically and sequentially (without overlapping).
/// </summary>
public static IObservable<T> PeriodicSequentialExecution<T>(
    Func<CancellationToken, Task<T>> action,
    TimeSpan dueTime, TimeSpan period,
    CancellationToken cancellationToken = default,
    IScheduler scheduler = null)
{
    // Arguments validation omitted
    scheduler ??= DefaultScheduler.Instance;
    return Delay(dueTime) // Initial delay
        .Concat(Observable.Using(() => CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken), linkedCTS => 
            // Execution loop
            Observable.Publish( // Start a hot delay timer before each operation
                Delay(period), hotTimer => Observable
                    .StartAsync(() => action(linkedCTS.Token)) // Start the operation
                    .Concat(hotTimer) // Await the delay timer
            )
            .Repeat()
            .Finally(() => linkedCTS.Cancel()) // Unsubscription: cancel the operation
        ));

    IObservable<T> Delay(TimeSpan delay)
        => Observable
            .Timer(delay, scheduler)
            .IgnoreElements()
            .Select(_ => default(T))
            .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
                o.OnError(new OperationCanceledException(cancellationToken)))));
}

The imperative implementation:

public static IObservable<T> PeriodicSequentialExecution2<T>(
    Func<CancellationToken, Task<T>> action,
    TimeSpan dueTime, TimeSpan period,
    CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    return Observable.Create<T>(async (observer, ct) =>
    {
        using (var linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(
            ct, cancellationToken))
        {
            try
            {
                await Task.Delay(dueTime, linkedCTS.Token);
                while (true)
                {
                    var delayTask = Task.Delay(period, linkedCTS.Token);
                    var result = await action(linkedCTS.Token);
                    observer.OnNext(result);
                    await delayTask;
                }
            }
            catch (Exception ex) { observer.OnError(ex); }
        }
    });
}

The cancellationToken parameter can be used for the graceful termination of the resulting observable sequence. This means that the sequence waits for the currently running operation to complete before terminating. If you prefer it to terminate instantaneously, potentially leaving work running unobserved in a fire-and-forget fashion, you can simply dispose the subscription to the observable sequence as usual. Canceling the cancellationToken results in the observable sequence completing in a faulted state (OperationCanceledException).
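
The timing rule shared by both implementations above (start the period timer, run the action, then await the timer) can be isolated without Rx. The following is a minimal sketch under that assumption; `PeriodicLoopSketch` and `RunAsync` are hypothetical names, and the bounded `iterations` parameter exists only to make the behavior easy to observe.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class PeriodicLoopSketch
{
    // Hypothetical helper: runs `action` a fixed number of times without
    // overlapping, and returns when each run started (relative to the loop start).
    public static async Task<List<TimeSpan>> RunAsync(
        Func<CancellationToken, Task> action,
        TimeSpan period, int iterations,
        CancellationToken cancellationToken = default)
    {
        var starts = new List<TimeSpan>();
        var clock = Stopwatch.StartNew();
        for (int i = 0; i < iterations; i++)
        {
            starts.Add(clock.Elapsed);

            // Start the period timer first so it runs concurrently with the action.
            Task delayTask = Task.Delay(period, cancellationToken);

            // The next iteration cannot begin until this await completes,
            // so two executions never overlap.
            await action(cancellationToken);

            // Already completed if the action outlasted the period,
            // in which case the schedule simply shifts later.
            await delayTask;
        }
        return starts;
    }
}
```

With this shape, consecutive start times are separated by roughly the longer of the period and the action's duration, which is the time-shifting behavior described at the top of this answer.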

Cienfuegos answered 17/11, 2020 at 3:14 Comment(0)

Here is a factory function that does exactly what you are asking for.

public static IObservable<Unit> Periodic(TimeSpan timeSpan)
{
    // Emit one tick immediately, then repeat a tick that is delayed by timeSpan.
    return Observable.Return(Unit.Default)
        .Concat(Observable.Return(Unit.Default).Delay(timeSpan).Repeat());
}

Here is an example usage:

Periodic(TimeSpan.FromSeconds(1))
    .Subscribe(x =>
    {
        Console.WriteLine(DateTime.Now.ToString("mm:ss:fff"));
        Thread.Sleep(500);
    });

If you run this, each console print will be roughly 1.5 seconds apart.

Note: if you don't want the first tick to run immediately, you could instead use this factory, which won't send the first Unit until after the timespan has elapsed.

public static IObservable<Unit> DelayedPeriodic(TimeSpan timeSpan)
{
    return Observable.Return(Unit.Default).Delay(timeSpan).Repeat();
}
Antipodes answered 6/3, 2017 at 21:12 Comment(0)
