I'm going a little crazy here. I'm trying to write an Observable<BigDecimal>
extension function (against RxJava 2.x) that emits the average of the emissions, but I'm getting a compile error on the Single.zip()
call. Does anybody have any idea what I'm doing wrong? I also tried declaring all of the types explicitly, and that didn't work either.

import io.reactivex.Observable
import io.reactivex.Single
import java.math.BigDecimal

fun Observable<BigDecimal>.sum() = reduce { total, next -> total + next }

// compile error
fun Observable<BigDecimal>.average() = publish().autoConnect(2).let {
    Single.zip(it.sum().toSingle(), it.count()) {
        sum, count -> sum / BigDecimal.valueOf(count)
    }
}
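
For comparison, here is a minimal sketch of a variant that may compile. It assumes the error comes from Kotlin's SAM conversion: SingleSource is itself a single-method interface, and Kotlin versions before 1.4 likely cannot convert only the trailing lambda while the first two arguments are passed as plain objects. The explicit BiFunction<BigDecimal, Long, BigDecimal> wrapper, the declared Single<BigDecimal> return type, and the name `shared` are additions for illustration and are not part of the code above.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.functions.BiFunction
import java.math.BigDecimal

// Same helper as in the question: reduce() returns a Maybe<BigDecimal>.
fun Observable<BigDecimal>.sum() = reduce { total, next -> total + next }

// Assumption: naming the zipper's functional interface explicitly removes
// the ambiguity that a bare trailing lambda may cause with Single.zip().
fun Observable<BigDecimal>.average(): Single<BigDecimal> =
    publish().autoConnect(2).let { shared ->
        Single.zip(
            shared.sum().toSingle(),   // Maybe -> Single; errors if the source is empty
            shared.count(),            // Single<Long>
            BiFunction<BigDecimal, Long, BigDecimal> { sum, count ->
                // Kotlin's BigDecimal "/" keeps the dividend's scale
                // and rounds with RoundingMode.HALF_EVEN.
                sum / BigDecimal.valueOf(count)
            }
        )
    }
```

A call might look like `Observable.just(BigDecimal("1.0"), BigDecimal("2.0"), BigDecimal("3.0")).average().blockingGet()`. Because the division keeps the dividend's scale, inputs with too few decimal places will produce a rounded average.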