I would like to add some information to @Mehdi Lamrani’s answer :
If you want to do a polynomial linear regression in SparkML, you may use the class PolynomialExpansion.
For information check the class in the SparkML Doc
or in the Spark API Doc
Here is an implementation example:
Let's assume we have a train and test datasets, stocked in two csv files, with headers containing the neames of the columns (features, label).
Each data set contains three features named f1,f2,f3, each of type Double
(this is the X matrix), as well as a label feature (the Y vector) named mylabel.
For this code I used Spark+Scala:
Scala version : 2.12.8
Spark version 2.4.0.
We assume that SparkML library was already downloaded in build.sbt.
First of all, import librairies :
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.{SparkConf, SparkContext}
Create Spark Session and Spark Context :
val ss = org.apache.spark.sql
.SparkSession.builder()
.master("local")
.appName("Read CSV")
.enableHiveSupport()
.getOrCreate()
val conf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(conf)
Instantiate the variables you are going to use :
val f_train:String = "path/to/your/train_file.csv"
val f_test:String = "path/to/your/test_file.csv"
val degree:Int = 3 // Set the degree of your choice
val maxIter:Int = 10 // Set the max number of iterations
val lambda:Double = 0.0 // Set your lambda
val alpha:Double = 0.3 // Set the learning rate
First of all, let's create first several udf-s, which will be used for the data reading and pre-processing.
The arguments' types of the udf toFeatures will be Vector
followed by the type of the arguments of the features: (Double,Double,Double
)
val toFeatures = udf[Vector, Double, Double, Double] {
(a,b,c) => Vectors.dense(a,b,c)
}
val encodeIntToDouble = udf[Double, Int](_.toDouble)
Now let's create a function which extracts data from CSV and creates, new features from the existing ones, using PolynomialExpansion:
def getDataPolynomial(
currentfile:String,
sc:SparkSession,
sco:SparkContext,
degree:Int
):DataFrame =
{
val df_rough:DataFrame = sc.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.option("inferSchema", value=true)
.load(currentfile)
.toDF("f1", "f2", "f3", "myLabel")
// you may add or not the last line
val df:DataFrame = df_rough
.withColumn("featNormTemp", toFeatures(df_rough("f1"), df_rough("f2"), df_rough("f3")))
.withColumn("label", Tools.encodeIntToDouble(df_rough("myLabel")))
val polyExpansion = new PolynomialExpansion()
.setInputCol("featNormTemp")
.setOutputCol("polyFeatures")
.setDegree(degree)
val polyDF:DataFrame=polyExpansion.transform(df.select("featNormTemp"))
val datafixedWithFeatures:DataFrame = polyDF.withColumn("features", polyDF("polyFeatures"))
val datafixedWithFeaturesLabel = datafixedWithFeatures
.join(df,df("featNormTemp") === datafixedWithFeatures("featNormTemp"))
.select("label", "polyFeatures")
datafixedWithFeaturesLabel
}
Now, run the function both for the train and test datasets, using the chosen degree for the Polynomial expansion.
val X:DataFrame = getDataPolynomial(f_train,ss,sc,degree)
val X_test:DataFrame = getDataPolynomial(f_test,ss,sc,degree)
Run the algorithm in order to get a model of linear regression, using a pipeline :
val assembler = new VectorAssembler()
.setInputCols(Array("polyFeatures"))
.setOutputCol("features2")
val lr = new LinearRegression()
.setMaxIter(maxIter)
.setRegParam(lambda)
.setElasticNetParam(alpha)
.setFeaturesCol("features2")
.setLabelCol("label")
// Fit the model:
val pipeline:Pipeline = new Pipeline().setStages(Array(assembler,lr))
val lrModel:PipelineModel = pipeline.fit(X)
// Get prediction on the test set :
val result:DataFrame = lrModel.transform(X_test)
Finally, evaluate the result using mean squared error measure :
def leastSquaresError(result:DataFrame):Double = {
val rm:RegressionMetrics = new RegressionMetrics(
result
.select("label","prediction")
.rdd
.map(x => (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))
Math.sqrt(rm.meanSquaredError)
}
val error:Double = leastSquaresError(result)
println("Error : "+error)
I hope this might be useful !