Writing R data frames returned from SparkR:::map

I am using SparkR:::map and my function returns a large-ish R dataframe for each input row, each of the same shape. I would like to write these dataframes as parquet files without 'collect'ing them. Can I map write.df over my output list? Can I get the worker tasks to write the parquet instead?

I now have a working example. I am happy with it, except that I did not expect reduce to implicitly 'collect' the results; I wanted to write the resulting data frame as Parquet.

Also, I'm not convinced that :::map actually does anything in parallel. Do I always need to call 'parallelize' as well?

#! /usr/bin/Rscript
library(SparkR, lib.loc="/opt/spark-1.5.1-bin-without-hadoop/R/lib")

source("jdbc-utils.R")

options(stringsAsFactors = FALSE)

# I dislike having these here but when I move them into main(), it breaks - the sqlContext drops.
assign("sc", sparkR.init(master = "spark://poc-master-1:7077",
                         sparkHome = "/opt/spark-1.5.1-bin-without-hadoop/",
                         appName = "Peter Spark test",
                         sparkEnvir = list(spark.executor.memory = "4G")), envir = .GlobalEnv)
assign("sqlContext", sparkRSQL.init(sc), envir = .GlobalEnv)

#### MAP function ####
run.model <- function(v) {
  x <- v$xs[1]
  y <- v$ys[1]
  startTime     <- format(Sys.time(), "%F %T")
  xs <- c(1:x)
  endTime <- format(Sys.time(), "%F %T")
  hostname <- system("hostname", intern = TRUE)
  xys <- data.frame(xs,y,startTime,endTime,hostname,stringsAsFactors = FALSE)
  return(xys)
}

# HERE BE THE SCRIPT BIT
main <- function() {

  # Make unique identifiers for each run
  xs <- c(1:365)
  ys <- c(1:1)
  xys <- data.frame(xs,ys,stringsAsFactors = FALSE)

  # Convert to Spark dataframe for mapping
  sqlContext <- get("sqlContext", envir = .GlobalEnv)
  xys.sdf <- createDataFrame(sqlContext, xys)

  # Let Spark do what Spark does
  output.list <- SparkR:::map(xys.sdf, run.model)

  # Reduce gives us a single R dataframe, which may not be what we want.
  output.redux <- SparkR:::reduce(output.list, rbind)

  # Or you can have it as a list of data frames.
  output.col <- collect(output.list)

  return(NULL)
}
Petrel answered 27/11, 2015 at 16:4 Comment(2)
You could at least provide a Minimal, Complete, and Verifiable example. - Bigotry
I can't help but feel that something somewhere has gone horribly wrong if we have to access private methods to accomplish something so simple. Is there a history behind this someone can refer to? - Minter

Assuming your data looks more or less like this:

rdd <- SparkR:::parallelize(sc, 1:5)
dfs <- SparkR:::map(rdd, function(x) mtcars[(x * 5):((x + 1) * 5), ])

and that all columns have supported types, you can convert it to a row-wise format:

rows <- SparkR:::flatMap(dfs, function(x) {
  data <- as.list(x)
  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
  do.call(mapply, append(args, data))
})
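
To see what the function passed to flatMap does, here is a minimal sketch in plain R with no Spark involved. The three-row, two-column slice of mtcars is only an illustrative assumption; the conversion itself is the same as above. Each row of the data frame becomes one named list, which is the shape createDataFrame expects for each element.

```r
# Plain-R illustration of the row-wise conversion; no Spark needed.
# The slice of mtcars is an arbitrary example input.
df <- mtcars[1:3, c("mpg", "cyl")]

# as.list() on a data frame yields one vector per column.
data <- as.list(df)

# mapply() walks the column vectors in step and calls list() once per row.
# SIMPLIFY = FALSE keeps the result as a list of rows;
# USE.NAMES = FALSE leaves the outer list unnamed.
args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
rows <- do.call(mapply, append(args, data))

# Each element is a named list holding the values of one row.
str(rows[[1]])
```

Because flatMap concatenates the lists returned for every input element, the resulting RDD holds one record per data frame row rather than one per data frame.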

then call createDataFrame:

sdf <- createDataFrame(sqlContext, rows)
head(sdf)

##    mpg cyl  disp  hp drat   wt  qsec vs am gear carb
## 1 18.7   8 360.0 175 3.15 3.44 17.02  0  0    3    2
## 2 18.1   6 225.0 105 2.76 3.46 20.22  1  0    3    1
## 3 14.3   8 360.0 245 3.21 3.57 15.84  0  0    3    4
## 4 24.4   4 146.7  62 3.69 3.19 20.00  1  0    4    2
## 5 22.8   4 140.8  95 3.92 3.15 22.90  1  0    4    2
## 6 19.2   6 167.6 123 3.92 3.44 18.30  1  0    4    4

printSchema(sdf)

## root
##  |-- mpg: double (nullable = true)
##  |-- cyl: double (nullable = true)
##  |-- disp: double (nullable = true)
##  |-- hp: double (nullable = true)
##  |-- drat: double (nullable = true)
##  |-- wt: double (nullable = true)
##  |-- qsec: double (nullable = true)
##  |-- vs: double (nullable = true)
##  |-- am: double (nullable = true)
##  |-- gear: double (nullable = true)
##  |-- carb: double (nullable = true)

and simply use write.df / saveDF.
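
As a rough sketch of that last step, assuming SparkR 1.5.x as in the question: the helper name save_as_parquet and the output path are hypothetical, and the "overwrite" mode is just one possible choice.

```r
# Hypothetical helper: save a SparkR DataFrame as Parquet.
# Assumes SparkR 1.5.x is installed and `sdf` came from createDataFrame().
save_as_parquet <- function(sdf, path) {
  # source = "parquet" selects the Parquet data source;
  # mode = "overwrite" replaces any existing output at `path`.
  SparkR::write.df(sdf, path = path, source = "parquet", mode = "overwrite")
}

# Usage (needs a running Spark context, so it is left commented out;
# the path is an assumption):
# save_as_parquet(sdf, "/tmp/mtcars.parquet")
```

The write is carried out by the executors, so the rows are not collected to the driver first.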

The problem is that you shouldn't use an internal API in the first place. One of the reasons it was removed from the initial release is that it was not robust enough to be used directly. It is also still unclear whether it will be supported, or even available, in the future. Just saying...

Bigotry answered 28/11, 2015 at 8:36 Comment(3)
Many thanks. I tried flatMap but you've supplied an excellent example. - Petrel
Just to check - is :::map not parallel by default? Must I use :::parallelize first to create an RDD and then massage my input data frame into it? I'm clearly missing (or misunderstanding) some fundamental point here. - Petrel
It is. But you need a distributed data structure. - Bigotry
