Finding connected components of a particular node instead of the whole graph (GraphFrame/GraphX)

I have created a GraphFrame in Spark and the graph currently looks like the following:

Basically, there will be a lot of such subgraphs, each disconnected from the others. Given a particular node ID, I want to find all the other nodes within its subgraph. For instance, if node ID 1 is given, the traversal should return 2, 10, 20, 3, 30.

I have created a motif but it doesn't give the right result.

testgraph.find("(a)-[]->(b); (c)-[]->(b)").filter("(a.id = '1')").show()

Unfortunately, the connectedComponents function considers the whole graph. Is it possible to get all the nodes within a disconnected subgraph, given a particular node ID, using GraphFrame/GraphX?

Scad answered 26/5, 2016 at 14:41 Comment(10)
But what's the issue with working on the whole graph? The way the connectedComponents algorithm works makes it very easy to find the disconnected subgraphs within the original graph. Basically, everything in the same subgraph gets its Vertex.attr set to the smallest VertexID in the subgraph. At that point, finding all the vertices connected to a given node is as easy as finding all the vertices in the connectedComponents result graph that have the same vertex.attr value.Ripply
For example, in your graph pictured above, after running connectedComponents, every vertex in the left-hand subgraph would have its attr set to 1L. Every vertex in the right-hand subgraph would have its attr set to 4L. Then you can just use simple RDD operations to filter out the nodes that are in the correct subgraph.Ripply
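A minimal sketch of what the two comments above describe, assuming a GraphX Graph[Int, Int]. The names ComponentLookup and componentOf are made up for illustration, and this still runs connectedComponents over the whole graph:

```scala
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.rdd.RDD

// Hypothetical helper, not part of GraphX.
object ComponentLookup {

  // Returns the IDs of all vertices in the same connected component as `target`.
  // Throws if `target` is not a vertex of the graph (first() on an empty RDD).
  def componentOf(graph: Graph[Int, Int], target: VertexId): RDD[VertexId] = {
    // Each vertex is labelled with the smallest vertex ID in its component.
    val labels = graph.connectedComponents().vertices

    // Look up the label of the vertex we are interested in.
    val wanted: VertexId =
      labels.filter { case (id, _) => id == target }.map(_._2).first()

    // Keep every vertex that carries the same label.
    labels.filter { case (_, label) => label == wanted }.map(_._1)
  }
}
```

Calling ComponentLookup.componentOf(graph, 1L).collect() would then give the IDs in the subgraph that contains vertex 1, including 1 itself.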
Hi @DavidGriffin, I know that. However, running connectedComponents is really slow when you have billions of nodes and you need to find just three or four such subgraphs. In such cases, computing connectedComponents is really costly.Scad
In general it won't be cheaper than connectedComponents. That being said, it works just fine for me. Are you sure you didn't mess up the types in the filter expression?Thole
Hi @Thole, can you explain why you think it won't be cheaper? My understanding is that GraphFrame is based on DataFrame, so when I want to traverse from one particular node ID, it will first look it up in the nodes DataFrame and then traverse to find the other nodes associated with it. That is unlike a normal graph search, where you may need to start at a random node and keep traversing until you find the node ID, and only then traverse to get the other nodes associated with it.Scad
The problem is that the search is performed using joins. Assuming that the initial node has low cardinality, it will be handled by a broadcast join, but in the worst-case scenario it will require full-blown joins.Thole
Can you provide the code that you use to generate testgraph?Cobblestone
I have a similar issue. I have billions of nodes and need to find connected components. I would love to get input from people who have done something similar.Lsd
@ShirishKumar did you ever find anyone who had done something similar? I ran into a similar problem in my question: #46397105Soothfast
@Soothfast I ended up writing my own version of connected components using map-reduce on Spark. GraphX was not scaling well for my use case, and as far as I know GraphX is not getting much attention either. You can check it here: github.com/kwartile/connected-component. I have not thought about how to address the problem stated above.Lsd

The connected component of a specific vertex can be obtained with a BFS traversal that starts from this vertex and collects its neighbors over several hops. This can be done through the Pregel API offered by GraphX, which requires implementing vertexProgram, sendMessage and mergeMessages functions. The algorithm is triggered on reception of an initial message. The center sends a message to its neighbors, which propagate it to their neighbors, and so on until the whole connected component is covered. Every vertex that receives a message is marked as checked so that it is not activated again in later iterations.

Here is the implementation of this approach:

import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object ConnectedComponent extends Serializable {

    def main(args: Array[String]): Unit = {
        
        val conf = new SparkConf().setAppName("ConnectedComponent").setMaster("local")
        val sc = new SparkContext(conf)
        val vRDD = sc.objectFile[(VertexId,Int)]("/path/to/vertex/rdd/file/")
        val eRDD = sc.objectFile[Edge[Int]]("/path/to/edge/rdd/file/")
        val graph = Graph(vRDD, eRDD)
        val centerOfCC = graph.pickRandomVertex()
        val cc = extractCC(graph, centerOfCC)
        cc.vertices.collect.foreach(println)

        sc.stop()
    }

    def extractCC(g: Graph[Int, Int], center: VertexId): Graph[Int, Int] = {
        /* Return the subgraph of the input graph that forms the connected component containing 'center'
         */
        val initialGraph = g.mapVertices((id, attr) => VertexData(attr, false, false, center))
        val connectedComponent = initialGraph.pregel(initialMsg = 0)(vprog, sendMsg, mergeMsgs)
                                .subgraph(vpred = (id, attr) => attr.checked == true)
                                .mapVertices((id, vdata) => vdata.attr)
        connectedComponent
    }


    case class VertexData( var attr : Int, // label of the vertex
                    var checked : Boolean, // check visited vertices 
                    var propagate : Boolean, // allow forwarding msgs or not
                    var center: VertexId) // ID of the connectedComponent center
    def vprog(id:VertexId, vdata: VertexData, msg: Int): VertexData = {

        val attr : Int = vdata.attr 
        var checked : Boolean = vdata.checked
        var propagate : Boolean = vdata.propagate
        val center : VertexId = vdata.center

        if (checked==false && msg == 0 && id==center) {
          propagate = true
          checked = true
        }
        else if(checked==false && msg == 1) {
          propagate = true
          checked = true
        }
        else if(checked == true && msg == 1){
          propagate = false
        }
        new VertexData(attr, checked, propagate, center)
    }

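    def sendMsg(triplet: EdgeTriplet[VertexData, Int]): Iterator[(VertexId, Int)] = {
        // Forward the message only to neighbours that are not checked yet, so no
        // messages are sent once the whole component is covered and Pregel terminates.
        var it : Iterator[(VertexId, Int)] = Iterator()
        if(triplet.dstAttr.propagate && !triplet.srcAttr.checked)
          it = it ++ Iterator((triplet.srcId, 1))
        if(triplet.srcAttr.propagate && !triplet.dstAttr.checked)
          it = it ++ Iterator((triplet.dstId, 1))
        it
    }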

    def mergeMsgs(a: Int, b: Int): Int = math.max(a, b)
}
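
For checking the result on a small graph, the same BFS idea can be sketched in plain Scala without Spark. This is only an illustration under assumptions: the names LocalComponent and componentOf and the edge list in the usage note are made up, and edges are treated as undirected, as in the Pregel version above.

```scala
import scala.collection.mutable

// Hypothetical helper for small, in-memory edge lists.
object LocalComponent {

  // Collects every vertex reachable from `start`, ignoring edge direction.
  def componentOf(edges: Seq[(Long, Long)], start: Long): Set[Long] = {
    // Build an undirected adjacency map from the edge list.
    val neighbours: Map[Long, Seq[Long]] =
      edges
        .flatMap { case (a, b) => Seq(a -> b, b -> a) }
        .groupBy(_._1)
        .map { case (v, pairs) => v -> pairs.map(_._2) }

    // Standard BFS: a vertex is enqueued only the first time it is seen.
    val visited = mutable.Set(start)
    val queue = mutable.Queue(start)
    while (queue.nonEmpty) {
      val v = queue.dequeue()
      for (n <- neighbours.getOrElse(v, Seq.empty) if visited.add(n))
        queue.enqueue(n)
    }
    visited.toSet
  }
}
```

With the made-up edge list Seq((1L, 2L), (2L, 3L), (4L, 5L)), calling LocalComponent.componentOf(edges, 1L) returns the vertices 1, 2 and 3, which is what extractCC should keep when the center is 1.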
Pinery answered 1/11, 2020 at 12:26 Comment(1)
Please forgive my lack of Scala knowledge. What would the Java implementation be?Lord
