Project Reactor: ConnectableFlux auto-connecting on demand
I have a single source of data items and I want to share that Flux with multiple downstream streams.

It is very similar to the example in the reference guide, but I feel that example cheats by calling .connect() manually. Specifically, I do not know how many downstream subscribers there will be, and I do not have control to call .connect() "at the end". Consumers should be able to subscribe, but not trigger the pulling of data immediately. And then somewhere in the future when the data is actually needed they will pull as necessary.

Additionally, the source is sensitive to the consumption so it cannot be re-fetched.
To add to that, it is going to be very big so buffering and replaying is not an option.

Ideally, on top of all that, the whole thing happens in one thread, so no concurrency or waiting.
(Giving a very small wait time for subscribers to join is not desirable)

I was able to achieve nearly the desired effect for Monos (single end result values):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;

import org.assertj.core.api.SoftAssertions;
import org.junit.Test;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

public class CoConsumptionTest {
    @Test
    public void convenientCoConsumption() {
        // List used just for the example:
        List<Tuple2<String, String>> source = Arrays.asList(
                Tuples.of("a", "1"), Tuples.of("b", "1"), Tuples.of("c", "1"),
                Tuples.of("a", "2"), Tuples.of("b", "2"), Tuples.of("c", "2"),
                Tuples.of("a", "3"), Tuples.of("b", "3"), Tuples.of("c", "3")
        );

        // Source which is sensitive to consumption
        AtomicInteger consumedCount = new AtomicInteger(0);
        Iterator<Tuple2<String, String>> statefulIterator = new Iterator<Tuple2<String, String>>() {
            private ListIterator<Tuple2<String, String>> sourceIterator = source.listIterator();

            @Override
            public boolean hasNext() {
                return sourceIterator.hasNext();
            }

            @Override
            public Tuple2<String, String> next() {
                Tuple2<String, String> e = sourceIterator.next();
                consumedCount.incrementAndGet();
                System.out.println("Audit: " + e);
                return e;
            }
        };

        // Logic in the service:
        Flux<Tuple2<String, String>> f = Flux.fromIterable(() -> statefulIterator);
        ConnectableFlux<Tuple2<String, String>> co = f.publish();

        Function<Predicate<Tuple2<String, String>>, Mono<Tuple2<String, String>>> findOne = (highlySelectivePredicate) ->
                co.filter(highlySelectivePredicate)
                        .next() //gives us a Mono
                        .toProcessor() //makes it eagerly subscribe and demand from the upstream, so it won't miss emissions
                        .doOnSubscribe(s -> co.connect()); //when an actual user consumer subscribes

        // Subscribing (outside the service)
        assumeThat(consumedCount).hasValue(0);
        Mono<Tuple2<String, String>> a2 = findOne.apply(select("a", "2"));
        Mono<Tuple2<String, String>> b1 = findOne.apply(select("b", "1"));
        Mono<Tuple2<String, String>> c1 = findOne.apply(select("c", "1"));
        assertThat(consumedCount).hasValue(0);

        // Data is needed
        SoftAssertions softly = new SoftAssertions();

        assertThat(a2.block()).isEqualTo(Tuples.of("a", "2"));
        softly.assertThat(consumedCount).hasValue(4);

        assertThat(b1.block()).isEqualTo(Tuples.of("b", "1"));
        softly.assertThat(consumedCount).hasValue(4);

        assertThat(c1.block()).isEqualTo(Tuples.of("c", "1"));
        softly.assertThat(consumedCount).hasValue(4);

        softly.assertAll();
    }

    private static Predicate<Tuple2<String, String>> select(String t1, String t2) {
        return e -> e.getT1().equals(t1) && e.getT2().equals(t2);
    }
}

Question: How can I achieve this for Flux results, i.e. for multiple values after the filtering is applied, not just the first/next one? (While still demanding only as much as necessary.)
(I naively tried replacing .toProcessor() with .publish().autoConnect(0), but that did not work.)

Edit 1: While buffering of the source is not allowed, the filters that come as parameters are expected to be highly selective, so buffering after the filtering is okay.
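Given that buffering after the highly selective filter is acceptable, one candidate is to replace .toProcessor() with .replay().autoConnect(0): replay buffers only the (few) matches, and autoConnect(0) attaches to the shared flux eagerly so no emission is missed before a real consumer shows up. A runnable sketch with an illustrative integer source (the findAll name and the even/odd predicates are made up for the example):

```java
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import java.util.function.Predicate;

public class FindAllDemo {
    public static void main(String[] args) {
        ConnectableFlux<Integer> co = Flux.range(1, 10).publish();

        Function<Predicate<Integer>, Flux<Integer>> findAll = p ->
                co.filter(p)
                  .replay()                          // buffer only the few matches
                  .autoConnect(0)                    // eagerly attach to co, so nothing is missed
                  .doOnSubscribe(s -> co.connect()); // first real consumer starts the source

        Flux<Integer> evens = findAll.apply(i -> i % 2 == 0);
        Flux<Integer> odds  = findAll.apply(i -> i % 2 == 1);

        List<Integer> evenList = new CopyOnWriteArrayList<>();
        evens.subscribe(evenList::add); // triggers connect; source is consumed once
        System.out.println(evenList);   // [2, 4, 6, 8, 10]

        List<Integer> oddList = new CopyOnWriteArrayList<>();
        odds.subscribe(oddList::add);   // replays the matches already buffered
        System.out.println(oddList);    // [1, 3, 5, 7, 9]
    }
}
```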

Edit 2: Coming back to this after a while, I tried my posted example on a newer version of reactor and it actually works.

io.projectreactor:reactor-bom:Californium-SR8
> io.projectreactor:reactor-core:3.2.9.RELEASE
Staggers answered 16/6, 2019 at 12:48

I don't like giving a "non-answer" style answer, but I think at least one of your requirements has to give here. From your question, the requirements seem to be:

  • Buffering not allowed
  • Not allowed to drop elements
  • Unknown number of subscribers
  • Subscribers can connect at any time
  • Each subscriber must have all the data available when it demands it
  • No re-fetching from source

Take the case where one subscriber requests data from a Flux, the first few elements in that Flux are consumed, and then eventually another subscriber shows up at an arbitrary point in the future wanting that same data. With the above requirements, that's impossible - you'd either have to go and get the data again, or have it saved somewhere, and you've ruled both of those options out.

However, if you're prepared to relax those requirements a bit, then there's a few potential options:

Known number of subscribers

If you can somehow work out how many subscribers you'll end up with, then you can use autoConnect(n) to automatically connect to a ConnectableFlux once that number of subscriptions has been made.
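A minimal single-threaded sketch of this (the subscriber count of 2 and the range source are just illustrative):

```java
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class AutoConnectDemo {
    public static void main(String[] args) {
        List<Integer> first = new CopyOnWriteArrayList<>();
        List<Integer> second = new CopyOnWriteArrayList<>();

        // Source only connects once two subscribers have arrived
        Flux<Integer> shared = Flux.range(1, 3).publish().autoConnect(2);

        shared.subscribe(first::add);   // no data flows yet
        shared.subscribe(second::add);  // second subscription triggers the connection

        System.out.println(first);   // [1, 2, 3]
        System.out.println(second);  // [1, 2, 3]
    }
}
```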

Allowing elements to be dropped

If you can allow elements to be dropped, then you can just call share() on the original Flux to have it auto-connect on the first subscription; future subscribers will then simply miss the elements emitted before they arrived.
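A small single-threaded sketch of that dropping behaviour, using Flux.create with a captured sink as a stand-in for a hot source (the sink capture is purely a test device):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

public class ShareDemo {
    public static void main(String[] args) {
        AtomicReference<FluxSink<Integer>> sinkRef = new AtomicReference<>();
        Flux<Integer> shared = Flux.<Integer>create(sinkRef::set).share();

        List<Integer> first = new CopyOnWriteArrayList<>();
        shared.subscribe(first::add);   // first subscriber triggers the connection
        sinkRef.get().next(1);
        sinkRef.get().next(2);

        List<Integer> second = new CopyOnWriteArrayList<>();
        shared.subscribe(second::add);  // late subscriber: 1 and 2 are gone for good
        sinkRef.get().next(3);

        System.out.println(first);   // [1, 2, 3]
        System.out.println(second);  // [3]
    }
}
```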

Allowing a time for subscribers to connect

This is perhaps one of the more promising strategies, since you say:

no concurrency or waiting. (Giving a very small wait time for subscribers to join is not desirable)

You can turn the Flux into a hot source that caches all emitted elements for a certain time period. This means that you can, at the cost of some amount of memory (but without buffering the whole stream), give subscribers a small wait time when they can subscribe and still receive all the data.
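A sketch using the time-bounded cache(Duration) variant; the 5-second TTL and the subscription counter are illustrative:

```java
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

public class CacheTtlDemo {
    public static void main(String[] args) {
        AtomicInteger fetches = new AtomicInteger();

        // Hot source that replays to anyone arriving within the 5s window
        Flux<Integer> cached = Flux.range(1, 3)
                .doOnSubscribe(s -> fetches.incrementAndGet())
                .cache(Duration.ofSeconds(5));

        List<Integer> first = new CopyOnWriteArrayList<>();
        cached.subscribe(first::add);   // first subscriber triggers the (only) fetch

        List<Integer> second = new CopyOnWriteArrayList<>();
        cached.subscribe(second::add);  // within the window: replayed, no re-fetch

        System.out.println(fetches.get()); // 1
        System.out.println(first);         // [1, 2, 3]
        System.out.println(second);        // [1, 2, 3]
    }
}
```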

Buffering a known number of elements

Similarly to above, you can use another variant of the cache() method to just cache a known number of elements. If you know you can safely fit n elements into memory, but no more, then this could give you the maximum time possible for subscribers to safely connect.
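A sketch with the bounded-history cache(n) variant; the history size of 3 and the range source are arbitrary:

```java
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CacheHistoryDemo {
    public static void main(String[] args) {
        // Keep at most the 3 most recent elements in memory
        Flux<Integer> cached = Flux.range(1, 10).cache(3);

        List<Integer> first = new CopyOnWriteArrayList<>();
        cached.subscribe(first::add);   // first subscriber drains the source fully

        List<Integer> late = new CopyOnWriteArrayList<>();
        cached.subscribe(late::add);    // late subscriber only gets the retained tail

        System.out.println(first.size()); // 10
        System.out.println(late);         // [8, 9, 10]
    }
}
```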

Nordic answered 30/7, 2019 at 23:1
I have to agree - something has to give. I'd like to "refine" two of the bullet points in your answer. 1) Buffering of the source is not allowed. 5) Each subscriber must have all the data that has been published after it subscribed. -- The latter is like a requirement of a hot source, but the data should still be pulled as in a cold source; in a sense, this should be a "hybrid" source. The former ("no buffering") is still important, but the filters that are accepted as parameters are expected to be highly selective, so buffering after the filters is okay.Staggers
Here is my latest approach. I'd like to name the subscribers "active" and "passive". At first all of them are passive, but they are subscribed. Then later on, one of them becomes active and pulls data, but not necessarily all the data. It keeps pulling until it gets satisfied and completes (terminates*). Then the flow of data "pauses". All the other passive subscribers "heard" all the emissions, applied their filtering, and cached any potential results. So now if another one gets "activated", it first emits the (potentially) cached results, and if still not satisfied, starts pulling data.Staggers
@Staggers Probably just me being picky on terminology, but remember Reactor is push based - a subscriber can request data, but it can't actively pull at all. The problem I see with the above approach is that it doesn't solve the issue of another subscriber subscribing after an initial subscriber has become "active" - in that case, unless you've buffered the initial source somehow, it can't be replayed.Nordic
Okay, true, subscribers can only request and do not actively pull. But effectively they do pull in cold sources. As to the problem you mention - yes, if they "subscribe" late they will lose earlier emissions. That is actually okay - it's how hot sources are. (Though I am putting quotes on "subscribe" because in my example there is a point which technically is subscribing, but is not what I mean here)Staggers
