Renaming columns for PySpark DataFrame aggregates

I am analysing some data with PySpark DataFrames. Suppose I have a DataFrame df that I am aggregating:

(df.groupBy("group")
   .agg({"money":"sum"})
   .show(100)
)

This will give me:

group                SUM(money#2L)
A                    137461285853
B                    172185566943
C                    271179590646

The aggregation works just fine, but I dislike the new column name SUM(money#2L). Is there a way to rename this column to something human-readable from within the .agg method? Maybe something similar to what one would do in dplyr:

df %>% group_by(group) %>% summarise(sum_money = sum(money))
Bearskin answered 1/5, 2015 at 14:1 Comment(0)

Although I still prefer dplyr syntax, this code snippet will do:

import pyspark.sql.functions as sf

(df.groupBy("group")
   .agg(sf.sum('money').alias('money'))
   .show(100))

It gets verbose.

Bearskin answered 13/7, 2015 at 12:5 Comment(1)
For anyone else who has copy-pasted this alias part but doesn't see it taking effect, pay attention to your parentheses. alias('string') belongs inside the agg; otherwise you're aliasing the entire DataFrame, not only the column.Storer
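To make the parenthesis point concrete, a minimal sketch of the two placements, using the df and columns from the question:

import pyspark.sql.functions as sf

# alias() on the aggregate expression, inside agg(): renames the resulting column
df.groupBy("group").agg(sf.sum("money").alias("money"))

# alias() after agg(): aliases the whole DataFrame; the column keeps its generated name
df.groupBy("group").agg(sf.sum("money")).alias("money")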

withColumnRenamed should do the trick; see the pyspark.sql API documentation.

# note: in Spark 2+ the generated column name is lowercase, e.g. "sum(money)"
df.groupBy("group")\
  .agg({"money":"sum"})\
  .withColumnRenamed("SUM(money)", "money")\
  .show(100)
Iguanodon answered 18/6, 2015 at 1:2 Comment(2)
The alias is a good pointer, but this is the correct answer - there are good reasons to use the dictionary within agg at times and it seems the only way to "alias" an aggregated column is to rename it.Reichel
thank you! prefer withColumnRenamed over alias. Why? because divide and conquer works better than overloaded brain.Eyecatching

I made a little helper function for this that might help some people out.

import re

from functools import partial

def rename_cols(agg_df, ignore_first_n=1):
    """Changes the default Spark aggregate names `avg(colname)`
    to something a bit more useful. Pass an aggregated dataframe
    and the number of leading (grouping) columns to leave untouched.
    """
    delimiters = "(", ")"
    split_pattern = '|'.join(map(re.escape, delimiters))
    splitter = partial(re.split, split_pattern)
    # `avg(rate)` splits to ['avg', 'rate', ''], so the join leaves a trailing
    # underscore; strip it with [:-1], independent of ignore_first_n
    split_agg = lambda x: '_'.join(splitter(x))[:-1]
    renamed = map(split_agg, agg_df.columns[ignore_first_n:])
    renamed = zip(agg_df.columns[ignore_first_n:], renamed)
    for old, new in renamed:
        agg_df = agg_df.withColumnRenamed(old, new)
    return agg_df

An example:

gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
        .groupby("id")
        .agg({"rank": "mean",
              "*": "count",
              "rate": "mean",
              "price": "mean",
              "clicks": "mean",
              })
)

>>> gb.columns
['id',
 'avg(rate)',
 'count(1)',
 'avg(price)',
 'avg(rank)',
 'avg(clicks)']

>>> rename_cols(gb).columns
['id',
 'avg_rate',
 'count_1',
 'avg_price',
 'avg_rank',
 'avg_clicks']

It at least saves people a bit of typing.

Navy answered 6/10, 2016 at 17:56 Comment(4)
Very useful and timely. I was just about to ask the same question. It would be nice if you could specify a new column name within the agg dict (within Spark I mean).Selectman
@EvanZamir thanks! I might try and do a simple PR in spark for that.Navy
You can simply rename by df = df.toDF(*newColumnNames), whereby newColumnNames holds all column names of the DataFrame (df) :)Selangor
Hi, I made the following mod because with ignore_first_n=2 it was truncating the last letter of the columns: split_agg = lambda x: '_'.join(splitter(x))[:-1]Annitaanniversary

It's as simple as:

val maxVideoLenPerItemDf = requiredItemsFiltered
  .groupBy("itemId")
  .agg(max("playBackDuration").as("customVideoLength"))
maxVideoLenPerItemDf.show()

Use .as inside agg to name the new column that is created.

Gules answered 10/12, 2018 at 6:5 Comment(1)
In PySpark, .as('new_name') should be .alias('new_name').Dividers
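For reference, a rough PySpark sketch of the same idea (the DataFrame and column names are taken from the Scala snippet above and assumed to exist):

import pyspark.sql.functions as F

maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId") \
    .agg(F.max("playBackDuration").alias("customVideoLength"))
maxVideoLenPerItemDf.show()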

.alias and .withColumnRenamed both work if you're willing to hard-code your column names. If you need a programmatic solution, e.g. friendlier names for an aggregation of all remaining columns, this provides a good starting point:

import pyspark.sql.functions as F

grouping_column = 'group'
cols = [F.sum(F.col(x)).alias(x) for x in df.columns if x != grouping_column]
(
    df
    .groupBy(grouping_column)
    .agg(*cols)
)
Janessa answered 3/2, 2022 at 18:17 Comment(1)
This piece of code is amazing! This is the way to do it. Should get more upvotes.Longanimity

df = df.groupby('Device_ID').agg(aggregate_methods)
for column in df.columns:
    start_index = column.find('(')
    end_index = column.find(')')
    # find() returns -1 when the character is missing, so check explicitly to
    # leave columns without parentheses (e.g. the groupby column) untouched
    if start_index != -1 and end_index != -1:
        df = df.withColumnRenamed(column, column[start_index + 1:end_index])

The above code strips out everything outside the "()". For example, "sum(foo)" will be renamed to "foo".

Fink answered 12/10, 2018 at 15:30 Comment(1)
just watch out for columns without parentheses, they will be removed altogether, such as the groupby var. Can add an if/continue check. I had a single variable that was my groupby var, so just checked for that.Bathurst

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName('test').getOrCreate()
data = [(1, "siva", 100), (2, "siva2", 200),(3, "siva3", 300),(4, "siva4", 400),(5, "siva5", 500)]
schema = ['id', 'name', 'sallary']

df = spark.createDataFrame(data, schema=schema)
df.show()
+---+-----+-------+
| id| name|sallary|
+---+-----+-------+
|  1| siva|    100|
|  2|siva2|    200|
|  3|siva3|    300|
|  4|siva4|    400|
|  5|siva5|    500|
+---+-----+-------+


df.agg({"sallary": "max"}).withColumnRenamed('max(sallary)', 'max').show()
+---+
|max|
+---+
|500|
+---+
Sharpnosed answered 10/7, 2019 at 9:10 Comment(0)

While the previously given answers are good, I think they're lacking a neat way to deal with dictionary usage in .agg().

If you want to use a dict, which might also be generated dynamically because you have hundreds of columns, you can use the following without writing dozens of lines of code:

# Your dictionary-version of using the .agg()-function
# Note: The provided logic could actually also be applied to a non-dictionary approach
df = df.groupBy("group")\
   .agg({
          "money":"sum"
        , "...":  "..."
    })

# Now do the renaming
newColumnNames = ["group", "money", "..."] # Provide the names for ALL columns of the new df
df = df.toDF(*newColumnNames)              # Do the renaming

Of course the newColumnNames list can also be generated dynamically. E.g., if you only append columns from the aggregation to your df, you can pre-store newColumnNames = df.columns and then just append the additional names.
Anyhow, be aware that newColumnNames must contain all column names of the dataframe, not only those to be renamed (because .toDF() creates a new dataframe, owing to Spark's immutable RDDs)!
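As a minimal sketch of that dynamic generation (the aggregation dict here is just an assumed example), the new names can be derived from the aggregated frame's own columns, which guarantees the order matches:

# assumed example aggregation; derive the cleaned-up names from grouped.columns
grouped = df.groupBy("group").agg({"money": "sum", "moreMoney": "sum"})
newColumnNames = [c[c.find('(') + 1:-1] if '(' in c else c for c in grouped.columns]
grouped = grouped.toDF(*newColumnNames)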

Selangor answered 5/8, 2020 at 9:28 Comment(0)

Another quick little snippet to add to the mix:

from pyspark.sql.functions import col

agg_df = df.groupBy('group') \
    .agg({'money': 'sum',
          'moreMoney': 'sum',
          'evenMoreMoney': 'sum'
          })

# use the aggregated frame's columns, not the original df's
agg_df.select(*(col(i).alias(i.replace("(", '_').replace(')', '')) for i in agg_df.columns))

Just change the alias expression to whatever you'd like to name them. The above generates sum_money, sum_moreMoney, etc., since I do like seeing the operator in the variable name.
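For instance, a hypothetical variant that drops the operator prefix entirely and keeps only the name inside the parentheses (reusing agg_df and col from above):

agg_df.select(*(col(i).alias(i[i.find('(') + 1:-1]) if '(' in i else col(i)
                for i in agg_df.columns))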

Clachan answered 24/11, 2021 at 19:10 Comment(0)

[Special case]

If we want to rename the aggregated columns with the same name as the columns being summed over (i.e.: sum(column1) --> column1), we can do it like so:

import pyspark.sql.functions as F

groupby_keys = ["categorical_column_1", "categorical_column_2"]
numerical_columns = ["numerical_column_1", "numerical_column_2"]

aggregation_computations = [F.sum(col).alias(col) for col in numerical_columns]
df = df.groupby(groupby_keys).agg(*aggregation_computations)
df.show()

+----------------------+----------------------+--------------------+--------------------+
| categorical_column_1 | categorical_column_2 | numerical_column_1 | numerical_column_2 |
+----------------------+----------------------+--------------------+--------------------+
|     category_1_1     |     category_2_1     |           1        |          1.0       |
|     category_1_2     |     category_2_1     |           2        |          2.0       |
|     category_1_1     |     category_2_2     |           3        |          3.0       |
|     category_1_2     |     category_2_2     |           4        |          4.0       |
+----------------------+----------------------+--------------------+--------------------+
Evilminded answered 16/10, 2023 at 23:32 Comment(0)

This is late to the party, but I think it's a more general solution than hardcoding a specific function and safer than trying to rename after the fact.

import pyspark.sql.functions as sf

y = {"money":"sum"}

df.groupBy("group").agg(
    *[ getattr(sf, fun)(col).alias(col) for col, fun in y.items() ]
)
Chaldron answered 23/10, 2023 at 16:34 Comment(0)
