How to create a graph from Array[(Any, Any)] using Graph.fromEdgeTuples

I am very new to Spark, but I want to create a graph from relations that I get from a Hive table. I found a function that is supposed to allow this without defining the vertices, but I can't get it to work.

I know this isn't a reproducible example, but here is my code:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc)
val data = sqlContext.sql("select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year, trade_flow, reporter_iso, partner_iso").collect()
val data_2010 = data.filter(line => line(0)==2010)
val couples = data_2010.map(line => (line(2), line(3))) // country to country

val graph = Graph.fromEdgeTuples(sc.parallelize(couples), 1)

The last line generates the following error:

val graph = Graph.fromEdgeTuples(sc.parallelize(couples), 1)
<console>:31: error: type mismatch;
found   : Array[(Any, Any)]
required: Seq[(org.apache.spark.graphx.VertexId,org.apache.spark.graphx.VertexId)]
Error occurred in an application involving default arguments.
val graph = Graph.fromEdgeTuples(sc.parallelize(couples), 1)

couples looks like this:

couples: Array[(Any, Any)] = Array((MWI,MOZ), (WSM,AUS), (MDA,CRI), (KNA,HTI), (PER,ERI), (SWE,CUB), (DEU,PRK), (THA,DJI), (BIH,SVK), (RUS,THA), (SGP,BLR), (MEX,TGO), (TUR,ZAF), (ZWE,SYC), (UGA,GHA), (OMN,SVN), (NZL,SYR), (CHE,SLV), (CZE,LUX), (TGO,COM), (TTO,WLF), (NGA,PAN), (FJI,UKR), (BRA,ECU), (EGY,SWE), (ITA,ARG), (MUS,MLT), (MDG,DZA), (ARE,SUR), (CAN,GUY), (OMN,COG), (NAM,FIN), (ITA,HMD), (SWE,CHE), (SDN,NER), (TUN,USA), (THA,GMB), (HUN,TTO), (FRA,BEN), (NER,TCD), (CHN,JPN), (DNK,ZAF), (MLT,UKR), (ARM,OMN), (PRT,IDN), (BEN,PER), (TTO,BRA), (KAZ,SMR), (CPV,""), (ARG,ZAF), (BLR,TJK), (AZE,SVK), (ITA,STP), (MDA,IRL), (POL,SVN), (PRY,ETH), (HKG,MOZ), (QAT,GAB), (THA,MUS), (PHL,MOZ), (ITA,SGS), (ARM,KHM), (ARG,KOR), (AUT,GMB), (SYR,COM), (CZE,GBR), (DOM,USA), (CYP,LAO), (USA,LBR)

How can I convert it to a suitable format?

Evermore answered 10/8, 2015 at 20:2 Comment(0)

First of all, you cannot use a String as a VertexId (VertexId is an alias for Long), so you have to map the labels to Long values. To do that, we need a mapping from label to id. As long as the number of unique values is relatively small, the simplest approach is to build the map on the driver and wrap it in a broadcast variable:

val idMap = sc.broadcast(couples // -> Array[(Any, Any)]
  // Make sure we use String not Any returned from Row.apply
  // And convert to Seq so we can flatten results
  .flatMap{case (x: String, y: String) => Seq(x, y)} // -> Array[String]
  // Get different keys
  .distinct // -> Array[String]
  // Create (key, value) pairs
  .zipWithIndex  // -> Array[(String, Int)]
  // Convert values to Long so we can use it as a VertexId
  .map{case (k, v) => (k, v.toLong)}  // -> Array[(String, Long)]
  // Create map
  .toMap) // -> Map[String,Long]

Next, we can use this map to convert each pair of labels into a pair of ids:

val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples
  .map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))}
)
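To see what these two steps produce, here is a minimal sketch in plain Scala (no Spark required) on a three-pair sample. The names `sample`, `labelToId` and `idPairs` are made up for illustration; `sample` stands in for `couples`, and a plain `Map` stands in for the broadcast variable.

```scala
// Hypothetical stand-in for `couples`: a tiny Array[(Any, Any)].
val sample: Array[(Any, Any)] = Array(("MWI", "MOZ"), ("WSM", "AUS"), ("HKG", "MOZ"))

// Step 1: label -> id. `distinct` keeps first-occurrence order,
// so ids follow the order in which labels first appear.
val labelToId: Map[String, Long] = sample
  .flatMap { case (x: String, y: String) => Seq(x, y) } // Array[String]
  .distinct                                             // MWI, MOZ, WSM, AUS, HKG
  .zipWithIndex                                         // Array[(String, Int)]
  .map { case (k, v) => (k, v.toLong) }                 // Array[(String, Long)]
  .toMap

// Step 2: replace each label with its id.
val idPairs: Array[(Long, Long)] = sample.map {
  case (x: String, y: String) => (labelToId(x), labelToId(y))
}
// idPairs: Array((0,1), (2,3), (4,1))
```

Note that the pattern `case (x: String, y: String)` only matches Strings. An empty label such as the `""` in `(CPV,"")` is still a String and simply gets its own id, but a `null` value would throw a `MatchError`, so it may be worth filtering such rows out first.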

Finally we get a graph:

val graph = Graph.fromEdgeTuples(edges, 1)
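
Since the vertices are now plain Long ids, you may want the original labels back when inspecting the graph. One possible way, sketched below, is to invert the map and join the labels onto the vertices with `outerJoinVertices`. This is only an illustration: the object name `LabeledGraphSketch`, the helper `build`, the sample pairs and the `local[*]` master are all assumptions, not part of the code above. In the spark-shell you would reuse the existing `sc` instead of creating one.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.rdd.RDD

object LabeledGraphSketch {
  // Hypothetical helper: builds a graph whose vertex attribute is the original label.
  def build(sc: SparkContext, pairs: Seq[(String, String)]): Graph[String, Int] = {
    // Same label -> id mapping as above, built on the driver.
    val idMap: Map[String, Long] = pairs
      .flatMap { case (x, y) => Seq(x, y) }
      .distinct
      .zipWithIndex
      .map { case (k, v) => (k, v.toLong) }
      .toMap

    val edges: RDD[(VertexId, VertexId)] =
      sc.parallelize(pairs.map { case (x, y) => (idMap(x), idMap(y)) })

    // Inverse mapping (id, label), used to attach labels to vertices.
    val labels: RDD[(VertexId, String)] = sc.parallelize(idMap.toSeq.map(_.swap))

    Graph.fromEdgeTuples(edges, 1)
      .outerJoinVertices(labels) { (id, oldAttr, label) => label.getOrElse("") }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("labeled-graph-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val graph = build(sc, Seq(("MWI", "MOZ"), ("WSM", "AUS"), ("HKG", "MOZ")))
      // Convert triplets to strings before collecting, then print "src -> dst".
      graph.triplets
        .map(t => s"${t.srcAttr} -> ${t.dstAttr}")
        .collect()
        .foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

The edge attribute stays an Int, as produced by `Graph.fromEdgeTuples`; only the vertex attribute changes from the default value 1 to the label.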
Vincents answered 10/8, 2015 at 21:52 Comment(6)
Wow, thanks! I will try this first thing tomorrow. Would you mind elaborating on the different methods you used? I get the general idea, but it would be very useful for me to understand every step and not just copy this. - Kearse
Sure, I've added some comments and type information. - Vincents
Totally working, thanks a lot for the explanation. Do you know if it is possible to visualise a graph with Spark? I'm working with the console, so I guess there is no graphical interface. - Kearse
I doubt it. You can always collect a subgraph of interest and use a general-purpose graph visualization tool like Gephi. - Vincents
If you get bored, I have a new question :-) #31945025 - Kearse
@Vincents By a "relatively small" number of values, do you mean that the data fits in memory, or something else? Also, just curious, what would be the best strategy if the number of values is not small? Thank you so much for your answers! - Tyus