Split Akka Stream Source into two

I have an Akka Streams Source which I want to split into two sources according to a predicate.

E.g. having a source (types are simplified intentionally):

val source: Source[Either[Throwable, String], NotUsed] = ???

And two methods:

def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ???
def handleFailure(source: Source[Throwable, NotUsed]): Future[Unit] = ???

I would like to split the source according to the _.isRight predicate, passing the right part to the handleSuccess method and the left part to the handleFailure method.

I tried using the Broadcast fan-out stage, but it requires Sinks at the end.

Ambition answered 18/7, 2016 at 14:13 Comment(2)
I don't think it is possible to split a source into two sources in such a way, because these split sources could then be materialized separately, and it is not at all clear how that should work. - Strait
Yes, I understand the implications of this. I'm interested in an alternative pattern to restructure the code into. For example, I could make my methods return Sinks instead of accepting a Source. - Ambition
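A minimal sketch of the "methods return Sinks" idea from the comment above. It assumes Akka 2.6+. SinkHandlersExample, successSink, failureSink and process are hypothetical names, and Sink.seq stands in for whatever the real handlers do. Each handler accepts the whole Either stream and keeps only its own side with collect, and alsoToMat sends every element to both handlers:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SinkHandlersExample {
  // Hypothetical replacement for handleSuccess: a Sink over the whole Either
  // stream that keeps only the Right values (gathered with Sink.seq here)
  def successSink: Sink[Either[Throwable, String], Future[Seq[String]]] =
    Flow[Either[Throwable, String]]
      .collect { case Right(s) => s }
      .toMat(Sink.seq[String])(Keep.right)

  // Hypothetical replacement for handleFailure: keeps only the Left values
  def failureSink: Sink[Either[Throwable, String], Future[Seq[Throwable]]] =
    Flow[Either[Throwable, String]]
      .collect { case Left(t) => t }
      .toMat(Sink.seq[Throwable])(Keep.right)

  // Runs the stream once and returns (successes, failures)
  def process(values: List[Either[Throwable, String]]): (Seq[String], Seq[Throwable]) = {
    // Akka 2.6+: an implicit ActorSystem also provides the Materializer
    implicit val system: ActorSystem = ActorSystem("SinkHandlers")
    try {
      val source: Source[Either[Throwable, String], NotUsed] = Source(values)
      // alsoToMat feeds every element to failureSink as well as downstream;
      // Keep.both keeps the materialized Futures of both sinks
      val (failuresF, successesF) =
        source
          .alsoToMat(failureSink)(Keep.right)
          .toMat(successSink)(Keep.both)
          .run()
      (Await.result(successesF, 5.seconds), Await.result(failuresF, 5.seconds))
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }

  def main(args: Array[String]): Unit = {
    val (successes, failures) =
      process(List(Right("a"), Left(new RuntimeException("boom")), Right("b")))
    successes.foreach(s => println(s"SUCCESS: $s"))
    failures.foreach(t => println(s"FAILURE: $t"))
  }
}
```

With this shape, the caller owns the Source and the handlers only describe how to consume it. This sidesteps the question of materializing two "split" Sources separately.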

Although you can choose which side of the Source you want to retrieve items from, it's not possible to create a Source that yields two outputs, which seems to be what you ultimately want.

Given the GraphStage below, which splits the Left and Right values into two outputs...

/**
  * Fans out the Left and Right values of an Either to two outlets.
  * Upstream is only pulled once both outlets have signalled demand.
  * @tparam L left value type (emitted on out0)
  * @tparam R right value type (emitted on out1)
  */
class EitherFanOut[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {
  import akka.stream.Attributes
  import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler}

  override val shape: FanOutShape2[Either[L, R], L, R] = new FanOutShape2[Either[L, R], L, R]("EitherFanOut")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    // Whether each outlet has pulled and has not yet been pushed to
    var out0demand = false
    var out1demand = false

    // Pull upstream only when both outlets can accept an element,
    // so onPush can always push whichever side the element belongs to
    def pullIfBothDemand(): Unit =
      if (out0demand && out1demand) pull(shape.in)

    setHandler(shape.in, new InHandler {
      override def onPush(): Unit =
        grab(shape.in) match {
          case Left(l) =>
            out0demand = false
            push(shape.out0, l)
          case Right(r) =>
            out1demand = false
            push(shape.out1, r)
        }
    })

    setHandler(shape.out0, new OutHandler {
      override def onPull(): Unit = {
        out0demand = true
        pullIfBothDemand()
      }
    })

    setHandler(shape.out1, new OutHandler {
      override def onPull(): Unit = {
        out1demand = true
        pullIfBothDemand()
      }
    })
  }
}

...you can route them so that you receive only one side:

val sourceRight: Source[String, NotUsed] = Source.fromGraph(GraphDSL.create(source) { implicit b => s =>
  import GraphDSL.Implicits._

  val eitherFanOut = b.add(new EitherFanOut[Throwable, String])

  s ~> eitherFanOut.in
  eitherFanOut.out0 ~> Sink.ignore

  SourceShape(eitherFanOut.out1)
})

Await.result(sourceRight.runWith(Sink.foreach(println)), Duration.Inf)
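If only the Right side is ever needed, the same kind of sourceRight can probably be obtained without a custom stage. The following is a minimal sketch assuming Akka 2.6+. RightSideOnlyExample, rightSide and collectRights are hypothetical names. The built-in collect operator filters and unwraps in one step, dropping the Lefts instead of routing them to Sink.ignore:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object RightSideOnlyExample {
  // collect filters and unwraps in one step: Left values are simply dropped
  def rightSide(source: Source[Either[Throwable, String], NotUsed]): Source[String, NotUsed] =
    source.collect { case Right(s) => s }

  // Runs rightSide over the given values and gathers the result
  def collectRights(values: List[Either[Throwable, String]]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("RightSideOnly")
    try {
      Await.result(rightSide(Source(values)).runWith(Sink.seq[String]), 5.seconds)
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    collectRights(List(Right("B"), Left(new RuntimeException), Right("C"))).foreach(println)
}
```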

...or, probably more desirable, route them to two separate Sinks:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))

val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>

  import GraphDSL.Implicits._

  val eitherFanOut = b.add(new EitherFanOut[Throwable, String])

  s ~> eitherFanOut.in
  eitherFanOut.out0 ~> l.in
  eitherFanOut.out1 ~> r.in

  ClosedShape
})


val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

(Imports and initial setup)

import akka.NotUsed
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, InHandler, OutHandler}
import akka.stream._
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val classLoader = getClass.getClassLoader
implicit val system = ActorSystem("QuickStart", ConfigFactory.load(classLoader), classLoader)
implicit val materializer = ActorMaterializer()

val values: List[Either[Throwable, String]] = List(
  Right("B"),
  Left(new Throwable),
  Left(new RuntimeException),
  Right("B"),
  Right("C"),
  Right("G"),
  Right("I"),
  Right("F"),
  Right("T"),
  Right("A")
)

val source: Source[Either[Throwable, String], NotUsed] = Source.fromIterator(() => values.toIterator)
Lx answered 18/7, 2016 at 20:4

Edit: this other answer with divertTo is a better solution than mine, IMO. I'll leave my answer as-is for posterity.


Original answer:

This is implemented in akka-stream-contrib as PartitionWith. Add this dependency to SBT to pull it into your project:

libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.9"

`PartitionWith` is shaped like a `Broadcast(2)`, but with potentially different types for each of the two outlets, and each element goes to only one of them. You provide it with a function that maps each element to an `Either`: `Left` results are routed to the first outlet and `Right` results to the second. You can then attach a `Sink` or `Flow` to each of these outlets independently as appropriate. Building on [cessationoftime's example](https://mcmap.net/q/853390/-split-akka-stream-source-into-two), with the `Broadcast` replaced with a `PartitionWith`:

    import akka.stream.contrib.PartitionWith

    val eitherSource: Source[Either[Throwable, String], NotUsed] = Source.empty
    val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
    val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))

    val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)
                                      ((_, _, _)) { implicit b => (s, l, r) =>

      import GraphDSL.Implicits._

      // identity: each Either element already encodes its Left/Right routing
      val pw = b.add(
        PartitionWith.apply[Either[Throwable, String], Throwable, String](identity)
      )

      // Connect the shapes handed to the builder (s, l, r), not the outer
      // eitherSource/leftSink/rightSink values, so the materialized values are wired up
      s ~> pw.in
      pw.out0 ~> l.in
      pw.out1 ~> r.in

      ClosedShape
    })

    val r = flow.run()
    Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)
Mcalpin answered 26/9, 2017 at 12:23

For this you can use a broadcast, then filter and map the streams within the GraphDSL:

import akka.stream.scaladsl.Broadcast

// eitherSource: Source[Either[Throwable, String], NotUsed], e.g. as in the question
val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))

val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>
  import GraphDSL.Implicits._

  // Every element goes to both outlets; each branch keeps only its own side
  val broadcast = b.add(Broadcast[Either[Throwable, String]](2))

  s ~> broadcast.in
  broadcast.out(0).filter(_.isLeft).map(_.left.get) ~> l.in
  broadcast.out(1).filter(_.isRight).map(_.right.get) ~> r.in

  ClosedShape
})


val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

I expect you will be able to run the functions you want from within the map.

Astrid answered 28/9, 2016 at 10:7 Comment(1)
A slightly safer variant of .filter(_.isLeft).map(_.left.get) is .collect { case Left(l) => l }, and likewise for the right branch. - Heavensent
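Applying that collect suggestion to the Broadcast approach gives roughly the following self-contained sketch. It assumes Akka 2.6+. BroadcastCollectExample and split are hypothetical names, and Sink.seq replaces Sink.foreach only so the routed elements can be inspected:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BroadcastCollectExample {
  // Returns (lefts, rights) after routing every element through a Broadcast
  def split(values: List[Either[Throwable, String]]): (Seq[Throwable], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("BroadcastCollect")
    val eitherSource: Source[Either[Throwable, String], NotUsed] = Source(values)

    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(eitherSource, Sink.seq[Throwable], Sink.seq[String])((_, _, _)) {
        implicit b => (s, l, r) =>
          import GraphDSL.Implicits._

          val broadcast = b.add(Broadcast[Either[Throwable, String]](2))

          s ~> broadcast.in
          // Both outlets see every element; collect keeps and unwraps one side
          broadcast.out(0).collect { case Left(e) => e } ~> l.in
          broadcast.out(1).collect { case Right(v) => v } ~> r.in

          ClosedShape
      }
    )

    try {
      val (_, leftsF, rightsF) = graph.run()
      (Await.result(leftsF, 5.seconds), Await.result(rightsF, 5.seconds))
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }

  def main(args: Array[String]): Unit = {
    val (lefts, rights) = split(List(Right("B"), Left(new RuntimeException("bad")), Right("C")))
    lefts.foreach(e => println(s"FAILURE: $e"))
    rights.foreach(v => println(s"SUCCESS: $v"))
  }
}
```

Since collect matches and extracts in a single pattern, there is no separate .get call that could throw if the filter and the extraction ever got out of sync.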

In the meantime, this has been introduced into standard Akka Streams as Partition: https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Partition.html.

You can split the input stream with a partitioner function, which maps each element to an output port index (for example 0 for Left and 1 for Right), and then use collect on each output to get only the types you are interested in.
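A minimal sketch of that approach, assuming Akka 2.6+ (PartitionExample and split are hypothetical names). Unlike Broadcast, Partition sends each element to exactly one outlet, chosen by the index the partitioner returns:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PartitionExample {
  // Returns (failures, successes); each element goes to exactly one outlet
  def split(values: List[Either[Throwable, String]]): (Seq[Throwable], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("PartitionExample")
    val source: Source[Either[Throwable, String], NotUsed] = Source(values)

    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[Throwable], Sink.seq[String])((_, _)) {
        implicit b => (failSink, okSink) =>
          import GraphDSL.Implicits._

          // The partitioner returns an outlet index: Left -> 0, Right -> 1
          val partition = b.add(
            Partition[Either[Throwable, String]](2, (e: Either[Throwable, String]) => if (e.isLeft) 0 else 1)
          )

          source ~> partition.in
          // Outlets still carry Either values, so collect unwraps each side
          partition.out(0).collect { case Left(t) => t } ~> failSink.in
          partition.out(1).collect { case Right(s) => s } ~> okSink.in

          ClosedShape
      }
    )

    try {
      val (failuresF, successesF) = graph.run()
      (Await.result(failuresF, 5.seconds), Await.result(successesF, 5.seconds))
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }

  def main(args: Array[String]): Unit = {
    val (failures, successes) = split(List(Right("B"), Left(new RuntimeException("bad")), Right("C")))
    failures.foreach(t => println(s"FAILURE: $t"))
    successes.foreach(s => println(s"SUCCESS: $s"))
  }
}
```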

Olenta answered 27/9, 2019 at 12:49

You can use divertTo to attach an alternative Sink to the flow that handles the Lefts: https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/divertTo.html

// handleFailureSink: a Sink[Either[Throwable, String], _] that processes the Lefts
handleSuccess(
  source
    .divertTo(handleFailureSink, _.isLeft)
    .collect { case Right(value) => value }
)
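To also observe the diverted elements, divertToMat can keep the failure Sink's materialized value. The following is a sketch assuming Akka 2.6+. DivertToExample, split and the Sink.seq-based handleFailureSink are hypothetical stand-ins. Note that divertTo does not change the element type, so the failure Sink receives the Lefts still wrapped in Either:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object DivertToExample {
  // Returns (diverted Lefts, unwrapped Rights)
  def split(values: List[Either[Throwable, String]]): (Seq[Either[Throwable, String]], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("DivertTo")
    // Hypothetical failure handler; it receives Either values, not Throwables
    val handleFailureSink = Sink.seq[Either[Throwable, String]]

    try {
      // Elements matching the predicate go only to handleFailureSink;
      // Keep.right keeps that sink's Future, Keep.both adds the main sink's Future
      val (failuresF, successesF) =
        Source(values)
          .divertToMat(handleFailureSink, (e: Either[Throwable, String]) => e.isLeft)(Keep.right)
          .collect { case Right(s) => s }
          .toMat(Sink.seq[String])(Keep.both)
          .run()
      (Await.result(failuresF, 5.seconds), Await.result(successesF, 5.seconds))
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }

  def main(args: Array[String]): Unit = {
    val (diverted, rights) = split(List(Right("B"), Left(new RuntimeException("bad")), Right("C")))
    diverted.foreach(e => println(s"FAILURE: $e"))
    rights.foreach(s => println(s"SUCCESS: $s"))
  }
}
```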
Cavan answered 16/11, 2020 at 10:32

© 2022 - 2024 — McMap. All rights reserved.