Repository pattern with SqlBrite/SqlDelight(Offline database) and Retrofit(Http request)
Asked Answered
L

2

12

I am implementing repository pattern in RxJava using SqlBrite/SqlDelight for offline data storage and retrofit for Http requests

Here's a sample of that:

protected Observable<List<Item>> getItemsFromDb() {
     return database.createQuery(tableName(), selectAllStatement())
             .mapToList(cursor -> selectAllMapper().map(cursor));
 }


public Observable<List<Item>>getItems(){
     Observable<List<Item>> server = getRequest()
                 .doOnNext(items -> {
                     BriteDatabase.Transaction transaction = database.newTransaction();
                     for (Item item : items){
                         database.insert(tableName(), contentValues(item));
                     }
                     transaction.markSuccessful();
                     transaction.end();
                 })
                 .flatMap(items -> getItemsFromDbById())
                 .delaySubscription(200, TimeUnit.MILLISECONDS);
         Observable<List<Item>> db = getItemsFromDbById(id)
                 .filter(items -> items != null && items.size() > 0);
     return Observable.amb(db, server).doOnSubscribe(() -> server.subscribe(items -> {}, throwable -> {}));
 }

The current implementation uses Observable.amb to get latest of 2 streams and returns db stream in case db has data or server otherwise. To prevent early failure in case of no internet, server has a delaySubscription on it with 200ms.

I tried using Observable.concat but the SqlBrite stream never calls onComplete so server observable is never triggered.

I also tried Observable.combineLatest which didn't work because it keeps waiting for server observable to return data before emitting anything and Observable.switchOnNext didn't work either.

What I am looking for is a repository which:

  • Keeps the subscription to SqlBrite (DB) open, in case of DB updates
  • Always fetches data from server and writes it to database
  • Should not emit empty result in case there was nothing in database and network request is still going on. This, because the user should see a progress bar in the case of the first load.
Leopoldine answered 18/5, 2017 at 5:43 Comment(3)
I can't understand what do you want.Craigie
@DeanXu I have updated the question.Leopoldine
Let me correct , you want to perform 2 operation at a time and want to combine result in case of success , in case of failure you want to manage that flow with observer right !?Colza
L
1

This is how you can solve the problem above, i.e., fetching data from 2 sources (local and remote) and send an update to UI only when required.

The data class wraps your data and also stores the source of data

class Data<T> {

    static final int STATE_LOCAL = 0;
    static final int STATE_SERVER = 1;

    private T data;
    private int state;

    Data(T data, int state) {
        this.data = data;
        this.state = state;
    }

    public int getState() { return state; }

    public T getData() { return data; }
}

...

public Observable<Model> getData(long id) {

    // Used to cache data and compare it with server data, so we can avoid unnecessary UI updates
    Subject<Data<Model>> publishSubject = BehaviorSubject.create();
    publishSubject.onNext(new Data<>(null, Data.STATE_LOCAL));

    Observable<Data<Model>> server = getRequest()
            .map(items -> new Data<>(items, Data.STATE_SERVER))
            // Here we are combining data from server and our `BehaviorSubject`
            // If any one has ideas how to do this without the subject, I'll be glad to hear it.
            .flatMap(items -> Observable.zip(publishSubject.take(1), Observable.just(items), Pair::new))
            .flatMap(oldNewPair -> {
                // Here we are comparing old and new data to see if there was any new data returned from server
                Data<Model> prevData = oldNewPair.first;
                Data<Model> newData = oldNewPair.second;
                //Could be any condition to compare the old and new data
                if (prevData.data != null && prevData.data.updated_at() == newData.data.updated_at()) 
                    return Observable.just(prevData);
                else
                    return database.insert(tableName(), contentValues(newData));

                return getFromDb(id)
                        .map(item -> new Data<>(item, Data.STATE_LOCAL))
                        .onErrorResumeNext(server)
                        .doOnNext(item -> {
                            publishSubject.onNext(item);
                            if (item.getState() == Data.STATE_LOCAL)
                                server.subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe();
                        })
                        .map(item -> item.data);
}

This solution is without using amb and uses BehaviorSubject which solves the following problem:

  1. No use of delaySubscription(Earlier used to prevent early failure in case of no internet.)

  2. Earlier, each time two calls were made to the server which is solved in this case.

Leopoldine answered 7/9, 2017 at 7:25 Comment(0)
R
0

You code directly contradicts what you want to do. This line:

     Observable<List<Item>> db = getItemsFromDbById(id)
             .filter(items -> items != null && items.size() > 0);

Is a contradiction to itself because you return single database query's items and name it db - as if the database(or its reference) itself. From this point it's clear that that the code you provided can't be helped.

There are many java templates available of the repository pattern. For example: https://www.bignerdranch.com/blog/the-rxjava-repository-pattern/

If that didn't help enough try to provide code that does at least distantly what you are describing.

Rodolforodolph answered 30/5, 2017 at 1:33 Comment(1)
Is a contradiction to itself because you return single database query's items and name it db - as if the database(or its reference) itself. Can you explain this? The database is SQLBriteDatabase instance and it automatically triggers an update if there's something new in the database. The observable db will emit a new list of data if there is something new.Reeding

© 2022 - 2024 — McMap. All rights reserved.