The connected component that contains a specific vertex can be found with a BFS traversal that starts from this vertex and collects its neighbors hop by hop.
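To make the traversal concrete before moving to GraphX, here is a minimal sketch on a plain in-memory adjacency map. The object `BfsComponent`, the helper `component` and the adjacency-map representation are assumptions made for illustration; they are not part of GraphX or of the implementation below.

```scala
import scala.collection.mutable

object BfsComponent {
  // Hypothetical helper: returns every vertex reachable from `start` in an
  // undirected graph given as an adjacency map (vertex -> set of neighbors).
  def component(adj: Map[Long, Set[Long]], start: Long): Set[Long] = {
    val visited = mutable.Set(start)   // vertices already collected
    val queue = mutable.Queue(start)   // frontier of the traversal
    while (queue.nonEmpty) {
      val v = queue.dequeue()
      // Visit each neighbor once; visited vertices are never enqueued again
      for (n <- adj.getOrElse(v, Set.empty[Long]) if !visited(n)) {
        visited += n
        queue.enqueue(n)
      }
    }
    visited.toSet
  }
}
```

For example, with the edges 1-2, 2-3 and 4-5 stored in both directions, `BfsComponent.component(adj, 1L)` yields the vertices 1, 2 and 3 and leaves 4 and 5 out.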
In GraphX, this traversal can be implemented with the Pregel API, for which we implement three functions: vertexProgram, sendMessage and mergeMessages. The algorithm is triggered by the reception of an initial message. The center sends a message to its neighbors, which propagate it to their own neighbors, and so on until the whole connected component is covered. Every vertex that receives a message is marked as checked so that it is not activated again in the following iterations.
Here is the implementation of this approach:
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object ConnectedComponent extends Serializable {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ConnectedComponent").setMaster("local")
    val sc = new SparkContext(conf)
    val vRDD = sc.objectFile[(VertexId, Int)]("/path/to/vertex/rdd/file/")
    val eRDD = sc.objectFile[Edge[Int]]("/path/to/edge/rdd/file/")
    val graph = Graph(vRDD, eRDD)
    // Pick the vertex whose connected component we want to extract
    val centerOfCC = graph.pickRandomVertex()
    val cc = extractCC(graph, centerOfCC)
    cc.vertices.collect.foreach(println)
    sc.stop()
  }

  def extractCC(g: Graph[Int, Int], center: VertexId): Graph[Int, Int] = {
    /* Returns the subgraph of the input graph formed by the connected component
     * that contains 'center'.
     */
    val initialGraph = g.mapVertices((id, attr) => VertexData(attr, false, false, center))
    val connectedComponent = initialGraph.pregel(initialMsg = 0)(vprog, sendMsg, mergeMsgs)
      .subgraph(vpred = (id, attr) => attr.checked) // keep only the visited vertices
      .mapVertices((id, vdata) => vdata.attr)       // restore the original vertex labels
    connectedComponent
  }

  case class VertexData(
    attr: Int,          // label of the vertex
    checked: Boolean,   // true once the vertex has been visited
    propagate: Boolean, // whether the vertex may forward messages
    center: VertexId)   // ID of the connected component's center

  def vprog(id: VertexId, vdata: VertexData, msg: Int): VertexData = {
    val attr: Int = vdata.attr
    var checked: Boolean = vdata.checked
    var propagate: Boolean = vdata.propagate
    val center: VertexId = vdata.center
    if (!checked && msg == 0 && id == center) {
      // Initial message: only the center is activated
      propagate = true
      checked = true
    } else if (!checked && msg == 1) {
      // First visit: mark the vertex and let it forward the message
      propagate = true
      checked = true
    } else if (checked && msg == 1) {
      // Already visited: stop forwarding
      propagate = false
    }
    VertexData(attr, checked, propagate, center)
  }
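
  def sendMsg(triplet: EdgeTriplet[VertexData, Int]): Iterator[(VertexId, Int)] = {
    // Edges are treated as undirected: a propagating endpoint notifies the other one.
    // Already-visited endpoints are skipped, so no message is sent once the
    // whole component is covered and the Pregel loop can terminate.
    var it: Iterator[(VertexId, Int)] = Iterator()
    if (triplet.dstAttr.propagate && !triplet.srcAttr.checked)
      it = it ++ Iterator((triplet.srcId, 1))
    if (triplet.srcAttr.propagate && !triplet.dstAttr.checked)
      it = it ++ Iterator((triplet.dstId, 1))
    it
  }

  // All messages sent by sendMsg carry the same value, so merging keeps the maximum
  def mergeMsgs(a: Int, b: Int): Int = math.max(a, b)
}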

[...] connectedComponents algorithm works, it makes it very easy to find the disconnected subgraphs within the original graph. Basically, everything in the same subgraph gets its Vertex.attr set to the smallest VertexID in the subgraph. At that point, finding all the connected components to a given node is as easy as finding all the vertices in the connectedComponents result graph that have the same vertex.attr value. – Ripply
[...] connectedComponents, every vertex in the left-hand subgraph would have its attr set to 1L. Every vertex in the right-hand subgraph would have its attr set to 4L. Then you can just use simple RDD operations to filter out the nodes that are in the correct subgraph. – Ripply
[...] connectedComponents is really slow when you have billions of nodes and you need to find just three or four such subgraphs. In such cases, computing connectedComponents is really costly. – Scad
[...] connectedComponents. That being said, it works just fine for me. Are you sure you didn't mess up the types in the filter expression? – Thole
[...] testgraph? – Cobblestone
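
The alternative described in the comments (label every vertex with connectedComponents, then filter on the label of the vertex of interest) could look roughly like the sketch below. The object `ComponentByLabel` and the helper `componentOf` are hypothetical names; `connectedComponents`, `subgraph` and `mask` are the GraphX operations they rely on. The sketch assumes `center` is present in the graph, since `first()` fails on an empty result.

```scala
import org.apache.spark.graphx._
import scala.reflect.ClassTag

object ComponentByLabel {
  // Hypothetical helper: keeps only the vertices and edges of the
  // connected component that contains `center`.
  def componentOf[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED], center: VertexId): Graph[VD, ED] = {
    // Label every vertex with the smallest vertex ID of its component
    val cc = g.connectedComponents()
    // Look up the label of the component that contains `center`
    // (assumption: `center` exists in the graph, otherwise first() throws)
    val target: VertexId = cc.vertices.filter { case (id, _) => id == center }.first()._2
    // Keep the vertices that share this label, then restrict the original
    // graph to them so the original vertex and edge attributes are preserved
    val sameComponent = cc.subgraph(vpred = (_, label) => label == target)
    g.mask(sameComponent)
  }
}
```

Unlike the Pregel traversal above, this version labels the whole graph before filtering, which is the cost the comments warn about when only a few components of a very large graph are needed.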