Apache Spark Moving Average

I have a huge file in HDFS containing time series data points (Yahoo stock prices).

I want to find the moving average of the time series. How do I go about writing an Apache Spark job to do that?

Loverly answered 1/5, 2014 at 5:3 Comment(5)
Moving average is a tricky problem for Spark, and any distributed system. When the data is spread across multiple machines, there will be some time windows that cross partitions. I think the key is duplicating data points at the start and end of partitions. I will try to think of a way to do this in Spark.Dither
Why can't this be done by traversing the RDD? That returns the partitions in order.Daynadays
This is the same answer as @Arwind but written in Java: #31966115Cerys
@Cerys Well that was a year later !!Loverly
Better later than never, I guessCerys

You can use the sliding function from MLlib, which probably does the same thing as Daniel's answer. You will have to sort the data by time before using the sliding function.

import org.apache.spark.mllib.rdd.RDDFunctions._

// Average over a sliding window of 3 consecutive elements.
// Note: this is integer division; convert to Double for a fractional average.
sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()
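
For instance, if the stock data arrives as (timestamp, price) pairs, a minimal sketch of sorting by time before applying sliding could look like this (the sample values are made up):

import org.apache.spark.mllib.rdd.RDDFunctions._

// Hypothetical (timestamp, price) pairs, out of order.
val quotes = sc.parallelize(Seq((3L, 10.0), (1L, 12.0), (2L, 11.0), (4L, 13.0), (5L, 9.0)))

quotes
  .sortByKey()              // order the series by timestamp
  .map(_._2)                // keep only the price
  .sliding(3)               // windows of 3 consecutive prices
  .map(w => w.sum / w.size) // average of each window
  .collect()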
Jamille answered 4/3, 2015 at 19:26 Comment(4)
Awesome! It's not exactly like my answer. It fetches the first (window-1) elements from each partition and uses this small amount of data to fill in the gaps. (code)Dither
In MapReduce, we would need a custom InputFormat that reads a few additional lines from the next split to complete the window, just like TextInputFormat reads some additional data from the next split.Militarize
The map method could maintain a list of values up to the size of the window: until that size is reached, keep accumulating into the list; once it is reached, compute the average and call context.write(). On the next map() call, add the new value to the list, delete the oldest value, compute the average and call context.write() again. Spark does not give that kind of control over accumulating values within a task and managing their count.Militarize
.sliding(3).map(curSlice => (curSlice.sum / curSlice.size)) seems simple. What would the data type of curSlice be? If the values are not numbers but text and we need to find the most frequent words in a window, can curSlice support all data types (see the sketch below)? @Jamille ?Militarize
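
Regarding the comment above: sliding is generic over the element type, so each slice is simply an Array of whatever the RDD holds. A minimal sketch (with made-up words) of picking the most frequent word per window:

import org.apache.spark.mllib.rdd.RDDFunctions._

// Hypothetical RDD of words; each curSlice is an Array[String].
val words = sc.parallelize(Seq("a", "b", "a", "c", "a", "b", "b", "c", "b"), 3)

words
  .sliding(5)
  .map(curSlice => curSlice.groupBy(identity).maxBy(_._2.length)._1) // most frequent word in the window
  .collect()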

Moving average is a tricky problem for Spark, and any distributed system. When the data is spread across multiple machines, there will be some time windows that cross partitions. We have to duplicate the data at the start of the partitions, so that calculating the moving average per partition gives complete coverage.

Here is a way to do this in Spark. The example data:

val ts = sc.parallelize(0 to 100, 10)
val window = 3

A simple partitioner that puts each row in the partition we specify by the key:

class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions = p
  def getPartition(key: Any) = key.asInstanceOf[Int]
}

Create the data with the first window - 1 rows copied to the previous partition:

val partitioned = ts.mapPartitionsWithIndex((i, p) => {
  // The first window - 1 rows of this partition, which the previous partition also needs.
  val overlap = p.take(window - 1).toArray
  // Send a copy of the overlap to the previous partition (keyed by i - 1)...
  val spill = overlap.iterator.map((i - 1, _))
  // ...and keep everything, overlap included, in this partition (keyed by i).
  val keep = (overlap.iterator ++ p).map((i, _))
  if (i == 0) keep else keep ++ spill
}).partitionBy(new StraightPartitioner(ts.partitions.length)).values

Now just calculate the moving average on each partition (the code below produces the windowed sum; divide by window to get the average):

val movingAverage = partitioned.mapPartitions(p => {
  val sorted = p.toSeq.sorted
  // Two iterators over the same sorted data: "news" runs window - 1 elements ahead of "olds".
  val olds = sorted.iterator
  val news = sorted.iterator
  // Prime the running sum with the first window - 1 elements (this advances "news").
  var sum = news.take(window - 1).sum
  (olds zip news).map({ case (o, n) => {
    sum += n    // the newest element enters the window
    val v = sum // sum over "window" consecutive elements
    sum -= o    // the oldest element leaves the window
    v
  }})
})

Because of the duplicated segments, this will have no gaps in coverage.

scala> movingAverage.collect.sameElements(3 to 297 by 3)
res0: Boolean = true
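
If the actual average rather than the windowed sum is wanted, a minimal follow-up step might be:

// Divide each windowed sum by the window size to get the moving average.
val averages = movingAverage.map(_.toDouble / window)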
Dither answered 2/5, 2014 at 20:35 Comment(4)
The sorting in the last step may be unnecessary. It seems the data arrives sorted anyway. I don't know if there are guarantees for the repartitioning to behave this way.Dither
Why can't this be done by traversing the RDD? That returns the partitions in order... then you just need to duplicate the parts at the ends of the RDD. I wonder if updateStateByKey would help make things easier.Daynadays
It is an interesting approach, but you're making a risky assumption that there are no empty / too short partitions. For example: val m = Map(1 -> (0 to 50).toIterator, 4 -> (51 to 100).toIterator).withDefault(i => Iterator()); val ts = sc.parallelize(Seq.empty[Int], 10).mapPartitionsWithIndex((i, _) => m(i))Echinate
I use something similar here and here with broadcast variables instead of partitioner and assign data based on counts.Echinate

Spark 1.4 introduced window functions, which means that you can compute the moving average as follows, adjusting the window with rowsBetween:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg
import sqlContext.implicits._

val schema = Seq("id", "cykle", "value")
val data = Seq(
  (1, 1, 1),
  (1, 2, 11),
  (1, 3, 1),
  (1, 4, 11),
  (1, 5, 1),
  (1, 6, 11),
  (2, 1, 1),
  (2, 2, 11),
  (2, 3, 1),
  (2, 4, 11),
  (2, 5, 1),
  (2, 6, 11)
)

val dft = sc.parallelize(data).toDF(schema: _*)

dft.select('*).show

// PARTITION BY id ORDER BY cykle ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING (a window of 5 rows)
val w = Window.partitionBy("id").orderBy("cykle").rowsBetween(-2, 2)

val x = dft.select($"id", $"cykle", avg($"value").over(w))
x.show

Output (in Zeppelin):

schema: Seq[String] = List(id, cykle, value)
data: Seq[(Int, Int, Int)] = List((1,1,1), (1,2,11), (1,3,1), (1,4,11), (1,5,1), (1,6,11), (2,1,1), (2,2,11), (2,3,1), (2,4,11), (2,5,1), (2,6,11))
dft: org.apache.spark.sql.DataFrame = [id: int, cykle: int, value: int]
+---+-----+-----+
| id|cykle|value|
+---+-----+-----+
|  1|    1|    1|
|  1|    2|   11|
|  1|    3|    1|
|  1|    4|   11|
|  1|    5|    1|
|  1|    6|   11|
|  2|    1|    1|
|  2|    2|   11|
|  2|    3|    1|
|  2|    4|   11|
|  2|    5|    1|
|  2|    6|   11|
+---+-----+-----+
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@55cd666f
x: org.apache.spark.sql.DataFrame = [id: int, cykle: int, 'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING: double]
+---+-----+-------------------------------------------------------------------------+
| id|cykle|'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING|
+---+-----+-------------------------------------------------------------------------+
|  1|    1|                                                        4.333333333333333|
|  1|    2|                                                                      6.0|
|  1|    3|                                                                      5.0|
|  1|    4|                                                                      7.0|
|  1|    5|                                                                      6.0|
|  1|    6|                                                        7.666666666666667|
|  2|    1|                                                        4.333333333333333|
|  2|    2|                                                                      6.0|
|  2|    3|                                                                      5.0|
|  2|    4|                                                                      7.0|
|  2|    5|                                                                      6.0|
|  2|    6|                                                        7.666666666666667|
+---+-----+-------------------------------------------------------------------------+
Pia answered 27/2, 2016 at 20:5 Comment(6)
Also check out this blog article: xinhstechblog.blogspot.de/2016/04/… It is a more practical explanation of how window functions work than the official announcement.Trubow
What happens if you don't have anything to partition by, that is, if you need to perform the moving average on all of the data? This is my case, as I have time series data and nothing to partition by. In this case all of the data would be moved to one node, which is a problem, right? How to overcome this issue? (see the sketch after these comments)Carisa
@Marko what is the data? Have a look at approximate quantiles and spark-ts databricks.com/blog/2016/05/19/… github.com/sryza/spark-timeseriesPia
Thanks for the answer even a year later :) Data represents multivariate time-series. That is, each column is a parameter measured during time. Not sure how approximate can help me with the moving average and I would avoid this library as it is third party and not developed any longer. Any other idea, perhaps? Does the problem that I'm afraid of really exist? Would I get all the data on one node if I have nothing to partition on?Carisa
I think the default partitioner is used #34491719Pia
what do you say @Echinate ?Pia
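
For the case raised in the comments (nothing to partition by), a minimal sketch reusing dft and cykle from the answer above: dropping partitionBy makes the window span the whole series, and Spark warns that it moves all rows into a single partition for this step.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg

// No partition column: the whole time series is one window partition.
// Spark logs "No Partition Defined for Window operation" because all rows
// end up in a single partition for this step.
val w = Window.orderBy("cykle").rowsBetween(-2, 2)
val avged = dft.select($"cykle", avg($"value").over(w).as("moving_avg"))
avged.show

If that single partition becomes too large to handle, the RDD approach with overlapping partitions above is the usual fallback.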
