Why doesn't the Akka Streams cycle in this graph end?

I would like to create a graph that loops n times before going to the sink. I've created this sample, which fulfills my requirements but doesn't terminate after reaching the sink, and I really don't understand why. Can someone enlighten me?

Thanks.

    import akka.actor.ActorSystem
    import akka.stream.scaladsl._
    import akka.stream.{ActorMaterializer, UniformFanOutShape}

    import scala.concurrent.Future

    object test {
      def main(args: Array[String]) {
        val ignore: Sink[Any, Future[Unit]] = Sink.ignore
        val closed: RunnableGraph[Future[Unit]] = FlowGraph.closed(ignore) { implicit b =>
          sink => {
            import FlowGraph.Implicits._

            val fileSource = Source.single((0, Array[String]()))
            val merge = b.add(MergePreferred[(Int, Array[String])](1).named("merge"))
            val afterMerge = Flow[(Int, Array[String])].map {
              e =>
                println("after merge")
                e
            }
            val broadcastArray: UniformFanOutShape[(Int, Array[String]), (Int, Array[String])] = b.add(Broadcast[(Int, Array[String])](2).named("broadcastArray"))
            val toRetry = Flow[(Int, Array[String])].filter {
              case (r, s) => {
                println("retry " + (r < 3) + " " + r)
                r < 3
              }
            }.map {
              case (r, s) => (r + 1, s)
            }
            val toSink = Flow[(Int, Array[String])].filter {
              case (r, s) => {
                println("sink " + (r >= 3) + " " + r)
                r >= 3
              }
            }
            merge.preferred <~ toRetry <~ broadcastArray
            fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
          }
        }
        implicit val system = ActorSystem()
        implicit val materializer = ActorMaterializer()
        val run: Future[Unit] = closed.run()
        import system.dispatcher
        run.onComplete {
          case _ => {
            println("finished")
            system.shutdown()
          }
        }
      }
    }

Brad asked 8/9, 2015 at 13:37

The stream never completes because the merge never signals completion.

Reformatted, your graph structure basically looks like this:

    // ignoring the preferred port, which is inconsequential here

    fileSource ~> merge ~> afterMerge ~> broadcastArray ~> toSink ~> sink
                  merge <~ toRetry    <~ broadcastArray

The non-completion is rooted in your merge step:

    // 2 inputs into merge

    fileSource ~> merge
                  merge <~ toRetry

Once fileSource has emitted its single element (namely (0, Array.empty[String])), it signals completion to merge.

However, the fileSource's completion message gets blocked at the merge. From the documentation:

akka.stream.scaladsl.MergePreferred

Completes when all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)

With the default eagerClose=false, the merge will not signal completion until all of its input streams have completed.

    // fileSource is complete ~> merge
    //                           merge <~ toRetry is still running

    // complete fileSource + still running toRetry = still running merge

Therefore, merge waits until toRetry also completes. But toRetry can never complete, because its only upstream (broadcastArray, fed by merge through afterMerge) completes only after merge does.
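To make the circular dependency concrete, here is a toy model in plain Scala. It does not use Akka; `CompletionModel`, `State` and `mergeCompletes` are hypothetical names for illustration only. It propagates "completed" flags around the loop until they stop changing, applying the MergePreferred rule from the documentation and assuming toRetry can only complete after merge has.

```scala
import scala.annotation.tailrec

// Toy model of completion signals only (no Akka, no elements).
object CompletionModel {
  // Completion flags of the two stages that form the cycle.
  final case class State(mergeDone: Boolean, toRetryDone: Boolean)

  def mergeCompletes(eagerClose: Boolean): Boolean = {
    // Source.single completes right after emitting its one element.
    val fileSourceDone = true

    // One round of propagating completion around the loop.
    def step(s: State): State = State(
      // MergePreferred: all upstreams done (eagerClose = false)
      // or any upstream done (eagerClose = true).
      mergeDone =
        if (eagerClose) fileSourceDone || s.toRetryDone
        else fileSourceDone && s.toRetryDone,
      // toRetry is fed only via merge ~> afterMerge ~> broadcastArray,
      // so it can complete only after merge has completed.
      toRetryDone = s.mergeDone
    )

    // Repeat until no flag changes any more.
    @tailrec
    def settle(s: State): State = {
      val next = step(s)
      if (next == s) s else settle(next)
    }

    settle(State(mergeDone = false, toRetryDone = false)).mergeDone
  }

  def main(args: Array[String]): Unit = {
    println(s"eagerClose = false: merge completes? ${mergeCompletes(eagerClose = false)}")
    println(s"eagerClose = true:  merge completes? ${mergeCompletes(eagerClose = true)}")
  }
}
```

With eagerClose = false the flags settle with merge still running, which is the hang you are seeing; with eagerClose = true, merge completes and that completion then reaches toRetry. The model only tracks completion signals, not the elements circulating in the loop.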

If you want this particular graph to complete once fileSource completes, just set eagerClose = true, which makes merge complete as soon as fileSource completes. For example:

    // the second argument to MergePreferred is eagerClose, now set to true
    val merge = b.add(MergePreferred[(Int, Array[String])](1, true).named("merge"))

Without the Stream Cycle

A simpler solution exists for your problem: use a single Flow.map stage that applies a tail-recursive function:

    // Note: there is no use of Akka in this implementation

    type FileInputType = (Int, Array[String])

    @scala.annotation.tailrec
    def recursiveRetry(fileInput: FileInputType): FileInputType =
      fileInput match {
        case (r, _) if r >= 3 => fileInput
        case (r, a)           => recursiveRetry((r + 1, a))
      }

Your stream would then be reduced to:

    // ring-fenced Akka code

    val recursiveRetryFlow = Flow[FileInputType] map recursiveRetry

    // wired inside the same FlowGraph builder block as before
    fileSource ~> recursiveRetryFlow ~> toSink ~> sink

The result is a cleaner stream that avoids mixing "business logic" with Akka code. This allows the retry functionality to be unit-tested completely independently of any third-party library. The retry loop you embedded in your stream is the business logic, so the mixed implementation is tightly coupled to Akka going forward, for better or worse.

Also, in the segregated solution the cycle is contained in a tail-recursive function, which is idiomatic Scala.
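As an illustration of that decoupling, here is a minimal sketch that checks recursiveRetry with plain assertions and no materializer or ActorSystem. The object name `RecursiveRetryCheck` and the sample payload `"a.txt"` are hypothetical; the function body is the one from the answer.

```scala
import scala.annotation.tailrec

object RecursiveRetryCheck {
  type FileInputType = (Int, Array[String])

  // Same retry logic as above: bump the counter until it reaches 3.
  @tailrec
  def recursiveRetry(fileInput: FileInputType): FileInputType =
    fileInput match {
      case (r, _) if r >= 3 => fileInput
      case (r, a)           => recursiveRetry((r + 1, a))
    }

  def main(args: Array[String]): Unit = {
    val payload = Array("a.txt")
    // Starting below the threshold: retried until the counter is 3.
    val (count, files) = recursiveRetry((0, payload))
    assert(count == 3)
    assert(files eq payload) // the array is passed through untouched
    // Already at or above the threshold: returned unchanged.
    assert(recursiveRetry((5, payload))._1 == 5)
    println("recursiveRetry checks passed")
  }
}
```

One consequence visible here: recursiveRetry always returns a counter of at least 3, so toSink's filter in the reduced stream lets every element through.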

Proprietor answered 27/11, 2015 at 17:58

Hi Ramon, I ran into the same kind of issue lately; could you please have a look at it? #66725339 - Gentian