KMeans clustering in PySpark
Asked Answered
B

2

17

I have a spark dataframe 'mydataframe' with many columns. I am trying to run kmeans on only two columns: lat and long (latitude & longitude) using them as simple values). I want to extract 7 clusters based on just those 2 columns and then I want to attach the cluster asignment to my original dataframe. I've tried:

from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel

# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd  # needs to be an RDD
data_rdd.cache()

# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")

But I am getting an error after a while:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5191.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5191.0 (TID 260738, 10.19.211.69, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last)

I've tried to detach and re-attach the cluster. Same result. What am I doing wrong?

Bonaparte answered 1/12, 2017 at 2:22 Comment(3)
On geographic data, use Haversine distance, and don't use kmeans.Refrigeration
@Anony-Mousse -wow, thank you! What would you recommend as more appropriate for latitude and longitude clustering?Bonaparte
Haversine distance, and OPTICS clustering.Refrigeration
P
79

Since, based on another recent question of yours, I guess you are in your very first steps with Spark clustering (you are even importing sqrt & array, without ever using them, probably because it is like that in the docs example), let me offer advice in a more general level rather than in the specific question you are asking here (hopefully also saving you from subsequently opening 3-4 more questions, trying to get your cluster assignments back into your dataframe)...

Since

  1. you have your data already in a dataframe

  2. you want to attach the cluster membership back into your initial dataframe

you have no reason to revert to an RDD and use the (soon to be deprecated) MLlib package; you will do your job much more easily, elegantly, and efficiently using the (now recommended) ML package, which works directly with dataframes.

Step 0 - make some toy data resembling yours:

spark.version
# u'2.2.0'

df = spark.createDataFrame([[0, 33.3, -17.5],
                              [1, 40.4, -20.5],
                              [2, 28., -23.9],
                              [3, 29.5, -19.0],
                              [4, 32.8, -18.84]
                             ],
                              ["other","lat", "long"])

df.show()
# +-----+----+------+
# |other| lat|  long|
# +-----+----+------+
# |    0|33.3| -17.5|
# |    1|40.4| -20.5| 
# |    2|28.0| -23.9|
# |    3|29.5| -19.0|
# |    4|32.8|-18.84|
# +-----+----+------+

Step 1 - assemble your features

In contrast to most ML packages out there, Spark ML requires your input features to be gathered in a single column of your dataframe, usually named features; and it provides a specific method for doing this, VectorAssembler:

from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show()
# +-----+----+------+-------------+ 
# |other| lat|  long|     features|
# +-----+----+------+-------------+
# |    0|33.3| -17.5| [33.3,-17.5]|
# |    1|40.4| -20.5| [40.4,-20.5]|
# |    2|28.0| -23.9| [28.0,-23.9]| 
# |    3|29.5| -19.0| [29.5,-19.0]|
# |    4|32.8|-18.84|[32.8,-18.84]|
# +-----+----+------+-------------+ 

As perhaps already guessed, the argument inputCols serves to tell VectoeAssembler which particular columns in our dataframe are to be used as features.

Step 2 - fit your KMeans model

from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=2, seed=1)  # 2 clusters here
model = kmeans.fit(new_df.select('features'))

select('features') here serves to tell the algorithm which column of the dataframe to use for clustering - remember that, after Step 1 above, your original lat & long features are no more directly used.

Step 3 - transform your initial dataframe to include cluster assignments

transformed = model.transform(new_df)
transformed.show()    
# +-----+----+------+-------------+----------+ 
# |other| lat|  long|     features|prediction|
# +-----+----+------+-------------+----------+
# |    0|33.3| -17.5| [33.3,-17.5]|         0| 
# |    1|40.4| -20.5| [40.4,-20.5]|         1|
# |    2|28.0| -23.9| [28.0,-23.9]|         0|
# |    3|29.5| -19.0| [29.5,-19.0]|         0|
# |    4|32.8|-18.84|[32.8,-18.84]|         0|
# +-----+----+------+-------------+----------+

The last column of the transformed dataframe, prediction, shows the cluster assignment - in my toy case, I have ended up with 4 records in cluster #0 and 1 record in cluster #1.

You can further manipulate the transformed dataframe with select statements, or even drop the features column (which has now fulfilled its function and may be no longer necessary)...

Hopefully you are much closer now to what you actually wanted to achieve in the first place. For extracting cluster statistics etc., another recent answer of mine might be helpful...

Pliers answered 1/12, 2017 at 12:45 Comment(4)
Dear desertnaut, huge thank you for taking your time and writing the best stackoverflow answer I've ever read. I'll be sure to keep it an excellent source going forward. Yes, you guessed correctly - I would have asked more questions! :) I had no idea I am using some old, depreciated library and I am very glad you showed me the 'right path'. I understood everything in your excellent explanation. One tiny question (more Spark-related than kMeans related): Is this OK - from storage and memory perspective - to produce more and more new dataframes (df, then df_new) - even if df is huge?Bonaparte
@Bonaparte standard practice has it to assign your transformed data in new dataframes as you go. In any case, experiment and see...Pliers
As @Pliers mentioned, converting to rdd for your ML operations is highly inefficient. That being said, alas, even the KMeans method in the pyspark.ml.clustering library still uses the collect function when getting your model outputs. This renders the spark capability useless when applying Kmeans on very large sets of data and all your worker nodes will be idle and only your driver node will be working overtimeReed
Running this exact code gave me the classification 0011 instead of 0100 in your example. Maybe some default values of the algorithm have been changed in newer versions? (I used pyspark 3.4.1)Barbitone
P
6

Despite my other general answer, and in case you, for whatever reason, must stick with MLlib & RDDs, here is what causes your error using the same toy df.

When you select columns from a dataframe to convert to RDD, as you do, the result is an RDD of Rows:

df.select('lat', 'long').rdd.collect()
# [Row(lat=33.3, long=-17.5), Row(lat=40.4, long=-20.5), Row(lat=28.0, long=-23.9), Row(lat=29.5, long=-19.0), Row(lat=32.8, long=-18.84)]

which is not suitable as an input to MLlib KMeans. You'll need a map operation for this to work:

df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1])).collect()
# [(33.3, -17.5), (40.4, -20.5), (28.0, -23.9), (29.5, -19.0), (32.8, -18.84)]

So, your code should be like this:

from pyspark.mllib.clustering import KMeans, KMeansModel

rdd = df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1]))
clusters = KMeans.train(rdd, 2, maxIterations=10, initializationMode="random") # works OK
clusters.centers
# [array([ 40.4, -20.5]), array([ 30.9 , -19.81])]
Pliers answered 1/12, 2017 at 13:12 Comment(2)
great adding. One thing, collect() return list and you can send dataframe to kmeans training model also.Shillyshally
We use collect only for the final results; if we could use it here, there would be no reason to bother with Spark whatsoever - we would be far better off with scikit-learn or similar...Pliers

© 2022 - 2024 — McMap. All rights reserved.