Reactor groupBy: What happens with remaining items after GroupedFlux is canceled?

I need to group an infinite Flux by a key with high cardinality.

For example:

  1. The group key is a domain URL.
  2. Calls to one domain must be strictly sequential (the next call starts only after the previous one completes).
  3. Calls to different domains should be concurrent.
  4. The time interval between items with the same key (URL) is unknown but expected to be bursty: several items are emitted in a short period of time, then there is a long pause until the next burst.

My current pipeline:

    queue
        .groupBy(keyMapper, groupPrefetch)
        .flatMap(
            { group ->
                group
                    // process items in this group strictly one at a time;
                    // prefetch 0 so no extra items are requested from the group
                    .concatMap({ task -> makeSlowRemoteCall(task) }, 0)
                    // cancel the group once the remote call signals completion
                    .takeUntil { remoteCallResult -> remoteCallResult == DONE }
                    // cancel idle groups: the empty fallback suppresses the
                    // TimeoutException so the inner publisher completes normally
                    .timeout(groupTimeout, Mono.empty())
                    .then()
            },
            concurrency
        )

I cancel the group in two cases:

  1. The makeSlowRemoteCall() result indicates that, with high probability, there will be no new items in this group in the near future.

  2. The next item is not emitted within groupTimeout. I use the timeout(timeout, fallback) variant to suppress the TimeoutException and allow flatMap's inner publisher to complete successfully.

I want possible future items with the same key to form a new GroupedFlux and be processed by the same flatMap inner pipeline.

But what happens if the GroupedFlux has remaining unrequested items when I cancel it?

Does the groupBy operator re-queue them into a new group with the same key, or are they lost forever? If the latter, what is the proper way to solve my problem? I am also not sure whether I need to set the concatMap() prefetch to 0 in this case.

Notogaea asked 1/4, 2021 at 21:09 Comment(9)
I'm not clear on exactly what you want to happen if the timeout occurs - in this case an error will be thrown and the entire chain will thus be halted (the elements that haven't yet been processed won't be requeued anywhere; they're lost). Do you want to log the remaining elements somewhere, do some other action on them after a timeout, or something else entirely? - Furthermore
I edited my question in an attempt to make it clearer. Please note the usage of the timeout(timeout, fallback) variant to suppress the timeout error. It should cancel the source flux group but allow the flatMap inner publisher to complete successfully. Basically, I want the remaining items to be processed by the same concatMap pipeline, but as a new GroupedFlux. The reason I am using timeout is to clear idle groups from the flatMap inners. - Notogaea
Ah I see. I think you're out of luck, I'm afraid - as soon as that timeout occurs, the entire GroupedFlux chain is replaced with an empty Mono; you can't get the actual group back. Having said that, it sounds like you're using timeout() as an optimisation to clear groups, which is a bit of an odd case - you shouldn't need to do that unless the number of groups gets stupidly large. If that is the case, you'd probably be better off using something like the bulkhead pattern in resilience4j (resilience4j.readme.io/docs/bulkhead), with a new bulkhead instance for each domain. - Furthermore
(continued) ...using the bulkhead in that way should enable you to avoid groups entirely, if I've understood correctly - you can just make your slow remote calls in a standard flatMap, using the bulkhead names as keys. - Furthermore
The following javadoc makes me worry: "Note that groupBy works best with a low cardinality of groups. When <...> the criteria produces a large amount of groups, it can lead to hanging if the groups are not suitably consumed downstream." I will have up to 10k possible keys, with 100-1000 inbound elements per second. Each group will idle for up to 30 seconds until a terminal element arrives and I can cancel it. Does that sound good enough not to hang flatMap? - Notogaea
(continued) And thanks for the resilience4j suggestion. I had no experience with the bulkhead pattern. For my particular case, should I configure it with maxConcurrentCalls=1 and just get an instance from BulkheadRegistry with name=my-group-key? Is it intended to work with such a high number (10k) of active bulkheads? - Notogaea
I wouldn't use groupBy under those circumstances personally - the danger is that you end up with more than n active idle groups (where n is your concurrency), and then the whole thing hangs indefinitely because your entire concurrency allowance is used up on those idle groups. You could easily hit that ceiling in that sort of scenario. Yes, for your scenario you'd use a bulkhead as you describe - I've never tried using that many active bulkheads, but I can't see it being an issue in particular, as you'd essentially just have 10k objects stored in a collection somewhere. - Furthermore
(continued) ...you'd just need to make sure you have sufficient memory to deal with all that throughput. You may also choose to put the bulkheads in some sort of cache where you can expire them if not used within a certain time period - if the number of domains could grow without bound, this would be prudent to avoid a leak. - Furthermore
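For reference, a minimal sketch of that bulkhead approach, assuming the resilience4j-reactor module (queue, keyMapper, makeSlowRemoteCall, and concurrency come from the question; everything else is illustrative):

    import io.github.resilience4j.bulkhead.BulkheadConfig
    import io.github.resilience4j.bulkhead.BulkheadRegistry
    import io.github.resilience4j.reactor.bulkhead.operator.BulkheadOperator

    val registry = BulkheadRegistry.of(
        BulkheadConfig.custom()
            .maxConcurrentCalls(1) // at most one in-flight call per domain
            .build()
    )

    queue.flatMap({ task ->
        // one bulkhead per domain key, created lazily by the registry
        val bulkhead = registry.bulkhead(keyMapper(task))
        // caveat: when the bulkhead is full the operator signals
        // BulkheadFullException rather than queueing, so a retry policy
        // would be needed if calls for one domain can overlap
        makeSlowRemoteCall(task)
            .transformDeferred(BulkheadOperator.of(bulkhead))
    }, concurrency)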
I decided to rethink the architecture. I think it is better to hash my business-logic key and uniformly distribute elements across a few infinite groups. Less throughput, but a simpler system. - Notogaea
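A sketch of that sharding idea (shardCount and the Math.floorMod hashing are illustrative assumptions):

    val shardCount = 64 // fixed, low cardinality: shards can stay alive forever

    queue
        // the same key always hashes to the same shard, so per-key ordering holds
        .groupBy { task -> Math.floorMod(keyMapper(task).hashCode(), shardCount) }
        .flatMap(
            { shard ->
                // each shard is consumed strictly sequentially; unrelated keys
                // in the same shard also serialize, hence the lower throughput
                shard.concatMap({ task -> makeSlowRemoteCall(task) }, 0).then()
            },
            shardCount // one inner subscriber per shard, and none ever idles out
        )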

I think the groupBy() operator is not a fit for my task: an infinite source with a lot of groups. It creates infinite groups, so idle groups have to be canceled downstream somehow, but it is not possible to cancel a GroupedFlux with a guarantee that it has no unconsumed elements.

I think it would be great to have a groupBy variant that emits finite groups, something like groupBy(keyMapper, boundaryPredicate): when boundaryPredicate returns true, the current group completes, and the next element with the same key starts a new group.
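To illustrate the proposal, a hypothetical usage (this overload does not exist in Reactor; isBoundary is a made-up predicate):

    // Hypothetical API - not part of Reactor. Each group would complete on its
    // own once the predicate matches, so no downstream cancellation is needed.
    queue
        .groupBy(keyMapper) { task -> isBoundary(task) }
        .flatMap(
            { group ->
                group.concatMap({ task -> makeSlowRemoteCall(task) }, 0).then()
            },
            concurrency
        )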

Notogaea answered 3/4, 2021 at 6:26 Comment(0)

So I was dealing with something similar - the way to handle this is to use windowUntil or windowUntilChanged first, and then use groupBy on the Flux instances emitted by that. For example, you can track the keys seen so far for the current window; when a new key arrives and the current window is "full", you start a new window with new groups. Note that in this case, if you see a key from a previous window again, it won't reuse the old GroupedFlux for it. The old one will drain as part of the previous window, and a new GroupedFlux will be created for that key in the new window.
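A rough sketch of that shape (the boundary predicate is a hypothetical stand-in for the answer's key-tracking logic, and serializing windows with concatMap is an assumption made to preserve per-key ordering):

    queue
        // close the current window at some boundary; the answer tracks the
        // keys seen so far, here isWindowBoundary is a made-up placeholder
        .windowUntil { task -> isWindowBoundary(task) }
        // concatMap serializes windows, so an old group for a key fully
        // drains before the same key gets a fresh group in the next window
        .concatMap { window ->
            window
                .groupBy(keyMapper)
                .flatMap(
                    { group ->
                        // groups complete when their window closes, so idle
                        // groups no longer pin flatMap slots forever
                        group.concatMap({ task -> makeSlowRemoteCall(task) }, 0).then()
                    },
                    concurrency
                )
                .then()
        }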

Horned answered 25/1 at 18:10 Comment(0)
