Spark java.lang.StackOverflowError

5

14

I'm using Spark to calculate the PageRank of user reviews, but I keep getting java.lang.StackOverflowError when I run my code on a big dataset (40k entries). When running the code on a small number of entries it works fine, though.

Entry Example:

product/productId: B00004CK40   review/userId: A39IIHQF18YGZA   review/profileName: C. A. M. Salas  review/helpfulness: 0/0 review/score: 4.0   review/time: 1175817600 review/summary: Reliable comedy review/text: Nice script, well acted comedy, and a young Nicolette Sheridan. Cusak is in top form.

The Code:

public void calculatePageRank() {
    sc.clearCallSite();
    sc.clearJobGroup();

    JavaRDD<String> rddFileData = sc.textFile(inputFileName).cache();
    sc.setCheckpointDir("pagerankCheckpoint/");

    JavaRDD<String> rddMovieData = rddFileData.map(new Function<String, String>() {

        @Override
        public String call(String arg0) throws Exception {
            String[] data = arg0.split("\t");
            String movieId = data[0].split(":")[1].trim();
            String userId = data[1].split(":")[1].trim();
            return movieId + "\t" + userId;
        }
    });

    JavaPairRDD<String, Iterable<String>> rddPairReviewData = rddMovieData.mapToPair(new PairFunction<String, String, String>() {

        @Override
        public Tuple2<String, String> call(String arg0) throws Exception {
            String[] data = arg0.split("\t");
            return new Tuple2<String, String>(data[0], data[1]);
        }
    }).groupByKey().cache();


    JavaRDD<Iterable<String>> cartUsers = rddPairReviewData.map(f -> f._2());
    List<Iterable<String>> cartUsersList = cartUsers.collect();
    JavaPairRDD<String, String> finalCartesian = null;
    int iterCounter = 0;
    for (Iterable<String> out : cartUsersList) {
        JavaRDD<String> currentUsersRDD = sc.parallelize(Lists.newArrayList(out));
        if (finalCartesian == null) {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD);
        } else {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD).union(finalCartesian);
            if (iterCounter % 20 == 0) {
                finalCartesian.checkpoint();
            }
        }
        iterCounter++;
    }

    JavaRDD<Tuple2<String, String>> finalCartesianToTuple = finalCartesian.map(m -> new Tuple2<String, String>(m._1(), m._2()));

    finalCartesianToTuple = finalCartesianToTuple.filter(x -> x._1().compareTo(x._2()) != 0);
    JavaPairRDD<String, String> userIdPairs = finalCartesianToTuple.mapToPair(m -> new Tuple2<String, String>(m._1(), m._2()));

    JavaRDD<String> userIdPairsString = userIdPairs.map(new Function<Tuple2<String, String>, String>() {

        // each element is a Tuple2<userId, userId>
        @Override
        public String call(Tuple2<String, String> t) throws Exception {
            return t._1 + " " + t._2;
        }
    });

    try {

//calculate pagerank using this https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
        JavaPageRank.calculatePageRank(userIdPairsString, 100);
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    sc.close();

}
Sverre answered 19/6, 2016 at 16:32
9

I have several suggestions that will help you greatly improve the performance of the code in your question.

  1. Caching: use caching on datasets that you need to refer to again and again for the same or different operations (iterative algorithms).

An example is RDD.count — to tell you the number of lines in the file, the file needs to be read. So if you write RDD.count, at this point the file will be read, the lines will be counted, and the count will be returned.

What if you call RDD.count again? The same thing: the file will be read and counted again. So what does RDD.cache do? Now, if you run RDD.count the first time, the file will be loaded, cached, and counted. If you call RDD.count a second time, the operation will use the cache. It will just take the data from the cache and count the lines, no recomputing.

Read more about caching here.

In your code sample you are not reusing anything that you've cached. So you may remove the .cache from there.
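
To illustrate, here is a minimal sketch of where caching does pay off (the file name and variable names are made up, assuming a JavaSparkContext sc):

JavaRDD<String> lines = sc.textFile("reviews.txt").cache();
long firstCount = lines.count();   // reads the file, caches the partitions, counts
long secondCount = lines.count();  // served from the cache, no re-read of the file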

  2. Parallelization: In the code sample, you've parallelized every individual element in your RDD, which is already a distributed collection. I suggest you merge the rddFileData, rddMovieData and rddPairReviewData steps so that it all happens in one go.

Get rid of .collect, since it brings the results back to the driver and may well be the actual reason for your error.
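
A sketch of what the merged pipeline could look like (assuming the Spark 2.x Java lambda API, where flatMap returns an Iterator, plus java.util.List/ArrayList imports; the field positions match your sample entry):

// One pass from the raw lines to (movieId, userIds), no intermediate RDDs
JavaPairRDD<String, Iterable<String>> rddPairReviewData = sc.textFile(inputFileName)
    .mapToPair(line -> {
        String[] data = line.split("\t");
        String movieId = data[0].split(":")[1].trim();
        String userId = data[1].split(":")[1].trim();
        return new Tuple2<String, String>(movieId, userId);
    })
    .groupByKey();

// Build the user-user pairs per movie with flatMap instead of collect() + cartesian(),
// so everything stays distributed and never comes back to the driver
JavaRDD<String> userIdPairsString = rddPairReviewData.values()
    .flatMap(users -> {
        List<String> pairs = new ArrayList<>();
        for (String u1 : users) {
            for (String u2 : users) {
                if (!u1.equals(u2)) {
                    pairs.add(u1 + " " + u2);
                }
            }
        }
        return pairs.iterator();
    });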

Nazler answered 22/6, 2017 at 8:31
6

This problem occurs when your DAG grows large and too many levels of transformations pile up in your code. The JVM can no longer hold the whole chain of lazily built operations by the time an action finally triggers execution.

Checkpointing is one option. I would suggest using Spark SQL for this kind of aggregation: if your data is structured, try loading it into DataFrames and performing the grouping and other SQL functions there.
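
For example, a rough sketch with DataFrames (assuming a SparkSession named spark and that the reviews have already been flattened to two tab-separated columns; the file name is a placeholder):

// Group users per movie with Spark SQL instead of a long RDD lineage
Dataset<Row> reviews = spark.read()
    .option("sep", "\t")
    .csv("movie_user_pairs.tsv")
    .toDF("movieId", "userId");

Dataset<Row> usersPerMovie = reviews.groupBy("movieId")
    .agg(functions.collect_list("userId").alias("users"));

usersPerMovie.show();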

Remus answered 25/2, 2017 at 2:29
4

When your for loop grows really large, Spark can no longer keep track of the lineage. Enable checkpointing in your for loop to checkpoint your RDD every 10 iterations or so. Checkpointing will fix the problem. Don't forget to clean up the checkpoint directory afterwards.

http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
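
Applied to the loop in the question, the pattern might look roughly like this (illustrative only; checkpoint() only takes effect once an action materializes the RDD, hence the count()):

sc.setCheckpointDir("pagerankCheckpoint/");

int iterCounter = 0;
for (Iterable<String> out : cartUsersList) {
    // ... build/union finalCartesian as in the question ...
    iterCounter++;
    if (iterCounter % 10 == 0) {
        finalCartesian.checkpoint();
        finalCartesian.count(); // forces evaluation so the checkpoint is written and the lineage truncated
    }
}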

Hakeem answered 19/6, 2016 at 21:30
0

The things below fixed the StackOverflowError for me. As others pointed out, it is caused by the lineage that Spark keeps building, especially when you have a loop/iteration in your code.

Set a checkpoint directory:

spark.sparkContext.setCheckpointDir("./checkpoint")

Checkpoint the DataFrame/RDD you are modifying/operating on in each iteration:

modifyingDf.checkpoint()

Cache the DataFrames that are reused in each iteration:

reusedDf.cache()
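
Put together in an iterative job, the pattern might look like this sketch (loadInitialState, loadLookupData, stepFunction and numIterations are placeholders; note that Dataset.checkpoint() returns a new, checkpointed Dataset, so the result has to be reassigned):

spark.sparkContext().setCheckpointDir("./checkpoint");

Dataset<Row> reusedDf = loadLookupData(spark).cache();   // reused in every iteration, so cache it
Dataset<Row> modifyingDf = loadInitialState(spark);

for (int i = 1; i <= numIterations; i++) {
    modifyingDf = stepFunction(modifyingDf, reusedDf);   // whatever the iteration computes
    if (i % 10 == 0) {
        // checkpoint() is eager by default: it materializes the data and truncates the lineage
        modifyingDf = modifyingDf.checkpoint();
    }
}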
Snake answered 12/7, 2021 at 8:9
0

Add this config:

--conf "spark.executor.extraJavaOptions=-Xss512m"
--conf "spark.driver.extraJavaOptions=-Xss512m"
Gallicism answered 4/8, 2023 at 20:14
