Spark Dataset : Example : Unable to generate an encoder issue
I'm new to the Spark world and am trying a Dataset example written in Scala that I found online.

When running it through SBT, I keep getting the following error:

org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class

Any idea what I am overlooking?

Also, feel free to point out a better way of writing the same Dataset example.

Thanks

> sbt>  runMain DatasetExample

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/25 01:06:39 INFO Remoting: Starting remoting
16/10/25 01:06:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:50555]
[error] (run-main-6) org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `DatasetExample$Student` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `DatasetExample$Student` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:306)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:302)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:249)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:302)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:90)
at org.apache.spark.sql.DataFrame.as(DataFrame.scala:209)
at DatasetExample$.main(DatasetExample.scala:45)
at DatasetExample.main(DatasetExample.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
[trace] Stack trace suppressed: run last sparkExamples/compile:runMain for the full output.
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last sparkExamples/compile:runMain for the full output.
[error] (sparkExamples/compile:runMain) Nonzero exit code: 1
[error] Total time: 127 s, completed Oct 25, 2016 1:08:09 AM

Code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._  // avg(...)

object DatasetExample {
   // Create data sets
   case class Student(name: String, dept: String, age: Long)
   case class Department(abbrevName: String, fullName: String)

   org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) // Not sure what exactly is the purpose

   def main(args: Array[String]): Unit = {
      Logger.getLogger("org").setLevel(Level.OFF)
      Logger.getLogger("akka").setLevel(Level.OFF)
      // initialise spark context
      val conf = new SparkConf().setAppName("SetsExamples").setMaster("local")
      val sc = new SparkContext(conf)
      val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

      import sqlcontext.implicits._   // Not sure what exactly is the purpose

      // Read JSON objects into a Dataset[Student].
      val students = sqlcontext.read.json("student.json").as[Student]
      students.show()

      // Select two columns and filter on one column.
      // Each argument of "select" must be a "TypedColumn".
      students.select($"name".as[String], $"dept".as[String]).
                   filter(_._2 == "Math").  // Filter on _2, the second selected column
                   collect()

      // Group by department and count each group.
      students.groupBy(_.dept).count().collect()

      // Group and aggregate in each group.
      students.groupBy(_.dept).
                  agg(avg($"age").as[Double]).
                  collect()

      // Initialize a Seq and convert to a Dataset.
      val depts = Seq(Department("CS", "Computer Science"), Department("Math", "Mathematics")).toDS()

      // Show the contents of the Dataset.
      depts.show()

      // Join two datasets with "joinWith".
      val joined = students.joinWith(depts, $"dept" === $"abbrevName")

      // Show the contents of the joined Dataset.
      // Note that the original objects are nested into tuples under the _1 and _2 columns.
      joined.show()

      // terminate spark context
      sc.stop()

   }
}

JSON file (student.json):

{"id" : "1201", "name" : "Kris", "age" : "25"}
{"id" : "1202", "name" : "John", "age" : "28"}
{"id" : "1203", "name" : "Chet", "age" : "39"}
{"id" : "1204", "name" : "Mark", "age" : "23"}
{"id" : "1205", "name" : "Vic", "age" : "23"}
Buddhology answered 25/10, 2016 at 1:23

This line is what is causing the problem:

org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

This call adds a new outer scope to the current context, which Spark can use when instantiating an inner class during deserialization.

Inner classes are created when a case class is defined in the Spark REPL; registering the outer scope in which such a class was defined allows Spark to create new instances of it on the executors.

In normal use (your case), you shouldn't need to call this function.

EDIT: You'll also need to move your case classes outside of the DatasetExample object.
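A minimal sketch of that change, assuming the same SQLContext-based API the question uses: the case classes sit at the top level of the file and the addOuterScope call is gone. The student rows are hypothetical in-memory data (the student.json shown in the question has no dept field), and the groupBy(_.dept) steps are left out because that typed grouping call differs between Spark versions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Top-level case classes: not nested inside DatasetExample, so Spark can
// derive encoders for them without a registered outer scope.
case class Student(name: String, dept: String, age: Long)
case class Department(abbrevName: String, fullName: String)

object DatasetExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SetsExamples").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlcontext = new SQLContext(sc)
    import sqlcontext.implicits._ // encoders, toDS() and $"col" syntax

    // Hypothetical rows used instead of student.json, so every Student field exists.
    val students = Seq(
      Student("Kris", "CS", 25L),
      Student("John", "Math", 28L)
    ).toDS()

    val depts = Seq(
      Department("CS", "Computer Science"),
      Department("Math", "Mathematics")
    ).toDS()

    // No OuterScopes.addOuterScope(this) call is needed here.
    students.joinWith(depts, $"dept" === $"abbrevName").show()

    sc.stop()
  }
}
```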

Note:

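import sqlContext.implicits._ is a Scala-specific import that brings implicit methods into scope, for example the conversions from common Scala objects such as RDDs and local Seqs into DataFrames and Datasets. It also supplies the encoders that .as[Student] and toDS() need, and the $"column" syntax used in the example.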

More on that here.
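To illustrate what the import provides, here is a small sketch assuming the same SQLContext API as the question (the ImplicitsDemo object and its data are hypothetical). Each numbered line relies on import sqlContext.implicits._ and would not compile without it.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext}

case class Department(abbrevName: String, fullName: String)

// Hypothetical demo object showing what the implicits import enables.
object ImplicitsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ImplicitsDemo").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // (1) Local Seq -> Dataset, using the implicit Encoder[Department].
    val depts: Dataset[Department] = Seq(Department("CS", "Computer Science")).toDS()

    // (2) RDD of case-class instances -> DataFrame.
    val df: DataFrame = sc.parallelize(Seq(Department("Math", "Mathematics"))).toDF()

    // (3) $"..." column syntax, plus the implicit Encoder[String] used by .as[String].
    val names: Dataset[String] = df.select($"fullName").as[String]

    depts.show()
    names.show()
    sc.stop()
  }
}
```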

Warenne answered 25/10, 2016 at 6:36
