Transactions in Android Room w/ RxJava2
Asked Answered
I

2

6

A requirement of my application is to allow the user to progress through multiple steps, then upon completion write values to the database based off of entries in each step. Each step in the UI may contribute to the operations that will need to be written to the database. The data may be in multiple tables and pertain to different rows in those tables. If any of the database operations fail, then the entire operation should fail.

I initially considered loading all of the data into memory, manipulating it, then simply calling the update methods in every possible entity (with a conflict strategy of REPLACE), but there could be an extremely large amount of data in memory.

I figured that I would instead be able to assemble a List, where each Fragment in the display contributes one or more Completables, then execute those sequentially using Completable.concat() at the end of the UI flow. It would look something like below:

    Completable one = Completable.fromAction(() -> Log.w(LOG_TAG, "(1)")).delay(1, TimeUnit.SECONDS);
    Completable two = Completable.fromAction(() -> Log.w(LOG_TAG, "(2)")).delay(2, TimeUnit.SECONDS);
    Completable three = Completable.fromAction(() -> Log.w(LOG_TAG, "(3)")).delay(3, TimeUnit.SECONDS);
    Completable four = Completable.fromAction(() -> Log.w(LOG_TAG, "(4)")).delay(3, TimeUnit.SECONDS);

    Completable.concatArray(one, two, three, four)
            .doOnSubscribe(__ -> {
                mRoomDatabase.beginTransaction();
            })
            .doOnComplete(() -> {
                mRoomDatabase.setTransactionSuccessful();
            })
            .doFinally(() -> {
                mRoomDatabase.endTransaction();
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe();

The Completables would actually be wrappers around Room DAO insert/update/delete methods. I would likely also perform a UI operation upon completion which is why I'm observing on the main thread.

When I execute this code, I get these logs:

W/MyPresenter: Begin transaction.
W/MyPresenter: (1)
W/MyPresenter: (2)
W/MyPresenter: (3)
W/MyPresenter: (4)
W/MyPresenter: Set transaction successful.
W/MyPresenter: End transaction.
W/System.err: java.lang.IllegalStateException: Cannot perform this operation because there is no current transaction.
W/System.err:     at android.database.sqlite.SQLiteSession.throwIfNoTransaction(SQLiteSession.java:915)
W/System.err:     at android.database.sqlite.SQLiteSession.endTransaction(SQLiteSession.java:398)
W/System.err:     at android.database.sqlite.SQLiteDatabase.endTransaction(SQLiteDatabase.java:524)
W/System.err:     at android.arch.persistence.db.framework.FrameworkSQLiteDatabase.endTransaction(FrameworkSQLiteDatabase.java:88)
W/System.err:     at android.arch.persistence.room.RoomDatabase.endTransaction(RoomDatabase.java:220)
W/System.err:     at ...lambda$doTest$22$MyPresenter(MyPresenter.java:490)

Why is the transaction gone by the time I reach doFinally? I also welcome any comments on the quality or feasibility of this approach as I'm quite new to RxJava and Room.

Impropriate answered 1/3, 2018 at 21:8 Comment(0)
I
13

By logging the current thread and perusing Android developer documentation I think I might finally understand what I'm doing wrong.

1) Transactions must occur on the same thread. That's why it's telling me there's no transaction; I'm apparently bouncing between threads.

2) The doOnSubscribe, doOnComplete, and doFinally methods are side effects and therefore not part of the actual stream itself. That means they will not occur on the scheduler I subscribe on. They will occur on the Scheduler I observe on.

3) Because I want to receive results on the UI thread upon completion, but want the side effects to occur on a background thread, I need to change the position in which I observe on.

Completable.concatArray(one, two, three, four)
                .observeOn(Schedulers.single()) // OFF UI THREAD
                .doOnSubscribe(__ -> {
                    Log.w(LOG_TAG, "Begin transaction. " + Thread.currentThread().toString());
                    mRoomDatabase.beginTransaction();
                })
                .doOnComplete(() -> {
                    Log.w(LOG_TAG, "Set transaction successful."  + Thread.currentThread().toString());
                    mRoomDatabase.setTransactionSuccessful();
                })
                .doFinally(() -> {
                    Log.w(LOG_TAG, "End transaction."  + Thread.currentThread().toString());
                    mRoomDatabase.endTransaction();
                })
                .subscribeOn(Schedulers.single())
                .observeOn(AndroidSchedulers.mainThread()) // ON UI THREAD
                .subscribeWith(new CompletableObserver() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.w(LOG_TAG, "onSubscribe."  + Thread.currentThread().toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.w(LOG_TAG, "onComplete."  + Thread.currentThread().toString());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(LOG_TAG, "onError." + Thread.currentThread().toString());
                    }
                });

The logging statements now look like this:

W/MyPresenter: onSubscribe.Thread[main,5,main]
W/MyPresenter: Begin transaction. Thread[RxSingleScheduler-1,5,main]
W/MyPresenter: (1)
W/MyPresenter: (2)
W/MyPresenter: (3)
W/MyPresenter: (4)
W/MyPresenter: Set transaction successful.Thread[RxSingleScheduler-1,5,main]
W/MyPresenter: End transaction.Thread[RxSingleScheduler-1,5,main]
W/MyPresenter: onComplete.Thread[main,5,main]

I believe this accomplishes what I'm after but it remains to be seen if the step-by-step assembly of Room-based RxJava Completables will work out. I will keep an eye out for any comments/answers and may report back for posterity.

Impropriate answered 2/3, 2018 at 6:43 Comment(1)
Thanks for sharing the answer to your problem!Mouthy
S
1

I managed to do transactions allowing operations from different tables this way (example with Dagger to inject database):

class RxRoomTransaction @Inject constructor(private val db : AppDatabase) {

    fun run(fn : () -> Unit) : Completable {
        return Completable.fromAction {
            try {
                db.beginTransaction()
                fn.invoke()
                db.setTransactionSuccessful()
            } catch (t : Throwable) {
                // Catch everything, including InterruptedException which is invoked on dispose
            } finally {
                try {
                    // Double check to catch possible exception caused by endTransaction (shouldn't occur)
                    db.endTransaction()
                } catch (t : Throwable) {
                }
            }
        }
    }

}

Calling it this way:

rxRoomTransaction.run {
    dao1.insertAll(data1)
    dao2.insert(data2)
    dao3.clear()
}

DAO methods are NOT returning RxJava objects:

@Dao
interface Dao3 {
    @Query("DELETE FROM table3")
    fun clear()
}
Stidham answered 2/7, 2020 at 23:59 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.