Spark: Writing to Avro file

I am using Spark and have an RDD loaded from an Avro file. I now want to apply some transformations to that RDD and save the result back as an Avro file:

val job = new Job(new Configuration())
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema))

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2))
  .saveAsNewAPIHadoopFile(
    outputPath,
    classOf[AvroKey[GenericRecord]],
    classOf[org.apache.hadoop.io.NullWritable],
    classOf[AvroKeyOutputFormat[GenericRecord]],
    job.getConfiguration)

When running this, Spark complains that Schema$RecordSchema is not serializable.

If I remove the .map call (and just call rdd.saveAsNewAPIHadoopFile), the call succeeds.

What am I doing wrong here? Any ideas?

Scrupulous answered 16/12, 2013 at 13:51 Comment(2)
Could you please provide the exception stack trace? Spark, Hadoop and Avro version numbers might be useful too. - Blaney
Please forgive my naivety. May I ask what the job is doing here? It looks like a MapReduce job. If we use Spark to write the output, why do we need a MapReduce job? - Electro
E
5

The issue here is that the Avro Schema class used in the Job is not serializable. The exception is thrown when you reference the schema object from code inside the map function.

For instance, if you do the following, you will get the "Task not serializable" exception:

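import java.io.File
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => {
  // references the schema object declared outside the closure,
  // so Spark has to serialize it with the task
  val record = new GenericData.Record(schema)
})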

You can make everything work by creating a new instance of the schema inside the function block:

val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.map(t => {
  // create a new Schema object
  val innerSchema = new Schema.Parser().parse(new File(jsonSchema))
  val record = new GenericData.Record(innerSchema)
  ...
})

Since you would not want to parse the Avro schema for every record you handle, a better solution is to parse it once per partition. The following also works:

val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.mapPartitions(tuples => {
  // create a new Schema object once per partition
  val innerSchema = new Schema.Parser().parse(new File(jsonSchema))

  tuples.map(t => {
    val record = new GenericData.Record(innerSchema)
    ...
    // this closure is bundled together with the outer one
    // (no serialization issues)
  })
})

The code above works as long as you provide a portable reference to the jsonSchema file, since the function is executed on multiple remote executors. It can be a reference to a file in HDFS, or the file can be packaged with the application in the JAR (in the latter case, use the class loader to read its contents).
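
Another way to avoid depending on a file location is to ship the schema as its JSON text, since a String is serializable. The following is only a sketch: the object name SchemaPerPartition, the helper buildRecords, the example schema and its field "name" are all made up for illustration.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

object SchemaPerPartition {
  // Hypothetical example schema; replace with your own JSON definition.
  val schemaJson: String =
    """{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}"""

  // Parses the schema once, then builds one record per input element.
  // Meant to be called from rdd.mapPartitions: only the JSON String
  // has to travel to the executors, never the Schema object.
  def buildRecords(json: String)(names: Iterator[String]): Iterator[GenericRecord] = {
    val schema = new Schema.Parser().parse(json)
    names.map { n =>
      val record = new GenericData.Record(schema)
      record.put("name", n)
      record
    }
  }
}
```

With an RDD[String] it could be used as rdd.mapPartitions(names => SchemaPerPartition.buildRecords(SchemaPerPartition.schemaJson)(names)).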

For those trying to use Avro with Spark, note that there are still some unresolved compilation problems, and you have to use the following dependency in your Maven POM:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
</dependency>

Note the "hadoop2" classifier. You can track the issue at https://issues.apache.org/jira/browse/SPARK-3039.

Eno answered 11/12, 2014 at 22:50 Comment(1)
This method works fine when there are no external dependencies inside our map function. Is there any way to make the schema serializable? - Crozier
B
2

The default serializer used by Spark is Java serialization, so it will try to serialize all Java types that way. AvroKey is not serializable, which is why you are getting errors.

You can use KryoSerializer, or plug in your own custom serialization (such as Avro). You can read more about serialization here: http://spark-project.org/docs/latest/tuning.html
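
Switching to Kryo is a configuration change. A minimal sketch, where the object name KryoConf and its build helper are made up for illustration. As far as I know, this setting covers data that Spark shuffles or caches, while function closures are still serialized with Java serialization, so it may not help when the non-serializable object is captured by the closure itself.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper, not part of Spark.
object KryoConf {
  // Builds a SparkConf that uses Kryo for data serialization.
  def build(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
}
```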

You can also wrap your object in something that is Externalizable. For example, see SparkFlumeEvent, which wraps AvroFlumeEvent, here: https://github.com/apache/spark/blob/master/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
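
The same wrapping idea can be applied to the schema itself. A sketch, assuming a hypothetical SerializableSchema class (it is not part of Avro or Spark) that writes the schema as its JSON text and parses it again on the receiving side:

```scala
import java.io.{Externalizable, ObjectInput, ObjectOutput}
import org.apache.avro.Schema

// Hypothetical wrapper: makes a Schema travel through Java serialization.
class SerializableSchema(var schema: Schema) extends Externalizable {
  // Externalizable requires a public no-argument constructor.
  def this() = this(null)

  // Schema.toString returns the schema's JSON definition.
  override def writeExternal(out: ObjectOutput): Unit =
    out.writeObject(schema.toString)

  // Rebuilds the Schema from the JSON text written above.
  override def readExternal(in: ObjectInput): Unit =
    schema = new Schema.Parser().parse(in.readObject().asInstanceOf[String])
}
```

A closure can then capture the wrapper instead of the raw schema and call .schema on the executor side.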

Buoyage answered 13/5, 2014 at 23:52 Comment(0)
T
0

With a DataFrame, it is very simple to write Avro using the Databricks library.

dataframe.write.format("com.databricks.spark.avro").save($hdfs_path)

In your case, the input is Avro, so it already has a schema associated with it. You can read the Avro file directly into a DataFrame and, after your transformations, write it back as Avro using the code above.

To read Avro into a DataFrame:

Spark 1.6

val dataframe = sqlContext.read.avro($hdfs_path)
// or
val dataframe = sqlContext.read.format("com.databricks.spark.avro").load($hdfs_path)

Spark 2.1

val dataframe = sparkSession.read.avro($hdfs_path)
// or
val dataframe = sparkSession.read.format("com.databricks.spark.avro").load($hdfs_path)

Toleration answered 27/3, 2018 at 9:42 Comment(0)