Explain the aggregate functionality in Spark (with Python and Scala)

9

57

I am looking for a better explanation of the aggregate functionality that is available via Spark in Python.

The example I have is as follows (using pyspark from Spark version 1.2.0):

sc.parallelize([1,2,3,4]).aggregate(
  (0, 0),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Output:

(10, 4)

I get the expected result (10,4), which is the sum 1+2+3+4 together with a count of 4 elements. If I change the initial value passed to the aggregate function from (0,0) to (1,0), I get the following result:

sc.parallelize([1,2,3,4]).aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Output:

(19, 4)

The value increases by 9. If I change it to (2,0), the value goes to (28,4) and so on.

Can someone explain to me how this value is calculated? I expected the value to go up by 1, not by 9; I expected to see (11,4), but instead I am seeing (19,4).

Returnee answered 30/1, 2015 at 16:49 Comment(0)
107

I wasn't fully convinced by the accepted answer, and JohnKnight's answer helped, so here's my point of view:

First, let's explain aggregate() in my own words:

Prototype:

aggregate(zeroValue, seqOp, combOp)

Description:

aggregate() lets you take an RDD and generate a single value that is of a different type than what was stored in the original RDD.

Parameters:

  1. zeroValue: The initialization value for your result, in the desired format.
  2. seqOp: The operation you want to apply to RDD records. Runs once for every record in a partition.
  3. combOp: Defines how the resulting objects (one for every partition) get combined.

Example:

Compute the sum of a list and the length of that list. Return the result in a pair of (sum, length).

In a Spark shell, I first created a list with 4 elements, with 2 partitions:

listRDD = sc.parallelize([1,2,3,4], 2)

then I defined my seqOp:

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

and my combOp:

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

and then I aggregated:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

As you can see, I gave descriptive names to my variables, but let me explain it further:

The first partition has the sublist [1, 2]. We will apply the seqOp to each element of that list and this will produce a local result, a pair of (sum, length), that will reflect the result locally, only in that first partition.

So, let's start: local_result gets initialized to the zeroValue parameter we provided to aggregate(), i.e. (0, 0), and list_element is the first element of the list, i.e. 1. As a result this is what happens:

0 + 1 = 1
0 + 1 = 1

Now the local result is (1, 1); that means that so far, for the 1st partition, after processing only the first element, the sum is 1 and the length is 1. Notice that local_result gets updated from (0, 0) to (1, 1).

1 + 2 = 3
1 + 1 = 2

and now the local result is (3, 2), which will be the final result from the 1st partition, since there are no other elements in the sublist of the 1st partition.

Doing the same for 2nd partition, we get (7, 2).

Now we apply the combOp to each local result, so that we can form the final, global result, like this: (3,2) + (7,2) = (10, 4)


Example described in 'figure':

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)

Inspired by this great example.
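
If you want to double-check how Spark actually split the list, you can inspect the partitions with glom(). This is just a quick sketch, assuming the same listRDD (and SparkContext sc) as above:

listRDD.glom().collect()
# [[1, 2], [3, 4]] with 2 partitions: glom() turns each partition into a list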


So now, if the zeroValue were not (0, 0) but (1, 0), one would expect to get (8 + 4, 2 + 2) = (12, 4), which doesn't explain what you experience. Even if I alter the number of partitions in my example, I can't reproduce your result.

The key here is JohnKnight's answer, which states that the zeroValue is not only applied once per partition, but may be applied more times than you expect.
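
One way to see this for yourself is to make the seqOp ignore the data entirely, so the final number simply counts how many times the zero value was folded in. This is only a sketch, assuming a SparkContext sc:

zero_uses = sc.parallelize([1, 2, 3, 4], 2).aggregate(
    1,
    (lambda acc, value: acc),          # each partition just returns its copy of the zero value
    (lambda acc1, acc2: acc1 + acc2))  # the driver adds the partition results to its own copy
print(zero_uses)  # 3 here: once per partition plus once on the driver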

Exclude answered 15/8, 2016 at 4:48 Comment(2)
Really glad it helped @Neethu!Exclude
@Returnee this really should be the accepted answer. Especially because the most-upvoted answer in this Q is in Scala(??)!Thecla
32

Explanation using Scala

Aggregate lets you transform and combine the values of the RDD at will.

It uses two functions:

The first one transforms and adds the elements of the original collection [T] into a local aggregate [U]; it takes the form (U, T) => U. You can see it as a fold, and therefore it also requires a zero for that operation. This operation is applied locally to each partition in parallel.

Here is where the key to the question lies: the only value that should be used here is the ZERO value for the reduction operation. This operation is executed locally on each partition, therefore anything you add on top of that zero value will be added to the result multiplied by the number of partitions of the RDD.

The second operation takes 2 values of the result type of the previous operation [U] and combines them into one value. This operation will reduce the partial results of each partition and produce the actual total.

For example: Given an RDD of Strings:

val rdd:RDD[String] = ???

Let's say you want to aggregate the lengths of the strings in that RDD, so you would do:

  1. The first operation will transform strings into size (int) and accumulate the values for size.

    val stringSizeCummulator: (Int, String) => Int = (total, string) => total + string.length

  2. provide the ZERO for the addition operation (0)

    val ZERO = 0

  3. an operation to add two integers together:

    val add: (Int, Int) => Int = _ + _

Putting it all together:

rdd.aggregate(ZERO, stringSizeCummulator, add)

with Spark 2.4 and higher versions:

rdd.aggregate(ZERO)(stringSizeCummulator, add)

So, why is the ZERO needed? When the cummulator function is applied to the first element of a partition, there's no running total yet; ZERO is used as the starting point.

Eg. My RDD is:

  • Partition 1: ["Jump", "over"]
  • Partition 2: ["the", "wall"]

This will result:

P1:

  1. stringSizeCummulator(ZERO, "Jump") = 4
  2. stringSizeCummulator(4, "over") = 8

P2:

  1. stringSizeCummulator(ZERO, "the") = 3
  2. stringSizeCummulator(3, "wall") = 7

Reduce: add(P1, P2) = 15
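
For readers following along in Python, a rough PySpark equivalent of this example (just a sketch, assuming a SparkContext sc) would be:

words = sc.parallelize(["Jump", "over", "the", "wall"], 2)  # 2 partitions, as above

total_length = words.aggregate(
    0,                                  # the ZERO for the addition
    (lambda total, s: total + len(s)),  # seqOp: accumulate string lengths within a partition
    (lambda a, b: a + b))               # combOp: add the per-partition totals

print(total_length)  # 15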

Copepod answered 30/1, 2015 at 18:2 Comment(3)
You are right. When I started playing with the spark.default.parallelism setting by specifying different values, the values returned for each run changed when I passed (1,0) as the initial value for the aggregate function. It makes lot more sense with your explanation. Thanks.Returnee
Question about Python, anwser using scala? Does this kind of things exists in pyspark?Snuffer
@Snuffer hope that was the only problem here! This answer doesn't explain why the OP gets this behavior. It seems attractive, I have upvoted too, but I don't think it answers the question... :/Exclude
17

I don't have enough reputation points to comment on the previous answer by Maasg. Actually, the zero value should be 'neutral' towards the seqOp, meaning it doesn't interfere with the seqOp result, like 0 for addition or 1 for multiplication.

You should NEVER try non-neutral values, as they might be applied an arbitrary number of times. This behavior is not tied only to the number of partitions.

I tried the same experiment as stated in the question. With 1 partition, the zero value was applied 3 times; with 2 partitions, 6 times; with 3 partitions, 9 times; and so on.

Urethritis answered 13/7, 2015 at 17:13 Comment(0)
2

You can use the following code (in Scala) to see precisely what aggregate is doing. It builds a tree of all the addition and merge operations:

sealed trait Tree[+A]
case class Leaf[A](value: A) extends Tree[A]
case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A]

val zero : Tree[Int] = Leaf(0)
val rdd = sc.parallelize(1 to 4).repartition(3)

And then, in the shell:

scala> rdd.glom().collect()
res5: Array[Array[Int]] = Array(Array(4), Array(1, 2), Array(3))

So, we have these 3 partitions: [4], [1,2], and [3].

scala> rdd.aggregate(zero)((l,r)=>Branch(l, Leaf(r)), (l,r)=>Branch(l,r))
res11: Tree[Int] = Branch(Branch(Branch(Leaf(0),Branch(Leaf(0),Leaf(4))),Branch(Leaf(0),Leaf(3))),Branch(Branch(Leaf(0),Leaf(1)),Leaf(2)))

You can represent the result as a tree:

+
| \__________________
+                    +
| \________          | \
+          +         +   2
| \        | \       | \         
0  +       0  3      0  1
   | \
   0  4

You can see that a first zero element is created on the driver node (at the left of the tree), and then the results for all the partitions are merged one by one. You can also see that if you replace 0 by 1, as you did in your question, it will add 1 to each result on each partition, and also add 1 to the initial value on the driver. So, the total number of times the zero value you give is used is:

number of partitions + 1.

So, in your case, the result of

aggregate(
  (X, Y),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

will be:

(sum(elements) + (num_partitions + 1)*X, count(elements) + (num_partitions + 1)*Y)
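
A quick way to sanity-check this formula from PySpark (only a sketch, assuming a SparkContext sc; any partition count works):

data = [1, 2, 3, 4]
X, Y = 1, 0
rdd = sc.parallelize(data).repartition(3)

result = rdd.aggregate(
    (X, Y),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

p = rdd.getNumPartitions()
expected = (sum(data) + (p + 1) * X, len(data) + (p + 1) * Y)
print(result, expected)  # both are (14, 4) when p = 3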

The implementation of aggregate is quite simple. It is defined in RDD.scala, line 1107:

  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
}
Minuteman answered 9/6, 2017 at 19:2 Comment(0)
1

Great explanations; they really helped me understand the inner workings of the aggregate function. I have played with it for some time and found out the following.

  • If you use (0,0) as the initial acc, it does not change the output of the function.

  • If the initial accumulator is changed, the result is computed as below:

[ sum of RDD elements + acc initial value * No. of RDD partitions + acc initial value ]

For the question here, I would suggest checking the partitions: the number of partitions should be 8, as per my understanding, because every time we process the seqOp on a partition of the RDD it starts from the initial acc value, and when it does the combOp it uses the acc initial value once more.

For example: List(1,2,3,4) & acc (1,0)

Get the number of partitions in Scala with RDD.partitions.size (a PySpark check is sketched below).

If there are 2 partitions and 4 elements, then => [ 10 + 1 * 2 + 1 ] => (13,4)

If there are 4 partitions and 4 elements, then => [ 10 + 1 * 4 + 1 ] => (15,4)
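
In PySpark, the partition count can be checked with getNumPartitions(). A small sketch of the 2-partition case (assuming a SparkContext sc):

rdd = sc.parallelize([1, 2, 3, 4], 2)
print(rdd.getNumPartitions())  # 2

print(rdd.aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))))
# (13, 4), i.e. [ 10 + 1 * 2 + 1 ]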

Hope this helps; you can check here for an explanation. Thanks.

Glowworm answered 23/11, 2016 at 18:11 Comment(0)
1

Thanks to gsamaras.

My viewgraph is as below (image not included here).

Err answered 11/4, 2019 at 18:4 Comment(0)
0

For people looking for the Scala equivalent of the above example, here it is. Same logic, same input and result.

scala> val listRDD = sc.parallelize(List(1,2,3,4), 2)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21

scala> listRDD.collect()
res7: Array[Int] = Array(1, 2, 3, 4)

scala> listRDD.aggregate((0,0))((acc, value) => (acc._1+value,acc._2+1),(acc1,acc2) => (acc1._1+acc2._1,acc1._2+acc2._2))
res10: (Int, Int) = (10,4)
Clubfoot answered 25/10, 2016 at 3:39 Comment(0)
0

I tried many experiments on this question. It is better to set the number of partitions explicitly for aggregate. The seqOp processes each partition and applies the initial value; what's more, combOp also applies the initial value when it combines all the partitions. So I present this formula for the question:

final result = sum(list) + num_Of_Partitions * initial_Value + 1
Topaz answered 21/2, 2017 at 7:21 Comment(1)
This formula can obviously not be true, as when the initial value is 0, the result should be the sum of the list.Minuteman
0

I will explain the concept of the aggregate operation in Spark as follows:

Definition of the aggregate function

def aggregate(initial value)(an intra-partition sequence operation)(an inter-partition combination operation)

val flowers = sc.parallelize(List(11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16), 4) --> here 4 is the number of partitions the data is distributed into.

Hence, the RDD is distributed into 4 partitions as:

11, 12, 13
24, 25, 26
35, 36, 37
24, 25, 16

We divide the problem statement into two parts. The first part of the problem is to aggregate the total number of flowers picked in each quadrant; that's the intra-partition sequence aggregation.

11+12+13 = 36
24+25+26 = 75
35+36+37 = 108
24+25 +16 = 65

The second part of the problem is to sum these individual aggregates across the partitions; that's the inter-partition aggregation.

36 + 75 + 108 + 65 = 284

The resulting sum (a plain value returned to the driver) can further be used and processed for any kind of transformation or other action.

So the code becomes like:

val sum = flowers.aggregate(0)((acc, value) => (acc + value), (x, y) => (x + y))

or

val sum = flowers.aggregate(0)(_+_, _+_)
Answer: 284

Explanation: 0 is the accumulator (the zero value). The first + is the intra-partition sum, adding the total number of flowers picked by each picker in each quadrant of the garden. The second + is the inter-partition sum, which aggregates the total sums from each quadrant.
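
For a Python reader, a rough PySpark version of the same computation (a sketch, assuming a SparkContext sc) is:

flowers = sc.parallelize([11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16], 4)

total = flowers.aggregate(
    0,                                 # the accumulator (zero value)
    (lambda acc, value: acc + value),  # intra-partition sum
    (lambda x, y: x + y))              # inter-partition sum

print(total)  # 284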

Case 1:

Now suppose the initial value weren't zero. What would happen? If it were 5, for example:

The number would be added to each intra-partition aggregate, and also to the inter-partition aggregate:

So the first calculation would be:

11+12+13 = 36 + 5 = 41
24+25+26 = 75 + 5 = 80
35+36+37 = 108 + 5 = 113
24+25 +16 = 65 + 5 = 70

Here's the inter-partition aggregation calculation with the initial value of 5:

partition1 + partition2 + partition3 + partition4 + 5 = 41 + 80 + 113 + 70 + 5 = 309

So, coming to your query: the sum can be calculated based on the number of partitions the RDD data is distributed into. I think your data is distributed as below, and that's why you get the result (19, 4). So, when doing an aggregate operation, be specific about the number of partitions:

val list = sc.parallelize(List(1,2,3,4))
val list2 = list.glom().collect
val res12 = list.aggregate((1,0))(
      (acc, value) => (acc._1 + value, acc._2 + 1),
      (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

result:

list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at command-472682101230301:1
list2: Array[Array[Int]] = Array(Array(), Array(1), Array(), Array(2), Array(), Array(3), Array(), Array(4))
res12: (Int, Int) = (19,4)

Explanation: As your data is distributed across 8 partitions, the result is as follows (using the logic explained above):

intra-partition addition (each partition starts from the zero value 1):

[]  -> 1
[1] -> 1 + 1 = 2
[]  -> 1
[2] -> 2 + 1 = 3
[]  -> 1
[3] -> 3 + 1 = 4
[]  -> 1
[4] -> 4 + 1 = 5

total = 1 + 2 + 1 + 3 + 1 + 4 + 1 + 5 = 18

inter-partition calculation:

18 + 1 = 19 (that is, 1+2+1+3+1+4+1+5, plus the zero value applied once more in the inter-partition combine)
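
To reproduce the (19, 4) from the question deterministically, one can force 8 partitions explicitly. This is only a sketch, assuming a SparkContext sc:

rdd = sc.parallelize([1, 2, 3, 4], 8)
print(rdd.glom().collect())  # [[], [1], [], [2], [], [3], [], [4]]

print(rdd.aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))))
# (19, 4): the zero value (1, 0) is folded in 8 + 1 = 9 times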

Thank you

Socinus answered 30/7, 2019 at 11:13 Comment(0)
