Compute first order derivative with MongoDB aggregation framework

Is it possible to calculate a first-order derivative using the aggregation framework?

For example, I have the data:

{time_series : [10,20,40,70,110]}

I'm trying to obtain an output like:

{derivative : [10,20,30,40]}
Hectocotylus answered 15/8, 2016 at 15:2 Comment(5)
Is there a reason you want to do this within the aggregation framework rather than with a robust Python library implementation? - Tacit
@Tacit - Can you give me an example of a Python library implementation? My current workaround is to fetch all the fields using pymongo and compute the derivative in Python. It turns out to be quite slow (limited by network bandwidth?), which made me look around for alternatives. - Hectocotylus
@Tacit I think the aggregation framework is the best option here. It is even faster than numpy.diff. I added benchmark test results to my answer. - Piefer
@Styvane Don't get me wrong, I was the first person to upvote both answers here because they're great, but the "best" option is about more than performance. A well-tested library call is simpler, easier to understand, and cleaner than a complex aggregation pipeline. - Tacit
@Tacit I can't agree more. Not everything in programming is about performance. It is just a shame that MongoDB doesn't provide an operator for this. Btw, I lost track of the curly braces many times while I was writing it. - Piefer
db.collection.aggregate(
    [
      {
        "$addFields": {
          "indexes": {
            "$range": [
              0,
              {
                "$size": "$time_series"
              }
            ]
          },
          "reversedSeries": {
            "$reverseArray": "$time_series"
          }
        }
      },
      {
        "$project": {
          "derivatives": {
            "$reverseArray": {
              "$slice": [
                {
                  "$map": {
                    "input": {
                      "$zip": {
                        "inputs": [
                          "$reversedSeries",
                          "$indexes"
                        ]
                      }
                    },
                    "in": {
                      "$subtract": [
                        {
                          "$arrayElemAt": [
                            "$$this",
                            0
                          ]
                        },
                        {
                          "$arrayElemAt": [
                            "$reversedSeries",
                            {
                              "$add": [
                                {
                                  "$arrayElemAt": [
                                    "$$this",
                                    1
                                  ]
                                },
                                1
                              ]
                            }
                          ]
                        }
                      ]
                    }
                  }
                },
                {
                  "$subtract": [
                    {
                      "$size": "$time_series"
                    },
                    1
                  ]
                }
              ]
            }
          },
          "time_series": 1
        }
      }
    ]
)

We can use the pipeline above in version 3.4+ to do this. In the pipeline, we use the $addFields stage to add two fields to the document: an array of the indexes of the "time_series" elements, built with the $range operator, and the reversed time series, built with the $reverseArray operator.

We reverse the array because, in the original array, subtracting the element at position p+1 from the element at position p gives the negated difference ([p] - [p+1] < 0 for the increasing sample series). In the reversed array, [p] - [p+1] already has the right sign, so we do not need $multiply here (see the pipeline for version 3.2).

Next, we $zip the reversed series with the indexes array and apply a $subtract expression to each pair in the resulting array using the $map operator.

We then $slice the result to discard the trailing null value and reverse the result again to restore the original order.
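
To make the steps easier to follow, here is a minimal plain-Python trace of the same sequence of operations. It is only a sketch: the function name derivative_via_reversal is made up for illustration, and None stands in for the null that $subtract produces when $arrayElemAt is asked for an index past the end of the array.

```python
def derivative_via_reversal(time_series):
    """Trace the 3.4 pipeline's expressions on a plain Python list."""
    n = len(time_series)
    indexes = list(range(n))             # $range: [0, {$size: "$time_series"}]
    reversed_series = time_series[::-1]  # $reverseArray

    mapped = []
    for value, index in zip(reversed_series, indexes):  # $zip, then $map
        next_index = index + 1                           # $add
        if next_index < n:
            mapped.append(value - reversed_series[next_index])  # $subtract
        else:
            mapped.append(None)  # no next element: the expression is null

    sliced = mapped[: n - 1]     # $slice keeps the first n-1 entries, dropping the null
    return sliced[::-1]          # outer $reverseArray restores the original order


print(derivative_via_reversal([10, 20, 40, 70, 110]))  # [10, 20, 30, 40]
```

The intermediate array for the sample document is [40, 30, 20, 10, None], which is why both the $slice and the second reversal are needed.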


In 3.2 we can use the $unwind operator to unwind our array and include the index of each element in the array by specifying a document as operand instead of the traditional "path" prefixed by $.

Next in the pipeline, we need to $group our documents and use the $push accumulator operator to return an array of sub-documents that look like this:

{
    "_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
    "time_series" : [
        { "value" : 10, "index" : NumberLong(0) },
        { "value" : 20, "index" : NumberLong(1) },
        { "value" : 40, "index" : NumberLong(2) },
        { "value" : 70, "index" : NumberLong(3) },
        { "value" : 110, "index" : NumberLong(4) }
    ]
}

Finally comes the $project stage. In this stage, we use the $map operator to apply a series of expressions to each element of the array computed in the $group stage.

Here is what happens inside the $map "in" expression (think of $map as a for loop):

For each subdocument, we use the $let operator to bind the next element of the array to a variable, and then subtract that element's "value" field from the current element's "value" field.

Since the next element in the array is the element at the current index plus one, all we need is the $arrayElemAt operator and a simple $add of the current element's index and 1.

For the increasing sample series, the $subtract expression returns a negative value (current minus next), so we multiply it by -1 using the $multiply operator.

We also need to $filter the resulting array because its last element is null: when the current element is the last one, the index of the next element equals the size of the array, so there is no next element and $subtract returns null.

db.collection.aggregate([
  {
    "$unwind": {
      "path": "$time_series",
      "includeArrayIndex": "index"
    }
  },
  {
    "$group": {
      "_id": "$_id",
      "time_series": {
        "$push": {
          "value": "$time_series",
          "index": "$index"
        }
      }
    }
  },
  {
    "$project": {
      "time_series": {
        "$filter": {
          "input": {
            "$map": {
              "input": "$time_series",
              "as": "el",
              "in": {
                "$multiply": [
                  {
                    "$subtract": [
                      "$$el.value",
                      {
                        "$let": {
                          "vars": {
                            "nextElement": {
                              "$arrayElemAt": [
                                "$time_series",
                                {
                                  "$add": [
                                    "$$el.index",
                                    1
                                  ]
                                }
                              ]
                            }
                          },
                          "in": "$$nextElement.value"
                        }
                      }
                    ]
                  },
                  -1
                ]
              }
            }
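          },
          "as": "item",
          // Drop only the trailing null; a test such as $gte 0 would also
          // discard negative differences when the series is not increasing.
          "cond": {
            "$ne": [
              "$$item",
              null
            ]
          }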
        }
      }
    }
  }
])
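
To see what the three stages do to the sample document, the logic can be traced in plain Python. This is a sketch rather than the pipeline itself: the function name derivative_via_unwind is hypothetical, None stands in for null, and the final step drops only the null entry, so negative differences from a non-increasing series are kept.

```python
def derivative_via_unwind(time_series):
    """Trace the 3.2 pipeline ($unwind, $group, $project) on a plain list."""
    # $unwind with includeArrayIndex followed by $group/$push:
    # every element becomes a {"value": ..., "index": ...} subdocument.
    docs = [{"value": v, "index": i} for i, v in enumerate(time_series)]

    mapped = []
    for el in docs:                                # $map with "as": "el"
        next_index = el["index"] + 1               # $add: ["$$el.index", 1]
        if next_index < len(docs):
            next_value = docs[next_index]["value"]           # $let + $arrayElemAt
            mapped.append((el["value"] - next_value) * -1)   # $subtract, $multiply
        else:
            mapped.append(None)                    # last element: no next value

    # $filter: keep everything except the trailing null.
    return [item for item in mapped if item is not None]


print(derivative_via_unwind([10, 20, 40, 70, 110]))  # [10, 20, 30, 40]
```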

Another option, which I think is less efficient, is to perform a map/reduce operation on our collection using the map_reduce method (available in PyMongo versions before 4.0, which removed it).

>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
...               function() {
...                 var derivatives = [];
...                 for (var index=1; index<this.time_series.length; index++) {
...                   derivatives.push(this.time_series[index] - this.time_series[index-1]);
...                 }
...                 emit(this._id, derivatives);
...               }
...               """)
>>> reducer = Code("""
...                function(key, value) {}
...                """)
>>> for res in collection.map_reduce(mapper, reducer, out={'inline': 1})['results']:
...     print(res)  # or do something with the document.
... 
{'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')}

You can also retrieve all the documents and use numpy.diff to compute the derivative on the client, like this:

import numpy as np


for document in collection.find({}, {'time_series': 1}):
    result = np.diff(document['time_series']) 
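
If transferring every document is the bottleneck, as mentioned in the question's comments, the subtraction can stay on the server. As a possible alternative to the pipelines shown earlier (not the method used in them), the sketch below builds a pipeline for MongoDB 3.4+ that maps over the indexes 1 to n-1 and subtracts the previous element from the current one, so no reversal or $multiply is needed. The function name build_difference_pipeline and its parameters field and out are hypothetical, and it assumes the field exists and holds an array, since $size raises an error otherwise.

```python
def build_difference_pipeline(field="time_series", out="derivative"):
    """Return an aggregation pipeline computing forward differences of `field`."""
    path = "$" + field
    return [
        {
            "$project": {
                out: {
                    "$map": {
                        # Indexes 1 .. n-1; empty if the array has fewer than two elements.
                        "input": {"$range": [1, {"$size": path}]},
                        "as": "i",
                        "in": {
                            "$subtract": [
                                # current element minus the previous one
                                {"$arrayElemAt": [path, "$$i"]},
                                {"$arrayElemAt": [path, {"$subtract": ["$$i", 1]}]},
                            ]
                        },
                    }
                }
            }
        }
    ]


pipeline = build_difference_pipeline()
```

With the collection object from the session above, the result could then be read with collection.aggregate(pipeline), which returns one document per input document containing _id and the derivative array.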
Piefer answered 27/8, 2016 at 21:14

it's a bit dirty, but perhaps something like this?

use test_db
db['data'].remove({})
db['data'].insert({id: 1, time_series: [10,20,40,70,110]})

var mapF = function() {
    emit(this.id, this.time_series);
    emit(this.id, this.time_series);
};

var reduceF = function(key, values){
    var n = values[0].length;
    var ret = [];
    for(var i = 0; i < n-1; i++){
        ret.push( values[0][i+1] - values[0][i] );
    }
    return {'gradient': ret};
};

var finalizeF = function(key, val){
    return val.gradient;
}

db['data'].mapReduce(
    mapF,
    reduceF,
    { out: 'data_d1', finalize: finalizeF }
)

db['data_d1'].find({})

The "strategy" here is to emit the data twice so that the reduce function is actually invoked (it is only called for keys with more than one value), return an object from reduce to avoid the message "reduce -> multiple not supported yet", and then extract the array again in the finalizer.

This script then produces:

MongoDB shell version: 3.2.9
connecting to: test
switched to db test_db
WriteResult({ "nRemoved" : 1 })
WriteResult({ "nInserted" : 1 })
{
    "result" : "data_d1",
    "timeMillis" : 13,
    "counts" : {
        "input" : 1,
        "emit" : 2,
        "reduce" : 1,
        "output" : 1
    },
    "ok" : 1
}
{ "_id" : 1, "value" : [ 10, 20, 30, 40 ] }
bye
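
The role of the double emit is easier to see in a toy model of the map/reduce flow. The Python sketch below is a simplification under stated assumptions: simulate_map_reduce and diff are made-up helpers, and the model calls reduce at most once per key, whereas a real server may call it repeatedly. It does reproduce the rule that matters here: reduce is skipped for a key with a single emitted value, while finalize always runs.

```python
from collections import defaultdict


def simulate_map_reduce(docs, map_fn, reduce_fn, finalize_fn):
    """Toy model: group emitted values by key, reduce only multi-value keys."""
    emitted = defaultdict(list)
    for doc in docs:
        for key, value in map_fn(doc):
            emitted[key].append(value)

    results = {}
    for key, values in emitted.items():
        # A key with a single value bypasses reduce entirely.
        reduced = reduce_fn(key, values) if len(values) > 1 else values[0]
        results[key] = finalize_fn(key, reduced)
    return results


def diff(xs):
    """Forward differences of a list."""
    return [b - a for a, b in zip(xs, xs[1:])]


doc = {"id": 1, "time_series": [10, 20, 40, 70, 110]}

# Double emit: reduce runs and does the work, finalize unwraps the object.
first = simulate_map_reduce(
    [doc],
    lambda d: [(d["id"], d["time_series"])] * 2,
    lambda key, values: {"gradient": diff(values[0])},
    lambda key, val: val["gradient"],
)

# Single emit: reduce is never called, so finalize must do the work.
second = simulate_map_reduce(
    [doc],
    lambda d: [(d["id"], d["time_series"])],
    lambda key, values: None,
    lambda key, val: diff(val),
)

print(first)   # {1: [10, 20, 30, 40]}
print(second)  # {1: [10, 20, 30, 40]}
```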

Alternatively, one could move all the processing into the finalizer (reduceF is not called here since mapF is assumed to emit unique keys):

use test_db
db['data'].remove({})
db['data'].insert({id: 1, time_series: [10,20,40,70,110]})

var mapF = function() {
    emit(this.id, this.time_series);
};

var reduceF = function(key, values){
};

var finalizeF = function(key, val){
    var x = val;
    var n = x.length;

    var ret = [];
    for(var i = 0; i < n-1; i++){
        ret.push( x[i+1] - x[i] );
    }
    return ret;
}

db['data'].mapReduce(
    mapF,
    reduceF,
    { out: 'data_d1', finalize: finalizeF }
)

db['data_d1'].find({})
Cruikshank answered 25/8, 2016 at 20:33
