Using DataFrame with MLlib

Let's say I have a DataFrame (that I read in from a csv on HDFS) and I want to train some algorithms on it via MLlib. How do I convert the rows into LabeledPoints or otherwise utilize MLlib on this dataset?

Sidero answered 31/3, 2015 at 20:17 Comment(1)
You haven't mentioned the data type of your columns, but if they are numeric (integer, double, etc.) you can use VectorAssembler to convert the feature columns into one column of Vector. – Phylactery
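
For illustration (not part of the original comment), here is a minimal sketch of that suggestion, assuming the CSV was read into a DataFrame named df with numeric columns x and z (names assumed, not from the question):

import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
    .setInputCols(Array("x", "z"))   // numeric feature columns to combine
    .setOutputCol("features")        // single Vector column usable by MLlib
val withFeatures = assembler.transform(df)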

Assuming you're using Scala:

Let's say you obtain the DataFrame as follows:

val results : DataFrame = sqlContext.sql(...)

Step 1: call results.printSchema() -- this will show you not only the columns in the DataFrame and (this is important) their order, but also what Spark SQL thinks their types are. Once you see this output, things get a lot less mysterious.

Step 2: Get an RDD[Row] out of the DataFrame:

val rows: RDD[Row] = results.rdd

Step 3: Now it's just a matter of pulling whatever fields interest you out of the individual rows. For this you need to know the 0-based position of each field and its type, and luckily you obtained all that in Step 1 above. For example, let's say you did a SELECT x, y, z, w FROM ... and printing the schema yielded

root
 |-- x: double (nullable = ...)
 |-- y: string (nullable = ...)
 |-- z: integer (nullable = ...)
 |-- w: binary (nullable = ...)

And let's say all you want to use are x and z. You can pull them out into an RDD[(Double, Int)] as follows:

rows.map(row => {
    // x has position 0 and type double
    // z has position 2 and type integer
    (row.getDouble(0), row.getInt(2))
})

From here you just use Core Spark to create the relevant MLlib objects. Things could get a little more complicated if your SQL returns columns of array type, in which case you'll have to call getList(...) for that column.
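
For instance, here is a minimal sketch of that last step, assuming (purely for illustration) that z is the label and x is the only feature:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val labeledPoints = rows.map { row =>
    // illustrative assumption: z (position 2) is the label, x (position 0) is the feature
    LabeledPoint(row.getInt(2).toDouble, Vectors.dense(row.getDouble(0)))
}

The resulting RDD[LabeledPoint] is what the RDD-based MLlib training methods expect.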

Fayina answered 1/4, 2015 at 0:19 Comment(0)

Assuming you're using Java (Spark version 1.6.2):

Here is a simple example of Java code using DataFrame for machine learning.

  • It loads a JSON file with the following structure,

    [{"label":1,"att2":5.037089672359123,"att1":2.4100883023159456}, ... ]

  • splits the data into training and test sets,

  • trains the model on the training data,
  • applies the model to the test data, and
  • stores the results.

Moreover, according to the official documentation, the DataFrame-based API is the primary API for MLlib as of version 2.0.0, so you can find several examples using DataFrames.

The code:

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.DecisionTreeClassifier;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.sql.DataFrame;

SparkConf conf = new SparkConf().setAppName("MyApp").setMaster("local[2]");
SparkContext sc = new SparkContext(conf);
String path = "F:\\SparkApp\\test.json";
String outputPath = "F:\\SparkApp\\justTest";

System.setProperty("hadoop.home.dir", "C:\\winutils\\");

SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame df = sqlContext.read().json(path);
df.registerTempTable("tmp");
DataFrame newDF = df.sqlContext().sql("SELECT att1, att2, label FROM tmp");
DataFrame  dataFixed = newDF.withColumn("label", newDF.col("label").cast("Double"));

VectorAssembler assembler = new VectorAssembler().setInputCols(new String[]{"att1", "att2"}).setOutputCol("features");
StringIndexer indexer = new StringIndexer().setInputCol("label").setOutputCol("labelIndexed");

// Split the data into training and test
DataFrame[] splits = dataFixed.randomSplit(new double[] {0.7, 0.3});
DataFrame trainingData = splits[0];
DataFrame testData = splits[1];

DecisionTreeClassifier dt = new DecisionTreeClassifier().setLabelCol("labelIndexed").setFeaturesCol("features");
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {assembler, indexer, dt});
// Train model
PipelineModel model = pipeline.fit(trainingData);

// Make predictions
DataFrame predictions = model.transform(testData);
predictions.rdd().coalesce(1, true, null).saveAsTextFile("justPlay.txt" + System.currentTimeMillis());
Excrete answered 4/8, 2016 at 11:24 Comment(0)

The RDD-based MLlib API is on its way to being deprecated, so you should use the DataFrame-based MLlib API instead.

Generally, the input to these MLlib APIs is a DataFrame containing two columns: label and features. There are various ways to build this DataFrame - low-level APIs like org.apache.spark.mllib.linalg.{Vector, Vectors}, org.apache.spark.mllib.regression.LabeledPoint, org.apache.spark.mllib.linalg.{Matrix, Matrices}, etc. They all take numeric values for the features and the label. Words can be converted to vectors using org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}. This documentation explains more: https://spark.apache.org/docs/latest/mllib-data-types.html
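
For example, a minimal sketch of building such a DataFrame by hand (the values are made up for illustration, and spark is assumed to be a SparkSession):

import org.apache.spark.ml.linalg.Vectors

// two-column DataFrame: a numeric label and a Vector of features
val training = spark.createDataFrame(Seq(
    (1.0, Vectors.dense(2.41, 5.03)),
    (0.0, Vectors.dense(1.10, 0.95))
)).toDF("label", "features")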

Once the input DataFrame with label and features is created, instantiate the MLlib API, pass the DataFrame to its 'fit' function to get the model, and then call 'transform' (or 'predict') on the model to get the results.
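
Continuing the sketch above, with LogisticRegression standing in for whichever estimator you actually pick:

import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()
    .setLabelCol("label")
    .setFeaturesCol("features")

val model = lr.fit(training)                 // fit returns a LogisticRegressionModel
val predictions = model.transform(training)  // adds a "prediction" column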

Example -

The training file looks like: <numeric label> <words separated by spaces>

import org.apache.spark.ml.classification.LinearSVC
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import spark.implicits._

//Build word vector
    val trainingData = spark.read.parquet(<path to training file>)
    val sampleDataDf = trainingData
      .map { r =>
        val s = r.getAs[String]("value").split(" ")
        val label = s.head.toDouble
        val feature = s.tail
        (label, feature)
      }.toDF("label", "feature_words")

    val word2Vec = new Word2Vec()
      .setInputCol("feature_words")
      .setOutputCol("feature_vectors")
      .setMinCount(0)
      .setMaxIter(10)

    //build word2Vector model
    val model = word2Vec.fit(sampleDataDf) 
    //convert training text data to vector and labels
    val wVectors = model.transform(sampleDataDf)

    //train LinearSVM model
    val svmAlgorithm = new LinearSVC()
                    .setFeaturesCol("feature_vectors")
                    .setMaxIter(100)
                    .setLabelCol("lable")
                    .setRegParam(0.01)
                    .setThreshold(0.5)
                    .fit(wVectors) //use word vectors created

    //Predict new data, same format as training data containing words
     val predictionData = spark.read.parquet(<path to prediction file>)

    val pDataDf = predictionData
      .map { r =>
        val s = r.getAs[String]("value").split(" ")
        val label = s.head.toDouble
        val feature = s.tail
        (label, feature)
      }.toDF("lable","feature_words")

    val pVectors = model.transform(pDataDf)
    val predictionResult = pVectors.map{ r =>
        val s = r.getAs[Seq[String]]("feature_words")
        val v = r.getAs[Vector]("feature_vectors")
        val c = svmAlgorithm.predict(v) // predict using trained SVM

        s"$c ${s.mkString(" ")}"
      }
Barron answered 22/12, 2019 at 3:48 Comment(0)
