How to handle categorical features with spark-ml?
How do I handle categorical data with spark-ml and not spark-mllib?

Though the documentation is not very clear, it seems that classifiers, e.g. RandomForestClassifier and LogisticRegression, have a featuresCol argument, which specifies the name of the column of features in the DataFrame, and a labelCol argument, which specifies the name of the column of labeled classes in the DataFrame.

Obviously I want to use more than one feature in my prediction, so I tried using the VectorAssembler to put all my features in a single vector under featuresCol.

However, the VectorAssembler only accepts numeric types, boolean type, and vector type (according to the Spark website), so I can't put strings in my features vector.

How should I proceed?

Wheezy answered 28/8, 2015 at 18:28 Comment(2)
medium.com/@roshinijohri/… – Ladino
I've added some examples on how categorical features can be handled with Spark – Ladino
M
59

I just wanted to complete Holden's answer.

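Since Spark 2.3.0, OneHotEncoder has been deprecated and it will be removed in 3.0.0. Please use OneHotEncoderEstimator instead. (In Spark 3.0.0 and later, OneHotEncoderEstimator has itself been renamed to OneHotEncoder, so on 3.x import OneHotEncoder and use it with inputCols/outputCols as shown here.)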

In Scala:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, StringIndexer}

val df = Seq((0, "a", 1), (1, "b", 2), (2, "c", 3), (3, "a", 4), (4, "a", 4), (5, "c", 3)).toDF("id", "category1", "category2")

val indexer = new StringIndexer().setInputCol("category1").setOutputCol("category1Index")
val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array(indexer.getOutputCol, "category2"))
  .setOutputCols(Array("category1Vec", "category2Vec"))

val pipeline = new Pipeline().setStages(Array(indexer, encoder))

pipeline.fit(df).transform(df).show
// +---+---------+---------+--------------+-------------+-------------+
// | id|category1|category2|category1Index| category1Vec| category2Vec|
// +---+---------+---------+--------------+-------------+-------------+
// |  0|        a|        1|           0.0|(2,[0],[1.0])|(4,[1],[1.0])|
// |  1|        b|        2|           2.0|    (2,[],[])|(4,[2],[1.0])|
// |  2|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
// |  3|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
// |  4|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
// |  5|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
// +---+---------+---------+--------------+-------------+-------------+

In Python:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

df = spark.createDataFrame([(0, "a", 1), (1, "b", 2), (2, "c", 3), (3, "a", 4), (4, "a", 4), (5, "c", 3)], ["id", "category1", "category2"])

indexer = StringIndexer(inputCol="category1", outputCol="category1Index")
inputs = [indexer.getOutputCol(), "category2"]
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=["categoryVec1", "categoryVec2"])
pipeline = Pipeline(stages=[indexer, encoder])
pipeline.fit(df).transform(df).show()
# +---+---------+---------+--------------+-------------+-------------+
# | id|category1|category2|category1Index| categoryVec1| categoryVec2|
# +---+---------+---------+--------------+-------------+-------------+
# |  0|        a|        1|           0.0|(2,[0],[1.0])|(4,[1],[1.0])|
# |  1|        b|        2|           2.0|    (2,[],[])|(4,[2],[1.0])|
# |  2|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
# |  3|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
# |  4|        a|        4|           0.0|(2,[0],[1.0])|    (4,[],[])|
# |  5|        c|        3|           1.0|(2,[1],[1.0])|(4,[3],[1.0])|
# +---+---------+---------+--------------+-------------+-------------+
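The vector sizes in the output above (2 for the three category1 values, 4 for category2) follow from the encoder treating indices as starting at 0 and from its default dropLast = true. Here is a plain-Python sketch of that rule. It is not Spark code, and the function name one_hot_sparse is made up for illustration.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, indices, values), mimicking how Spark prints a sparse one-hot vector.

    Hypothetical helper: it assumes category indices run from 0 to num_categories - 1.
    """
    if not 0 <= index < num_categories:
        raise ValueError("index out of range")
    # With drop_last, the last category gets no slot and is encoded as all zeros.
    size = num_categories - 1 if drop_last else num_categories
    if index < size:
        return (size, [index], [1.0])
    return (size, [], [])


# category1Index has 3 categories: 0.0 (a), 1.0 (c), 2.0 (b)
print(one_hot_sparse(0, 3))  # (2, [0], [1.0])
print(one_hot_sparse(2, 3))  # (2, [], [])

# category2 holds 1..4, but indices are assumed to start at 0, so 5 categories
print(one_hot_sparse(1, 5))  # (4, [1], [1.0])
print(one_hot_sparse(4, 5))  # (4, [], [])
```

Dropping the last slot keeps the encoded columns from summing to one, which matters for linear models; with drop_last=False every category gets its own slot.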

Since Spark 1.4.0, MLlib also supplies a OneHotEncoder feature, which maps a column of label indices to a column of binary vectors, with at most a single one-value.

This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.

Let's consider the following DataFrame:

val df = Seq((0, "a"),(1, "b"),(2, "c"),(3, "a"),(4, "a"),(5, "c"))
            .toDF("id", "category")

The first step would be to create the indexed DataFrame with the StringIndexer:

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
                   .setInputCol("category")
                   .setOutputCol("categoryIndex")
                   .fit(df)

val indexed = indexer.transform(df)

indexed.show
// +---+--------+-------------+
// | id|category|categoryIndex|
// +---+--------+-------------+
// |  0|       a|          0.0|
// |  1|       b|          2.0|
// |  2|       c|          1.0|
// |  3|       a|          0.0|
// |  4|       a|          0.0|
// |  5|       c|          1.0|
// +---+--------+-------------+
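To see why b ends up with index 2.0 even though it appears second in the data: by default StringIndexer orders labels by descending frequency. The following plain-Python sketch mimics that behaviour. It is not Spark code, fit_string_indexer is a made-up name, and the alphabetical tie-break is an assumption for illustration.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to an index, most frequent label first.

    Hypothetical stand-in for fitting a StringIndexer; ties are broken
    alphabetically here, which is an assumption.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


train = ["a", "b", "c", "a", "a", "c"]
mapping = fit_string_indexer(train)
print(mapping)  # {'a': 0.0, 'c': 1.0, 'b': 2.0}

# The fitted mapping is reused as-is on new data, whatever its frequencies are.
test = ["b", "b", "c"]
print([mapping[v] for v in test])  # [2.0, 2.0, 1.0]
```

The second print shows the point raised in the comments: indices are fixed when the indexer is fitted on the training set, so the test set gets the same labels.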

You can then encode the categoryIndex with OneHotEncoder:

import org.apache.spark.ml.feature.OneHotEncoder

val encoder = new OneHotEncoder()
                   .setInputCol("categoryIndex")
                   .setOutputCol("categoryVec")

val encoded = encoder.transform(indexed)

encoded.select("id", "categoryVec").show
// +---+-------------+
// | id|  categoryVec|
// +---+-------------+
// |  0|(2,[0],[1.0])|
// |  1|    (2,[],[])|
// |  2|(2,[1],[1.0])|
// |  3|(2,[0],[1.0])|
// |  4|(2,[0],[1.0])|
// |  5|(2,[1],[1.0])|
// +---+-------------+
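The categoryVec values are printed in sparse form as (size, [indices], [values]), and only non-zero entries are listed. As a rough illustration, this plain-Python sketch expands such a triple into a dense list. The helper name sparse_to_dense is hypothetical and this is not part of the Spark API.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) triple into a dense list of floats."""
    dense = [0.0] * size
    for position, value in zip(indices, values):
        dense[position] = value
    return dense


print(sparse_to_dense(2, [0], [1.0]))  # [1.0, 0.0]  category a
print(sparse_to_dense(2, [1], [1.0]))  # [0.0, 1.0]  category c
print(sparse_to_dense(2, [], []))      # [0.0, 0.0]  category b (last one, dropped)
```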
Macromolecule answered 28/8, 2015 at 19:35 Comment(19)
Thanks, but I have 2 concerns: 1) Suppose I want to use decision trees, random forests, or anything else that can naturally handle categorical variables without binarizing them. What do I do in that case? 2) If I'm not wrong, StringIndexer assigns indices based on the frequency of each term. Does this mean that the training and testing sets will have different labels, making predictions meaningless? – Wheezy
There are other kinds of indexers. Have a look at the official documentation on feature extraction with MLlib to find what you need. You can find, for example, VectorIndexer. – Macromolecule
Ok it seems that VectorIndexer is what I was looking for. I wanted a RandomForestClassifier to treat categorical and continuous variables differently without explicitly creating binary vectors from the categorical variables. Also it seems that my second concern was just wrong. StringIndexer assigns indices based on the frequency of each term in the training set. When the StringIndexerModel is used to transform the testing set, it retains the same index mappings from the training set, regardless of the frequency of terms in the testing set. Thanks for the help! – Wheezy
If you are familiar with R, it behaves like as.factor, except that each string is simply given a numeric index that corresponds to it. – Macromolecule
The solution works great... but is there a way to bucket less frequent categories into OTHER, to keep the number of features at a manageable size? – Neologize
@Rainmaker: I have a follow-up question. Imagine a model is trained and used in production, so every new record needs to be one-hot encoded and then given to the model. Will the index-to-category mapping remain the same? That is, if a = 1 in the model, is a = 1 for new data when the model is loaded in Spark? – Watercourse
@user3875610 I don't believe that Rainmaker will answer you. He hasn't been active for almost a year now. – Macromolecule
@Macromolecule can you please explain the structure of the categoryVec column? – Cooperstein
@AmirChoubani I'll refer you to my answer here: https://mcmap.net/q/356315/-spark-ml-vectorassembler-returns-strange-output – Macromolecule
@Macromolecule so it is a sparse representation. But I think that a should have (2, [0], [0.0]). Shouldn't it? – Cooperstein
@AmirChoubani no, zero elements are removed. Ref. en.m.wikipedia.org/wiki/Sparse_matrix – Macromolecule
@DavidArenburg Could you please explain the difference between StringIndexer and OneHotEncoder? – Foam
@BasilPaul StringIndexer just creates an index that maps to the string, whereas the OHE creates a sparse vector in which one dimension maps to the string. – Macromolecule
@Macromolecule Thank you for the valuable information. I have a dataset that I am trying to process using StringIndexer and OHE, and I got stuck at one point. Could you help me out with that? I am a beginner. Your help would be really appreciated. – Foam
@BasilPaul can you try to post your question so I can see how I can help? – Macromolecule
Sure. #55238909 – Foam
@Macromolecule I have posted the question. Kindly check. – Foam
@Macromolecule In your example you should mention that OneHotEncoderEstimator assumes that the categories start from 0. Hence the first category vector size is 2 and the second is 4. It is also worth mentioning that both are constructed with the default option dropLast = true - not an obvious assumption in general... – Doriandoric
With a column of strings that are pre-formatted, let's say we have 5 different strings in the column. Do you have to run StringIndexer() as well as OneHotEncoderEstimator()? – Abrahamsen
L
50

I am going to provide an answer from another perspective, since I was also wondering about categorical features with regard to tree-based models in Spark ML (not MLlib), and the documentation is not that clear about how everything works.

When you transform a column in your dataframe using pyspark.ml.feature.StringIndexer, extra metadata gets stored in the dataframe that specifically marks the transformed feature as a categorical feature.

When you print the dataframe you will see a numeric value (which is an index that corresponds with one of your categorical values), and if you look at the schema you will see that your new transformed column is of type double. However, this new column you created with pyspark.ml.feature.StringIndexer is not just a normal double column; it has extra metadata associated with it that is very important. You can inspect this metadata by looking at the metadata property of the appropriate field in your dataframe's schema (you can access the schema objects of your dataframe by looking at yourdataframe.schema).

This extra metadata has two important implications:

  1. When you call .fit() when using a tree-based model, it will scan the metadata of your dataframe and recognize fields that you encoded as categorical with transformers such as pyspark.ml.feature.StringIndexer (there are other transformers that also have this effect, such as pyspark.ml.feature.VectorIndexer). Because of this, you DO NOT have to one-hot encode your features after you have transformed them with StringIndexer when using tree-based models in Spark ML (however, you still have to perform one-hot encoding when using other models that do not naturally handle categoricals, like linear regression, etc.).

  2. Because this metadata is stored in the data frame, you can use pyspark.ml.feature.IndexToString to reverse the numeric indices back to the original categorical values (which are often strings) at any time.
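To make the two points above concrete, here is a plain-Python sketch of what that metadata can look like and how it supports both uses. The dictionary is written by hand and its exact shape (an "ml_attr" entry with "type" and "vals") is an assumption, so compare it with what yourdataframe.schema["yourcolumn"].metadata actually prints. The helper names is_categorical and index_to_string are hypothetical, not Spark functions.

```python
# Assumed shape of the metadata that StringIndexer attaches to its output field.
# Hand-written for illustration; inspect your own schema to confirm.
metadata = {
    "ml_attr": {
        "vals": ["a", "c", "b"],   # labels, in index order
        "type": "nominal",         # marks the column as categorical
        "name": "categoryIndex",
    }
}


def is_categorical(field_metadata):
    """Point 1: a model can tell from the metadata that a double column is categorical."""
    return field_metadata.get("ml_attr", {}).get("type") == "nominal"


def index_to_string(index, field_metadata):
    """Point 2: the stored labels allow mapping an index back to the original value."""
    return field_metadata["ml_attr"]["vals"][int(index)]


print(is_categorical(metadata))        # True
print(is_categorical({}))              # False (a plain double column)
print(index_to_string(2.0, metadata))  # b
```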

Loculus answered 15/11, 2016 at 16:56 Comment(4)
Could you please point me to the source code where it scans the metadata of the dataframe for any tree-based algorithm? Also, would it make sense to use RFormula + a tree-based algorithm in a pipeline? RFormula internally uses StringIndexer + one-hot encoder + vector assembler. – Carrollcarronade
@hadooper: github.com/apache/spark/blob/v2.2.0/mllib/src/main/scala/org/… – Loculus
But if GBTClassifier expects the dataframe to have just two columns, "label" and "features", and the "features" column should be of type Vector with its values of type double, as I understand it, how can the metadata created by StringIndexer be passed into GBTClassifier? – Vito
With a column of strings. Do you have to run StringIndexer() as well as OneHotEncoderEstimator()? – Abrahamsen
C
7

There is a component of the ML pipeline called StringIndexer that you can use to convert your strings to Doubles in a reasonable way. http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.feature.StringIndexer has more documentation, and http://spark.apache.org/docs/latest/ml-guide.html shows how to construct pipelines.

Chico answered 28/8, 2015 at 18:42 Comment(1)
With a column of strings. Do you have to run StringIndexer() as well as OneHotEncoderEstimator()? Or can you just run the latter? – Abrahamsen
M
0

I use the following method for one-hot encoding a single column in a Spark DataFrame:

def ohcOneColumn(df, colName, debug=False):

  colsToFillNa = []

  if debug: print("Entering method ohcOneColumn")
  countUnique = df.groupBy(colName).count().count()
  if debug: print(countUnique)

  collectOnce = df.select(colName).distinct().collect()
  for uniqueValIndex in range(countUnique):
    uniqueVal = collectOnce[uniqueValIndex][0]
    if debug: print(uniqueVal)
    newColName = str(colName) + '_' + str(uniqueVal) + '_TF'
    df = df.withColumn(newColName, df[colName]==uniqueVal)
    colsToFillNa.append(newColName)
  df = df.drop(colName)
  df = df.na.fill(False, subset=colsToFillNa)
  return df

I use the following method for one-hot encoding Spark DataFrames:

from pyspark.sql.functions import col, countDistinct, approxCountDistinct
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator

def detectAndLabelCat(sparkDf, minValCount=5, debug=False, excludeCols=['Target']):
  if debug: print("Entering method detectAndLabelCat")
  newDf = sparkDf
  colList = sparkDf.columns

  for colName in sparkDf.columns:
    uniqueVals = sparkDf.groupBy(colName).count()
    if debug: print(uniqueVals)
    countUnique = uniqueVals.count()
    # typeName() returns "string" for a StringType column, however the type object prints.
    dtype = sparkDf.schema[colName].dataType.typeName()
    if (colName in excludeCols):
      if debug: print(str(colName) + ' is in the excluded columns list.')

    elif countUnique == 1:
      newDf = newDf.drop(colName)
      if debug:
        print('dropping column ' + str(colName) + ' because it only contains one unique value.')
    # One-hot encode string columns and low-cardinality columns.
    elif countUnique < minValCount or dtype == "string":
      if debug: 
        print(len(newDf.columns))
        oldColumns = newDf.columns
      newDf = ohcOneColumn(newDf, colName, debug=debug)
      if debug: 
        print(len(newDf.columns))
        newColumns = set(newDf.columns) - set(oldColumns)
        print('Adding:')
        print(newColumns)
        for newColumn in newColumns:
          if newColumn in newDf.columns:
            try:
              newUniqueValCount = newDf.groupBy(newColumn).count().count()
              print("There are " + str(newUniqueValCount) + " unique values in " + str(newColumn))
            except:
              print('Uncaught error discussing ' + str(newColumn))
          #else:
          #  newColumns.remove(newColumn)

        print('Dropping:')
        print(set(oldColumns) - set(newDf.columns))

    else:
      if debug: print('Nothing done for column ' + str(colName))

      #end if countUnique == 1, elif countUnique other condition
    #end outer for
  return newDf
Macswan answered 17/7, 2019 at 17:58 Comment(1)
To test the above methods, I use the following:

tdf = spark.createDataFrame([
    ('horse', 'orange'),
    ('cow', 'apple'),
    ('pig', 'orange'),
    ('horse', 'pineapple'),
    ('horse', 'orange'),
    ('pig', 'apple')
], ["animalType", "fruitType"])
tdf.show()
newDf = ohcOneColumn(tdf, "animalType", debug=False)
newDf.show()
newerDf = detectAndLabelCat(tdf, debug=False)
newerDf.show()
– Macswan
H
-2

You can cast a string column type in a spark data frame to a numerical data type using the cast function.

from pyspark.sql import SQLContext
from pyspark.sql.types import DoubleType, IntegerType

sqlContext = SQLContext(sc)
dataset = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('./data/titanic.csv')   

dataset = dataset.withColumn("Age", dataset["Age"].cast(DoubleType()))
dataset = dataset.withColumn("Survived", dataset["Survived"].cast(IntegerType()))

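In the above example, we read in a CSV file as a data frame, cast the default string data types to integer and double, and overwrite the original data frame. We can then use the VectorAssembler to merge the features into a single vector and apply our favorite Spark ML algorithm. Note that this only helps for columns whose strings hold numbers, such as Age; casting a genuinely categorical string such as "a" to a numeric type yields null by default.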

Hebdomadal answered 27/5, 2017 at 23:20 Comment(0)
