How to read parquet files using `ssc.fileStream()`? What are the types passed to `ssc.fileStream()`?

My understanding of Spark's fileStream() method is that it takes three type parameters: Key, Value, and Format. For text files, the appropriate types are LongWritable, Text, and TextInputFormat.
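
For reference, this is the kind of call I mean (a minimal sketch; sparkConf and the directory path are just placeholders):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Key = LongWritable, Value = Text, Format = TextInputFormat
val ssc = new StreamingContext(sparkConf, Seconds(10))
val textStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/tmp/textDir")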

First, I want to understand the nature of these types. Intuitively, I would guess that the Key in this case is the line number of the file, and the Value is the text on that line. So, in the following example of a text file:

Hello
Test
Another Test

The first row of the DStream would have a Key of 1 (0?) and a Value of Hello.

Is this correct?


Second part of my question: I looked at the decompiled implementation of ParquetInputFormat and I noticed something curious:

public class ParquetInputFormat<T>
       extends FileInputFormat<Void, T> {
//...

public class TextInputFormat
       extends FileInputFormat<LongWritable, Text>
       implements JobConfigurable {
//...

TextInputFormat extends FileInputFormat with the types LongWritable and Text, whereas ParquetInputFormat extends the same class with the types Void and T.

Does this mean that I must create a Value class to hold an entire row of my parquet data, and then pass the types <Void, MyClass, ParquetInputFormat<MyClass>> to ssc.fileStream()?

If so, how should I implement MyClass?


EDIT 1: I have noticed a readSupportClass which is to be passed to ParquetInputFormat objects. What kind of class is this, and how is it used to parse the parquet file? Is there some documentation that covers this?


EDIT 2: As far as I can tell, this is impossible. If anybody knows how to stream parquet files into Spark, please feel free to share...

Segarra asked 15/2, 2016 at 15:49. Comment:
Please don't annotate edits. – Tryck

Here is my sample for reading parquet files in Spark Streaming:

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.spark.streaming.{Seconds, StreamingContext}
import parquet.hadoop.ParquetInputFormat // org.apache.parquet.hadoop.ParquetInputFormat on newer parquet-mr versions

// sparkConf and directory are assumed to be defined elsewhere.
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Tell ParquetInputFormat to materialize each record through Avro.
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "parquet.avro.AvroReadSupport")
val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](
  directory, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration)

val lines = stream.map(row => {
  println("row:" + row.toString())
  row
})
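
If you want field values rather than whole records, GenericRecord lets you read a field by name. A rough extra sketch (the field name "myField" is just a placeholder; use a name from your own Avro schema):

// Pull one field by name out of each (Void, GenericRecord) pair.
val fieldValues = stream.map { case (_, record) => record.get("myField") }
fieldValues.print()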

Some points to note:

  • record type is GenericRecord
  • readSupportClass is AvroReadSupport
  • pass Configuration to fileStream
  • set parquet.read.support.class in the Configuration

I referred to the sources below while creating this sample.
I also could not find good examples, so I would welcome a better one.

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
https://github.com/Parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Ownership answered 10/3, 2016 at 11:12. Comments:
Do you know how to fix the parquet.avro.AvroReadSupport not found error at runtime? How are you running your job? It is included in my deps in build.sbt and yet Spark can't find it. – Afton
Did you set the parquet-avro.jar classpath on the driver and executors? – Ownership
My environment is CDH 5.6.x and parquet-avro.jar is already included in the hadoop lib dir. – Ownership
Hmm, I downloaded and compiled parquet-mr and then included it on the classpath. Now I keep getting the exception: org.apache.parquet.hadoop.BadConfigurationException: class parquet.avro.AvroReadSupport set in job conf at parquet.read.support.class is not a subclass of org.apache.parquet.hadoop.api.ReadSupport. Do you think this is a version mismatch? – Afton
In my case, I actually did not need to add the parquet jar to build.sbt, the assembly jar, or the runtime classpath manually. This seems to be because the hadoop lib classpath contained the parquet jar and the spark classpath included the hadoop classpath. If you can't use the parquet jar included in hadoop, you should at least build against the same parquet version that hadoop ships. – Ownership
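
A hedged build.sbt sketch of that version-matching advice (the coordinates and version are assumptions, not from the thread; older parquet-mr releases used the com.twitter group and parquet.* packages, newer ones use org.apache.parquet, so match whichever your cluster ships):

// Example only: align parquet-avro with the parquet version already on the cluster classpath.
libraryDependencies += "org.apache.parquet" % "parquet-avro" % "1.8.1" % "provided"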

You can access the parquet files by adding some parquet-specific Hadoop settings:

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(conf, Seconds(5))

// Schema of the parquet files to be read.
val schema = StructType(Seq(
  StructField("a", StringType, nullable = false)
  // ... the remaining fields ...
))
val schemaJson = schema.json

val fileDir = "/tmp/fileDir"

// Use Spark SQL's ParquetReadSupport so each record is materialized as an UnsafeRow,
// and pass the requested schema as JSON.
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport")
ssc.sparkContext.hadoopConfiguration.set("org.apache.spark.sql.parquet.row.requested_schema", schemaJson)
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, "false")

val streamRdd = ssc.fileStream[Void, UnsafeRow, ParquetInputFormat[UnsafeRow]](
  fileDir, (path: Path) => true, false)

streamRdd.count().print()

ssc.start()
ssc.awaitTermination()

This code was prepared with Spark 2.1.0.
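
If you need the column values out of the stream, UnsafeRow exposes positional getters. A rough sketch, not part of the original answer (ordinal 0 corresponds to field "a" in the schema above, and the transformation must be set up before ssc.start()):

// Read column 0 of each UnsafeRow as a string.
val firstColumn = streamRdd.map { case (_, row) => row.getUTF8String(0).toString }
firstColumn.print()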

Defunct answered 3/6, 2018 at 9:5. Comment:
Welcome to Stack Overflow! Please don't just throw your source code here. Be nice and try to give a nice description of your answer, so that others will like it and upvote it. See: How do I write a good answer? – Wildfowl
