Spark - GraphX - scaling connected components

I am trying to use connected components, but I am having issues with scaling. Here is what I have:

// get vertices
val vertices = stage_2.flatMap(x => GraphUtil.getVertices(x)).cache

// get edges
val edges = stage_2.map(x => GraphUtil.getEdges(x)).filter(_ != null).flatMap(x => x).cache

// create graph  
val identityGraph = Graph(vertices, edges)

// get connected components
val cc = identityGraph.connectedComponents.vertices

Here, GraphUtil has helper functions that return vertices and edges. At this point my graph has ~1 million nodes and ~2 million edges (btw, this is expected to grow to ~100 million nodes). My graph is pretty sparsely connected, so I expect plenty of small subgraphs.
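
For illustration, the helpers have roughly this shape (a simplified sketch; the record format and the String/Int attribute types are placeholders, only the return shapes matter for the pipeline above):

import org.apache.spark.graphx.{Edge, VertexId}

object GraphUtil {
  // one input record may yield several vertices: (vertexId, attribute)
  def getVertices(record: String): Seq[(VertexId, String)] =
    record.split(",").map(_.trim).map(id => (id.toLong, id)).toSeq

  // one input record may yield several edges, or null when it has none
  // (which is why the caller filters out nulls before the flatMap)
  def getEdges(record: String): Seq[Edge[Int]] = {
    val ids = record.split(",").map(_.trim.toLong)
    if (ids.length < 2) null
    else ids.sliding(2).map { case Array(src, dst) => Edge(src, dst, 1) }.toSeq
  }
}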

When I run the above, I keep getting java.lang.OutOfMemoryError: Java heap space. I have tried executor-memory 32g, running on a cluster of 15 nodes with 45g as the YARN container size.

Here is the exception detail:

16/10/26 10:32:26 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.lang.StringBuilder.toString(StringBuilder.java:405)
    at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:360)
    at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:98)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2216)
    at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32)
    at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:146)
    at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:173)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)

In addition, I am getting plenty of the following logs:

16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 320 is 263 bytes
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 321 is 268 bytes
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 322 is 264 bytes

My question is: has anyone tried ConnectedComponents at this scale? If so, what am I doing wrong?

Relic answered 26/10, 2016 at 15:47 Comment(0)

As I posted above in the comments, I implemented connected components using map/reduce on Spark. You can find more details here - https://www.linkedin.com/pulse/connected-component-using-map-reduce-apache-spark-shirish-kumar - and the source code, under the MIT license, here - https://github.com/kwartile/connected-component.
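
The linked article and repository have the details; just to illustrate the general idea with plain RDDs, here is a minimal sketch of naive iterative min-label propagation (not the optimised algorithm from the repository): every vertex repeatedly adopts the smallest component id among itself and its neighbours until nothing changes.

import org.apache.spark.rdd.RDD

def connectedComponentsSketch(edges: RDD[(Long, Long)], maxIter: Int = 50): RDD[(Long, Long)] = {
  // undirected graph: let labels flow across edges in both directions
  val sym = edges.flatMap { case (u, v) => Seq((u, v), (v, u)) }.cache()

  // start with each vertex labelled by its own id
  var labels: RDD[(Long, Long)] = sym.keys.distinct().map(v => (v, v))

  var changed = true
  var i = 0
  while (changed && i < maxIter) {
    // each vertex offers its current label to all of its neighbours
    val offers = sym.join(labels).map { case (_, (neighbour, label)) => (neighbour, label) }
    // every vertex keeps the smallest label it has seen so far
    val updated = labels.union(offers).reduceByKey((a, b) => math.min(a, b)).cache()
    // stop once no label shrank in this round
    changed = !labels.join(updated).filter { case (_, (prev, next)) => prev != next }.isEmpty()
    labels = updated
    i += 1
  }
  labels // (vertexId, componentId)
}

In practice each iteration lengthens the RDD lineage, so a real implementation would also checkpoint (or write and reload) the labels every few rounds.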

Relic answered 26/4, 2017 at 17:4 Comment(2)
How does this compare performance/scalability-wise to the native spark connected components?Ochrea
I assume by native you mean the GraphX implementation. The last time we used GraphX (that was about a year back), it did not scale for us. So far in our testing, our implementation is working pretty well. I have documented that in the README.Relic

The connected components algorithm does not scale very well, and its performance depends quite a lot on the topology of your graph. The sparsity of your edges doesn't mean you have small components. A long chain of edges is very sparse (number of edges = number of vertices - 1), but the brute-force algorithm implemented in GraphX wouldn't be very efficient on it (see the source of cc and pregel).
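
For reference, the GraphX implementation is essentially min-label propagation expressed with Pregel. Paraphrased (a sketch, not the verbatim source, and details may vary between Spark versions), it boils down to this, which is why a long chain needs roughly as many supersteps as its diameter:

import scala.reflect.ClassTag
import org.apache.spark.graphx._

def ccSketch[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
  // every vertex starts with its own id as its component label
  val ccGraph = graph.mapVertices { case (vid, _) => vid }

  // push the smaller label across an edge, in whichever direction it helps
  def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] =
    if (edge.srcAttr < edge.dstAttr) Iterator((edge.dstId, edge.srcAttr))
    else if (edge.srcAttr > edge.dstAttr) Iterator((edge.srcId, edge.dstAttr))
    else Iterator.empty

  // iterate until no label changes anywhere in the graph
  Pregel(ccGraph, initialMsg = Long.MaxValue, activeDirection = EdgeDirection.Either)(
    vprog = (_, attr, msg) => math.min(attr, msg),
    sendMsg = sendMessage,
    mergeMsg = (a, b) => math.min(a, b))
}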

Here is what you can try (sorted; code changes only):

  1. Checkpoint your vertices and edges to parquet (on disk), then load them again to build your graph. Caching sometimes just doesn't cut it when your execution plan grows too big.
  2. Transform your graph in a way that leaves the result of the algorithm unchanged. For instance, you can see in the code that the algorithm propagates the information in both directions (as it should, by default). So if you have several edges connecting the same two vertices, filter them out of the graph on which you apply the algorithm. (A sketch of points 1 and 2 follows this list.)
  3. Optimize the GraphX code yourself (it is really quite straightforward), using either a generic memory-saving optimisation (e.g. checkpointing to disk at each iteration to avoid OOM), or a domain-specific optimisation (similar to point 2).
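
A minimal sketch of points 1 and 2, assuming the vertices/edges RDDs from the question with String vertex attributes and Int edge attributes, Spark 2.x's SparkSession (use SQLContext on 1.x), and /tmp output paths - all of these are placeholders to adapt:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// point 1: write vertices and edges to parquet to cut the lineage ...
vertices.toDF("id", "attr").write.mode("overwrite").parquet("/tmp/cc/vertices")
edges.map(e => (e.srcId, e.dstId, e.attr)).toDF("src", "dst", "attr")
  .write.mode("overwrite").parquet("/tmp/cc/edges")

// ... then rebuild the graph from the reloaded data, which has a short lineage
val vReloaded = spark.read.parquet("/tmp/cc/vertices").rdd
  .map(r => (r.getLong(0), r.getString(1)))
val eReloaded = spark.read.parquet("/tmp/cc/edges").rdd
  // point 2: parallel edges don't change the components, so keep one per vertex pair
  .map(r => ((math.min(r.getLong(0), r.getLong(1)), math.max(r.getLong(0), r.getLong(1))), r.getInt(2)))
  .reduceByKey((a, _) => a)
  .map { case ((src, dst), attr) => Edge(src, dst, attr) }

val slimGraph = Graph(vReloaded, eReloaded)
val cc = slimGraph.connectedComponents().vertices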

If you are OK with leaving GraphX (which is becoming somewhat legacy) behind, you can consider GraphFrames (package, blog). I never tried it, so I don't know if it has CC.
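
For what it's worth, GraphFrames does ship a connected components algorithm; a minimal usage sketch (vertexDF/edgeDF are hypothetical DataFrames with the id/src/dst columns GraphFrames expects, and the checkpoint path is a placeholder):

import org.graphframes.GraphFrame

// GraphFrames' CC needs a checkpoint directory to truncate its iteration lineage
spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoints")

// vertexDF must have an "id" column; edgeDF must have "src" and "dst" columns
val g = GraphFrame(vertexDF, edgeDF)
val components = g.connectedComponents.run() // result has a "component" column per vertex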

I'm certain you can find other possibilities among Spark packages, and maybe you will even want to use something outside of Spark, but that is out of the scope of the question.

Best of luck!

Wanting answered 27/10, 2016 at 8:42 Comment(6)
GraphFrames uses DataFrames and GraphX under the hood so I don't understand how this will help the OP.Glennglenna
@Glennglenna I hope GraphFrames will be more optimised than GraphX. The fact that they run on DataFrames is a good sign, because then they could leverage the Catalyst optimizer and Tungsten. As I said, I didn't try it; I just hold an informed hope.Wanting
GraphX is a project that nobody wants to work on because the underlying graph theory is hard to scale. Unfortunately it's "almost" a dead project. I don't think that GraphFrames will go much further than GraphX nevertheless.Glennglenna
Unfortunately, you might be right. But more and more effort is being put into optimising Spark computations (within the project and outside of it, e.g. Flare), also leveraging GPUs. So maybe a solution integrated in Spark might become competitive, leveraging its nice and integrated UI (GraphFrames)Wanting
Then, you'll have to read this apache-spark-developers-list.1001551.n3.nabble.com/… There are lots of things happening around Spark that I'm not too happy about, and I consider myself a Spark fanatic.Glennglenna
Thanks for the response. I have decided to switch from GraphX ConnectedComponents to my own version of connected components built with map/reduce - so far it is performing pretty well.Relic
