How to control the concurrency of future.sequence in scala?
I know that I can transform a Seq[Future[T]] into a Future[Seq[T]] via

  val seqFuture = Future.sequence(seqOfFutures)
  seqFuture.map((seqT: Seq[T]) => {...})

My problem is that I have 700 futures in that sequence, and I want to control how many of them run in parallel: each future calls an internal REST API, and firing 700 requests at the same time is like launching a DoS attack against that server.

I'd rather have only about 10 futures resolving at a time.

How can I achieve that?


Trying pamu's answer, I see these errors:

[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:44: com.dreamlines.commons.LazyFuture[A] does not take parameters
[error]         val batch = Future.sequence(c.map(_()))
[error]                                            ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:28: no type parameters for method sequence: (in: M[scala.concurrent.Future[A]])(implicit cbf: scala.collection.generic.CanBuildFrom[M[scala.concurrent.Future[A]],A,M[A]], implicit executor: scala.concurrent.ExecutionContext)scala.concurrent.Future[M[A]] exist so that it can be applied to arguments (List[Nothing])
[error]  --- because ---
[error] argument expression's type is not compatible with formal parameter type;
[error]  found   : List[Nothing]
[error]  required: ?M[scala.concurrent.Future[?A]]
[error]         val batch = Future.sequence(c.map(_()))
[error]                            ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:42: type mismatch;
[error]  found   : List[Nothing]
[error]  required: M[scala.concurrent.Future[A]]
[error]         val batch = Future.sequence(c.map(_()))
[error]                                          ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:36: Cannot construct a collection of type M[A] with elements of type A based on a collection of type M[scala.concurrent.Future[A]].
[error]         val batch = Future.sequence(c.map(_()))
[error]                                    ^
[error] four errors found
Gowk answered 19/4, 2018 at 15:33 Comment(1)
Take a look at how you can throttle the request rate using an async HTTP client (Play WS). The same can be applied to the Akka HTTP Client: #37259706Athos
FoldLeft

A simple foldLeft can be used to control how many futures run concurrently at a time.

First, let's create a case class called LazyFuture

case class LazyFuture[+A](f: Unit => Future[A]) {
  def apply() = f()
}

object LazyFuture {
  def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))

  // no implicit parameter here: two by-name overloads that both take an
  // implicit ExecutionContext would have the same signature after erasure
  def apply[A](f: => Future[A]): LazyFuture[A] = LazyFuture(_ => f)
}

A LazyFuture prevents the wrapped future from starting immediately; it runs only when apply() is called.
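To make the laziness concrete, here is a minimal, self-contained sketch (the object name LazyDemo and the demo helper are illustrative, not part of the answer): a plain Future starts running the moment it is created, while a LazyFuture runs only once apply() is invoked.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object LazyDemo {
  // same shape as the LazyFuture above: the body hides behind a function
  case class LazyFuture[A](f: Unit => Future[A]) {
    def apply(): Future[A] = f()
  }

  // returns true when the wrapped future ran only after apply() was called
  def demo(): Boolean = {
    val started = new AtomicBoolean(false)
    val lf = LazyFuture(_ => Future { started.set(true); 42 })
    val notStartedYet = !started.get          // nothing has run so far
    val result = Await.result(lf(), 1.second) // apply() kicks it off
    notStartedYet && started.get && result == 42
  }

  def main(args: Array[String]): Unit =
    println(s"deferred correctly: ${demo()}")
}
```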

val list: List[LazyFuture[A]] = ...


list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
  // start the next batch only after the previous one has completed;
  // invoking c.map(_()) outside the flatMap would start every batch at once
  r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
}

Adjust concurFactor to control how many futures run concurrently:

a concurFactor of 1 runs one future at a time

a concurFactor of 2 runs two futures at a time

and so on ...

def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int) =
   list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      // start the next batch only after the previous one has completed
      r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
    }

Complete code

  case class LazyFuture[+A](f: Unit => Future[A]) {
    def apply() = f()
  }

  object LazyFuture {
    def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))

    // no implicit parameter here: two by-name overloads that both take an
    // implicit ExecutionContext would have the same signature after erasure
    def apply[A](f: => Future[A]): LazyFuture[A] = LazyFuture(_ => f)
  }

  def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)(implicit ec: ExecutionContext): Future[List[A]] =
    list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      // start the next batch only after the previous one has completed
      r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
    }

Limiting the execution context

You can also limit the computation resources by limiting the number of threads in the execution pool. But this solution is not as flexible; personally, I do not like it.

val context: ExecutionContext = 
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

You have to remember to pass the correct execution context, which is an implicit value. Sometimes it is not obvious which implicit is in scope, which makes this approach error-prone.
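To illustrate the effect of the fixed pool (a self-contained sketch; PoolDemo and maxConcurrent are made-up names, and the sleep stands in for a slow REST call), the observed peak concurrency never exceeds the pool size:

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PoolDemo {
  // runs `tasks` futures on a pool of `poolSize` threads and
  // reports the highest number of bodies observed running at once
  def maxConcurrent(poolSize: Int, tasks: Int): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    val running = new AtomicInteger(0)
    val maxSeen = new AtomicInteger(0)

    val futures = (1 to tasks).map { _ =>
      Future {
        val now = running.incrementAndGet()
        maxSeen.updateAndGet(m => math.max(m, now))
        Thread.sleep(50) // simulate a slow REST call
        running.decrementAndGet()
      }
    }
    Await.result(Future.sequence(futures), 10.seconds)
    pool.shutdown()
    maxSeen.get
  }

  def main(args: Array[String]): Unit =
    println(s"max concurrent with a pool of 2: ${maxConcurrent(2, 10)}")
}
```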

Warning

When a future is constructed like this:

val foo = Future {
     1 + 2
} // the future starts executing immediately

LazyFuture(foo) // not the right way

foo has already started executing and cannot be controlled.

The right way to construct a LazyFuture:

val foo = LazyFuture {
  1 + 2
}

or

val foo = LazyFuture {
  Future {
   1 + 2
  }
}

Working example

package main

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object Main {

  case class LazyFuture[A](f: Unit => Future[A]) {
    def apply(): Future[A] = f()
  }

  object LazyFuture {
    def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))
    def apply[A](f: => Future[A]): LazyFuture[A] = LazyFuture(_ => f)
  }

  def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)
    (implicit ec: ExecutionContext): Future[List[A]] =
    list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      // start the next batch only after the previous one has completed
      r.flatMap(rs => Future.sequence(c.map(_())).map(values => rs ++ values))
    }

  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global


    val futures: Seq[LazyFuture[Int]] = List(1, 2, 3, 4, 5).map { value =>
      LazyFuture {
        println(s"value: $value started")
        Thread.sleep(value * 200)
        println(s"value: $value stopped")
        value
      }
    }
    val f = executeBatch(futures.toList)(2)
    Await.result(f, Duration.Inf)
  }

}
Beseech answered 19/4, 2018 at 15:48 Comment(12)
What do you mean by the limiting the number of threads of the execution pool not being flexible? What would I lose by going in that direction?Gowk
@Gowk You have to remember to pass correct execution context which is an implicit value. Sometimes we do not know which implicit is in scope. Its buggyBeseech
And how do transform a List[Future[T]] to List[LazyFuture[T]]?Gowk
@Gowk added a constructor. please have a look. Warning: Future must be unevaluated one.Beseech
@Gowk better to use the LazyFuture constructor to construct the computationBeseech
Please see my updated question with a stacktrace when trying this approach, as I see: expression's type is not compatible with formal parameter type errorGowk
Can it be that my error stems from the fact that I am trying to wrap a future inside a LazyFuture, whereas I should create a LazyFuture to begin with?Gowk
@Gowk Fixed compilation errors. error was at LazyFuture declarationBeseech
@pamu your solution doesn't run exactly n futures at a given time. When you start to process the group it utilizes the pool completely. But after few tasks are ready it doesn't pick up tasks from the next group. And the last task in the group will run alone.Arnulfo
@Arnulfo You're right! It depends on grouped: divide the number of tasks by the concurrency factor; if the remainder is 0 all is fine, otherwise some tasks will be left to run at the end. But the number of futures running at a given time is limitedBeseech
@pamu no, the problem does exist for any group size since tasks don't finish simultaneously. I had the same problem and haven't found any nice and simple non-blocking solution.Arnulfo
This doesn't behave as expected. It appears that it doesn't really wait for the 10 futures to complete, and after 70 LazyFutures, it just stops.Gowk
Concurrency of Scala Futures is controlled by the ExecutionContext. Note that futures start executing on the context immediately after creation, so the ExecutionContext of Future.sequence doesn't really matter. You have to supply the appropriate context when creating the original futures of the sequence.

The default context ExecutionContext.global (usually imported through import scala.concurrent.ExecutionContext.Implicits.global) uses as many threads as there are processor cores, but it can also create many additional threads for blocking tasks that are wrapped in scala.concurrent.blocking. This is usually the desired behaviour, but it's not suitable for your problem.
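For reference, a minimal sketch of that blocking wrapper (BlockingDemo is an illustrative name; the sleep stands in for a blocking REST call):

```scala
import scala.concurrent.{blocking, Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingDemo {
  def demo(): String = {
    val f = Future {
      // `blocking` hints the global pool that this thread will stall,
      // so it may spawn extra threads instead of starving the pool
      blocking {
        Thread.sleep(100) // stand-in for a blocking REST call
      }
      "done"
    }
    Await.result(f, 1.second)
  }

  def main(args: Array[String]): Unit = println(demo()) // prints "done"
}
```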

Fortunately, you can use ExecutionContext.fromExecutor method to wrap a Java thread pool. For example:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

val context = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val seqOfFutures = Seq.fill(700)(Future { callRestApi() }(context))
val sequenceFuture = Future.sequence(seqOfFutures)(ExecutionContext.global)

The context can also be provided implicitly of course:

implicit val context: ExecutionContext = 
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val seqOfFutures = Seq.fill(700)(Future { callRestApi() })
// This `sequence` uses the same thread pool as the original futures
val sequenceFuture = Future.sequence(seqOfFutures) 
Latoyialatreece answered 19/4, 2018 at 15:54 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.