Observable vs Flowable in RxJava 2

3

152

I have been looking at the new RxJava 2 and I'm not quite sure I understand the idea of backpressure anymore...

I'm aware that we have Observable that does not have backpressure support and Flowable that has it.

So, as an example, let's say I have a Flowable with an interval:

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

This is going to crash after around 128 values, and that's pretty obvious: I am consuming items more slowly than I am receiving them.

But then we have the same with Observable:

        Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

This will not crash at all; even when I put some delay on consuming, it still works. To make the Flowable work, let's say I add the onBackpressureDrop operator: the crash is gone, but not all values are emitted either.

So the basic question I cannot currently answer is: why should I care about backpressure when I can use a plain Observable and still receive all values without managing the buffer? Or, from the other side, what advantages does backpressure give me when it comes to managing and handling consumption?

Mabellemable answered 29/10, 2016 at 20:19 Comment(1)
Which type to use? Observable or Flowable?Tekla
138

In practice, backpressure manifests as bounded buffers: Flowable.observeOn has a buffer of 128 elements that gets drained as fast as the downstream can take it. You can increase this buffer size individually to handle a bursty source, and all the backpressure-management practices from 1.x still apply. Observable.observeOn has an unbounded buffer that keeps collecting the elements, and your app may run out of memory.
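
To illustrate the buffer-size point, here is a minimal sketch that is not part of the original answer. It assumes RxJava 2.x on the classpath, and the class and method names are made up for the example. It uses the `observeOn(Scheduler, boolean, int)` overload to give a source that ignores backpressure a 1024-element buffer instead of the default 128:

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

// Hypothetical example class, not part of RxJava.
class ObserveOnBufferSketch {

    // Pushes `count` items synchronously while ignoring downstream demand
    // (MISSING applies no backpressure strategy), then hops threads through
    // an observeOn buffer of the given size. Returns how many items arrived.
    static int deliver(int count, int bufferSize) {
        return Observable.range(1, count)
                .toFlowable(BackpressureStrategy.MISSING)
                .observeOn(Schedulers.computation(), false, bufferSize)
                .toList()
                .blockingGet()
                .size();
    }

    public static void main(String[] args) {
        // Default size used by Flowable operators (128 unless overridden
        // through the rx2.buffer-size system property).
        System.out.println("default buffer size: " + Flowable.bufferSize());
        // A 1024-element buffer can hold the whole 1000-item burst.
        System.out.println("delivered: " + deliver(1000, 1024));
    }
}
```

With the default buffer size, the same burst would most likely end in a MissingBackpressureException, because the 1000 items arrive faster than the buffer is drained.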

You may use Observable, for example, for:

  • handling GUI events
  • working with short sequences (less than 1000 elements total)

You may use Flowable, for example, for:

  • cold and non-timed sources
  • generator-like sources
  • network and database accessors
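
As a rough illustration of why generator-like sources fit Flowable, the sketch below (assuming RxJava 2.x; the class and method names are hypothetical) counts how often an endless `Flowable.generate` callback runs when the subscriber asks for only a few items. The generator is invoked once per requested item, so nothing piles up in a buffer:

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class, not part of RxJava.
class GeneratorDemandSketch {

    // Subscribes to an endless generator with a fixed demand and returns how
    // many times the generator callback was actually invoked.
    static int generatedFor(long demand) {
        AtomicInteger calls = new AtomicInteger();
        Flowable<Integer> numbers =
                Flowable.<Integer>generate(emitter -> emitter.onNext(calls.incrementAndGet()));

        // test(n) creates a TestSubscriber that requests only n items up front.
        TestSubscriber<Integer> subscriber = numbers.test(demand);
        subscriber.dispose(); // stop the endless source
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("generator calls for demand 5: " + generatedFor(5));
    }
}
```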
Kazukokb answered 30/10, 2016 at 7:24 Comment(4)
Since this has come up in another question - is it correct that more restricted types like Maybe, Single and Completable can always be used instead of Flowable when they are semantically appropriate?Stolid
Yes, Maybe, Single, and Completable are far too small to have any need for the backpressure concept. There is no chance a producer can emit items faster than they can be consumed, since only 0–1 items will ever be produced or consumed.Auditory
Maybe I'm wrong, but to me the examples for Flowable and Observable should be swapped.Gilcrest
I think the question is missing the backpressure strategy that we need to provide to the Flowable, which explains why the MissingBackpressureException is thrown and also why the exception disappears after applying .onBackpressureDrop(). As for Observable, since it does not have such a strategy and cannot be given one, it will simply fail later due to OOM.Sierra
136

Backpressure is when your observable (publisher) creates more events than your subscriber can handle. So you can get subscribers missing events, or you can get a huge queue of events, which eventually leads to running out of memory. Flowable takes backpressure into consideration. Observable does not. That's it.

It reminds me of a funnel which overflows when it has too much liquid. Flowable can help prevent that:

With tremendous backpressure:

[image]

But when using Flowable, there is much less backpressure:

[image]

RxJava 2 has a few backpressure strategies you can use depending on your use case. By strategy I mean that RxJava 2 supplies a way to handle the objects that cannot be processed because of the overflow (backpressure).

Here are the strategies. I won't go through them all, but, for example, if you don't want to worry about the items that overflow, you can use a drop strategy like this:

observable.toFlowable(BackpressureStrategy.DROP)
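
To make the effect of a strategy concrete, here is a small sketch that is not from the original answer. It assumes RxJava 2.x, and the class and helper names are hypothetical. Ten items are pushed synchronously while the subscriber initially asks for only three, and each strategy decides what happens to the other seven:

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Observable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.subscribers.TestSubscriber;
import java.util.List;

// Hypothetical example class, not part of RxJava.
class BackpressureStrategySketch {

    // Ten items are emitted synchronously; the subscriber requests `demand` up front.
    static TestSubscriber<Integer> subscribe(BackpressureStrategy strategy, long demand) {
        return Observable.range(1, 10).toFlowable(strategy).test(demand);
    }

    // Items that reached the subscriber with only the initial demand.
    static List<Integer> received(BackpressureStrategy strategy, long demand) {
        return subscribe(strategy, demand).values();
    }

    // BUFFER keeps the overflow and hands it over once more items are requested.
    static List<Integer> bufferedThenRequested(long demand) {
        TestSubscriber<Integer> subscriber = subscribe(BackpressureStrategy.BUFFER, demand);
        subscriber.request(Long.MAX_VALUE);
        return subscriber.values();
    }

    // ERROR terminates the stream as soon as an item arrives without demand.
    static boolean failsWithMissingBackpressure(long demand) {
        TestSubscriber<Integer> subscriber = subscribe(BackpressureStrategy.ERROR, demand);
        return subscriber.errorCount() == 1
                && subscriber.errors().get(0) instanceof MissingBackpressureException;
    }

    public static void main(String[] args) {
        System.out.println(received(BackpressureStrategy.DROP, 3)); // [1, 2, 3]
        System.out.println(bufferedThenRequested(3));               // all ten items
        System.out.println(failsWithMissingBackpressure(3));        // true
    }
}
```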

As far as I know, there should be a 128-item limit on the queue; after that there can be an overflow (backpressure). Even if it's not 128, it's close to that number. Hope this helps someone.

If you need to change the buffer size from 128, it looks like it can be done like this (but watch out for any memory constraints):

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.  

In software development, a backpressure strategy usually means you're telling the emitter to slow down a bit, as the consumer cannot handle the velocity at which you're emitting events.
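
A note on the `buffer(256)` call above: in RxJava 2.x, `Flowable.buffer(int)` groups items into lists rather than sizing a backpressure queue, while `onBackpressureBuffer(int)` holds items back until the subscriber requests them. The sketch below contrasts the two. It assumes RxJava 2.x, and the class and method names are hypothetical:

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.subscribers.TestSubscriber;
import java.util.List;

// Hypothetical example class, not part of RxJava.
class BufferVersusBackpressureBufferSketch {

    // buffer(2) changes the item type: six integers become three lists of two.
    static List<List<Integer>> groupedInPairs() {
        return Flowable.range(1, 6).buffer(2).toList().blockingGet();
    }

    // onBackpressureBuffer(capacity) keeps the items themselves and queues up
    // to `capacity` of them until the subscriber requests more.
    static List<Integer> heldUntilRequested(int capacity) {
        TestSubscriber<Integer> subscriber = Observable.range(1, 5)
                .toFlowable(BackpressureStrategy.MISSING)
                .onBackpressureBuffer(capacity)
                .test(0); // no demand yet, so everything waits in the queue
        subscriber.assertNoValues();
        subscriber.request(5);
        return subscriber.values();
    }

    public static void main(String[] args) {
        System.out.println(groupedInPairs());        // [[1, 2], [3, 4], [5, 6]]
        System.out.println(heldUntilRequested(256)); // [1, 2, 3, 4, 5]
    }
}
```

If more items arrive than the given capacity while nothing is requested, `onBackpressureBuffer(int)` signals an error instead of growing without bound.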

Tint answered 30/5, 2017 at 17:29 Comment(5)
I always thought backpressure is the name for a family of mechanisms which let the consumer notify the producer to slow down...Ginder
Could be the case. YesTint
Are there any downsides to using a Flowable?Goeselt
These images are lying to me. Dropping events won't end up with "more money" on the bottom.Logsdon
@j2emanue, you are confusing the buffer size for operators with the Flowable.buffer(int) operator. Please read the javadocs carefully and fix your answer accordingly: reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.htmlBigotry
16

The fact that your Flowable crashed after emitting 128 values without backpressure handling doesn't mean it will always crash after exactly 128 values: sometimes it will crash after 10, and sometimes it will not crash at all. I believe this is what happened when you tried the example with Observable: there happened to be no backpressure, so your code worked normally, but next time it may not. The difference in RxJava 2 is that there is no concept of backpressure in Observables anymore, and no way to handle it. If you're designing a reactive sequence that will probably require explicit backpressure handling, then Flowable is your best choice.

Preceptor answered 29/10, 2016 at 20:43 Comment(2)
Yes, I have observed that sometimes it broke after fewer values and sometimes it did not. But again, if, for example, I am handling only an interval without backpressure, should I expect some weird behaviour or issues?Mabellemable
If you're sure there's no way backpressure issues can occur in a specific Observable sequence - then I guess it's fine to ignore backpressure.Preceptor
