Out of memory error when collecting data out of Spark cluster

I know there are plenty of questions on SO about out of memory errors on Spark but I haven't found a solution to mine.

I have a simple workflow:

  1. read in ORC files from Amazon S3
  2. filter down to a small subset of rows
  3. select a small subset of columns
  4. collect into the driver node (so I can do additional operations in R)

When I run the above and then cache the table in Spark memory, it takes up <2GB - tiny compared with the memory available to my cluster - yet I get an OOM error when I try to collect the data to my driver node.

I have tried running on the following setups:

  • local mode on a computer with 32 cores and 244GB of RAM
  • standalone mode with 10 x 6.2 GB executors and a 61GB driver node

For each of these I have played with numerous configurations of executor.memory, driver.memory, and driver.maxResultSize, covering the full range of possible values within my available memory, but I always end up with an out of memory error at the collect stage: either java.lang.OutOfMemoryError: Java heap space,
java.lang.OutOfMemoryError: GC overhead limit exceeded, or Error in invoke_method.spark_shell_connection(spark_connection(jobj), : No status is returned. (a sparklyr error indicative of memory issues).

Based on my [limited] understanding of Spark, caching a table prior to collecting should force all calculations - i.e. if the table is sitting happily in memory after caching at <2GB, then I shouldn't need much more than 2GB of memory to collect it into the driver node.

Note that answers to this question include some suggestions I have yet to try, but these are likely to impact performance (e.g. serialising the RDD), so I would like to avoid them if possible.

My questions:

  1. how can it be that a dataframe that takes up so little space after it has been cached can cause memory problems?
  2. is there something obvious for me to check/change/troubleshoot to help fix the problem, before I move on to additional options that may compromise performance?

Thank you

Edit: note in response to @Shaido's comment below, calling cache via Sparklyr "forces data to be loaded in memory by executing a count(*) over the table" [from Sparklyr documentation] - i.e. the table should be sitting in memory and all the calculations run (I believe) prior to calling collect.
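
For what it's worth, my reading of that documented behaviour is roughly the following (a sketch only - force = TRUE is the tbl_cache argument that, as far as I can tell, triggers the count; "filtereddf" is the table name from my code below):

# As I understand it, tbl_cache() marks the table for caching and then forces
# materialisation with a count, so by the time it returns the table should be in memory.
tbl_cache(sc, "filtereddf", force = TRUE)

# Equivalent manual check - only a single number comes back to the driver:
DBI::dbGetQuery(sc, "SELECT COUNT(*) FROM filtereddf")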

Edit: some additional observations since following the suggestions below:

  • As per the comments below, I have now tried writing the data to csv instead of collecting, to get an idea of the likely file size (see the sketch after this list). This operation creates a set of csvs amounting to ~3GB, and takes only 2 seconds when run after caching.
  • If I set driver.maxResultSize to <1G I get an error stating that the size of the serialized RDD is 1030 MB, larger than driver.maxResultSize.
  • If I watch memory usage in Task Manager after calling collect I see that usage just keeps going up until it reaches ~ 90GB, at which point the OOM error occurs. So for whatever reason the amount of RAM being used to perform the collect operation is ~100x greater than the size of the RDD I'm trying to collect.
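
For completeness, the write-to-csv check mentioned in the first bullet above was along these lines (a sketch - the output path is a placeholder, and dftbl is the cached table defined in the code below):

# Write the filtered/cached table straight out to csv instead of collecting it,
# just to gauge the real volume of the data (placeholder path).
spark_write_csv(dftbl, path = "s3a://my-bucket/filtereddf-csv", mode = "overwrite")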

Edit: code added below, as requested in comments.

#__________________________________________________________________________________________________________________________________

# Set parameters used for filtering rows
#__________________________________________________________________________________________________________________________________

firstDate <- '2017-07-01'
maxDate <- '2017-08-31'
advertiserID <- '4529611'
advertiserID2 <- '4601141'
advertiserID3 <- '4601141'

library(dplyr)
library(stringr)
library(sparklyr)

#__________________________________________________________________________________________________________________________________

# Configure & connect to spark
#__________________________________________________________________________________________________________________________________

Sys.setenv("SPARK_MEM"="100g")
Sys.setenv(HADOOP_HOME="C:/Users/Jay.Ruffell/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.7/tmp/hadoop") 

config <- spark_config()
config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3" # used to connect to S3
Sys.setenv(AWS_ACCESS_KEY_ID="")
Sys.setenv(AWS_SECRET_ACCESS_KEY="") # setting these blank ensures that AWS uses the IAM roles associated with the cluster to define S3 permissions

# Specify memory parameters - have tried lots of different values here!
config$`sparklyr.shell.driver-memory` <- '50g' 
config$`sparklyr.shell.executor-memory` <- '50g'
config$spark.driver.maxResultSize <- '50g'
sc <- spark_connect(master='local', config=config, version='2.0.1')

#__________________________________________________________________________________________________________________________________

# load data into spark from S3 ----
#__________________________________________________________________________________________________________________________________

#+++++++++++++++++++
# create spark table (not in memory yet) of all logfiles within logfiles path
#+++++++++++++++++++

spark_session(sc) %>%
  invoke("read") %>% 
  invoke("format", "orc") %>%
  invoke("load", 's3a://nz-omg-ann-aipl-data-lake/aip-connect-256537/orc-files/dcm-log-files/dt2-facts') %>% 
  invoke("createOrReplaceTempView", "alldatadf") 
alldftbl <- tbl(sc, 'alldatadf') # create a reference to the sparkdf without loading into memory

#+++++++++++++++++++
# define variables used to filter table down to daterange
#+++++++++++++++++++

# Calculate firstDate & maxDate as unix timestamps
unixTime_firstDate <- as.numeric(as.POSIXct(firstDate))+1
unixTime_maxDate <- as.numeric(as.POSIXct(maxDate)) + 3600*24-1

# Convert daterange params into date_year, date_month & date_day values to pass to filter statement
dateRange <- as.character(seq(as.Date(firstDate), as.Date(maxDate), by=1))
years <- unique(substring(dateRange, first=1, last=4))
if(length(years)==1) years <- c(years, years)
year_y1 <- years[1]; year_y2 <- years[2]
months_y1 <- substring(dateRange[grepl(years[1], dateRange)], first=6, last=7)
minMonth_y1 <- min(months_y1)
maxMonth_y1 <- max(months_y1)
months_y2 <- substring(dateRange[grepl(years[2], dateRange)], first=6, last=7)
minMonth_y2 <- min(months_y2)
maxMonth_y2 <- max(months_y2) 

# Repeat for 1 day prior to first date & one day after maxdate (because of the way logfile orc partitions are created, sometimes touchpoints can end up in the wrong folder by 1 day. So read in extra days, then filter by event time)
firstDateMinusOne <- as.Date(firstDate)-1
firstDateMinusOne_year <- substring(firstDateMinusOne, first=1, last=4)
firstDateMinusOne_month <- substring(firstDateMinusOne, first=6, last=7) 
firstDateMinusOne_day <- substring(firstDateMinusOne, first=9, last=10)
maxDatePlusOne <- as.Date(maxDate)+1
maxDatePlusOne_year <- substring(maxDatePlusOne, first=1, last=4)
maxDatePlusOne_month <- substring(maxDatePlusOne, first=6, last=7)
maxDatePlusOne_day <- substring(maxDatePlusOne, first=9, last=10)

#+++++++++++++++++++
# Read in data, filter & select
#+++++++++++++++++++

# startTime <- proc.time()[3]
dftbl <- alldftbl %>% # create a reference to the sparkdf without loading into memory

  # filter by month and year, using ORC partitions for extra speed
  filter(((date_year==year_y1  & date_month>=minMonth_y1 & date_month<=maxMonth_y1) |
            (date_year==year_y2 & date_month>=minMonth_y2 & date_month<=maxMonth_y2) |
            (date_year==firstDateMinusOne_year & date_month==firstDateMinusOne_month & date_day==firstDateMinusOne_day) |
            (date_year==maxDatePlusOne_year & date_month==maxDatePlusOne_month & date_day==maxDatePlusOne_day))) %>%

  # filter to be within firstdate & maxdate. Note that event_time_char will be in UTC, so 12hrs behind.
  filter(event_time>=(unixTime_firstDate*1000000) & event_time<(unixTime_maxDate*1000000)) %>%

  # filter by advertiser ID
  filter(((advertiser_id==advertiserID | advertiser_id==advertiserID2 | advertiser_id==advertiserID3) & 
            !is.na(advertiser_id)) |
           ((floodlight_configuration==advertiserID | floodlight_configuration==advertiserID2 | 
               floodlight_configuration==advertiserID3) & !is.na(floodlight_configuration)) & user_id!="0") %>%

  # Define cols to keep
  transmute(time=as.numeric(event_time/1000000),
            user_id=as.character(user_id),
            action_type=as.character(if(fact_type=='click') 'C' else if(fact_type=='impression') 'I' else if(fact_type=='activity') 'A' else NA),
            lookup=concat_ws("_", campaign_id, ad_id, site_id_dcm, placement_id),
            activity_lookup=as.character(activity_id),
            sv1=as.character(segment_value_1),
            other_data=as.character(other_data))  %>%
  mutate(time_char=as.character(from_unixtime(time)))

# cache to memory
dftbl <- sdf_register(dftbl, "filtereddf")
tbl_cache(sc, "filtereddf")

#__________________________________________________________________________________________________________________________________

# Collect out of spark
#__________________________________________________________________________________________________________________________________

myDF <- collect(dftbl)
Wales answered 25/8, 2017 at 1:35. Comments:
cache() won't actually force any calculations, it will only mark the dataframe as to-be-cached. All calculations, as well as the caching, will happen after you perform an action on the dataframe, e.g. count() or first(). - Incongruity
Related to that: does your code run some transformations on top of sparklyr, like map or reduce functions? Or are you using the DataFrame API? - Portsmouth
How do you execute your application, and what is the resource manager for your purposes (Spark standalone / YARN)? - Indoiranian
@Thiago Baldim there aren't any transformations aside from what I mentioned above. These are written in sparklyr, which as I understand it then translates into Spark SQL. - Wales
@Indoiranian running in standalone mode, no YARN. - Wales
@Incongruity are you sure? For me, cache() forces the computation to store a computed DataFrame in memory, to be used later, as cache is a final action (not a transformation; those are actually delayed). - Ernest
@BelkacemLahouel cache() itself is not an action and is computed lazily, see e.g. the answer here: https://mcmap.net/q/119124/-why-do-we-need-to-call-cache-or-persist-on-a-rdd. I myself use Scala with Spark so I'm not that familiar with sparklyr, so there could be differences. I saw that tbl_cache() can force the data to be loaded into memory, however, it is done by calling count() once. - Incongruity
@Wales Have you looked at the answer to this question? #41384836 It looks quite similar to yours. - Incongruity
@Incongruity thanks for the suggestion, yes it looks like a very similar problem, however the suggested solutions did not work for me - not in local mode at least (my standalone cluster is temporarily down so I can't test there). - Wales
@Wales I read here (github.com/rstudio/sparklyr/issues/287) that the 'g' sometimes is case-sensitive. Feels a bit silly, but it doesn't hurt to try changing it to 'G'. You could also try setting the spark.yarn.executor.memoryOverhead parameter (even if you are running locally). - Incongruity
You mention that after caching it's <2GB. Can you tell what the total data size is that you are reading? - Katsuyama
@jay, as an investigation exercise, could you please do the following: instead of collecting your data to the driver, first try to write it back to S3, then check what the real volume of data is (yes, in memory it will be different, but it will give you a basic sense). - Hutchins
@jd_247 the Spark UI shows 12.2 GB of data read in. - Wales
@IgorBerman unfortunately I haven't been able to get write.csv working - will keep trying, but in the meantime note that a total of 12.2 GB of data was read in, as per the comment above. - Wales
@Incongruity thanks but those suggestions did not help. - Wales
If the error is definitely in the shell / REPL, then I believe you just want to set SPARK_REPL_OPTS: SPARK_REPL_OPTS="-XX:MaxPermSize=256m" spark-shell - Retract
@Wales we have very similar issues. Can I conclude from your edits that by using cache() prior to the save, your performance improved? I am assuming your original dataset is quite large. - Grapeshot
@Renée apologies, this was a while ago and I no longer have access to the code. I can't remember if performance improved with cache(). - Wales

When you call collect on the dataframe, two things happen:

  1. All of the data has to be written to the output on the driver.
  2. The driver has to collect the data from all the nodes and keep it in its memory.

Answer:

If you are just looking to load the data into the memory of the executors, count() is also an action that will load the data into the executors' memory, where it can be used by other processes.

If you want to extract the data, then try this along with other properties when pulling the data: "--conf spark.driver.maxResultSize=10g".
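
In sparklyr the same properties can be set through spark_config() before connecting, along these lines (a rough sketch; the values are illustrative, not recommendations):

# Sketch: raise the driver-side limits before creating the connection.
library(sparklyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "16g"   # heap available to the driver JVM
config$spark.driver.maxResultSize <- "10g"       # cap on the total serialized size collect() may return
sc <- spark_connect(master = "local", config = config)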

Rhine answered 31/8, 2017 at 14:35. Comment:
Thanks for the info on what happens when collect is called. As for the suggestions, I've already implemented both of these as described in my original question. - Wales

As mentioned above, "cache" is not an action; see RDD Persistence:

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. 

But "collect" is an action, and all computations (including "cache") will be started when "collect" is called.

You run the application in standalone mode, which means that the initial data loading and all the computations will be performed in the same memory.

It is the data loading and the other computations that use most of the memory, not "collect".

You can check it by replacing "collect" with "count".
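
For example (a sketch using the dftbl table from the question; sdf_nrow is sparklyr's Spark-side row count):

# Runs the same loading, filtering and caching, but only one number is returned to the driver:
sdf_nrow(dftbl)   # or: dftbl %>% dplyr::count() %>% collect()

# If this also runs out of memory, the loading/computation is the problem;
# if only collect(dftbl) fails, the driver-side transfer is where the memory goes.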

Lemuelah answered 5/9, 2017 at 9:45. Comments:
As noted in the original question, the sparklyr version of cache executes a count over the table. cache [i.e. count] runs fine - it's only when I call collect right afterwards that the OOM error occurs. Can you shed any light on this? - Wales
The question title looks like it's about Spark; in reality it is about the separate engine sparklyr, which makes its own "cache/collect" computations. I guess that is confusing. - Lemuelah
Not a separate engine, just a front end to Spark. But I agree it's confusing that cache in sparklyr calls count in Spark. - Wales
Maybe "dataFrame.rdd.count" will also lead to OutOfMemory, instead of "collect"? #42714791 - Lemuelah
