I think it will be hard (or impossible) to implement this with a supervision strategy, mostly because you want to retry "n" times (according to the discussion in the comments), and I don't think you can track how many times an element has been tried when using supervision.
I think there are two ways to solve this issue: either handle the risky operation as a separate stream, or create a graph that does the error handling. I will propose both solutions.
Note also that Akka Streams distinguishes between errors and failures, so if you don't handle your failures they will eventually collapse the flow (if no strategy is introduced). In the examples below I therefore convert them to Either, which represents either success or error.
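As a minimal sketch of that conversion, using only the Scala standard library: `toEither` is a hypothetical helper name (it is not part of Akka), and the global execution context is assumed just to keep the snippet self-contained.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper (not part of Akka): turn a Future that may fail
// into one that always completes successfully, carrying an Either.
def toEither[T](f: Future[T]): Future[Either[Throwable, T]] =
  f.map[Either[Throwable, T]](Right(_)).recover { case ex => Left(ex) }

// A successful future becomes a Right, a failed one becomes a Left.
val good = Await.result(toEither(Future.successful(42)), 1.second)
val bad  = Await.result(toEither(Future.failed[Int](new RuntimeException("boom"))), 1.second)
```

Because the resulting future never fails, a stage such as mapAsync that receives it will not tear the stream down; the failure travels downstream as an ordinary element instead.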
Separate stream
What you can do is treat each letter of the alphabet as a separate stream and handle failures for each letter separately, using a retry strategy with some delay.
// this comes after your helloFormat
// note that the method is somehow simpler because it's
// using implicit dispatcher and scheduler from outside scope,
// you may also want to pass it as implicit arguments
def retry[T](f: => Future[T], delay: FiniteDuration, c: Int): Future[T] =
  f.recoverWith {
    // you may want to only handle certain exceptions here...
    case ex: Exception if c > 0 =>
      println(s"failed - will retry ${c - 1} more times")
      akka.pattern.after(delay, system.scheduler)(retry(f, delay, c - 1))
  }

val singleElementFlow = httpFlow.mapAsync[Hello](1) {
  case (Success(response), _) =>
    val f = Unmarshal(response).to[Hello]
    f.recoverWith {
      case ex: Exception =>
        // see https://github.com/akka/akka/issues/20192
        response.entity.dataBytes.runWith(Sink.ignore).flatMap(_ => f)
    }
  case (Failure(e), _) => Future.failed(e)
}

// so the searches can either go ok or not, for each letter, we will retry up to 3 times
val searches =
  Source('a' to 'z').map(search).mapAsync[Either[Throwable, Hello]](1) { elem =>
    println(s"trying $elem")
    retry(
      Source.single(elem).via(singleElementFlow).runWith(Sink.head[Hello]),
      1.seconds, 3
    ).map(ok => Right(ok)).recover { case ex => Left(ex) }
  }
// end
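The retry above relies on `f` being a by-name parameter, so every attempt runs the operation again instead of reusing an already failed future. Here is a small sketch of that behaviour that needs no ActorSystem: `retryNow` is a hypothetical variant of the retry above with the delay removed, and `flaky` is a made-up operation that fails twice before succeeding.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Same shape as the retry above, minus the delay (an assumption made
// only so that the sketch runs without Akka).
def retryNow[T](f: => Future[T], c: Int): Future[T] =
  f.recoverWith { case _: Exception if c > 0 => retryNow(f, c - 1) }

val attempts = new AtomicInteger(0)

// Hypothetical flaky operation: fails on the first two calls, then succeeds.
def flaky(): Future[String] = {
  val n = attempts.incrementAndGet()
  if (n < 3) Future.failed(new RuntimeException(s"attempt $n failed"))
  else Future.successful(s"ok after $n attempts")
}

// flaky() is passed by name, so it is re-evaluated on every attempt.
val retried = Await.result(retryNow(flaky(), 3), 1.second)
```

If `f` were a plain `Future[T]` parameter, the same failed future would be inspected on every attempt and the operation would only ever run once.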
Graph
This method integrates failures into the graph and allows for retries. This example makes all requests run in parallel and prefers to retry those that failed, but if you don't want this behaviour and would rather run them one by one, I believe that is also possible.
// this comes after your helloFormat
// you may need to have your own class if you
// want to propagate failures for example, but we will use
// the Left value to keep track of how many times we have
// tried the request
type ParseResult = Either[(HttpRequest, Int), Hello]

def search(query: Char): (HttpRequest, (HttpRequest, Int)) = {
  val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
  (request, (request, 0)) // let's use this opaque value to count how many times we tried to search
}

val g = GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val searches = b.add(Flow[Char])

  val tryParse =
    Flow[(Try[HttpResponse], (HttpRequest, Int))].mapAsync[ParseResult](1) {
      case (Success(response), (req, tries)) =>
        println(s"trying parse response to $req for $tries")
        Unmarshal(response).to[Hello].
          map(h => Right(h)).
          recoverWith {
            case ex: Exception =>
              // see https://github.com/akka/akka/issues/20192
              response.entity.dataBytes.runWith(Sink.ignore).map { _ =>
                Left((req, tries + 1))
              }
          }
      case (Failure(e), _) => Future.failed(e)
    }

  val broadcast = b.add(Broadcast[ParseResult](2))

  val nonErrors = b.add(Flow[ParseResult].collect {
    case Right(x) => x
    // you may also handle here the Lefts which have exceeded the retry count
  })

  val errors = Flow[ParseResult].collect {
    case Left(x) if x._2 < 3 => (x._1, x)
  }

  val merge = b.add(MergePreferred[(HttpRequest, (HttpRequest, Int))](1, eagerComplete = true))

  // @formatter:off
  searches.map(search) ~> merge ~> httpFlow ~> tryParse ~> broadcast ~> nonErrors
                          merge.preferred <~ errors <~ broadcast
  // @formatter:on

  FlowShape(searches.in, nonErrors.out)
}

def main(args: Array[String]): Unit = {
  val source = Source('a' to 'z')
  val sink = Sink.seq[Hello]

  source.via(g).toMat(sink)(Keep.right).run().onComplete {
    case Success(seq) =>
      println(seq)
    case Failure(ex) =>
      println(ex)
  }
}
Basically, what happens here is that we run the searches through httpFlow and then try to parse the response. We then broadcast the result and split it into errors and non-errors: the non-errors go to the sink, and the errors get sent back into the loop. If the number of retries exceeds the count, we ignore the element, but you can also do something else with it.
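To make the routing rule easier to follow, here is a rough model of the loop using plain collections instead of Akka Streams. It is only a sketch of the logic, not of the streaming behaviour: `Attempt`, `Outcome`, `parseOnce`, `runLoop` and `maxTries` are all hypothetical names, and the parser is made up so that 'b' fails once and 'c' always fails.

```scala
// Hypothetical stand-ins: an attempt is a (letter, tries) pair, and the
// outcome mirrors ParseResult: Left = failed attempt, Right = parsed value.
type Attempt = (Char, Int)
type Outcome = Either[Attempt, String]

val maxTries = 3

// Made-up parser: 'b' fails on its first try, 'c' always fails,
// everything else succeeds. A failure bumps the tries counter.
def parseOnce(a: Attempt): Outcome = a match {
  case ('b', 0) => Left(('b', 1))
  case ('c', t) => Left(('c', t + 1))
  case (ch, _)  => Right(s"hello $ch")
}

def runLoop(pending: List[Attempt], done: Vector[String]): Vector[String] =
  pending match {
    case Nil => done
    case head :: tail =>
      parseOnce(head) match {
        // successes go straight to the output (the "sink")
        case Right(h) => runLoop(tail, done :+ h)
        // failures under the limit are retried before new elements
        case Left(again) if again._2 < maxTries => runLoop(again :: tail, done)
        // retries exhausted: the element is dropped
        case Left(_) => runLoop(tail, done)
      }
  }

val collected = runLoop(List('a', 'b', 'c', 'd').map(ch => (ch, 0)), Vector.empty)
```

In this model 'b' shows up in the output after one retry, while 'c' is silently dropped after three failed tries, which corresponds to the "ignore the element" behaviour described above.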
Anyway I hope this gives you some idea.
mapAsync, like .withAttributes(supervisionStrategy(resumingDecider)) (but it's in the doc you've linked so I assume you're after something else) – Peaceablev
fails with 500, I want to retry again two times. – Hadfield