Dynamically creating Akka Stream Flows at Runtime

I'm currently trying to dynamically create Akka Streams graph definitions at runtime. The idea is that users will be able to define flows interactively and attach them to existing, running BroadcastHubs. This means that at compile time I don't know which flows, or even how many, will be used.

Unfortunately, I'm struggling with generics/type erasure. Frankly, I'm not even sure what I'm attempting to do is possible on the JVM.

I have a function that returns an Akka Streams Flow representing two connected Flows. It uses Scala's TypeTags to get around type erasure: if the output type of the first flow is the same as the input type of the second flow, the two are connected. This works just fine.

import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Flow, GraphDSL}

import scala.reflect.runtime.universe._
import scala.util.{Failure, Success, Try}

def connect[A: TypeTag, B: TypeTag, C: TypeTag, D: TypeTag](a: Flow[A, B, NotUsed],
                                                            b: Flow[C, D, NotUsed]): Try[Flow[A, D, NotUsed]] = {
  Try {
    if (typeOf[B] =:= typeOf[C]) {
      val c = b.asInstanceOf[Flow[B, D, NotUsed]]

      Flow.fromGraph {
        GraphDSL.create(a, c)((m1, m2) => NotUsed.getInstance()) { implicit b =>
          (s1, s2) =>
            s1 ~> s2
            FlowShape(s1.in, s2.out)
        }
      }
    }
    else
      throw new RuntimeException(s"Connection failed. Incompatible types: ${typeOf[B]} and ${typeOf[C]}")
  }
}

So if I have Flow[A, B] and Flow[C, D], the result is Flow[A, D], provided that B and C are the same type.

I also have a function that attempts to merge (reduce) a List of Flows down to a single Flow. Let's assume that this list is derived from a list of flow definitions read from a file or a web request.

def merge(fcs: List[Flow[_, _, NotUsed]]): Try[Option[Flow[_, _, NotUsed]]] = {
  fcs match {
    case Nil => Success(None)
    case h :: Nil => Success(Some(h))
    case h :: t =>
      val n = t.head

      connect(h, n) match {
        case Success(fc) => merge(fc :: t)
        case Failure(e) => Failure(e)
      }
  }
}

Unfortunately, once the Flows are stored in a List[Flow[_, _, NotUsed]], whose element types are only existentials, I lose all of the type information and therefore can't connect the Flows at runtime. Here's an example:

def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)

def flowI2S() = Flow.fromFunction[Int, String](_.toString)

val a = flowIdentity[Int]()
val b = flowIdentity[Int]()
val c = flowI2S()
val d = flowIdentity[String]()

val fcs: List[Flow[_, _, NotUsed]] = List(a, b, c, d)

val y = merge(fcs)

This results in the exception:

Failure(java.lang.RuntimeException: Connection failed. Incompatible types _$4 and _$3)
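The _$4 and _$3 in that message suggest what is happening. Implicit tags are materialized by the compiler from the static type at the call site, and inside merge the static types are only existential placeholders. Here is a minimal illustration of that mechanism. It uses the standard-library ClassTag as a stand-in for TypeTag, and the object and method names are hypothetical:

```scala
import scala.reflect.ClassTag

object StaticTagDemo {
  // The compiler picks the implicit ClassTag from the static type of x
  // at the call site, just as it does for connect's TypeTag context bounds.
  def tagOf[T](x: T)(implicit ct: ClassTag[T]): ClassTag[T] = ct

  // Static type Int: the tag describes Int.
  val precise: ClassTag[Int] = tagOf(42)

  // Static type Any: the runtime value is still an Int, but the tag only knows Any.
  val items: List[Any] = List(42, "forty-two")
  val widened: ClassTag[Any] = tagOf(items.head)
}
```

The runtime value never changes. Only the type the compiler sees at the call site determines which tag is passed in.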

I've been looking into Miles Sabin's Shapeless, and thought I might be able to use HLists to retain the type information. Unfortunately, that seems to work only if I know the individual types and the length of the list at compile time. If I upcast a specific HList to plain HList, it looks like I lose the type information again.

val fcs: HList = a :: b :: c :: d :: HNil
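The same trade-off can be seen without Shapeless, using a hand-rolled, type-aligned chain. The names are hypothetical, and plain functions stand in for Flows. The compiler only accepts a Link whose first step's output type matches the rest's input type. That guarantee lives entirely in the static type Chain[Int, String], though, so a chain assembled from definitions read at runtime has no such type for the compiler to check. This is the same limitation as upcasting to HList.

```scala
object StaticChain {
  // A sequence of steps whose adjacent types must line up, checked at compile time.
  sealed trait Chain[A, B] { def run(a: A): B }

  final case class One[A, B](f: A => B) extends Chain[A, B] {
    def run(a: A): B = f(a)
  }

  // Link only type-checks if f's output type B is the input type of rest.
  final case class Link[A, B, C](f: A => B, rest: Chain[B, C]) extends Chain[A, C] {
    def run(a: A): C = rest.run(f(a))
  }

  // Mirrors a :: b :: c :: d above: Int => Int, Int => Int, Int => String, String => String.
  val chain: Chain[Int, String] =
    Link[Int, Int, String](identity,
      Link[Int, Int, String](identity,
        Link[Int, String, String](_.toString, One[String, String](identity))))
}
```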

So my question is... is this even possible? Is there a way to do this with Shapeless generics magic (preferably without the need to use specific non-existential type extractors)? I'd like to find as generic a solution as possible, and any help would be appreciated.

Thanks!

Interoceptor asked 26/4, 2017 at 16:30. Comments:
First of all, in the first code example all the types are known at compile time, so it can be simplified to 3 types instead of 4: Flow[A, B] and Flow[B, C]. (Valarievalda)
Just curious: have you considered compiling the user-created graphs? That is, generating Scala code from some DSL, compiling it, and then loading it into the runtime? (Arriviste)
That's actually a neat idea... I hadn't thought of that. This would provide the needed type safety at the time someone creates the graph. Now I just need to deal with class loader magic... thanks! (Interoceptor)

I know this is an old post, but as I had some time I gave it a try. I'm not sure this is exactly the solution, but I thought I would post it and get suggestions.

  import akka.NotUsed
  import akka.stream.scaladsl.Flow
  import shapeless._
  import shapeless.ops.hlist.IsHCons

  type FlowN[A, B] = Flow[A, B, NotUsed]

  // Type class that merges an HList of flows into a single FlowN[A, D]
  trait FlowMerger[L <: HList] {
    type A
    type D
    def merge(flow: L): Option[FlowN[A, D]]
  }

  object FlowMerger extends LowPriorityImplicits {
    def apply[L <: HList](v: L)(implicit ev: FlowMerger[L]): Option[FlowN[ev.A, ev.D]] = ev.merge(v)

    type Aux[L <: HList, A1, D1] = FlowMerger[L] {
      type A = A1
      type D = D1
    }

    // Base case: a single flow is already "merged"
    implicit def h1Instance[A1, D1]: FlowMerger.Aux[FlowN[A1, D1] :: HNil, A1, D1] = new FlowMerger[FlowN[A1, D1] :: HNil] {
      override type A = A1
      override type D = D1

      override def merge(flow: FlowN[A1, D1] :: HNil): Option[FlowN[A, D]] = Option(flow.head)
    }

  }

  trait LowPriorityImplicits {
    // Inductive case: merge the tail, then prepend the head flow
    implicit def hMulInstance[A1, B1, D1, E1, F1, L <: HList, T <: HList, T1 <: HList]
     (implicit
     isHC1: IsHCons.Aux[L, FlowN[A1, B1], T],
     isHC2: IsHCons.Aux[T, FlowN[E1, F1], T1],
     // Aux pins the merged tail to FlowN[E1, D1], so D1 is inferred rather than left undetermined
     lx: Lazy[FlowMerger.Aux[T, E1, D1]],
     typeableB: Lazy[Typeable[B1]],
     typeableE: Lazy[Typeable[E1]]
    ): FlowMerger.Aux[L, A1, D1] = {
      new FlowMerger[L] {
        override type A = A1
        override type D = D1

        override def merge(flow: L): Option[FlowN[A, D]] = {
          // Connect only if the head's output type matches the next flow's input type
          if (typeableB.value == typeableE.value) {
            lx.value.merge(isHC1.tail(flow)).map(t => isHC1.head(flow) via t.asInstanceOf[FlowN[B1, D]])
          } else None
        }
      }
    }
  }

You can use it as:

  FlowMerger(fcs).map(flow => Source(List(1, 2, 3)) via flow runForeach println)
Causeway answered 21/1, 2023 at 7:33

As you already noticed, the reason it didn't work is that the list erases the types you had, so in the general case it is impossible. If you know all of the types that can be used as intermediate types, you can solve it by adding a resolving function, which also simplifies your connect method. Here is a code snippet; I hope it is clear.

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

object Main {
  def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)
  def flowI2S() = Flow.fromFunction[Int, String](_.toString)

  def main(args: Array[String]): Unit = {
    // Akka 2.6+: an implicit ActorSystem provides the Materializer needed by run()
    implicit val system: ActorSystem = ActorSystem("merge-demo")

    val idInt1 = flowIdentity[Int]()
    val idInt2 = flowIdentity[Int]()
    val int2String = flowI2S()
    val idString = flowIdentity[String]()
    val fcs: List[Flow[_, _, NotUsed]] = List(idInt1, idInt2, int2String, idString)

    val source = Source(1 to 10)
    val mergedGraph = merge(fcs).get.asInstanceOf[Flow[Int, String, NotUsed]]
    source.via(mergedGraph).to(Sink.foreach(println)).run()
  }

  def merge(fcs: List[Flow[_, _, NotUsed]]): Option[Flow[_, _, NotUsed]] =
    fcs match {
      case Nil => None
      case h :: Nil => Some(h)
      case h :: t =>
        val n = t.head
        val fc = resolveConnect(h, n)
        // Drop n as well: it has already been merged into fc
        merge(fc :: t.tail)
    }

  // Because of type erasure, these isInstanceOf checks and casts can only verify
  // at runtime that the values are Flows, not what their type arguments are.
  def resolveConnect(a: Flow[_, _, NotUsed], b: Flow[_, _, NotUsed]): Flow[_, _, NotUsed] =
    if (a.isInstanceOf[Flow[_, Int, NotUsed]] && b.isInstanceOf[Flow[Int, _, NotUsed]]) {
      connectInt(a.asInstanceOf[Flow[_, Int, NotUsed]], b.asInstanceOf[Flow[Int, _, NotUsed]])
    } else if (a.isInstanceOf[Flow[_, String, NotUsed]] && b.isInstanceOf[Flow[String, _, NotUsed]]) {
      connectString(a.asInstanceOf[Flow[_, String, NotUsed]], b.asInstanceOf[Flow[String, _, NotUsed]])
    } else {
      throw new UnsupportedOperationException
    }

  def connectInt(a: Flow[_, Int, NotUsed], b: Flow[Int, _, NotUsed]): Flow[_, _, NotUsed] =
    a.via(b)

  def connectString(a: Flow[_, String, NotUsed], b: Flow[String, _, NotUsed]): Flow[_, _, NotUsed] =
    a.via(b)
}

P.S.

There is another bug hiding in your merge: an endless loop. When recursing, n (the head of t) must also be dropped, because it has already been merged into fc. In other words, call merge(fc :: t.tail) rather than merge(fc :: t).
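Writing the reduction as a left fold avoids this kind of bug, because each element is consumed exactly once. The sketch below pairs the fold with a check that does see concrete types: each stage records ClassTags for its input and output when it is built, while those types are still statically known. It is a minimal illustration under stated assumptions. Stage, connect and merge are hypothetical names, and plain functions stand in for Akka Flows; the same bookkeeping could wrap a Flow[A, B, NotUsed] and compose with via. ClassTag only compares erased classes, so List[Int] and List[String] would look identical; distinguishing those would need TypeTag.

```scala
import scala.reflect.ClassTag

object TaggedStages {
  // Hypothetical stand-in for Flow[A, B, NotUsed]: the function is stored untyped,
  // but the ClassTags of its input and output are captured while A and B are known.
  final class Stage private (val f: Any => Any, val in: ClassTag[_], val out: ClassTag[_])

  object Stage {
    def apply[A, B](f: A => B)(implicit in: ClassTag[A], out: ClassTag[B]): Stage =
      new Stage(f.asInstanceOf[Any => Any], in, out)

    // Runtime-checked composition: succeeds only if a's output class equals b's input class.
    def connect(a: Stage, b: Stage): Either[String, Stage] =
      if (a.out == b.in) Right(new Stage(a.f andThen b.f, a.in, b.out))
      else Left(s"Incompatible types: ${a.out} and ${b.in}")
  }

  // Each stage is consumed exactly once by the fold, so the reduction always terminates.
  def merge(stages: List[Stage]): Either[String, Option[Stage]] =
    stages match {
      case Nil => Right(None)
      case h :: t =>
        t.foldLeft[Either[String, Stage]](Right(h))((acc, s) => acc.flatMap(Stage.connect(_, s)))
          .map(Some(_))
    }
}
```

Because the tags are captured in Stage.apply, a List[Stage] built from runtime definitions still carries enough information for connect to reject an Int-to-String mismatch.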

Polydactyl answered 14/7, 2020 at 7:39. Comments:
Your connect(a, b) already exists in Akka Streams: it is called a.via(b). As for the resolveConnect function you provide, the casts are actually all meaningless, since they are casts to erased types. In other words, those casts won't help you at all, even at runtime. For the casts to be useful, you'd have to map over the Flow and call asInstanceOf on each element. (Generalist)
Regarding the via method: you are right, thanks. I'll update the code. Regarding the "meaningless casting", please try to run the code before making statements that are wrong. (Polydactyl)
I think I explained why the casts are meaningless: they are on erased types. Here's an example: val flowInt: Flow[_, _, NotUsed] = Flow[Int]; val flowString: Flow[_, _, NotUsed] = Flow[String]; resolveConnect(flowInt, flowString) causes no cast exception, even though a Flow[Int, Int, NotUsed] can definitely not be connected to a Flow[String, String, NotUsed]. (Generalist)
How do your explanations square with the fact that the updated code runs and prints the numbers 1 to 10? (Polydactyl)
The casts aren't checking or doing anything. You could even cast to a dummy type (and skip isInstanceOf) and your code would still work: def resolveConnect(a: Flow[_, _, NotUsed], b: Flow[_, _, NotUsed]): Flow[_, _, NotUsed] = a.asInstanceOf[Flow[_, Nothing, NotUsed]].via(b.asInstanceOf[Flow[Nothing, _, NotUsed]]). The isInstanceOf checks give a false sense of security: they don't perform any runtime checks (hence why I call them "meaningless"). (Generalist)
I am pretty sure that this is exactly the point of the question. The types were erased, and therefore the code did not compile. Checking the types and extracting the connection into a method lets the compiler know that those are the expected types. (Polydactyl)
I don't think this discussion is going anywhere: in your answer, no Int/String/etc. types are actually checked at runtime! The OP clearly cares about checking that the types match, since their runtime solution involves TypeTag and their compile-time question uses HList. Compilation != type safety/soundness when you use _ and casts on erased types. (Generalist)
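The erasure point debated above can be reproduced with standard collections alone. In this small sketch (the object name is hypothetical), the type test against List[String] only checks the List class. The cast also succeeds, and nothing fails until an element is actually used as a String.

```scala
import scala.util.Try

object ErasureDemo {
  val ints: List[Int] = List(1, 2, 3)

  // Only the List class survives erasure, so this test ignores the element type
  // (the compiler warns that it is unchecked) and evaluates to true.
  val looksLikeStrings: Boolean = (ints: Any).isInstanceOf[List[String]]

  // The cast succeeds as well; nothing about the elements is verified here.
  val cast: List[String] = ints.asInstanceOf[List[String]]

  // The failure only surfaces when an element is used as a String.
  val firstLength: Try[Int] = Try(cast.head.length)
}
```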

© 2022 - 2024 — McMap. All rights reserved.