Strange performance issue Spark LSH MinHash approxSimilarityJoin

I'm joining two datasets using Apache Spark ML LSH's approxSimilarityJoin method, but I'm seeing some strange behaviour.

After the (inner) join the dataset is a bit skewed; however, on every run one or more tasks take an inordinate amount of time to complete.

[Spark UI screenshot: task metrics for the join stage]

As you can see, the median is 6 ms per task (I'm running it on a smaller source dataset to test), but one task takes 10 minutes. It's hardly using any CPU cycles and it does actually join data, but it's extremely slow. The next-slowest task runs in 14 s, has 4x more records, and actually spills to disk.

If you look at the SQL query plan in the Spark UI:

[Spark UI SQL query plan screenshot]

The join itself is an inner join between the two datasets on pos & hashValue (the MinHash), in accordance with the MinHash specification, followed by a UDF that calculates the Jaccard distance between matched pairs.

Explode the hashtables:

// Explode each row into one (pos, hashValue) pair per hash table so that
// rows sharing a hash value in any table can be joined.
modelDataset.select(
      struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))

Jaccard distance function:

 override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
    val xSet = x.toSparse.indices.toSet
    val ySet = y.toSparse.indices.toSet
    val intersectionSize = xSet.intersect(ySet).size.toDouble
    val unionSize = xSet.size + ySet.size - intersectionSize
    assert(unionSize > 0, "The union of two input sets must have at least 1 elements")
    1 - intersectionSize / unionSize
  }
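
For intuition, here is a minimal standalone sketch of the same computation on two made-up shingle sets (the values are illustrative, not from the real data):

// Hypothetical shingle sets for two nearly identical addresses.
val a = Set("53", "35", "36", "ev", "va", "an")
val b = Set("53", "35", "36", "ev", "va", "am") // one shingle differs (typo)

val intersectionSize = a.intersect(b).size.toDouble // 5
val unionSize = a.size + b.size - intersectionSize  // 7
val jaccardDistance = 1 - intersectionSize / unionSize // ~0.29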

Join of the processed datasets:

// Do a hash join on where the exploded hash values are equal.
val joinedDataset = explodedA.join(explodedB, explodeCols)
  .drop(explodeCols: _*).distinct()

// Add a new column to store the distance of the two rows.
val distUDF = udf((x: Vector, y: Vector) => keyDistance(x, y), DataTypes.DoubleType)
val joinedDatasetWithDist = joinedDataset.select(col("*"),
  distUDF(col(s"$leftColName.${$(inputCol)}"), col(s"$rightColName.${$(inputCol)}")).as(distCol)
)

// Filter the joined datasets where the distance is smaller than the threshold.
joinedDatasetWithDist.filter(col(distCol) < threshold)
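
For reference, the snippets above are Spark ML internals; in user code the whole pipeline is typically driven through approxSimilarityJoin. A minimal sketch, assuming dfA and dfB already have a vectorized "features" column (dataset names, column names and thresholds are assumptions):

import org.apache.spark.ml.feature.MinHashLSH

// Fit the MinHash model on one dataset and join the two on Jaccard distance.
val mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = mh.fit(dfA)

// Inner join of dfA and dfB, keeping pairs with Jaccard distance < 0.6.
val matches = model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")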

I've tried combinations of caching, repartitioning and even enabling spark.speculation, all to no avail.

The data consists of shingled address text that has to be matched: 53536, Evansville, WI => 53, 35, 36, ev, va, an, ns, vi, il, ll, le, wi, which will have a short distance to records where there is a typo in the city or zip.

This gives pretty accurate results, but it may be the cause of the join skew.
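
For context, a rough sketch of the kind of shingling described above (the UDF, dataset and column names are illustrative, not the exact code used):

import org.apache.spark.sql.functions.{col, udf}

// Distinct overlapping character 2-grams per whitespace-separated token.
val shingle2 = udf { (text: String) =>
  text.toLowerCase
    .split("\\W+")
    .filter(_.nonEmpty)
    .flatMap(_.sliding(2).toSeq)
    .distinct
    .toSeq
}

val shingled = addresses.withColumn("shingles", shingle2(col("address")))
// "53536, Evansville, WI" -> Seq("53", "35", "36", "ev", "va", "an", ...)

These shingle arrays would then be vectorized (e.g. with CountVectorizer or HashingTF) before being fed into MinHashLSH.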

My question is:

  • What may cause this discrepancy? (One task taking very, very long even though it has fewer records.)
  • How can I prevent this skew in MinHash without losing accuracy?
  • Is there a better way to do this at scale? (I can't do Jaro-Winkler / Levenshtein comparisons of millions of records against every record in the location dataset.)
Silken answered 18/7, 2018 at 13:47 Comment(2)
Did you get a solution for this? – Sander
Yes, but probably not the one you need :-) I processed the dataset a couple of times. First a default join where records matched exactly. I filtered those out for the second pass, where I used simple Levenshtein (and similar) methods to get the really close ones. The third pass contained much less data and used LSH. – Silken

It might be a bit late, but I will post my answer here anyway to help others out. I recently had similar issues matching misspelled company names (see: All executors dead MinHash LSH PySpark approxSimilarityJoin self-join on EMR cluster). Someone helped me out by suggesting to use NGrams to reduce the data skew, and it helped a lot. You could also try using e.g. 3-grams or 4-grams.
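
To illustrate the NGram idea, here is a minimal sketch with Spark ML's NGram transformer (the dataset and column names are assumptions; it expects the name already split into an array of characters or tokens):

import org.apache.spark.ml.feature.NGram

// 3-grams spread records over more, smaller hash buckets than 2-grams,
// which mitigates the skew in the subsequent join.
val ngram = new NGram()
  .setN(3)
  .setInputCol("chars")
  .setOutputCol("ngrams")

val withNGrams = ngram.transform(namesWithChars)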

I don't know how dirty the data is, but you could try to make use of term-frequency stats, i.e. filtering out very frequently and very infrequently occurring terms. That already reduces the number of possible matches substantially.
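
A possible way to apply that in Spark ML (the thresholds are illustrative) is to let CountVectorizer drop very rare and very common terms while vectorizing:

import org.apache.spark.ml.feature.CountVectorizer

// Terms occurring in fewer than 2 records or in more than 80% of records
// are dropped from the vocabulary before hashing (setMaxDF needs Spark 2.4+).
val cv = new CountVectorizer()
  .setInputCol("ngrams")
  .setOutputCol("features")
  .setMinDF(2)
  .setMaxDF(0.8)
  .setBinary(true) // MinHash only cares about set membership

val vectorized = cv.fit(withNGrams).transform(withNGrams)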

What really helped me improve the accuracy of the matches is to postprocess the connected components (groups of connected matches made by the MinHashLSH) by running a label propagation algorithm on each component. This also allows you to increase N (of the NGrams), mitigating the problem of skewed data, to set the Jaccard distance parameter in approxSimilarityJoin less tightly, and to clean up the results with the label propagation postprocessing.
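
A rough sketch of that postprocessing with GraphFrames (the vertex and edge DataFrame names are assumptions, and a Spark checkpoint directory must be set for connectedComponents):

import org.graphframes.GraphFrame

// vertices = one row per record with an "id" column,
// matchedPairs = "src"/"dst" pairs taken from the approxSimilarityJoin output.
val graph = GraphFrame(vertices, matchedPairs)

// Group the pairwise matches into transitively connected clusters.
val components = graph.connectedComponents.run()

// Refine membership within the clusters via label propagation.
val communities = graph.labelPropagation.maxIter(5).run()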

Finally, I am currently looking into using skip-grams for the matching. I found that in some cases it works better and reduces the data skew somewhat.

Yasmineyasu answered 7/7, 2020 at 0:47 Comment(4)
Nice. I actually used n-grams already, but the connected components label propagation might be the golden idea. Did you use GraphX for that? Unfortunately I'm not working on this project right now, but it might be good to revisit at some point. – Silken
No, actually I am doing everything in Python for now, so I used the NetworkX library to run the label propagation algorithm locally for each of the connected components in the network. I think it should also work by just running the label propagation algorithm on the entire network, but I have not tried that yet. I would use GraphFrames for that, because I am working in Python now. I will post my experience once I have tried it :) – Yasmineyasu
Hey, can you specify what you mean by the term "states" in the 2nd paragraph? – Calyces
It is quite some time ago, but I believe it is a typo. Re-reading my post, it must be stats instead of states. As I mention in my other post, I have been able to reduce the extreme skew by filtering out frequently and infrequently occurring terms. – Yasmineyasu
