How can I perform LiveData transformations on a background thread?

I need to transform one type of data, emitted by a LiveData object, into another form on a background thread to prevent UI lag.

In my specific case, I have:

  • MyDBRow objects (POJOs consisting of primitive longs and Strings);
  • a Room DAO instance emitting these via a LiveData<List<MyDBRow>>; and
  • a UI expecting richer MyRichObject objects (POJOs with the primitives inflated into e.g. date/time objects)

so I need to transform my LiveData<List<MyDBRow>> into a LiveData<List<MyRichObject>>, but not on the UI thread.

The Transformations.map(LiveData<X>, Function<X, Y>) method performs this kind of transformation, but I can't use it because it executes the transformation on the main thread, as its documentation states:

Applies the given function on the main thread to each value emitted by source LiveData and returns LiveData, which emits resulting values.

The given function func will be executed on the main thread.

What is a clean way to make LiveData transformations occur:

  1. somewhere off the main thread, and
  2. only as needed (i.e. only when something is observing the intended transformation)?
Haun answered 19/11, 2017 at 7:22 Comment(0)
  • The original, “source” LiveData can be monitored by a new Observer instance.
  • This Observer instance, when the source LiveData emits a value, can start a background task to perform the needed transformation and then emit the result via a new, “transformed” LiveData.
  • The transformed LiveData can attach the aforementioned Observer to the source LiveData when it has active Observers, and detach it when it doesn't, ensuring that the source LiveData is only being observed when necessary.

The question gives an example source LiveData<List<MyDBRow>> and needs a transformed LiveData<List<MyRichObject>>. A combined transformed LiveData and Observer could look something like this:

class MyRichObjectLiveData
        extends LiveData<List<MyRichObject>>
        implements Observer<List<MyDBRow>>
{
    @NonNull private LiveData<List<MyDBRow>> sourceLiveData;

    MyRichObjectLiveData(@NonNull LiveData<List<MyDBRow>> sourceLiveData) {
        this.sourceLiveData = sourceLiveData;
    }

    // only watch the source LiveData when something is observing this
    // transformed LiveData
    @Override protected void onActive()   { sourceLiveData.observeForever(this); }
    @Override protected void onInactive() { sourceLiveData.removeObserver(this); }

    // receive source LiveData emission
    @Override public void onChanged(@Nullable List<MyDBRow> dbRows) {
        // set up a background thread to complete the transformation
        AsyncTask.execute(new Runnable() {
            @Override public void run() {
                assert dbRows != null;
                List<MyRichObject> myRichObjects = new LinkedList<>();
                for (MyDBRow myDBRow : dbRows) {
                    myRichObjects.add(MyRichObjectBuilder.from(myDBRow).build());
                }
                // use LiveData method postValue (rather than setValue) on
                // background threads
                postValue(myRichObjects);
            }
        });
    }
}

If multiple such transformations are needed, the above logic could be made generic like this:

abstract class TransformedLiveData<Source, Transformed>
        extends LiveData<Transformed>
        implements Observer<Source>
{
    @Override protected void onActive()   { getSource().observeForever(this); }
    @Override protected void onInactive() { getSource().removeObserver(this); }

    @Override public void onChanged(@Nullable Source source) {
        AsyncTask.execute(new Runnable() {
            @Override public void run() {
                postValue(getTransformed(source));
            }
        });
    }

    protected abstract LiveData<Source> getSource();
    protected abstract Transformed getTransformed(Source source);
}

and the subclass for the example given by the question could look something like this:

class MyRichObjectLiveData
        extends TransformedLiveData<List<MyDBRow>, List<MyRichObject>>
{
    @NonNull private LiveData<List<MyDBRow>> sourceLiveData;

    MyRichObjectLiveData(@NonNull LiveData<List<MyDBRow>> sourceLiveData) {
        this.sourceLiveData = sourceLiveData;
    }

    @Override protected LiveData<List<MyDBRow>> getSource() {
        return sourceLiveData;
    }

    @Override protected List<MyRichObject> getTransformed(List<MyDBRow> myDBRows) {
        List<MyRichObject> myRichObjects = new LinkedList<>();
        for (MyDBRow myDBRow : myDBRows) {
            myRichObjects.add(MyRichObjectBuilder.from(myDBRow).build());
        }
        return myRichObjects;
    }
}
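
AsyncTask has since been deprecated (API level 30), so the AsyncTask.execute(...) step in the classes above can be replaced by a plain java.util.concurrent.Executor. The following is a minimal sketch of just that step, written without Android classes so it runs anywhere. ExecutorTransformer, ExecutorTransformerDemo and the publish callback are hypothetical names introduced for illustration; in a LiveData subclass the callback would be this::postValue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical helper: the AsyncTask.execute(...) step done with an Executor. */
final class ExecutorTransformer<S, T> {
    private final Executor executor;
    private final Function<S, T> transform;
    // Stand-in for LiveData.postValue; assumed to be safe to call from any thread.
    private final Consumer<T> publish;

    ExecutorTransformer(Executor executor, Function<S, T> transform, Consumer<T> publish) {
        this.executor = executor;
        this.transform = transform;
        this.publish = publish;
    }

    /** Mirrors onChanged(source): transform on the executor, then publish the result. */
    void onChanged(S source) {
        executor.execute(() -> publish.accept(transform.apply(source)));
    }
}

final class ExecutorTransformerDemo {
    private ExecutorTransformerDemo() {}

    /** Runs one transformation on a background thread and waits for the published value. */
    static String transformOnce(String input) {
        ExecutorService background = Executors.newSingleThreadExecutor();
        AtomicReference<String> published = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            Function<String, String> upperCase = String::toUpperCase;
            new ExecutorTransformer<String, String>(background, upperCase, value -> {
                published.set(value);
                done.countDown();
            }).onChanged(input);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            background.shutdown();
        }
        return published.get();
    }
}
```

A single-thread executor keeps results in emission order; a larger pool would transform faster but could publish results out of order.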
Haun answered 9/1, 2018 at 1:42 Comment(5)
The downside of this solution, @Alex Peters, is that it always goes through the main thread. Unfortunately, LiveData offers no way to choose the thread on which the result is delivered, unlike ReactiveX, which does. - Godolphin
Sorry, I don't think I understand. Are you saying that the Runnable passed to AsyncTask.execute is still going to run on the main thread? I'm not worried about any code outside the Runnable running on the main thread, as everything outside that Runnable is comparatively very lightweight. - Haun
Ohh, I see: ReactiveX gives you things like observeOn and subscribeOn, which allow you to decide in one line which thread (or pool of threads!) your computations will run on. Very neat, and it would render most of my code above moot. - Haun
AsyncTask...???? OMG!!! Google says to avoid using it.... - Denude
Thanks @james04. What's your recommended alternative? - Haun

It may be easier to do this using MediatorLiveData. Transformations.map() is implemented with MediatorLiveData under the hood.

@MainThread
public static <X, Y> LiveData<Y> mapAsync(
  @NonNull LiveData<X> source,
  @NonNull final Function<X, Y> mapFunction) {
  final MediatorLiveData<Y> result = new MediatorLiveData<>();
  result.addSource(source, new Observer<X>() {
    @Override
    public void onChanged(@Nullable final X x) {
      AsyncTask.execute(new Runnable() {
        @Override
        public void run() {
          result.postValue(mapFunction.apply(x));
        }
      });
    }
  });
  return result;
}
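
A mapFunction for the question's types might look like the sketch below. The fields of MyDBRow and MyRichObject are not shown in the question, so the ones here (an epoch-millisecond timestamp and a title) are assumptions, as is the RowMappers holder class. java.util.function.Function is used so that the sketch compiles off-device; on Android, androidx.arch.core.util.Function has the same apply(...) shape.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the question's database row: primitives and Strings only.
final class MyDBRow {
    final long createdAtMillis;
    final String title;

    MyDBRow(long createdAtMillis, String title) {
        this.createdAtMillis = createdAtMillis;
        this.title = title;
    }
}

// Hypothetical stand-in for the richer UI object: the timestamp is inflated to an Instant.
final class MyRichObject {
    final Instant createdAt;
    final String title;

    MyRichObject(Instant createdAt, String title) {
        this.createdAt = createdAt;
        this.title = title;
    }
}

final class RowMappers {
    private RowMappers() {}

    // The function that would be passed as mapFunction: converts every row in the list.
    static final Function<List<MyDBRow>, List<MyRichObject>> TO_RICH_OBJECTS = rows -> {
        List<MyRichObject> richObjects = new ArrayList<>(rows.size());
        for (MyDBRow row : rows) {
            richObjects.add(new MyRichObject(Instant.ofEpochMilli(row.createdAtMillis), row.title));
        }
        return richObjects;
    };
}
```

Because the function only reads its input and builds a new list, it is safe to run on whichever background thread mapAsync uses.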
Electrojet answered 18/8, 2018 at 4:27 Comment(1)
Could you please give a sample implementation of mapFunction? - Haun

Listen to a MediatorLiveData<T> that listens to two other LiveData<T>s.

For example:

val exposed: LiveData<List<T>> = MediatorLiveData<List<T>>().apply {
    addSource(aLiveDataToMap) { doWorkOnAnotherThread(it) }
    addSource(aMutableLiveData) { value = it }
}

private fun doWorkOnAnotherThread(t: T) {
    runWorkOnAnotherThread {
        val t2 = /* ... */
        aMutableLiveData.postValue(t2)
    }
}

Whenever aLiveDataToMap changes, it triggers doWorkOnAnotherThread(), which then posts the new value to aMutableLiveData, which finally sets the value of exposed, which a lifecycle owner will be observing. Replace T with your desired type.

Stabler answered 27/4, 2020 at 0:3 Comment(0)

Thanks to @jaychang0917

Kotlin form:

@MainThread
fun <X, Y> mapAsync(source: LiveData<X>, mapFunction: androidx.arch.core.util.Function<X, Y>): LiveData<Y> {
    val result = MediatorLiveData<Y>()
    result.addSource(source) { x -> AsyncTask.execute { result.postValue(mapFunction.apply(x)) } }
    return result
}
Clavicorn answered 3/1, 2021 at 12:59 Comment(2)
This one works, but I would have preferred using an Executor instead of AsyncTask. - Meatman
Why are so many people still using AsyncTask, Executors and other low-level multithreading techniques when there are Rx and coroutines? I would appreciate it if someone could explain the reason behind this. - Ammadas

A solution with coroutines:

class RichLiveData(val rows: LiveData<List<MyDBRow>>) : LiveData<List<MyRichObject>>(),
        CoroutineScope by CoroutineScope(Dispatchers.Default) {

    private val observer = Observer<List<MyDBRow>> { rows ->
        launch {
            postValue(/*computationally expensive stuff which returns a List<MyRichObject>*/)
        }
    }

    override fun onActive() {
        rows.observeForever(observer)
    }

    override fun onInactive() {
        rows.removeObserver(observer)
    }
}
Pity answered 6/8, 2019 at 15:5 Comment(1)
No, because it runs with Dispatchers.Default, which is designed for computationally demanding code. - Pity

Another possible solution with coroutines:

object BackgroundTransformations {

    fun <X, Y> map(
        source: LiveData<X>,
        mapFunction: (X) -> Y
    ): LiveData<Y> {
        val result = MediatorLiveData<Y>()

        result.addSource(source, Observer<X> { x ->
            if (x == null) return@Observer
            CoroutineScope(Dispatchers.Default).launch {
                result.postValue(mapFunction(x))
            }
        })

        return result
    }

    fun <X, Y> switchMap(
        source: LiveData<X>,
        switchMapFunction: (X) -> LiveData<Y>
    ): LiveData<Y> {
        val result = MediatorLiveData<Y>()
        result.addSource(source, object : Observer<X> {
            var mSource: LiveData<Y>? = null

            override fun onChanged(x: X) {
                if (x == null) return

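                CoroutineScope(Dispatchers.Default).launch {
                    // run the (possibly expensive) function off the main thread
                    val newLiveData = switchMapFunction(x)
                    // MediatorLiveData sources may only be added or removed on the
                    // main thread, so switch back before touching them
                    withContext(Dispatchers.Main) {
                        if (mSource == newLiveData) {
                            return@withContext
                        }
                        mSource?.let { result.removeSource(it) }
                        mSource = newLiveData
                        result.addSource(newLiveData) { y ->
                            result.setValue(y)
                        }
                    }
                }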
            }
        })
        return result
    }

}

Hope it helps

Sidonie answered 16/1, 2020 at 14:30 Comment(1)
IllegalStateException: Cannot invoke observeForever on a background thread - Aerometry

How about something like this:

@Query("SELECT * FROM " + PeriodicElement.TABLE_NAME)
abstract fun getAll(): LiveData<List<PeriodicElement>>

fun getAllElements(): LiveData<HashMap<String, PeriodicElement>> {
    return Transformations.switchMap(getAll(), ::transform)
}

private fun transform(list: List<PeriodicElement>): LiveData<HashMap<String, PeriodicElement>> {
    val map = HashMap<String, PeriodicElement>()
    val liveData = MutableLiveData(map)

    AsyncTask.execute {
        for (p in list) {
            map[p.symbol] = p

            if (!liveData.hasObservers()) {
                //prevent memory leak
                break
            }
        }
        liveData.postValue(map)
    }
    return liveData
}
Frenetic answered 15/2, 2020 at 13:12 Comment(0)
