KryoException: Unable to find class with spark structured streaming
1-The Problem

I have a Spark program that makes use of Kryo, but not as part of the Spark mechanics. More specifically, I am using Spark Structured Streaming connected to Kafka.

I read binary values coming from Kafka and decode them on my own.

I am faced with an exception while attempting to deserialize data with Kryo. This issue, however, only happens when I package my program and run it on a Spark standalone cluster. That is, it does not happen when I run it within IntelliJ, i.e. in Spark local mode (dev mode).

The exception that I get is as follows:

Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.elsevier.entellect.commons.package$RawData

Note that RawData is a case class of my own, situated in one of the sub-projects of my multi-project build.

To understand the context, please find more details below:

2-build.sbt:

lazy val commonSettings = Seq(
  organization  := "com.elsevier.entellect",
  version       := "0.1.0-SNAPSHOT",
  scalaVersion  := "2.11.12",
  resolvers     += Resolver.mavenLocal,
  updateOptions := updateOptions.value.withLatestSnapshots(false)
)

lazy val entellectextractors = (project in file("."))
  .settings(commonSettings).aggregate(entellectextractorscommon, entellectextractorsfetchers, entellectextractorsmappers, entellectextractorsconsumers)

lazy val entellectextractorscommon = project
  .settings(
    commonSettings,
    libraryDependencies ++= Seq(
      "com.esotericsoftware" % "kryo" % "5.0.0-RC1",
      "com.github.romix.akka" %% "akka-kryo-serialization" % "0.5.0" excludeAll(excludeJpountz),
      "org.apache.kafka" % "kafka-clients" % "1.0.1",
      "com.typesafe.akka" %% "akka-stream" % "2.5.16",
      "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.4",
      "com.typesafe.akka" % "akka-slf4j_2.11" % "2.5.16",
      "ch.qos.logback" % "logback-classic" % "1.2.3"
    )
  )

lazy val entellectextractorsfetchers = project
  .settings(
    commonSettings,
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-stream-kafka" % "0.22",
      "com.typesafe.slick" %% "slick" % "3.2.3",
      "com.typesafe.slick" %% "slick-hikaricp" % "3.2.3",
      "com.lightbend.akka" %% "akka-stream-alpakka-slick" % "0.20") 
  )
  .dependsOn(entellectextractorscommon)

lazy val entellectextractorsconsumers = project
  .settings(
    commonSettings,
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-stream-kafka" % "0.22")
  )
  .dependsOn(entellectextractorscommon)

lazy val entellectextractorsmappers = project
  .settings(
      commonSettings,
      mainClass in assembly := Some("entellect.extractors.mappers.NormalizedDataMapper"),
      assemblyMergeStrategy in assembly := {
        case PathList("META-INF", "services", "org.apache.spark.sql.sources.DataSourceRegister") => MergeStrategy.concat
        case PathList("META-INF", xs @ _*) => MergeStrategy.discard
        case x => MergeStrategy.first},
      dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.9.5",
      dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.5",
      dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.9.5",
      dependencyOverrides += "org.apache.jena" % "apache-jena" % "3.8.0",
      libraryDependencies ++= Seq(
      "org.apache.jena" % "apache-jena" % "3.8.0",
      "edu.isi" % "karma-offline" % "0.0.1-SNAPSHOT",
      "org.apache.spark" % "spark-core_2.11" % "2.3.1" % "provided",
      "org.apache.spark" % "spark-sql_2.11" % "2.3.1" % "provided",
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1"
      //"com.datastax.cassandra" % "cassandra-driver-core" % "3.5.1"
    ))
  .dependsOn(entellectextractorscommon)



lazy val excludeJpountz = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")

The sub-project that contains the Spark code is entellectextractorsmappers. The sub-project that contains the case class RawData that can't be found is entellectextractorscommon. entellectextractorsmappers explicitly depends on entellectextractorscommon.

3-The difference between submitting to a local standalone cluster and running in local development mode:

When I submit to the cluster, my Spark dependencies are as follows:

  "org.apache.spark" % "spark-core_2.11" % "2.3.1" % "provided",
  "org.apache.spark" % "spark-sql_2.11" % "2.3.1" % "provided",

When I run in local development mode (no submit script), they become:

  "org.apache.spark" % "spark-core_2.11" % "2.3.1",
  "org.apache.spark" % "spark-sql_2.11" % "2.3.1",

That is, in local dev I need the dependencies on the classpath, while when submitting to the cluster in standalone mode they are already present on the cluster, so I mark them as provided.
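
As an aside, the usual sbt-assembly recipe to avoid flipping the scope by hand is to keep Spark as provided and redefine run so that provided dependencies stay on the classpath for local runs; a sketch of that setting (not part of my actual build, just the standard recipe) would be:

  // Sketch only: run with the Compile classpath, which (unlike the Runtime
  // classpath used by the default run task) still contains "provided"
  // dependencies, so `sbt run` and IDE runs see the Spark jars.
  run in Compile := Defaults.runTask(
    fullClasspath in Compile,
    mainClass in (Compile, run),
    runner in (Compile, run)
  ).evaluated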

4-How I submit:

spark-submit --class entellect.extractors.mappers.DeNormalizedDataMapper --name DeNormalizedDataMapper --master spark://MaatPro.local:7077  --deploy-mode cluster --executor-memory 14G --num-executors 1 --conf spark.sql.shuffle.partitions=7 "/Users/maatari/IdeaProjects/EntellectExtractors/entellectextractorsmappers/target/scala-2.11/entellectextractorsmappers-assembly-0.1.0-SNAPSHOT.jar"

5-How I use Kryo:

5.1-Declaration and Registration

In the entellectextractorscommon project I have a package object with the following:

package object commons {

  case class RawData(modelName: String,
                     modelFile: String,
                     sourceType: String,
                     deNormalizedVal: String,
                     normalVal: Map[String, String])

  object KryoContext {
    lazy val kryoPool = new Pool[Kryo](true, false, 16) {
      protected def create(): Kryo = {
        val kryo = new Kryo()
        kryo.setRegistrationRequired(false)
        kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableAbstractMapSerializer])
        kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableAbstractMapSerializer])
        kryo.addDefaultSerializer(classOf[RawData], classOf[ScalaProductSerializer])
        kryo
      }
    }

    lazy val outputPool = new Pool[Output](true, false, 16) {
      protected def create: Output = new Output(4096)
    }

    lazy val inputPool = new Pool[Input](true, false, 16) {
      protected def create: Input = new Input(4096)
    }
  }

  object ExecutionContext {

    implicit lazy val system  = ActorSystem()
    implicit lazy val mat     = ActorMaterializer()
    implicit lazy val ec      = system.dispatcher

  }

}
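
For completeness, the outputPool above is the counterpart used on the producer side. The code that writes the Kafka payload is not shown in this post, but a sketch of how a RawData could be encoded to bytes with the same pool pattern (mirroring the decodeData shown in 5.2, purely as an illustration) might look like:

  import java.io.ByteArrayOutputStream

  // Illustration only: encode a batch of RawData into the binary payloads
  // that would be published to Kafka, reusing a pooled Kryo and Output.
  def encodeData(records: List[RawData], kryoPool: Pool[Kryo], outputPool: Pool[Output]): List[Array[Byte]] = {
    val kryo   = kryoPool.obtain()
    val output = outputPool.obtain()
    val encoded = records.map { record =>
      val bos = new ByteArrayOutputStream()
      output.setOutputStream(bos)
      kryo.writeClassAndObject(output, record)
      output.close()            // flushes the Output into the underlying stream
      bos.toByteArray
    }
    kryoPool.free(kryo)
    outputPool.free(output)
    encoded
  }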

5.2-Usage

In entellectextractorsmappers (where the Spark program is), I work with mapPartitions. In it, I have a method to decode the data coming from Kafka that makes use of Kryo, as such:

def decodeData(rowOfBinaryList: List[Row], kryoPool: Pool[Kryo], inputPool: Pool[Input]): List[RawData] = {
  val kryo = kryoPool.obtain()
  val input = inputPool.obtain()
  val data = rowOfBinaryList.map(r => r.getAs[Array[Byte]]("message")).map { binaryMsg =>
    input.setInputStream(new ByteArrayInputStream(binaryMsg))
    val value = kryo.readClassAndObject(input).asInstanceOf[RawData]
    input.close()
    value
  }
  kryoPool.free(kryo)
  inputPool.free(input)
  data
}

Note: the KryoContext object combined with the lazy vals ensures that kryoPool is instantiated once per JVM. I don't think the issue comes from that, however.
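
For context, a simplified sketch of how decodeData plugs into the streaming query (the servers, topic, and column handling below are placeholders, not my real values) looks roughly like this:

  import org.apache.spark.sql.{Encoders, SparkSession}
  import com.elsevier.entellect.commons._   // RawData, KryoContext

  // Placeholders only: simplified wiring of decodeData into the Kafka stream.
  val spark = SparkSession.builder().getOrCreate()

  val rawData = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host:9092")   // placeholder
    .option("subscribe", "some-topic")                // placeholder
    .load()
    .selectExpr("value AS message")
    .mapPartitions { rows =>
      decodeData(rows.toList, KryoContext.kryoPool, KryoContext.inputPool).iterator
    }(Encoders.product[RawData])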

I have read elsewhere hints about issues with the class loaders used by Spark vs the one used by Kryo, but I am not sure I really understand what is going on.

If someone could give me some pointers, that would help, because I have no idea where to start. Why would it work in local mode and not in cluster mode? Does the provided scope mess with the dependencies and create some issue with Kryo? Is it the sbt-assembly merge strategy that messes things up?

Many pointers are possible; if anyone could help me narrow this down, that would be awesome!

Percival answered 29/9, 2018 at 21:35

So far, I have solved the problem by picking up the "enclosing" class loader, which I suppose is the one from Spark. This is after reading a few comments here and there about class loader issues between Kryo and Spark. Presumably, in cluster mode Spark loads the application (assembly) jar through its own class loader, while Kryo by default resolves class names with the class loader that loaded the Kryo classes themselves, which does not see the classes packaged in the fat jar; pointing Kryo at the thread's context class loader works around that:

lazy val kryoPool = new Pool[Kryo](true, false, 16) {
  protected def create(): Kryo = {
    // Use the thread's context class loader (presumably the one Spark uses to
    // load the application jar) so Kryo can resolve classes such as RawData
    // that live inside the assembly jar.
    val cl = Thread.currentThread().getContextClassLoader()
    val kryo = new Kryo()
    kryo.setClassLoader(cl)
    kryo.setRegistrationRequired(false)
    kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableAbstractMapSerializer])
    kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableAbstractMapSerializer])
    kryo.addDefaultSerializer(classOf[RawData], classOf[ScalaProductSerializer])
    kryo
  }
}
Percival answered 3/10, 2018 at 22:18
