Waiting for ParallelFlux completion

I have created a ParallelFlux and then used .sequential(), expecting at that point I can count or "reduce" the results of the parallel computations. The problem seems to be that the parallel threads are fired off and nothing is waiting for them.

I have sort of gotten things to work using a CountDownLatch but I don't think I should have to do that.

TL;DR - I cannot get a result to print out for this code:

    Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .subscribe(System.out::println);
Jehoshaphat answered 23/3, 2020 at 21:28 Comment(2)
From where are you executing that code? Could it be that you run it in a main or a unit test, and the JVM exits before the computation terminates (because it runs on a separate elastic thread)? – Fetter
I have it as part of a simple main. – Jehoshaphat

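When that code runs in a main, it is a good demonstration of the asynchronous nature of Reactor. The pipeline runs on a thread from the elastic scheduler, and subscribe only triggers that asynchronous processing, so it returns immediately. The main method then ends, and because the elastic scheduler's threads are daemon threads, the JVM exits before the count is printed.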
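To make the "returns immediately" point concrete, here is a minimal sketch. It swaps the parallel pipeline for Mono.delay (a stand-in chosen only because it makes the timing obvious), and the class and method names are hypothetical. It checks a flag right after subscribe returns, at which point the delayed value should not have arrived yet.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;

public class SubscribeReturnsImmediately {

    // Reports whether the value had already arrived at the moment subscribe() returned.
    static boolean arrivedWhenSubscribeReturned() {
        AtomicBoolean arrived = new AtomicBoolean(false);

        // Mono.delay emits from a timer thread after 500 ms, not from the calling thread.
        Mono.delay(Duration.ofMillis(500))
            .subscribe(tick -> arrived.set(true));

        // subscribe() only started the work; the delay has not elapsed yet at this point.
        return arrived.get();
    }

    public static void main(String[] args) {
        System.out.println("arrived when subscribe returned: " + arrivedWhenSubscribeReturned());
        // main ends here; nothing keeps the JVM alive until the delayed value shows up.
    }
}
```

The same thing happens in the question's code: by the time the count is ready, main has already finished.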

There are two ways of synchronizing the end of the application with the end of the Flux:

print in doOnNext and use block()

The block* methods are typically used in main and tests, where there is no choice but to switch back to a blocking model (i.e. because otherwise the test/main would exit before the end of the processing).

We move the "side effect" of printing the emitted item into a doOnNext, which is dedicated to that sort of use case. Then we block until the pipeline completes. Since count() returns a Mono<Long>, the blocking call is block(); blockLast() is the equivalent on a Flux.

Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .doOnNext(System.out::println)
    .block();
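For reference, a self-contained version of the blocking approach might look like the sketch below. It rests on a few assumptions: the class and method names are hypothetical, Schedulers.boundedElastic() (available in newer Reactor versions) stands in for the elastic() scheduler used above, and a 30-second timeout is added so a stuck pipeline fails instead of hanging. Because count() yields a Mono<Long>, the sketch calls block() rather than blockLast().

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class BlockingCountExample {

    // Counts `size` elements across parallel rails and blocks the caller until the count is ready.
    static long countInParallel(int size) {
        Long count = Flux.range(0, size)
            .parallel()                          // split the sequence into rails
            .runOn(Schedulers.boundedElastic())  // assumption: replaces elastic() from the answer
            .sequential()                        // merge the rails back into a single Flux
            .count()                             // Mono<Long> holding the number of elements
            .block(Duration.ofSeconds(30));      // wait for the result, fail after 30 s
        // count() always emits a value, so null is not expected; guard anyway for unboxing.
        return count == null ? 0L : count;
    }

    public static void main(String[] args) {
        System.out.println(countInParallel(1_000_000)); // prints 1000000
    }
}
```

Returning the value from block() and printing it in main keeps the side effect out of the pipeline; the doOnNext variant above works just as well.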

print in subscribe and use a CountDownLatch in doFinally

This one is a bit more convoluted but allows you to actually use subscribe if you want to explore how that method works.

We just add a doFinally, which triggers on any terminating signal (cancellation, error or completion). We use a CountDownLatch to block the main thread; the latch is counted down asynchronously from within the doFinally.

CountDownLatch latch = new CountDownLatch(1);

Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .doFinally(signal -> latch.countDown())
    .subscribe(System.out::println);

latch.await(10, TimeUnit.SECONDS);
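As a runnable sketch of the latch approach, the class below adds the imports, handles the InterruptedException that await can throw, and checks await's return value so a timeout is not mistaken for completion. The class and method names are hypothetical, Schedulers.boundedElastic() is assumed in place of elastic(), and the AtomicLong is only there so the result can be returned to the caller.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class LatchCountExample {

    // Subscribes asynchronously and uses a latch to make the caller wait for termination.
    static long countWithLatch(int size) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicLong result = new AtomicLong(-1); // -1 means "no value received"

        Flux.range(0, size)
            .parallel()
            .runOn(Schedulers.boundedElastic())       // assumption: replaces elastic()
            .sequential()
            .count()
            .doFinally(signal -> latch.countDown())   // fires on complete, error or cancel
            .subscribe(result::set, Throwable::printStackTrace);

        // await returns false if the timeout elapsed before the latch reached zero.
        if (!latch.await(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("pipeline did not terminate within 10 seconds");
        }
        return result.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithLatch(1_000_000)); // prints 1000000
    }
}
```

The subscriber receives the count before doFinally runs, so the value is already stored by the time the latch releases the main thread.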
Spy answered 25/3, 2020 at 8:39 Comment(1)
Thanks Simon, these look good. I was trying to use CountDownLatch to count all (from 100000), but doing it in .doFinally makes more sense. BlockLast should work too, but I like to avoid that because of issues with porting to WebFlux. I probably don't need either solution with WebFlux because the client will do the blocking, but I still wanted to have something working in a simple main. Much appreciated. – Jehoshaphat
