Calculating the cosine similarity between all the rows of a dataframe in pyspark

I have a dataset containing workers with their demographic information (age, gender, address, etc.) and their work locations. I created an RDD from the dataset and converted it into a DataFrame.

There are multiple entries for each ID. Hence, I created a DataFrame which contained only the ID of the worker and the various office locations where he/she had worked (one way to build such a DataFrame is sketched after the table below):

    |----|----------------------------|
    | ID | Office_Loc                 |
    |----|----------------------------|
    | 1  | Delhi, Mumbai, Gandhinagar |
    | 2  | Delhi, Mandi               |
    | 3  | Hyderbad, Jaipur           |
    |----|----------------------------|
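
For reference, a DataFrame of that shape can be built from the raw per-entry data with a groupBy and collect_set. This is only a sketch: the input name workers_df and the column name office_location are assumptions, and it collects the locations into an array rather than a comma-separated string:

    from pyspark.sql import functions as F

    # hypothetical input: one row per (ID, office_location) entry
    ID_place_df = (workers_df
                   .groupBy("ID")
                   .agg(F.collect_set("office_location").alias("Office_Loc")))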

I want to calculate the cosine similarity of each worker with every other worker based on their office locations.

So, I iterated through the rows of the DataFrame, retrieving a single row from it:

# grab the row whose zipWithIndex position equals myIndex
myIndex = 1
values = (ID_place_df.rdd.zipWithIndex()
          .filter(lambda row_index: row_index[1] == myIndex)
          .map(lambda row_index: row_index[0])
          .collect())

and then using map:

cos_weight = ID_place_df.select("ID", "office_location").rdd\
    .map(lambda x: get_cosine(values, x[0], x[1]))

to calculate the cosine similarity between the extracted row and the whole DataFrame.

I do not think my approach is a good one: iterating through the rows of the DataFrame defeats the whole purpose of using Spark. Is there a better way to do it in PySpark? Kindly advise.

Acapulco answered 15/10, 2017 at 18:50 Comment(1)
I think it is a bit of a long question. Usually it's good practice to ask the question with the simplest case that reproduces the same issue.Inflate

You can use the mllib package to compute the TF-IDF of every row and L2-normalize it, then multiply that matrix by its transpose: the resulting entries are the pairwise dot products of the normalized rows, i.e. the cosine similarities.
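
As a quick reminder of why normalizing first is enough: for two rows u and v,

    cos(u, v) = (u · v) / (||u|| * ||v||) = (u / ||u||) · (v / ||v||)

so once every row is L2-normalized, the pairwise dot products (i.e. the matrix multiplied by its transpose) are already the cosine similarities.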

1. RDD

rdd = sc.parallelize([[1, "Delhi, Mumbai, Gandhinagar"],[2, " Delhi, Mandi"], [3, "Hyderbad, Jaipur"]])
  • Compute TF-IDF:

    documents = rdd.map(lambda l: l[1].replace(" ", "").split(","))
    
    from pyspark.mllib.feature import HashingTF, IDF
    hashingTF = HashingTF()
    tf = hashingTF.transform(documents)
    

You can specify the number of features in HashingTF to make the feature matrix smaller (fewer columns); for this small sample, something like HashingTF(numFeatures=10) is more than enough.

    tf.cache()
    idf = IDF().fit(tf)
    tfidf = idf.transform(tf)
  • Compute the L2 norm:

    from pyspark.mllib.feature import Normalizer
    labels = rdd.map(lambda l: l[0])
    features = tfidf
    
    normalizer = Normalizer()
    data = labels.zip(normalizer.transform(features))
    
  • Compute the cosine similarity by multiplying the matrix with its transpose:

    from pyspark.mllib.linalg.distributed import IndexedRowMatrix
    mat = IndexedRowMatrix(data).toBlockMatrix()
    dot = mat.multiply(mat.transpose())
    dot.toLocalMatrix().toArray()
    
        array([[ 0.        ,  0.        ,  0.        ,  0.        ],
               [ 0.        ,  1.        ,  0.10794634,  0.        ],
               [ 0.        ,  0.10794634,  1.        ,  0.        ],
               [ 0.        ,  0.        ,  0.        ,  1.        ]])
    

    OR: Using a Cartesian product and the function dot on numpy arrays:

    data.cartesian(data)\
        .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\
        .sortByKey()\
        .collect()
    
        [((1, 1), 1.0),
         ((1, 2), 0.10794633570596117),
         ((1, 3), 0.0),
         ((2, 1), 0.10794633570596117),
         ((2, 2), 1.0),
         ((2, 3), 0.0),
         ((3, 1), 0.0),
         ((3, 2), 0.0),
         ((3, 3), 1.0)]
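
Note that the local matrix returned by dot.toLocalMatrix() above is 4 x 4 even though there are only three IDs: IndexedRowMatrix indices are 0-based, and since the IDs start at 1, row/column 0 stays empty. If you want a 3 x 3 result, one option is to shift the IDs to 0-based indices first; a small sketch, reusing the data RDD built above and assuming integer IDs starting at 1:

    zero_based = data.map(lambda lv: (lv[0] - 1, lv[1]))  # (ID - 1, normalized vector)
    mat = IndexedRowMatrix(zero_based).toBlockMatrix()
    dot = mat.multiply(mat.transpose())
    dot.toLocalMatrix().toArray()  # now a 3 x 3 array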
    

2. DataFrame

Since you seem to be using DataFrames, you can use the Spark ml package instead:

import pyspark.sql.functions as psf
df = rdd.toDF(["ID", "Office_Loc"])\
    .withColumn("Office_Loc", psf.split(psf.regexp_replace("Office_Loc", " ", ""), ','))
  • Compute TF-IDF:

    from pyspark.ml.feature import HashingTF, IDF
    hashingTF = HashingTF(inputCol="Office_Loc", outputCol="tf")
    tf = hashingTF.transform(df)
    
    idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
    tfidf = idf.transform(tf)
    
  • Compute L2 norm:

    from pyspark.ml.feature import Normalizer
    normalizer = Normalizer(inputCol="feature", outputCol="norm")
    data = normalizer.transform(tfidf)
    
  • Compute matrix product:

    from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
    mat = IndexedRowMatrix(
        data.select("ID", "norm")\
            .rdd.map(lambda row: IndexedRow(row.ID, row.norm.toArray()))).toBlockMatrix()
    dot = mat.multiply(mat.transpose())
    dot.toLocalMatrix().toArray()
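
If the matrix is too large to collect with toLocalMatrix(), you can keep the result distributed and turn it into a DataFrame of (i, j, similarity) entries instead; a small sketch, reusing the dot BlockMatrix from just above:

    # stay distributed: expose the block matrix as (row, col, value) entries
    sims = dot.toCoordinateMatrix().entries\
        .map(lambda e: (e.i, e.j, e.value))\
        .toDF(["i", "j", "dot"])
    sims.filter("i < j").sort("i", "j").show()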
    

    OR: using a join and a UDF for function dot:

    from pyspark.sql.types import DoubleType

    dot_udf = psf.udf(lambda x, y: float(x.dot(y)), DoubleType())
    data.alias("i").join(data.alias("j"), psf.col("i.ID") < psf.col("j.ID"))\
        .select(
            psf.col("i.ID").alias("i"), 
            psf.col("j.ID").alias("j"), 
            dot_udf("i.norm", "j.norm").alias("dot"))\
        .sort("i", "j")\
        .show()
    
        +---+---+-------------------+
        |  i|  j|                dot|
        +---+---+-------------------+
        |  1|  2|0.10794633570596117|
        |  1|  3|                0.0|
        |  2|  3|                0.0|
        +---+---+-------------------+
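
The join condition i.ID < j.ID returns each unordered pair only once (and the diagonal entries are 1 by construction, since the rows are normalized). If you need the full symmetric table, one option is to switch to a non-equality condition, as a variation on the join above:

    data.alias("i").join(data.alias("j"), psf.col("i.ID") != psf.col("j.ID"))\
        .select(
            psf.col("i.ID").alias("i"),
            psf.col("j.ID").alias("j"),
            dot_udf("i.norm", "j.norm").alias("dot"))\
        .sort("i", "j")\
        .show()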
    

This tutorial lists different methods to multiply large scale matrices: https://labs.yodas.com/large-scale-matrix-multiplication-with-pyspark-or-how-to-match-two-large-datasets-of-company-1be4b1b2871e

Ophiuchus answered 16/10, 2017 at 6:43 Comment(10)
Thank you for the answer. I really appreciate the help. But the code gives me an error, 'requirement failed: The input column must be ArrayType, but got StringType.', during the hashingTF transformation when using the DataFrame.Acapulco
You have to split the location string into a list of words first. I added the part on how to create df.Ophiuchus
Hi, it works when I use data.cartesian(data).map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1]))).sortByKey().take(5), but when I use the mllib code and convert the BlockMatrix to a local matrix, it gives me u'requirement failed: The length of the values array must be less than Int.MaxValue. Currently numRows * numCols: 1006095879729669481', which I do not understand since I am taking a small subset of the data (about 10 IDs), so numRows * numCols should be 100.Acapulco
Try setting numFeatures to the number of distinct cities you have in your dataframe; by default it's 262144, which will be the number of columns in your block matrix (I set it to 10 for the sample data you provided). A Cartesian join with a dot product also works. Check out the link for big matrix multiplication.Ophiuchus
How do you set numFeatures? I set it with hashingTF = HashingTF(numFeatures=20, inputCol="Business", outputCol="tf"), but the block matrix still has 1003043309L cols and rows. For the small example given in the question I do not have that problem.Acapulco
When I do the Cartesian join with the dot product on 288 IDs and convert it to a DataFrame, result = data.cartesian(data).map(lambda l: (l[0][0], l[1][0], l[0][1].dot(l[1][1]))).toDF(), I get the following error: not supported type: <type 'numpy.float64'>. When I try a small example of 10 entries I do not get this error.Acapulco
numpy ints or floats are not supported types in pyspark. The Spark ML library is based on numpy arrays, which is why the dot products come out as numpy floats. You can bypass this by casting them to Python float. I added the Cartesian product part for both RDDs and DataFrames.Ophiuchus
Why does dot.toLocalMatrix().toArray() produce a 4 x 4 array instead of 3 x 3, given that there are only three labels (1, 2, 3)?Comehither
Hi, I've done this exercise using sklearn with different results: 0.28671097 between the first and the second row, whereas you get ~0.107. Then I realized that you are computing the cosine similarity as the dot product of the L2-normalized vectors, instead of the dot product of the vectors divided by the product of their L2 norms. Why have you done it this way? Regards.Vav
Indeed, you are really calculating the linear kernel function, which coincides with the cosine similarity here because the normalized TF-IDF vectors all have magnitude 1.Vav

About this issue: since I'm working on a project with PySpark where I have to use cosine similarity, I have to say that the code of @MaFF is correct. Indeed, I hesitated when I saw his code, because he was taking the dot product of the L2-normalized vectors, while the theory says: mathematically, it is the ratio of the dot product of the vectors and the product of the magnitudes of the two vectors.

And here is my code, adapted, with the same results, so I came to the conclusion that sklearn calculates TF-IDF in a different way; if you try to reproduce this exercise using sklearn, you will get a different result.

from pyspark.sql import functions as F
from pyspark.sql.functions import udf

d = [{'id': '1', 'office': 'Delhi, Mumbai, Gandhinagar'}, {'id': '2', 'office': 'Delhi, Mandi'}, {'id': '3', 'office': 'Hyderbad, Jaipur'}]
df_fussion = spark.createDataFrame(d)
df_fussion = df_fussion.withColumn('office', F.split('office', ', '))


from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="office", outputCol="tf")
tf = hashingTF.transform(df_fussion)

idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
data = idf.transform(tf)   

@udf("double")
def sim_cos(v1, v2):
    # cosine similarity: dot product divided by the product of the L2 norms
    try:
        p = 2
        return float(v1.dot(v2))/float(v1.norm(p)*v2.norm(p))
    except Exception:
        return 0.0

result = data.alias("i").join(data.alias("j"), F.col("i.ID") < F.col("j.ID"))\
    .select(
        F.col("i.ID").alias("i"),
        F.col("j.ID").alias("j"),
        sim_cos("i.feature", "j.feature").alias("sim_cosine"))\
    .sort("i", "j")
result.show()
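
As for the sklearn discrepancy mentioned above: sklearn's default TfidfVectorizer smooths the IDF and adds 1 to it (and L2-normalizes the rows), whereas Spark's IDF does not add that constant, so the weights, and therefore the cosines, come out different. A minimal sketch of the sklearn side for comparison, assuming the same three sample rows:

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

docs = ["Delhi Mumbai Gandhinagar", "Delhi Mandi", "Hyderbad Jaipur"]
tfidf = TfidfVectorizer().fit_transform(docs)  # rows come out L2-normalized
cosine_similarity(tfidf)                       # ~0.287 between the first two rows, vs ~0.108 in Spark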

I also want to share with you a simple test that I did with simple vectors, where the results are correct:

cosine similarity in pyspark for two simple vectors
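
For instance, a minimal check along those lines, using two hypothetical vectors and the sim_cos UDF defined above (the expected similarity of [1, 0] and [1, 1] is 1/sqrt(2) ≈ 0.7071):

from pyspark.ml.linalg import Vectors

test = spark.createDataFrame(
    [(1, Vectors.dense([1.0, 0.0])), (2, Vectors.dense([1.0, 1.0]))],
    ["id", "feature"])
test.alias("i").join(test.alias("j"), F.col("i.id") < F.col("j.id"))\
    .select(sim_cos("i.feature", "j.feature").alias("sim_cosine"))\
    .show()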

Kind regards,

Vav answered 29/10, 2020 at 9:49 Comment(2)
Thank you for sharing the code. May I ask how large the dataset you've been working on is? It took quite some time to calculate the similarity for a df with 1000 features and 10000 rows. I wonder, is there a way to speed this up?Unexperienced
I had to deal with a dataset of roughly 100k rows. If you want to compare them all with each other, the number of possible pairs among N rows is N*(N-1)/2, which means about 5*(10**9) possible combinations. So I recommend you group by some features that you think are representative enough to isolate small groups of rows, and apply this technique to those groups.Vav
