How to retry failed Unmarshalling of a stream of akka-http requests?
Asked Answered
O

2

5

I know it's possible to restart an akka-stream on error with a supervision strategy on the ActorMaterialzer

val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _                      => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
  ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)

source: http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-error.html

I have the following use case.

/***
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-experimental"            % "2.4.2",
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.2"
)
*/

import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json._

import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import Uri.Query

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

import scala.util.{Success, Failure}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.Future

object SO extends DefaultJsonProtocol {

  implicit val system = ActorSystem()
  import system.dispatcher
  implicit val materializer = ActorMaterializer()

  val httpFlow = Http().cachedHostConnectionPoolHttps[HttpRequest]("example.org")

  def search(query: Char) = {
    val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
    (request, request)
  }

  case class Hello(name: String)
  implicit val helloFormat = jsonFormat1(Hello)

  val searches =
    Source('a' to 'z').map(search).via(httpFlow).mapAsync(1){
      case (Success(response), _) => Unmarshal(response).to[Hello]
      case (Failure(e), _) => Future.failed(e)
    }

  def main(): Unit = {
    Await.result(searches.runForeach(_ => println), Duration.Inf)
    ()
  }
}

Sometime a query will fail to unmarshall. I want to use a retry strategy on that single query https://example.org/?q=v without restarting the whole alphabet.

Olwen answered 31/3, 2016 at 16:7 Comment(4)
Do you want to use the same query with the next letter or do you want to retry the same query with the same letter, or do you want to use different query? If you only want to try next query you can add supervision strategy to the mapAsync, like .withAttributes(supervisionStrategy(resumingDecider)) (but it's in the doc you've linked so I assume you're after something else)Peaceable
yes retry a single query, same letter. If v fails with 500, I want to retry v again two times.Hadfield
Do you need to run it as a part of your flow, because you can run it as a separate flow and then retry using standard future mechanisms, which would be somehow easier I think. Otherwise I think you have to create a graph which will feed back failed queries. I can show you any or both of the solutions if you are interested.Peaceable
I would be interested in the graph solutionHadfield
P
7

I think it will be hard (or impossible) to implement it with a supervsior strategy, mostly because you want to retry "n" times (according to the discussion in comments), and I don't think you can track the number of times the element was tried when using supervision.

I think there are two ways to solve this issue. Either handle the risky operation as a separate stream or create a graph, which will do error handling. I will propose two solutions.

Note also that Akka Streams distinguishes between errors and failures, so if you wont' handle your failures they will eventually collapse the flow (if no strategy is intriduced), so in the example below I convert them to Either, which represent either success or error.

Separate stream

What you can do is to treat each alphabet letter as a separate stream and handle failures for each letter separately with the retry strategy, and some delay.

// this comes after your helloFormat

// note that the method is somehow simpler because it's
// using implicit dispatcher and scheduler from outside scope,
// you may also want to pass it as implicit arguments
def retry[T](f: => Future[T], delay: FiniteDuration, c: Int): Future[T] =
  f.recoverWith {
    // you may want to only handle certain exceptions here...
    case ex: Exception if c > 0 =>
      println(s"failed - will retry ${c - 1} more times")
      akka.pattern.after(delay, system.scheduler)(retry(f, delay, c - 1))
  }

val singleElementFlow = httpFlow.mapAsync[Hello](1) {
  case (Success(response), _) =>
    val f = Unmarshal(response).to[Hello]
    f.recoverWith {
      case ex: Exception =>
        // see https://github.com/akka/akka/issues/20192
        response.entity.dataBytes.runWith(Sink.ignore).flatMap(_ => f)
    }
  case (Failure(e), _) => Future.failed(e)
}

// so the searches can either go ok or not, for each letter, we will retry up to 3 times
val searches =
  Source('a' to 'z').map(search).mapAsync[Either[Throwable, Hello]](1) { elem =>
    println(s"trying $elem")
    retry(
      Source.single(elem).via(singleElementFlow).runWith(Sink.head[Hello]),
      1.seconds, 3
    ).map(ok => Right(ok)).recover { case ex => Left(ex) }
  }
// end

Graph

This method will integrate failures into the graph, and will allow for retries. This example makes all requests run in parallel and prefer to retry those which failed, but if you don't want this behaviour and run them one by one this is something you can also do I believe.

// this comes after your helloFormat

// you may need to have your own class if you
// want to propagate failures for example, but we will use
// right value to keep track of how many times we have
// tried the request
type ParseResult = Either[(HttpRequest, Int), Hello]

def search(query: Char): (HttpRequest, (HttpRequest, Int)) = {
  val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
  (request, (request, 0)) // let's use this opaque value to count how many times we tried to search
}

val g = GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val searches = b.add(Flow[Char])

  val tryParse =
    Flow[(Try[HttpResponse], (HttpRequest, Int))].mapAsync[ParseResult](1) {
      case (Success(response), (req, tries)) =>
        println(s"trying parse response to $req for $tries")
        Unmarshal(response).to[Hello].
          map(h => Right(h)).
          recoverWith {
            case ex: Exception =>
              // see https://github.com/akka/akka/issues/20192
              response.entity.dataBytes.runWith(Sink.ignore).map { _ =>
                Left((req, tries + 1))
              }
          }
      case (Failure(e), _) => Future.failed(e)
    }

  val broadcast = b.add(Broadcast[ParseResult](2))

  val nonErrors = b.add(Flow[ParseResult].collect {
    case Right(x) => x
    // you may also handle here Lefts which do exceeded retries count
  })

  val errors = Flow[ParseResult].collect {
    case Left(x) if x._2 < 3 => (x._1, x)
  }
  val merge = b.add(MergePreferred[(HttpRequest, (HttpRequest, Int))](1, eagerComplete = true))

  // @formatter:off
  searches.map(search) ~> merge ~> httpFlow ~> tryParse ~> broadcast ~> nonErrors
                          merge.preferred <~ errors <~ broadcast
  // @formatter:on

  FlowShape(searches.in, nonErrors.out)
}

def main(args: Array[String]): Unit = {
  val source = Source('a' to 'z')
  val sink = Sink.seq[Hello]

  source.via(g).toMat(sink)(Keep.right).run().onComplete {
    case Success(seq) =>
      println(seq)
    case Failure(ex) =>
      println(ex)
  }

}

Basically what happens here is we run searches through httpFlow and then try to parse the response, we then broadcast the result and split errors and non-errors, the non errors go to sink, and errors get sent back to the loop. If the number of retries exceed the count, we ignore the element, but you can also do something else with it.

Anyway I hope this gives you some idea.

Peaceable answered 1/4, 2016 at 15:57 Comment(2)
Great effort. I will open a bounty for your answer. I think the Graph should be a FlowShape because I still want to process Hello after. Also are you sure this merge is correct? https://mcmap.net/q/583869/-why-akka-streams-cycle-doesn-39-t-end-in-this-graphHadfield
If you want to add input from outside and process it after, then yes, if you just want to process Hallo after, but it originates from Graph, it can be SourceShape. You don't wrap it in the RunnableGraph then, and you can run it like Source.fromGraph(g).runWith(Sink.seq[Hello]) (for example). Yes, if you need your stream to complete you need to make merge eager, like in the link you've sent. I'll update my answer.Peaceable
R
1

For the streams solution above, any retries for the last element in the stream won't execute. That's because when the upstream completes after sending the last element the merge will also complete. After that the only output came come from the non-retry outlet but since the element goes to retry that gets completed too.

If you need all input elements to generate an output you'll need an extra mechanism to stop the upstream complete from reaching the process&retry graph. One possibility is to use a BidiFlow which monitors the input and output from the process&retry graph to ensure all the required outputs have been generated (for the observed inputs) before propagating the oncomplete. In the simple case that could just be counting input and output elements.

Randolph answered 7/5, 2016 at 2:21 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.