Moving averages with MongoDB's aggregation framework?

If you have, say, 50 years of daily temperature data, how would you calculate moving averages over 3-month intervals for that time period? Can you do that with one query, or would you need multiple queries?

Example Data

01/01/2014 = 40 degrees
12/31/2013 = 38 degrees
12/30/2013 = 29 degrees
12/29/2013 = 31 degrees
12/28/2013 = 34 degrees
12/27/2013 = 36 degrees
12/26/2013 = 38 degrees
.....
Nevanevada answered 6/8, 2014 at 1:42 Comment(4)
What do you mean exactly? Do you want to have certain values overlapping? And if so which ones? By day? Or just a rolling average. The aggregation framework cannot really compare one document against another, so this is sounding more like mapReduce.Edentate
@neil-lunn I wanted to calculate a rolling average, so for 3-month intervals I'd want to take one day, average it with the preceding 3 months of data, and then do that for every day across the 50 years. So I think certain values would overlap between averages. How would you do this with mapReduce if you can't do it with the aggregation framework? I think you are right that I'd have to compare separate documents. Thanks!Nevanevada
@neil-lunn, also it looks like you are right that the aggregation framework cannot do this... jira.mongodb.org/browse/SERVER-4437 .. let me know if you have any idea how to do it with mongodb's mapreduceNevanevada
Also, I want to create this moving or rolling average array of data - more info on moving averages here: en.wikipedia.org/wiki/Moving_averageNevanevada

The aggregation framework now has $map, $reduce, and $range built in (as of 3.4), so array processing is much more straightforward. Below is an example of calculating a moving average on a set of data where you wish to filter by some predicate. The basic setup is that each doc contains the filterable criteria and a value, e.g.

{sym: "A", d: ISODate("2018-01-01"), val: 10}
{sym: "A", d: ISODate("2018-01-02"), val: 30}

Here it is:

// This controls the number of observations in the moving average:
days = 4;

c=db.foo.aggregate([

// Filter down to what you want.  This can be anything or nothing at all.
{$match: {"sym": "S1"}}

// Ensure dates are going earliest to latest:
,{$sort: {d:1}}

// Turn docs into a single doc with a big vector of observations, e.g.
//     {sym: "A", d: d1, val: 10}
//     {sym: "A", d: d2, val: 11}
//     {sym: "A", d: d3, val: 13}
// becomes
//     {_id: "A", prx: [ {v:10,d:d1}, {v:11,d:d2},  {v:13,d:d3} ] }
//
// This will set us up to take advantage of array processing functions!
,{$group: {_id: "$sym", prx: {$push: {v:"$val",d:"$d"}} }}

// Nice additional info.  Note use of dot notation on array to get
// just scalar date at elem 0, not the object {v:val,d:date}:
,{$addFields: {numDays: days, startDate: {$arrayElemAt: [ "$prx.d", 0 ]}} }

// The Juice!  Assume we have a variable "days" which is the desired number
// of days of moving average.
// The complex expression below does this in python pseudocode:
//
// for z in range(0, len(vector) - (days - 1)):
//    seg = vector[z : z + days]
//    tot = sum of seg.v
//    avg = tot / len(seg)
//
// Note that it is possible to overrun the segment at the end of the "walk"
// along the vector, i.e. not enough date-values.  So we only run the
// vector to len(vector) - (days - 1).
// Also, for extra info, we add the number of days *actually* used in the
// calculation AND the as-of date, which is the tail date of the segment!
//
// Again we take advantage of dot notation to turn the vector of
// object {v:val, d:date} into two vectors of simple scalars [v1,v2,...]
// and [d1,d2,...] with $prx.v and $prx.d
//
,{$addFields: {"prx": {$map: {
    input: {$range:[0,{$subtract:[{$size:"$prx"}, (days-1)]}]} ,
    as: "z",
    in: {
       avg: {$avg: {$slice: [ "$prx.v", "$$z", days ] } },
       d: {$arrayElemAt: [ "$prx.d", {$add: ["$$z", (days-1)] } ]}
        }
        }}
    }}

            ]);

This might produce the following output:

{
    "_id" : "S1",
    "prx" : [
        {
            "avg" : 11.738793632512115,
            "d" : ISODate("2018-09-05T16:10:30.259Z")
        },
        {
            "avg" : 12.420766702631376,
            "d" : ISODate("2018-09-06T16:10:30.259Z")
        },
        ...

    ],
    "numDays" : 4,
    "startDate" : ISODate("2018-09-02T16:10:30.259Z")
}
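For anyone wanting to sanity-check the window arithmetic outside the shell, here is the same walk in plain JavaScript (the prx field names v/d and the 4-observation window mirror the pipeline above; the data values are made up):

```javascript
// Plain-JS sketch of what the $range/$slice/$avg stages compute.
// Each element mirrors the {v: val, d: date} pairs built by the $group stage.
const days = 4;
const prx = [
  { v: 10, d: "2018-01-01" },
  { v: 30, d: "2018-01-02" },
  { v: 20, d: "2018-01-03" },
  { v: 40, d: "2018-01-04" },
  { v: 50, d: "2018-01-05" },
];

// Walk the vector, stopping (days - 1) short of the end so every
// window is full, which is exactly what the $range upper bound does.
const out = [];
for (let z = 0; z <= prx.length - days; z++) {
  const seg = prx.slice(z, z + days);                        // $slice
  const avg = seg.reduce((t, e) => t + e.v, 0) / seg.length; // $avg
  const d = seg[seg.length - 1].d;             // tail date = as-of date
  out.push({ avg, d });
}

console.log(out);
// [ { avg: 25, d: '2018-01-04' }, { avg: 35, d: '2018-01-05' } ]
```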
Iconoscope answered 2/9, 2018 at 16:15 Comment(2)
will this solution work on large data set(say 10 millions documents)?Overplus
@JayeshSingh Depends on the $group. If you are thinking that creating an array of 20m val:date pairs will blow past the 16MB document limit, you're right.Iconoscope

The way I would tend to do this in MongoDB is to maintain a running sum of the past 90 days in the document for each day's value, e.g.

{"day": 1, "tempMax": 40, "tempMaxSum90": 2232}
{"day": 2, "tempMax": 38, "tempMaxSum90": 2230}
{"day": 3, "tempMax": 36, "tempMaxSum90": 2231}
{"day": 4, "tempMax": 37, "tempMaxSum90": 2233}

Whenever a new data point needs to be added to the collection, instead of reading and summing 90 values you can efficiently calculate the next sum with two simple queries, one addition and one subtraction, like this (pseudo-code):

tempMaxSum90(day) = tempMaxSum90(day-1) + tempMax(day) - tempMax(day-90)

The 90-day moving average for each day is then just the 90-day sum divided by 90.

If you wanted to also offer moving averages over different time-scales (e.g. 1 week, 30 days, 90 days, 1 year) you could simply maintain an array of sums in each document instead of a single sum, one for each time-scale required.

This approach costs additional storage space and additional processing when inserting new data, but it is appropriate in most time-series charting scenarios where new data is collected relatively slowly and fast retrieval is desirable.
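As a rough sketch of the bookkeeping in plain JavaScript (a window of 3 instead of 90 so the numbers stay readable; the temps array is made up):

```javascript
// Incremental running-sum update: each step costs one addition and one
// subtraction, regardless of the window size.
const WINDOW = 3;
const temps = [40, 38, 36, 37, 35];

// runningSum[i] = sum of the last WINDOW values ending at day i.
const runningSum = [];
for (let i = 0; i < temps.length; i++) {
  const prev = i > 0 ? runningSum[i - 1] : 0;
  const leaving = i >= WINDOW ? temps[i - WINDOW] : 0; // value falling out
  runningSum.push(prev + temps[i] - leaving);
}

// Moving average = sum / window size (only valid once the window is full).
const movingAvg = runningSum.map((s, i) =>
  i >= WINDOW - 1 ? s / WINDOW : null);

console.log(movingAvg); // [ null, null, 38, 37, 36 ]
```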

Subfusc answered 4/9, 2015 at 5:54 Comment(0)

Starting in Mongo 5, this is a perfect use case for the new $setWindowFields aggregation stage:

Note that I consider the rolling average to have a 3-day window for simplicity (the current day and the 2 previous days):

// { date: ISODate("2013-12-26"), temp: 38 }
// { date: ISODate("2013-12-27"), temp: 36 }
// { date: ISODate("2013-12-28"), temp: 34 }
// { date: ISODate("2013-12-29"), temp: 31 }
// { date: ISODate("2013-12-30"), temp: 29 }
// { date: ISODate("2013-12-31"), temp: 38 }
// { date: ISODate("2014-01-01"), temp: 40 }
db.collection.aggregate([
  { $setWindowFields: {
    sortBy: { date: 1 },
    output: {
      movingAverage: {
        $avg: "$temp",
        window: { range: [-2, "current"], unit: "day" }
      }
    }
  }}
])
// { date: ISODate("2013-12-26"), temp: 38, movingAverage: 38 }
// { date: ISODate("2013-12-27"), temp: 36, movingAverage: 37 }
// { date: ISODate("2013-12-28"), temp: 34, movingAverage: 36 }
// { date: ISODate("2013-12-29"), temp: 31, movingAverage: 33.67 }
// { date: ISODate("2013-12-30"), temp: 29, movingAverage: 31.33 }
// { date: ISODate("2013-12-31"), temp: 38, movingAverage: 32.67 }
// { date: ISODate("2014-01-01"), temp: 40, movingAverage: 35.67 }

This:

  • sorts documents chronologically: sortBy: { date: 1 }
  • creates for each document a span of documents (the window) that:
    • includes the "current" document and all previous documents within a 2-"day" range
  • and within that window, averages temperatures: $avg: "$temp"
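For comparison, the same 3-day computation in plain JavaScript (index-based slicing works here only because the sample days are consecutive; $setWindowFields uses the actual date range):

```javascript
// Data from the question, sorted chronologically.
const docs = [
  { date: "2013-12-26", temp: 38 },
  { date: "2013-12-27", temp: 36 },
  { date: "2013-12-28", temp: 34 },
  { date: "2013-12-29", temp: 31 },
  { date: "2013-12-30", temp: 29 },
  { date: "2013-12-31", temp: 38 },
  { date: "2014-01-01", temp: 40 },
];

const withAvg = docs.map((doc, i) => {
  // window = this doc plus up to 2 preceding docs
  const win = docs.slice(Math.max(0, i - 2), i + 1);
  const movingAverage = win.reduce((t, d) => t + d.temp, 0) / win.length;
  return { ...doc, movingAverage: Math.round(movingAverage * 100) / 100 };
});

console.log(withAvg.map(d => d.movingAverage));
// [ 38, 37, 36, 33.67, 31.33, 32.67, 35.67 ]
```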
Colenecoleopteran answered 27/11, 2021 at 14:56 Comment(1)
This is the best approach in modern MongoDB versionsTreacherous

The accepted answer helped me, but it took a while for me to understand how it worked, so I thought I'd explain my method to help others out. In your context in particular, I think my answer will help.

This works best on smaller datasets.

First group the data by day, then append all days in an array to each day:

{
  "$sort": {
    "Date": -1
  }
},
{
  "$group": {
    "_id": {
      "Day": "$Date",
      "Temperature": "$Temperature"
    },
    "Previous Values": {
      "$push": {
        "Date": "$Date",
        "Temperature": "$Temperature"
      }
    }
  }
},

This will leave you with a record that looks like this (it'll be ordered correctly):

{"_id.Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": [
    {"Day": "2017-03-01", "Temperature": 20},
    {"Day": "2017-02-11", "Temperature": 22},
    {"Day": "2017-01-18", "Temperature": 3},
    ...
    ]},

Now that each day has all days appended to it, we need to remove the items from the Previous Values array that are more recent than this record's _id.Day field, since the moving average is backward-looking:

{
  "$project": {
    "_id": 0,
    "Date": "$_id.Day",
    "Temperature": "$_id.Temperature",
    "Previous Values": 1
  }
},
{
  "$project": {
    "_id": 0,
    "Date": 1,
    "Temperature": 1,
    "Previous Values": {
      "$filter": {
        "input": "$Previous Values",
        "as": "pv",
        "cond": {
          "$lte": ["$$pv.Date", "$Date"]
        }
      }
    }
  }
},

After the filter, the Previous Values array of each record will only contain the dates that are less than or equal to that record's date:

{"Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": [
    {"Day": "2017-01-31", "Temperature": 33},
    {"Day": "2017-01-30", "Temperature": 36},
    {"Day": "2017-01-29", "Temperature": 33},
    {"Day": "2017-01-28", "Temperature": 32},
    ...
    ]}

Now we can pick our averaging window size. Since the data is daily: for a weekly average we'd take the first 7 records of the array; for monthly, 30; and for 3-monthly, 90:

{
  "$project": {
    "_id": 0,
    "Date": 1,
    "Temperature": 1,
    "Previous Values": {
      "$slice": ["$Previous Values", 0, 90]
    }
  }
},

To average the previous temperatures, we unwind the Previous Values array and then group by the date field:

{
  "$unwind": "$Previous Values"
},

The unwind operation does this:

{"Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": {
        "Day": "2017-01-31", 
        "Temperature": 33}
},

{"Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": {
        "Day": "2017-01-30", 
        "Temperature": 36}
},

{"Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": {
        "Day": "2017-01-29", 
        "Temperature": 33}
},
...

Note that the Day field is the same, but we now have a document for each of the previous dates from the Previous Values array. Now we can group back by day, then average Previous Values.Temperature to get the moving average:

{"$group": {
    "_id": {
      "Day": "$Date",
      "Temperature": "$Temperature"
    },
    "3 Month Moving Average": {
      "$avg": "$Previous Values.Temperature"
    }
  }
}

That's it! I know that joining every record to every record isn't ideal, but this works fine on smaller datasets.
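The whole filter/slice/average chain above condenses to a few lines of plain JavaScript, which may help when checking results (a window of 3 instead of 90, and made-up data, for brevity):

```javascript
// For each day: keep earlier-or-equal days, take the most recent N,
// and average them. ISO date strings compare in date order.
const N = 3; // window size (the answer uses 90 for a 3-month average)
const data = [
  { Date: "2017-01-28", Temperature: 32 },
  { Date: "2017-01-29", Temperature: 33 },
  { Date: "2017-01-30", Temperature: 36 },
  { Date: "2017-01-31", Temperature: 33 },
  { Date: "2017-02-01", Temperature: 40 },
];

const result = data.map(rec => {
  const previous = data
    .filter(p => p.Date <= rec.Date)            // the $filter stage
    .sort((a, b) => (a.Date < b.Date ? 1 : -1)) // newest first, like $sort
    .slice(0, N);                               // the $slice stage
  const avg =
    previous.reduce((t, p) => t + p.Temperature, 0) / previous.length;
  return { Date: rec.Date, MovingAverage: avg }; // the final $group/$avg
});

// For 2017-02-01 the window is [40, 33, 36].
console.log(result[result.length - 1]);
```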

Michail answered 11/5, 2019 at 13:30 Comment(0)

I think I may have an answer to my own question. Map-reduce would do it. First use emit to map each document to the neighbors it should be averaged with, then use reduce to average each array... and that new array of averages should be the moving-average plot over time, since its id would be the new date interval that you care about

I guess I needed to understand map-reduce better ...

:)

For instance... if we wanted to do it in memory (later we can create collections)

GIST https://gist.github.com/mrgcohen/3f67c597a397132c46f7

Does that look right?
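The emit-to-neighbors idea can be sketched without MongoDB at all; in plain JavaScript the map and reduce phases look roughly like this (3-day windows keyed by the window's end day; the data is made up):

```javascript
const WINDOW = 3;
const docs = [
  { day: 1, temp: 40 },
  { day: 2, temp: 38 },
  { day: 3, temp: 36 },
  { day: 4, temp: 34 },
];

// "map": emit [windowEndDay, temp] for every window this doc falls in
const emitted = [];
for (const doc of docs) {
  for (let end = doc.day; end < doc.day + WINDOW; end++) {
    emitted.push([end, doc.temp]);
  }
}

// "reduce": average the values emitted under each key
const buckets = new Map();
for (const [key, val] of emitted) {
  const b = buckets.get(key) || { sum: 0, n: 0 };
  b.sum += val; b.n += 1;
  buckets.set(key, b);
}
const averages = [...buckets.entries()]
  .filter(([, b]) => b.n === WINDOW) // keep only complete windows
  .map(([end, b]) => ({ windowEnd: end, avg: b.sum / b.n }));

console.log(averages);
// [ { windowEnd: 3, avg: 38 }, { windowEnd: 4, avg: 36 } ]
```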

Nevanevada answered 8/8, 2014 at 15:32 Comment(2)
OK, made some tweaks, but I'm pretty sure the overall idea does it. You may have to tweak the interval to what you want, but it should work.Nevanevada
Problem: how fast will this run over a huge dataset? It sounds like Mongo is slow unless you start sharding... what's the best practice here?Nevanevada

I don't believe the aggregation framework can do this for multiple dates in the current version (2.6), or, at least, can't do it without some serious gymnastics. The reason is that the aggregation pipeline processes one document at a time and one document only, so it would be necessary to somehow create a document for each day that contains the previous 3 months' worth of relevant information. That would be a $group stage calculating the average, meaning that the prior stage would have to produce about 90 copies of each day's record, each with a distinguishing key that can be used for the $group.

So I don't see a way to do this for more than one date at a time in a single aggregation. I'd be happy to be wrong and have to edit/remove this answer if somebody finds a way to do it, even if it's so complicated it's not practical. A PostgreSQL PARTITION-type window function would do the job here; maybe that function will be added someday.

Thanhthank answered 7/8, 2014 at 17:49 Comment(4)
so then you'd have to either query for pieces of this and calculate the moving average in some language (ruby,python,node) or run an aggregate query for each interval... is that really the best solution? Doesn't it just feel wrong? is there a better way to do this with map-reduce that I can't think of?Nevanevada
I haven't thought about map-reduce. Generally, I try to avoid using map-reduce as it has a significant performance penalty and is not as secure since you are running custom code server-side. I'll try to give some thought to it, or perhaps someone else will cook up an m/r solution.Thanhthank
You definitely should be able to do a M/R but I don't have the time to work it out at the moment. I will try to get to it and update my answer.Thanhthank
No longer an issue since v3.4 (Dec 2016); see $map/$range example above.Iconoscope

© 2022 - 2024 — McMap. All rights reserved.