PySpark aggregation function for "any value"

I have a PySpark DataFrame with an A field, a few B fields that depend on A (A -> B), and C fields that I want to aggregate for each A. For example:

A | B | C
----------
A | 1 | 6
A | 1 | 7
B | 2 | 8
B | 2 | 4

I wish to group by A, present any value of B, and run an aggregation (let's say SUM) on C.

The expected result would be:

A | B | C
----------
A | 1 | 13
B | 2 | 12

SQL-wise I would do:

SELECT A, COALESCE(B) as B, SUM(C) as C
FROM T
GROUP BY A

What is the PySpark way to do that?

I can group by A and B together, or select MIN(B) for each A, for example:

df.groupBy('A').agg(F.min('B').alias('B'),F.sum('C').alias('C'))

or

df.groupBy(['A','B']).agg(F.sum('C').alias('C'))

but that seems inefficient. Is there anything similar to SQL COALESCE in PySpark?

Thanks

Dispend asked 25/2, 2018 at 12:49

You'll just need to use first instead:

from pyspark.sql.functions import first, sum, col
from pyspark.sql import Row

array = [Row(A="A", B=1, C=6),
         Row(A="A", B=1, C=7),
         Row(A="B", B=2, C=8),
         Row(A="B", B=2, C=4)]
# sqlContext and sc are the SQLContext/SparkContext provided by the PySpark shell;
# with a SparkSession you could write spark.createDataFrame(array) instead
df = sqlContext.createDataFrame(sc.parallelize(array))

results = df.groupBy(col("A")).agg(first(col("B")).alias("B"), sum(col("C")).alias("C"))

Let's now check the results:

results.show()
# +---+---+---+
# |  A|  B|  C|
# +---+---+---+
# |  B|  2| 12|
# |  A|  1| 13|
# +---+---+---+
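
As a side note, first also accepts an ignorenulls flag, which is worth knowing if B can contain nulls. A small sketch reusing the df defined above:

# Return the first non-null B per group instead of the very first row's B
results_nonnull = df.groupBy("A").agg(
    first("B", ignorenulls=True).alias("B"),
    sum("C").alias("C"),
)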

From the comments:

Is first here computationally equivalent to any?

groupBy causes a shuffle, so non-deterministic behaviour is to be expected.

This is confirmed in the documentation of first:

Aggregate function: returns the first value in a group. The function by default returns the first value it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned. Note: the function is non-deterministic because its results depend on the order of the rows, which may be non-deterministic after a shuffle.

So yes, computationally they are the same, and that's one of the reasons you need to use sorting if you need deterministic behaviour.
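
For instance, here is a minimal sketch of such a deterministic variant, using a window with an explicit ordering (ordering by C is an arbitrary choice for illustration; any stable ordering works):

from pyspark.sql import Window
from pyspark.sql import functions as F

# Impose an explicit order inside each A-group so "first" is well defined:
# every row gets the B of the row with the smallest C in its partition.
w = Window.partitionBy("A").orderBy("C")
deterministic = (
    df.withColumn("B", F.first("B").over(w))
      .groupBy("A", "B")
      .agg(F.sum("C").alias("C"))
)

Since B is functionally dependent on A, the result is the same as before, but the B picked per group no longer depends on shuffle order.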

I hope this helps!

Mutineer answered 25/2, 2018 at 15:36 Comment(1):
Is first here computationally equivalent to any? I'm using this function a lot when doing a window (over partition order by), but then it requires sorting. — Dispend

New in Spark 3.5.0:

You can use the aggregation function any_value to accomplish this.

Reusing @eliasah's answer with this new function:

from pyspark.sql.functions import any_value, sum, col
from pyspark.sql import Row

array = [Row(A="A", B=1, C=6),
         Row(A="A", B=1, C=7),
         Row(A="B", B=2, C=8),
         Row(A="B", B=2, C=4)]
df = sqlContext.createDataFrame(sc.parallelize(array))

results = df.groupBy(col("A")).agg(any_value(col("B")).alias("B"), sum(col("C")).alias("C"))
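
For completeness, the same aggregation can also be written in Spark SQL, which ties back to the query in the question. This is a sketch assuming an active SparkSession named spark; the view name T mirrors the question's table:

# Register the DataFrame as a temporary view and use ANY_VALUE in SQL
df.createOrReplaceTempView("T")
results = spark.sql("""
    SELECT A, ANY_VALUE(B) AS B, SUM(C) AS C
    FROM T
    GROUP BY A
""")
results.show()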
Amidships answered 27/10, 2023 at 22:11
