RxJS: Recursive list of observables and single observer

I've been having some trouble with a recursive chain of observables.

I am working with RxJS, which is currently in version 1.0.10621, and contains most basic Rx functionality, in conjunction with Rx for jQuery.

Let me introduce an example scenario for my problem: I am polling the Twitter search API (JSON response) for tweets/updates containing a certain keyword. The response also includes a "refresh_url", which one should use to generate the follow-up request. The response to that follow-up request will again contain a new refresh_url, and so on.

Rx.jQuery allows me to make the Twitter search API call an observable event, which produces one onNext and then completes. What I have tried so far is to have the onNext handler remember the refresh_url and use it in the onCompleted handler to produce both a new observable and corresponding observer for the next request. This way, one observable + observer pair follows the other indefinitely.

The problems with this approach are:

  1. The follow-up observable/observer are already alive when their predecessors have not yet been disposed of.

  2. I have to do lots of nasty bookkeeping to maintain a valid reference to the currently living observer, of which there can actually be two (one in onCompleted and the other somewhere else in its life cycle). This reference is, of course, needed to unsubscribe/dispose of the observer. An alternative to the bookkeeping would be to implement a side effect by means of a "still running?" boolean, as I have done in my example.

Example code:

            running = true;
            twitterUrl = "http://search.twitter.com/search.json";
            twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
            twitterMaxId = 0; //actually twitter ignores its since_id parameter

            newTweetObserver = function () {
                return Rx.Observer.create(
                        function (tweet) {
                            if (tweet.id > twitterMaxId) {
                                twitterMaxId = tweet.id;
                                displayTweet(tweet);
                            }
                        }
                    );
            }

            createTwitterObserver = function() {
                twitterObserver = Rx.Observer.create(
                        function (response) {
                            if (response.textStatus == "success") {
                                var data = response.data;
                                if (data.error == undefined) {
                                    twitterQuery = data.refresh_url;
                                    var tweetObservable;
                                    tweetObservable = Rx.Observable.fromArray(data.results.reverse());
                                    tweetObservable.subscribe(newTweetObserver());
                                }
                            }
                        },
                        function(error) { alert(error); },
                        function () {
                            //create and listen to new observer that includes a delay 
                            if (running) {
                                twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
                                twitterObservable.subscribe(createTwitterObserver());
                            }
                        } 
                    );
                return twitterObserver;
            }
            twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
            twitterObservable.subscribe(createTwitterObserver());

Don't be fooled by the double layer of observables/observers from requests to tweets. My example concerns itself mainly with the first layer: requesting data from Twitter. If, in solving this problem, the second layer (converting responses into tweets) can become one with the first, that would be fantastic, but I think that's a whole different thing. For now.
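For reference, the second layer on its own boils down to a small pure function. The following is only a sketch in plain JavaScript, not tied to any RxJS API: the name `extractNewTweets` is hypothetical, and it assumes that `data.results` arrives newest first and that tweet ids are numeric, as the example code above implies.

```javascript
// Hypothetical helper, not part of RxJS or the Twitter API.
// Returns the tweets in data.results that are newer than maxId, oldest
// first, together with the updated maximum id.
function extractNewTweets(data, maxId) {
    var fresh = [];
    // slice() copies the array first, because reverse() mutates in place.
    var ordered = data.results.slice().reverse();
    for (var i = 0; i < ordered.length; i++) {
        if (ordered[i].id > maxId) {
            maxId = ordered[i].id;
            fresh.push(ordered[i]);
        }
    }
    return { tweets: fresh, maxId: maxId };
}

// Two consecutive responses; the second one overlaps with the first.
var first = extractNewTweets({ results: [{ id: 3 }, { id: 2 }, { id: 1 }] }, 0);
var second = extractNewTweets({ results: [{ id: 4 }, { id: 3 }] }, first.maxId);
```

Keeping this step free of side effects means the running maximum id can be threaded through whatever drives the requests, instead of living in a shared variable.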

Erik Meijer pointed out the Expand operator to me (see example below), and suggested Join patterns as an alternative.

var ys = Observable.Expand(
    new[] { 0 }.ToObservable(),            // initial sequence
    i => i == 10
        ? Observable.Empty<int>()          // terminate
        : new[] { i + 1 }.ToObservable()   // recurse
);

ys.ToArray().Select(a => string.Join(",", a)).DumpLive();

This should be copy-pastable into LINQPad. It assumes singleton observables and produces one final observer.
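To make the semantics concrete in JavaScript, here is a minimal sketch of the same recursion that does not rely on any RxJS API. The helpers `fromArray`, `empty` and `expand` below are hypothetical stand-ins written for this illustration; an "observable" is just a function that pushes values into an observer synchronously, so this only demonstrates the shape of Expand, not a drop-in operator.

```javascript
// Hypothetical stand-ins, NOT RxJS: an observable here is a function that
// takes an observer ({ onNext, onCompleted }) and emits synchronously.
function fromArray(items) {
    return function (observer) {
        for (var i = 0; i < items.length; i++) {
            observer.onNext(items[i]);
        }
        observer.onCompleted();
    };
}

function empty() {
    return fromArray([]);
}

// expand(source, selector): emit every value of source, then pass each
// value to selector and expand the observable it returns in the same way.
// A queue of pending observables is used instead of nested recursion.
// This sketch only works for synchronous sources.
function expand(source, selector) {
    return function (observer) {
        var queue = [source];
        while (queue.length > 0) {
            var current = queue.shift();
            current({
                onNext: function (value) {
                    observer.onNext(value);
                    queue.push(selector(value));
                },
                onCompleted: function () {}
            });
        }
        observer.onCompleted();
    };
}

// Same shape as the C# example: start at 0, stop recursing at 10.
var ys = expand(fromArray([0]), function (i) {
    return i === 10 ? empty() : fromArray([i + 1]);
});

var collected = [];
var completed = false;
ys({
    onNext: function (value) { collected.push(value); },
    onCompleted: function () { completed = true; }
});
var joined = collected.join(",");
```

In the polling scenario, the selector would return the observable for the next request (built from refresh_url) rather than `i + 1`, and a single subscription to the expanded sequence would replace the chain of observers.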

So my question is: what is the nicest way to do the Expand trick in RxJS?

EDIT:
The Expand operator can probably be implemented as shown in this thread, but one would need generators (and I only have JS < 1.6).
Unfortunately, RxJS 2.0.20304-beta does not implement the Expand method.

Toehold answered 27/3, 2012 at 22:57 Comment(3)
I have come to the conclusion that the solution to this problem is indeed the Expand operator, which is not yet implemented in RxJS up to version 2.0.20304-beta. (Toehold)
expand is exactly what you need. I realize this post is old, but it is possible to simply transplant the operators you need from a future version of Rx into your own version of Rx. It may require some manipulation, but the code base has not changed substantially from pre-v1. (Superhighway)
Furthermore, upgrading Rx (where possible) is a good decision. There are many bug fixes and improvements in the latest versions. (Superhighway)

I am going to attempt to solve your problem slightly differently than you did, and take some liberties with details that you can easily fill in yourself.

One thing I cannot tell is whether you are attempting the following steps:

  • Get the first tweet list, which contains next url
  • Once tweet list received, onNext the current observer and get the next set of tweets
    • Do this indefinitely

Or is there a user action (a "get more" button or scrolling to the bottom)? Either way, it's really the same problem. I may be reading your problem incorrectly, though. Here is the answer for that.

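function getMyTweets(headUrl) {
    return Rx.Observable.create(function(observer) {
        var disposed = false;
        var subscription;

        innerRequest(headUrl);
        function innerRequest(url) {
            var next = '';

            // Some magic get ajax function (placeholder, not a real Rx API)
            subscription = Rx.get(url).subscribe(function(res) {
                observer.onNext(res);
                next = res.refresh_url;
            },
            function(err) {
                // Some sweet handling code
                // Perhaps get head? By default, forward the error.
                observer.onError(err);
            },
            function() {
                // Request the next page only while someone is still subscribed
                if (!disposed) { innerRequest(next); }
            });
        }

        // Disposing the outer subscription stops the whole chain
        return function() {
            disposed = true;
            if (subscription) { subscription.dispose(); }
        };
    });
}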
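To see the control flow in isolation, here is a minimal sketch of the same recursion without any Rx dependency. Everything in it is an assumption made for illustration: `pollFollowingRefreshUrl` is a hypothetical name, `fetchPage(url, onSuccess, onError)` stands for whatever callback-style request function you have, and the fake transport exists only so the example can run without a network.

```javascript
// Hypothetical helper: follows refresh_url from response to response.
// fetchPage(url, onSuccess, onError) is an assumed callback-style request.
function pollFollowingRefreshUrl(fetchPage, firstUrl, onPage, onError) {
    var stopped = false;

    function request(url) {
        fetchPage(url, function (response) {
            if (stopped) { return; }        // ignore responses after stop()
            onPage(response);
            request(response.refresh_url);  // next request uses the new URL
        }, function (error) {
            if (!stopped) { onError(error); }
        });
    }

    request(firstUrl);

    // One handle controls the whole chain, however many requests it spawns.
    return { stop: function () { stopped = true; } };
}

// Fake transport: responses are queued and delivered manually.
var pending = [];
function fakeFetch(url, onSuccess) {
    pending.push(function () {
        onSuccess({ results: [url], refresh_url: url + "+" });
    });
}

var seen = [];
var handle = pollFollowingRefreshUrl(fakeFetch, "?q=rx", function (page) {
    seen.push(page.results[0]);
}, function () {});

pending.shift()();   // first response arrives, second request is queued
pending.shift()();   // second response arrives, third request is queued
handle.stop();
pending.shift()();   // third response arrives after stop() and is ignored
```

The single `stop` handle plays the role of the "still running?" flag from the question, but it is scoped to one chain of requests instead of being a shared variable. A delay between requests would go just before the recursive `request` call.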

This may not be the answer you were asking for. If not, sorry!


Edit: After looking through your code, it looks like you want to take the results array and turn it into an observable.

// From the results perform a select then a merge (if ordering does not matter).
getMyTweets('url')
    .selectMany(function(data) {
        return Rx.Observable.fromArray(data.results.reverse());
    });

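// Ensures ordering. Note that concat() without arguments does not flatten an
// observable of observables; use concatAll() (concatObservable() in older
// RxJS builds, depending on your version).
getMyTweets('url')
    .select(function(data) {
        return Rx.Observable.fromArray(data.results.reverse());
    })
    .concatAll();

Cirrostratus answered 24/12, 2013 at 4:26 Comment(0)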
