"GC overhead limit exceeded" on cache of large dataset into spark memory (via sparklyr & RStudio)
I am very new to the Big Data technologies I am attempting to work with, but so far I have managed to set up sparklyr in RStudio to connect to a standalone Spark cluster. Data is stored in Cassandra, and I can successfully bring large datasets into Spark memory (cache) to run further analysis on them.

However, recently I have been having a lot of trouble bringing one particularly large dataset into Spark memory, even though the cluster should have more than enough resources (60 cores, 200GB RAM) to handle a dataset of its size.

I thought that by limiting the data being cached to just a few select columns of interest I could overcome the issue (using the answer code from my previous query here), but it did not help. What happens is that the jar process on my local machine ramps up to take over all the local RAM and CPU resources and the whole process freezes, while on the cluster executors keep getting dropped and re-added. Weirdly, this happens even when I select only 1 row for caching (which should make this dataset much smaller than other datasets which I have had no problem caching into Spark memory).

I've had a look through the logs, and these seem to be the only informative errors/warnings early on in the process:

17/03/06 11:40:27 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 33813 because its task set is gone (this is likely the result of receiving duplicate task finished status updates) or its executor has been marked as failed.
17/03/06 11:40:27 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 8167), so marking it as still running
...
17/03/06 11:46:59 WARN TaskSetManager: Lost task 3927.3 in stage 0.0 (TID 54882, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 3863), so marking it as still running
17/03/06 11:46:59 WARN TaskSetManager: Lost task 4300.3 in stage 0.0 (TID 54667, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 14069), so marking it as still running

And then after 20 minutes or so the whole job crashes with:

java.lang.OutOfMemoryError: GC overhead limit exceeded

I've changed my connection config to increase the heartbeat interval (spark.executor.heartbeatInterval: '180s'), and I have seen how to increase memoryOverhead on a YARN cluster (using spark.yarn.executor.memoryOverhead), but not on a standalone cluster.

In my config file, I have experimented by adding each of the following settings one at a time (none of which have worked):

spark.memory.fraction: 0.3
spark.executor.extraJavaOptions: '-Xmx24g'
spark.driver.memory: "64G"
spark.driver.extraJavaOptions: '-XX:MaxHeapSize=1024m'
spark.driver.extraJavaOptions: '-XX:+UseG1GC'
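
For reference, the same experiments can also be driven from R by building a spark_config() object instead of (or on top of) the yml file. This is only a minimal sketch with placeholder values and a placeholder master URL, and driver-side memory generally still has to be passed before the driver JVM starts (via the sparklyr.shell.driver-memory option, as in the yml below):

library(sparklyr)

# Build a config object and set the options experimented with above.
# All values and the master URL are placeholders, not recommendations.
conf <- spark_config()
conf$spark.executor.memory            <- "32G"
conf$spark.executor.heartbeatInterval <- "180s"
conf$spark.memory.fraction            <- 0.3
conf$spark.executor.extraJavaOptions  <- "-XX:+UseG1GC"

# Driver memory must be set before the driver JVM starts, so it goes
# through the spark-submit shell option rather than a runtime property.
conf[["sparklyr.shell.driver-memory"]] <- "8G"

# Connect to the standalone cluster (replace with your own master URL).
sc <- spark_connect(master = "spark://<master_host>:7077", config = conf)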

UPDATE: my full current yml config file is as follows:

default:
# local settings
  sparklyr.sanitize.column.names: TRUE
  sparklyr.cores.local: 3
  sparklyr.shell.driver-memory: "8G"

# remote core/memory settings
  spark.executor.memory: "32G"
  spark.executor.cores: 5
  spark.executor.heartbeatInterval: '180s'
  spark.ext.h2o.nthreads: 10
  spark.cores.max: 30
  spark.memory.storageFraction: 0.6
  spark.memory.fraction: 0.3
  spark.network.timeout: 300
  spark.driver.extraJavaOptions: '-XX:+UseG1GC'

# other configs for spark
  spark.serializer: org.apache.spark.serializer.KryoSerializer
  spark.executor.extraClassPath: /var/lib/cassandra/jar/guava-18.0.jar

# cassandra settings
  spark.cassandra.connection.host: <cassandra_ip>
  spark.cassandra.auth.username: <cassandra_login>
  spark.cassandra.auth.password: <cassandra_pass>
  spark.cassandra.connection.keep_alive_ms: 60000

# spark packages to load
  sparklyr.defaultPackages: 
  - "com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M1"
  - "com.databricks:spark-csv_2.11:1.3.0"
  - "com.datastax.cassandra:cassandra-driver-core:3.0.2"
  - "com.amazonaws:aws-java-sdk-pom:1.10.34"

So my questions are:

  1. Does anyone have any ideas about what to do in this instance?
  2. Are there config settings I can change to help with this issue?
  3. Alternatively, is there a way to import the Cassandra data in batches with RStudio/sparklyr as the driver?
  4. Or, alternatively again, is there a way to munge/filter/edit the data as it is brought into cache so that the resulting table is smaller (similar to using SQL querying, but with more complex dplyr syntax)? A rough sketch of what I mean is below.
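
On point 4, my understanding is that sparklyr translates dplyr verbs into Spark SQL lazily, so in principle the Cassandra table could be registered without caching, filtered/selected first, and only the reduced result cached. An unverified sketch of what I mean (keyspace, table, and column names are made up):

library(sparklyr)
library(dplyr)

# Register the Cassandra table lazily; memory = FALSE avoids caching the full table.
big_tbl <- spark_read_source(
  sc,
  name    = "big_table",
  source  = "org.apache.spark.sql.cassandra",
  options = list(keyspace = "my_keyspace", table = "my_table"),
  memory  = FALSE
)

# Filter and select before anything is materialised, then cache only the result.
small_tbl <- big_tbl %>%
  filter(event_date >= "2017-01-01") %>%
  select(id, event_date, value) %>%
  compute("small_table")   # registers and caches the reduced table in Spark memory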
Protozoal answered 6/3, 2017 at 12:12 Comment(9)
Have you tried to also increase spark.executor.memory? Also try to increase the number of executors. - Babettebabeuf
Yes - I upped the executor memory to 64GB per node (for a total of 384GB RAM) and the same thing happens. I have also tried doubling the executors (to 12 executors on 6 nodes) and am having the same issues. - Protozoal
"Weirdly, this happens even when I select only 1 row for caching" - this suggests the pushdown predicate might not be being applied correctly. How big is your dataset and approximately how many (Cassandra) partitions do you have? Can you post your whole config file? - Surra
I have now updated the original post to show the full config file. The full dataset is ~70GB on disk, though I'm only trying to pull about half of that. Not sure how to get the number of Cassandra partitions? Using nodetool cfstats, it states that the number of keys is 4156, and dividing total size / partition mean bytes gives ~1000. The data is distributed over 6 nodes. - Protozoal
How is your cluster memory distributed among executors? It looks like you have huge heap sizes. Did you try to tune GC for Spark? The simplest option is to turn on G1GC. Check the details here. - Nam
Have you looked at the Spark UI? It will be useful. - Abisha
@VitaliyKotlyarenko - I have attempted to tune the GC; one of my settings above uses G1GC. Each executor is allocated 32GB of RAM (out of 200GB on each node). - Protozoal
I see you use G1GC only for the driver (spark.driver.extraJavaOptions). You need to do the same for the executors (spark.executor.extraJavaOptions). - Nam
Aha! OK, I will try this also. - Protozoal
OK, I've finally managed to make this work!

I'd initially tried the suggestion of @user6910411 to decrease the Cassandra input split size, but this failed in the same way. After playing around with LOTS of other things, today I tried changing that setting in the opposite direction:

spark.cassandra.input.split.size_in_mb: 254 

INCREASING the split size meant fewer Spark tasks, and thus less overhead and fewer calls to the GC. It worked!
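
For anyone else hitting this, the setting can live in the yml config as above, or be set on the connection config from R. A minimal sketch, using the value that happened to work for my data and a placeholder master URL:

library(sparklyr)

conf <- spark_config()
# Larger Cassandra input splits -> fewer Spark partitions/tasks -> less scheduling and GC overhead.
conf$spark.cassandra.input.split.size_in_mb <- 254

sc <- spark_connect(master = "spark://<master_host>:7077", config = conf)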

Protozoal answered 22/3, 2017 at 16:49 Comment(0)
