How to prepare data in LibSVM format from a DataFrame?

I want to produce LibSVM-format data, so I transformed my DataFrame into the desired shape, but I do not know how to convert it to LibSVM format. The target format is user item:rating. Here is what I have so far:

// Parse the raw CSV ratings into (user, item, rating) tuples
val ratings = sc.textFile("/user/ubuntu/kang/0829/rawRatings.csv").map { line =>
  val fields = line.split(",")
  (fields(0).toInt, fields(1).toInt, fields(2).toDouble)
}

// Key by user and collect each user's (item, rating) pairs
val user = ratings.map { case (user, product, rate) => (user, (product, rate)) }
val usergroup = user.groupByKey

val data = usergroup.map { case (x, iter) => (x, iter.map(_._1).toArray, iter.map(_._2).toArray) }

val data_DF = data.toDF("user", "item", "rating")

[figure: DataFrame with user, item array, and rating array columns]

I am using Spark 2.0.

Heald answered 1/1, 2017 at 14:44 Comment(0)

The issue you are facing can be divided into the following:

  • Converting your ratings (I believe) into LabeledPoint data X.
  • Saving X in libsvm format.

1. Converting your ratings into LabeledPoint data X

Let's consider the following raw ratings:

val rawRatings: Seq[String] = Seq("0,1,1.0", "0,3,3.0", "1,1,1.0", "1,2,0.0", "1,3,3.0", "3,3,4.0", "10,3,4.5")

You can handle those raw ratings as a coordinate list matrix (COO).

Spark implements a distributed matrix backed by an RDD of its entries: CoordinateMatrix, where each entry is a tuple (i: Long, j: Long, value: Double).

Note: a CoordinateMatrix should be used only when both dimensions of the matrix are huge and the matrix is very sparse (which is usually the case for user/item ratings).

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD

// Parse each raw rating into a MatrixEntry(row, column, value)
val data: RDD[MatrixEntry] =
  sc.parallelize(rawRatings).map { line =>
    val fields = line.split(",")
    val i = fields(0).toLong
    val j = fields(1).toLong
    val value = fields(2).toDouble
    MatrixEntry(i, j, value)
  }

Now let's convert that RDD[MatrixEntry] to a CoordinateMatrix and extract the indexed rows:

val df = new CoordinateMatrix(data) // Convert the RDD to a CoordinateMatrix
  .toIndexedRowMatrix().rows        // Extract indexed rows
  .toDF("label", "features")        // Convert the rows to a DataFrame

2. Saving LabeledPoint data in libsvm format

Since Spark 2.0, you can do that using the DataFrameWriter. Let's create a small example with some dummy LabeledPoint data (you can also use the DataFrame we created earlier):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

val df = Seq(neg, pos).toDF("label", "features")

Unfortunately we still can't use the DataFrameWriter directly: while most pipeline components support backward compatibility for loading, DataFrames from Spark versions prior to 2.0 that contain vector or matrix columns need to be migrated to the new spark.ml vector and matrix types.

Utilities for converting DataFrame columns from mllib.linalg to ml.linalg types (and vice versa) can be found in org.apache.spark.mllib.util.MLUtils. In our case we need to do the following (for both the dummy data and the DataFrame from step 1):

import org.apache.spark.mllib.util.MLUtils
// convert DataFrame columns
val convertedVecDF = MLUtils.convertVectorColumnsToML(df)

Now let's save the DataFrame:

convertedVecDF.write.format("libsvm").save("data/foo")

And we can check the file contents:

$ cat data/foo/part*
0.0 1:1.0 3:3.0
1.0 1:1.0 2:0.0 3:3.0
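
To sanity-check programmatically, the saved directory can be read back with the matching libsvm reader (spark here is the SparkSession; the reader also accepts a numFeatures option if you need a fixed vector size):

val loaded = spark.read.format("libsvm").load("data/foo")
loaded.show()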

EDIT: In the current version of Spark (2.1.0) there is no need to use the mllib package. You can simply save LabeledPoint data in libsvm format as below:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.LabeledPoint

val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

val df = Seq(neg, pos).toDF("label", "features")
df.write.format("libsvm").save("data/foo")
Citric answered 1/1, 2017 at 17:29 Comment(12)
Hi eliasah, I've tried your example under Spark 2.2.0, however it throws the exception "java.util.NoSuchElementException: key not found: numFeatures". How did you solve it? Thanks. – Udder
But you can't really apply that to a large dataset, can you? Since you can't just map a current DataFrame to a LabeledPoint. – Unseen
I'm not sure I understand your comment @big_mike_boiii – Citric
Ok, you are hard-coding the values for pos and neg. Suppose you have a Dataset or DataFrame with 10,000,000 rows though, how would you avoid MLlib then? – Unseen
@big_mike_boiii That was an example to illustrate my point. It all goes back to the idea of having a vector column and a label column. So whether you have 10 points or 10M, my answer remains the same. – Citric
Yeah, but is there a way to make a DataFrame of LabeledPoints from a DataFrame of just Rows? Spark doesn't allow using a map on a DataFrame. – Unseen
@big_mike_boiii You can use a Dataset then, or you can always fall back to an RDD if you are stuck, map, and then convert to a DataFrame. – Citric
I don't think you can use Dataset so easily. And I was trying to avoid using RDDs altogether. Right now I'm only able to use the Dataset if I map every field with a .asInstanceOf[Double], which is a bit of a hack. – Unseen
Let us continue this discussion in chat. – Citric
Please, can you explain how to make pos, neg for the 20M MovieLens dataset? – Mckenzie
@Salmaz I'm not sure I understand your question. – Citric
How do you make the label column for a large dataset like the ratings in 20M MovieLens? – Mckenzie
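
To illustrate what the comments above are driving at, here is a minimal sketch (not from the original answer) that builds the label and features columns from a plain ratings DataFrame with no hard-coded points and no drop to RDDs, using the standard VectorAssembler. The column names user, item, and rating are assumptions matching the question:

import org.apache.spark.ml.feature.VectorAssembler

// Hypothetical ratings DataFrame -- stands in for e.g. the 20M MovieLens ratings
val ratingsDF = Seq((0, 1, 1.0), (0, 3, 3.0), (1, 2, 0.0)).toDF("user", "item", "rating")

// Assemble the numeric columns into a single vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("user", "item"))
  .setOutputCol("features")

// The rating becomes the label; this scales to any number of rows
val libsvmReady = assembler.transform(ratingsDF)
  .withColumnRenamed("rating", "label")
  .select("label", "features")

libsvmReady.write.format("libsvm").save("data/ratings-libsvm")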

In order to convert an existing DataFrame to a typed Dataset, I suggest the following case class:

import org.apache.spark.ml.{linalg => L}

case class LibSvmEntry(
  value: Double,
  features: L.Vector)

Then you can use the map function to convert each row to a LibSVM entry like so: df.map { r: Row => /* Do your stuff here*/ }
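
A minimal sketch of that map, assuming a DataFrame with the (user: Int, item: Int, rating: Double) columns from the question (the column positions and types are assumptions):

import org.apache.spark.ml.{linalg => L}
import spark.implicits._ // provides the Encoder for the LibSvmEntry case class

val ds = df.map { r =>
  LibSvmEntry(
    r.getDouble(2), // the rating becomes the value/label
    L.Vectors.dense(r.getInt(0).toDouble, r.getInt(1).toDouble)) // user and item become features
}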

Arguseyed answered 8/11, 2018 at 12:24 Comment(0)

The libsvm features column is a sparse vector, so you can use pyspark.ml.linalg.SparseVector to solve the problem:

from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql.functions import udf

# Example: a sparse vector of size 4 with values at indices 1 and 3
a = SparseVector(4, [1, 3], [3.0, 4.0])

def sparse_vec(length, index, score):
    """args: length int, index array, score array"""
    return SparseVector(length, index, score)

trans_sparse = udf(sparse_vec, VectorUDT())
Capp answered 14/8, 2019 at 6:36 Comment(0)
