Jaccard Similarity of an RDD with the help of Spark and Scala without Cartesian?

I am working with pair RDDs. My aim is to compute the Jaccard similarity between the value sets of an RDD and to cluster the keys according to a Jaccard similarity threshold. The structure of my RDD is:

val a: RDD[(Key, Set[String])]   // pair RDD of (key, set of values)

For example:
India,[Country,Place,....]
USA,[Country,State,..]
Berlin,[City,Populatedplace,..]

After computing the Jaccard similarity, I will group similar entities into clusters. In the above example, India and USA would fall into one cluster for a suitable threshold value, whereas Berlin would be in another cluster.

So I took the Cartesian product of rdd a with itself

val filterOnjoin = a.cartesian(a)
  .filter(f => !f._1._1.toString().contentEquals(f._2._1.toString()))
// Cartesian product of rdd a with itself, dropping the rows that have
// the same key at both positions.
// e.g. ((India,Set[Country,Place,....]),(USA,Set[Country,State,..]))

and compared the sets of values with the help of Jaccard similarity.

val Jsim = filterOnjoin.map(f =>
  (f._1._1, (f._2._1, Similarity.sim(f._1._2, f._2._2)))) // calculating jaccard similarity
// e.g. (India,(USA,0.8))
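
The body of Similarity.sim is not shown in the question. As a minimal sketch, assuming it computes the usual Jaccard similarity |A ∩ B| / |A ∪ B| on two sets, it could look like the following (the object name JaccardSketch and the choice to return 0.0 for two empty sets are assumptions made for illustration):

```scala
object JaccardSketch {
  // Jaccard similarity: size of the intersection divided by size of the union.
  // Returning 0.0 when both sets are empty is an arbitrary convention chosen here.
  def jaccard[A](x: Set[A], y: Set[A]): Double = {
    val unionSize = (x union y).size
    if (unionSize == 0) 0.0
    else (x intersect y).size.toDouble / unionSize
  }
}
```

For instance, JaccardSketch.jaccard(Set("Country", "Place"), Set("Country", "State")) shares one element out of three distinct ones, so it evaluates to 1.0 / 3.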

The code runs fine on a small dataset. As the dataset grows, the Cartesian product takes too much time. For 100 MB of data (the size of rdd "a"), the shuffle read is around 25 GB. For 3.5 GB of data, it is in the terabytes.

I have gone through various links, including Spark tuning guides and some Stack Overflow posts. Most of them suggest broadcasting the smaller RDD, but here both RDDs have the same size, and that size is large.

Links I followed:
Spark: produce RDD[(X, X)] of all possible combinations from RDD[X]

Spark repartition is slow and shuffles too much data

Map key, value pair based on similarity of their value in Spark

I am new to Spark and Scala. I am unable to think beyond the Cartesian product, which is the bottleneck here. Is it possible to solve this problem without a Cartesian product?

Biotope answered 9/3, 2018 at 1:56 Comment(0)

As the Cartesian product is an expensive operation on an RDD, I solved the above problem by using the HashingTF and MinHashLSH classes in Spark MLlib to find the Jaccard similarity. Steps to find the Jaccard similarity for the rdd "a" mentioned in the question:

  • Convert the rdd into dataframe

     import sparkSession.implicits._  
     val dfA = a.toDF("id", "values")
    
  • Create the feature vector with the help of HashingTF

      import org.apache.spark.ml.feature.{HashingTF, MinHashLSH}

      val hashingTF = new HashingTF()
        .setInputCol("values").setOutputCol("features").setNumFeatures(1048576)
    
  • Feature transformation

    val featurizedData = hashingTF.transform(dfA) //Feature Transformation  
    
  • Create the MinHash tables. The higher the number of hash tables, the more accurate the results, but at the price of higher communication cost and run time.

     val mh = new MinHashLSH()
            .setNumHashTables(3) 
            .setInputCol("features")
            .setOutputCol("hashes")
    
  • Approximate similarity join takes two datasets and approximately returns pairs of rows in the datasets whose distance is smaller than a user-defined threshold. Approximate similarity join supports both joining two different datasets and self-joining. Self-joining will produce some duplicate pairs.

      val model = mh.fit(featurizedData)
      // Approximately join featurizedData with itself, keeping the pairs
      // whose Jaccard distance is smaller than 0.45
      val dffilter = model.approxSimilarityJoin(featurizedData, featurizedData, 0.45)
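
To make the role of the hash tables more concrete, here is a small plain-Scala sketch of the MinHash idea, independent of Spark. It is not Spark's implementation; the object name MinHashSketch, the prime, the seed and the hash family are illustrative assumptions. Each hash function maps a set to the minimum hash over its elements, and two sets are treated as candidates when they agree in at least one of those minima, which is, as far as I understand it, also how the number of hash tables affects which pairs the approximate join gets to compare.

```scala
import scala.util.Random

object MinHashSketch {
  // A large prime used as the modulus (illustrative choice).
  private val Prime = 2038074743L

  // One hash function of the form h(x) = ((1 + x) * a + b) mod Prime.
  final case class HashFn(a: Long, b: Long) {
    def apply(x: Int): Long = ((1L + x) * a + b) % Prime
  }

  // Draw numTables random hash functions, one per hash table.
  def hashFns(numTables: Int, seed: Long): Vector[HashFn] = {
    val rnd = new Random(seed)
    Vector.fill(numTables)(
      HashFn(1L + rnd.nextInt(Prime.toInt - 1), rnd.nextInt(Prime.toInt - 1).toLong))
  }

  // MinHash signature: for each hash function, the minimum hash over all tokens.
  def signature(tokens: Set[String], fns: Seq[HashFn]): Seq[Long] = {
    require(tokens.nonEmpty, "MinHash is undefined for an empty set")
    fns.map(f => tokens.iterator.map(t => f(t.hashCode & Int.MaxValue)).min)
  }

  // Two sets become a candidate pair if any of their hash tables agree.
  def isCandidate(x: Seq[Long], y: Seq[Long]): Boolean =
    x.zip(y).exists { case (hx, hy) => hx == hy }
}
```

With more hash functions, two similar sets have more chances to agree in at least one position, so fewer truly similar pairs are missed, while every extra table adds hashing and join work. Identical sets always produce identical signatures and are therefore always candidates.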
    

Spark also requires some manual optimization in the code, such as setting the number of partitions and the persistence level, so I configured these parameters as well.

  • Changing the storage level from persist() to persist(StorageLevel.MEMORY_AND_DISK) helped me get rid of the OOM error.
  • For the join operation, I re-partitioned the data according to the rdd size. On the 16.6 GB data set I was using 200 partitions for a simple join operation. Increasing that to 600 also solved my OOM problem.

PS: the constant parameters setNumFeatures(1048576) and setNumHashTables(3) were chosen while experimenting on the 16.6 GB data set. You can increase or decrease these values according to your data set. The number of partitions also depends on your data set size. With these optimizations, I got my desired results.
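
Putting the steps and the tuning together, a self-contained sketch could look like the one below. It assumes spark-sql and spark-mllib are on the classpath and runs in local mode; the object name, the toy data, the column aliases idA/idB, the distance column name jaccardDistance and the partition count of 4 are all assumptions for illustration (the numbers used on the real data set are the ones mentioned above). Since the self-join returns each pair twice and also pairs every row with itself, the sketch keeps only the rows where the first id sorts before the second.

```scala
import org.apache.spark.ml.feature.{HashingTF, MinHashLSH}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.storage.StorageLevel

object SimilarPairsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")              // assumption: local run for the toy data
      .appName("similar-pairs-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical toy data; HashingTF expects an array-of-strings column.
    val dfA = Seq(
      ("India", Seq("Country", "Place", "PopulatedPlace")),
      ("USA", Seq("Country", "Place", "PopulatedPlace", "State")),
      ("Berlin", Seq("City", "PopulatedPlace"))
    ).toDF("id", "values")

    val hashingTF = new HashingTF()
      .setInputCol("values").setOutputCol("features").setNumFeatures(1048576)

    // Repartition and persist before the join; 4 partitions only suit toy data.
    val featurizedData = hashingTF.transform(dfA)
      .repartition(4)
      .persist(StorageLevel.MEMORY_AND_DISK)

    val model = new MinHashLSH()
      .setNumHashTables(3)
      .setInputCol("features")
      .setOutputCol("hashes")
      .fit(featurizedData)

    // Self-join, then keep one row per unordered pair and drop self-pairs.
    val pairs = model
      .approxSimilarityJoin(featurizedData, featurizedData, 0.45, "jaccardDistance")
      .select(
        col("datasetA.id").as("idA"),
        col("datasetB.id").as("idB"),
        (lit(1.0) - col("jaccardDistance")).as("jaccardSimilarity"))
      .filter(col("idA") < col("idB"))

    pairs.show(false)
    spark.stop()
  }
}
```

Because the join is approximate, a pair that is within the threshold can still be missed with few hash tables, so the rows printed for the toy data are not guaranteed.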

Useful links:-
[https://spark.apache.org/docs/2.2.0/ml-features.html#locality-sensitive-hashing]
[https://eng.uber.com/lsh/]
[https://data-flair.training/blogs/limitations-of-apache-spark/]

Biotope answered 18/6, 2018 at 23:36 Comment(0)
