Akka timeout when using spray client for multiple requests

I'm using spray 1.3.2 with akka 2.3.6 (akka is used only for spray).
I need to read huge files and make an HTTP request for each line.
I read the files line by line with an iterator and make a request for each item. It runs successfully for some of the lines, but at some point it starts to fail with:

```
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://default/user/IO-HTTP#-35162984]] after [60000 ms].
```

At first I thought I was overloading the service, so I set `spray.can.host-connector.max-connections` to 1. It ran much slower, but I got the same errors.

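Here's the code:

```scala
import akka.actor.ActorSystem
import scala.concurrent.Future
import spray.client.pipelining._
import spray.http._
import spray.http.MediaTypes._

// pipelineUrl, PipelineResponse (and its unmarshaller), dataLines, someMoreLogic
// and aggrigateResults are defined elsewhere in the original code.
implicit val system = ActorSystem()
import system.dispatcher // ExecutionContext for the futures below

// Custom media type for EDN payloads
val EdnType = register(
  MediaType.custom(
    mainType = "application",
    subType = "edn",
    compressible = true,
    binary = false,
    fileExtensions = Seq("edn")))

// sendReceive picks up the implicit ActorSystem, ExecutionContext and Timeout (60 s by default)
val pipeline = (
  addHeader("Accept", "application/json")
  ~> sendReceive
  ~> unmarshal[PipelineResponse])

def postData(data: String): Future[PipelineResponse] = {
  val request = Post(pipelineUrl).withEntity(HttpEntity(EdnType, data))
  pipeline(request)
}

dataLines.map { d =>
  val f = postData(d)
  f.onFailure { case e => println("Error - " + e) } // this is where the errors are displayed
  f.map { p => someMoreLogic(d, p) }
}

aggrigateResults(dataLines)
```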

I do it this way because I don't need all of the data, only some aggregations.

How can I solve this and keep it entirely async?

Posted by Lylelyles on 16/11/2014 at 16:1. Comments (3):
That's the timeout of `sendReceive`. See github.com/spray/spray/blob/master/spray-client/src/main/scala/… You can change it by providing a different implicit `Timeout` in scope, e.g. `implicit val timeout = Timeout(120.seconds)`. – Mackler
@jrudolph, does each call to the pipeline start the timeout timer at the moment of the call? Doesn't that mean it creates an actor for the request at that time? Is there a way to call it so that the actor is created only when the request is actually sent? – Lylelyles
The ask timeout is implemented via `firstCompletedOf`, so the timer starts at the call site. See my answer below. – Hydroid

The Akka ask timeout is implemented via `firstCompletedOf`, so the timer starts when the ask is initiated.

What you seem to be doing is spawning a Future for each line (during the `map`), so all your calls are issued at nearly the same time. Each timeout starts counting as soon as its future is created, but there are no executor threads left for all the spawned actors to do their work. Hence the asks time out.
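To make the timing argument concrete, here is a small, dependency-free model of it. It uses plain Scala Futures and is not Akka's actual ask implementation. The hypothetical `withTimeout` helper arms a timer at the moment it is called and races it against the work with `Future.firstCompletedOf`. In `demo`, five fake 300 ms "requests" are fired at once into a pool with a single thread (standing in for the exhausted executor), each guarded by a 750 ms timeout that starts immediately. Each request, taken alone, would finish well within the limit, yet only the first two get to run before their timers expire.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.{Executors, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object TimeoutAtCallSite {
  private val timer = new Timer(true) // daemon thread that fires the timeouts

  // Hypothetical helper: the timer is armed when withTimeout is called,
  // not when `work` actually starts running.
  def withTimeout[T](work: Future[T], after: FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {
    val expired = Promise[T]()
    timer.schedule(new TimerTask {
      def run(): Unit = expired.tryFailure(new TimeoutException(s"timed out after $after"))
    }, after.toMillis)
    Future.firstCompletedOf(Seq(work, expired.future))
  }

  // Fires `n` fake 300 ms "requests" at once into a one-thread pool,
  // each guarded by a 750 ms timeout that starts counting immediately.
  def demo(n: Int = 5): Seq[Try[Int]] = {
    val pool = Executors.newSingleThreadExecutor()
    val busy = ExecutionContext.fromExecutor(pool) // the saturated resource
    implicit val callbacks: ExecutionContext = ExecutionContext.global
    try {
      val guarded = (1 to n).map { i =>
        withTimeout(Future { Thread.sleep(300); i }(busy), 750.millis)
      }
      guarded.map { f => Await.ready(f, 5.seconds); f.value.get }
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = demo().foreach(println)
}
```

Note that the timed-out work is not cancelled: requests 3 to 5 still run on the pool after their callers have already seen the `TimeoutException`.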

Instead of processing everything at once, I would suggest a more flexible approach, somewhat similar to using iteratees or akka-streams: the Work Pulling Pattern (GitHub).

Provide the iterator you already have as the Epic, and introduce a Worker actor that performs the call and some additional logic. If you then spawn N workers, at most N lines are processed concurrently (and the processing pipeline may involve multiple steps). This way you don't overload the executors, and the timeouts shouldn't occur.
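The Work Pulling Pattern itself is built from Akka actors (an Epic handing out work to Worker actors). As a rough, dependency-free sketch of the same pulling idea, the code below swaps the actors for plain Scala Futures: N "workers" each pull the next line from the shared iterator, wait for its (fake) request to finish, and only then pull again. No more than N requests are ever in flight, so a line's timeout is never armed while it is merely waiting in the iterator. `processAll` and `fakeRequest` are hypothetical names, not part of Akka or spray.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PullingSketch {
  // Illustrative stand-in for the Epic/Worker pair: `workers` loops share one
  // iterator, and each loop pulls a new line only after its previous call finished.
  def processAll[A, B](lines: Iterator[A], workers: Int)(call: A => Future[B])(
      implicit ec: ExecutionContext): Future[Seq[B]] = {

    // The "Epic": hands out the next line, or None once the input is exhausted.
    def pull(): Option[A] = lines.synchronized {
      if (lines.hasNext) Some(lines.next()) else None
    }

    // A "Worker": process one line, then pull the next; results are accumulated.
    def worker(acc: List[B]): Future[List[B]] = pull() match {
      case Some(line) => call(line).flatMap(result => worker(result :: acc))
      case None       => Future.successful(acc)
    }

    Future.sequence(List.fill(workers)(worker(Nil))).map(_.flatten)
  }

  // Runs 100 fake 10 ms "requests" with 4 workers; returns (sum of results, peak concurrency).
  def demo(): (Int, Int) = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val inFlight    = new AtomicInteger(0)
    val maxInFlight = new AtomicInteger(0)

    // Hypothetical replacement for postData: doubles the line after a short delay.
    def fakeRequest(line: Int): Future[Int] = {
      val now = inFlight.incrementAndGet()
      maxInFlight.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
      Future { Thread.sleep(10); line * 2 }.andThen { case _ => inFlight.decrementAndGet() }
    }

    val results = Await.result(
      processAll(Iterator.range(0, 100), workers = 4)(line => fakeRequest(line)), 30.seconds)
    (results.sum, maxInFlight.get)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Results come back unordered, which suits the aggregation-only use case in the question. Raising or lowering `workers` trades throughput against load on the service and the executor.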

Hydroid answered on 16/11/2014 at 19:19. Comments (3):
This code is for a simple tool that has to make a lot of HTTP requests. I considered switching to another HTTP client, but then I changed the code to use the Work Pulling Pattern, and it works great. – Lylelyles
Happy to hear that! I'm also using a modified WPP. – Hydroid
Could you please back up this statement: "...but there are no executor threads left for all the spawned actors to do their work"? How do you know that? Thanks. – Striate
