Pyspark 2.4.0, read avro from kafka with read stream - Python
I am trying to read Avro messages from Kafka using PySpark 2.4.0.

The spark-avro external module provides this solution for reading Avro files:

df = spark.read.format("avro").load("examples/src/main/resources/users.avro") 
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

However, I need to read streamed Avro messages. The library documentation suggests using the from_avro() function, which is only available for Scala and Java.

Are there any other modules that support reading Avro messages streamed from Kafka?

Ceroplastics answered 14/2, 2019 at 14:48

You can include the spark-avro package, for example using --packages (adjust the version to match your Spark installation):

bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0

and provide your own wrappers:

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column

def from_avro(col, jsonFormatSchema):
    """Decode a binary Avro column using the given JSON writer schema."""
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    # The Scala functions live in the package object, reachable via package$.MODULE$.
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema))


def to_avro(col):
    """Encode a column as binary Avro."""
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col)))

Example usage (adapted from the official test suite):

from pyspark.sql.functions import col, struct


avro_type_struct = """
{
  "type": "record",
  "name": "struct",
  "fields": [
    {"name": "col1", "type": "long"},
    {"name": "col2", "type": "string"}
  ]
}"""


df = spark.range(10).select(struct(
    col("id"),
    col("id").cast("string").alias("id2")
).alias("struct"))
avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))
avro_struct_df.show(3)
+----------+
|      avro|
+----------+
|[00 02 30]|
|[02 02 31]|
|[04 02 32]|
+----------+
only showing top 3 rows
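
The hex values shown are Avro's raw binary encoding: each long is zigzag-encoded into a variable-length integer, and each string is a zigzag-encoded length prefix followed by its UTF-8 bytes. A quick pure-Python sketch (no Spark needed) that reproduces the rows above, assuming values small enough to fit in a single varint byte:

```python
def zigzag(n: int) -> int:
    # Avro maps signed longs to unsigned ints: 0->0, -1->1, 1->2, -2->3, ...
    return (n << 1) ^ (n >> 63)

def encode_row(col1: int, col2: str) -> bytes:
    # Minimal encoder for this two-field record; assumes values small
    # enough that each zigzag result fits in one varint byte.
    utf8 = col2.encode("utf-8")
    return bytes([zigzag(col1), zigzag(len(utf8))]) + utf8

print(encode_row(0, "0").hex(" "))  # 00 02 30 -- matches the first row above
```

That is: zigzag(0) = 00 for col1, 02 for the one-byte string length, and 30 is the UTF-8 byte for "0".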
avro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)
+------------------------------------------------+
|from_avro(avro, struct<col1:bigint,col2:string>)|
+------------------------------------------------+
|                                          [0, 0]|
|                                          [1, 1]|
|                                          [2, 2]|
+------------------------------------------------+
only showing top 3 rows
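
Since the question is about streaming, the same from_avro wrapper can be applied to a Kafka source with readStream. A minimal sketch; the broker address, topic name, schema, and the read_avro_stream helper are placeholders of mine, and the Kafka source additionally requires the spark-sql-kafka-0-10 package on the classpath:

```python
import json

# Hypothetical broker address and topic name -- replace with your own.
KAFKA_BOOTSTRAP = "localhost:9092"
KAFKA_TOPIC = "avro-events"

# Avro writer schema for the message value (must match the producer's schema).
value_schema = json.dumps({
    "type": "record",
    "name": "Event",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "payload", "type": "string"},
    ],
})

def read_avro_stream(spark, from_avro):
    """Build a streaming DataFrame that decodes Avro-encoded Kafka values.

    `spark` is an active SparkSession and `from_avro` is the wrapper
    defined above.
    """
    raw = (spark.readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
           .option("subscribe", KAFKA_TOPIC)
           .load())
    # Kafka delivers the payload in the binary `value` column;
    # decode it with the wrapper into a struct column.
    return raw.select(from_avro("value", value_schema).alias("event"))
```

The resulting DataFrame can then be written out with writeStream as usual; from there, event.id and event.payload are ordinary columns.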
Minify answered 14/2, 2019 at 18:11 Comment(2)
Just to note an issue I had with importing the package with spark-submit: $spark-submit job.py --packages org.apache.spark:spark-avro_2.11:2.4.0 does not work. Instead it should be written like this: $spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 job.py – Ceroplastics
This doesn't work with Avro from the Confluent Schema Registry. For that, this answer seems better: https://mcmap.net/q/440668/-integrating-spark-structured-streaming-with-the-confluent-schema-registry – Stitch
