Turning paginated requests into an Observable stream with RxJs
T

4

9

I have a service which returns data in pages. The response to one page contains details on how to query for the next page.

My approach is to return the response data and then immediately concat a deferred call to the same observable sequence if there are more pages available.

function getPageFromServer(index) {
  // return dummy data for testcase
  return {nextpage:index+1, data:[1,2,3]};
}

function getPagedItems(index) {
  return Observable.return(getPageFromServer(index))
    .flatMap(function(response) {
      if (response.nextpage !== null) {
        return Observable.fromArray(response.data)
          .concat(Observable.defer(function() {return getPagedItems(response.nextpage);}));
      }

      return Observable.fromArray(response.data);
    });
}

getPagedItems(0).subscribe(
  function(item) {
    console.log(new Date(), item);
  },
  function(error) {
    console.log(error);
  }
)

This must be the wrong approach, because within 2 seconds you get:

      throw e;
            ^
RangeError: Maximum call stack size exceeded
    at CompositeDisposablePrototype.dispose (/Users/me/node_modules/rx/dist/rx.all.js:654:51)

What is the correct approach to pagination?

Tsaritsyn answered 16/12, 2014 at 21:33 Comment(0)
S
4

EDIT Ah! I see the problem you're facing. A bit of tail call optimization should fix you up:

function mockGetPageAjaxCall(index) {
  // return dummy data for testcase
  return Promise.resolve({nextpage:index+1, data:[1,2,3]});
}

function getPageFromServer(index) {
  return Observable.create(function(obs) {
    mockGetPageAjaxCall(index).then(function(page) {
      // emit the page, then complete (only on success)
      obs.onNext(page);
      obs.onCompleted();
    }, function(err) {
      // forward a rejected promise as an Rx error
      obs.onError(err);
    });
  });
}

function getPagedItems(index) {
    return Observable.create(function(obs) {
        // create a delegate to do the work
        var disposable = new SerialDisposable();
        var recur = function(index) {
            disposable.setDisposable(getPageFromServer(index).retry().subscribe(function(page) {
                // the mock response carries its items in `data`
                obs.onNext(page.data);
                if(page.nextpage === null) {
                  obs.onCompleted();
                  return; // no more pages, so stop recursing
                }

                // call the delegate recursively
                recur(page.nextpage);
            }));
        };

        // call the delegate to start it at the requested page
        recur(index);

        return disposable;
    });
}

getPagedItems(0).subscribe(
  function(item) {
    console.log(new Date(), item);
  },
  function(error) {
    console.log(error);
  }
)
Sponger answered 16/12, 2014 at 21:44 Comment(9)
When I test this (without the 10 pages at most exit path), it returns 1380 items and then stops. How can it stop? - Tsaritsyn
I've created a JSFiddle with this code. jsfiddle.net/1wpcmuke It still throws with maximum call stack size exceeded. - Tsaritsyn
You're exceeding the maximum call stack still. At around 430 pages returned. I think recursion might not be the best solution here. - Sponger
Thanks for a solution. I was hoping for something built with Rx so that I could add in other composition operators such as retrying a page when an exception occurs. Therefore, I'm going to wait before accepting in case other Rx solutions are posted. - Tsaritsyn
Considering you need to examine the return to recur, creating an observable, as you see above, is as RxJS as it can get. You can still compose off of this observable, just like any other observable. FWIW, it's literally created the same way in any of the operator methods you might use create observables internally. - Sponger
I've expanded the example to include a retry of your get page, and make it "built with Rx". The thing is you're going to have to materialize those calls to create your final observable, or you will hit a max call stack issue. - Sponger
The nested subscribe is problematic and a bit of an Rx antipattern. And the reason for the stackoverflow is not really a problem with the implementation but instead a problem with your mock service--the service is resolving synchronously when in the real world it will be asynchronous. It needs to yield after a timer. Then you won't get the stackoverflow. When I am on something other than my phone I'll post a solution I've used in the past for this. - Ovipositor
"The nested subscribe is problematic and a bit of an Rx antipattern" ... source for this being an "anti-pattern"? It's literally all over the core source code. (.switch() for example) - Sponger
I did miss returning the disposable, but took care of that with an edit. - Sponger
O
7

The OP's code is actually the right approach. You just need to make your mock service asynchronous to simulate real conditions. Here is a solution that avoids the stack exhaustion (I also made getPageFromServer return a cold observable itself instead of requiring the caller to wrap it).

Note that if you really expect your service requests to complete synchronously in the real application, and so need to make sure your code does not exhaust the stack when that happens, just have getPagedItems() use the currentThread scheduler. The currentThread scheduler schedules tasks using a trampoline, which prevents recursive calls (and stack exhaustion). See the commented-out line at the end of getPagedItems.

function getPageFromServer(index) {
    // return dummy data asynchronously for testcase
    // use timeout scheduler to force the result to be asynchronous like
    // it would be for a real service request
    return Rx.Observable.return({nextpage: index + 1, data: [1,2,3]}, Rx.Scheduler.timeout);

    // for real request, if you are using jQuery, just use rxjs-jquery and return:
    //return Rx.Observable.defer(function () { return $.ajaxAsObservable(...); });
}

function getPagedItems(index) {
    var result = getPageFromServer(index)
        .retry(3) // retry the call if it fails
        .flatMap(function (response) {
            // emit this page's items, then continue with the next page if there is one
            var items = Rx.Observable.fromArray(response.data);
            if (response.nextpage !== null) {
                items = items.concat(getPagedItems(response.nextpage));
            }
            return items;
        });

    // If you think you will really satisfy requests synchronously, then instead
    // of using the Rx.Scheduler.timeout in getPageFromServer(), you can
    // use the currentThreadScheduler here to prevent the stack exhaustion...

    // return result.observeOn(Rx.Scheduler.currentThread) 
    return result;
}
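
To illustrate the trampoline idea on its own, here is a minimal sketch in plain JavaScript. It is not RxJS's scheduler implementation; createTrampoline, getPageSync and collectPages are hypothetical names made up for this example, and the page source is a synchronous mock. The point is only that a nested "schedule" call enqueues work instead of calling it, so the stack depth stays constant no matter how many pages there are.

```javascript
// Hypothetical trampoline: runs queued tasks in a loop instead of nesting calls.
function createTrampoline() {
  var queue = [];
  var draining = false;
  return function schedule(task) {
    queue.push(task);
    if (draining) {
      return; // nested call: the outer loop below will pick the task up
    }
    draining = true;
    try {
      while (queue.length > 0) {
        queue.shift()(); // run the next task at constant stack depth
      }
    } finally {
      draining = false;
    }
  };
}

// Hypothetical synchronous page source, shaped like the mock in the question.
function getPageSync(index, lastPage) {
  return { nextpage: index < lastPage ? index + 1 : null, data: [1, 2, 3] };
}

// Collects the items of pages 0..lastPage without growing the call stack.
function collectPages(lastPage) {
  var schedule = createTrampoline();
  var items = [];
  function step(index) {
    var page = getPageSync(index, lastPage);
    Array.prototype.push.apply(items, page.data);
    if (page.nextpage !== null) {
      // "recursive" call goes through the trampoline instead of the stack
      schedule(function () { step(page.nextpage); });
    }
  }
  schedule(function () { step(0); });
  return items;
}

console.log(collectPages(49999).length); // 50000 pages * 3 items = 150000
```

Calling step(page.nextpage) directly instead of going through schedule would nest one call per page, which is the kind of growth that exhausts the stack with a synchronous source.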
Ovipositor answered 22/12, 2014 at 15:24 Comment(3)
I did a jsbin for this: jsbin.com/zaqoqedula/edit?js,console Thanks for the answer, very helpful. - Foeticide
The output on this jsbin is easier for me to follow: jsbin.com/neniko/edit?js,console - Sherrod
Here is the first rewritten for RxJS 7 - codepen.io/bbos/pen/rNgXKmE (the others are pre-pipe()) - Underproduction
L
3

Here is a more concise and, IMHO, cleaner answer without any recursion. It uses ogen (~46 LOC) to transform any generator into an observable.

It has a custom-built next function that emits data any time your function yields something.

NB: The original article is worth reading.

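function getPagedItems({someParams, offset = 0, limit = 4} = {}) {
    // someParams is a placeholder for whatever extra arguments your request needs;
    // it is assumed here to be passed in by the caller
    const paginatedQueryGenerator = function*(someParams, offset, limit) {
        let hasMore = true
        while (hasMore) {
            const results = yield YOUR_PROMISE_BASED_REQUEST(someParams, limit, offset)
            // stop once the response no longer points at a next page
            hasMore = results && results.nextpage !== null
            offset += limit
        }
    }

    return ogen(paginatedQueryGenerator)(someParams, offset, limit)
}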
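
For comparison, the same loop-instead-of-recursion idea can be sketched without any library, using a native async generator (this assumes a runtime with ES2018 support). This is not ogen and does not produce an Rx observable; fetchPage, pagedItems and collectAll are hypothetical names, and fetchPage is a mock that serves 10 items in total.

```javascript
// Hypothetical promise-based page request: serves the items 0..9 in pages.
function fetchPage(offset, limit) {
  var TOTAL = 10;
  var data = [];
  for (var i = offset; i < Math.min(offset + limit, TOTAL); i++) {
    data.push(i);
  }
  var next = offset + limit < TOTAL ? offset + limit : null;
  return Promise.resolve({ data: data, nextpage: next });
}

// Yields every item of every page; a plain loop, so no recursion is involved.
async function* pagedItems(offset = 0, limit = 4) {
  let next = offset;
  while (next !== null) {
    const page = await fetchPage(next, limit);
    yield* page.data;     // emit the items of this page one by one
    next = page.nextpage; // null ends the loop
  }
}

// Drains the generator into an array.
async function collectAll() {
  const items = [];
  for await (const item of pagedItems(0, 4)) {
    items.push(item);
  }
  return items;
}

collectAll().then(function (items) {
  console.log(items); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
});
```

Because each page is awaited before the next one is requested, the consumer can stop early (for example with break inside for await) and no further pages are fetched.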
Longsufferance answered 17/4, 2017 at 23:23 Comment(0)
O
0

Another solution is to use retryWhen:

getAllData() {
    let page = 0;
    let result = [];

    const getOnePage = () => {
        return of(0).pipe(mergeMap(() => getPaginatedData(page++)));
    };

    return getOnePage()
        .pipe(
            map(items => {
                result = result.concat(items);
                // a full page means there may be more; the thrown value triggers retryWhen
                if (items.length === PAGE_SIZE) {
                    throw 'get next page';
                }
            }),
            retryWhen(e => e.pipe(
                takeWhile(er => er === 'get next page'))
            ),
            map(e => result)
        )
        .subscribe(items => {
            console.log('here is all data', items);
        });

}
Obediah answered 5/12, 2018 at 12:56 Comment(0)