Spark - write Avro file

What are the common practices for writing Avro files with Spark (using the Scala API) in a flow like this:

  1. parse some log files from HDFS
  2. for each log file, apply some business logic and generate an Avro file (or maybe merge multiple files)
  3. write the Avro files to HDFS

I tried to use spark-avro, but it doesn't help much.

import com.databricks.spark.avro._
import org.apache.spark.sql.SQLContext

val someLogs = sc.textFile(inputPath)

val rowRDD = someLogs.map { line =>
  createRow(...)
}

val sqlContext = new SQLContext(sc)
val dataFrame = sqlContext.createDataFrame(rowRDD, schema)
dataFrame.write.avro(outputPath)

This fails with the error:

org.apache.spark.sql.AnalysisException: 
      Reference 'StringField' is ambiguous, could be: StringField#0, StringField#1, StringField#2, StringField#3, ...
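
For context, this AnalysisException typically means the schema passed to createDataFrame contains duplicate field names, so a reference like 'StringField' cannot be resolved. A minimal sketch of a schema with distinct names (the field names here are hypothetical):

import org.apache.spark.sql.types._

// Every field name must be unique; otherwise any reference
// to the repeated name becomes ambiguous.
val schema = StructType(Seq(
  StructField("stringField1", StringType, nullable = true),
  StructField("stringField2", StringType, nullable = true),
  StructField("intField", IntegerType, nullable = true)
))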
Facetious answered 23/11, 2015 at 18:53 Comment(3)
Could you be more specific? For example, why doesn't `spark-avro` work for you? – Lippe
I did not succeed in using Avro-generated Java code with spark-avro. Also, when I use the schema API I get this kind of error: org.apache.spark.sql.AnalysisException: Reference 'StringField' is ambiguous, could be: StringField#0, StringField#1, StringField#2, StringField#3, ... – Facetious
@d4rkang3l Are you sure the problem is with Avro serialization? Is the DataFrame generated without problems? – Bomke

Databricks provides the spark-avro library, which helps with reading and writing Avro data.

dataframe.write.format("com.databricks.spark.avro").save(outputPath)
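
Reading Avro back uses the same data source; a minimal sketch (the path here is hypothetical):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.avro").load("/tmp/avro-output")
df.show()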
Catechol answered 23/5, 2016 at 8:58 Comment(0)

Spark 2 and Scala 2.11

import com.databricks.spark.avro._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

// Do all your operations, then save the result in a DataFrame, say dataFrame

dataFrame.write.avro("/tmp/output")

Maven dependency

<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>4.0.0</version> 
</dependency>
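
On Spark 2.4 and later, Avro support ships with Spark itself as the external org.apache.spark:spark-avro module, so the short format name works without the Databricks artifact; a minimal sketch:

// Spark 2.4+: built-in Avro data source
// (add the org.apache.spark:spark-avro artifact matching your Spark/Scala version)
dataFrame.write.format("avro").save("/tmp/output")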
Duchamp answered 17/12, 2017 at 6:48 Comment(2)
In Java 8: df2.write().format("com.databricks.spark.avro").mode(SaveMode.Overwrite).save(f.getOutputPath()); – Emission
I doubt .avro("/tmp/output") works. I think it should be used this way: write.format("avro").save("/output/path") – Larousse

You need to start the Spark shell with the Avro package included (recommended for lower Spark versions):

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-avro_2.11:4.0.0

Then use the DataFrame to write the Avro file:

dataframe.write.format("com.databricks.spark.avro").save(outputPath)

And to write it as an Avro table in Hive:

dataframe.write.format("com.databricks.spark.avro").saveAsTable("hivedb.hivetable_avro")
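
To append to an existing table instead of creating it, a minimal sketch using the standard DataFrameWriter save mode (assuming the table already exists):

import org.apache.spark.sql.SaveMode

dataframe.write
  .format("com.databricks.spark.avro")
  .mode(SaveMode.Append)
  .saveAsTable("hivedb.hivetable_avro")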
Phlebotomize answered 16/12, 2017 at 23:52 Comment(2)
@Paul I cannot seem to get the syntax for appending to a table correct. Can you verify it if possible, and show how, please? – Bendite
The '.saveAsTable("hivedb.hivetable_avro")' part will have problems, with this warning: WARN hive.HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider com.databricks.spark.avro. Persisting data source table hivedb.hivetable_avro into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. At least this was happening for Spark 2. – Glisson