Consuming a paginated API in a reactive style

I'm trying to figure out how to consume a paginated API in a reactive style, exposing a stream of items instead of the pages that the API gives me.

What I Have - A Paginated API

The Slack API (like many others) handles pagination with some form of metadata object, so a query to /api/stuff returns an object like this:

{
    "items": [ <list of first page you requested> ],
    "meta": {
        "cursor": "<a cursor string to get the next page>"
    }
}

You can then request the next page with GET /api/stuff?cursor=<the cursor from above>. The response contains a new cursor, so you can keep going until you get a response whose cursor is empty, which marks the last available page.

What I Want - A Stream of Items

With Reactor, I'd like to provide a method with the return type Flux<Item>, i.e. a stream of all items where the consumer doesn't need to know or care that the items are actually fetched one page at a time.

Basically, I'd like to do something that takes an initial state and a method with the signature State -> Mono<Tuple2<Collection<Item>, State>> (or equivalent) and gives me a Flux<Item> by sequentially fetching new pages, until the State indicates that we are done.

I've tried to formulate something using Flux.create or Flux.generate but it seems their use cases are slightly different from mine.

How Do I Get There?

Ptolemaic asked 20/6, 2019 at 9:50

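I'm a Reactor newbie, but I think this is what expand() is for. It applies a function to each emitted element and feeds the results back into the same stream, and it stops once the function returns an empty publisher.

Here is a complete demo, where getFirstPage() and getNextPage() simulate the Slack API. getNextPage() returns the last page when the cursor reaches the value 5.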

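import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Mono;

public class Slack {
    public static void main(String[] args) {
        // expand() feeds each emitted page back into getNextPage()
        // until getNextPage() returns an empty Mono
        getFirstPage().expand(page -> getNextPage(page))
                      // flatten each page into its individual items
                      .flatMapIterable(page -> page.getResult())
                      .subscribe(System.out::println);
    }

    // Simulates the initial request, which is made without a cursor
    private static Mono<Page> getFirstPage() {
        System.out.println("getting first page");
        return Mono.just(new Page(Arrays.asList("page0-1", "page0-2"), 1));
    }

    // Simulates a follow-up request using the cursor of the previous page
    private static Mono<Page> getNextPage(Page page) {
        System.out.println("getting page " + page.getCursor());
        // a null cursor marks the last page: an empty Mono ends the expansion
        if (page.getCursor() == null) {
            return Mono.empty();
        }
        // pretend the API runs out of pages once the cursor reaches 5
        Integer nextCursor = (page.getCursor() == 5) ? null : page.getCursor() + 1;
        return Mono.just(new Page(Arrays.asList("page" + page.getCursor() + "-1", "page" + page.getCursor() + "-2"), nextCursor));
    }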

    static class Page {
        private List<String> result;
        private Integer cursor;

        public Page(List<String> result, Integer cursor) {
            this.result = result;
            this.cursor = cursor;
        }

        public List<String> getResult() {
            return result;
        }

        public Integer getCursor() {
            return cursor;
        }
    }
}
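
The same idea can be wrapped in a reusable helper that matches the State -> Mono<Tuple2<Collection<Item>, State>> shape from the question. What follows is only a sketch under a few assumptions: the names Paginator, paginate, fetchFakePage and PAGES are made up for illustration, the "done" check is passed in as a separate predicate (a Tuple2 cannot hold null, so the end of the data has to be signalled by a non-null state), and the fake API uses a page index as its state, with -1 meaning there are no more pages.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class Paginator {

    // Hypothetical helper: S is the paging state (e.g. a cursor), T the item type.
    public static <S, T> Flux<T> paginate(
            S initialState,
            Function<S, Mono<Tuple2<Collection<T>, S>>> fetchPage,
            Predicate<S> isDone) {
        // defer() so that the first request is only made on subscription
        return Mono.defer(() -> fetchPage.apply(initialState))
                .expand(page -> {
                    if (isDone.test(page.getT2())) {
                        return Mono.empty(); // no further pages to request
                    }
                    return fetchPage.apply(page.getT2());
                })
                // flatten each page into its items
                .flatMapIterable(page -> page.getT1());
    }

    // Fake in-memory "API" with three pages, used only for the demo below.
    static final List<List<String>> PAGES = Arrays.asList(
            Arrays.asList("a", "b"),
            Arrays.asList("c", "d"),
            Arrays.asList("e"));

    // Returns the page at the given index plus the next state (-1 when done).
    static Mono<Tuple2<Collection<String>, Integer>> fetchFakePage(Integer index) {
        Collection<String> items = PAGES.get(index);
        int next = (index + 1 < PAGES.size()) ? index + 1 : -1;
        return Mono.just(Tuples.of(items, next));
    }

    public static void main(String[] args) {
        List<String> all = paginate(0, Paginator::fetchFakePage, state -> state < 0)
                .collectList()
                .block();
        System.out.println(all); // [a, b, c, d, e]
    }
}
```

With a real HTTP client, fetchFakePage would be replaced by a method that performs the request and maps the response to the items plus the next cursor. The consumer only ever sees the resulting Flux of items.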
Og answered 20/6, 2019 at 10:12
Nice one. I used this concept to handle pagination. Just want to confirm: when Mono.empty() is returned, does it exit the expand chain and flatMap whatever the existing iterable is? – Undesigned
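
As far as I can tell, yes: an empty Mono only means there is nothing further to expand. Elements that were already emitted still flow downstream, and the Flux completes once the expansion has run out. A small sketch to check this, where the class name ExpandTermination and the method countTo are made up for illustration:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ExpandTermination {

    // Counts up from 1; returning an empty Mono at `limit` ends the expansion.
    static Flux<Integer> countTo(int limit) {
        return Mono.just(1)
                .expand(n -> {
                    if (n >= limit) {
                        return Mono.empty(); // nothing further to expand
                    }
                    return Mono.just(n + 1);
                });
    }

    public static void main(String[] args) {
        // Every element emitted before the empty Mono is still delivered.
        List<Integer> values = countTo(3).collectList().block();
        System.out.println(values); // [1, 2, 3]
    }
}
```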