Difference between Infinite Java Stream and Reactor Flux
L

2

15

I am trying to figure out the conceptual differences between an infinite Stream and an infinite Flux (if there are any).

To that end, I have come up with the following examples of an infinite Stream and an infinite Flux:

@Test
public void infiniteStream() {

    // Prints an unbounded sequence of integers as fast as possible (never returns)
    Stream<Integer> infiniteStream = Stream.iterate(0, i -> i + 1);

    infiniteStream.forEach(System.out::println);
}

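@Test
public void infiniteFlux() {

    // Emits the current date-time once per second, without end
    Flux<LocalDateTime> localDateTimeFlux = Flux.interval(Duration.ofSeconds(1))
            .map(t -> LocalDateTime.now());

    // HH is the 24-hour clock (hh would print 12-hour values without AM/PM).
    // subscribe() returns immediately, so the test thread must stay alive
    // for any output to appear.
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    localDateTimeFlux.subscribe(t -> System.out.println(t.format(formatter)));
}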

Regarding these examples, my questions are: is there a Flux analog of the Stream example, and a Stream analog of the Flux example? And, more generally, are there any differences between an infinite Stream and an infinite Flux?

Laparotomy answered 15/10, 2018 at 15:38 Comment(0)
A
29

Stream and Flux are quite different:

  • A Stream is single-use, whereas you can subscribe to a Flux multiple times
  • A Stream is pull-based (consuming one element calls for the next one), whereas Flux has a hybrid push/pull model in which the publisher can push elements but still has to respect the backpressure signaled by the consumer
  • A Stream is a synchronous sequence, whereas a Flux can represent an asynchronous sequence
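To make the first two points concrete on the Stream side, here is a minimal JDK-only sketch. The class and method names (StreamTraits, secondUseFails, generatedFor) are made up for illustration and are not part of any library. It checks that a second terminal operation on the same Stream instance is rejected, and that the source only produces as many elements as the consumer pulls:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class StreamTraits {

    // Returns true if a second terminal operation on the same Stream is rejected.
    static boolean secondUseFails() {
        Stream<Integer> numbers = Stream.iterate(0, i -> i + 1).limit(3);
        numbers.forEach(i -> { });          // first terminal operation consumes the stream
        try {
            numbers.forEach(i -> { });      // second use of the same instance
            return false;
        } catch (IllegalStateException e) {
            return true;                    // "stream has already been operated upon or closed"
        }
    }

    // Counts how many elements flow through peek() when the consumer only asks for n.
    static int generatedFor(int n) {
        AtomicInteger generated = new AtomicInteger();
        Stream.iterate(0, i -> i + 1)       // infinite source
              .peek(i -> generated.incrementAndGet())
              .limit(n)                     // the consumer stops pulling after n elements
              .forEach(i -> { });
        return generated.get();
    }
}
```

With a Flux, by contrast, each call to subscribe() starts a new subscription, so the same Flux instance can be consumed repeatedly.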

In your Stream example, you're generating an infinite sequence of values that are produced and consumed as fast as possible. In your Flux example, you're producing values at a fixed interval (something I'm not sure you can do with Stream). With Flux, you can also use Flux.generate to produce sequences without intervals, just like your Stream example.
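As a rough sketch of that last point, something along these lines should behave like Stream.iterate(0, i -> i + 1). It assumes reactor-core is on the classpath; the class name GenerateExample and the method naturals() are placeholders chosen for this example:

```java
import reactor.core.publisher.Flux;

class GenerateExample {

    // Flux counterpart of Stream.iterate(0, i -> i + 1): no timer involved,
    // one value is produced per request from the subscriber.
    static Flux<Integer> naturals() {
        return Flux.generate(
                () -> 0,                    // initial state
                (state, sink) -> {
                    sink.next(state);       // emit exactly one value per invocation
                    return state + 1;       // state for the next invocation
                });
    }

    public static void main(String[] args) {
        // take(5) bounds the otherwise infinite sequence for the demo
        naturals().take(5).subscribe(System.out::println);
    }
}
```

Dropping take(5) gives the infinite variant. Because the generator is only invoked in response to demand, the subscriber still controls the pace through backpressure.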

In general, you could consider Flux as a mix of Stream + CompletableFuture, with:

  • a lot of powerful operators
  • backpressure support
  • control over publisher and subscriber behavior
  • control over the notion of time (buffering windows of values, adding timeouts and fallbacks, etc.)
  • something tailored for async sequences fetched over the network (from a database or a remote Web API)
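To illustrate the time-related operators mentioned above, here is a small sketch, again assuming reactor-core is available. TimeOperators and bufferedTicks() are illustrative names, and the durations are arbitrary. It groups interval ticks into buffers of two and falls back to an empty sequence if no buffer arrives within one second:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class TimeOperators {

    // Emits 0, 1, 2, 3 at 50 ms intervals, grouped into lists of two.
    // If more than one second passes without a buffer, switch to the fallback.
    static Flux<List<Long>> bufferedTicks() {
        return Flux.interval(Duration.ofMillis(50))
                   .take(4)                                      // bound the sequence for the demo
                   .buffer(2)                                    // collect values pairwise
                   .timeout(Duration.ofSeconds(1), Flux.empty()); // fallback on timeout
    }

    public static void main(String[] args) {
        // block() is used here only to keep the demo alive until the Flux completes
        System.out.println(bufferedTicks().collectList().block());
    }
}
```

Expressing the same thing with a plain Stream would require manual sleeping and bookkeeping on the consuming thread.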
Aseptic answered 15/10, 2018 at 19:37 Comment(2)
Dear Brian, thanks for elucidating the differences! I will try out your suggestions with Flux.generate :-) - Laparotomy
In what cases should you use Streams over Flux? - Nancinancie
L
2

For reference: in the meantime, I have come up with a Stream-based solution for infiniteFlux():

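@Test
public void infiniteFluxWithStream() {

    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Print the formatted current date-time as each element is pulled
    Stream<Integer> infiniteStream = Stream.iterate(0, i -> i + 1)
            .peek(x -> System.out.println(LocalDateTime.now().format(formatter)));

    // Sleep in the terminal operation to pace the stream at one element per second
    infiniteStream.forEach(x -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and abort the otherwise endless loop
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    });
}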

This is indeed ugly. However, it shows that, in principle at least, simple Flux examples can be rewritten in terms of Streams.

Laparotomy answered 16/10, 2018 at 18:48 Comment(0)
