how to avoid glitches in Rx
Asked Answered
G

1

7

Unlike other "FRP" libraries, Rx doesn't prevent glitches: callbacks invoked with time-mismatched data. Is there a good way to work around this?

As an example, imagine that we have a series of expensive computations derived from a single stream (e.g. instead of _.identity, below, we do a sort, or an ajax fetch). We do distinctUntilChanged to avoid recomputing the expensive things.

sub = new Rx.Subject();
a = sub.distinctUntilChanged().share();
b = a.select(_.identity).distinctUntilChanged().share();
c = b.select(_.identity).distinctUntilChanged();
d = Rx.Observable.combineLatest(a, b, c, function () { return _.toArray(arguments); });
d.subscribe(console.log.bind(console));
sub.onNext('a');
sub.onNext('b');

The second event will end up causing a number of glitchy states: we get three events out, instead of one, which wastes a bunch of cpu and requires us to explicitly work around the mismatched data.

This particular example can be worked around by dropping the distinctUntilChanged, and writing some wonky scan() functions to pass through the previous result if the input hasn't changed. Then you can zip the results, instead of using combineLatest. It's clumsy, but doable.

However if there is asynchrony anywhere, e.g. an ajax call, then zip doesn't work: the ajax call will complete either synchronously (if cached) or asynchronously, so you can't use zip.

Edit

Trying to clarify the desired behavior with a simpler example:

You have two streams, a and b. b depends on a. b is asynchronous, but the browser may cache it, so it can either update independently of a, or at the same time as a. So, a particular event in the browser can cause one of three things: a updates; b updates; both a and b update. The desired behavior is to have a callback (e.g. render method) invoked exactly once in all three cases.

zip does not work, because when a or b fires alone, we get no callback from zip. combineLatest does not work because when a and b fire together we get two callbacks.

Gaona answered 11/3, 2014 at 17:39 Comment(9)
What exactly are you trying to achieve here? Technically this is doing exactly what it's composed to do. What is your expected result? Why do you think zip won't work with sync vs async? That's irrelevant to how zip consumes events, unless you're subscribing to a hot observable, in which case you may need to replay the last value.Symonds
In an FRP system (see flapjax, Elliot's papers, etc.), you would get exactly one event out with one event in, and no time-mismatched data. The second onNext would trigger console.log once, not three times, and a, b, and c would all be 'b' on the second event. zip won't work with async because it waits for a value from all streams. If only one updates, zip outputs nothing.Gaona
Just for reference, here's the same implemented in Bacon.js. No glitches. jsfiddle.net/DeUehIndole
OP, I'd suggest using throttle and just ignoring a if b comes back soon (say within 10ms). So a.merge(b).throttle(10). Alternatively you could use bufferWithTime, if you want to flatten a's event with b's event to form a single object.Symonds
But by definition there's absolutely no way to say "don't use a if b is going to be cached" until b has already been returned, or if you know by some external resource that b will definitely be cached. This isn't a glitch in Rx. It's the nature of uncertainty.Symonds
Furthermore, there is absolutely no reason to use the code you've listed, given that each parameter to combineLatest is exactly the same observable (wrapped). Therefore, I can only assume that the problem is oversimplified and that the solution may in fact require refactoring, and not a simple operator change. For instance, if this is truly a request/response scenario with a possible cache, then the sample code is simply wrong. Could you give a less trivial example, so that anyone who attempts to answer the question has the context needed to solve the problem?Symonds
@ChristopherHarris It's absolutely possible in an FRP library, because in FRP dispatch is based on topology, not order of subscriptions as it is with observer pattern. See any FRP library. See the bacon jsfiddle, above. Bacon is kind of an in-between library, using observer pattern but having glitch suppression for Behaviors only (IIRC).Gaona
I'll try to come up with a more complete example.Gaona
@ChristopherHarris hi chris! See you in the office.Geosyncline
K
6

The concept

both a and b update

where both a and b are observables, doesn't exist as a primitive in Rx.

There is no lossless, general operator that can be defined to decide when it receives a notification from a whether it should pass it downstream or hold off until it receives a notification from b. Notifications in Rx do not natively carry "both" semantics, or any semantics beyond the Rx Grammar for that matter.

Furthermore, Rx's serial contract prevents an operator from taking advantage of overlapping notifications in an attempt to achieve this goal. (Though I suspect that relying on race conditions isn't your desired approach anyway.)

See §§4.2, 6.7 in the Rx Design Guidelines.

Thus, what I meant above by "There is no lossless, general operator that can be defined..." is that given two observables a and b with independent notifications, any operator that attempts to decide when it receives a notification from a or b whether it must push immediately or wait for the "other" value, must rely on arbitrary timings. It's guesswork. So this hypothetical operator must either drop values (e.g., DistinctUntilChanged or Throttle), or drop time (e.g., Zip or Buffer), though probably some combination of both.

Therefore, if the agent has the ability to push a alone, or b alone, or a and b together as a notification unit, then it's the developer's responsibility to reify this concept of notification unit themselves.

A 3-state type is required: a | b | {a,b}

(Please excuse my lousy JS)

var ab = function(a, b) { this.a = a; this.b = b; }
sub.onNext(new ab('a'));        // process a alone
sub.onNext(new ab('a', 'b'));   // process a and b together
sub.onNext(new ab(null, 'c'));  // process c alone

The shape of the observable's query no longer matters. Observers must be defined to accept this data type. It's the generator's responsibility to apply any necessary buffering or timing calculations based on the semantics of its internal state in order to produce correct notifications for its observers.

By the way, thank you for providing a simple explanation in your edit (it seems clear to me anyway). I had first heard about "glitches" in this Rx forum discussion. As you can see, it was never really concluded. Now I wonder whether that OP's problem was really as simple as this, assuming that I've understood your problem correctly, of course. :-)

Update:

Here's another related discussion, including some more of my thoughts on why Rx is not FRP:

https://social.msdn.microsoft.com/Forums/en-US/bc2c4b71-c97b-428e-ad71-324055a3cd03/another-discussion-on-glitches-and-rx?forum=rx

Kiushu answered 11/9, 2014 at 4:34 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.