I'm currently trying to dynamically create Akka Stream graph definitions at runtime. The idea is that users will be able to define flows interactively and attach them to existing/running BroadcastHubs. This means I don't know at compile time which flows, or even how many flows, will be used.
Unfortunately, I'm struggling with generics/type erasure. Frankly, I'm not even sure what I'm attempting to do is possible on the JVM.
I have a function that returns an Akka Streams Flow representing two connected Flows. It uses Scala's TypeTags to get around type erasure. If the output type of the first flow is the same as the input type of the second flow, the two can be connected successfully. This works just fine.
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Flow, GraphDSL}
import scala.reflect.runtime.universe._
import scala.util.{Failure, Success, Try}

def connect[A: TypeTag, B: TypeTag, C: TypeTag, D: TypeTag](a: Flow[A, B, NotUsed],
                                                            b: Flow[C, D, NotUsed]): Try[Flow[A, D, NotUsed]] = {
  Try {
    if (typeOf[B] =:= typeOf[C]) {
      val c = b.asInstanceOf[Flow[B, D, NotUsed]]
      Flow.fromGraph {
        GraphDSL.create(a, c)((m1, m2) => NotUsed.getInstance()) { implicit b =>
          (s1, s2) =>
            s1 ~> s2
            FlowShape(s1.in, s2.out)
        }
      }
    }
    else
      throw new RuntimeException(s"Connection failed. Incompatible types: ${typeOf[B]} and ${typeOf[C]}")
  }
}
So if I have Flow[A,B] and Flow[C,D], the result would be Flow[A,D], assuming that B and C are the same type.
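To show the check in isolation, here is a minimal sketch that does not need Akka on the classpath. It is only an analogy: plain functions stand in for Flows, the name connectFn is hypothetical, and ClassTag replaces TypeTag, so it compares erased runtime classes only and could not tell List[Int] from List[String].

```scala
import scala.reflect.{ClassTag, classTag}
import scala.util.Try

// Hypothetical stand-in for connect: functions instead of Flows,
// ClassTag instead of TypeTag (erased classes only).
def connectFn[A, B: ClassTag, C: ClassTag, D](f: A => B, g: C => D): Try[A => D] =
  Try {
    if (classTag[B] == classTag[C])
      // Safe only because the tags for B and C were just found to be equal.
      f.andThen(g.asInstanceOf[B => D])
    else
      throw new RuntimeException(
        s"Connection failed. Incompatible types: ${classTag[B]} and ${classTag[C]}")
  }

val intToString: Int => String = _.toString
val stringLength: String => Int = _.length

// B = String and C = String, so the two functions compose.
val ok = connectFn(intToString, stringLength)
// B = String but C = Int, so the result is a Failure.
val bad = connectFn(intToString, intToString)
```

Both calls work as expected here because the static types of the arguments are fully known at the call site.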
I also have a function that attempts to merge/reduce a List of Flows down to a single Flow. Let's assume that this list is derived from a list of flow definitions from a file or web request.
def merge(fcs: List[Flow[_, _, NotUsed]]): Try[Option[Flow[_, _, NotUsed]]] = {
  fcs match {
    case Nil => Success(None)
    case h :: Nil => Success(Some(h))
    case h :: t =>
      val n = t.head
      connect(h, n) match {
        // n is already part of fc, so recurse on the rest of the list only
        case Success(fc) => merge(fc :: t.tail)
        case Failure(e) => Failure(e)
      }
  }
}
Unfortunately, since the Flows are stored inside a List, due to type erasure on standard Lists, I lose all of the type information and am therefore unable to connect the Flows at runtime. Here's an example:
def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)
def flowI2S() = Flow.fromFunction[Int, String](_.toString)
val a = flowIdentity[Int]()
val b = flowIdentity[Int]()
val c = flowI2S()
val d = flowIdentity[String]()
val fcs: List[Flow[_, _, NotUsed]] = List(a, b, c, d)
val y = merge(fcs)
This results in the exception:
Failure(java.lang.RuntimeException: Connection failed. Incompatible types _$4 and _$3)
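As far as I can tell, the placeholder names in that message appear because a context bound is resolved by the compiler from the static type at the call site, not from the runtime value. The sketch below illustrates that principle under simplifying assumptions: it uses ClassTag rather than TypeTag so it needs only the standard library, and staticTag is a hypothetical helper, not part of my code above.

```scala
import scala.reflect.{ClassTag, classTag}

// Returns the tag the compiler selected for the static type A at the call site.
def staticTag[A: ClassTag](value: A): ClassTag[A] = classTag[A]

val n: Int = 42
val widened: Any = n
val items: List[Any] = List(1, "two")

// Static type is Int, so the tag describes Int.
val preciseTag = staticTag(n)
// Static type is Any, so the tag describes Any even though the value is an Int.
val widenedTag = staticTag(widened)
// Inside the list every element has static type Any, so every tag describes Any.
val elementTags = items.map(x => staticTag(x))
```

By analogy, once the flows are typed as Flow[_, _, NotUsed], the only types the compiler can hand to connect are the existential placeholders, which would explain why the comparison never succeeds.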
I've been looking into Miles Sabin's Shapeless, and thought I might be able to use HLists to retain type information. Unfortunately, that seems to work only if I know the individual types and the length of the list at compile time. If I upcast a specific HList to just HList, it looks like I lose the type information again.
val fcs: HList = a :: b :: c :: d :: HNil
So my question is... is this even possible? Is there a way to do this with Shapeless generics magic (preferably without the need to use specific non-existential type extractors)? I'd like to find as generic a solution as possible, and any help would be appreciated.
Thanks!