Why is the spark.implicits._ import not helping with encoder derivation inside a method?
So, importing an implicit member from a created instance works as expected,

object Test extends App {
  class Bag {
    implicit val ssss: String = "omg"
  }

  def call(): Unit = {
    val bag = new Bag

    import bag._

    val s = implicitly[String]
  }
}



But, if I try doing the same with spark.implicits._

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

object Test extends App {
  val spark: SparkSession = ...

  def call(): Unit = {
    import spark.implicits._

    case class Person(id: Long, name: String)

    // I can summon an existing encoder
    // val enc = implicitly[Encoder[Long]]

    // but encoder derivation is failing for some reason
    // val encP = implicitly[Encoder[Person]]

    val df: Dataset[Person] =
      spark.range(10).map(i => Person(i, i.toString))
  }
}


It fails to derive the Encoder[Person],

Unable to find encoder for type Person. An implicit Encoder[Person] is needed to store Person instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
        .map(i => Person(i, i.toString)

But, it works if I create the dataframe outside the method,

object Test extends App {
  val spark: SparkSession = ...

  import spark.implicits._

  case class Person(id: Long, name: String)

  val df: Dataset[Person] =
    spark.range(10).map(i => Person(i, i.toString))
}


Tested with Scala version 2.13.10 and 2.12.17 with Spark version 3.3.1.

AFAIK it's more about the case class definition being outside of the method than the dataframe. - Scramble
You are right... Another piece of knowledge: don't use local classes unless explicitly required. - Spectroscopy
A general comment to put things into context. Serialization is hard, and in fact undecidable from time to time, especially when you are trying to serialize functions as Spark does. There is various very clever work out there on this, but rather than dealing with the issue in depth, Spark made a set of rather arbitrary-looking decisions to work around it. So it doesn't support the full set of features Scala itself offers. This is, for example, why circe can auto-derive an encoder for a local class but Spark can't. Unfortunately, these decisions aren't well documented and the error messages aren't helpful. - Scramble
@Scramble Yes, local classes just don't get a TypeTag. And Spark encoder derivation requires a TypeTag because it also needs the serde, which is acquired using reflection based on the TypeTag. - Spectroscopy
@Spectroscopy You can try to add a TypeTag manually: #11495288 #59474234 #73836819 #74250359 - Fantom
Local Person has a WeakTypeTag. - Fantom
I submitted a PR to support local classes: github.com/apache/spark/pull/38740 - Fantom
By the way, weakTypeOf[Person] =:= weakTypeOf[Person] and symbolOf[Person] == symbolOf[Person] are false: scastie.scala-lang.org/DmytroMitin/69tMXa7SR4iF30qmGqxfzg - Fantom

The local case class is the reason for the observed behaviour. A local class has a so-called free type; you can read more about that here. You may try adding a TypeTag for Person in the local scope to see if it helps.
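
To make the "free type" point concrete, here is a minimal sketch that relies only on scala-reflect, not on Spark. The names TopLevelPerson, LocalPerson, FreeTypeDemo and isFree are made up for illustration. It asks runtime reflection whether the symbol behind a WeakTypeTag is a free type, once for a top-level class and once for a class defined inside a method.

```scala
import scala.reflect.runtime.universe._

// Top-level class (hypothetical name): the compiler can resolve it to a regular class symbol.
case class TopLevelPerson(id: Long, name: String)

object FreeTypeDemo {
  // A WeakTypeTag can be materialized even when no TypeTag is available.
  def isFree[T: WeakTypeTag]: Boolean =
    internal.isFreeType(weakTypeOf[T].typeSymbol)

  def check(): (Boolean, Boolean) = {
    // Local class, as in the failing example from the question.
    case class LocalPerson(id: Long, name: String)
    (isFree[TopLevelPerson], isFree[LocalPerson])
  }
}
```

Calling FreeTypeDemo.check() should report that only the local class is backed by a free type, which is consistent with Spark's TypeTag-based derivation rejecting it.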

Tanning answered 8/11, 2022 at 15:37 Comment(10)
Can you elaborate a little more on the connection between free types and TypeTag, please? - Obstetric
Interestingly... I am not getting any compiler message despite having both -Xlog-free-types and -Xlog-free-terms. Do these compiler flags even work in Scala 2.12.x or 2.13.x? - Spectroscopy
@Spectroscopy You can just call the method isFreeType: scastie.scala-lang.org/DmytroMitin/0fzp6dbiS4uvw9MuYcMw6A/3 - Fantom
@Spectroscopy "Do these compiler flags even work in Scala 2.12.x or 2.13.x" docs.scala-lang.org/overviews/compiler-options/index.html "-Vfree-terms or -Xlog-free-terms Print a message when reification creates a free term. -Vfree-types or -Xlog-free-types Print a message when reification resorts to generating a free type." - Fantom
@DmytroMitin I think the Person case class in the second example should result in both a free term message and a free type message, or at least one of them. But I am not seeing any. - Spectroscopy
@Spectroscopy Pay attention to the word "reification" in the descriptions of these settings that I quoted. These settings are for debugging reification, i.e. reify{...} or a quasiquote q"..." in macros (or maybe also runtime toolbox-generated ASTs), not ordinary code. - Fantom
@Spectroscopy In ordinary code you can try the runtime-reflection method isFreeType(..) that I mentioned (maybe there is something similar for terms too). Or you can try these settings in reified code. - Fantom
@Spectroscopy For example, without -Xlog-free-types in the following code scastie.scala-lang.org/DmytroMitin/1xt3JhbPQLGP9zyddhAdyw/7 the macro doesn't produce warnings, but the code calling the macro doesn't compile (Macro expansion contains free type variable Person2 defined by myMacroImpl in main.scala:12:16. Have you forgotten to use c.WeakTypeTag annotation for this type parameter? If you have troubles tracking free type variables, consider using -Xlog-free-types). - Fantom
@Spectroscopy But with -Xlog-free-types the macro itself produces a warning scastie.scala-lang.org/DmytroMitin/1xt3JhbPQLGP9zyddhAdyw/8 (free type: Ident(TypeName("Person2")) defined by myMacroImpl in main.scala:14:16) - Fantom
@Spectroscopy Also #38206171 - Fantom

As you already found out yourself, the local Person doesn't have a TypeTag. But it has a WeakTypeTag (and a ClassTag). Let's try to define an Encoder for such a class.

The naive approach of constructing a TypeTag doesn't work

How to create a TypeTag manually?

In scala 2.12, why none of the TypeTag created in runtime is serializable?

Scala Spark Encoders.product[X] (where X is a case class) keeps giving me "No TypeTag available for X" error

Spark: DF.as[Type] fails to compile

implicit def ttag[A: WeakTypeTag]: TypeTag[A] = {
  val ttag = null // hiding implicit by name
  val wttagImpl = weakTypeTag[A].asInstanceOf[WeakTypeTag[A] {val mirror: Mirror; val tpec: TypeCreator}]
  TypeTag[A](wttagImpl.mirror, wttagImpl.tpec)
}

java.lang.NoClassDefFoundError: no Java class corresponding to Person found


The result is similar for

def typeToTypeTag[T](
                      tpe: Type,
                      mirror: api.Mirror[universe.type]
                    ): TypeTag[T] = {
  TypeTag(mirror, new TypeCreator {
    def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) = {
      assert(m eq mirror, s"TypeTag[$tpe] defined in $mirror cannot be migrated to $m.")
      // the creator just hands back the already known type
      tpe.asInstanceOf[U#Type]
    }
  })
}

implicit def ttag[T: WeakTypeTag]: TypeTag[T] = {
  val ttag = null // hiding implicit by name
  typeToTypeTag(weakTypeOf[T], mirror)
}

java.lang.NoClassDefFoundError: no Java class corresponding to Person found


implicit val personTtag: TypeTag[Person] = {
  val personTtag = null

scala.tools.reflect.ToolBoxError: reflective toolbox failed due to unresolved free type variables


This is how Encoder[T] is defined for T having TypeTag

implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]

object Encoders {
  def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()
}

object ExpressionEncoder {
  def apply[T : TypeTag](): ExpressionEncoder[T] = {
    val mirror = ScalaReflection.mirror
    val tpe = typeTag[T].in(mirror).tpe

    val cls = mirror.runtimeClass(tpe)
    val serializer = ScalaReflection.serializerForType(tpe)
    val deserializer = ScalaReflection.deserializerForType(tpe)

    new ExpressionEncoder[T](
      serializer,
      deserializer,
      ClassTag[T](cls))
  }
}

Let's try to modify it for T having WeakTypeTag and ClassTag

implicit def apply[T: WeakTypeTag /*: ClassTag*/]: Encoder[T] = {
  val tpe = weakTypeTag[T].in(mirror).tpe

  val cls = mirror.runtimeClass(tpe)
  val serializer = ScalaReflection.serializerForType(tpe)
  val deserializer = ScalaReflection.deserializerForType(tpe)

  new ExpressionEncoder[T](
    serializer,
    deserializer,
    ClassTag[T](cls)
  )
}

java.lang.NoClassDefFoundError: no Java class corresponding to Person found


// (*)
implicit def apply[T/*: WeakTypeTag*/ : ClassTag]: Encoder[T] = {
  // take the type from the runtime class instead of the (free) static type
  val tpe = mirror.classSymbol(classTag[T].runtimeClass).toType
  val serializer = ScalaReflection.serializerForType(tpe)
  val deserializer = ScalaReflection.deserializerForType(tpe)

  new ExpressionEncoder[T](
    serializer,
    deserializer,
    classTag[T]
  )
}

org.apache.spark.SparkException: Task not serializable

Caused by: java.io.NotSerializableException: Main


So finally let's make Main extend Serializable

+---+----+
| id|name|
+---+----+
|  0|   0|
|  1|   1|
|  2|   2|
|  3|   3|
|  4|   4|
|  5|   5|
|  6|   6|
|  7|   7|
|  8|   8|
|  9|   9|
+---+----+


So (*) is a correct Encoder.
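
The key step in (*) is taking the Type from the runtime class rather than from the static type. A minimal sketch of just that step, without Spark, is given here. It uses scala.reflect.runtime.currentMirror as a stand-in for ScalaReflection.mirror, and the names ClassTagTypeDemo and typeFromClassTag are hypothetical.

```scala
import scala.reflect.{ClassTag, classTag}
import scala.reflect.runtime.universe._
import scala.reflect.runtime.{currentMirror => cm}

object ClassTagTypeDemo {
  // Recover a Type from the runtime class instead of from a (Weak)TypeTag.
  def typeFromClassTag[T: ClassTag]: Type =
    cm.classSymbol(classTag[T].runtimeClass).toType

  def run(): (Boolean, Boolean) = {
    case class Person(id: Long, name: String) // local class
    val viaWeakTypeTag = weakTypeOf[Person]
    val viaClassTag = typeFromClassTag[Person]
    (
      internal.isFreeType(viaWeakTypeTag.typeSymbol), // static view of the local class
      internal.isFreeType(viaClassTag.typeSymbol)     // view loaded from the runtime class
    )
  }
}
```

The first component of ClassTagTypeDemo.run() describes the type seen through the WeakTypeTag, the second the type loaded from the class file. Only the latter is backed by an ordinary class symbol, which is what the reflection-based serializer needs.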

This doesn't seem to work with a generic local Person

case class Person[T](id: Long, name: String, t: T)

java.lang.UnsupportedOperationException: No Encoder found for Person$1


scala.ScalaReflectionException: free type Person is not a class


For generic local class the encoder should be (using both WeakTypeTag and ClassTag)

implicit def apply[T: WeakTypeTag : ClassTag]: Encoder[T] = {
  // type arguments come from the static type, the type constructor from the runtime class
  val tpe0 = weakTypeTag[T].in(mirror).tpe
  val typeArgs = tpe0/*.dealias*/.typeArgs
  val tpe = mirror.classSymbol(classTag[T].runtimeClass).toType
  val tpe1 = appliedType(tpe.typeConstructor, typeArgs)
  val serializer = ScalaReflection.serializerForType(tpe1)
  val deserializer = ScalaReflection.deserializerForType(tpe1)

  new ExpressionEncoder[T](
    serializer,
    deserializer,
    classTag[T]
  )
}

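The type-repair step used for the generic case can be looked at in isolation. The following sketch needs only scala-reflect and again substitutes currentMirror for ScalaReflection.mirror. GenericLocalTypeDemo, repairedType and Box are hypothetical names introduced for the example.

```scala
import scala.reflect.{ClassTag, classTag}
import scala.reflect.runtime.universe._
import scala.reflect.runtime.{currentMirror => cm}

object GenericLocalTypeDemo {
  // Type arguments are taken from the static (weak) type,
  // the type constructor is taken from the runtime class.
  def repairedType[T: WeakTypeTag: ClassTag]: Type = {
    val typeArgs = weakTypeOf[T].typeArgs
    val tycon = cm.classSymbol(classTag[T].runtimeClass).toType.typeConstructor
    appliedType(tycon, typeArgs)
  }

  def run(): Type = {
    case class Box[A](value: A) // generic local class
    repairedType[Box[Int]]
  }
}
```

The resulting type keeps Int as its type argument but no longer has a free type as its constructor. As noted in the answer, this only repairs the outermost constructor, so a local class nested as a type argument is still free.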

Well, now this doesn't work for a generic local class whose type argument is itself a generic local class

val df: Dataset[Person[Person[Int]]] =
  spark.range(10).map(i => Person(i, i.toString, Person(i, i.toString, i.toInt)))

scala.ScalaReflectionException: free type Person is not a class


Improved version of codec:

case class Application(tycon: ClassTag[_], targs: List[Application])

class DeepClassTag[T](val classTags: Application)
object DeepClassTag {
  def apply[T: DeepClassTag]: DeepClassTag[T] = implicitly[DeepClassTag[T]]
  implicit def deepClassTag0[A: ClassTag]: DeepClassTag[A] =
    new DeepClassTag(Application(classTag[A], List()))
  implicit def deepClassTag11[A[_], B1](implicit tycon: ClassTag[A[_]], dct1: DeepClassTag[B1]): DeepClassTag[A[B1]] =
    new DeepClassTag(Application(tycon, List(dct1.classTags)))
  // dct2 must describe the second type argument, B2
  implicit def deepClassTag12[A[_,_], B1, B2](implicit tycon: ClassTag[A[_,_]], dct1: DeepClassTag[B1], dct2: DeepClassTag[B2]): DeepClassTag[A[B1, B2]] =
    new DeepClassTag(Application(tycon, List(dct1.classTags, dct2.classTags)))
  // ...
  implicit def deepClassTag2[A[_[_]], B1[_]](implicit tycon: ClassTag[A[B1]], dct1: DeepClassTag[B1[_]]): DeepClassTag[A[B1]] =
    new DeepClassTag(Application(tycon, List(dct1.classTags)))
  // ...
}

def improveStaticType[T: WeakTypeTag : DeepClassTag]: Type =
  improveDynamicType(weakTypeOf[T], DeepClassTag[T].classTags)

def improveDynamicType(tpe: Type, classTags: Application): Type = {
  // repair the outer type constructor, then recurse into the type arguments
  val newTycon = improveFreeType(tpe, classTags.tycon.runtimeClass)
  val targs = tpe.dealias.typeArgs
  assert(targs.length == classTags.targs.length, s"( $targs ).length == ( ${classTags.targs} ).length")
  val newArgs = targs.zip(classTags.targs).map((improveDynamicType _).tupled)
  appliedType(newTycon, newArgs)
}

def improveFreeType(tpe: Type, cls: Class[_]): Type =
  if (internal.isFreeType(tpe.typeSymbol)) {
    val typeArgs = tpe.dealias.typeArgs
    val typeConstructor = mirror.classSymbol(cls).toType.typeConstructor
    appliedType(typeConstructor, typeArgs)
  } else tpe

implicit def enc[T: WeakTypeTag : ClassTag : DeepClassTag]: Encoder[T] = {
  val tpe = improveStaticType[T]
  val serializer = ScalaReflection.serializerForType(tpe)
  val deserializer = ScalaReflection.deserializerForType(tpe)

  new ExpressionEncoder[T](
    serializer,
    deserializer,
    classTag[T]
  )
}

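To see what the Application structure carries, here is a small self-contained sketch that builds one by hand and prints it. Application is copied from the code above, while ApplicationDemo, render and the choice of Option[List[Int]] are assumptions made for the example. No implicit derivation is involved, so it says nothing about which type shapes DeepClassTag can resolve.

```scala
import scala.reflect.{ClassTag, classTag}

// Same shape as in the answer: a runtime class plus the trees of its type arguments.
final case class Application(tycon: ClassTag[_], targs: List[Application])

object ApplicationDemo {
  // Render the tree as "Outer[Inner[...]]" using simple runtime class names.
  def render(app: Application): String = {
    val name = app.tycon.runtimeClass.getSimpleName
    if (app.targs.isEmpty) name
    else app.targs.map(render).mkString(s"$name[", ", ", "]")
  }

  // Hand-built description of Option[List[Int]].
  val nested: Application =
    Application(
      classTag[Option[_]],
      List(Application(classTag[List[_]], List(Application(classTag[Int], Nil))))
    )
}
```

A plain ClassTag only remembers the erased outer class. The tree keeps one runtime class per nesting level, which is what improveDynamicType walks in parallel with the type arguments.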

DeepClassTag seems not to work with higher-kinded classes


Improved version but still not always working (there are too many shapes of type constructors)


Instead of manual definition of type-class instances for different shapes of type constructors, the type class DeepClassTag can be defined with macros as follows

import scala.language.experimental.macros
import scala.reflect.ClassTag
import scala.reflect.macros.whitebox

case class Application(tycon: ClassTag[_], targs: List[Application])

class DeepClassTag[T](val classTags: Application)

object DeepClassTag {
  def apply[T: DeepClassTag]: DeepClassTag[T] = implicitly[DeepClassTag[T]]

  implicit def mkDeepClassTag[T]/*(implicit tCtag: ClassTag[T])*/: DeepClassTag[T] =
    macro DeepClassTagMacros.mkDeepClassTagImpl[T]
}

class DeepClassTagMacros(val c: whitebox.Context) {
  import c.universe._

  // resolve an implicit instance of TC[tpe] at the macro call site
  def findInstance[TC[_]](tpe: Type)(implicit wttag: WeakTypeTag[TC[_]]): Tree =
    c.inferImplicitValue(
      appliedType(weakTypeOf[TC[_]].typeConstructor, tpe),
      silent = false
    )

  def mkDeepClassTagImpl[T: WeakTypeTag]/*(tCtag: c.Tree)*/ : Tree = {
    val T = weakTypeOf[T]

    val tCtag = findInstance[ClassTag](T)

    // one Application tree per type argument, taken from its DeepClassTag instance
    val targCtags = T.dealias.typeArgs.map(arg => {
      val argInst = findInstance[DeepClassTag](arg)
      q"$argInst.classTags"
    })

    val targClassTags = q"_root_.scala.List.apply[Application](..$targCtags)"

    q"new DeepClassTag[$T](Application($tCtag, $targClassTags))"
  }
}

(Is it working?)

My PR to Spark to support local classes: https://github.com/apache/spark/pull/38740

Fantom answered 9/11, 2022 at 15:18 Comment(0)

