How to use Spark QuantileDiscretizer on multiple columns
All,

I have an ML pipeline set up as follows:

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}    
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random

val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()

// Get the continuous feature names and discretize each one
val continuous = df.dtypes.filter(_._2 == "DoubleType").map(_._1)
val discretizers = continuous.map { c =>
  new QuantileDiscretizer()
    .setInputCol(c)
    .setOutputCol(s"${c}_disc")
    .setNumBuckets(3)
    .fit(df) // fitting here triggers a separate job for each column
}
val pipeline = new Pipeline().setStages(discretizers)
val model = pipeline.fit(df)

When I run this, Spark seems to set up each discretizer as a separate job. Is there a way to run all the discretizers as a single job, with or without a pipeline? Thanks for the help, I appreciate it.

Charnel answered 26/4, 2017 at 16:1 Comment(0)

Support for this feature was added in Spark 2.3.0. See the release notes:

  • Multiple column support for several feature transformers:
    • [SPARK-13030]: OneHotEncoderEstimator (Scala/Java/Python)
    • [SPARK-22397]: QuantileDiscretizer (Scala/Java)
    • [SPARK-20542]: Bucketizer (Scala/Java/Python)

You can now use setInputCols and setOutputCols to specify multiple columns, although this does not yet seem to be reflected in the official docs. With this patch, performance is much better than when each column is handled in its own job.

Your example may be adapted as follows:

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}    
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random

val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()

// Get the continuous feature names and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map(_._1)

val discretizer = new QuantileDiscretizer()
  .setInputCols(continuous)
  .setOutputCols(continuous.map(c => s"${c}_disc"))
  .setNumBuckets(3)

val pipeline = new Pipeline().setStages(Array(discretizer))
val model = pipeline.fit(df)
model.transform(df)
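
If the columns need different bucket counts, the multi-column API also accepts one count per input column. The following is a minimal sketch, assuming Spark 2.3 or later; the small dataset and the names `small`, `cols`, `multi`, `bucketizer` and `binned` are made up for illustration. It also prints the splits that were fitted for each column, which is one way to check what the discretizer actually chose.

```scala
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.SparkSession

// Reuses the active session in spark-shell; otherwise starts a local one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("multi-col-discretizer")
  .getOrCreate()

// Small illustrative dataset (values are arbitrary).
val small = spark.createDataFrame(Seq(
  (0, 18.0, 2.1), (1, 19.0, 14.1), (2, 8.0, 63.7), (3, 5.0, 88.3), (4, 2.2, 0.8)
)).toDF("id", "hour", "temp")

val cols = Array("hour", "temp")

val multi = new QuantileDiscretizer()
  .setInputCols(cols)
  .setOutputCols(cols.map(c => s"${c}_disc"))
  .setNumBucketsArray(Array(2, 4)) // one bucket count per input column

// In multi-column mode, fit returns a Bucketizer holding one splits array per column.
val bucketizer = multi.fit(small)
val splitsArray = bucketizer.getSplitsArray
cols.zip(splitsArray).foreach { case (c, s) => println(s"$c: ${s.mkString(", ")}") }

val binned = bucketizer.transform(small)
binned.show()
```

Only `setNumBucketsArray` is set here, not `setNumBuckets`; as far as I know, the two are meant to be alternatives in multi-column mode.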
Revolving answered 7/5, 2018 at 10:41 Comment(3)
Cool. Now if they add support for StringIndexer, it will help people develop some fast ML models. - Charnel
Hi, how is the number of buckets determined? How do you determine that it's optimal? - Hypothesize
Can you show how to do this in PySpark? - Sonority
import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show()

Taken from the QuantileDiscretizer documentation.

This runs as a single job for a single column. The version below handles multiple columns by chaining discretizers, although each one is still fitted separately:

// Build a single-column discretizer for the given column and bucket count
def discretizerFun(col: String, bucketNo: Int): QuantileDiscretizer =
  new QuantileDiscretizer()
    .setInputCol(col)
    .setOutputCol(s"${col}_result")
    .setNumBuckets(bucketNo)

val data = Array((0, 18.0, 2.1), (1, 19.0, 14.1), (2, 8.0, 63.7), (3, 5.0, 88.3), (4, 2.2, 0.8))

val df = spark.createDataFrame(data).toDF("id", "hour", "temp")

df.show

// Discretize "hour" first, then fit and apply the "temp" discretizer on that result
val hourDisc = discretizerFun("hour", 2).fit(df).transform(df)
val res = discretizerFun("temp", 4).fit(hourDisc).transform(hourDisc)

res.show

The best way would be to convert that function into a UDF. However, handling the org.apache.spark.ml.feature.QuantileDiscretizer type inside a UDF might be an issue. If it can be done, you will have a nice, clean way of doing lazy transformation.
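
A UDF may not be a good fit here, since a discretizer has to be fitted on the whole DataFrame rather than applied row by row. One alternative way to make the chained approach reusable is to fold over a list of columns. This is only a sketch: `discretizeAll`, `sample` and `out` are hypothetical names, and it still fits one discretizer per column, so it does not reduce the number of jobs.

```scala
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.{DataFrame, SparkSession}

// Reuses the active session in spark-shell; otherwise starts a local one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("chained-discretizers")
  .getOrCreate()

// Hypothetical helper: fits and applies one single-column discretizer
// per (column, bucket count) pair, threading the DataFrame through each step.
def discretizeAll(input: DataFrame, buckets: Seq[(String, Int)]): DataFrame =
  buckets.foldLeft(input) { case (acc, (col, n)) =>
    new QuantileDiscretizer()
      .setInputCol(col)
      .setOutputCol(s"${col}_result")
      .setNumBuckets(n)
      .fit(acc)
      .transform(acc)
  }

// Small illustrative dataset (values are arbitrary).
val sample = spark.createDataFrame(Seq(
  (0, 18.0, 2.1), (1, 19.0, 14.1), (2, 8.0, 63.7), (3, 5.0, 88.3), (4, 2.2, 0.8)
)).toDF("id", "hour", "temp")

val out = discretizeAll(sample, Seq("hour" -> 2, "temp" -> 4))
out.show()
```

On Spark 2.3 or later, a single QuantileDiscretizer with setInputCols is likely the simpler choice; the fold is mainly of interest on older versions.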

Chronological answered 26/4, 2017 at 17:19 Comment(4)
That is a single column; I am talking about multiple columns. - Charnel
Ah, sorry, I edited the code. It works, though it's not very neat; you will need to wrap it with something to make it more reusable. - Chronological
It functions the same as using an ML pipeline, but good effort. There doesn't seem to be any way; I hope they add setInputCols to it in the near future, or you will have to write your own feature selection. - Charnel
How can we do this in PySpark? - Hectoliter
