How to build a graph from tuples in GraphX and label the nodes afterwards?

Some context can be found here. I have created a graph from tuples collected from a query on a Hive table. The tuples correspond to trade relations between countries. Built this way, the vertices of the graph are not labelled. I want to study the degree distribution and get the names of the most connected countries. I tried two options:

  • First: I tried to map the vertex indices to the country names with idMapbis, inside the function that collects and prints the ten most connected vertices.
  • Second: I tried to add labels to the vertices of the graph itself.

In both cases I get the following error: Task not serializable.

Full code:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc)
val data = sqlContext.sql("select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year, trade_flow, reporter_iso, partner_iso").collect()
val data_2010 = data.filter(line => line(0)==2010)
val couples = data_2010.map(line => (line(2), line(3))) // country -> country

couples look like this: Array[(Any, Any)] = Array((MWI,MOZ), (WSM,AUS), (MDA,CRI), (KNA,HTI), (PER,ERI), (SWE,CUB),...

val idMap = sc.broadcast(couples 
.flatMap{case (x: String, y: String) => Seq(x, y)}
.distinct 
.zipWithIndex  
.map{case (k, v) => (k, v.toLong)}  
.toMap) 

val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples
.map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))})

val graph = Graph.fromEdgeTuples(edges, 1)

Built this way, the vertices look like (68,1), for example.

val degrees: VertexRDD[Int] = graph.degrees.cache()

//Most connected vertices 
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(Int, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, k) => (id.toInt, degree)}
val ord = Ordering.by[(Int, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)

We get: (79,1016),(64,912),(55,889)...

First option to retrieve the names:

val idMapbis = sc.parallelize(couples
.flatMap{case (x: String, y: String) => Seq(x, y)} 
.distinct 
.zipWithIndex  
.map{case (k, v) => (v,k)}  
.toMap)

def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]):  Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, name) => (idMapbis.value(id.toInt), degree)}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)

This fails with "Task not serializable", yet idMapbis itself seems to work, since idMapbis.value(graph.vertices.take(1)(0)._1.toInt) raises no error.

Option 2:

graph.vertices.map{case (k, v) => (k,idMapbis.value(k.toInt))}

This fails with "Task not serializable" again. For context, here is how topNamesAndDegrees is modified in this option to obtain the names of the most connected vertices:

def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, name) => (name, degree)}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)

I would like to understand how to fix one of these options, or both if someone sees how.

Neysa answered 11/8, 2015 at 14:27 Comment(0)

The problem with your attempts is that idMapbis is an RDD, and an RDD cannot be accessed from inside a transformation on another RDD. Since we already know your data fits into memory, you can simply use a broadcast variable as before:

val idMapRev = sc.broadcast(idMap.value.map{case (k, v) => (v, k)}.toMap)
graph.mapVertices{case (id, _) => idMapRev.value(id)}
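
To make the broadcast approach concrete, here is a minimal end-to-end sketch. It assumes the country pairs are already available as a local Seq[(String, String)]; the object name LabelSketch and the method topByName are hypothetical helpers introduced for illustration, not part of GraphX.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Hypothetical helper object for illustration only.
object LabelSketch {
  def topByName(sc: SparkContext, couples: Seq[(String, String)], n: Int): Array[(String, Int)] = {
    // name -> id, built on the driver and broadcast (assumes the names fit in memory)
    val idMap = sc.broadcast(
      couples.flatMap { case (x, y) => Seq(x, y) }.distinct.zipWithIndex
        .map { case (name, i) => (name, i.toLong) }.toMap)
    // id -> name: a Broadcast[Map[Long, String]], not an RDD
    val idMapRev = sc.broadcast(idMap.value.map(_.swap))

    // The lookups below run on the driver, before the data is parallelized
    val edges: RDD[(VertexId, VertexId)] =
      sc.parallelize(couples.map { case (x, y) => (idMap.value(x), idMap.value(y)) })
    val graph = Graph.fromEdgeTuples(edges, 1)

    // mapVertices returns a new graph; the original graph is left unchanged
    val labelled: Graph[String, Int] = graph.mapVertices((id, _) => idMapRev.value(id))

    // Join each degree with its vertex label and keep the n largest degrees
    labelled.degrees
      .innerJoin(labelled.vertices) { (_, degree, name) => (name, degree) }
      .map(_._2)
      .top(n)(Ordering.by[(String, Int), Int](_._2))
  }
}
```

In spark-shell, something like LabelSketch.topByName(sc, pairs, 10).foreach(println) would print the ten best connected countries with their degrees, where pairs stands for the couples array with its elements cast to String.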

Alternatively, you could attach the correct labels from the beginning:

val countries: RDD[(VertexId, String)] = sc
  .parallelize(idMap.value.map(_.swap).toSeq)

val relationships: RDD[Edge[Int]] = sc.parallelize(couples
 .map{case (x: String, y: String) => Edge(idMap.value(x), idMap.value(y), 1)}
)

val graph = Graph(countries, relationships)

The second approach has one important advantage: if the graph is large, you can relatively easily replace the broadcast variables with joins.
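
As a rough sketch of what replacing the broadcast variables with joins could look like, assuming the pairs are kept as an RDD[(String, String)] instead of being collected to the driver: the object JoinSketch and its methods are hypothetical names, and zipWithUniqueId is just one possible way to assign vertex ids.

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Hypothetical helper object for illustration only.
object JoinSketch {
  def buildLabelled(pairs: RDD[(String, String)]): Graph[String, Int] = {
    // name -> id, computed without collecting anything to the driver
    val ids: RDD[(String, VertexId)] =
      pairs.flatMap { case (x, y) => Seq(x, y) }.distinct().zipWithUniqueId()

    // First join resolves the source id: (src, (dst, srcId))
    val withSrc = pairs.join(ids)
    // Second join resolves the destination id: (dst, (srcId, dstId))
    val withBoth = withSrc.map { case (_, (dst, srcId)) => (dst, srcId) }.join(ids)

    val edges: RDD[Edge[Int]] =
      withBoth.map { case (_, (srcId, dstId)) => Edge(srcId, dstId, 1) }

    // Vertices carry their name as the attribute: (id, name)
    Graph(ids.map(_.swap), edges)
  }

  def topByName(graph: Graph[String, Int], n: Int): Array[(String, Int)] =
    graph.degrees
      .join(graph.vertices) // (id, (degree, name))
      .map { case (_, (degree, name)) => (name, degree) }
      .top(n)(Ordering.by[(String, Int), Int](_._2))
}
```

The two joins add shuffles, so for a few hundred country codes the broadcast version is likely the simpler choice; the join version only pays off when the name table no longer fits comfortably in memory.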

Hilly answered 11/8, 2015 at 16:15 Comment(10)
Hm, OK. I thought idMap could be treated as a function, but I am not familiar with Scala and we didn't use def. I will try this. Thank you for the quick response, this is really helpful! - Remnant
I added .value for the first solution you proposed, like this: graph.mapVertices{case (id, _) => idMapRev.value(id)}, but it is not working. - Remnant
The fundamental problem is not how you use idMapbis (although for a PairRDD you should use the lookup method), or even that it is not serializable, but the simple fact that it is an RDD. To access it you have to trigger an action, and you cannot do that inside a transformation. - Hilly
sc.parallelize is not the same as sc.broadcast. - Hilly
Your explanation was that idMapbis is an RDD, but isn't idMapRev one too? I mean they are the same type of object and I get the same error. Does the underscore in case (id, _) mean that only the first element is changed to idMapRev(id), or that the pair is transformed into a single value? - Remnant
The second solution works perfectly, but I still wish I could understand why the first doesn't work (thanks again though). - Remnant
But thanks to you, I found how to make the first option work: topNamesAndDegrees(degrees, graph).map{case (id,degree) => (idMapRev.value(id),degree)} - Remnant
idMap from my previous answer is Broadcast[Map[String,Long]] and the idMapRev I created here is Broadcast[Map[Long,String]]. If you want to use RDD[(String, Long)] and RDD[(Long, String)] respectively, you'll need to go through a series of join operations. - Hilly
I have the feeling that the graph doesn't like mapVertices or vertices.map, because I found a tutorial that uses this method and it triggers the same error (Task not serializable) for me: val triCountGraph = graph.triangleCount() followed by triCountGraph.vertices.map(x => x._2).stats(). Maybe this is a version or package problem? - Remnant
Let us continue this discussion in chat. - Hilly
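
The distinction drawn in the comments between sc.parallelize and sc.broadcast can be illustrated with a small sketch. The object BroadcastVsRdd and the sample map are hypothetical; the point is only that lookup on an RDD is an action to be called from the driver, while a broadcast value can be read inside a transformation.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

// Hypothetical helper object for illustration only.
object BroadcastVsRdd {
  def demo(sc: SparkContext): (Seq[String], Array[String]) = {
    val names = Map(0L -> "FRA", 1L -> "DEU")

    // parallelize distributes the pairs: the result is an RDD
    val asRdd: RDD[(Long, String)] = sc.parallelize(names.toSeq)
    // broadcast ships a read-only copy of the whole map to every executor
    val asBroadcast: Broadcast[Map[Long, String]] = sc.broadcast(names)

    // lookup is an action, so it may only be called from the driver
    val fromDriver: Seq[String] = asRdd.lookup(0L)

    // Reading the broadcast value inside a transformation is fine
    val ids = sc.parallelize(Seq(0L, 1L))
    val insideMap: Array[String] = ids.map(id => asBroadcast.value(id)).collect()

    // By contrast, ids.map(id => asRdd.lookup(id)) would fail when the job runs,
    // because an RDD cannot be used inside a transformation on another RDD.
    (fromDriver, insideMap)
  }
}
```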
