Spark cosine distance between rows using Dataframe

I have to compute the cosine distance between every pair of rows, but I have no idea how to do it elegantly using the Spark DataFrame API. The idea is to compute the similarities for each row (item) and take the top 10 most similar rows for each one. This is needed for an item-item recommender system.

Everything I've read about it refers to computing similarity over columns (Apache Spark Python Cosine Similarity over DataFrames). Can someone say whether it is possible to compute the cosine distance between rows elegantly using PySpark's DataFrame API or RDDs, or do I have to do it manually?

Here is some code to show what I intend to do:

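from numpy import asarray
from numpy import linalg as LA

def cosineSimilarity(vec1, vec2):
    return vec1.dot(vec2) / (LA.norm(vec1) * LA.norm(vec2))


# p.s. model is ALS
Pred_Factors = model.itemFactors.cache()  # Pred_Factors = DataFrame[id: int, features: array<float>]

sims = []

# Compare every item with every other item on the driver (O(n^2) pairs).
for _id, _feature in Pred_Factors.toLocalIterator():
    itemFactor = asarray(_feature)
    for id, feature in Pred_Factors.toLocalIterator():
        # list.append returns None, so append in place instead of reassigning sims
        sims.append((_id, id, cosineSimilarity(asarray(feature), itemFactor)))

sims = sc.parallelize(sims)
# Top 10 pairs by similarity, highest first
sortedSims = sims.takeOrdered(10, key=lambda x: -x[2])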

Thanks in advance for all the help.

Polytonality answered 10/10, 2017 at 9:53 Comment(1)
Were you able to apply columnSimilarities on a DataFrame? - Cushing

You can use the columnSimilarities function of IndexedRowMatrix (from pyspark.mllib.linalg.distributed). It uses cosine similarity as its metric. It computes similarities between columns, so you have to transpose the matrix before applying this function.

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

pred = IndexedRowMatrix(Pred_Factors.rdd.map(lambda x: IndexedRow(x[0], x[1]))).toBlockMatrix().transpose().toIndexedRowMatrix()
pred_sims = pred.columnSimilarities()
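
To make it clearer what the transpose-then-columnSimilarities step computes, here is a small local sketch in plain NumPy (not Spark). It only illustrates the math: the cosine similarity between every pair of rows, followed by picking the most similar rows for each item. The helper names row_cosine_similarities and top_k_similar are made up for this example, and the item factors are toy data, not output from an ALS model.

```python
import numpy as np

def row_cosine_similarities(features):
    """Cosine similarity between every pair of rows of a 2-D array."""
    features = np.asarray(features, dtype=float)
    norms = np.linalg.norm(features, axis=1, keepdims=True)
    unit = features / norms  # assumption: no all-zero rows
    return unit @ unit.T     # entry (i, j) is cos(row i, row j)

def top_k_similar(sims, k=10):
    """For each row, indices of the k most similar other rows, best first."""
    sims = sims.copy()
    np.fill_diagonal(sims, -np.inf)   # exclude each item's similarity to itself
    k = min(k, sims.shape[0] - 1)
    return np.argsort(-sims, axis=1)[:, :k]

# Toy item factors: 4 items with 3 latent features each (made-up values).
item_factors = np.array([
    [1.0, 0.0, 0.0],
    [2.0, 0.0, 0.0],
    [0.0, 1.0, 0.0],
    [1.0, 1.0, 0.0],
])
sims = row_cosine_similarities(item_factors)
neighbours = top_k_similar(sims, k=2)
```

On Spark the same per-item top 10 would have to be built from the entries of the matrix returned by columnSimilarities. As far as I can tell from the documentation, that result is upper-triangular, so each entry (i, j) would need to be mirrored to (j, i) before grouping by item.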
Canning answered 11/10, 2017 at 0:13 Comment(4)
How were you able to use this to get the top 10 similar items? @IvanShelonik - Cushing
Can this be used on any DataFrame? - Cushing
You have to convert your DataFrame to an RDD to apply this method. - Canning
The RDD transformations drastically reduce the number of partitions. How can I make this more efficient for parallelization? Calling repartition on the underlying RDD of the matrices means I then have to convert it to a RowMatrix again. Is there any other way? - Nies
