Correct flow in RxJava with Retrofit and Realm

I'm implementing a network API with a combination of RxJava and Retrofit, and I use Realm as my database. I've got it pretty much working, but I'm wondering whether this is the correct approach and flow of events. So, here is the RetrofitApiManager.

public class RetrofitApiManager {

    private static final String BASE_URL = "***";

    private final ShopApi shopApi;

    public RetrofitApiManager(OkHttpClient okHttpClient) {

        // GSON INITIALIZATION

        Retrofit retrofit = new Retrofit.Builder()
                .client(okHttpClient)
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create(gson))
                .baseUrl(BASE_URL)
                .build();

        shopApi = retrofit.create(ShopApi.class);
    }

    public Observable<RealmResults<Shop>> getShops() {
        return shopApi.getShops()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(response -> {
                    Realm realm = Realm.getDefaultInstance();
                    realm.executeTransaction(realm1 -> 
                            realm1.copyToRealmOrUpdate(response.shops));
                    realm.close();
                })
                .flatMap(response -> {
                    Realm realm = Realm.getDefaultInstance();
                    Observable<RealmResults<Shop>> results = realm.where(Shop.class)
                            .findAllAsync()
                            .asObservable()
                            .filter(RealmResults::isLoaded);
                    realm.close();
                    return results;
                });
    }
}

And here is the call to get RealmResults<Shop> inside a Fragment.

realm.where(Shop.class)
        .findAllAsync()
        .asObservable()
        .filter(RealmResults::isLoaded)
        .first()
        .flatMap(shops -> 
                shops.isEmpty() ? retrofitApiManager.getShops() : Observable.just(shops))
        .subscribe(
                shops -> initRecyclerView(),
                throwable -> processError(throwable));

Here are my questions:

  1. Is it a correct approach to chain events like in the example above or should I manage them in a different way?

  2. Is it OK to use a Realm instance in the getShops() method and close it there, or would it be better to pass it as an argument and then manage it somehow? Although, this idea seems a bit problematic with threads and with always calling Realm.close() at the right time.

Hierodule answered 27/6, 2016 at 11:31 Comment(0)

1) I would try to do as much as possible on the background thread, right now you are doing a lot of the work on the UI thread.

2)

  public Observable<RealmResults<Shop>> getShops() {
        return shopApi.getShops()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(response -> {
                    try(Realm realm = Realm.getDefaultInstance()) {
                        realm.executeTransaction(realm1 -> 
                            realm1.insertOrUpdate(response.shops));
                    } // auto-close
                })
                .flatMap(response -> {
                    // declared outside the try block so it is still in scope for the return
                    Observable<RealmResults<Shop>> results;
                    try(Realm realm = Realm.getDefaultInstance()) {
                        results = realm.where(Shop.class)
                            .findAllAsync()
                            .asObservable()
                            .filter(RealmResults::isLoaded);
                    } // auto-close
                    return results;
                });
    }

All Realm data is lazy-loaded, so it is only available while the Realm instance is open; closing the instance right after retrieving the data has a high chance of not working. In your case, though, you are flat-mapping on the main thread, so most likely there is already an open instance there.

If you want, you can use copyFromRealm() to get unmanaged data out. Unmanaged objects can be moved across threads and are no longer connected to Realm, but they also lose their live-update features and take up more memory.
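
The difference between live, lazily loaded results and a detached copy can be illustrated without Realm at all. The sketch below is plain Java, and every name in it (ToyStore, LiveView, detach) is hypothetical; it only mimics the behaviour described above and is not Realm's API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a store whose query results are lazy views; NOT Realm's API. */
final class ToyStore implements AutoCloseable {
    private final List<String> rows = new ArrayList<>();
    private boolean closed = false;

    void insert(String row) {
        checkOpen();
        rows.add(row);
    }

    /** Returns a live view that reads from the store on every access. */
    LiveView findAll() {
        checkOpen();
        return new LiveView(this);
    }

    @Override
    public void close() {
        closed = true;
    }

    private void checkOpen() {
        if (closed) throw new IllegalStateException("store is closed");
    }

    /** Lazy view; usable only while the owning store is open. */
    static final class LiveView {
        private final ToyStore owner;

        LiveView(ToyStore owner) { this.owner = owner; }

        int size() {
            owner.checkOpen();
            return owner.rows.size();
        }

        /** Eager copy, similar in spirit to copyFromRealm(): no live updates, more memory. */
        List<String> detach() {
            owner.checkOpen();
            return new ArrayList<>(owner.rows);
        }
    }

    public static void main(String[] args) {
        LiveView view;
        List<String> copy;
        try (ToyStore store = new ToyStore()) {
            store.insert("shopA");
            view = store.findAll();
            copy = view.detach();
            store.insert("shopB");
            System.out.println(view.size()); // live view reflects the later insert
            System.out.println(copy.size()); // detached copy does not
        } // auto-close
        try {
            view.size();
        } catch (IllegalStateException e) {
            System.out.println("view unusable after close");
        }
    }
}
```

The detached list keeps working after the store is closed, which is the trade-off mentioned above: safe to hand around, but frozen at the moment of copying.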

I would probably do this instead:

  public Observable<RealmResults<Shop>> getShops() {
        return shopApi.getShops()
                .subscribeOn(Schedulers.io())
                .doOnNext(response -> {
                    try(Realm realm = Realm.getDefaultInstance()) {
                        realm.executeTransaction(realm1 -> 
                            realm1.copyToRealmOrUpdate(response.shops));
                    } // auto-close
                })
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(response -> {
                    // `realm` is assumed to be an instance already opened on the UI thread
                    Observable<RealmResults<Shop>> results = realm.where(Shop.class)
                            .findAllAsync()
                            .asObservable()
                            .filter(RealmResults::isLoaded);
                    return results;
                });
    }

Alternatively, you can treat the network request as a side effect and just depend on Realm notifying you when there are changes. This is the better approach IMO, as you separate network access from DB access, which is, for example, what the Repository pattern is about.

public Observable<RealmResults<Shop>> getShops() {
    // Realm will automatically notify this observable whenever data is saved from the network
    return realm.where(Shop.class).findAllAsync().asObservable()
            .filter(RealmResults::isLoaded)
            .doOnNext(results -> {
                if (results.size() == 0) {
                    loadShopsFromNetwork();
                }
            }); 
}

private void loadShopsFromNetwork() {
    shopApi.getShops()
            .subscribeOn(Schedulers.io())
            .subscribe(response -> {
                try(Realm realm = Realm.getDefaultInstance()) {
                    realm.executeTransaction(r -> r.insertOrUpdate(response.shops));
                } // auto-close
            });
}
Spontaneous answered 28/6, 2016 at 7:19 Comment(3)
I don't think returning the observable in the first code snippet of 2) would work, because the Observable belongs to a Realm instance that is closed right after obtaining the async query. But the last one is pretty good. – Hoyden
@ChristianMelchior @Hoyden What sort of detriments could arise from instantiating a new Realm object in doOnNext like this? Couldn't this get bad if doOnNext() were called a lot of times in a row? Also, what effects does this have on the objects being inserted/updated, since they're being inserted via a newly instantiated Realm instead of a Realm that would be created on the main thread? (Genuinely asking for my knowledge.) – Bifilar
Found an answer to my questions here: realm.io/news/viraj-tank-safe-vs-deep-integration – Bifilar

What Christian Melchior mentioned in his answer makes perfect sense and should solve the problem at hand, but down the line this approach may introduce other issues.

In a good architecture, all the major modules (or libraries) should be isolated from the rest of the code. Since Realm, RealmObject, and RealmResults instances cannot be passed across threads, it is even more important to keep Realm and Realm-related operations isolated from the rest of the code.

For each of your jsonModel classes, you should have a realmModel class and a DAO (Data Access Object). The idea is that no class other than the DAO knows about or accesses realmModel or Realm. The DAO takes a jsonModel, converts it to a realmModel, and performs the read/write/edit/remove operations; for read operations it converts the realmModel back to a jsonModel and returns that.

This way Realm is easy to maintain, you avoid all thread-related issues, and the code is easy to test and debug.
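
The DAO boundary described above can be sketched in plain Java. The class names (ShopJson, ShopEntity, ShopDao) are hypothetical, and a LinkedHashMap stands in for Realm so that the example is self-contained; in a real DAO the map operations would presumably be Realm transactions and queries, opened and closed inside the DAO.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain model used by the rest of the app (hypothetical "jsonModel"). */
final class ShopJson {
    final long id;
    final String name;

    ShopJson(long id, String name) {
        this.id = id;
        this.name = name;
    }
}

/** Storage-side model (hypothetical "realmModel"); only the DAO touches it. */
final class ShopEntity {
    long id;
    String name;
}

/** The only class that knows about the storage model and the store itself. */
final class ShopDao {
    // Stand-in for the database; an assumption made to keep the sketch runnable.
    private final Map<Long, ShopEntity> table = new LinkedHashMap<>();

    /** Converts each incoming model to the storage model and writes it, keyed by id. */
    void insertOrUpdate(List<ShopJson> shops) {
        for (ShopJson shop : shops) {
            ShopEntity entity = new ShopEntity();
            entity.id = shop.id;
            entity.name = shop.name;
            table.put(entity.id, entity);
        }
    }

    /** Reads everything and converts back, so callers never see ShopEntity. */
    List<ShopJson> findAll() {
        List<ShopJson> out = new ArrayList<>();
        for (ShopEntity entity : table.values()) {
            out.add(new ShopJson(entity.id, entity.name));
        }
        return out;
    }

    public static void main(String[] args) {
        ShopDao dao = new ShopDao();
        dao.insertOrUpdate(Arrays.asList(new ShopJson(1, "shopA"), new ShopJson(2, "shopC")));
        // Same id as an existing row, so this is an update rather than an insert.
        dao.insertOrUpdate(Collections.singletonList(new ShopJson(1, "shopB")));
        for (ShopJson shop : dao.findAll()) {
            System.out.println(shop.id + " " + shop.name);
        }
    }
}
```

Because findAll() returns fresh ShopJson objects, callers can pass them between threads freely; the cost is the copy on every read.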

Here is an article about Realm best practices with a good architecture: https://medium.com/@Viraj.Tank/realm-integration-in-android-best-practices-449919d25f2f

There is also a sample project demonstrating the integration of Realm on Android with MVP (Model View Presenter), RxJava, Retrofit, Dagger, annotations, and testing: https://github.com/viraj49/Realm_android-injection-rx-test

Espinosa answered 1/7, 2016 at 11:45 Comment(13)
Doesn't this approach miss a big part of the auto-updates of RealmObjects? Let's say I have a Shop named "shopA" and I change it to "shopB" somewhere in the code. With RealmResults<Shop> as a list for my adapter, it automatically updates the display with a correct RealmChangeListener inside the adapter. If I were to separate it with a DAO and used jsonModel it wouldn't be that easy, or am I missing something? Also about threading: I understand this might be a concern, but why would you try to pass RealmObjects between threads in the first place? – Peterson
It is true that you miss out on Realm's way of auto-updates, but if you are using RxJava, it is much safer and cleaner to use RxJava to update the view based on the updated jsonModel. Realm's biggest selling points are speed, simplicity, and easy integration, but with Realm's way of auto-updating we miss out on the simplicity part and have to change already tested code to bring Realm aboard. – Espinosa
Addition: I am not passing RealmObjects between threads, that is the whole point of this approach, and I also don't have to do any Realm operation on the main thread. – Espinosa
In my opinion you lose too much when you introduce another layer. It is definitely project-specific, but when you have objects that can be sorted by multiple fields, should be auto-updated on changes and so on, you need to write a lot of boilerplate code just to provide Realm's functionality, and additionally you have lower performance. There is no need to pass RealmObjects between threads in this approach either. If you want to edit an object or do something with it, just pass its id to a Presenter and let it handle the rest. – Peterson
A valid point indeed, let me see if I can come up with a different architecture which focuses on being able to do all Realm operations in an MVP, RxJava ecosystem. – Espinosa
Hello, thanks for the discussion earlier, which inspired me to create a different approach. Please have a look at this medium.com/@Viraj.Tank/… and let me know your opinion. – Espinosa
@Michal Using the "safe approach" portrayed by the original answer, you lose out on auto-updates and you're also doing copying, which means you lose the zero-copy aspect of Realm and the lazy query evaluation of RealmResults, resulting in both additional CPU usage (for the copying) and higher memory usage (all queries are executed immediately into a List, taking up space). – Hoyden
@Hoyden - It is already mentioned in the post that you miss these essential features, and that is why there is also a DeepIntegration approach mentioned as an alternative. SafeIntegration is useful if the production code is huge and developers want to move to Realm gradually. – Espinosa
The post says it is easy to maintain Realm, avoid all Thread related issues, easy to test and debug; it doesn't say anything about the slower execution, the higher memory usage, and the lack of auto-updating. After all, you lose zero-copy; your list will contain every single element that the result returns, not just the one you actually index with get(). – Hoyden
Either you are reading some other post or you have not read the disadvantages section. Here is the list for you: 1. We miss out on the auto-update feature of Realm. 2. We have to use our own boilerplate code for the auto-update feature. 3. We create a copy of the data every time we make a Realm query (which is the opposite of the zero-copy object store idea of Realm). 4. Realm queries are relatively slower since we are making a copy. – Espinosa
Oh, you've updated it based on the Reddit thread info. I haven't seen the update until now, sorry. – Hoyden
Exactly, no problem. Suggestions and discussions like these made me read and experiment more and eventually understand Realm better. – Espinosa
@VirajTank Hey, sorry for the late response. I forgot to reply. The approach in the second article is much more to my liking. It is essentially what I use with Realm. Keep up the good work! – Peterson

In my case, I seem to have defined a query for the RealmRecyclerViewAdapter like this:

    recyclerView.setAdapter(new CatAdapter(getContext(),
            realm.where(Cat.class).findAllSortedAsync(CatFields.RANK, Sort.ASCENDING)));

And otherwise defined a condition for Retrofit with RxJava to download more stuff when the condition is met:

    Subscription downloadCats = Observable.create(new RecyclerViewScrollBottomOnSubscribe(recyclerView))
            .filter(isScrollEvent -> isScrollEvent || realm.where(Cat.class).count() <= 0)
            .switchMap(isScrollEvent -> catService.getCats().subscribeOn(Schedulers.io()))  // RETROFIT
            .retry()
            .subscribe(catsBO -> {
                try(Realm outRealm = Realm.getDefaultInstance()) {
                    outRealm.executeTransaction((realm) -> {
                        Cat defaultCat = new Cat();
                        long rank;
                        if(realm.where(Cat.class).count() > 0) {
                            rank = realm.where(Cat.class).max(Cat.Fields.RANK.getField()).longValue();
                        } else {
                            rank = 0;
                        }
                        for(CatBO catBO : catsBO.getCats()) {
                            defaultCat.setId(catBO.getId());
                            defaultCat.setRank(++rank);
                            defaultCat.setSourceUrl(catBO.getSourceUrl());
                            defaultCat.setUrl(catBO.getUrl());
                            realm.insertOrUpdate(defaultCat);
                        }
                    });
                }
            }, throwable -> {
                Log.e(TAG, "An error occurred", throwable);
            });

And this is for example a search based on an edit text's input:

    Subscription filterDogs = RxTextView.textChanges(editText)
                     .switchMap((charSequence) -> 
                           realm.where(Dog.class)
                                .contains(DogFields.NAME, charSequence.toString())
                                .findAllSortedAsync(DogFields.NAME, Sort.ASCENDING)
                                .asObservable())
                     .filter(RealmResults::isLoaded) 
                     .subscribe(dogs -> realmRecyclerAdapter.updateData(dogs));
Hoyden answered 6/9, 2016 at 22:19 Comment(0)
