I will explain the concept of the aggregate operation in Spark as follows:
Definition of the aggregate function:

**def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U**

i.e. (an initial value)(an intra-partition sequence operation)(an inter-partition combination operation)
val flowers = sc.parallelize(List(11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16), 4)
--> 4 is the number of partitions we ask Spark to split the data into.
Hence, the RDD is distributed across 4 partitions as:
11, 12, 13
24, 25, 26
35, 36, 37
24, 25, 16
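To check how the elements actually landed, you can inspect the partitions with glom(), which groups each partition's elements into an array (the same method is used later in this answer). The output below assumes Spark splits the 12 elements evenly across the 4 requested partitions:

```scala
// Print each partition's contents; an even 3-elements-per-partition
// split is assumed here, matching the layout shown above.
flowers.glom().collect().foreach(part => println(part.mkString(", ")))
// 11, 12, 13
// 24, 25, 26
// 35, 36, 37
// 24, 25, 16
```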
We divide the problem statement into two parts:
The first part of the problem is to aggregate the total number of flowers picked in each quadrant; that's the intra-partition sequence aggregation:
11+12+13 = 36
24+25+26 = 75
35+36+37 = 108
24+25+16 = 65
The second part of the problem is to sum these individual aggregates across the partitions; that's the inter-partition aggregation.
36 + 75 + 108 + 65 = 284
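Both steps can be reproduced on the RDD itself as a sanity check; a small sketch, again relying on glom() and the partition layout above:

```scala
// Intra-partition step: sum each partition locally.
val perPartition = flowers.glom().map(_.sum).collect()   // Array(36, 75, 108, 65)

// Inter-partition step: combine the per-partition sums on the driver.
val total = perPartition.sum                             // 284
```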
The result is returned to the driver as a plain value (not an RDD) and can be used for any further processing in the program.
So the code becomes:
val sum = flowers.aggregate(0)((acc, value) => acc + value, (x, y) => x + y)
or
val sum = flowers.aggregate(0)(_+_, _+_)
Answer: 284
Explanation: 0 is the initial (zero) value of the accumulator.
The first + is the intra-partition sum, adding the total number of flowers picked by each picker in each quadrant of the garden.
The second + is the inter-partition sum, which aggregates the total sums from each quadrant.
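Because the initial value here is 0, the additive identity, the same total can also be obtained without aggregate; a minimal equivalent, assuming the same flowers RDD:

```scala
// Both yield 284 for this RDD. aggregate() is only needed when the
// result type differs from the element type or the initial value matters.
val viaReduce = flowers.reduce(_ + _)   // Int: 284
val viaSum    = flowers.sum()           // Double: 284.0
```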
Case 1:
What would happen if the initial value weren't zero? Suppose it were 5, for example:
That number would be added to each intra-partition aggregate, and also once more to the inter-partition aggregate:
So the first calculation would be:
(11+12+13) + 5 = 36 + 5 = 41
(24+25+26) + 5 = 75 + 5 = 80
(35+36+37) + 5 = 108 + 5 = 113
(24+25+16) + 5 = 65 + 5 = 70
Here's the inter-partition aggregation calculation, where the initial value of 5 is added once more:
partition1 + partition2 + partition3 + partition4 + 5 = 41 + 80 + 113 + 70 + 5 = 309
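In code this is just a different first argument; a quick sketch of the same computation:

```scala
// The zero value 5 is folded into each of the 4 partitions (4 * 5 = 20)
// and once more in the final combine (+5), so: 284 + 25 = 309.
val skewedSum = flowers.aggregate(5)(_ + _, _ + _)   // 309
```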
So, coming to your query: the result depends on how many partitions the RDD data is distributed across. I suspect your data was distributed as below, and that's why you got the result (19, 4). So, when doing an aggregate operation, be specific about the number of partitions:
val list = sc.parallelize(List(1,2,3,4))
val list2 = list.glom().collect
val res12 = list.aggregate((1,0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
result:
list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at command-472682101230301:1
list2: Array[Array[Int]] = Array(Array(), Array(1), Array(), Array(2), Array(), Array(3), Array(), Array(4))
res12: (Int, Int) = (19,4)
Explanation: as your data is distributed across 8 partitions, the result follows from the logic explained above:
intra-partition addition (first tuple element; each partition starts from the initial value 1):
partition 1 (empty): 0+1 = 1
partition 2 (holds 1): 1+1 = 2
partition 3 (empty): 0+1 = 1
partition 4 (holds 2): 2+1 = 3
partition 5 (empty): 0+1 = 1
partition 6 (holds 3): 3+1 = 4
partition 7 (empty): 0+1 = 1
partition 8 (holds 4): 4+1 = 5
total = 1+2+1+3+1+4+1+5 = 18
inter-partition calculation: the initial value's first element (1) is added once more when the partition results are combined:
18 + 1 = 19
The second tuple element counts the elements seen, one per non-empty partition: 1+1+1+1 = 4, hence (19, 4).
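There are two ways to make this predictable, sketched below: pin the partition count explicitly, or (usually better) use an initial value that is the identity for both operations, so the result no longer depends on partitioning. Spark's documentation notes the zero value will typically be the neutral element, e.g. 0 for summation:

```scala
// Pinning 4 partitions: each holds one element and starts from (1, 0),
// giving (2,1) (3,1) (4,1) (5,1); combOp folds these into (1, 0) once
// more, so the result is (1 + 2+3+4+5, 4) = (15, 4).
val pinned = sc.parallelize(List(1, 2, 3, 4), 4).aggregate((1, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

// With the additive identity (0, 0) as the initial value, the result
// is (10, 4) no matter how many partitions the data lands in.
val stable = sc.parallelize(List(1, 2, 3, 4)).aggregate((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
```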
Thank you