I am implementing the repository pattern in RxJava, using SqlBrite/SqlDelight for offline data storage and Retrofit for HTTP requests.
Here's a sample of that:
protected Observable<List<Item>> getItemsFromDb() {
    return database.createQuery(tableName(), selectAllStatement())
            .mapToList(cursor -> selectAllMapper().map(cursor));
}
public Observable<List<Item>> getItems() {
    Observable<List<Item>> server = getRequest()
            .doOnNext(items -> {
                // Write the server response to the database in a single transaction.
                BriteDatabase.Transaction transaction = database.newTransaction();
                try {
                    for (Item item : items) {
                        database.insert(tableName(), contentValues(item));
                    }
                    transaction.markSuccessful();
                } finally {
                    transaction.end();
                }
            })
            .flatMap(items -> getItemsFromDb())
            // Delay the request so an immediate network failure does not win the amb race.
            .delaySubscription(200, TimeUnit.MILLISECONDS);
    Observable<List<Item>> db = getItemsFromDb()
            .filter(items -> items != null && items.size() > 0);
    return Observable.amb(db, server)
            .doOnSubscribe(() -> server.subscribe(items -> {}, throwable -> {}));
}
The current implementation uses Observable.amb to mirror whichever of the two streams emits first: the db stream wins if the database has data, and the server stream wins otherwise. To prevent the server stream from winning with an immediate error when there is no internet connection, server has a 200 ms delaySubscription on it.
I tried using Observable.concat, but the SqlBrite stream never calls onCompleted, so the server observable is never triggered.
I also tried Observable.combineLatest, which didn't work because it waits for the server observable to return data before emitting anything. Observable.switchOnNext didn't work either.
What I am looking for is a repository which:
- Keeps the subscription to SqlBrite (DB) open, in case of DB updates
- Always fetches data from the server and writes it to the database
- Does not emit an empty result when the database is empty and the network request is still in progress, because the user should see a progress bar during the first load
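
To make the three requirements concrete, here is a minimal plain-Java model of the intended emission rules. It is deliberately not RxJava or SqlBrite code: `ItemStore`, `getItems`, `run`, and the `server` supplier are hypothetical stand-ins, everything runs synchronously, and threading is ignored. It is only a sketch of which lists the UI should receive, not a proposed implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Plain-Java model of the desired emission rules; not RxJava or SqlBrite code. */
public class RepositoryModel {

    /** Hypothetical stand-in for a SqlBrite query: notifies on subscribe and after each write. */
    static class ItemStore {
        private final List<String> rows = new ArrayList<>();
        private final List<Consumer<List<String>>> listeners = new ArrayList<>();

        void subscribe(Consumer<List<String>> listener) {
            listeners.add(listener);
            listener.accept(new ArrayList<>(rows)); // initial query result
        }

        void insertAll(List<String> items) {
            if (items.isEmpty()) {
                return; // nothing written, so nothing to re-query
            }
            rows.addAll(items);
            for (Consumer<List<String>> listener : listeners) {
                listener.accept(new ArrayList<>(rows)); // re-run the query after a write
            }
        }
    }

    /**
     * Keeps the store subscription open, drops empty results, and always
     * triggers a server fetch whose only effect is a write to the store.
     */
    static void getItems(ItemStore store, Supplier<List<String>> server,
                         Consumer<List<String>> onItems) {
        store.subscribe(items -> {
            if (!items.isEmpty()) { // requirement 3: never emit an empty result
                onItems.accept(items);
            }
        });
        try {
            store.insertAll(server.get()); // requirement 2: always fetch and save
        } catch (RuntimeException e) {
            // Offline or server error: keep serving whatever the store already has.
        }
    }

    /** Runs one scenario and returns every list delivered to the UI callback. */
    static List<List<String>> run(List<String> cached, Supplier<List<String>> server) {
        ItemStore store = new ItemStore();
        store.insertAll(cached);
        List<List<String>> seen = new ArrayList<>();
        getItems(store, server, seen::add);
        return seen;
    }

    public static void main(String[] args) {
        // Empty database, server succeeds: only the saved data is emitted.
        System.out.println(run(List.of(), () -> List.of("a", "b"))); // prints [[a, b]]
        // Cached data, server fails: the cached data is still emitted.
        System.out.println(run(List.of("x"), () -> {
            throw new IllegalStateException("offline");
        })); // prints [[x]]
    }
}
```

In RxJava terms, one possible mapping (an assumption, not something tested against SqlBrite) is to merge the filtered database stream with a server stream that saves its result, ignores its own emissions, and swallows errors, so that the database query is the only source of items.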