How to read Avro file in PySpark
I am writing a Spark job using Python. However, I need to read in a whole bunch of Avro files.

This is the closest solution I have found, in Spark's examples folder. However, it requires submitting the Python script with spark-submit. On the spark-submit command line you can specify the driver class path, which is where the AvroKey and AvroValue classes are located.

avro_rdd = sc.newAPIHadoopFile(
        path,
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
        conf=conf)

In my case, I need to run everything from within the Python script. I tried setting an environment variable that points to the jar file, hoping that Python would add the jar to the classpath, but it clearly does not: I get an unexpected class error.

os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar"

Can anyone show me how to read Avro files from within a single Python script?

Vex answered 20/4, 2015 at 22:57 Comment(0)
W
16

Spark >= 2.4.0

You can use the built-in Avro support. The API is backwards compatible with the spark-avro package, with a few additions (most notably the from_avro / to_avro functions).

Note that the module is not bundled with the standard Spark binaries and has to be included using spark.jars.packages or an equivalent mechanism.
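
As a minimal sketch of the spark.jars.packages route, the snippet below builds the Maven coordinate of the Avro module and passes it when the session is created, so everything stays inside one Python script. The helper names avro_package and read_avro, the application name, and the default Spark and Scala versions are assumptions for illustration; match the versions to your own installation. The setting most likely needs to be applied before any SparkContext has been started in the process.

```python
def avro_package(spark_version="2.4.0", scala_version="2.11"):
    """Build the Maven coordinate of the spark-avro module (hypothetical helper)."""
    return "org.apache.spark:spark-avro_{}:{}".format(scala_version, spark_version)


def read_avro(path, package=None):
    """Create a session that pulls in spark-avro and load `path` as a DataFrame."""
    # Imported lazily so avro_package() can be used without PySpark installed.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("read-avro")  # assumed application name
        .config("spark.jars.packages", package or avro_package())
        .getOrCreate()
    )
    return spark.read.format("avro").load(path)
```

With a matching Spark installation, read_avro("kv.avro").show() would then display the contents of the file.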

See also Pyspark 2.4.0, read avro from kafka with read stream - Python

Spark < 2.4.0

You can use the spark-avro library. First, let's create an example dataset:

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumWriter  # needed for DataFileWriter below

schema_string ='''{"namespace": "example.avro",
 "type": "record",
 "name": "KeyValue",
 "fields": [
     {"name": "key", "type": "string"},
     {"name": "value",  "type": ["int", "null"]}
 ]
}'''

schema = avro.schema.parse(schema_string)

# Avro container files are binary, so open the file in "wb" mode
with open("kv.avro", "wb") as f, DataFileWriter(f, DatumWriter(), schema) as wrt:
    wrt.append({"key": "foo", "value": -1})
    wrt.append({"key": "bar", "value": 1})

Reading it using spark-avro is as simple as this:

df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro")
df.show()

## +---+-----+
## |key|value|
## +---+-----+
## |foo|   -1|
## |bar|    1|
## +---+-----+ 
Whole answered 16/9, 2015 at 19:27 Comment(3)
Could you please provide an example of PySpark with from_avro? – Mesial
Please correct me if I'm wrong, but it looks like the built-in from_avro and to_avro functions are not yet available in PySpark 2.4.x. Looks like these are being added in PySpark 3.0, as per the @since tags here. – Hautboy
@Hautboy Pyspark 2.4.0, read avro from kafka with read stream - Python – Huntington
H
6

The previous solution requires installing a third-party Java dependency, which is not something most Python developers are happy with. But you don't really need an external Spark library if all you want to do is parse your Avro files with a given schema. You can just read the binary files and parse them with your favorite Python Avro package.

For instance, this is how you can load Avro files using fastavro:

from io import BytesIO
import fastavro

schema = {
    ...
}

rdd = sc.binaryFiles("/path/to/dataset/*.avro")\
    .flatMap(lambda args: fastavro.reader(BytesIO(args[1]), reader_schema=schema))

print(rdd.collect())
Hefty answered 12/7, 2017 at 14:45 Comment(0)
E
2

For Spark < 2.4.0, PySpark can create the DataFrame by reading the Avro file and its corresponding schema (.avsc) without any external Python module, using the JAR "com.databricks.spark.avro" and Python's "subprocess" module.

Below is the solution:

avsc_location = "hdfs://user/test/test.avsc"
avro_location = "hdfs://user/test/test.avro"

# Use the subprocess module to read the schema file from HDFS
import subprocess as SP

load_avsc_file = SP.Popen(["hdfs", "dfs", "-cat", avsc_location], stdout=SP.PIPE, stderr=SP.PIPE)
(avsc_file_output, avsc_file_error) = load_avsc_file.communicate()

# communicate() returns bytes; decode them before passing the schema as a JSON string
avro_df = spark.read.format("com.databricks.spark.avro").option("avroSchema", avsc_file_output.decode("utf-8")).load(avro_location)
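
The schema-loading step can also be written with an explicit failure check, so that a failed hdfs call does not silently hand an empty schema to Spark. This is only a sketch: read_command_output is a hypothetical helper, and the demo command is a stand-in for ["hdfs", "dfs", "-cat", avsc_location] so that the snippet runs on a machine without HDFS.

```python
import json
import subprocess
import sys


def read_command_output(cmd):
    """Run cmd and return its stdout as text, raising if the command fails."""
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode("utf-8", errors="replace"))
    return result.stdout.decode("utf-8")


# Stand-in command that prints a tiny Avro schema to stdout.
demo_cmd = [sys.executable, "-c", "print('{\"type\": \"string\"}')"]
schema_json = read_command_output(demo_cmd)

# Parsing the text confirms it is valid JSON before it is handed to Spark.
parsed = json.loads(schema_json)
```

The resulting schema_json string is what would be passed to .option("avroSchema", ...).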
Elayneelazaro answered 21/5, 2020 at 9:29 Comment(0)
J
2

We can read Avro file data into a Spark DataFrame. Refer to this link and the code below to read an Avro file using PySpark.

df = spark.read.format("avro").load("<avro_file_location>")
Johnettajohnette answered 26/10, 2022 at 11:43 Comment(0)
