pyspark - Convert sparse vector obtained after one hot encoding into columns

I am using Apache Spark MLlib to handle categorical features with one-hot encoding. After writing the code below, I get a vector c_idx_vec as the output of the one-hot encoding. I understand how to interpret this output vector, but I cannot figure out how to convert it into columns so that I get a new, transformed dataframe. Take this dataset as an example:

>>> fd = spark.createDataFrame( [(1.0, "a"), (1.5, "a"), (10.0, "b"), (3.2, "c")], ["x","c"])
>>> ss = StringIndexer(inputCol="c",outputCol="c_idx")
>>> ff = ss.fit(fd).transform(fd)
>>> ff.show()

    +----+---+-----+
    |   x|  c|c_idx|
    +----+---+-----+
    | 1.0|  a|  0.0|
    | 1.5|  a|  0.0|
    |10.0|  b|  1.0|
    | 3.2|  c|  2.0|
    +----+---+-----+

By default, the OneHotEncoder will drop the last category:

>>> oe = OneHotEncoder(inputCol="c_idx",outputCol="c_idx_vec")
>>> fe = oe.transform(ff)
>>> fe.show()
    +----+---+-----+-------------+
    |   x|  c|c_idx|    c_idx_vec|
    +----+---+-----+-------------+
    | 1.0|  a|  0.0|(2,[0],[1.0])|
    | 1.5|  a|  0.0|(2,[0],[1.0])|
    |10.0|  b|  1.0|(2,[1],[1.0])|
    | 3.2|  c|  2.0|    (2,[],[])|
    +----+---+-----+-------------+

Of course, this behavior can be changed:

>>> oe.setDropLast(False)
>>> fl = oe.transform(ff)
>>> fl.show()

    +----+---+-----+-------------+
    |   x|  c|c_idx|    c_idx_vec|
    +----+---+-----+-------------+
    | 1.0|  a|  0.0|(3,[0],[1.0])|
    | 1.5|  a|  0.0|(3,[0],[1.0])|
    |10.0|  b|  1.0|(3,[1],[1.0])|
    | 3.2|  c|  2.0|(3,[2],[1.0])|
    +----+---+-----+-------------+

So, I would like to know how to convert my c_idx_vec vector into a new dataframe as shown below:

[image: expected output – one 0/1 indicator column per category of c, similar to pandas get_dummies]

Popgun answered 19/6, 2018 at 14:48 Comment(2)
Please clarify what exactly your question is, and also give an example of your required result – Hatshepsut
@Hatshepsut I have added my expected output. Please see the linked image for the output format of the new dataframe after one-hot encoding. It is similar to the dataframe we obtain using get_dummies in pandas – Popgun

Here is what you can do:

>>> from pyspark.ml.feature import OneHotEncoder, StringIndexer
>>>
>>> fd = spark.createDataFrame( [(1.0, "a"), (1.5, "a"), (10.0, "b"), (3.2, "c")], ["x","c"])
>>> ss = StringIndexer(inputCol="c",outputCol="c_idx")
>>> ff = ss.fit(fd).transform(fd)
>>> ff.show()
+----+---+-----+
|   x|  c|c_idx|
+----+---+-----+
| 1.0|  a|  0.0|
| 1.5|  a|  0.0|
|10.0|  b|  1.0|
| 3.2|  c|  2.0|
+----+---+-----+

>>>
>>> oe = OneHotEncoder(inputCol="c_idx",outputCol="c_idx_vec")
>>> oe.setDropLast(False)
OneHotEncoder_49e58b281387d8dc0c6b
>>> fl = oe.transform(ff)
>>> fl.show()
+----+---+-----+-------------+
|   x|  c|c_idx|    c_idx_vec|
+----+---+-----+-------------+
| 1.0|  a|  0.0|(3,[0],[1.0])|
| 1.5|  a|  0.0|(3,[0],[1.0])|
|10.0|  b|  1.0|(3,[1],[1.0])|
| 3.2|  c|  2.0|(3,[2],[1.0])|
+----+---+-----+-------------+

# Get each value of c and its respective index. The one-hot encoder puts them at the same index in the vector.

>>> colIdx = fl.select("c","c_idx").distinct().rdd.collectAsMap()
>>> colIdx
{'c': 2.0, 'b': 1.0, 'a': 0.0}
>>>
>>> colIdx =  sorted((value, "ls_" + key) for (key, value) in colIdx.items())
>>> colIdx
[(0.0, 'ls_a'), (1.0, 'ls_b'), (2.0, 'ls_c')]
>>>
>>> newCols = list(map(lambda x: x[1], colIdx))
>>> actualCol = fl.columns
>>> actualCol
['x', 'c', 'c_idx', 'c_idx_vec']
>>> allColNames = actualCol + newCols
>>> allColNames
['x', 'c', 'c_idx', 'c_idx_vec', 'ls_a', 'ls_b', 'ls_c']
>>>
>>> def extract(row):
...     return tuple(map(lambda x: row[x], row.__fields__)) + tuple(row.c_idx_vec.toArray().tolist())
...
>>> result = fl.rdd.map(extract).toDF(allColNames)
>>> result.show(20, False)
+----+---+-----+-------------+----+----+----+
|x   |c  |c_idx|c_idx_vec    |ls_a|ls_b|ls_c|
+----+---+-----+-------------+----+----+----+
|1.0 |a  |0.0  |(3,[0],[1.0])|1.0 |0.0 |0.0 |
|1.5 |a  |0.0  |(3,[0],[1.0])|1.0 |0.0 |0.0 |
|10.0|b  |1.0  |(3,[1],[1.0])|0.0 |1.0 |0.0 |
|3.2 |c  |2.0  |(3,[2],[1.0])|0.0 |0.0 |1.0 |
+----+---+-----+-------------+----+----+----+

# Typecast the new columns to int

>>> for col in newCols:
...     result = result.withColumn(col, result[col].cast("int"))
...
>>> result.show(20, False)
+----+---+-----+-------------+----+----+----+
|x   |c  |c_idx|c_idx_vec    |ls_a|ls_b|ls_c|
+----+---+-----+-------------+----+----+----+
|1.0 |a  |0.0  |(3,[0],[1.0])|1   |0   |0   |
|1.5 |a  |0.0  |(3,[0],[1.0])|1   |0   |0   |
|10.0|b  |1.0  |(3,[1],[1.0])|0   |1   |0   |
|3.2 |c  |2.0  |(3,[2],[1.0])|0   |0   |1   |
+----+---+-----+-------------+----+----+----+
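
On Spark 3.0+, a sketch that avoids the RDD round-trip entirely is to expand the vector with pyspark.ml.functions.vector_to_array and plain DataFrame operations (reusing the newCols list built above; the names arr and result2 are just illustrative):

>>> from pyspark.ml.functions import vector_to_array   # available in Spark 3.0+
>>> from pyspark.sql import functions as F
>>>
>>> # turn the sparse vector into an array column, then pick one element per category
>>> arr = fl.withColumn("vec_arr", vector_to_array("c_idx_vec"))
>>> result2 = arr.select(
...     *fl.columns,
...     *[F.col("vec_arr")[i].cast("int").alias(name) for i, name in enumerate(newCols)]
... )
>>> result2.show(20, False)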

Hope this helps!!

Lynellelynett answered 20/6, 2018 at 9:45 Comment(4)
Converting a dataframe to an RDD (in order to perform map operations) is in general not advisable, and it should be avoided except when absolutely necessary – Hatshepsut
Please try to be constructive and not ironic (irony was certainly not the spirit of my reply to your comment)... – Hatshepsut
Willing to mutually upvote, if you agree ;) (OP has too little reputation to do so, even if he accepts one of our answers) – Hatshepsut
Thanks @hadooper, this is helpful. – Popgun

Not sure it is the most efficient or simplest way, but you can do it with a UDF; starting from your fl dataframe:

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf

def ith_(v, i):
    # return the i-th element of the vector; None if the index is not valid
    try:
        return float(v[i])
    except (ValueError, IndexError):
        return None

ith = udf(ith_, DoubleType())

(fl.withColumn('is_a', ith("c_idx_vec", lit(0)))
   .withColumn('is_b', ith("c_idx_vec", lit(1)))
   .withColumn('is_c', ith("c_idx_vec", lit(2))).show())

The result is:

+----+---+-----+-------------+----+----+----+
|   x|  c|c_idx|    c_idx_vec|is_a|is_b|is_c|   
+----+---+-----+-------------+----+----+----+
| 1.0|  a|  0.0|(3,[0],[1.0])| 1.0| 0.0| 0.0|
| 1.5|  a|  0.0|(3,[0],[1.0])| 1.0| 0.0| 0.0|
|10.0|  b|  1.0|(3,[1],[1.0])| 0.0| 1.0| 0.0|
| 3.2|  c|  2.0|(3,[2],[1.0])| 0.0| 0.0| 1.0| 
+----+---+-----+-------------+----+----+----+

i.e. exactly as requested.

HT (and +1) to this answer that provided the udf.
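
As the comment below points out, this hard-codes three categories. A sketch that instead derives one column per label from the fitted StringIndexerModel (keeping the fitted model around as ss_model is an assumption; the question fits and transforms in a single line):

# assumption: keep the fitted StringIndexerModel so its labels are available
ss_model = ss.fit(fd)
fl2 = oe.transform(ss_model.transform(fd))   # oe already has dropLast(False) set

# one is_<label> column per category, in index order
dummies = [ith("c_idx_vec", lit(i)).alias("is_" + label)
           for i, label in enumerate(ss_model.labels)]
fl2.select(*fl2.columns, *dummies).show()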

Hatshepsut answered 20/6, 2018 at 9:21 Comment(3)
This is making the assumption that the vector will always be of length 3 – Lynellelynett
Thanks @Hatshepsut – Popgun
@hadooper me too ;) – Hatshepsut

Given that StringIndexer is used to generate the index numbers and the one-hot encoding is then produced with OneHotEncoderEstimator, the entire code, end to end, should look like this:

  1. Generate the data and index the string values, with the fitted StringIndexerModel object "saved" for later use:
>>> fd = spark.createDataFrame( [(1.0, "a"), (1.5, "a"), (10.0, "b"), (3.2, "c")], ["x","c"])
>>> ss = StringIndexer(inputCol="c",outputCol="c_idx")
>>>
>>> # need to save the indexer model object for indexing label info to be used later
>>> ss_fit = ss.fit(fd)
>>> ss_fit.labels   # to be used later
['a', 'b', 'c']

>>> ff = ss_fit.transform(fd)
>>> ff.show()

    +----+---+-----+
    |   x|  c|c_idx|
    +----+---+-----+
    | 1.0|  a|  0.0|
    | 1.5|  a|  0.0|
    |10.0|  b|  1.0|
    | 3.2|  c|  2.0|
    +----+---+-----+
  2. Do the one-hot encoding using the OneHotEncoderEstimator class, since OneHotEncoder is being deprecated:
>>> oe = OneHotEncoderEstimator(inputCols=["c_idx"],outputCols=["c_idx_vec"])
>>> oe_fit = oe.fit(ff)
>>> fe = oe_fit.transform(ff)
>>> fe.show()
    +----+---+-----+-------------+
    |   x|  c|c_idx|    c_idx_vec|
    +----+---+-----+-------------+
    | 1.0|  a|  0.0|(2,[0],[1.0])|
    | 1.5|  a|  0.0|(2,[0],[1.0])|
    |10.0|  b|  1.0|(2,[1],[1.0])|
    | 3.2|  c|  2.0|    (2,[],[])|
    +----+---+-----+-------------+
  3. Perform one-hot binary value reshaping. The one-hot values will always be 0.0 or 1.0:
>>> from pyspark.sql.types import FloatType, IntegerType
>>> from pyspark.sql.functions import lit, udf

>>> ith = udf(lambda v, i: float(v[i]), FloatType())
>>> fx = fe
>>> for sidx, oe_col in zip([ss_fit], oe.getOutputCols()):
... 
...     # iterate over string values and ignore the last one
...     for ii, val in list(enumerate(sidx.labels))[:-1]:
...         fx = fx.withColumn(
...             sidx.getInputCol() + '_' + val, 
...             ith(oe_col, lit(ii)).astype(IntegerType())
...         )
>>> fx.show()
+----+---+-----+-------------+---+---+
|   x|  c|c_idx|    c_idx_vec|c_a|c_b|
+----+---+-----+-------------+---+---+
| 1.0|  a|  0.0|(2,[0],[1.0])|  1|  0|
| 1.5|  a|  0.0|(2,[0],[1.0])|  1|  0|
|10.0|  b|  1.0|(2,[1],[1.0])|  0|  1|
| 3.2|  c|  2.0|    (2,[],[])|  0|  0|
+----+---+-----+-------------+---+---+

Note that Spark removes the last category by default. So, following that behavior, the c_c column is not necessary here.
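
If a column for the last category is wanted as well, a small variation on the loop above (a sketch reusing ss_fit, ith, lit and IntegerType from the steps above) is to fit the estimator with dropLast=False and iterate over all labels:

>>> oe_full = OneHotEncoderEstimator(inputCols=["c_idx"], outputCols=["c_idx_vec"], dropLast=False)
>>> fe_full = oe_full.fit(ff).transform(ff)
>>> fx_full = fe_full
>>> for ii, val in enumerate(ss_fit.labels):   # no [:-1]: keep every category, including c
...     fx_full = fx_full.withColumn(
...         "c_" + val,
...         ith("c_idx_vec", lit(ii)).astype(IntegerType())
...     )
...
>>> fx_full.show()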

Syphilology answered 4/2, 2020 at 5:52 Comment(0)

I couldn't find a way to access the sparse vector through the DataFrame API, so I converted it to an RDD.

from pyspark.sql import Row

# column names
labels = ['a', 'b', 'c']
extract_f = lambda row: Row(**row.asDict(), **dict(zip(labels, row.c_idx_vec.toArray())))
fe.rdd.map(extract_f).collect()
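
To get a DataFrame back instead of a list of Row objects, the mapped RDD can be passed to toDF() (a follow-up sketch assuming the same fe and extract_f as above; note that with the default dropLast=True the vector has only two slots, so zip only produces the a and b columns here):

# build a DataFrame from the expanded rows instead of collecting them to the driver
result = fe.rdd.map(extract_f).toDF()
result.show()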
Bermejo answered 21/6, 2018 at 21:13 Comment(0)
