I have a single source of data items and I want to share that Flux with multiple downstream streams.
It is very similar to the example in the reference guide, but I feel that example cheats by calling .connect() manually.
Specifically, I do not know how many downstream subscribers there will be, and I am not in a position to call .connect() "at the end".
Consumers should be able to subscribe without immediately triggering the pulling of data. Later, when the data is actually needed, they will pull as much as necessary.
Additionally, the source is sensitive to consumption, so it cannot be re-fetched.
To add to that, it is going to be very big, so buffering and replaying it is not an option.
Ideally, on top of all that, the whole thing happens in one thread, so there is no concurrency or waiting.
(Giving subscribers a very small wait time to join is not desirable.)
I was able to achieve nearly the desired effect for Monos (single end result values):
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;

import org.assertj.core.api.SoftAssertions;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
// @Test comes from your JUnit version (org.junit.Test or org.junit.jupiter.api.Test)

public class CoConsumptionTest {

    @Test
    public void convenientCoConsumption() {
        // List used just for the example:
        List<Tuple2<String, String>> source = Arrays.asList(
                Tuples.of("a", "1"), Tuples.of("b", "1"), Tuples.of("c", "1"),
                Tuples.of("a", "2"), Tuples.of("b", "2"), Tuples.of("c", "2"),
                Tuples.of("a", "3"), Tuples.of("b", "3"), Tuples.of("c", "3")
        );

        // Source which is sensitive to consumption: every next() is counted and audited
        AtomicInteger consumedCount = new AtomicInteger(0);
        Iterator<Tuple2<String, String>> statefulIterator = new Iterator<Tuple2<String, String>>() {
            private ListIterator<Tuple2<String, String>> sourceIterator = source.listIterator();

            @Override
            public boolean hasNext() {
                return sourceIterator.hasNext();
            }

            @Override
            public Tuple2<String, String> next() {
                Tuple2<String, String> e = sourceIterator.next();
                consumedCount.incrementAndGet();
                System.out.println("Audit: " + e);
                return e;
            }
        };

        // Logic in the service:
        Flux<Tuple2<String, String>> f = Flux.fromIterable(() -> statefulIterator);
        ConnectableFlux<Tuple2<String, String>> co = f.publish();

        Function<Predicate<Tuple2<String, String>>, Mono<Tuple2<String, String>>> findOne = (highlySelectivePredicate) ->
                co.filter(highlySelectivePredicate)
                        .next() // gives us a Mono
                        .toProcessor() // makes it eagerly subscribe and demand from the upstream, so it won't miss emissions
                        .doOnSubscribe(s -> co.connect()); // when an actual user consumer subscribes

        // Subscribing (outside the service): nothing may be consumed yet
        assumeThat(consumedCount).hasValue(0);
        Mono<Tuple2<String, String>> a2 = findOne.apply(select("a", "2"));
        Mono<Tuple2<String, String>> b1 = findOne.apply(select("b", "1"));
        Mono<Tuple2<String, String>> c1 = findOne.apply(select("c", "1"));
        assertThat(consumedCount).hasValue(0);

        // Data is needed: the source should be read only up to ("a", "2"), i.e. 4 items
        SoftAssertions softly = new SoftAssertions();
        assertThat(a2.block()).isEqualTo(Tuples.of("a", "2"));
        softly.assertThat(consumedCount).hasValue(4);
        assertThat(b1.block()).isEqualTo(Tuples.of("b", "1"));
        softly.assertThat(consumedCount).hasValue(4);
        assertThat(c1.block()).isEqualTo(Tuples.of("c", "1"));
        softly.assertThat(consumedCount).hasValue(4);
        softly.assertAll();
    }

    private static Predicate<Tuple2<String, String>> select(String t1, String t2) {
        return e -> e.getT1().equals(t1) && e.getT2().equals(t2);
    }
}
Question: How can I achieve this for Flux results, i.e. for multiple values after the filtering is applied, not just the first/next one, while still demanding only as much as necessary?
(I tried naively replacing .toProcessor() with .publish().autoConnect(0), but did not succeed.)
Edit 1: While buffering of the source is not allowed, the filters that come as parameters are expected to be highly selective, so buffering after the filtering is okay.
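To make the intended semantics concrete (one single-pass source, lazy registration, pulling only on demand, one thread, buffering only after the filter), here is a minimal plain-Java sketch. It deliberately does not use Reactor: SharedPull, filtered and pulledCount are hypothetical names invented for this illustration, and the String items stand in for the tuples above. It only describes the behaviour I am after, not how Reactor implements sharing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical helper (not a Reactor API): shares one single-pass iterator among filtered views. */
final class SharedPull<T> {
    private final Iterator<T> source;
    private final List<View> views = new ArrayList<>();
    private int pulled = 0;

    SharedPull(Iterator<T> source) {
        this.source = source;
    }

    /** Registers a filtered view. Nothing is pulled from the source at this point. */
    Iterator<T> filtered(Predicate<T> predicate) {
        View view = new View(predicate);
        views.add(view);
        return view;
    }

    /** Number of items read from the source so far. */
    int pulledCount() {
        return pulled;
    }

    /** Reads one item from the source and buffers it in every view whose filter matches. */
    private boolean pullOne() {
        if (!source.hasNext()) {
            return false;
        }
        T item = source.next();
        pulled++;
        for (View view : views) {
            if (view.predicate.test(item)) {
                view.buffer.add(item); // buffering happens only after the filter
            }
        }
        return true;
    }

    private final class View implements Iterator<T> {
        final Predicate<T> predicate;
        final ArrayDeque<T> buffer = new ArrayDeque<>();

        View(Predicate<T> predicate) {
            this.predicate = predicate;
        }

        @Override
        public boolean hasNext() {
            // Pull from the shared source only while this view has nothing buffered.
            while (buffer.isEmpty()) {
                if (!pullOne()) {
                    return false;
                }
            }
            return true;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return buffer.poll();
        }
    }

    public static void main(String[] args) {
        SharedPull<String> shared = new SharedPull<>(
                Arrays.asList("a1", "b1", "c1", "a2", "b2", "c2").iterator());
        Iterator<String> a = shared.filtered(s -> s.startsWith("a"));
        Iterator<String> b1 = shared.filtered(s -> s.equals("b1"));
        System.out.println(shared.pulledCount());                   // 0
        System.out.println(a.next() + " " + shared.pulledCount());  // a1 1
        System.out.println(b1.next() + " " + shared.pulledCount()); // b1 2
        System.out.println(a.next() + " " + shared.pulledCount());  // a2 4
    }
}
```

Note that in this sketch a view registered after pulling has started misses the items already read, and a non-selective filter makes its buffer grow with the source, which is why it relies on the filters being highly selective.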
Edit 2:
Coming back to this after a while, I tried my posted example on a newer version of Reactor and it actually works.
io.projectreactor:reactor-bom:Californium-SR8
> io.projectreactor:reactor-core:3.2.9.RELEASE