PySpark ML: Get KMeans cluster statistics

I have built a KMeansModel. My results are stored in a PySpark DataFrame called transformed.

(a) How do I interpret the contents of transformed?

(b) How do I create one or more Pandas DataFrames from transformed that would show summary statistics for each of the 13 features for each of the 14 clusters?

from pyspark.ml.clustering import KMeans
# Trains a k-means model.
kmeans = KMeans().setK(14).setSeed(1)
model = kmeans.fit(X_spark_scaled) # Fits a model to the input dataset with optional parameters.

transformed = model.transform(X_spark_scaled).select("features", "prediction") # X_spark_scaled is my PySpark DataFrame consisting of 13 features
transformed.show(5, truncate = False)
+------------------------------------------------------------------------------------------------------------------------------------+----------+
|features                                                                                                                            |prediction|
+------------------------------------------------------------------------------------------------------------------------------------+----------+
|(14,[4,5,7,8,9,13],[1.0,1.0,485014.0,0.25,2.0,1.0])                                                                                 |12        |
|(14,[2,7,8,9,12,13],[1.0,2401233.0,1.0,1.0,1.0,1.0])                                                                                |2         |
|(14,[2,4,5,7,8,9,13],[0.3333333333333333,0.6666666666666666,0.6666666666666666,2429111.0,0.9166666666666666,1.3333333333333333,3.0])|2         |
|(14,[4,5,7,8,9,12,13],[1.0,1.0,2054748.0,0.15384615384615385,11.0,1.0,1.0])                                                         |11        |
|(14,[2,7,8,9,13],[1.0,43921.0,1.0,1.0,1.0])                                                                                         |1         |
+------------------------------------------------------------------------------------------------------------------------------------+----------+
only showing top 5 rows

As an aside, I found from another SO post that I can map the features to their names like below. It would be nice to have summary statistics (mean, median, std, min, max) for each feature of each cluster in one or more Pandas dataframes.

from itertools import chain

attr_list = [attr for attr in chain(*transformed.schema['features'].metadata['ml_attr']['attrs'].values())]
attr_list
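
(Each entry of attr_list is a dict with 'idx' and 'name' keys, so an index-ordered list of the feature names could be built, for example, like this -- a small sketch, assuming the VectorAssembler metadata is present on features:)

feature_names = [a['name'] for a in sorted(attr_list, key=lambda a: a['idx'])]
feature_names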

Per request in the comments, here is a snapshot consisting of 2 records of the data (don't want to provide too many records -- proprietary information here)

+---------------------+------------------------+-----------------------+----------------------+----------------------+------------------------------+---------------------------------+------------+-------------------+--------------------+------------------------------------+--------------------------+-------------------------------+-----------------+--------------------+--------------------+
|device_type_robot_pct|device_type_smart_tv_pct|device_type_desktop_pct|device_type_tablet_pct|device_type_mobile_pct|device_type_mobile_persist_pct|visitors_seen_with_anonymiser_pct|ip_time_span|          ip_weight|mean_ips_per_visitor|visitors_seen_with_multi_country_pct|international_visitors_pct|visitors_seen_with_multi_ua_pct|count_tuids_on_ip|            features|      scaledFeatures|
+---------------------+------------------------+-----------------------+----------------------+----------------------+------------------------------+---------------------------------+------------+-------------------+--------------------+------------------------------------+--------------------------+-------------------------------+-----------------+--------------------+--------------------+
|                  0.0|                     0.0|                    0.0|                   0.0|                   1.0|                           1.0|                              0.0|    485014.0|               0.25|                 2.0|                                 0.0|                       0.0|                            0.0|              1.0|(14,[4,5,7,8,9,13...|(14,[4,5,7,8,9,13...|
|                  0.0|                     0.0|                    1.0|                   0.0|                   0.0|                           0.0|                              0.0|   2401233.0|                1.0|                 1.0|                                 0.0|                       0.0|                            1.0|              1.0|(14,[2,7,8,9,12,1...|(14,[2,7,8,9,12,1...|
Karlykarlyn answered 6/11, 2017 at 5:30 Comment(8)
Can you also show a sample of the initial data X_spark_scaled, please? – Scansion
I would rather use sklearn than pyspark. Run a benchmark: which is faster? Which is easier to use? Which finds better clusters? – Pali
@Anony-Mousse I actually tried sklearn, but I have about 600bn records and that is far too many for sklearn to be able to bring into memory – Karlykarlyn
@Scansion will do later today – Karlykarlyn
For kmeans, that size doesn't make that much sense anymore. (Plus, you probably have problems getting a statistically meaningful result anyway.) A sample is enough. And Spark kmeans is so incredibly slow... I have yet to see a use case where Spark on the entire data would give a better result in reasonable time than a better kmeans on a single node with "as much data as you can fit into one node's memory". – Pali
@Anony-Mousse typo, *600mn. I'm using PySpark's MLlib and it was fast enough (30-90 minutes? ran in the background on our computing cluster). Regardless, this convo is a tangent relative to the question I've posed. – Karlykarlyn
600m fits into the main memory of a single server. Did you run until convergence (setEpsilon 0)? Because if you stop early, then you really can just use a sample right away to get equally good results. – Pali
It's not just "tangential" to your question: pyspark is much more limited than sklearn, and if you'd use sklearn instead of pyspark (which isn't native Python), the question would be much easier to answer. – Pali

As Anony-Mousse has commented, (Py)Spark ML is indeed much more limited than scikit-learn or other similar packages, and such functionality is not trivial; nevertheless, here is a way to get what you want (cluster statistics):

spark.version
# u'2.2.0'

from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

# toy data - 5-d features including sparse vectors
df = spark.createDataFrame(
 [(Vectors.sparse(5,[(0, 164.0),(1,520.0)]), 1.0),
  (Vectors.dense([519.0,2723.0,0.0,3.0,4.0]), 1.0),
  (Vectors.sparse(5,[(0, 2868.0), (1, 928.0)]), 1.0),
  (Vectors.sparse(5,[(0, 57.0), (1, 2715.0)]), 0.0),
  (Vectors.dense([1241.0,2104.0,0.0,0.0,2.0]), 1.0)],
 ["features", "target"])

df.show()
# +--------------------+------+ 
# |            features|target| 
# +--------------------+------+ 
# |(5,[0,1],[164.0,5...|   1.0|
# |[519.0,2723.0,0.0...|   1.0| 
# |(5,[0,1],[2868.0,...|   1.0|
# |(5,[0,1],[57.0,27...|   0.0| 
# |[1241.0,2104.0,0....|   1.0|
# +--------------------+------+

kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(df.select('features'))

transformed = model.transform(df).select("features", "prediction")
transformed.show()
# +--------------------+----------+
# |            features|prediction|
# +--------------------+----------+
# |(5,[0,1],[164.0,5...|         1| 
# |[519.0,2723.0,0.0...|         2|
# |(5,[0,1],[2868.0,...|         0|
# |(5,[0,1],[57.0,27...|         2|
# |[1241.0,2104.0,0....|         2|
# +--------------------+----------+

Up to here, and regarding your first question:

How do I interpret the contents of transformed?

The features column is just a replication of the same column in your original data.

The prediction column is the cluster to which the respective data record belongs to; in my example, with 5 data records and k=3 clusters, I end up with 1 record in cluster #0, 1 record in cluster #1, and 3 records in cluster #2.
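
For instance, the per-cluster membership counts (and the cluster centers themselves, via model.clusterCenters()) can be checked directly:

transformed.groupBy('prediction').count().orderBy('prediction').show()
# +----------+-----+
# |prediction|count|
# +----------+-----+
# |         0|    1|
# |         1|    1|
# |         2|    3|
# +----------+-----+

centers = model.clusterCenters()  # list of NumPy arrays, one 5-d center per cluster
len(centers)
# 3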

Regarding your second question:

How do I create one or more Pandas DataFrames from transformed that would show summary statistics for each of the 13 features for each of the 14 clusters?

(Note: seems you have 14 features and not 13...)

This is a good example of a seemingly simple task for which, unfortunately, PySpark does not provide ready functionality - not least because all features are grouped in a single vector column, features; to do that, we must first "disassemble" features, effectively coming up with the inverse operation of VectorAssembler.

The only way I can presently think of is to revert temporarily to an RDD and perform a map operation [EDIT: this is not really necessary - see UPDATE below]; here is an example with my cluster #2 above, which contains both dense and sparse vectors:

# keep only cluster #2:
cl_2 = transformed.filter(transformed.prediction==2)
cl_2.show() 
# +--------------------+----------+ 
# |            features|prediction|
# +--------------------+----------+
# |[519.0,2723.0,0.0...|         2|
# |(5,[0,1],[57.0,27...|         2|
# |[1241.0,2104.0,0....|         2| 
# +--------------------+----------+

# set the data dimensionality as a parameter:
dimensionality = 5

# "disassemble" the features vector: one (named) column per vector element
cluster_2 = (cl_2.drop('prediction')
                 .rdd
                 .map(lambda x: [float(x[0][i]) for i in range(dimensionality)])
                 .toDF(schema=['x' + str(i) for i in range(dimensionality)]))
cluster_2.show()
# +------+------+---+---+---+ 
# |    x0|    x1| x2| x3| x4|
# +------+------+---+---+---+
# | 519.0|2723.0|0.0|3.0|4.0|
# |  57.0|2715.0|0.0|0.0|0.0| 
# |1241.0|2104.0|0.0|0.0|2.0|
# +------+------+---+---+---+

(If you have your initial data in a Spark dataframe initial_data, you can change the last part to toDF(schema=initial_data.columns), in order to keep the original feature names.)
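
(Side note: on newer Spark versions -- 3.0 and above, i.e. not the 2.2.0 used here -- pyspark.ml.functions.vector_to_array can do this "disassembly" without dropping to the RDD API; a minimal sketch:)

from pyspark.ml.functions import vector_to_array  # available from Spark 3.0 onwards
from pyspark.sql import functions as F

cluster_2_alt = (cl_2.drop('prediction')
                     .withColumn('arr', vector_to_array('features'))
                     .select(*[F.col('arr')[i].alias('x' + str(i))
                               for i in range(dimensionality)]))
cluster_2_alt.show()  # same contents as cluster_2 above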

From this point, you could either convert the cluster_2 dataframe to a pandas one (if it fits in your memory), or use the describe() function of Spark dataframes to get your summary statistics:

cluster_2.describe().show()
# result:
+-------+-----------------+-----------------+---+------------------+---+ 
|summary|               x0|               x1| x2|                x3| x4|
+-------+-----------------+-----------------+---+------------------+---+ 
|  count|                3|                3|  3|                 3|  3|
|   mean|605.6666666666666|           2514.0|0.0|               1.0|2.0|
| stddev|596.7389155512932|355.0929455790413|0.0|1.7320508075688772|2.0|
|    min|             57.0|           2104.0|0.0|               0.0|0.0|
|    max|           1241.0|           2723.0|0.0|               3.0|4.0|
+-------+-----------------+-----------------+---+------------------+---+

Using the above code with dimensionality=14 in your case should do the job...
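
If you want all the clusters at once, the same recipe can be wrapped in a loop -- a rough sketch, with cluster numbers running from 0 to 13:

dimensionality = 14
summaries = {}
for c in range(14):
    cl = transformed.filter(transformed.prediction == c).drop('prediction')
    cl = (cl.rdd
            .map(lambda x: [float(x[0][i]) for i in range(dimensionality)])
            .toDF(schema=['x' + str(i) for i in range(dimensionality)]))
    summaries[c] = cl.describe().toPandas()  # one pandas dataframe per cluster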

Annoyed by all these (arguably useless) significant digits in mean and stddev? As a bonus, here is a small utility function I came up with some time ago for a pretty summary:

def prettySummary(df):
    """Neat summary statistics of a Spark dataframe
    Args:
        df (pyspark.sql.dataframe.DataFrame): input dataframe
    Returns:
        pandas.core.frame.DataFrame: a pandas dataframe with the summary statistics of df
    """
    import pandas as pd
    temp = df.describe().toPandas()
    # cast the mean & stddev rows to numbers (convert_objects was removed from
    # pandas, so pd.to_numeric is used instead)
    temp.iloc[1:3, 1:] = temp.iloc[1:3, 1:].apply(pd.to_numeric)
    pd.options.display.float_format = '{:,.2f}'.format
    return temp

stats_df = prettySummary(cluster_2)
stats_df
# result:
    summary     x0       x1   x2   x3   x4
 0  count        3        3    3    3    3 
 1   mean   605.67 2,514.00 0.00 1.00 2.00 
 2 stddev   596.74   355.09 0.00 1.73 2.00 
 3    min     57.0   2104.0  0.0  0.0  0.0 
 4    max   1241.0   2723.0  0.0  3.0  4.0

UPDATE: Thinking of it again, and seeing your sample data, I came up with a more straightforward solution, without the need to invoke an intermediate RDD (an operation that one would arguably prefer to avoid, if possible)...

The key observation is what the complete contents of transformed look like, i.e. without the select statement; keeping the same toy dataset as above, we get:

transformed = model.transform(df)  # no 'select' statements
transformed.show()
# +--------------------+------+----------+
# |            features|target|prediction| 
# +--------------------+------+----------+
# |(5,[0,1],[164.0,5...|   1.0|         1|
# |[519.0,2723.0,0.0...|   1.0|         2|
# |(5,[0,1],[2868.0,...|   1.0|         0|
# |(5,[0,1],[57.0,27...|   0.0|         2|
# |[1241.0,2104.0,0....|   1.0|         2|
# +--------------------+------+----------+

As you can see, whatever other columns are present in the dataframe df to be transformed (just one in my case, target) simply pass through the transformation procedure and end up being present in the final outcome...

Hopefully you start getting the idea: if df contains your initial 14 features, each one in a separate column, plus a 15th column named features (roughly as shown in your sample data, but without the last column), then the following code:

kmeans = KMeans().setK(14)
model = kmeans.fit(df.select('features'))
transformed = model.transform(df).drop('features')

will leave you with a Spark dataframe transformed containing 15 columns, i.e. your initial 14 features plus a prediction column with the corresponding cluster number.

From this point, you can proceed as I have shown above to filter specific clusters from transformed and get your summary statistics, but you'll have avoided the (costly...) conversion to intermediate temporary RDDs, thus keeping all your operations in the more efficient context of Spark dataframes...
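
To put the whole pipeline together: a minimal end-to-end sketch, assuming your 14 raw feature columns live in a Spark dataframe raw_df (a hypothetical name here) and that features is produced by a VectorAssembler:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

feature_cols = raw_df.columns                         # your 14 raw feature columns
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
df = assembler.transform(raw_df)                      # raw columns + 'features'

kmeans = KMeans(k=14, seed=1)
model = kmeans.fit(df.select('features'))
transformed = model.transform(df).drop('features')    # raw columns + 'prediction'

# one summary per cluster, staying entirely within dataframe operations:
summaries = {c: prettySummary(transformed.filter(transformed.prediction == c)
                                         .drop('prediction'))
             for c in range(14)}
summaries[0]  # pretty summary statistics for cluster #0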

Scansion answered 7/11, 2017 at 11:30 Comment(4)
"PySpark does not provide ready functionality. . ." Totally agree. We love sklearn, but our huge data volume forces us to use PySpark. "coming up with the invert operation of VectorAssembler" yes, that succinctly specifies exactly what I needed to do. Thank you very much for this detailed and helpful answer.Karlykarlyn
@Karlykarlyn You are very welcome; re "invert operation of VectorAssembler" - as I explain in the update, turns out it is not necessary if you already have the initial featuresScansion
yeah, so I was really asking the wrong question in my mind, and the wrong question on here (#2). But you gave the right answer! My fault for not fully understanding I was filtering out my initial features with the select and not knowing they were there to begin with. Thanks again!Karlykarlyn
@Karlykarlyn don't blame yourself - even in the docs, they always accompany transform with select("features", "prediction"); no accident that myself initially fell for the RDD solution...Scansion
