How to unnest data with SparkR?

Using SparkR, how can nested arrays be "exploded along"? I've tried using explode like so:

    # %>% and the bare-column NSE come from SparkRext (see the note below)
    library(SparkRext)

    dat <- nested_spark_df %>%
        mutate(a = explode(metadata)) %>%
        head()

Though the above does not throw an exception, it also does not promote the nested fields in metadata to the top level. Essentially I'm seeking behavior similar to Hive's LATERAL VIEW explode() functionality, but without relying on a HiveContext.

Note that in the code snippet I'm using the NSE enabled via SparkRext. I think the equivalent straight-SparkR call would be something like ... %>% mutate(a = explode(nested_spark_df$metadata)) ..., or something along those lines.
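
For reference, that straight-SparkR form spelled out as a sketch (mutate and explode here are the SparkR functions; nested_spark_df and its metadata column are from the snippet above):

    # Straight SparkR, no SparkRext NSE: reference the column explicitly.
    # Like the snippet above, this should run but still leaves the struct
    # fields nested under "a" rather than promoting them to the top level.
    dat <- head(
        mutate(nested_spark_df, a = explode(nested_spark_df$metadata)))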

EDIT

I've tried using LATERAL VIEW explode(...) through the SparkR::sql function. It works great with Parquet and ORC data. However, with nested Avro data I tried:

    dat <- collect(sql(HiveContext,
                       paste0("SELECT a.id, ax.arrival_airport, ax.arrival_runway ",
                              "FROM avrodb.flight a ",
                              "LATERAL VIEW explode(a.metadata) a AS ax ",
                              "WHERE ax.arrival_airport='ATL'")))

Only to get the following error. When I swap avrodb out for parquetdb containing equivalent data, the query does what I expect.

Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 1345, dev-dn04.myorg.org): org.apache.avro.AvroTypeException: Found metadata, expecting union
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:219)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avr
Calls: <Anonymous> ... collect -> collect -> .local -> callJStatic -> invokeJava

This is despite the fact that I included the Databricks Avro package when starting Spark. Reading the same data with Spark using a SQLContext (instead of the HiveContext) works fine, except that I haven't been able to figure out how to effectively use the explode() function there. I've also confirmed that this is not an issue with the data itself: I successfully queried the same files via Hive using the same HQL statement I tried running with SparkR::sql(HiveContext, hql).
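
For reference, a sketch of how I'm including the Avro package at startup (the artifact version is illustrative; match it to your Spark build):

    # Pass the spark-avro package to spark-submit before initializing SparkR
    Sys.setenv("SPARKR_SUBMIT_ARGS" =
        "--packages com.databricks:spark-avro_2.10:2.0.1 sparkr-shell")

    library(SparkR)
    sc          <- sparkR.init()
    sqlContext  <- sparkRSQL.init(sc)   # reads the Avro files fine
    hiveContext <- sparkRHive.init(sc)  # needed for LATERAL VIEW; hits the error above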

Anstus answered 27/7, 2016 at 0:58 Comment(1)
It would really help if this example were fully reproducible with a small dummy dataset. – Digamy

Thanks much to @Sim. I finally figured out a sane approach. The key is that after the explode operation, while all the exploded values are still nested one level deep, a select must be performed. For example:

    dat <- nested_spark_df %>%
        mutate(a = explode(nested_spark_df$metadata)) %>%  # one row per array element
        select("id", "a.fld1", "a.fld2")                   # promotes the nested fields

which results in a SparkR DataFrame with three columns: id, fld1, and fld2 (no a. prefix).
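
A quick sanity check with printSchema (the types shown are illustrative):

    printSchema(dat)
    # root
    #  |-- id: string (nullable = true)
    #  |-- fld1: string (nullable = true)
    #  |-- fld2: string (nullable = true)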

My mental block was that I was trying to get explode to act like Pig's FLATTEN, which creates a bunch of new field names at the top level of the schema.

Anstus answered 9/9, 2016 at 15:4 Comment(0)

At this point, working with array columns in dplyr is tricky (e.g., see this issue), so it's probably best to use explode() via Spark. Also note that there is overhead associated with using the DSL version of explode (see this answer), so you may want to use the SQL form via sql().
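
For example, a sketch of the SQL route (names reused from the question; the temp table name flights is made up, and LATERAL VIEW needs a HiveContext in Spark 1.x):

    # Register the DataFrame as a temp table, then explode in SQL
    registerTempTable(nested_spark_df, "flights")
    dat <- sql(hiveContext, paste0(
        "SELECT id, m.fld1, m.fld2 ",
        "FROM flights LATERAL VIEW explode(metadata) meta AS m"))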

Shipway answered 27/7, 2016 at 4:14 Comment(2)
Thanks very much. My question is how to properly use explode() such that nested records are promoted. What goes on the LHS of the mutate(LHS=explode(RHS)) expression such that several fields are promoted to the top level? See my question edit addressing why the sql() approach has not been effective for me. – Anstus
Please include the exact Spark SQL LATERAL VIEW statement you use from the Spark shell (or a notebook), not from SparkR. Add to that the output of df.printSchema. – Shipway
