JavaRX Pagination - observe in each interration rather than at the end - Generic Paginator
Asked Answered
L

2

0

I'm working with a paginated API. I have used the following solution provided by Adam Millerchip and it works well.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.processors.BehaviorProcessor;

public class Pagination {

    // Fetch all pages and return the items contained in those pages, using the provided page fetcher function
    public static <T> Flowable<T> fetchItems(Function<Integer, Single<Page<T>>> fetchPage) {
        // Processor issues page indices
        BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(0);
        // When an index number is issued, fetch the corresponding page
        return processor.concatMap(index -> fetchPage.apply(index).toFlowable())
                        // when returning the page, update the processor to get the next page (or stop)
                        .doOnNext(page -> {
                            if (page.hasNext()) {
                                processor.onNext(page.getNextPageIndex());
                            } else {
                                processor.onComplete();
                            }
                        })
                        .concatMapIterable(Page::getElements);
    }

    public static void main(String[] args) {
        fetchItems(Pagination::examplePageFetcher).subscribe(System.out::println);
    }

    // A function to fetch a page of our paged data
    private static Single<Page<String>> examplePageFetcher(int index) {
        return Single.just(pages.get(index));
    }

    // Create some paged data
    private static ArrayList<Page<String>> pages = new ArrayList<>(3);

    static {
        pages.add(new Page<>(Arrays.asList("one", "two"), Optional.of(1)));
        pages.add(new Page<>(Arrays.asList("three", "four"), Optional.of(2)));
        pages.add(new Page<>(Arrays.asList("five"), Optional.empty()));
    }

    static class Page<T> {
        private List<T> elements;
        private Optional<Integer> nextPageIndex;

        public Page(List<T> elements, Optional<Integer> nextPageIndex) {
            this.elements = elements;
            this.nextPageIndex = nextPageIndex;
        }

        public List<T> getElements() {
            return elements;
        }

        public int getNextPageIndex() {
            return nextPageIndex.get();
        }

        public boolean hasNext() {
            return nextPageIndex.isPresent();
        }
    }
}

But I have 2 questions :

  • In this implementation elements are processed at the end (subscribe(System.out::println)) when all pages are loaded. This may cause memory problems if gathered data are numerous. I would prefer to process them (data base save) immediately when they are loaded (in the .doOnNext(page -> { }). I have been able to do it but in a "dirty way" (add database save code in the doOnNext). How can I do this ?

  • in my implementation of the "page" class I use a custom Gson deserializer. And I don't know how to deal with Generic data. I have had to write "list.add((MyGenericClass)context.deserialize(anArray.getAsJsonObject(), MyGenericClass.class));" where I would want something like "list.add((T)context.deserialize(anArray.getAsJsonObject(), T.class));". How can I keep things realy generic ?

    public static JsonDeserializer<Paginator> deserializer = new JsonDeserializer<Paginator>() {
    @Override
    public Paginator deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
        JsonObject jsonObject = json.getAsJsonObject();
        Paginator paginator = new Paginator(null, Optional.of(1));
        if (jsonObject.get("data") != null && !jsonObject.get("data").isJsonNull()) {
            JsonArray array = jsonObject.getAsJsonArray("data");
            List<MyGenericClass> list = new ArrayList<>();
            for (JsonElement anArray : array) {
                list.add((MyGenericClass)context.deserialize(anArray.getAsJsonObject(), MyGenericClass.class));
            }
            paginator.setData(list);
        }
        paginator.setCurrent_page(jsonAsInt(jsonObject, "current_page",-1));
        paginator.setFrom(jsonAsInt(jsonObject,"from",-1));
        paginator.setTo(jsonAsInt(jsonObject,"to",-1));
        paginator.setTotal(jsonAsInt(jsonObject,"total",-1));
        paginator.setLast_page(jsonAsInt(jsonObject, "last_page", -1));
        paginator.setNextPage(); // calculate next page
        return paginator;
    }
    };
    
Librarianship answered 26/6, 2019 at 11:49 Comment(0)
B
0

To answer your first question:

In this implementation elements are processed at the end (subscribe(System.out::println)) when all pages are loaded."

This is incorrect. The whole point of reactive programming is to avoid this. fetchItems() returns a Flowable<T>, which doesn't actually fetch any items until something subscribes to it. When you subscribe to something, the subscriber gets notified each time an item is ready. You should call subscribe() and pass a function that will get called each time an item is ready. In my example, I pass it System.out::println, which prints the values, but you can implement your own handler that saves to the database.

I would prefer to process them (data base save) immediately when they are loaded (in the .doOnNext(page -> { })

This is confusing the difference between a Publisher and a Consumer. A publisher produces items - in my example it's a Flowable<T> that produces items of type T. A consumer consumes the items a publisher produces. doOnNext() is a function of the publisher. It says "when you publish something, also do this side effect". In my example, the side effect is to issue the next page number to fetch. You shouldn't handle the DB save there, you should write your own callback function (Consumer) or Subscriber to handle it, and provide that to the subscribe call.

Bessiebessy answered 26/6, 2019 at 14:48 Comment(1)
Yes your are right. I have inserted log.d and I can see that they are processed as soon as they are received. So I have moved my database save into the suscribe like in your example. Your example was very well writen it was nice to have such an help. Thnak you.Librarianship
D
0

As Adam Millerchip mentioned, you need to process every item in single fetch subscription. Here is an example:

List<Integer> dataSource = new ArrayList<>(10);

    public void fetch(int bufferSize) {
        Observable.fromIterable(dataSource) //Use special constructor to get stream from the iterable
                .buffer(bufferSize) //Take N items from the stream...
                .flatMapIterable((bunch) -> bunch) //... and then process them separately
                .subscribe(this::processItem); //here you can get every item arriving from the buffer
    }

After the buffer emptied - another part of the data will be fetched and passed to the buffer. And so on till your source Observable (Observable.fromIterable(dataSource)) will emit onComplete or onError.

Dittmer answered 26/6, 2019 at 15:22 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.