I've been having some trouble with a recursive chain of observables.
I am working with RxJS, which is currently in version 1.0.10621, and contains most basic Rx functionality, in conjunction with Rx for jQuery.
Let me introduce an example scenario for my problem: I am polling the Twitter search API (JSON response) for tweets/updates containing a certain keyword. The response also includes a "refresh_url", which one should use to generate the follow-up request. The response to that follow-up request will again contain a new refresh_url, and so on.
Rx.jQuery allows me to make the Twitter search API call an observable event, which produces one onNext and then completes. What I have tried so far is to have the onNext handler remember the refresh_url and use it in the onCompleted handler to produce both a new observable and corresponding observer for the next request. This way, one observable + observer pair follows the other indefinitely.
The problems with this approach are:
The follow-up observable/observer pair is already alive before its predecessors have been disposed of.
I have to do lots of nasty bookkeeping to maintain a valid reference to the currently living observer, of which there can actually be two (one in onCompleted and the other somewhere else in its life cycle). This reference is, of course, needed to unsubscribe/dispose of the observer. An alternative to the bookkeeping would be to implement a side effect by means of a "still running?" boolean, as I have done in my example.
Example code:
// Shared state; `text` is the search keyword, defined elsewhere.
var running = true;
var twitterUrl = "http://search.twitter.com/search.json";
var twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
var twitterMaxId = 0; // actually twitter ignores its since_id parameter
var twitterObservable, twitterObserver;

// Second layer: observer that displays tweets not seen before.
var newTweetObserver = function () {
    return Rx.Observer.create(
        function (tweet) {
            if (tweet.id > twitterMaxId) {
                twitterMaxId = tweet.id;
                displayTweet(tweet);
            }
        }
    );
};

// First layer: observer for one search response.
var createTwitterObserver = function () {
    twitterObserver = Rx.Observer.create(
        function (response) {
            if (response.textStatus == "success") {
                var data = response.data;
                if (data.error == undefined) {
                    // remember the query for the follow-up request
                    twitterQuery = data.refresh_url;
                    var tweetObservable = Rx.Observable.fromArray(data.results.reverse());
                    tweetObservable.subscribe(newTweetObserver());
                }
            }
        },
        function (error) { alert(error); },
        function () {
            // create and listen to new observer that includes a delay
            if (running) {
                twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
                twitterObservable.subscribe(createTwitterObserver());
            }
        }
    );
    return twitterObserver;
};

twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
twitterObservable.subscribe(createTwitterObserver());
Don't be fooled by the double layer of observables/observers from requests to tweets. My example concerns itself mainly with the first layer: requesting data from Twitter. If, in solving this problem, the second layer (converting responses into tweets) can be merged into the first one, that would be fantastic, but I think that's a whole different thing. For now.
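To pin down that first layer independently of Rx, here is a minimal plain-JavaScript sketch of the chain I am after: each response supplies the query for the next request, and a single handle stops the whole chain. The names `pollChain`, `fetchPage` and `onPage` are hypothetical and not part of RxJS or Rx.jQuery; `fetchPage` is an assumed stand-in for the JSONP call.

```javascript
// Hypothetical helper, not part of RxJS: chains requests so that each
// response's refresh_url becomes the query of the next request.
// fetchPage(query, callback) is an assumed stand-in for the JSONP call.
function pollChain(fetchPage, firstQuery, onPage) {
    var disposed = false;

    function step(query) {
        if (disposed) { return; }
        fetchPage(query, function (data) {
            if (disposed) { return; }   // ignore responses arriving after dispose
            onPage(data);
            step(data.refresh_url);     // recurse with the follow-up query
        });
    }

    step(firstQuery);

    // One handle for the whole chain, so no per-request bookkeeping is needed.
    return { dispose: function () { disposed = true; } };
}
```

This sketch leaves out the delay between requests and any error handling, both of which the real thing would need.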
Erik Meijer pointed out the Expand operator to me (see example below), and suggested Join patterns as an alternative.
var ys = Observable.Expand(
    new[] { 0 }.ToObservable(),            // initial sequence
    i => i == 10
        ? Observable.Empty<int>()          // terminate
        : new[] { i + 1 }.ToObservable()   // recurse
);
ys.ToArray().Select(a => string.Join(",", a)).DumpLive();
This should be copy-pastable into LINQPad. It assumes singleton observables and produces one final observer.
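To make the semantics of that snippet concrete in JavaScript, here is a rough synchronous analogue that works on arrays instead of observables. `expandSync` is a hypothetical helper written for illustration only; it is not an RxJS operator and says nothing about how Rx schedules the recursion.

```javascript
// Hypothetical array-based analogue of Expand, not an RxJS operator:
// emits every seed value, then the values the selector produces for it,
// and so on until the selector returns only empty arrays.
function expandSync(seeds, selector) {
    var queue = seeds.slice();   // values still to be emitted and expanded
    var output = [];
    while (queue.length > 0) {
        var value = queue.shift();
        output.push(value);
        queue = queue.concat(selector(value));
    }
    return output;
}

var ys = expandSync([0], function (i) {
    return i === 10 ? [] : [i + 1];   // terminate at 10, otherwise recurse
});
console.log(ys.join(","));   // 0,1,2,3,4,5,6,7,8,9,10
```

In the Twitter case, the seed would be the first response and the selector would turn a response into the follow-up request built from its refresh_url.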
So my question is: what is the nicest way to do the expand trick in RxJS?
EDIT:
The expand operator can probably be implemented as shown in this thread. But one would need generators (and I only have JS < 1.6).
Unfortunately, RxJS 2.0.20304-beta does not implement the Expand method.