As you already found out yourself, the local Person doesn't have a TypeTag, but it does have a WeakTypeTag (and a ClassTag). Let's try to define an Encoder for such a class.
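The difference between the tags can be checked without Spark. The following is a minimal sketch that assumes only scala-reflect on the classpath; the names LocalTagsDemo and inspect are made up for illustration.

```scala
import scala.reflect.classTag
import scala.reflect.runtime.universe._

object LocalTagsDemo {
  // Returns (is the weak type a free type?, JVM class name) for a method-local case class.
  def inspect(): (Boolean, String) = {
    case class Person(id: Long, name: String) // local to this method

    // typeTag[Person] would not compile here: "No TypeTag available for Person".
    val weak = weakTypeTag[Person] // compiles: the local class is reified as a free type
    val cls  = classTag[Person]    // compiles: the JVM class is known at compile time

    (internal.isFreeType(weak.tpe.typeSymbol), cls.runtimeClass.getName)
  }

  def main(args: Array[String]): Unit = {
    val (isFree, className) = inspect()
    println(s"free type: $isFree, runtime class: $className")
  }
}
```

The WeakTypeTag carries only a free type, while the ClassTag carries the real runtime class, which is why the attempts below lean on one or the other.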
The naive approach, constructing a TypeTag manually, doesn't work. Related questions:
How to create a TypeTag manually?
In scala 2.12, why none of the TypeTag created in runtime is serializable?
Scala Spark Encoders.product[X] (where X is a case class) keeps giving me "No TypeTag available for X" error
Spark: DF.as[Type] fails to compile
implicit def ttag[A: WeakTypeTag]: TypeTag[A] = {
  val ttag = null // hiding implicit by name
  val wttagImpl = weakTypeTag[A].asInstanceOf[WeakTypeTag[A] {val mirror: Mirror; val tpec: TypeCreator}]
  TypeTag[A](wttagImpl.mirror, wttagImpl.tpec)
}
java.lang.NoClassDefFoundError: no Java class corresponding to Person found
https://gist.github.com/DmytroMitin/41b7439d2e504e37f29b02e3500d24b1
The result is similar for
def typeToTypeTag[T](
  tpe: Type,
  mirror: api.Mirror[universe.type]
): TypeTag[T] = {
  TypeTag(mirror, new TypeCreator {
    def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) = {
      assert(m eq mirror, s"TypeTag[$tpe] defined in $mirror cannot be migrated to $m.")
      tpe.asInstanceOf[U#Type]
    }
  })
}

implicit def ttag[T: WeakTypeTag]: TypeTag[T] = {
  val ttag = null // hiding implicit by name
  typeToTypeTag(weakTypeOf[T], mirror)
}
java.lang.NoClassDefFoundError: no Java class corresponding to Person found
https://gist.github.com/DmytroMitin/c7a24abf1ff1011a1c87aa9d161d6395
implicit val personTtag: TypeTag[Person] = {
  val personTtag = null // hiding implicit by name
  tb.eval(q"org.apache.spark.sql.catalyst.ScalaReflection.universe.typeTag[${weakTypeOf[Person]}]")
    .asInstanceOf[TypeTag[Person]]
}
scala.tools.reflect.ToolBoxError: reflective toolbox failed due to unresolved free type variables
https://gist.github.com/DmytroMitin/6e35c0332f845fcd227d35ec49d4122f
This is how Encoder[T] is defined for T having a TypeTag:
implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]

object Encoders {
  def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()
}

object ExpressionEncoder {
  def apply[T : TypeTag](): ExpressionEncoder[T] = {
    val mirror = ScalaReflection.mirror
    val tpe = typeTag[T].in(mirror).tpe
    val cls = mirror.runtimeClass(tpe)
    val serializer = ScalaReflection.serializerForType(tpe)
    val deserializer = ScalaReflection.deserializerForType(tpe)
    new ExpressionEncoder[T](
      serializer,
      deserializer,
      ClassTag[T](cls)
    )
  }
}
Let's try to modify it for T having a WeakTypeTag and a ClassTag:
implicit def apply[T: WeakTypeTag /*: ClassTag*/]: Encoder[T] = {
  val tpe = weakTypeTag[T].in(mirror).tpe
  val cls = mirror.runtimeClass(tpe)
  val serializer = ScalaReflection.serializerForType(tpe)
  val deserializer = ScalaReflection.deserializerForType(tpe)
  new ExpressionEncoder[T](
    serializer,
    deserializer,
    ClassTag[T](cls)
  )
}
java.lang.NoClassDefFoundError: no Java class corresponding to Person found
https://gist.github.com/DmytroMitin/b58848fa6575b6fab0e9b8285095cc60
// (*)
implicit def apply[T/*: WeakTypeTag*/ : ClassTag]: Encoder[T] = {
  val tpe = mirror.classSymbol(classTag[T].runtimeClass).toType
  val serializer = ScalaReflection.serializerForType(tpe)
  val deserializer = ScalaReflection.deserializerForType(tpe)
  new ExpressionEncoder[T](
    serializer,
    deserializer,
    classTag[T]
  )
}
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: Main
https://gist.github.com/DmytroMitin/0c86933f96e136d44fff555295ce01dd
So finally, let's make Main extend Serializable:
+---+----+
| id|name|
+---+----+
|  0|   0|
|  1|   1|
|  2|   2|
|  3|   3|
|  4|   4|
|  5|   5|
|  6|   6|
|  7|   7|
|  8|   8|
|  9|   9|
+---+----+
https://gist.github.com/DmytroMitin/0e9b0bd2ed6237a4a1e1c40d620a9d88
So (*) is a correct Encoder.
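The key step in (*) is going from the runtime class back to a reflection Type, instead of starting from the free type stored in the WeakTypeTag. The following is a minimal sketch of that step alone, assuming a plain runtime mirror in place of Spark's ScalaReflection.mirror; ClassTagToTypeDemo, typeFromClassTag and compare are hypothetical names.

```scala
import scala.reflect.runtime.{universe => ru}
import scala.reflect.{ClassTag, classTag}

object ClassTagToTypeDemo {
  // Assumption: this mirror stands in for the one Spark uses internally.
  private val mirror = ru.runtimeMirror(getClass.getClassLoader)

  // Same conversion as in (*): runtime class -> class symbol -> type.
  def typeFromClassTag[T: ClassTag]: ru.Type =
    mirror.classSymbol(classTag[T].runtimeClass).toType

  // Returns whether each route yields a type whose symbol is a real class symbol.
  def compare(): (Boolean, Boolean) = {
    case class Person(id: Long, name: String) // local to this method

    val fromWeakTag = ru.weakTypeOf[Person]    // a free type
    val fromClass   = typeFromClassTag[Person] // backed by the JVM class

    (fromWeakTag.typeSymbol.isClass, fromClass.typeSymbol.isClass)
  }

  def main(args: Array[String]): Unit =
    println(compare())
}
```

Only the second route yields a class symbol, which is consistent with the "free type Person is not a class" errors seen whenever the free type is used directly.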
This doesn't seem to work with a generic local Person:
case class Person[T](id: Long, name: String, t: T)
java.lang.UnsupportedOperationException: No Encoder found for Person$1
https://gist.github.com/DmytroMitin/69496ce257fc9a3a7a5fbd004c52dcc0
scala.ScalaReflectionException: free type Person is not a class
https://gist.github.com/DmytroMitin/07bfe954dca677f0a39c06779b94280e
For a generic local class the encoder should use both WeakTypeTag and ClassTag:
implicit def apply[T: WeakTypeTag : ClassTag]: Encoder[T] = {
  val tpe0 = weakTypeTag[T].in(mirror).tpe
  val typeArgs = tpe0/*.dealias*/.typeArgs
  val tpe = mirror.classSymbol(classTag[T].runtimeClass).toType
  val tpe1 = appliedType(tpe.typeConstructor, typeArgs)
  val serializer = ScalaReflection.serializerForType(tpe1)
  val deserializer = ScalaReflection.deserializerForType(tpe1)
  new ExpressionEncoder[T](
    serializer,
    deserializer,
    classTag[T]
  )
}
https://gist.github.com/DmytroMitin/08c8f21ffb1427bfa15dd21fbdfb77fa
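The encoder above takes the type constructor from one source (the runtime class) and the type arguments from another (the WeakTypeTag), then glues them together with appliedType. The following is a minimal sketch of that recombination on ordinary top-level types, so it runs without Spark; AppliedTypeDemo and rebuild are hypothetical names.

```scala
import scala.reflect.runtime.universe._

object AppliedTypeDemo {
  // Applies the type constructor of one type to the type arguments of another.
  def rebuild(constructorSource: Type, argumentSource: Type): Type =
    appliedType(constructorSource.typeConstructor, argumentSource.typeArgs)

  def main(args: Array[String]): Unit = {
    // Constructor taken from List[Any], arguments taken from Option[Int].
    val rebuilt = rebuild(typeOf[List[Any]], typeOf[Option[Int]])
    println(rebuilt =:= typeOf[List[Int]]) // prints "true"
  }
}
```

In the encoder the arguments come from the weak type and the constructor from the class symbol, so the arguments are not repaired if they are themselves free types.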
Well, now this doesn't work for a generic local class whose type parameter is itself a generic local class:
val df: Dataset[Person[Person[Int]]] =
  spark.range(10).map(i => Person(i, i.toString, Person(i, i.toString, i.toInt)))
scala.ScalaReflectionException: free type Person is not a class
https://gist.github.com/DmytroMitin/5bceb2b81f2391c5c312a045edb827a8
An improved version of the encoder:
case class Application(tycon: ClassTag[_], targs: List[Application])

class DeepClassTag[T](val classTags: Application)

object DeepClassTag {
  def apply[T: DeepClassTag]: DeepClassTag[T] = implicitly[DeepClassTag[T]]

  implicit def deepClassTag0[A: ClassTag]: DeepClassTag[A] =
    new DeepClassTag(Application(classTag[A], List()))

  implicit def deepClassTag11[A[_], B1](implicit tycon: ClassTag[A[_]], dct1: DeepClassTag[B1]): DeepClassTag[A[B1]] =
    new DeepClassTag(Application(tycon, List(dct1.classTags)))

  implicit def deepClassTag12[A[_,_], B1, B2](implicit tycon: ClassTag[A[_,_]], dct1: DeepClassTag[B1], dct2: DeepClassTag[B2]): DeepClassTag[A[B1, B2]] =
    new DeepClassTag(Application(tycon, List(dct1.classTags, dct2.classTags)))

  // ...

  implicit def deepClassTag2[A[_[_]], B1[_]](implicit tycon: ClassTag[A[B1]], dct1: DeepClassTag[B1[_]]): DeepClassTag[A[B1]] =
    new DeepClassTag(Application(tycon, List(dct1.classTags)))

  // ...
}
def improveStaticType[T: WeakTypeTag : DeepClassTag]: Type =
  improveDynamicType(weakTypeOf[T], DeepClassTag[T].classTags)

def improveDynamicType(tpe: Type, classTags: Application): Type = {
  val newTycon = improveFreeType(tpe, classTags.tycon.runtimeClass)
  val targs = tpe.dealias.typeArgs
  assert(targs.length == classTags.targs.length, s"( $targs ).length == ( ${classTags.targs} ).length")
  val newArgs = targs.zip(classTags.targs).map((improveDynamicType _).tupled)
  appliedType(newTycon, newArgs)
}

def improveFreeType(tpe: Type, cls: Class[_]): Type =
  if (internal.isFreeType(tpe.typeSymbol)) {
    val typeArgs = tpe.dealias.typeArgs
    val typeConstructor = mirror.classSymbol(cls).toType.typeConstructor
    appliedType(typeConstructor, typeArgs)
  } else tpe

implicit def enc[T: WeakTypeTag : ClassTag : DeepClassTag]: Encoder[T] = {
  val tpe = improveStaticType[T]
  val serializer = ScalaReflection.serializerForType(tpe)
  val deserializer = ScalaReflection.deserializerForType(tpe)
  new ExpressionEncoder[T](
    serializer,
    deserializer,
    classTag[T]
  )
}
https://gist.github.com/DmytroMitin/56044515e031fcf1e977ab213013861d
DeepClassTag seems not to work with higher-kinded classes
https://gist.github.com/DmytroMitin/6388a437507e8389f30230e08382d9ff
An improved version that still doesn't always work (there are too many shapes of type constructors):
https://gist.github.com/DmytroMitin/2625ee20695404c6fc118ab8680808f2
Instead of manually defining type-class instances for the different shapes of type constructors, the type class DeepClassTag can be defined with macros as follows:
import scala.language.experimental.macros
import scala.reflect.ClassTag
import scala.reflect.macros.whitebox

case class Application(tycon: ClassTag[_], targs: List[Application])

class DeepClassTag[T](val classTags: Application)

object DeepClassTag {
  def apply[T: DeepClassTag]: DeepClassTag[T] = implicitly[DeepClassTag[T]]

  implicit def mkDeepClassTag[T]/*(implicit tCtag: ClassTag[T])*/: DeepClassTag[T] =
    macro DeepClassTagMacros.mkDeepClassTagImpl[T]
}

class DeepClassTagMacros(val c: whitebox.Context) {
  import c.universe._

  // Summons an implicit instance of TC[tpe] at the macro call site.
  def findInstance[TC[_]](tpe: Type)(implicit wttag: WeakTypeTag[TC[_]]): Tree =
    c.inferImplicitValue(
      appliedType(weakTypeOf[TC[_]].typeConstructor, tpe),
      silent = false
    )

  def mkDeepClassTagImpl[T: WeakTypeTag]/*(tCtag: c.Tree)*/ : Tree = {
    val T = weakTypeOf[T]
    val tCtag = findInstance[ClassTag](T)
    // Recursively summon a DeepClassTag for every type argument.
    val targCtags = T.dealias.typeArgs.map(arg => {
      val argInst = findInstance[DeepClassTag](arg)
      q"$argInst.classTags"
    })
    val targClassTags = q"_root_.scala.List.apply[Application](..$targCtags)"
    q"new DeepClassTag[$T](Application($tCtag, $targClassTags))"
  }
}
(Is it working?)
My PR to Spark to support local classes: https://github.com/apache/spark/pull/38740
case class definition being outside of the method than the dataframe – Scramble
TypeTag. And Spark encoder derivation requires TypeTag because it also needs the serde, which are acquired by using reflection based on TypeTag. – Spectroscopy
Person has WeakTypeTag. – Fantom
weakTypeOf[Person] =:= weakTypeOf[Person] and symbolOf[Person] == symbolOf[Person] are false: scastie.scala-lang.org/DmytroMitin/69tMXa7SR4iF30qmGqxfzg – Fantom