We create a chain of actors for every (small) incoming group of messages to guarantee that they are processed and piped sequentially (groups are distinguished by a common id). The problem is that our chain has forks, like A1 -> (A2 -> A3 | A4 -> A5), and we must guarantee that there are no races between messages going through A2 -> A3 and A4 -> A5. The current legacy solution is to block the A1 actor until the current message is fully processed (in one of the sub-chains):
def receive = { // pseudocode
  case msg =>
    ...
    val f = A2orA4 ? msg      // ask the head of the chosen sub-chain
    Await.result(f, timeout)  // block A1 until the sub-chain has replied
}
As a result, the number of threads in the application is directly proportional to the number of messages in flight, regardless of whether those messages are being actively processed or are just waiting asynchronously for a response from an external service. This has worked for about two years with a fork-join (or any other dynamic) pool, but it obviously cannot work with a fixed pool, and it severely degrades performance under high load. Moreover, it affects GC, because every blocked fork actor needlessly keeps the previous message's state alive.
Even with backpressure it creates N times more threads than messages received (as there are N sequential forks in the flow), which is still bad, because processing one message takes a long time but not much CPU. So we should be able to process as many messages as we have memory for. The first solution I came up with is to linearize the chain, like A1 -> A2 -> A3 -> A4 -> A5. Is there a better one?
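To make the linearization idea concrete, here is a minimal sketch. It deliberately models the stages with plain Scala Futures rather than Akka actors, only so that it is self-contained; it is not our actual code. All names (`Msg`, `Route`, `UpperBranch`, `LowerBranch`, `LinearChain`, `GroupSequencer`) are hypothetical. Every message visits A1 to A5 in order, a stage that does not belong to the message's branch just passes it through, and a per-group sequencer starts the next message only after the previous one has finished, without parking a thread in between.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical routing tag: which sub-chain a message would have taken at the fork.
sealed trait Route
case object UpperBranch extends Route // formerly A2 -> A3
case object LowerBranch extends Route // formerly A4 -> A5

// Hypothetical message; `trace` records which stages actually did work on it.
final case class Msg(groupId: String, route: Route, payload: String,
                     trace: Vector[String] = Vector.empty)

object LinearChain {
  type Stage = Msg => Future[Msg]

  // A stage works only on messages of its branch and passes all others through untouched.
  def stage(name: String, appliesTo: Route => Boolean)
           (implicit ec: ExecutionContext): Stage =
    msg =>
      if (appliesTo(msg.route)) Future(msg.copy(trace = msg.trace :+ name))
      else Future.successful(msg)

  // A1 -> A2 -> A3 -> A4 -> A5, composed asynchronously with flatMap.
  def chain(implicit ec: ExecutionContext): Stage = {
    val stages = List(
      stage("A1", _ => true),
      stage("A2", _ == UpperBranch),
      stage("A3", _ == UpperBranch),
      stage("A4", _ == LowerBranch),
      stage("A5", _ == LowerBranch)
    )
    msg => stages.foldLeft(Future.successful(msg))((acc, s) => acc.flatMap(s))
  }
}

// One instance per message group: the next message enters the chain only after
// the previous one has completed (successfully or not). No thread is blocked
// while waiting; the work is simply chained onto the previous Future.
final class GroupSequencer(process: LinearChain.Stage)(implicit ec: ExecutionContext) {
  private var tail: Future[Any] = Future.unit

  def submit(msg: Msg): Future[Msg] = synchronized {
    val next = tail.transformWith(_ => process(msg))
    tail = next
    next
  }
}
```

A possible usage, assuming the global execution context: create `new GroupSequencer(LinearChain.chain)` for a group, call `submit` for each of its messages, and only use `Await.result` at the very edge of the program (for example in a test). In the actor version the same ordering would come from mailbox FIFO order between consecutive actors rather than from chained Futures.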