How to filter a list inside Kotlin Flow
Asked Answered
U

1

17

I'm replacing my current implementation using RxJava to Coroutines and Flow. I'm having some trouble using some Flow operators.

I'm trying to filter the list of items inside a Flow before providing it to be collected. (Flow<List<TaskWithCategory>>)

Here is the example on Rx2:

        repository.findAllTasksWithCategory()
            .flatMap {
                Flowable.fromIterable(it)
                    .filter { item -> item.task.completed }
                    .toList()
                    .toFlowable()

In the implementation above, I provide a list of TaskWithCategory filtering by Tasks that are already completed.

How can I achieve this using Flow?

Unruly answered 22/12, 2019 at 14:30 Comment(0)
M
21

Given that the only operator in use is filter the inner flowable is unnecessary, making the flow implementation quite straightforward:

repository.findAllTasksWithCategoryFlow()
    .map { it.filter { item -> item.task.completed } }

If the inner transformation is more involved (lets use transform: suspend (X) -> TaskWithCategory):

repository.findAllTasksWithCategoryFlow()
    // Pick according to desired backpressure behavior
    .flatMap(Latest/Concat/Merge) {
        // Scope all transformations together
        coroutineScope {
            it.map { item ->
                // Perform transform in parallel
                async {
                    transform(item)
                }
            }.awaitAll() // Return when all async are finished.
        }
    }
Mata answered 22/12, 2019 at 16:25 Comment(3)
flatMapMerge works much simpler than you've shown, you need to provide a Flow-returning function and it will automatically parallelize the collection of these flows. tasksFlow.flatMapMerge { flow { emit(it.map { transform(it) }) } }Skindeep
That would process different lists in parallel, while my version processes all items in a single list in parallel.Mata
True, but processing each individual item in a separate coroutine is usually detrimental to performance, unless processing each item is really heavy. Also, in that case i don't think you're using flatMapMerge correctly, you make no use of the automatic concurrency it provides. You can get the same result with a plain map or flatMap.Skindeep

© 2022 - 2024 — McMap. All rights reserved.