管道聚合
编辑管道聚合
编辑管道聚合处理其他聚合产生的输出,而不是处理文档集,从而向输出树添加信息。管道聚合有很多不同的类型,每种类型都从其他聚合计算不同的信息,但这些类型可以分为两个系列
- 父级
- 一类管道聚合,它接收其父聚合的输出,并能够计算新的桶或新的聚合以添加到现有桶中。
- 同级
- 一类管道聚合,它接收同级聚合的输出,并能够计算一个新的聚合,该聚合将与同级聚合处于同一级别。
管道聚合可以通过使用 buckets_path
参数来指示所需指标的路径,从而引用它们执行计算所需的聚合。定义这些路径的语法可以在下面的 buckets_path
语法 部分中找到。
管道聚合不能有子聚合,但根据类型,它可以在 buckets_path
中引用另一个管道,从而允许管道聚合链接在一起。例如,您可以将两个导数链接在一起以计算二阶导数(即导数的导数)。
由于管道聚合仅添加到输出中,因此在链接管道聚合时,每个管道聚合的输出都将包含在最终输出中。
buckets_path
语法
编辑大多数管道聚合都需要另一个聚合作为输入。输入聚合通过 buckets_path
参数定义,该参数遵循特定的格式
AGG_SEPARATOR = `>` ; METRIC_SEPARATOR = `.` ; AGG_NAME = <the name of the aggregation> ; METRIC = <the name of the metric (in case of multi-value metrics aggregation)> ; MULTIBUCKET_KEY = `[<KEY_NAME>]` PATH = <AGG_NAME><MULTIBUCKET_KEY>? (<AGG_SEPARATOR>, <AGG_NAME> )* ( <METRIC_SEPARATOR>, <METRIC> ) ;
例如,路径 "my_bucket>my_stats.avg"
将指向 "my_bucket"
桶聚合中包含的 "my_stats"
指标中的 avg
值。
以下是一些更多示例
-
multi_bucket["foo"]>single_bucket>multi_metric.avg
将转到"multi_bucket"
多桶聚合的"foo"
桶内的单个桶"single_bucket"
下的"multi_metric"
聚合中的avg
指标。 -
agg1["foo"]._count
将获取多桶聚合"multi_bucket"
中"foo"
桶的_count
指标
路径相对于管道聚合的位置;它们不是绝对路径,并且路径不能在聚合树中“向上”返回。例如,此导数嵌入在 date_histogram 中,并引用一个“同级”指标 "the_sum"
resp = client.search( aggs={ "my_date_histo": { "date_histogram": { "field": "timestamp", "calendar_interval": "day" }, "aggs": { "the_sum": { "sum": { "field": "lemmings" } }, "the_deriv": { "derivative": { "buckets_path": "the_sum" } } } } }, ) print(resp)
response = client.search( body: { aggregations: { my_date_histo: { date_histogram: { field: 'timestamp', calendar_interval: 'day' }, aggregations: { the_sum: { sum: { field: 'lemmings' } }, the_deriv: { derivative: { buckets_path: 'the_sum' } } } } } } ) puts response
const response = await client.search({ aggs: { my_date_histo: { date_histogram: { field: "timestamp", calendar_interval: "day", }, aggs: { the_sum: { sum: { field: "lemmings", }, }, the_deriv: { derivative: { buckets_path: "the_sum", }, }, }, }, }, }); console.log(response);
POST /_search { "aggs": { "my_date_histo": { "date_histogram": { "field": "timestamp", "calendar_interval": "day" }, "aggs": { "the_sum": { "sum": { "field": "lemmings" } }, "the_deriv": { "derivative": { "buckets_path": "the_sum" } } } } } }
buckets_path
也用于同级管道聚合,其中聚合“紧挨”一系列桶,而不是嵌入在它们“内部”。例如,max_bucket
聚合使用 buckets_path
来指定嵌入在同级聚合中的指标
resp = client.search( aggs={ "sales_per_month": { "date_histogram": { "field": "date", "calendar_interval": "month" }, "aggs": { "sales": { "sum": { "field": "price" } } } }, "max_monthly_sales": { "max_bucket": { "buckets_path": "sales_per_month>sales" } } }, ) print(resp)
response = client.search( body: { aggregations: { sales_per_month: { date_histogram: { field: 'date', calendar_interval: 'month' }, aggregations: { sales: { sum: { field: 'price' } } } }, max_monthly_sales: { max_bucket: { buckets_path: 'sales_per_month>sales' } } } } ) puts response
const response = await client.search({ aggs: { sales_per_month: { date_histogram: { field: "date", calendar_interval: "month", }, aggs: { sales: { sum: { field: "price", }, }, }, }, max_monthly_sales: { max_bucket: { buckets_path: "sales_per_month>sales", }, }, }, }); console.log(response);
POST /_search { "aggs": { "sales_per_month": { "date_histogram": { "field": "date", "calendar_interval": "month" }, "aggs": { "sales": { "sum": { "field": "price" } } } }, "max_monthly_sales": { "max_bucket": { "buckets_path": "sales_per_month>sales" } } } }
如果同级管道聚合引用多桶聚合(例如 terms
聚合),它也可以选择多桶中的特定键。例如,bucket_script
可以选择两个特定的桶(通过它们的桶键)来执行计算
resp = client.search( aggs={ "sales_per_month": { "date_histogram": { "field": "date", "calendar_interval": "month" }, "aggs": { "sale_type": { "terms": { "field": "type" }, "aggs": { "sales": { "sum": { "field": "price" } } } }, "hat_vs_bag_ratio": { "bucket_script": { "buckets_path": { "hats": "sale_type['hat']>sales", "bags": "sale_type['bag']>sales" }, "script": "params.hats / params.bags" } } } } }, ) print(resp)
response = client.search( body: { aggregations: { sales_per_month: { date_histogram: { field: 'date', calendar_interval: 'month' }, aggregations: { sale_type: { terms: { field: 'type' }, aggregations: { sales: { sum: { field: 'price' } } } }, hat_vs_bag_ratio: { bucket_script: { buckets_path: { hats: "sale_type['hat']>sales", bags: "sale_type['bag']>sales" }, script: 'params.hats / params.bags' } } } } } } ) puts response
const response = await client.search({ aggs: { sales_per_month: { date_histogram: { field: "date", calendar_interval: "month", }, aggs: { sale_type: { terms: { field: "type", }, aggs: { sales: { sum: { field: "price", }, }, }, }, hat_vs_bag_ratio: { bucket_script: { buckets_path: { hats: "sale_type['hat']>sales", bags: "sale_type['bag']>sales", }, script: "params.hats / params.bags", }, }, }, }, }, }); console.log(response);
POST /_search { "aggs": { "sales_per_month": { "date_histogram": { "field": "date", "calendar_interval": "month" }, "aggs": { "sale_type": { "terms": { "field": "type" }, "aggs": { "sales": { "sum": { "field": "price" } } } }, "hat_vs_bag_ratio": { "bucket_script": { "buckets_path": { "hats": "sale_type['hat']>sales", "bags": "sale_type['bag']>sales" }, "script": "params.hats / params.bags" } } } } } }
特殊路径
编辑除了指向指标外,buckets_path
还可以使用特殊的 "_count"
路径。这指示管道聚合使用文档计数作为其输入。例如,可以计算每个桶的文档计数的导数,而不是特定指标
resp = client.search( aggs={ "my_date_histo": { "date_histogram": { "field": "timestamp", "calendar_interval": "day" }, "aggs": { "the_deriv": { "derivative": { "buckets_path": "_count" } } } } }, ) print(resp)
response = client.search( body: { aggregations: { my_date_histo: { date_histogram: { field: 'timestamp', calendar_interval: 'day' }, aggregations: { the_deriv: { derivative: { buckets_path: '_count' } } } } } } ) puts response
const response = await client.search({ aggs: { my_date_histo: { date_histogram: { field: "timestamp", calendar_interval: "day", }, aggs: { the_deriv: { derivative: { buckets_path: "_count", }, }, }, }, }, }); console.log(response);
POST /_search { "aggs": { "my_date_histo": { "date_histogram": { "field": "timestamp", "calendar_interval": "day" }, "aggs": { "the_deriv": { "derivative": { "buckets_path": "_count" } } } } } }
buckets_path
还可以使用 "_bucket_count"
并指向一个多桶聚合,以在管道聚合中使用该聚合返回的桶数,而不是指标。例如,这里可以使用 bucket_selector
来过滤掉内部术语聚合中不包含桶的桶
resp = client.search( index="sales", size=0, aggs={ "histo": { "date_histogram": { "field": "date", "calendar_interval": "day" }, "aggs": { "categories": { "terms": { "field": "category" } }, "min_bucket_selector": { "bucket_selector": { "buckets_path": { "count": "categories._bucket_count" }, "script": { "source": "params.count != 0" } } } } } }, ) print(resp)
response = client.search( index: 'sales', body: { size: 0, aggregations: { histo: { date_histogram: { field: 'date', calendar_interval: 'day' }, aggregations: { categories: { terms: { field: 'category' } }, min_bucket_selector: { bucket_selector: { buckets_path: { count: 'categories._bucket_count' }, script: { source: 'params.count != 0' } } } } } } } ) puts response
const response = await client.search({ index: "sales", size: 0, aggs: { histo: { date_histogram: { field: "date", calendar_interval: "day", }, aggs: { categories: { terms: { field: "category", }, }, min_bucket_selector: { bucket_selector: { buckets_path: { count: "categories._bucket_count", }, script: { source: "params.count != 0", }, }, }, }, }, }, }); console.log(response);
POST /sales/_search { "size": 0, "aggs": { "histo": { "date_histogram": { "field": "date", "calendar_interval": "day" }, "aggs": { "categories": { "terms": { "field": "category" } }, "min_bucket_selector": { "bucket_selector": { "buckets_path": { "count": "categories._bucket_count" }, "script": { "source": "params.count != 0" } } } } } } }
处理聚合名称中的点
编辑支持另一种语法来处理名称中有点的聚合或指标,例如第 99.9
个 百分位数。此指标可以称为
"buckets_path": "my_percentile[99.9]"
处理数据中的间隙
编辑现实世界中的数据通常是嘈杂的,有时包含 间隙,即数据根本不存在的地方。这可能是由于多种原因造成的,最常见的原因是
- 落入桶的文档不包含必需的字段
- 一个或多个桶没有匹配查询的文档
- 计算的指标无法生成值,很可能是因为另一个依赖桶缺少值。一些管道聚合具有必须满足的特定要求(例如,导数无法计算第一个值的指标,因为它没有先前的值,HoltWinters 移动平均线需要“预热”数据才能开始计算等)
间隙策略是一种机制,用于在遇到“有间隙”或缺少数据时通知管道聚合所需的行为。所有管道聚合都接受 gap_policy
参数。目前有两种间隙策略可供选择
- 跳过
- 此选项将缺少的数据视为该桶不存在。它将跳过该桶,并继续使用下一个可用值进行计算。
- 插入零
- 此选项会将缺失的值替换为零 (
0
),并且管道聚合计算将照常进行。 - 保留值
- 此选项与跳过类似,但如果指标提供非空、非 NaN 值,则使用此值,否则将跳过空桶。