Throttle only if specific condition met

I have an observable that I am subscribing to. This observable returns an object that has a property called ActivationType, which can be set multiple times.

What I am trying to achieve is log a message whenever ActivationType is set to "Type1". However, if ActivationType is set to "Type2", log the message only once and wait for 30 seconds before logging again if ActivationType is "Type2".

So if I have:

myObservable
    .Where(o => o.ActivationType == "Type1" || o.ActivationType == "Type2")  //listen for types 1 and 2
    .Throttle() // ??? somehow only throttle if we are currently looking at Type2
    .Subscribe(Log); //log some stuff

I believe Throttle() is what I am looking for but am not sure how to trigger it conditionally.

Any suggestions?

Silicone answered 6/3, 2013 at 17:10 Comment(2)
A simple Throttle probably isn't what you want - Throttle is much like Window, in that it will trigger once at the end of each period (in your case 30 seconds), so any "Type2" events would be delayed until the end of each window. - Hebbe
@JerKimball: I see. I am still new to Rx. What should I be using if I just want to trigger the first time without any delay, and then ignore the following Type2 events until 30 seconds have passed (after that, just triggering once again if necessary)? - Silicone
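One way to read the requirement in the comment above ("trigger the first time without any delay, then ignore the following Type2 events until 30 seconds have passed") is a small stateful filter. The sketch below is only an illustration under stated assumptions: `FirstThenSuppress` is a hypothetical helper (it is not an Rx operator), events are modeled as plain strings rather than the question's object, and a `HistoricalScheduler` stands in for real time so the demo runs instantly.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper (not part of Rx): lets a value through immediately, then
// drops later values until `quiet` has elapsed since the last value let through.
static IObservable<T> FirstThenSuppress<T>(IObservable<T> input, TimeSpan quiet, IScheduler scheduler) =>
    Observable.Defer(() =>
    {
        DateTimeOffset? lastEmit = null; // state is per subscription thanks to Defer
        return input.Where(_ =>
        {
            var now = scheduler.Now;
            if (lastEmit.HasValue && now - lastEmit.Value < quiet) return false;
            lastEmit = now;
            return true;
        });
    });

// Assumption: virtual time instead of the default scheduler, to keep the demo instant.
var clock = new HistoricalScheduler();
var source = new Subject<string>();

var type1 = source.Where(a => a == "Type1");
var type2 = FirstThenSuppress(source.Where(a => a == "Type2"), TimeSpan.FromSeconds(30), clock);

var logged = new List<string>();
var seconds = 0;
using (type1.Merge(type2).Subscribe(a => logged.Add($"{a}@{seconds}s")))
{
    // One event every 10 virtual seconds: at 0s, 10s, 20s, 30s and 40s.
    foreach (var kind in new[] { "Type2", "Type1", "Type2", "Type2", "Type2" })
    {
        source.OnNext(kind);
        clock.AdvanceBy(TimeSpan.FromSeconds(10));
        seconds += 10;
    }
}

Console.WriteLine(string.Join(", ", logged));
```

In this run the Type2 events at 20s and 40s are dropped because fewer than 30 seconds have passed since the last Type2 that was logged, while Type1 is never held back. In real code the scheduler argument would more likely be `Scheduler.Default`.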

Ah, a perfect case for the near-impossible-to-understand Window operator!

EDIT: I post this link like a dozen times a month, I swear - best read-thru I've seen of the Window, Join, Buffer, GroupJoin, etc. operators:

Lee Campbell: Rx Part 9–Join, Window, Buffer and Group Join

// Thing is a stand-in for your event type; it is assumed to expose
// a settable string property named ActivationType.
var source = new Subject<Thing>();

var feed = source.Publish().RefCount();
var ofType1 = feed.Where(t => t.ActivationType == "Type1");
var ofType2 = feed
    // only window the type2s
    .Where(t => t.ActivationType == "Type2")
    // our "end window selector" will be a tick 30s off from start
    .Window(() => Observable.Timer(TimeSpan.FromSeconds(30)))
    // we want the first one in each window...
    .Select(lst => lst.Take(1))
    // moosh them all back together
    .Merge();

// We want all "type 1s" and the first "type 2" of each window
var query = ofType1.Merge(ofType2);

// Let's set up a fake stream of data
var running = true;
var feeder = Task.Factory.StartNew(
    () =>
    {
        // until we say stop...
        while (running)
        {
            // pump new Type2 Things into the stream every 500ms
            source.OnNext(new Thing { ActivationType = "Type2" });
            Thread.Sleep(500);
        }
    });

using (query.Subscribe(Console.WriteLine))
{
    // Block until we hit enter so we can see the live output
    // from the above subscribe
    Console.ReadLine();
    // Shutdown our fake feeder
    running = false;
    feeder.Wait();
}
Hebbe answered 6/3, 2013 at 18:28 Comment(8)
Thanks for the sample and link. Very helpful. It seems to be working as expected, but I have a quick question. I made some minor changes to your sample: I changed the timespan from 30 seconds to 5, and in the feeder's while loop I changed the Thread.Sleep time from 500 to TimeSpan.FromSeconds(4). Now, if I Console.WriteLine the current time in using(query.Subscribe(Console.WriteLine)), I see a line every 4 seconds when I expect it to be every 5 seconds. Do you know why that is? - Silicone
@Silicone The "feeder" is just something to simulate your real stream - the code up to var running = true; is the important stuff. You would replace source = ... with your stream and scrap the feeder thing altogether. - Hebbe
Yeah, in my code I am using my actual streams. I was just messing around with your sample to get a better understanding of the Window functionality. After changing some values I got confused: I expected at most one update every 5 seconds, but updates come every 4 seconds instead. - Silicone
@Silicone Well, if you changed the Sleep to 4 seconds, that would be the frequency that updates would come in. - Hebbe
Hmm. Sorry, I guess I am misunderstanding what .Window(() => Observable.Timer(TimeSpan.FromSeconds(5))) does. I thought that at most one update would be processed per 5 seconds. So the feeder puts out an update and the subscription processes it. Then, after 4 seconds, the feeder puts out another update, but since less than 5 seconds have passed, it is not processed (no console output). Then the next feeder update comes after another 4 seconds and is processed (printed out). - Silicone
@Silicone The first argument to Window says "when you see a signal on this, end this window and start a new one" - and then we pull only the first element out of that collected window. So - type 2 seen, window starts; more type 2s come within 30s, they are in that window, and effectively ignored; timer ticks, new window starts; next type 2 begins new window. - Hebbe
@flack oh, weird; didn't notice that when I was testing it out... If I get a chance to play with it, I'll see if I can tweak it. In the meantime, you could have one more merged stream that looks like ofType2.Take(1). - Hebbe
Thanks for all your help, JerKimball. I had deleted my previous comment (before you posted your reply) because I think I made a mistake in saying the first value is ignored. It looks like I was just starting the feeder before the subscription was made, so when the feeder output its first value no one was listening for it. I think your sample works fine, but if you do play around with it and happen to notice something screwy, please let me know :) - Silicone
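The "every 4 seconds instead of 5" observation in the comments can be reproduced in virtual time. As far as I can tell, `Window` with a closing selector opens its first window at subscription and opens the next one as soon as the previous one closes, so the windows run back to back on a fixed 5-second grid rather than starting at each value that gets through. The sketch below assumes plain integers as events (each value is its own arrival time in seconds) and a `HistoricalScheduler` in place of real time; neither comes from the original answer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumption: virtual time, so nothing actually sleeps.
var clock = new HistoricalScheduler();
var source = new Subject<int>();

var firstPerWindow = source
    // each window is closed by a 5s timer; the next window opens right away
    .Window(() => Observable.Timer(TimeSpan.FromSeconds(5), clock))
    // keep only the first value of each window
    .Select(w => w.Take(1))
    .Merge();

var seen = new List<int>();
using (firstPerWindow.Subscribe(x => seen.Add(x)))
{
    // values arrive at 0s, 4s, 8s, 12s and 16s
    for (var second = 0; second <= 16; second += 4)
    {
        source.OnNext(second);
        clock.AdvanceBy(TimeSpan.FromSeconds(4));
    }
}

Console.WriteLine(string.Join(", ", seen));
```

Tracing this by hand, the windows cover [0s, 5s), [5s, 10s), [10s, 15s) and [15s, 20s), so the values 0, 8, 12 and 16 get through and only 4 is dropped. That gives one value per window, but consecutive values can be less than 5 seconds apart, which would explain the output described above.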

Why not just use two streams?

var baseStream = myObservable.Publish().RefCount(); // evaluate once
var type1 = baseStream.Where(o => o.ActivationType == "Type1");
var type2 = baseStream.Where(o => o.ActivationType == "Type2").Throttle(TimeSpan.FromSeconds(30));

type1.Merge(type2).Subscribe(Log);
Arleanarlee answered 6/3, 2013 at 17:26 Comment(5)
One issue with using Throttle (see my comment on the question) - the first receipt of a "Type2" would be delayed by the throttle window, which might not be the desired behavior. - Hebbe
Thanks Alex. I did some testing and, as JerKimball pointed out, the first receipt of Type2 does not occur until 30 seconds have passed (assuming no subsequent Type2s arrive and reset the throttle period). - Silicone
Good point. As a replacement, how about .Buffer(TimeSpan.FromSeconds(30)).Where(list => list.Any()).Select(list => list.Last())? - Arleanarlee
@AlexG: It seems to behave similarly. It looks like Buffer with a timespan of 30 seconds will not trigger anything until at least 30 seconds have passed. I can't seem to get it to trigger immediately if necessary and then apply the timeout to subsequent updates. - Silicone
RefCount is unnecessary here (and overused in general, IMHO). I'd use the Publish(selector) overload here instead. - Blurt
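The `Publish(selector)` suggestion in the last comment could look roughly like the sketch below. It is not the commenter's code: the string events, the `subscriptions` counter and the `HistoricalScheduler` are assumptions added so the example is self-contained and runs without waiting. The Type2 branch keeps the `Throttle` from the answer above, so it also shows the delayed first Type2 that the other comments mention.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumption: virtual time instead of the default scheduler.
var clock = new HistoricalScheduler();
var subject = new Subject<string>();

// Stand-in for myObservable; counts how often the underlying source is subscribed.
var subscriptions = 0;
var myObservable = Observable.Defer<string>(() =>
{
    subscriptions++;
    return subject;
});

// Publish(selector) shares a single subscription among everything built inside
// the selector, so no separate Publish().RefCount() step is needed.
var merged = myObservable.Publish(shared =>
{
    var type1 = shared.Where(a => a == "Type1");
    var type2 = shared.Where(a => a == "Type2").Throttle(TimeSpan.FromSeconds(30), clock);
    return type1.Merge(type2);
});

var logged = new List<string>();
var loggedBeforeAdvance = 0;
using (merged.Subscribe(a => logged.Add(a)))
{
    subject.OnNext("Type1");                    // passes straight through
    subject.OnNext("Type2");                    // held back by Throttle
    loggedBeforeAdvance = logged.Count;
    clock.AdvanceBy(TimeSpan.FromSeconds(31));  // let the throttle period elapse
}

Console.WriteLine($"{subscriptions} subscription(s): {string.Join(", ", logged)}");
```

The source is subscribed once even though two branches read from it, and the Type2 value only appears after the virtual clock moves past the 30-second throttle period.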
