I know there are plenty of questions on SO about out of memory errors on Spark but I haven't found a solution to mine.
I have a simple workflow:
- read in ORC files from Amazon S3
filter
down to a small subset of rowsselect
a small subset of columnscollect
into the driver node (so I can do additional operations inR
)
When I run the above and then cache
the table to spark memory it takes up <2GB - tiny compared to the memory available to my cluster - then I get an OOM error when I try to collect
the data to my driver node.
I have tried running on the following setups:
- local mode on a computer with 32 cores and 244GB of ram
- standalone mode with 10 x 6.2 GB executors and a 61GB driver node
For each of these I have played with numerous configurations of executor.memory
, driver.memory
, and driver.maxResultSize
to cover the full range of possible values within my available memory, but always I end up with an out of memory error at the collect
stage; either
java.lang.OutOfMemoryError: Java heap space
,
java.lang.OutOfMemoryError : GC overhead limit exceeded
, or
Error in invoke_method.spark_shell_connection(spark_connection(jobj), :
No status is returned.
(a sparklyr
error indicative of memory issues).
Based on my [limited] understanding of Spark, caching a table prior to collecting should force all calculations - i.e. if the table is sitting happily in memory after caching at <2GB, then I shouldn't need much more than 2GB of memory to collect it into the driver node.
Note that answers to this question have some suggestions I am yet to try, but these are likely to impact performance (e.g. serialising the RDD) so would like to avoid using if possible.
My questions:
- how it can be that a dataframe that takes up so little space after it has been cached can cause memory problems?
- is there something obvious for me to check/change/troubleshoot to help fix the problem, before I move on to additional options that may compromise performance?
Thank you
Edit: note in response to @Shaido's comment below, calling cache
via Sparklyr "forces data to be loaded in memory by executing a count(*)
over the table" [from Sparklyr documentation] - i.e. the table should be sitting in memory and all the calculations run (I believe) prior to calling collect
.
Edit: some additional observations since following the suggestions below:
- As per the comments below, I have now tried writing the data to csv instead of collecting to get an idea of likely file size. This operation creates a set of csvs amounting to ~3GB, and takes only 2 seconds when run after caching.
- If I set
driver.maxResultSize
to <1G I get an error stating that the size of the serialized RDD is 1030 MB, larger than driver.maxResultSize. - If I watch memory usage in Task Manager after calling
collect
I see that usage just keeps going up until it reaches ~ 90GB, at which point the OOM error occurs. So for whatever reason the amount of RAM being used to perform thecollect
operation is ~100x greater than the size of the RDD I'm trying to collect.
Edit: code added below, as requested in comments.
#__________________________________________________________________________________________________________________________________
# Set parameters used for filtering rows
#__________________________________________________________________________________________________________________________________
firstDate <- '2017-07-01'
maxDate <- '2017-08-31'
advertiserID <- '4529611'
advertiserID2 <- '4601141'
advertiserID3 <- '4601141'
library(dplyr)
library(stringr)
library(sparklyr)
#__________________________________________________________________________________________________________________________________
# Configure & connect to spark
#__________________________________________________________________________________________________________________________________
Sys.setenv("SPARK_MEM"="100g")
Sys.setenv(HADOOP_HOME="C:/Users/Jay.Ruffell/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.7/tmp/hadoop")
config <- spark_config()
config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3" # used to connect to S3
Sys.setenv(AWS_ACCESS_KEY_ID="")
Sys.setenv(AWS_SECRET_ACCESS_KEY="") # setting these blank ensures that AWS uses the IAM roles associated with the cluster to define S3 permissions
# Specify memory parameters - have tried lots of different values here!
config$`sparklyr.shell.driver-memory` <- '50g'
config$`sparklyr.shell.executor-memory` <- '50g'
config$spark.driver.maxResultSize <- '50g'
sc <- spark_connect(master='local', config=config, version='2.0.1')
#__________________________________________________________________________________________________________________________________
# load data into spark from S3 ----
#__________________________________________________________________________________________________________________________________
#+++++++++++++++++++
# create spark table (not in memory yet) of all logfiles within logfiles path
#+++++++++++++++++++
spark_session(sc) %>%
invoke("read") %>%
invoke("format", "orc") %>%
invoke("load", 's3a://nz-omg-ann-aipl-data-lake/aip-connect-256537/orc-files/dcm-log-files/dt2-facts') %>%
invoke("createOrReplaceTempView", "alldatadf")
alldftbl <- tbl(sc, 'alldatadf') # create a reference to the sparkdf without loading into memory
#+++++++++++++++++++
# define variables used to filter table down to daterange
#+++++++++++++++++++
# Calculate firstDate & maxDate as unix timestamps
unixTime_firstDate <- as.numeric(as.POSIXct(firstDate))+1
unixTime_maxDate <- as.numeric(as.POSIXct(maxDate)) + 3600*24-1
# Convert daterange params into date_year, date_month & date_day values to pass to filter statement
dateRange <- as.character(seq(as.Date(firstDate), as.Date(maxDate), by=1))
years <- unique(substring(dateRange, first=1, last=4))
if(length(years)==1) years <- c(years, years)
year_y1 <- years[1]; year_y2 <- years[2]
months_y1 <- substring(dateRange[grepl(years[1], dateRange)], first=6, last=7)
minMonth_y1 <- min(months_y1)
maxMonth_y1 <- max(months_y1)
months_y2 <- substring(dateRange[grepl(years[2], dateRange)], first=6, last=7)
minMonth_y2 <- min(months_y2)
maxMonth_y2 <- max(months_y2)
# Repeat for 1 day prior to first date & one day after maxdate (because of the way logfile orc partitions are created, sometimes touchpoints can end up in the wrong folder by 1 day. So read in extra days, then filter by event time)
firstDateMinusOne <- as.Date(firstDate)-1
firstDateMinusOne_year <- substring(firstDateMinusOne, first=1, last=4)
firstDateMinusOne_month <- substring(firstDateMinusOne, first=6, last=7)
firstDateMinusOne_day <- substring(firstDateMinusOne, first=9, last=10)
maxDatePlusOne <- as.Date(maxDate)+1
maxDatePlusOne_year <- substring(maxDatePlusOne, first=1, last=4)
maxDatePlusOne_month <- substring(maxDatePlusOne, first=6, last=7)
maxDatePlusOne_day <- substring(maxDatePlusOne, first=9, last=10)
#+++++++++++++++++++
# Read in data, filter & select
#+++++++++++++++++++
# startTime <- proc.time()[3]
dftbl <- alldftbl %>% # create a reference to the sparkdf without loading into memory
# filter by month and year, using ORC partitions for extra speed
filter(((date_year==year_y1 & date_month>=minMonth_y1 & date_month<=maxMonth_y1) |
(date_year==year_y2 & date_month>=minMonth_y2 & date_month<=maxMonth_y2) |
(date_year==firstDateMinusOne_year & date_month==firstDateMinusOne_month & date_day==firstDateMinusOne_day) |
(date_year==maxDatePlusOne_year & date_month==maxDatePlusOne_month & date_day==maxDatePlusOne_day))) %>%
# filter to be within firstdate & maxdate. Note that event_time_char will be in UTC, so 12hrs behind.
filter(event_time>=(unixTime_firstDate*1000000) & event_time<(unixTime_maxDate*1000000)) %>%
# filter by advertiser ID
filter(((advertiser_id==advertiserID | advertiser_id==advertiserID2 | advertiser_id==advertiserID3) &
!is.na(advertiser_id)) |
((floodlight_configuration==advertiserID | floodlight_configuration==advertiserID2 |
floodlight_configuration==advertiserID3) & !is.na(floodlight_configuration)) & user_id!="0") %>%
# Define cols to keep
transmute(time=as.numeric(event_time/1000000),
user_id=as.character(user_id),
action_type=as.character(if(fact_type=='click') 'C' else if(fact_type=='impression') 'I' else if(fact_type=='activity') 'A' else NA),
lookup=concat_ws("_", campaign_id, ad_id, site_id_dcm, placement_id),
activity_lookup=as.character(activity_id),
sv1=as.character(segment_value_1),
other_data=as.character(other_data)) %>%
mutate(time_char=as.character(from_unixtime(time)))
# cache to memory
dftbl <- sdf_register(dftbl, "filtereddf")
tbl_cache(sc, "filtereddf")
#__________________________________________________________________________________________________________________________________
# Collect out of spark
#__________________________________________________________________________________________________________________________________
myDF <- collect(dftbl)
cache()
won't actually force any calculations, it will only mark the dataframe as to-be-cahced. All calculations as well as the caching will happen after you perform an action on the dataframe, e.g.count()
orfirst()
. – Incongruitycache()
itself is not an action and is computed lazily, see e.g. the answer here: https://mcmap.net/q/119124/-why-do-we-need-to-call-cache-or-persist-on-a-rdd. I myself use Scala with spark so I'm not that familiar with the sparklyr, so there could be differences. I saw that thetbl_cache()
can force the data to be loaded into memory, however, it is done by callingcount()
once. – Incongruityspark.yarn.executor.memoryOverhead
parameter (even if you are running locally). – Incongruity