速率聚合

编辑

rate 度量聚合只能在 date_histogram 或 composite 聚合内部使用。它计算每个桶中文档或字段的速率。字段值可以从文档中特定的数值或直方图字段中提取。

对于 composite 聚合,必须只有一个 date_histogram 源才能支持 rate 聚合。

语法

编辑

一个独立的 rate 聚合看起来像这样

{
  "rate": {
    "unit": "month",
    "field": "requests"
  }
}

以下请求会将所有销售记录分组到每月桶中,然后将每个桶中的销售交易数量转换为年销售率。

resp = client.search(
    index="sales",
    size=0,
    aggs={
        "by_date": {
            "date_histogram": {
                "field": "date",
                "calendar_interval": "month"
            },
            "aggs": {
                "my_rate": {
                    "rate": {
                        "unit": "year"
                    }
                }
            }
        }
    },
)
print(resp)
response = client.search(
  index: 'sales',
  body: {
    size: 0,
    aggregations: {
      by_date: {
        date_histogram: {
          field: 'date',
          calendar_interval: 'month'
        },
        aggregations: {
          my_rate: {
            rate: {
              unit: 'year'
            }
          }
        }
      }
    }
  }
)
puts response
const response = await client.search({
  index: "sales",
  size: 0,
  aggs: {
    by_date: {
      date_histogram: {
        field: "date",
        calendar_interval: "month",
      },
      aggs: {
        my_rate: {
          rate: {
            unit: "year",
          },
        },
      },
    },
  },
});
console.log(response);
GET sales/_search
{
  "size": 0,
  "aggs": {
    "by_date": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"  
      },
      "aggs": {
        "my_rate": {
          "rate": {
            "unit": "year"  
          }
        }
      }
    }
  }
}

直方图按月分组。

但速率转换为年速率。

响应将返回每个桶中的年交易率。由于一年有 12 个月,年速率将通过将月速率乘以 12 自动计算得出。

{
  ...
  "aggregations" : {
    "by_date" : {
      "buckets" : [
        {
          "key_as_string" : "2015/01/01 00:00:00",
          "key" : 1420070400000,
          "doc_count" : 3,
          "my_rate" : {
            "value" : 36.0
          }
        },
        {
          "key_as_string" : "2015/02/01 00:00:00",
          "key" : 1422748800000,
          "doc_count" : 2,
          "my_rate" : {
            "value" : 24.0
          }
        },
        {
          "key_as_string" : "2015/03/01 00:00:00",
          "key" : 1425168000000,
          "doc_count" : 2,
          "my_rate" : {
            "value" : 24.0
          }
        }
      ]
    }
  }
}

除了计算文档数量,还可以计算每个桶中文档中所有字段值的总和或每个桶中值的数量。以下请求会将所有销售记录分组到每月桶中,然后计算每月总销售额并将其转换为平均每日销售额。

resp = client.search(
    index="sales",
    size=0,
    aggs={
        "by_date": {
            "date_histogram": {
                "field": "date",
                "calendar_interval": "month"
            },
            "aggs": {
                "avg_price": {
                    "rate": {
                        "field": "price",
                        "unit": "day"
                    }
                }
            }
        }
    },
)
print(resp)
response = client.search(
  index: 'sales',
  body: {
    size: 0,
    aggregations: {
      by_date: {
        date_histogram: {
          field: 'date',
          calendar_interval: 'month'
        },
        aggregations: {
          avg_price: {
            rate: {
              field: 'price',
              unit: 'day'
            }
          }
        }
      }
    }
  }
)
puts response
const response = await client.search({
  index: "sales",
  size: 0,
  aggs: {
    by_date: {
      date_histogram: {
        field: "date",
        calendar_interval: "month",
      },
      aggs: {
        avg_price: {
          rate: {
            field: "price",
            unit: "day",
          },
        },
      },
    },
  },
});
console.log(response);
GET sales/_search
{
  "size": 0,
  "aggs": {
    "by_date": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"  
      },
      "aggs": {
        "avg_price": {
          "rate": {
            "field": "price", 
            "unit": "day"  
          }
        }
      }
    }
  }
}

直方图按月分组。

计算所有销售价格的总和

转换为平均每日销售额

响应将包含每个月的平均每日销售价格。

{
  ...
  "aggregations" : {
    "by_date" : {
      "buckets" : [
        {
          "key_as_string" : "2015/01/01 00:00:00",
          "key" : 1420070400000,
          "doc_count" : 3,
          "avg_price" : {
            "value" : 17.741935483870968
          }
        },
        {
          "key_as_string" : "2015/02/01 00:00:00",
          "key" : 1422748800000,
          "doc_count" : 2,
          "avg_price" : {
            "value" : 2.142857142857143
          }
        },
        {
          "key_as_string" : "2015/03/01 00:00:00",
          "key" : 1425168000000,
          "doc_count" : 2,
          "avg_price" : {
            "value" : 12.096774193548388
          }
        }
      ]
    }
  }
}

您还可以利用 composite 聚合来计算库存中每个商品的平均每日销售价格

resp = client.search(
    index="sales",
    filter_path="aggregations",
    size="0",
    aggs={
        "buckets": {
            "composite": {
                "sources": [
                    {
                        "month": {
                            "date_histogram": {
                                "field": "date",
                                "calendar_interval": "month"
                            }
                        }
                    },
                    {
                        "type": {
                            "terms": {
                                "field": "type"
                            }
                        }
                    }
                ]
            },
            "aggs": {
                "avg_price": {
                    "rate": {
                        "field": "price",
                        "unit": "day"
                    }
                }
            }
        }
    },
)
print(resp)
response = client.search(
  index: 'sales',
  filter_path: 'aggregations',
  size: 0,
  body: {
    aggregations: {
      buckets: {
        composite: {
          sources: [
            {
              month: {
                date_histogram: {
                  field: 'date',
                  calendar_interval: 'month'
                }
              }
            },
            {
              type: {
                terms: {
                  field: 'type'
                }
              }
            }
          ]
        },
        aggregations: {
          avg_price: {
            rate: {
              field: 'price',
              unit: 'day'
            }
          }
        }
      }
    }
  }
)
puts response
const response = await client.search({
  index: "sales",
  filter_path: "aggregations",
  size: 0,
  aggs: {
    buckets: {
      composite: {
        sources: [
          {
            month: {
              date_histogram: {
                field: "date",
                calendar_interval: "month",
              },
            },
          },
          {
            type: {
              terms: {
                field: "type",
              },
            },
          },
        ],
      },
      aggs: {
        avg_price: {
          rate: {
            field: "price",
            unit: "day",
          },
        },
      },
    },
  },
});
console.log(response);
GET sales/_search?filter_path=aggregations&size=0
{
  "aggs": {
    "buckets": {
      "composite": { 
        "sources": [
          {
            "month": {
              "date_histogram": { 
                "field": "date",
                "calendar_interval": "month"
              }
            }
          },
          {
            "type": { 
              "terms": {
                "field": "type"
              }
            }
          }
        ]
      },
      "aggs": {
        "avg_price": {
          "rate": {
            "field": "price", 
            "unit": "day" 
          }
        }
      }
    }
  }
}

带有日期直方图源和商品类型源的复合聚合。

日期直方图源按月分组

按每个销售商品类型分组的词条源

计算每月和每件商品的所有销售价格总和

转换为每件商品的平均每日销售额

响应将包含每个月每件商品的平均每日销售价格。

{
  "aggregations" : {
    "buckets" : {
      "after_key" : {
        "month" : 1425168000000,
        "type" : "t-shirt"
      },
      "buckets" : [
        {
          "key" : {
            "month" : 1420070400000,
            "type" : "bag"
          },
          "doc_count" : 1,
          "avg_price" : {
            "value" : 4.838709677419355
          }
        },
        {
          "key" : {
            "month" : 1420070400000,
            "type" : "hat"
          },
          "doc_count" : 1,
          "avg_price" : {
            "value" : 6.451612903225806
          }
        },
        {
          "key" : {
            "month" : 1420070400000,
            "type" : "t-shirt"
          },
          "doc_count" : 1,
          "avg_price" : {
            "value" : 6.451612903225806
          }
        },
        {
          "key" : {
            "month" : 1422748800000,
            "type" : "hat"
          },
          "doc_count" : 1,
          "avg_price" : {
            "value" : 1.7857142857142858
          }
        },
        {
          "key" : {
            "month" : 1422748800000,
            "type" : "t-shirt"
          },
          "doc_count" : 1,
          "avg_price" : {
            "value" : 0.35714285714285715
          }
        },
        {
          "key" : {
            "month" : 1425168000000,
            "type" : "hat"
          },
          "doc_count" : 1,
          "avg_price" : {
            "value" : 6.451612903225806
          }
        },
        {
          "key" : {
            "month" : 1425168000000,
            "type" : "t-shirt"
          },
          "doc_count" : 1,
          "avg_price" : {
            "value" : 5.645161290322581
          }
        }
      ]
    }
  }
}

通过添加值为 value_count 的 mode 参数,我们可以将计算从 sum 更改为字段值的数量:

resp = client.search(
    index="sales",
    size=0,
    aggs={
        "by_date": {
            "date_histogram": {
                "field": "date",
                "calendar_interval": "month"
            },
            "aggs": {
                "avg_number_of_sales_per_year": {
                    "rate": {
                        "field": "price",
                        "unit": "year",
                        "mode": "value_count"
                    }
                }
            }
        }
    },
)
print(resp)
response = client.search(
  index: 'sales',
  body: {
    size: 0,
    aggregations: {
      by_date: {
        date_histogram: {
          field: 'date',
          calendar_interval: 'month'
        },
        aggregations: {
          avg_number_of_sales_per_year: {
            rate: {
              field: 'price',
              unit: 'year',
              mode: 'value_count'
            }
          }
        }
      }
    }
  }
)
puts response
const response = await client.search({
  index: "sales",
  size: 0,
  aggs: {
    by_date: {
      date_histogram: {
        field: "date",
        calendar_interval: "month",
      },
      aggs: {
        avg_number_of_sales_per_year: {
          rate: {
            field: "price",
            unit: "year",
            mode: "value_count",
          },
        },
      },
    },
  },
});
console.log(response);
GET sales/_search
{
  "size": 0,
  "aggs": {
    "by_date": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"  
      },
      "aggs": {
        "avg_number_of_sales_per_year": {
          "rate": {
            "field": "price", 
            "unit": "year",  
            "mode": "value_count" 
          }
        }
      }
    }
  }
}

直方图按月分组。

计算所有销售价格的数量

转换为年度计数

将模式更改为值计数

响应将包含由每个月的销售笔数(price 字段值的数量)折算得到的年速率。

{
  ...
  "aggregations" : {
    "by_date" : {
      "buckets" : [
        {
          "key_as_string" : "2015/01/01 00:00:00",
          "key" : 1420070400000,
          "doc_count" : 3,
          "avg_number_of_sales_per_year" : {
            "value" : 36.0
          }
        },
        {
          "key_as_string" : "2015/02/01 00:00:00",
          "key" : 1422748800000,
          "doc_count" : 2,
          "avg_number_of_sales_per_year" : {
            "value" : 24.0
          }
        },
        {
          "key_as_string" : "2015/03/01 00:00:00",
          "key" : 1425168000000,
          "doc_count" : 2,
          "avg_number_of_sales_per_year" : {
            "value" : 24.0
          }
        }
      ]
    }
  }
}

默认情况下使用 sum 模式。

"mode": "sum"
计算字段所有值的总和
"mode": "value_count"
使用字段中值的数量

桶大小和速率之间的关系

编辑

rate 聚合支持 date_histogram 聚合的 calendar_interval 参数可以使用的所有速率单位。指定的速率应与 date_histogram 聚合间隔兼容,即应可以将桶大小转换为速率。默认情况下,使用 date_histogram 的间隔。

"rate": "second"
与所有间隔兼容
"rate": "minute"
与所有间隔兼容
"rate": "hour"
与所有间隔兼容
"rate": "day"
与所有间隔兼容
"rate": "week"
与所有间隔兼容
"rate": "month"
仅与 month、quarter 和 year 日历间隔兼容
"rate": "quarter"
仅与 month、quarter 和 year 日历间隔兼容
"rate": "year"
仅与 month、quarter 和 year 日历间隔兼容

如果日期直方图不是 rate 聚合的直接父级,则还有其他限制。在这种情况下,速率间隔和直方图间隔都必须在同一组中:[second, minute, hour, day, week] 或 [month, quarter, year]。例如,如果日期直方图基于 month,则仅支持 month、quarter 或 year 的速率间隔。如果日期直方图基于 day,则仅支持 second、minute、hour、day 或 week 的速率间隔。

脚本

编辑

如果需要针对未索引的值运行聚合,请在运行时字段上运行聚合。例如,如果我们需要在计算速率之前调整价格

resp = client.search(
    index="sales",
    size=0,
    runtime_mappings={
        "price.adjusted": {
            "type": "double",
            "script": {
                "source": "emit(doc['price'].value * params.adjustment)",
                "params": {
                    "adjustment": 0.9
                }
            }
        }
    },
    aggs={
        "by_date": {
            "date_histogram": {
                "field": "date",
                "calendar_interval": "month"
            },
            "aggs": {
                "avg_price": {
                    "rate": {
                        "field": "price.adjusted"
                    }
                }
            }
        }
    },
)
print(resp)
response = client.search(
  index: 'sales',
  body: {
    size: 0,
    runtime_mappings: {
      'price.adjusted' => {
        type: 'double',
        script: {
          source: "emit(doc['price'].value * params.adjustment)",
          params: {
            adjustment: 0.9
          }
        }
      }
    },
    aggregations: {
      by_date: {
        date_histogram: {
          field: 'date',
          calendar_interval: 'month'
        },
        aggregations: {
          avg_price: {
            rate: {
              field: 'price.adjusted'
            }
          }
        }
      }
    }
  }
)
puts response
const response = await client.search({
  index: "sales",
  size: 0,
  runtime_mappings: {
    "price.adjusted": {
      type: "double",
      script: {
        source: "emit(doc['price'].value * params.adjustment)",
        params: {
          adjustment: 0.9,
        },
      },
    },
  },
  aggs: {
    by_date: {
      date_histogram: {
        field: "date",
        calendar_interval: "month",
      },
      aggs: {
        avg_price: {
          rate: {
            field: "price.adjusted",
          },
        },
      },
    },
  },
});
console.log(response);
GET sales/_search
{
  "size": 0,
  "runtime_mappings": {
    "price.adjusted": {
      "type": "double",
      "script": {
        "source": "emit(doc['price'].value * params.adjustment)",
        "params": {
          "adjustment": 0.9
        }
      }
    }
  },
  "aggs": {
    "by_date": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "avg_price": {
          "rate": {
            "field": "price.adjusted"
          }
        }
      }
    }
  }
}
{
  ...
  "aggregations" : {
    "by_date" : {
      "buckets" : [
        {
          "key_as_string" : "2015/01/01 00:00:00",
          "key" : 1420070400000,
          "doc_count" : 3,
          "avg_price" : {
            "value" : 495.0
          }
        },
        {
          "key_as_string" : "2015/02/01 00:00:00",
          "key" : 1422748800000,
          "doc_count" : 2,
          "avg_price" : {
            "value" : 54.0
          }
        },
        {
          "key_as_string" : "2015/03/01 00:00:00",
          "key" : 1425168000000,
          "doc_count" : 2,
          "avg_price" : {
            "value" : 337.5
          }
        }
      ]
    }
  }
}