Compute first order derivative with MongoDB aggregation

Published: 2020-08-10 08:18

Question:

Is it possible to calculate a first order derivative using the aggregate framework?

For example, I have the data :

{time_series : [10,20,40,70,110]}

I'm trying to obtain an output like:

{derivative : [10,20,30,40]}

Answer 1:

db.collection.aggregate(
    [
      {
        "$addFields": {
          "indexes": {
            "$range": [
              0,
              {
                "$size": "$time_series"
              }
            ]
          },
          "reversedSeries": {
            "$reverseArray": "$time_series"
          }
        }
      },
      {
        "$project": {
          "derivatives": {
            "$reverseArray": {
              "$slice": [
                {
                  "$map": {
                    "input": {
                      "$zip": {
                        "inputs": [
                          "$reversedSeries",
                          "$indexes"
                        ]
                      }
                    },
                    "in": {
                      "$subtract": [
                        {
                          "$arrayElemAt": [
                            "$$this",
                            0
                          ]
                        },
                        {
                          "$arrayElemAt": [
                            "$reversedSeries",
                            {
                              "$add": [
                                {
                                  "$arrayElemAt": [
                                    "$$this",
                                    1
                                  ]
                                },
                                1
                              ]
                            }
                          ]
                        }
                      ]
                    }
                  }
                },
                {
                  "$subtract": [
                    {
                      "$size": "$time_series"
                    },
                    1
                  ]
                }
              ]
            }
          },
          "time_series": 1
        }
      }
    ]
)

We can use the pipeline above in version 3.4+ to do this. In the pipeline, we use the $addFields pipeline stage to add an array of the "time_series" elements' indexes to the document. We also reverse the time series array and add it to the document, using the $range and $reverseArray operators respectively.

We reverse the array here because, in the original ascending series, the element at position p is always less than the element at position p+1, which means that [p] - [p+1] < 0, and we would rather not use $multiply to flip the sign (see the pipeline for version 3.2).
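
To make that reasoning concrete, here is the same arithmetic in plain Python (no MongoDB involved):

series = [10, 20, 40, 70, 110]

# On the original ascending series, series[p] - series[p+1] is negative:
forward = [series[p] - series[p + 1] for p in range(len(series) - 1)]
# -> [-10, -20, -30, -40]

# On the reversed series the same expression is positive, and reversing
# the differences again restores the original order:
rev = list(reversed(series))                                # [110, 70, 40, 20, 10]
diffs = [rev[p] - rev[p + 1] for p in range(len(rev) - 1)]  # [40, 30, 20, 10]
derivative = list(reversed(diffs))                          # [10, 20, 30, 40]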

Next we $zip the reversed series with the indexes array and apply a $subtract expression to each pair using the $map operator.

We then $slice the result to discard the trailing null value from the array and re-reverse it to restore the original order.
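
If you prefer to drive this from Python, here is a sketch of the same 3.4+ pipeline run through PyMongo (using the test database and collection handles that also appear in the map_reduce example further down):

import pymongo

client = pymongo.MongoClient()
collection = client.test.collection

pipeline = [
    {"$addFields": {
        "indexes": {"$range": [0, {"$size": "$time_series"}]},
        "reversedSeries": {"$reverseArray": "$time_series"},
    }},
    {"$project": {
        "time_series": 1,
        "derivatives": {"$reverseArray": {"$slice": [
            {"$map": {
                "input": {"$zip": {"inputs": ["$reversedSeries", "$indexes"]}},
                "in": {"$subtract": [
                    {"$arrayElemAt": ["$$this", 0]},
                    {"$arrayElemAt": [
                        "$reversedSeries",
                        {"$add": [{"$arrayElemAt": ["$$this", 1]}, 1]},
                    ]},
                ]},
            }},
            {"$subtract": [{"$size": "$time_series"}, 1]},
        ]}},
    }},
]

for doc in collection.aggregate(pipeline):
    print(doc["derivatives"])  # should print [10, 20, 30, 40] for the sample document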


In 3.2 we can use the $unwind operator to unwind our array and include the index of each element by specifying a document as its operand instead of the traditional "$"-prefixed path.
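
For example, a quick PyMongo sketch of just that $unwind stage (same collection handle as above) shows the shape of the unwound documents:

import pymongo

collection = pymongo.MongoClient().test.collection

# Each array element becomes its own document, and includeArrayIndex
# stores its position in the original array in the "index" field:
for doc in collection.aggregate([
    {"$unwind": {"path": "$time_series", "includeArrayIndex": "index"}}
]):
    print(doc)
# e.g. {'_id': ObjectId('...'), 'time_series': 10, 'index': 0}
#      {'_id': ObjectId('...'), 'time_series': 20, 'index': 1}
#      ...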

Next in the pipeline, we need to $group our documents and use the $push accumulator operator to return an array of sub-documents that look like this:

{
    "_id" : ObjectId("57c11ddbe860bd0b5df6bc64"),
    "time_series" : [
        { "value" : 10, "index" : NumberLong(0) },
        { "value" : 20, "index" : NumberLong(1) },
        { "value" : 40, "index" : NumberLong(2) },
        { "value" : 70, "index" : NumberLong(3) },
        { "value" : 110, "index" : NumberLong(4) }
    ]
}

Finally comes the $project stage. In this stage, we use the $map operator to apply a series of expressions to each element of the array newly computed in the $group stage.

Here is what is going on inside the $map's in expression (think of $map as a for loop):

For each subdocument, we use the $let variable operator to bind the next element in the array to a variable. We then subtract that element's "value" field from the current element's value.

Since the next element in the array is the element at the current index plus one, all we need is the $arrayElemAt operator and a simple $add of the current element's index and 1.

The $subtract expression returns a negative value, so we need to multiply it by -1 using the $multiply operator.

We also need to $filter the resulting array because its last element is null. The reason is that when the current element is the last element, $subtract returns null because the index of the "next" element equals the size of the array.

db.collection.aggregate([
  {
    "$unwind": {
      "path": "$time_series",
      "includeArrayIndex": "index"
    }
  },
  {
    "$group": {
      "_id": "$_id",
      "time_series": {
        "$push": {
          "value": "$time_series",
          "index": "$index"
        }
      }
    }
  },
  {
    "$project": {
      "time_series": {
        "$filter": {
          "input": {
            "$map": {
              "input": "$time_series",
              "as": "el",
              "in": {
                "$multiply": [
                  {
                    "$subtract": [
                      "$$el.value",
                      {
                        "$let": {
                          "vars": {
                            "nextElement": {
                              "$arrayElemAt": [
                                "$time_series",
                                {
                                  "$add": [
                                    "$$el.index",
                                    1
                                  ]
                                }
                              ]
                            }
                          },
                          "in": "$$nextElement.value"
                        }
                      }
                    ]
                  },
                  -1
                ]
              }
            }
          },
          "as": "item",
          "cond": {
            "$gte": [
              "$$item",
              0
            ]
          }
        }
      }
    }
  }
])
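
Against the sample document, this pipeline should return the derivative in the (overwritten) "time_series" field, i.e. { "time_series" : [ 10, 20, 30, 40 ] }.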

Another option, which I think is less efficient, is to perform a map/reduce operation on our collection using the map_reduce method.

>>> import pymongo
>>> from bson.code import Code
>>> client = pymongo.MongoClient()
>>> db = client.test
>>> collection = db.collection
>>> mapper = Code("""
...               function() {
...                 var derivatives = [];
...                 for (var index=1; index<this.time_series.length; index++) {
...                   derivatives.push(this.time_series[index] - this.time_series[index-1]);
...                 }
...                 emit(this._id, derivatives);
...               }
...               """)
>>> reducer = Code("""
...                function(key, value) {}
...                """)
>>> for res in collection.map_reduce(mapper, reducer, out={'inline': 1})['results']:
...     print(res)  # or do something with the document.
... 
{'value': [10.0, 20.0, 30.0, 40.0], '_id': ObjectId('57c11ddbe860bd0b5df6bc64')}

You can also retrieve all the documents and use numpy.diff to compute the derivative like this:

import numpy as np


for document in collection.find({}, {'time_series': 1}):
    result = np.diff(document['time_series']) 
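
If you also want to persist the result, a minimal sketch (the "derivative" field name here is just an illustration) could write it back with update_one:

import numpy as np
import pymongo

collection = pymongo.MongoClient().test.collection

for document in collection.find({}, {'time_series': 1}):
    derivative = np.diff(document['time_series']).tolist()  # e.g. [10, 20, 30, 40]
    collection.update_one({'_id': document['_id']},
                          {'$set': {'derivative': derivative}})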


Answer 2:

It's a bit dirty, but perhaps something like this?

use test_db
db['data'].remove({})
db['data'].insert({id: 1, time_series: [10,20,40,70,110]})

var mapF = function() {
    emit(this.id, this.time_series);
    emit(this.id, this.time_series);
};

var reduceF = function(key, values){
    var n = values[0].length;
    var ret = [];
    for(var i = 0; i < n-1; i++){
        ret.push( values[0][i+1] - values[0][i] );
    }
    return {'gradient': ret};
};

var finalizeF = function(key, val){
    return val.gradient;
}

db['data'].mapReduce(
    mapF,
    reduceF,
    { out: 'data_d1', finalize: finalizeF }
)

db['data_d1'].find({})

The "strategy" here is to emit the data to be operated on twice so that it is accessible in the reduce stage, return an object to avoid the message "reduce -> multiple not supported yet" and then filter back the array in the finalizer.

This script then produces:

MongoDB shell version: 3.2.9
connecting to: test
switched to db test_db
WriteResult({ "nRemoved" : 1 })
WriteResult({ "nInserted" : 1 })
{
    "result" : "data_d1",
    "timeMillis" : 13,
    "counts" : {
        "input" : 1,
        "emit" : 2,
        "reduce" : 1,
        "output" : 1
    },
    "ok" : 1
}
{ "_id" : 1, "value" : [ 10, 20, 30, 40 ] }
bye

Alternatively, one could move all the processing into the finalizer (reduceF is not called here since mapF is assumed to emit unique keys):

use test_db
db['data'].remove({})
db['data'].insert({id: 1, time_series: [10,20,40,70,110]})

var mapF = function() {
    emit(this.id, this.time_series);
};

var reduceF = function(key, values){
};

var finalizeF = function(key, val){
    var x = val;
    var n = x.length;

    var ret = [];
    for(var i = 0; i < n-1; i++){
        ret.push( x[i+1] - x[i] );
    }
    return ret;
}

db['data'].mapReduce(
    mapF,
    reduceF,
    { out: 'data_d1', finalize: finalizeF }
)

db['data_d1'].find({})
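
As with the first variant, querying data_d1 should again return { "_id" : 1, "value" : [ 10, 20, 30, 40 ] }.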