Spring Reactor Merge vs Concat

I'm playing with Spring Reactor, and I cannot see any difference between the concat and merge operators.

Here's my example:

    @Test
    public void merge() {
        Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux3 = Flux.just("world");
        Flux.merge(flux1, flux2, flux3)
                .map(String::toUpperCase)
                .subscribe(System.out::println);
    }

    @Test
    public void concat() {
        Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux3 = Flux.just("world");
        Flux.concat(flux1, flux2, flux3)
                .map(String::toUpperCase)
                .subscribe(System.out::println);
    }

Both behave exactly the same. Can someone explain the difference between the two operations?

Lantz answered 27/1, 2018 at 17:23 Comment(0)

The essential difference between merge and concat is that with merge, both streams are live at the same time. With concat, the first stream must terminate before the other stream is concatenated to it.

[Figure: Concat]

[Figure: Merge]
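The "live" versus "terminated first" distinction can be sketched with two sources whose delays are far apart. This is only a minimal illustration, assuming reactor-core is on the classpath; the class name, method names and the 300 ms / 50 ms delays are made up for the example and are not from the original posts.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; names and delays are illustrative assumptions.
class MergeVsConcatSketch {

    // Emits "slow" after roughly 300 ms.
    static Flux<String> slow() {
        return Flux.just("slow").delayElements(Duration.ofMillis(300));
    }

    // Emits "fast" after roughly 50 ms.
    static Flux<String> fast() {
        return Flux.just("fast").delayElements(Duration.ofMillis(50));
    }

    // concat subscribes to fast() only after slow() has completed,
    // so the argument order is preserved.
    static List<String> concatOrder() {
        return Flux.concat(slow(), fast()).collectList().block();
    }

    // merge subscribes to both sources up front,
    // so whichever element is ready first is emitted first.
    static List<String> mergeOrder() {
        return Flux.merge(slow(), fast()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("concat: " + concatOrder()); // concat: [slow, fast]
        System.out.println("merge:  " + mergeOrder());  // merge:  [fast, slow]
    }
}
```

Because both sources here emit on a timer thread rather than on the subscribing thread, the two operators produce visibly different orders.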

Dogtired answered 27/1, 2018 at 17:50 Comment(5)
But then how is it possible that the order of emission in the merge is always the same? - Lantz
What is the sequence? - Dogtired
Flux 1, 2 and 3, every time. - Lantz
And is there any time difference? - Dogtired
Nope, never. That's the reason for my question. - Lantz

The difference is already described in the API docs: concat reads one flux completely and then appends the second flux to it, whereas the merge operator doesn't guarantee any ordering between the two fluxes.

To see the difference, modify your merge() code as shown below:

    // Flux with delay
    @Test
    public void merge() {
        Flux<String> flux1 = Flux.just("Hello", "Vikram");
        // Pair each value with an interval tick: "Hello" after 3 s, "Vikram" after 6 s.
        flux1 = Flux.interval(Duration.ofMillis(3000))
                .zipWith(flux1, (i, msg) -> msg);

        Flux<String> flux2 = Flux.just("reactive");
        // "reactive" is emitted after 2 s.
        flux2 = Flux.interval(Duration.ofMillis(2000))
                .zipWith(flux2, (i, msg) -> msg);

        // No delay: "world" is emitted immediately.
        Flux<String> flux3 = Flux.just("world");
        Flux.merge(flux1, flux2, flux3)
                .subscribe(System.out::println);

        // Keep the test thread alive until all delayed values have arrived.
        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

If you change the Flux.interval durations (flux1 is currently set to 3000 milliseconds), you will see the output order of merge() change accordingly. With concat(), the output order is always the same.
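For comparison with the merge() test above, here is a rough sketch that feeds the same three sources to both operators. The delays are scaled down to 300 ms and 200 ms so it runs quickly, and the class and the `delayed` helper are hypothetical names introduced only for this illustration.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; delays are scaled-down assumptions (300 ms / 200 ms).
class ConcatCounterpart {

    // Same interval + zipWith approach as the test above: one value per tick.
    static Flux<String> delayed(Duration period, String... values) {
        return Flux.interval(period).zipWith(Flux.just(values), (tick, msg) -> msg);
    }

    static List<String> run(boolean useConcat) {
        Flux<String> flux1 = delayed(Duration.ofMillis(300), "Hello", "Vikram");
        Flux<String> flux2 = delayed(Duration.ofMillis(200), "reactive");
        Flux<String> flux3 = Flux.just("world");
        Flux<String> combined = useConcat
                ? Flux.concat(flux1, flux2, flux3)   // one source at a time, in argument order
                : Flux.merge(flux1, flux2, flux3);   // all sources at once, ordered by arrival
        return combined.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("concat: " + run(true));  // concat: [Hello, Vikram, reactive, world]
        System.out.println("merge:  " + run(false)); // merge:  [world, reactive, Hello, Vikram]
    }
}
```

Changing the two durations reorders the merge result but leaves the concat result untouched; concat only takes longer overall, because the delays add up.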

Picardi answered 19/8, 2018 at 17:45 Comment(2)
Hi @Vikram Rawat, the output of your example is reactive, world, Hello, Vikram. Why isn't the word "world" shown first if it doesn't have any sleep? - Demibastion
Hi @AleGallagher, good catch. I have edited the code; if you run it, the output will be world, reactive, Hello, Vikram. The earlier code called sleep in doOnNext, which put the subscriber to sleep rather than delaying the emission of "reactive" from flux2. So I changed flux2 to delay the emission of the data ("reactive") instead. Hope it makes sense. - Picardi
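The point made in the last comment (sleeping in doOnNext blocks the subscriber instead of delaying the emission) is also why the merge() test in the question always prints in source order. A minimal sketch of the two variants, assuming reactor-core on the classpath; the class name, helper and 200 ms delay are illustrative, not from the thread:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class; names and the 200 ms delay are illustrative assumptions.
class BlockingVsDelayed {

    // Blocks whichever thread delivers the element; here that is the caller's thread.
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Sleep inside doOnNext: merge subscribes to "hello" first and the whole chain
    // runs synchronously on the calling thread, so "world" has to wait its turn.
    static List<String> mergeWithBlockingSleep() {
        Flux<String> hello = Flux.just("hello").doOnNext(v -> sleep(200));
        Flux<String> world = Flux.just("world");
        return Flux.merge(hello, world).collectList().block();
    }

    // delayElements moves the wait onto a timer thread, so "world" can go first.
    static List<String> mergeWithDelayElements() {
        Flux<String> hello = Flux.just("hello").delayElements(Duration.ofMillis(200));
        Flux<String> world = Flux.just("world");
        return Flux.merge(hello, world).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(mergeWithBlockingSleep());  // [hello, world]
        System.out.println(mergeWithDelayElements());  // [world, hello]
    }
}
```

With purely synchronous sources, merge has no opportunity to interleave anything, which is why it looks identical to concat in the question.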

Another noteworthy difference is that all variations of concat subscribe to the second stream lazily, only after the first stream has terminated.

By contrast, all variations of merge subscribe to the publishers eagerly (all publishers are subscribed to together).

Running the code below highlights this aspect:

    // Note: delayElements emits on a separate scheduler thread, so keep the calling
    // thread alive (e.g. with Thread.sleep) long enough to see the output.

    // Lazy subscription of concat: the second Flux is subscribed to only after the first completes.
    Flux.concat(Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(500)),
                Flux.just(10, 20, 30, 40).delayElements(Duration.ofMillis(500)))
            .subscribe(System.out::println, System.out::println);

    // Eager subscription of merge: both sources run at once. Also, try mergeSequential.
    Flux.merge(Flux.range(500, 3).delayElements(Duration.ofMillis(500)),
               Flux.range(-500, 3).delayElements(Duration.ofMillis(300)))
            .subscribe(System.out::println, System.out::println);

Monosome answered 30/6, 2021 at 18:4 Comment(0)
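One way to make the lazy versus eager subscription visible is to log when each source is subscribed to and when it completes. The following is only a sketch, assuming reactor-core on the classpath; the class, the `traced` helper and the 100 ms delay are hypothetical names and values chosen for the illustration.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;

// Hypothetical demo class; names and the 100 ms delay are illustrative assumptions.
class SubscriptionTiming {

    // A one-element delayed source that records its subscription and completion in the log.
    static Flux<Integer> traced(String name, int value, List<String> log) {
        return Flux.just(value)
                .delayElements(Duration.ofMillis(100))
                .doOnSubscribe(s -> log.add("subscribe " + name))
                .doOnComplete(() -> log.add("complete " + name));
    }

    static List<String> concatLog() {
        List<String> log = new CopyOnWriteArrayList<>(); // written from several threads
        Flux.concat(traced("first", 1, log), traced("second", 2, log)).blockLast();
        return log;
    }

    static List<String> mergeLog() {
        List<String> log = new CopyOnWriteArrayList<>();
        Flux.merge(traced("first", 1, log), traced("second", 2, log)).blockLast();
        return log;
    }

    public static void main(String[] args) {
        // concat: "subscribe second" appears only after "complete first".
        System.out.println(concatLog());
        // merge: both "subscribe" entries appear before any "complete" entry.
        System.out.println(mergeLog());
    }
}
```

Only the position of "subscribe second" matters here; with merge, the relative order of the two "complete" entries is not guaranteed because both sources finish at about the same time.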
1st example:

    Flux<String> stringFlux = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
    Flux<String> stringFlux1 = Flux.just("c", "d").delayElements(Duration.ofMillis(300));
    Flux<String> stringFlux2 = Flux.merge(stringFlux, stringFlux1);
    stringFlux2.subscribe(System.out::println);
    Thread.sleep(4000);

Output:

    a
    c
    b
    d

2nd example:

    Flux<String> stringFlux = Flux.just("a", "b");
    Flux<String> stringFlux1 = Flux.just("c", "d");
    Flux<String> stringFlux2 = Flux.merge(stringFlux, stringFlux1);
    stringFlux2.subscribe(System.out::println);
    Thread.sleep(4000);

Output:

    a
    b
    c
    d

Conclusion: if you don't give the elements a delay, the publishers are still subscribed to eagerly, but each Flux.just emits all of its elements synchronously as soon as it is subscribed to, so the output follows the subscription order, as in the 2nd example.
If you do give the elements a delay, the publisher with the shorter delay emits its element first and the other publisher emits afterwards, as in the 1st example.
The result depends entirely on timing. In the 1st example the delays are close together (200 vs. 300 milliseconds), so the output is interleaved. In the 3rd example below there is a big difference between 2 seconds and 300 milliseconds, so the output is c, d, a, b.

3rd example:

    Flux<String> stringFlux = Flux.just("a", "b").delayElements(Duration.ofSeconds(2));
    Flux<String> stringFlux1 = Flux.just("c", "d").delayElements(Duration.ofMillis(300));

Cathexis answered 12/9, 2023 at 8:40 Comment(0)
