I am using SparkR:::map and my function returns a large-ish R data frame for each input row, all of the same shape. I would like to write these data frames out as Parquet files without 'collect'ing them. Can I map write.df over my output list? Or can I get the worker tasks to write the Parquet instead?
I now have a working example. I am happy with this, except that I did not expect reduce to implicitly 'collect', since I wanted to write the resulting DataFrame out as Parquet (see the driver-side sketch after the script).
Also, I'm not convinced that :::map actually does anything in parallel. Do I always need to call 'parallelize' as well?
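For reference, the only place I have seen parallelism made explicit is SparkR's private RDD API; a minimal sketch of what I understand that to look like follows (the slice count and the toy row-builder are made up, purely for illustration, and I'm not sure this is the intended usage):
# Hypothetical sketch against SparkR 1.5.1's private RDD API.
# Assumes 'sc' is the SparkContext returned by sparkR.init().
toy.rdd <- SparkR:::parallelize(sc, as.list(1:365), 8)
toy.out <- SparkR:::map(toy.rdd, function(x) data.frame(x = x, sq = x^2))
toy.local <- collect(toy.out)  # a list of one-row data frames
Here is the full working script: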
#! /usr/bin/Rscript
library(SparkR, lib.loc="/opt/spark-1.5.1-bin-without-hadoop/R/lib")
source("jdbc-utils.R")
options(stringsAsFactors = FALSE)
# I dislike having these here, but when I move them into main() it breaks: the sqlContext gets dropped.
assign("sc", sparkR.init(master = "spark://poc-master-1:7077",
sparkHome = "/opt/spark-1.5.1-bin-without-hadoop/",
appName = "Peter Spark test",
list(spark.executor.memory="4G")), envir = .GlobalEnv)
assign("sqlContext", sparkRSQL.init(sc), envir =.GlobalEnv)
#### MAP function ####
run.model <- function(v) {
  # v is one row of the input Spark DataFrame
  x <- v$xs[1]
  y <- v$ys[1]
  startTime <- format(Sys.time(), "%F %T")
  xs <- c(1:x)
  endTime <- format(Sys.time(), "%F %T")
  # Record which worker ran this task
  hostname <- system("hostname", intern = TRUE)
  xys <- data.frame(xs, y, startTime, endTime, hostname, stringsAsFactors = FALSE)
  return(xys)
}
# HERE BE THE SCRIPT BIT
main <- function() {
  # Make unique identifiers for each run
  xs <- c(1:365)
  ys <- c(1:1)
  xys <- data.frame(xs, ys, stringsAsFactors = FALSE)
  # Convert to a Spark DataFrame for mapping
  sqlContext <- get("sqlContext", envir = .GlobalEnv)
  xys.sdf <- createDataFrame(sqlContext, xys)
  # Let Spark do what Spark does
  output.list <- SparkR:::map(xys.sdf, run.model)
  # reduce gives us a single R data frame, which may not be what we want.
  output.redux <- SparkR:::reduce(output.list, rbind)
  # Or you can have it as a list of data frames.
  output.col <- collect(output.list)
  return(NULL)
}

main()
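And if I stay on the driver, I assume the end of main() would look roughly like the sketch below: convert the reduced (already collected) R data frame back into a Spark DataFrame and save it as Parquet. The HDFS path is invented, and this collect-then-write round trip is exactly what I am hoping to push down to the workers instead.
# Hypothetical driver-side fallback (output path is made up):
output.sdf <- createDataFrame(sqlContext, output.redux)
write.df(output.sdf, path = "hdfs:///tmp/run-output.parquet",
         source = "parquet", mode = "overwrite")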