Throttle HTTP requests on Akka/Spray

I'm using Akka actors in Scala to download resources from an external service (HTTP GET requests). The response from the external service is JSON, and I have to use paging because the provider is very slow. I want to download all paged results concurrently, with at most 10 requests in flight. I use a URL such as this to download a chunk: http://service.com/items?limit=50&offset=1000

I have created the following pipeline:

ScatterActor => RoundRobinPool[10](LoadChunkActor) => Aggregator

ScatterActor takes the total count of items to download and divides it into chunks. I created 10 LoadChunkActors to process tasks concurrently.

  override def receive: Receive = {
    case LoadMessage(limit) =>
      val offsets: IndexedSeq[Int] = 0 until limit by chunkSize
      offsets.foreach(offset =>
        context.system.actorSelection(pipe) ! LoadMessage(chunkSize, offset))
  }

LoadChunkActor uses Spray to send the request. The actor looks like this:

val pipeline = sendReceive ~> unmarshal[List[Items]]
override def receive: Receive = {
  case LoadMessage(limit, offset) =>
    val uri: String = s"http://service.com/items?limit=50&offset=$offset"
    val responseFuture = pipeline {Get(uri)}
    responseFuture onComplete {
      case Success(items) => aggregator ! Loaded(items)
    }
}

As you can see, LoadChunkActor requests a chunk from the external service and registers a callback to run onComplete. The actor is then immediately ready to take another message, so it requests the next chunk without waiting for the first response, because Spray uses a non-blocking API to download chunks. As a result, the external service is flooded with my requests and I get timeouts.

How can I schedule a list of tasks so that at most 10 are processed at the same time?

Unreality answered 7/8, 2014 at 6:54 Comment(4)
Did you read this article? - Hexarchy
Yes, I have read this article. It describes a system that limits throughput to 3 messages per second. I'm interested in throttling with feedback: I want to start processing the next message only when LoadChunkActor has sent the response from the HTTP service to the aggregator. - Unreality
This seems like an excellent candidate for work pulling: michaelpollmeier.com/akka-work-pulling-pattern. Use one actor as the master to create the work list (based on the total number of batches to pull down) and to manage the results, and then a pool of workers underneath it, each handling one chunk at a time and reporting results back to the master. - Iphlgenia
@cmbaxter, I have used a solution similar to work pulling. It works great. - Unreality

I have created the following solution (similar to work pulling, see http://www.michaelpollmeier.com/akka-work-pulling-pattern/):

ScatterActor (10000x messages) => 
  ThrottleActor => LoadChunkActor => ThrottleMonitorActor => Aggregator
         ^                                    |
         |<--------WorkDoneMessage------------|
  1. ThrottleActor puts incoming messages into a ListBuffer and sends at most N of them to LoadChunkActor at a time.
  2. LoadChunkActor sends its result to Aggregator through ThrottleMonitorActor.
  3. ThrottleMonitorActor sends a confirmation (WorkDoneMessage) to ThrottleActor.
  4. ThrottleActor sends the next buffered message to LoadChunkActor.
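
The bookkeeping behind these steps can be sketched roughly as follows. This is only an illustration, not the code I actually run: the `Throttle` class and its `offer` and `workDone` methods are hypothetical names, and the logic is kept free of Akka so it can be tried in isolation. It uses an immutable Queue rather than a ListBuffer.

```scala
import scala.collection.immutable.Queue

// Hypothetical helper (not from the original pipeline): the state a
// ThrottleActor needs in order to keep at most maxInFlight tasks running.
final case class Throttle[A](maxInFlight: Int,
                             pending: Queue[A] = Queue.empty[A],
                             inFlight: Int = 0) {

  // Step 1: a new task arrives. Dispatch it at once if a slot is free,
  // otherwise buffer it. The Option holds the task to send to the worker.
  def offer(task: A): (Throttle[A], Option[A]) =
    if (inFlight < maxInFlight) (copy(inFlight = inFlight + 1), Some(task))
    else (copy(pending = pending.enqueue(task)), None)

  // Steps 3 and 4: a WorkDoneMessage arrived. Reuse the freed slot for the
  // next buffered task, or release the slot if nothing is waiting.
  def workDone: (Throttle[A], Option[A]) =
    pending.dequeueOption match {
      case Some((next, rest)) => (copy(pending = rest), Some(next))
      case None               => (copy(inFlight = math.max(0, inFlight - 1)), None)
    }
}
```

Inside the actor, the receive block would presumably call `offer` for every incoming LoadMessage and `workDone` for every WorkDoneMessage, forward any returned task to LoadChunkActor, and keep the updated state (for example via `context.become`). Because the next task is released only on confirmation, the number of outstanding HTTP requests never exceeds `maxInFlight`.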
Unreality answered 8/8, 2014 at 10:24 Comment(0)

From the project adhoclabs/akka-http-contrib, you now have (July 2016, two years later) the scala.co.adhoclabs.akka.http.contrib.throttle package from Yeghishe Piruzyan.

See "Akka Http Request Throttling"

implicit val throttleSettings = MetricThrottleSettings.fromConfig

Http().bindAndHandle(
  throttle.apply(routes),
  httpInterface,
  httpPort
)
Squinty answered 25/7, 2016 at 8:52 Comment(0)