Live count emitted elements in rx

2

5

Is there a way to count the number of elements that have already been processed in a stream in RxAndroid?

I have something like this:

Observable.fromArray(new String[]{"these", "are", "my", "values", "."})
            .map(s -> doSomeCoolStuff(s))
            // ...
            .subscribe(amountOfProcessedItems -> Log.d("test", "" + amountOfProcessedItems));

I am looking for something that makes my output look like 1 2 3 4 5: after each item, emit the number of items that have been emitted so far.

Parmenides answered 18/3, 2017 at 11:19 Comment(2)
As I understand it, there is a mapWithIndex method that does exactly what you ask. Just replace your map call. - Avon
FYI: mapWithIndex isn't part of RxJava; it's in github.com/davidmoten/rxjava-extras - Sage
11

Just count the elements:

AtomicInteger counter = new AtomicInteger();
Observable.fromArray(new String[]{"these", "are", "my", "values", "."})
        .map(s -> doSomeCoolStuff(s))
        // ...
        .subscribe(dummy -> Log.d("test", "" + counter.incrementAndGet()));
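The counter is an AtomicInteger rather than a plain int because a Java lambda can only capture effectively final local variables, so the callback needs a mutable holder. A dependency-free sketch of the same pattern, with a plain List.forEach standing in for the Observable subscription (CounterSketch and countEach are hypothetical names for illustration, not RxJava API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CounterSketch {
    // Returns the running count recorded after each item is handled.
    public static List<Integer> countEach(List<String> items) {
        AtomicInteger counter = new AtomicInteger();
        List<Integer> seen = new ArrayList<>();
        // forEach plays the role of subscribe(): one callback per item.
        items.forEach(item -> seen.add(counter.incrementAndGet()));
        return seen;
    }

    public static void main(String[] args) {
        // prints [1, 2, 3, 4, 5]
        System.out.println(countEach(List.of("these", "are", "my", "values", ".")));
    }
}
```

Note that the count lives outside the stream here, so downstream operators never see it; it is only visible inside the callback.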

Edit: If you just want to convert elements to increasing integers, here's how:

sourceObservable
.zipWith(Observable.range(0, Integer.MAX_VALUE), (any, counter) -> counter)
.whatever(...)
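zipWith pairs items by position and finishes once the shorter source is exhausted, so the oversized range is never consumed past the length of the source. A plain-Java model of that pairing, not RxJava itself (ZipSketch and zipWithIndex are hypothetical names used only for this sketch):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

public class ZipSketch {
    // Pairs each item with the next counter value and keeps only the counter,
    // mirroring the (any, counter) -> counter zipper function.
    public static <T> List<Integer> zipWithIndex(List<T> source) {
        Iterator<T> items = source.iterator();
        // Lazy counter sequence; values are produced only on demand.
        Iterator<Integer> counters = IntStream.range(0, Integer.MAX_VALUE).iterator();
        List<Integer> out = new ArrayList<>();
        // Stop as soon as either side runs out, as zip does.
        while (items.hasNext() && counters.hasNext()) {
            items.next();
            out.add(counters.next());
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [0, 1, 2, 3, 4]
        System.out.println(zipWithIndex(List.of("these", "are", "my", "values", ".")));
    }
}
```

Because the range starts at 0, the result is a zero-based index; starting the counter sequence at 1 instead would give the 1 2 3 4 5 output the question asks for.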
Ostracoderm answered 18/3, 2017 at 12:11 Comment(3)
:D Okay, that was simple. But I would really prefer to convert this into a stream of integers rather than do the counting after subscribing. Is that possible at all? - Parmenides
Look at Observable.reduce() - Shana
Just remember this might lock up your thread, so I would subscribe off your main/UI thread. - Ametropia
4

You can use scan as well:

        Observable
            .just("One", "Two", "Three", "Four")
            // scan emits its seed first, so seed with 0 and skip that initial emission
            .scan(0, (counter, string) -> counter + 1)
            .skip(1)
            .subscribe(item -> Log.i(TAG, item.toString()));
Chorizo answered 18/7, 2018 at 17:54 Comment(1)
This solution only works for Flowable and Observable streams. - Loesceke
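Observable.reduce(), suggested in an earlier comment, emits only the final total once the source completes, whereas a seeded scan emits the seed followed by one running value per item. A plain-Java model of the two behaviours, offered as a sketch rather than the RxJava implementation (ScanSketch, reduceCount and scanCount are hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;

public class ScanSketch {
    // Models reduce: folds every item and yields only the final total.
    public static <T> int reduceCount(List<T> items) {
        int count = 0;
        for (T ignored : items) {
            count++;
        }
        return count;
    }

    // Models a seeded scan: yields the seed first, then one accumulated value per item.
    public static <T> List<Integer> scanCount(int seed, List<T> items) {
        List<Integer> out = new ArrayList<>();
        int count = seed;
        out.add(count);
        for (T ignored : items) {
            count++;
            out.add(count);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> items = List.of("One", "Two", "Three", "Four");
        System.out.println(reduceCount(items));   // prints 4
        System.out.println(scanCount(0, items));  // prints [0, 1, 2, 3, 4]
        System.out.println(scanCount(1, items));  // prints [1, 2, 3, 4, 5]
    }
}
```

Four items produce five values from a seeded scan, so a count seeded at 1 runs one ahead of the number of items actually seen; seeding at 0 and dropping the first value keeps the two aligned.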
