I need to group an infinite `Flux` by key, with high key cardinality.
For example:
- the group key is a domain URL
- calls to one domain must be strictly sequential (the next call starts only after the previous one has completed)
- calls to different domains should be concurrent
- the time interval between items with the same key (URL) is unknown, but expected to be bursty: several items are emitted in a short period, then there is a long pause until the next burst.
```kotlin
queue
    .groupBy(keyMapper, groupPrefetch)
    .flatMap(
        { group ->
            group
                // strictly sequential remote calls within one group
                .concatMap({ task -> makeSlowRemoteCall(task) }, 0)
                // cancel the group once the remote call reports completion
                .takeUntil { remoteCallResult -> remoteCallResult == DONE }
                // cancel idle groups; the empty fallback suppresses the TimeoutException
                .timeout(groupTimeout, Mono.empty())
                .then()
        },
        concurrency
    )
```
I cancel the group in two cases:
- the `makeSlowRemoteCall()` result indicates that, with high probability, there will be no new items in this group in the near future;
- the next item is not emitted within `groupTimeout`.

I use the `timeout(timeout, fallback)` variant to suppress the `TimeoutException` and allow flatMap's inner publisher to complete successfully.
I want possible future items with the same key to form a new `GroupedFlux` and be processed by the same flatMap inner pipeline.
But what happens if the `GroupedFlux` still has remaining, unrequested items when I cancel it? Does the `groupBy` operator re-queue them into a new group with the same key, or are they lost forever? If the latter, what is the proper way to solve my problem? I am also not sure whether I need to set the `concatMap()` prefetch to 0 in this case.
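Whether buffered items survive a cancel is easy to probe empirically. Below is a minimal, self-contained sketch (hypothetical, not from the question): `Flux.range` stands in for `queue`, the key is the item's parity, and `take(1)` stands in for the `takeUntil`/`timeout` cancellation:

```kotlin
import reactor.core.publisher.Flux

fun main() {
    val processed = mutableListOf<Int>()

    Flux.range(0, 20)
        .groupBy { it % 2 }        // two keys, items interleaved
        .flatMap { group ->
            group
                .take(1)           // cancel the group after one item
                .doOnNext { processed.add(it) }
                .then()
        }
        .blockLast()

    // Values missing from the output were sitting in a cancelled group's
    // buffer when the cancel happened; values that appear after a cancel
    // arrived later and were re-grouped into a fresh GroupedFlux.
    println("processed: $processed")
}
```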
Comments:

I use the `timeout(timeout, fallback)` variant to suppress the timeout error. It should cancel the source flux group but allow the flatMap inner publisher to complete successfully. Basically, I want remaining items to be processed by the same concatMap pipeline, but as a new `GroupedFlux`. The reason I am using the timeout is to clear idle groups from the flatMap inners. – Notogaea

Once the `GroupedFlux` chain is replaced with an empty `Mono`, you can't get the actual group back. Having said that, it sounds like you're using the `timeout()` as an optimisation to clear groups, which is a bit of an odd case - you shouldn't need to do that unless the number of groups gets stupidly large. If that is the case, you'd probably be better off using something like the bulkhead pattern in resilience4j (resilience4j.readme.io/docs/bulkhead), using a new bulkhead instance for each domain. – Furthermore

[…] flatMap, but using the bulkhead names as keys. – Furthermore

So I create a bulkhead with `maxConcurrentCalls=1` and just get an instance from the `BulkheadRegistry` with `name=my-group-key`? Is it intended to work with such a high number (10k) of active bulkheads? – Notogaea
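To make the exchange concrete, here is a rough sketch of the bulkhead-per-domain setup being discussed, assuming resilience4j-reactor is on the classpath; `Task`, `Result`, and the stubbed `makeSlowRemoteCall` are placeholders, not the question's actual types:

```kotlin
import io.github.resilience4j.bulkhead.BulkheadConfig
import io.github.resilience4j.bulkhead.BulkheadRegistry
import io.github.resilience4j.reactor.bulkhead.operator.BulkheadOperator
import reactor.core.publisher.Mono

// Placeholder types standing in for the question's task and call result.
data class Task(val domain: String)
data class Result(val body: String)

fun makeSlowRemoteCall(task: Task): Mono<Result> =
    Mono.fromCallable { Result("response from ${task.domain}") }

// One semaphore bulkhead per domain key, created on demand by the registry.
val registry: BulkheadRegistry = BulkheadRegistry.of(
    BulkheadConfig.custom()
        .maxConcurrentCalls(1) // at most one in-flight call per domain
        .build()
)

fun callViaBulkhead(task: Task): Mono<Result> =
    makeSlowRemoteCall(task)
        .transformDeferred(BulkheadOperator.of(registry.bulkhead(task.domain)))
```

One caveat: when the bulkhead is full, the Reactor operator rejects the subscription with a `BulkheadFullException` rather than queueing it, so some retry or queueing layer would still be needed to keep per-domain calls strictly sequential under bursts.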
If you end up with n active idle groups (where n is your concurrency), then the whole thing hangs indefinitely, because your entire concurrency utilisation is used up on those idle groups. You could easily hit that ceiling in that sort of scenario. Yes, for your scenario you'd use a bulkhead as you describe - I've never tried using that many active bulkheads, but I can't see it being an issue in particular, as you'd essentially just have 10k objects stored in a collection somewhere. – Furthermore
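The "hangs indefinitely" scenario is reproducible in a few lines. A minimal sketch under assumed conditions (not from the question): an effectively unbounded, backpressure-aware source, four distinct keys, and a `flatMap` concurrency of 2:

```kotlin
import reactor.core.publisher.Flux

fun main() {
    // flatMap subscribes to the first two groups only; since those never
    // complete, the groups for keys 2 and 3 are never requested. Items
    // routed to those unconsumed groups eat groupBy's prefetch budget
    // without ever being drained, so upstream demand eventually dries up
    // and the printout stalls entirely.
    Flux.range(0, Int.MAX_VALUE)
        .map { it % 4 }
        .groupBy { it }
        .flatMap({ group -> group.doOnNext { println("key=$it") }.then() }, 2)
        .blockLast()
}
```

This is the ceiling the comment warns about: the timeout-based clearing in the question is what frees up concurrency slots so that new groups can be subscribed at all.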