How to do a groupBy and collect using RxJava and Kotlin?
Asked Answered
P

3

8

I have got Observable<Rates> and Rate is just a simple object:

Rate(val value:String){}
Rates(val rates: List<Rate>)

and i wanna change that Observable<Rates> into Observable<HashMap<String,Long>.

so for example for rates Rates(arrayOf(Rate("1"),Rate("2"), Rate("3"),Rate("3"), Rate("2"),Rate("2"))) i expect result:

(1 -> 1)
(2 -> 3)
(3 -> 2)
(4 -> 0)
(5 -> 0)

I start creating something like that :

service.getRates()
        .flatMap {it-> Observable.from(it.rates) }
        .filter { !it.value.isNullOrEmpty() }
        .groupBy {it -> it.value}
        .collect({ HashMap<String,Long>()}, { b, t -> b.put(t.key, t.count???)}

but I am stuck here and i do not know count all values? and i do not know how to add empty values (0) if there is no 5 of 4. Is there any way to do this using rx?

Polyethylene answered 31/10, 2016 at 2:48 Comment(0)
S
5

Please look at the comments in the code for answers to your question.

import rx.Observable

fun main(args: Array<String>) {
    val service = Service()

    // This adds all keys with each key mapped to zero
    val referenceKeyCounts = Observable
        .just("1", "2", "3", "4", "5")
        .map { it to 0 }

    val keyCountsFromService = service.getRates()
        .flatMap { Observable.from(it.rates) }
        .filter { !it.value.isNullOrEmpty() }
        .map { it.value to 1 } // map each occurrence of key to 1

    Observable.concat(referenceKeyCounts, keyCountsFromService)
        .groupBy { it.first }
        .flatMap { group ->  // this converts GroupedObservable to final values
            group.reduce(0, { acc, pair -> acc + pair.second }) // add instead of counting
                .map { group.key to it }
        }
        .subscribe(::println)

}

class Service {
    fun getRates(): Observable<Rates> = Observable.just(Rates(listOf(
        Rate("1"), Rate("2"), Rate("3"), Rate("3"), Rate("2"), Rate("2")
    )))
}

class Rate(val value: String)

class Rates(val rates: List<Rate>)
Sokil answered 31/10, 2016 at 8:2 Comment(0)
E
4

The trick is to use count on the GroupedObservable since it only emits single value when the source observable completes:

enter image description here

From there it follows:

rates
  .flatMap { Observable.from(it.rates) }
  .filter { !it.value.isNullOrEmpty() }
  .groupBy { it.value }
  .flatMap { group -> group.count().map { group.key to it } } // list "1"->1, "2"->3, ...
  .mergeWith(Observable.from((1..5).map { it.toString() to 0 })) // defaults "4"->0
  .reduce(mutableMapOf<String, Int>()) { acc, cur ->
      acc.apply {
        val (key, count) = cur  
        this[key] = (this[key] ?: 0) + count // add counts
      }
  }.subscribe { countedRates -> 
    println(countedRates)
}
Eutectoid answered 31/10, 2016 at 6:14 Comment(0)
M
1

I think it is more about functional programming rather than a RxJava related Question

Implement a mapping function from Rates -> Map<String,Int>.

Tricks: Merge two list of Pair<String,Int> to form a Map<String,Int>

val ratesToMapWithEmptyValues: (Rates) -> Map<String, Int> = { source ->

  //TODO: Just for demo
  val validRatesValue = arrayOf("1","2","3","4","5")

  mapOf(
    *validRatesValue.map { it to 0 }.toTypedArray(),
    *source.rates.groupBy(Rate::value).mapValues { it.value.size }.toList().toTypedArray()
  )

}

Apply the function in Observable.map

service.getRates()
        .map(ratesToMapWithEmptyValues)
Mcgrody answered 31/10, 2016 at 8:57 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.