How to aggregate until a certain value is reached in ElasticSearch?
Asked Answered
A

1

6

I would like to aggregate a list of documents (each of them has two fields - timestamp and amount) by "amount" field until a certain value is reached. For example I would like to get list of documents sorted by timestamp which total amount is equal to 100. Is it possible to do in one query?

Here is my query which returns total amount - I would like to add here a condition to stop aggregation when a certain value is reached.

{
"query": {
    "bool": {
        "filter": [
            {
                "range": {
                    "timestamp": {
                        "gte": 1525168583
                    }
                }
            }
        ]
    }
},
"aggs": {
    "total_amount": {
        "sum": {
            "field": "amount"
        }
    }
},
"sort": [
    "timestamp"
],
"size": 10000
}

Thank You

Annadiana answered 16/5, 2019 at 9:17 Comment(5)
Please note that sort and size have nothing to do with aggregations, only with the hits that are returned. So it's unclear what results you'd like to see. Can you show some mock example of what you'd like to see?Crespi
Yes I know, You can ignore them sorry.Annadiana
Still, I'd like you to explain with an example what result you'd like to see. Do you want the total per hour/day/... not to exceed 100? or get the set of documents ordered by timestamp up to their total amount reaches 100?Crespi
I would like to get a list of documents ordered by timestamp up to their total amount reaches 100.Annadiana
The answer is: no, it's not possible. Aggregations do not affect the query itself, sorting or limiting the result set's size. They are applied to all the documents matching the query (irrespective of "size" value).Magnesium
S
1

It's perfectly possible using a combination of function_score scripting for mimicking sorting, filter aggs for the range gte query and a healthy amount of scripted_metric aggs to limit the summation up to a certain amount.

Let's first set up a mapping and ingest some docs:

PUT summation
{
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "epoch_second"
      }
    }
  }
}
POST summation/_doc
{
  "context": "newest",
  "timestamp": 1587049128,
  "amount": 20
}

POST summation/_doc
{
  "context": "2nd newest",
  "timestamp": 1586049128,
  "amount": 30
}

POST summation/_doc
{
  "context": "3rd newest",
  "timestamp": 1585049128,
  "amount": 40
}

POST summation/_doc
{
  "context": "4th newest",
  "timestamp": 1585049128,
  "amount": 30
}

Then perform the query:

GET summation/_search
{
  "size": 0,
  "aggs": {
    "filtered_agg": {
      "filter": {
        "bool": {
          "must": [
            {
              "range": {
                "timestamp": {
                  "gte": 1585049128
                }
              }
            },
            {
              "function_score": {
                "query": {
                  "match_all": {}
                },
                "script_score": {
                  "script": {
                    "source": "return (params['now'] - doc['timestamp'].date.toMillis())",
                    "params": {
                      "now": 1587049676
                    }
                  }
                }
              }
            }
          ]
        }
      },
      "aggs": {
        "limited_sum": {
          "scripted_metric": {
            "init_script": """
                state['my_hash'] = new HashMap();
                state['my_hash'].put('sum', 0);
                state['my_hash'].put('docs', new ArrayList());
            """,
            "map_script": """
              if (state['my_hash']['sum'] <= 100) {
                state['my_hash']['sum'] += doc['amount'].value;
                state['my_hash']['docs'].add(doc['context.keyword'].value);
              }
            """,
            "combine_script": "return state['my_hash']",
            "reduce_script": "return states[0]"
          }
        }
      }
    }
  }
}

yielding

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "filtered_agg" : {
      "meta" : { },
      "doc_count" : 4,
      "limited_sum" : {
        "value" : {
          "docs" : [
            "newest",
            "2nd newest",
            "3rd newest",
            "4th newest"
          ],
          "sum" : 120
        }
      }
    }
  }
}

I've chosen here to only return the doc.contexts but you can adjust it to retrieve whatever you like -- be it IDs, amounts etc.

Seaman answered 16/4, 2020 at 16:55 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.