How to use QuantileDiscretizer across groups in a DataFrame?

I have a DataFrame with the following columns.

scala> show_times.printSchema
root
 |-- account: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- show_name: string (nullable = true)
 |-- total_time_watched: integer (nullable = true)

This is data about how much time each customer has spent watching a particular show. I'm supposed to categorize the customers for each show based on total time watched.

The dataset has 133 million rows in total with 192 distinct show_names.

For each individual show I'm supposed to bin the customers into 3 categories (1, 2, 3).

I use Spark MLlib's QuantileDiscretizer.

Currently I loop through every show and run QuantileDiscretizer sequentially, as in the code below.

(The original post embedded a non-editable screenshot of the loop; the exact code is not recoverable from it.)
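Since the screenshot can't be copied, here is a rough sketch of what such a sequential loop looks like; showTimes, binShow and the cast column are illustrative assumptions, not a transcription of the actual code:

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// showTimes is assumed to be the DataFrame with the schema shown above.
def binShow(show: String): DataFrame = {
  val subset = showTimes
    .filter(col("show_name") === show)
    .withColumn("watched_double", col("total_time_watched").cast("double"))
  new QuantileDiscretizer()
    .setInputCol("watched_double")
    .setOutputCol("Time_watched_bin")
    .setNumBuckets(3)
    .fit(subset)       // computes approximate quantiles for this show only
    .transform(subset) // adds Time_watched_bin as 0.0, 1.0 or 2.0
}

val showNames = showTimes.select("show_name").distinct().collect().map(_.getString(0))

// Sequential: 192 separate fit + transform passes, one per show.
val binned = showNames.map(binShow).reduce(_ union _)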

What I'd like to have in the end is for the following sample input to produce the sample output below.

Sample Input:

account,channel,show_name,total_time_watched
acct1,ESPN,show1,200
acct2,ESPN,show1,250
acct3,ESPN,show1,800
acct4,ESPN,show1,850
acct5,ESPN,show1,1300
acct6,ESPN,show1,1320
acct1,ESPN,show2,200
acct2,ESPN,show2,250
acct3,ESPN,show2,800
acct4,ESPN,show2,850
acct5,ESPN,show2,1300
acct6,ESPN,show2,1320

Sample Output:

account,channel,show_name,total_time_watched,Time_watched_bin
acct1,ESPN,show1,200,1
acct2,ESPN,show1,250,1
acct3,ESPN,show1,800,2
acct4,ESPN,show1,850,2
acct5,ESPN,show1,1300,3
acct6,ESPN,show1,1320,3
acct1,ESPN,show2,200,1
acct2,ESPN,show2,250,1
acct3,ESPN,show2,800,2
acct4,ESPN,show2,850,2
acct5,ESPN,show2,1300,3
acct6,ESPN,show2,1320,3

Is there a more efficient and distributed way to do this, using some groupBy-like operation, instead of looping through each show_name and binning it one after the other?

Biparty answered 2/5, 2017 at 16:27 Comment(1)
Can you please copy and paste the code (to get rid of the non-editable screenshot)? Thanks.Cold

I know nothing about QuantileDiscretizer, but I think you're mostly concerned with the dataset you apply QuantileDiscretizer to. I think you want to figure out how to split your input dataset into smaller datasets per show_name (you said there are 192 distinct show_names in the input dataset).

Solution 1: Partition Parquet Dataset

I've noticed that you use parquet as the input format. My understanding of the format is fairly limited, but I've noticed that people use a partitioning scheme to split large datasets into smaller chunks that they can then process independently, one chunk per partition value.

In your case the partitioning scheme could include show_name.

That would make your case trivial, as the splitting would be done at write time (aka not my problem anymore).

See How to save a partitioned parquet file in Spark 2.1?
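A minimal sketch of that idea (showTimes again stands for the question's DataFrame; the path and the spark session variable are assumptions):

import org.apache.spark.sql.functions.col

// One-time rewrite: each distinct show_name becomes its own directory.
showTimes.write
  .partitionBy("show_name")
  .parquet("/data/show_times_by_show")

// A later read that filters on show_name only scans the matching directory.
val oneShow = spark.read
  .parquet("/data/show_times_by_show")
  .filter(col("show_name") === "show1")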

Solution 2: Scala's Future

Given your iterative solution, you could wrap every iteration in a Future that you submit to process in parallel.

Spark SQL's SparkSession (and Spark Core's SparkContext) are thread-safe.
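A sketch of that, reusing the hypothetical binShow and showNames from the sketch in the question above (so the same naming assumptions apply):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Each per-show fit + transform becomes its own Future, so the driver
// submits the underlying Spark jobs concurrently rather than one by one.
val futureBins = showNames.toSeq.map(show => Future(binShow(show)))

val binned = Await.result(Future.sequence(futureBins), Duration.Inf).reduce(_ union _)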

Solution 3: Dataset's filter and union operators

I would think twice before following this solution, since it puts a burden on your shoulders that I think could more easily be sorted out by solution 1.

Given you've got one large 133-million-row parquet file, I'd first build the 192 per-show_name datasets using the filter operator (as you did to build show_rdd, whose name is misleading since it's a DataFrame, not an RDD) and then union them (again, as you did).

See Dataset API.

Solution 4: Use Window Functions

That's something I think could work, but I haven't checked it out myself.

You could use window functions (see WindowSpec and Column's over operator).

Window functions would give you partitioning (windows), while over would somehow apply QuantileDiscretizer to a window/partition. That would, however, require "destructuring" QuantileDiscretizer as an Estimator to train a model and then somehow applying the resulting model to the window again.

I think it's doable, but haven't done it myself. Sorry.
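For what it's worth, here is a sketch that sidesteps QuantileDiscretizer entirely and uses the ntile window function instead; it's a different technique than the one described above, but it yields per-show buckets 1 to 3 in a single pass (showTimes again stands for the question's DataFrame):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.ntile

// Rows within each show are ordered by total_time_watched and split into
// three roughly equal-sized groups labelled 1, 2 and 3.
val byShow = Window.partitionBy("show_name").orderBy("total_time_watched")

val binned = showTimes.withColumn("Time_watched_bin", ntile(3).over(byShow))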

Cold answered 3/5, 2017 at 9:30 Comment(2)
Were you able to do the first solution ? I have the exact problem at hand, only difference is that the number of partitions is a larger number.Marthamarthe
This is a premature answer to the question. Code speaks louder.Feinberg

This is an older question; however, I'm answering it to help anyone who runs into the same situation in the future.

It can be achieved using a grouped-map pandas UDF. Both the input and output of the pandas UDF are pandas DataFrames. We need to provide the schema of the output DataFrame, as shown in the decorator in the code sample below, which achieves the required result.

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, IntegerType

# Output schema = the input schema plus the new bin column.
output_schema = StructType(df.schema.fields + [StructField('Time_watched_bin', IntegerType(), True)])

@pandas_udf(output_schema, PandasUDFType.GROUPED_MAP)
def get_buckets(pdf):
    # pdf is a pandas DataFrame holding all rows for one show_name.
    # pd.cut splits total_time_watched into 3 bins; labels=False returns integer labels.
    pdf['Time_watched_bin'] = pd.cut(pdf['total_time_watched'], 3, labels=False)
    return pdf

df = df.groupby('show_name').apply(get_buckets)

df will now have the new column 'Time_watched_bin' with the bucket information.

Peonage answered 1/7, 2020 at 10:53 Comment(0)
