How can I build a CoordinateMatrix in Spark using a DataFrame?
Asked Answered
H

2

5

I am trying to use the Spark implementation of the ALS algorithm for recommendation systems, so I built the DataFrame depicted below, as training data:

|--------------|--------------|--------------|
|    userId    |    itemId    |    rating    |
|--------------|--------------|--------------|

Now, I would like to create a sparse matrix, to represent the interactions between every user and every item. The matrix will be sparse because if there is no interaction between a user and an item, the corresponding value in the matrix will be zero. Thus, in the end, most values will be zero.

But how can I achieve this, using a CoordinateMatrix? I'm saying CoordinateMatrix because I'm using Spark 2.1.1, with python, and in the documentation, I saw that a CoordinateMatrix should be used only when both dimensions of the matrix are huge and the matrix is very sparse.

In other words, how can I get from this DataFrame to a CoordinateMatrix, where the rows would be users, the columns would be items and the ratings would be the values in the matrix?

Haiphong answered 28/6, 2017 at 12:57 Comment(0)
A
8

A CoordinateMatrix is just a wrapper for an RDD of MatrixEntrys. A MatrixEntry is just a wrapper over a (long, long, float) tuple. Pyspark allows you to create a CoordinateMatrix from an RDD of such tuples. If the userId and itemId fields are both IntegerTypes and the rating is something like a FloatType, then creating the desired matrix is very straightforward.

from pyspark.mllib.linalg.distributed import CoordinateMatrix

cmat=CoordinateMatrix(df.rdd.map(tuple))

It is only slightly more complicated if you have StringTypes for the userId and itemId fields. You would need to index those strings first and then pass the indices to the CoordinateMatrix.

Apparently answered 28/6, 2017 at 13:22 Comment(9)
So, in the map function, for every Row of the DataFrame, I return a tuple of (long, long, float)?Haiphong
df.rdd is an rdd of Rows. When you apply .map(tuple), it converts those Rows to tuples matching the order of your df. So you would end up with tuples of the form (userId,itemId,rating). If you need to ensure the correct datatypes then you can apply another transformation such as .map(lambda r: (int(r[0]),int(r[1]),float(r[2]))), or alternatively, you can ensure the correct order and coerce to the correct datatypes by mapping and converting directly from the Row rdd entries with df.rdd.map(lambda r: (int(r['userId']),int(r['itemId']),float(r['rating']))).Apparently
One last question. I have 4342 unique users and 2798 unique items in my dataset. The sparse matrix that is constructed is 18288 by 90209. How is this happening? Shouldn't a sparse matrix drop every duplicate? I have used a grouping function to group my dataframe: groupped_df = data_df.groupBy(["userId", "itemId"]).agg(sf.sum("rating").alias("rating"))Haiphong
I tested it on my end, and it does seem like you can have multiple entries with the same indices. There doesn't appear to be any check for duplicate entries in the source code, which is a little strange, but not too surprising given that such a check would be computationally expensive to do in a distributed environment. However, once you actually need to do computations with the matrix you need to convert it to other matrix types, such as a RowMatrix. Trying to convert to those types will throw a java.lang.IllegalArgumentException: requirement failed: Found duplicate indices error.Apparently
Maybe then we should use pyspark.ml.linalg.SparseMatrix, but in order to do that we need colPtrs and rowIndices, which I guess are column pointers and row indices. Any ideas how to get those?Haiphong
Several problems with this approach. First, a pyspark SparseMatrix is a local matrix, meaning that it has to all fit onto a single node, and isn't leveraging the distributed nature of spark. Second, a SparseMatrix is basically a SciPy’s csc_matrix, and getting your data into a csc (compressed sparse column) format is a little more complicated. Finally, I am not sure of any matrix object that will automatically remove duplicates for you unless you tell it how, most of them will just throw an error telling you that there are duplicates and expect you to handle it.Apparently
Why don't you just remove the duplicates yourself with df=df.distinct() or if you have different ratings and you want the max or something you import max with from pyspark.sql.functions import max and then get the distinct userId and itemId and max rating with df=df.groupby('userId','itemId').agg(max('rating').alias('rating'))Apparently
Cannot do that. A user might have bought several items and an item might have been bought by several users. I have already do df=df.groupby('userId','itemId').agg(sum('rating').alias('ra‌​ting')), because the rating is implicit, thus how many times a user have bought an item. But this does not remove duplicates...Haiphong
That code you posted will absolutely "group", or remove, duplicate occurences of userId and itemId pairs. I am not sure that we are going to solve this here in the comments. And the topic of this has drifted away from the original question towards a new question of how to remove duplicates from a dataframe. Perhaps it makes sense to move it to another question so that you can post more information.Apparently
H
1

With Spark 2.4.0, I am showing the whole example that I hope to meet your need. Create dataframe using dictionary and pandas:

my_dict = {
    'userId': [1,2,3,4,5,6],
    'itemId': [101,102,103,104,105,106],
    'rating': [5.7, 8.8, 7.9, 9.1, 6.6, 8.3]
}
import pandas as pd
pd_df = pd.DataFrame(my_dict)
df = spark.createDataFrame(pd_df)

See the dataframe:

df.show()
+------+------+------+
|userId|itemId|rating|
+------+------+------+
|     1|   101|   5.7|
|     2|   102|   8.8|
|     3|   103|   7.9|
|     4|   104|   9.1|
|     5|   105|   6.6|
|     6|   106|   8.3|
+------+------+------+

Create CoordinateMatrix from dataframe:

from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
coorRDD = df.rdd.map(lambda x: MatrixEntry(x[0], x[1], x[2]))
coorMatrix = CoordinateMatrix(coorRDD)

Now see the data type of result:

type(coorMatrix)
pyspark.mllib.linalg.distributed.CoordinateMatrix
Huntsman answered 4/2, 2020 at 3:51 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.