zio-streams provides throttleShape
which
/**
* Delays the chunks of this stream according to the given bandwidth parameters using the token bucket
* algorithm. Allows for burst in the processing of elements by allowing the token bucket to accumulate
* tokens up to a `units + burst` threshold. The weight of each chunk is determined by the `costFn`
* function.
*/
final def throttleShape(units: Long, duration: Duration, burst: Long = 0)(
costFn: Chunk[O] => Long
): ZStream[R with Clock, E, O]
I am struggling to understand how the parameters unit
, duration
burst
and costFun
are meant to be used. From my reading of token bucket
throttleShape(1, 1.second)(_ => 1)
means processing one element costs one token (costFun = _ => 1
), and one token (unit = 1
) is replenished after one second (duration = 1.second
). However my experiments with various values do not seem to result in any throttling, except for
throttleShape(1, 1.second)(_ => 2)
which makes it hang. For example, how would one interpret throttling in the following snippets (from the PR) which use infinity duration
Stream(1, 2, 3, 4)
.throttleShape(1, Duration.Infinity)(_ => 0)
.runCollect
Stream(1, 2, 3, 4)
.throttleShape(2, Duration.Infinity)(_ => 1)
.take(2)
.runCollect
Specifically, say I want to process 100 elements per minute maximum, then how should throttleShape
be specified?
bust
parameter inthrottleShape
. Default value ofbust
is0
, but it busts100
. Any value that I'm trying to pass as abust
is just ignored andunits
value is used as abust
. So on start this stream busts100
of its capacity, and after that100
per minute. – Unspent