What do parameters to throttleShape mean?

zio-streams provides throttleShape, which is documented as follows:

  /**
   * Delays the chunks of this stream according to the given bandwidth parameters using the token bucket
   * algorithm. Allows for burst in the processing of elements by allowing the token bucket to accumulate
   * tokens up to a `units + burst` threshold. The weight of each chunk is determined by the `costFn`
   * function.
   */
  final def throttleShape(units: Long, duration: Duration, burst: Long = 0)(
    costFn: Chunk[O] => Long
  ): ZStream[R with Clock, E, O]

I am struggling to understand how the parameters units, duration, burst and costFn are meant to be used. From my reading of the token bucket algorithm,

throttleShape(1, 1.second)(_ => 1)

means that processing one element costs one token (costFn = _ => 1), and one token (units = 1) is replenished after one second (duration = 1.second). However, my experiments with various values do not seem to result in any throttling, except for

throttleShape(1, 1.second)(_ => 2)

which makes it hang. For example, how would one interpret throttling in the following snippets (from the PR), which use an infinite duration?

Stream(1, 2, 3, 4)
  .throttleShape(1, Duration.Infinity)(_ => 0)
  .runCollect

Stream(1, 2, 3, 4)
  .throttleShape(2, Duration.Infinity)(_ => 1)
  .take(2)
  .runCollect

Specifically, say I want to process 100 elements per minute maximum, then how should throttleShape be specified?

Golly answered 27/8, 2020 at 20:47 Comment(0)

The problem is that your initial stream consists of a single Chunk[Int], and, as the doc comment says, throttleShape throttles by chunks, not by elements.

Stream(1, 2, 3, 4) produces a single chunk because apply is defined as

  /**
   * Creates a pure stream from a variable list of values
   */
  def apply[A](as: A*): ZStream[Any, Nothing, A] = fromIterable(as)

where fromIterable is

  /**
   * Creates a stream from an iterable collection of values
   */
  def fromIterable[O](as: => Iterable[O]): ZStream[Any, Nothing, O] =
    fromChunk(Chunk.fromIterable(as))

So if you want to throttle by elements, you should first rechunk the stream into 1-element chunks with .chunkN(1). Do this before throttling.
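To make the per-chunk accounting concrete, here is a minimal sketch in plain Scala, with Vector standing in for Chunk. The names ChunkCostSketch and totalCost are hypothetical and not part of zio-streams. It only adds up what a given costFn would charge for two layouts of the same four elements:

```scala
object ChunkCostSketch {
  // Stand-in for zio's Chunk[Int]; an assumption made to keep the sketch dependency-free.
  type Chunk = Vector[Int]

  // Sum of the weights that costFn assigns to each chunk.
  def totalCost(chunks: Seq[Chunk])(costFn: Chunk => Long): Long =
    chunks.map(costFn).sum

  // Layout of Stream(1, 2, 3, 4): all elements in one chunk.
  val oneChunk: Seq[Chunk] = Seq(Vector(1, 2, 3, 4))

  // Layout after rechunking to size 1: one chunk per element.
  val singletons: Seq[Chunk] = Vector(1, 2, 3, 4).grouped(1).toSeq

  val flatCostOneChunk: Long   = totalCost(oneChunk)(_ => 1L)
  val flatCostSingletons: Long = totalCost(singletons)(_ => 1L)
  val sizeCostOneChunk: Long   = totalCost(oneChunk)(_.size.toLong)
}
```

With a constant cost of 1, the single-chunk layout is charged one token for all four elements, while the singleton layout is charged one token per element. A size-based cost charges four tokens in either layout.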

So for your case:

say I want to process 100 elements per minute maximum

If you don't need the chunk optimizations (processing items in batches), you can rechunk to size 1 and then use throttleShape(100, 1.minute)(_ => 1):

stream.Stream.fromIterable(1 to 1000)
  .chunkN(1)
  .throttleShape(100, 1.minute)(_ => 1)
  .foreachChunk(chunk => console.putStrLn(s"processed '${chunk.foldLeft("")(_ + _)}'"))

Or, if you want to process in chunks and keep the same processing rate, you can write costFn as _.size:

stream.Stream.fromIterable(1 to 1000)
  .chunkN(5)
  .throttleShape(100, 1.minute)(_.size)
  .foreachChunk(chunk => console.putStrLn(s"processed '${chunk.foldLeft("")(_ + _)}'"))
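As a rough check that both variants give the same element rate, the sketch below computes how long the bucket needs to refill the tokens one chunk costs. It assumes tokens are refilled continuously at units per duration, which is my reading of the token bucket description rather than something taken from the library source. SteadyStateDelay and delayPerChunk are hypothetical names.

```scala
import scala.concurrent.duration._

object SteadyStateDelay {
  // Time needed to refill `weight` tokens at a rate of `units` tokens per `period`.
  // Only meaningful once the initially available tokens have been spent.
  def delayPerChunk(weight: Long, units: Long, period: FiniteDuration): FiniteDuration =
    (period.toNanos.toDouble * weight / units).toLong.nanos

  // chunkN(1) with costFn = _ => 1: one element per chunk, weight 1.
  val perElement: FiniteDuration = delayPerChunk(1, 100, 1.minute)

  // chunkN(5) with costFn = _.size: five elements per chunk, weight 5.
  val perChunkOfFive: FiniteDuration = delayPerChunk(5, 100, 1.minute)
}
```

Under that assumption, a single element waits 600 ms and a chunk of five waits 3 s, so both settle at 100 elements per minute. The chunked variant simply releases them five at a time.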
Drumm answered 27/8, 2020 at 22:36 Comment(3)
@sokolov-artem I'm trying to reproduce these examples, but something seems wrong with the burst parameter in throttleShape. The default value of burst is 0, but it bursts 100. Any value that I pass as burst is just ignored and the units value is used as the burst. So on start this stream bursts 100 of its capacity, and after that emits 100 per minute. - Unspent
@BogdanVakulenko burst is an addition to the capacity of the bucket, but the bucket is still populated at a rate of units per duration. You should read more about the token bucket algorithm. - Drumm
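The behaviour discussed in these two comments can be reproduced with a small simulation in plain Scala. This is a simplified model, not the zio-streams implementation. It assumes the bucket starts with units tokens (consistent with the initial burst of 100 reported above), is capped at units + burst, and lets a chunk go into token debt that is then paid off by waiting. TokenBucketSketch and releaseTimes are hypothetical names, and times are plain milliseconds.

```scala
object TokenBucketSketch {
  // chunks: (arrival time in ms, weight). Returns the release time in ms of each chunk.
  def releaseTimes(units: Long, periodMs: Long, burst: Long, chunks: Seq[(Long, Long)]): Vector[Long] = {
    val capacity = units + burst
    var tokens   = units // assumption: the bucket starts with `units` tokens
    var last     = 0L    // time at which `tokens` was last updated
    var now      = 0L    // time at which the previous chunk was released
    val out      = Vector.newBuilder[Long]

    chunks.foreach { case (arrival, weight) =>
      val start     = math.max(arrival, now)
      val refill    = (start - last) * units / periodMs // tokens earned since last update
      val available = math.min(tokens + refill, capacity)
      val remaining = available - weight                // may be negative (debt)
      val delay     = if (remaining >= 0) 0L else -remaining * periodMs / units
      tokens = remaining
      last   = start
      now    = start + delay
      out += now
    }
    out.result()
  }

  // 300 one-element chunks, all available immediately.
  val eager: Seq[(Long, Long)] = Seq.fill(300)((0L, 1L))
  // 200 one-element chunks that show up after two idle minutes.
  val late: Seq[(Long, Long)] = Seq.fill(200)((120000L, 1L))

  val eagerNoBurst: Vector[Long] = releaseTimes(100, 60000, 0, eager)
  val eagerBurst50: Vector[Long] = releaseTimes(100, 60000, 50, eager)
  val lateNoBurst: Vector[Long]  = releaseTimes(100, 60000, 0, late)
  val lateBurst50: Vector[Long]  = releaseTimes(100, 60000, 50, late)
}
```

In this model, a stream that is ready immediately releases 100 chunks at time 0 whatever the value of burst, then one chunk every 600 ms. The burst value only shows once the stream has been idle long enough for the bucket to fill past units: after two idle minutes, 100 chunks pass at once with burst = 0 and 150 with burst = 50.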
chunkN was renamed to rechunk at some point, so you may need to use that instead. - Flori
