Scala - How to split the probability column (column of vectors) that we obtain when we fit the GMM model to the data into two separate columns? [duplicate]
I am trying to do the following:

+-----+-------------------------+----------+-------------------------------------------+
|label|features                 |prediction|probability                                |
+-----+-------------------------+----------+-------------------------------------------+
|0.0  |(3,[],[])                |0         |[0.9999999999999979,2.093996169658831E-15] |
|1.0  |(3,[0,1,2],[0.1,0.1,0.1])|0         |[0.999999999999999,9.891337521299582E-16]  |
|2.0  |(3,[0,1,2],[0.2,0.2,0.2])|0         |[0.9999999999999979,2.0939961696578572E-15]|
|3.0  |(3,[0,1,2],[9.0,9.0,9.0])|1         |[2.093996169659668E-15,0.9999999999999979] |
|4.0  |(3,[0,1,2],[9.1,9.1,9.1])|1         |[9.89133752128275E-16,0.999999999999999]   |
|5.0  |(3,[0,1,2],[9.2,9.2,9.2])|1         |[2.0939961696605603E-15,0.9999999999999979]|
+-----+-------------------------+----------+-------------------------------------------+

Convert the above dataframe so that it has two more columns, prob1 and prob2, each holding the corresponding value from the probability column.

I found similar questions - one in PySpark and the other in Scala. I do not know how to translate the PySpark code and I am getting an error with the Scala code.

PySpark Code:

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

split1_udf = udf(lambda value: value[0].item(), FloatType())
split2_udf = udf(lambda value: value[1].item(), FloatType())

output2 = randomforestoutput.select(split1_udf('probability').alias('c1'), split2_udf('probability').alias('c2'))

Or to append these columns to the original dataframe:

randomforestoutput.withColumn('c1', split1_udf('probability')).withColumn('c2', split2_udf('probability'))

Scala Code:

import org.apache.spark.sql.functions.udf

val getPOne = udf((v: org.apache.spark.mllib.linalg.Vector) => v(1))
model.transform(testDf).select(getPOne($"probability"))

I get the following error when I run the Scala code:

scala> predictions.select(getPOne(col("probability"))).show(false)
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(probability)' due to data type mismatch: argument 1 requires vector type, however, '`probability`' is of vector type.;;
'Project [UDF(probability#39) AS UDF(probability)#135]
+- Project [label#0, features#1, prediction#34, UDF(features#1) AS probability#39]
   +- Project [label#0, features#1, UDF(features#1) AS prediction#34]
      +- Relation[label#0,features#1] libsvm

I am currently using Scala 2.11.11 and Spark 2.1.1.

Opportuna asked 13/6, 2017 at 21:29

What I understand from your question is that you are trying to split the probability column into two columns, prob1 and prob2. If that's the case, then simple array element access with withColumn should solve your issue.

predictions
  .withColumn("prob1", $"probability"(0))
  .withColumn("prob2", $"probability"(1))
  .drop("probability")
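
Equivalently, the same element access can be written with Column.getItem in a single select (a minimal sketch, assuming the same predictions dataframe and spark.implicits._ in scope):

// Same array element access, expressed with getItem in one select.
predictions.select(
  $"probability".getItem(0).as("prob1"),
  $"probability".getItem(1).as("prob2")
)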

You can find more functions that can be applied to dataframes in the org.apache.spark.sql.functions documentation.

Edited

I created a temporary dataframe to match your column:

import spark.implicits._   // needed for toDF on a local Seq

val predictions = Seq(
  Array(1.0, 2.0),
  Array(2.0939961696605603E-15, 0.9999999999999979),
  Array(Double.NaN, Double.NaN)
).toDF("probability")
+--------------------------------------------+
|probability                                 |
+--------------------------------------------+
|[1.0, 2.0]                                  |
|[2.0939961696605603E-15, 0.9999999999999979]|
|[NaN, NaN]                                  |
+--------------------------------------------+

Applying the withColumn calls from above results in

+----------------------+------------------+
|prob1                 |prob2             |
+----------------------+------------------+
|1.0                   |2.0               |
|2.0939961696605603E-15|0.9999999999999979|
|NaN                   |NaN               |
+----------------------+------------------+

Schema mismatch Edit

Since the Vector schema of your probability column doesn't match the ArrayType schema assumed above, that solution won't work in your case. Use the following one instead.

You will have to create udf functions that extract the values as expected:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

val first = udf((v: Vector) => v.toArray(0))
val second = udf((v: Vector) => v.toArray(1))
predictions
  .withColumn("prob1", first($"probability"))
  .withColumn("prob2", second($"probability"))
  .drop("probability")

I hope you get the desired result.
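
If the vector has more than two components, a variant that scales better (a sketch under the same assumptions) converts the vector to an array column once and then uses plain array indexing; later Spark versions ship a built-in vector_to_array for this, but on Spark 2.1.1 a udf is needed:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

// Convert the Vector to an array column once, then index it like
// any other array instead of defining one udf per component.
val toArr = udf((v: Vector) => v.toArray)

predictions
  .withColumn("probs", toArr($"probability"))
  .withColumn("prob1", $"probs"(0))
  .withColumn("prob2", $"probs"(1))
  .drop("probability", "probs")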

Enrico answered 14/6, 2017 at 2:47

Comment: The correct Vector data type to use is org.apache.spark.ml.linalg.Vector. Thanks a lot for your help and patience! Really appreciate it! – Opportuna
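
As that comment points out, the question's original UDF fails only because it imports the mllib Vector while ML pipeline models such as the GMM produce the ml one; a minimal sketch of the corrected version:

import org.apache.spark.ml.linalg.Vector   // ml, not mllib
import org.apache.spark.sql.functions.udf

// Extracts the second component of the probability vector.
val getPOne = udf((v: Vector) => v(1))
predictions.select(getPOne($"probability")).show(false)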
