Chain Akka-http-client requests in a Stream

I would like to chain HTTP requests using akka-http-client as a Stream. Each HTTP request in the chain depends on the success/response of the previous request and uses it to construct a new request. If a request is not successful, the Stream should return the response of the unsuccessful request.

How can I construct such a stream in akka-http? Which akka-http client-level API should I use?

Anschauung answered 11/9, 2016 at 15:31 Comment(2)
What should the stream materialize if all the requests are successful? Presumably you want to get some data out of them? - Hedberg
Yes, I would like to get the HttpResponse at the end of the stream; when materialised, I would like to know whether it was a success or a failure, and the reason for a failure. - Anschauung

If you're making a web crawler, have a look at this post. This answer tackles a simpler case, such as downloading paginated resources, where the link to the next page is in a header of the current page's response.

You can create a chained source, where one item leads to the next, using the Source.unfoldAsync method. It takes an initial state of type S and a function from S to Future[Option[(S, E)]]. Returning Some((nextState, element)) emits element (of type E) and passes nextState to the next invocation; returning None completes the stream.
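To make the (state, element) contract concrete, here is a minimal sketch with no HTTP involved. The counter state, the page labels and the object name are invented for illustration, and it assumes Akka 2.6+, where an implicit ActorSystem also supplies the Materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object UnfoldAsyncDemo {
  // State S = Int (a page counter), element E = String (a page label).
  // Some((nextState, element)) emits the element and continues with nextState;
  // None completes the stream without emitting anything.
  def step(page: Int): Future[Option[(Int, String)]] =
    if (page > 3) Future.successful(None)
    else Future.successful(Some((page + 1, s"page-$page")))

  def run(): Seq[String] = {
    // Assumption: Akka 2.6+, so the implicit system also provides the Materializer.
    implicit val system: ActorSystem = ActorSystem("unfold-async-demo")
    try Await.result(Source.unfoldAsync[Int, String](1)(step).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

Calling UnfoldAsyncDemo.run() collects the three labels emitted before step returns None for page 4.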

In your case, this is kind of like:

  1. taking an initial HttpRequest
  2. producing a Future[HttpResponse]
  3. if the response points to another URL, returning Some(request -> response), otherwise None

However, there's a wrinkle: returning None completes the stream without emitting anything, so a response that doesn't contain a pointer to the next request (typically the last page) is never emitted.

To get around this, you can make the function passed to unfoldAsync return Future[Option[(Option[HttpRequest], HttpResponse)]]. This allows you to handle the following situations:

  • the current response is an error
  • the current response points to another request
  • the current response doesn't point to another request
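The difference between the two state shapes can be seen without any HTTP traffic. The sketch below uses a hypothetical in-memory Page type and pages map as stand-ins for real responses (none of these names come from Akka HTTP), and again assumes Akka 2.6+.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object LastPageDemo {
  // Hypothetical stand-in for a response: a body plus an optional next-page key.
  final case class Page(body: String, next: Option[String])

  // Hypothetical in-memory "server" with three linked pages.
  val pages: Map[String, Page] = Map(
    "p1" -> Page("first", Some("p2")),
    "p2" -> Page("second", Some("p3")),
    "p3" -> Page("third", None)
  )

  def fetch(key: String): Future[Page] = Future.successful(pages(key))

  // Naive state (the next key): ends as soon as a page has no `next`,
  // so that final page is never emitted.
  def naive(key: String)(implicit ec: ExecutionContext): Future[Option[(String, Page)]] =
    fetch(key).map(page => page.next.map(nextKey => nextKey -> page))

  // Option state: always emit the fetched page, then stop one step later.
  def withOptionState(key: Option[String])(
      implicit ec: ExecutionContext): Future[Option[(Option[String], Page)]] =
    key match {
      case Some(k) => fetch(k).map(page => Some(page.next -> page))
      case None    => Future.successful(None)
    }

  def run(): (Seq[String], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("last-page-demo")
    import system.dispatcher
    try {
      val dropped = Source.unfoldAsync[String, Page]("p1")(naive(_))
        .map(_.body).runWith(Sink.seq)
      val complete = Source.unfoldAsync[Option[String], Page](Some("p1"))(withOptionState(_))
        .map(_.body).runWith(Sink.seq)
      (Await.result(dropped, 5.seconds), Await.result(complete, 5.seconds))
    } finally system.terminate()
  }
}
```

The first result is missing the last page, while the second contains all three, which is why the state is wrapped in an Option.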

What follows is some annotated code which outlines this approach, but first a preliminary:

When streaming HTTP requests to responses in Akka Streams, you need to ensure that every response body is consumed. Otherwise the unread entity backpressures its connection and the connection pool can eventually stall, which looks like a deadlock. If you don't need the body you can discard it, but here we use a function to convert the HttpEntity from a (potential) stream into a strict entity:

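import akka.http.scaladsl.model.HttpResponse
import scala.concurrent.Future
import scala.concurrent.duration._

// Requires an implicit Materializer and ExecutionContext in scope.
def convertToStrict(r: HttpResponse): Future[HttpResponse] =
  r.entity.toStrict(10.minutes).map(e => r.withEntity(e))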
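If the body is not needed at all, discarding it is an alternative to making it strict. The following is a sketch under the assumption of Akka HTTP 10.x on Akka 2.6+; statusOnly and the in-memory response are illustrative names, and a real response would come from Http().singleRequest.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpEntity, HttpResponse, StatusCode, StatusCodes}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DiscardBodyDemo {
  // Drains the entity and completes with the status once the body has been discarded.
  def statusOnly(r: HttpResponse)(
      implicit system: ActorSystem, ec: ExecutionContext): Future[StatusCode] =
    r.discardEntityBytes().future.map(_ => r.status)

  def run(): StatusCode = {
    implicit val system: ActorSystem = ActorSystem("discard-body-demo")
    import system.dispatcher
    // Hypothetical response built in memory so the sketch needs no network access.
    val response = HttpResponse(StatusCodes.NotFound, entity = HttpEntity("no such page"))
    try Await.result(statusOnly(response), 5.seconds)
    finally system.terminate()
  }
}
```

Discarding suits cases where only the status or headers matter; converting to a strict entity keeps the body available for later stages at the cost of buffering it in memory.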

Next, a couple of functions to create an Option[HttpRequest] from an HttpResponse. This example uses a scheme like GitHub's pagination links, where the Link header contains, e.g.: <https://api.github.com/...>; rel="next":

def nextUri(r: HttpResponse): Seq[Uri] = for {
  linkHeader <- r.header[Link].toSeq
  value <- linkHeader.values
  params <- value.params if params.key == "rel" && params.value() == "next"
} yield value.uri

def getNextRequest(r: HttpResponse): Option[HttpRequest] =
  nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))
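The header lookup can be checked without a server by building responses in memory. This sketch is a variation rather than the code above: it matches on the predefined LinkParams.next value instead of comparing key and value strings, and the example.com URL is a placeholder.

```scala
import akka.http.scaladsl.model.{HttpResponse, Uri}
import akka.http.scaladsl.model.headers.{Link, LinkParams}

object NextLinkDemo {
  // Finds the first Link value carrying rel="next", if any.
  def nextUri(r: HttpResponse): Option[Uri] =
    r.header[Link].toList
      .flatMap(_.values)
      .find(_.params.contains(LinkParams.next))
      .map(_.uri)

  // Hypothetical responses: one that points to a second page, one that does not.
  val withNext: HttpResponse =
    HttpResponse(headers = List(Link(Uri("https://example.com/items?page=2"), LinkParams.next)))
  val withoutNext: HttpResponse = HttpResponse()
}
```

With these two responses, nextUri returns the page-2 URI for the first and None for the second, which is the signal used to end the chain.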

Next, the real function we'll pass to unfoldAsync. It uses the Akka HTTP Http().singleRequest() API to take an HttpRequest and produce a Future[HttpResponse]:

def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
  reqOption match {
    case Some(req) => Http().singleRequest(req).flatMap { response =>
      // Handle the error case: return the errored response with no next
      // request. Its body is still made strict so the connection is released.
      if (response.status.isFailure()) convertToStrict(response).map(r => Some(None -> r))

      // Otherwise, convert the response to a strict response by
      // taking up the body and looking for a next request.
      else convertToStrict(response).map { strictResponse =>
        getNextRequest(strictResponse) match {
          // If we have no next request, return Some containing an
          // empty state, but the current value
          case None => Some(None -> strictResponse)

          // Otherwise, pass on the request...
          case next => Some(next -> strictResponse)
        }
      }
    }
    // Finally, there's no next request, end the stream by
    // returning none as the state.
    case None => Future.successful(None)
  }

Note that if we get an errored response, the stream will not continue since we return None in the next state.

You can invoke this to get a stream of HttpResponse objects like so:

val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
    Some(initialRequest))(chainRequests)

As for returning the value of the last (or errored) response, you simply need to use Sink.last, since the stream will end either when it completes successfully or on the first errored response. For example:

def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
      Some(initialRequest))(chainRequests)
    .map(_.status)
    .runWith(Sink.last)
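To report success or failure together with a reason, as asked in the comments, the last response can be mapped to an Either. This is one possible shape, not the only one: outcome is an illustrative name, and the in-memory source stands in for the unfoldAsync source above.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpResponse, StatusCode, StatusCodes}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object OutcomeDemo {
  // Left(status) if the chain stopped on a failure status, Right(response) otherwise.
  def outcome(responses: Source[HttpResponse, NotUsed])(
      implicit system: ActorSystem,
      ec: ExecutionContext): Future[Either[StatusCode, HttpResponse]] =
    responses.runWith(Sink.last[HttpResponse]).map { last =>
      if (last.status.isFailure()) Left(last.status) else Right(last)
    }

  def run(): Either[StatusCode, HttpResponse] = {
    implicit val system: ActorSystem = ActorSystem("outcome-demo")
    import system.dispatcher
    // Hypothetical stand-in for the chained source: one success, then a 500.
    val responses = Source(List(
      HttpResponse(StatusCodes.OK),
      HttpResponse(StatusCodes.InternalServerError)))
    try Await.result(outcome(responses), 5.seconds)
    finally system.terminate()
  }
}
```

Note that this only covers failures expressed as HTTP status codes. A transport-level error such as a refused connection fails the Future itself and would need recover or similar handling.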
Hedberg answered 12/9, 2016 at 20:24 Comment(1)
Thank you. Very inspiring answer. - Anschauung

You can use Source.unfoldAsync.

class CatsHttpClientImpl(implicit system: ActorSystem[_], ec: ExecutionContext) extends CatsHttpClient {
  private val logger: Logger = LoggerFactory.getLogger(classOf[CatsHttpClientImpl])
  private val start: Option[String] = Some("https://catfact.ninja/breeds")

  override def getAllBreads: Future[Seq[Cat]] = {
    Source
      .unfoldAsync(start) {
        case Some(next) =>
          val nextChunkFuture: Future[CatsResponse] = sendRequest(next)

          nextChunkFuture.map { resp =>
            resp.nextPageUrl match {
              case Some(url) => Some((Some(url), resp.data))
              case None => Some((None, resp.data))
            }
          }
        case None => Future.successful(None)
      }
      .runWith(Sink.fold(Seq(): Seq[Cat])(_ ++ _))
  }

  private def sendRequest(url: String): Future[CatsResponse] = {
    logger.info(s"CatsHttpClientImpl: Sending request $url")

    val request = HttpRequest(
      uri = Uri(url),
      headers = List(
        RawHeader("Accept", "application/json")
      )
    )
    Http(system).singleRequest(request).flatMap { response =>
      response.status match {
        case StatusCodes.OK =>
          logger.info("CatsHttpClientImpl: Received success")
          Unmarshal(response.entity).to[CatsResponse]

        case _ =>
          logger.error("CatsHttpClientImpl: Received error")
          throw new CatsHttpClientException()
      }
    }
  }
} 

The complete source code and runnable project can be found over on GitHub

Gordan answered 24/12, 2023 at 23:7 Comment(0)
