管道聚合

编辑

管道聚合处理其他聚合产生的输出,而不是处理文档集,从而向输出树添加信息。管道聚合有很多不同的类型,每种类型都从其他聚合计算不同的信息,但这些类型可以分为两个系列

父级
一类管道聚合,它接收其父聚合的输出,并能够计算新的桶或新的聚合以添加到现有桶中。
同级
一类管道聚合,它接收同级聚合的输出,并能够计算一个新的聚合,该聚合将与同级聚合处于同一级别。

管道聚合可以通过使用 buckets_path 参数来指示所需指标的路径,从而引用它们执行计算所需的聚合。定义这些路径的语法可以在下面的 buckets_path 语法 部分中找到。

管道聚合不能有子聚合,但根据类型,它可以在 buckets_path 中引用另一个管道,从而允许管道聚合链接在一起。例如,您可以将两个导数链接在一起以计算二阶导数(即导数的导数)。

由于管道聚合仅添加到输出中,因此在链接管道聚合时,每个管道聚合的输出都将包含在最终输出中。

buckets_path 语法

编辑

大多数管道聚合都需要另一个聚合作为输入。输入聚合通过 buckets_path 参数定义,该参数遵循特定的格式

AGG_SEPARATOR       =  `>` ;
METRIC_SEPARATOR    =  `.` ;
AGG_NAME            =  <the name of the aggregation> ;
METRIC              =  <the name of the metric (in case of multi-value metrics aggregation)> ;
MULTIBUCKET_KEY     =  `[<KEY_NAME>]`
PATH                =  <AGG_NAME><MULTIBUCKET_KEY>? (<AGG_SEPARATOR>, <AGG_NAME> )* ( <METRIC_SEPARATOR>, <METRIC> ) ;

例如,路径 "my_bucket>my_stats.avg" 将指向 "my_bucket" 桶聚合中包含的 "my_stats" 指标中的 avg 值。

以下是一些更多示例

  • multi_bucket["foo"]>single_bucket>multi_metric.avg 将转到 "multi_bucket" 多桶聚合的 "foo" 桶内的单个桶 "single_bucket" 下的 "multi_metric" 聚合中的 avg 指标。
  • agg1["foo"]._count 将获取多桶聚合 "multi_bucket""foo" 桶的 _count 指标

路径相对于管道聚合的位置;它们不是绝对路径,并且路径不能在聚合树中“向上”返回。例如,此导数嵌入在 date_histogram 中,并引用一个“同级”指标 "the_sum"

resp = client.search(
    aggs={
        "my_date_histo": {
            "date_histogram": {
                "field": "timestamp",
                "calendar_interval": "day"
            },
            "aggs": {
                "the_sum": {
                    "sum": {
                        "field": "lemmings"
                    }
                },
                "the_deriv": {
                    "derivative": {
                        "buckets_path": "the_sum"
                    }
                }
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    aggregations: {
      my_date_histo: {
        date_histogram: {
          field: 'timestamp',
          calendar_interval: 'day'
        },
        aggregations: {
          the_sum: {
            sum: {
              field: 'lemmings'
            }
          },
          the_deriv: {
            derivative: {
              buckets_path: 'the_sum'
            }
          }
        }
      }
    }
  }
)
puts response
const response = await client.search({
  aggs: {
    my_date_histo: {
      date_histogram: {
        field: "timestamp",
        calendar_interval: "day",
      },
      aggs: {
        the_sum: {
          sum: {
            field: "lemmings",
          },
        },
        the_deriv: {
          derivative: {
            buckets_path: "the_sum",
          },
        },
      },
    },
  },
});
console.log(response);
POST /_search
{
  "aggs": {
    "my_date_histo": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "day"
      },
      "aggs": {
        "the_sum": {
          "sum": { "field": "lemmings" }              
        },
        "the_deriv": {
          "derivative": { "buckets_path": "the_sum" } 
        }
      }
    }
  }
}

该指标称为 "the_sum"

buckets_path 通过相对路径 "the_sum" 引用指标

buckets_path 也用于同级管道聚合,其中聚合“紧挨”一系列桶,而不是嵌入在它们“内部”。例如,max_bucket 聚合使用 buckets_path 来指定嵌入在同级聚合中的指标

resp = client.search(
    aggs={
        "sales_per_month": {
            "date_histogram": {
                "field": "date",
                "calendar_interval": "month"
            },
            "aggs": {
                "sales": {
                    "sum": {
                        "field": "price"
                    }
                }
            }
        },
        "max_monthly_sales": {
            "max_bucket": {
                "buckets_path": "sales_per_month>sales"
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    aggregations: {
      sales_per_month: {
        date_histogram: {
          field: 'date',
          calendar_interval: 'month'
        },
        aggregations: {
          sales: {
            sum: {
              field: 'price'
            }
          }
        }
      },
      max_monthly_sales: {
        max_bucket: {
          buckets_path: 'sales_per_month>sales'
        }
      }
    }
  }
)
puts response
const response = await client.search({
  aggs: {
    sales_per_month: {
      date_histogram: {
        field: "date",
        calendar_interval: "month",
      },
      aggs: {
        sales: {
          sum: {
            field: "price",
          },
        },
      },
    },
    max_monthly_sales: {
      max_bucket: {
        buckets_path: "sales_per_month>sales",
      },
    },
  },
});
console.log(response);
POST /_search
{
  "aggs": {
    "sales_per_month": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "sales": {
          "sum": {
            "field": "price"
          }
        }
      }
    },
    "max_monthly_sales": {
      "max_bucket": {
        "buckets_path": "sales_per_month>sales" 
      }
    }
  }
}

buckets_path 指示此 max_bucket 聚合,我们想要 sales_per_month 日期直方图中 sales 聚合的最大值。

如果同级管道聚合引用多桶聚合(例如 terms 聚合),它也可以选择多桶中的特定键。例如,bucket_script 可以选择两个特定的桶(通过它们的桶键)来执行计算

resp = client.search(
    aggs={
        "sales_per_month": {
            "date_histogram": {
                "field": "date",
                "calendar_interval": "month"
            },
            "aggs": {
                "sale_type": {
                    "terms": {
                        "field": "type"
                    },
                    "aggs": {
                        "sales": {
                            "sum": {
                                "field": "price"
                            }
                        }
                    }
                },
                "hat_vs_bag_ratio": {
                    "bucket_script": {
                        "buckets_path": {
                            "hats": "sale_type['hat']>sales",
                            "bags": "sale_type['bag']>sales"
                        },
                        "script": "params.hats / params.bags"
                    }
                }
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    aggregations: {
      sales_per_month: {
        date_histogram: {
          field: 'date',
          calendar_interval: 'month'
        },
        aggregations: {
          sale_type: {
            terms: {
              field: 'type'
            },
            aggregations: {
              sales: {
                sum: {
                  field: 'price'
                }
              }
            }
          },
          hat_vs_bag_ratio: {
            bucket_script: {
              buckets_path: {
                hats: "sale_type['hat']>sales",
                bags: "sale_type['bag']>sales"
              },
              script: 'params.hats / params.bags'
            }
          }
        }
      }
    }
  }
)
puts response
const response = await client.search({
  aggs: {
    sales_per_month: {
      date_histogram: {
        field: "date",
        calendar_interval: "month",
      },
      aggs: {
        sale_type: {
          terms: {
            field: "type",
          },
          aggs: {
            sales: {
              sum: {
                field: "price",
              },
            },
          },
        },
        hat_vs_bag_ratio: {
          bucket_script: {
            buckets_path: {
              hats: "sale_type['hat']>sales",
              bags: "sale_type['bag']>sales",
            },
            script: "params.hats / params.bags",
          },
        },
      },
    },
  },
});
console.log(response);
POST /_search
{
  "aggs": {
    "sales_per_month": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "sale_type": {
          "terms": {
            "field": "type"
          },
          "aggs": {
            "sales": {
              "sum": {
                "field": "price"
              }
            }
          }
        },
        "hat_vs_bag_ratio": {
          "bucket_script": {
            "buckets_path": {
              "hats": "sale_type['hat']>sales",   
              "bags": "sale_type['bag']>sales"    
            },
            "script": "params.hats / params.bags"
          }
        }
      }
    }
  }
}

buckets_path 选择帽子和包桶(通过 ['hat']/['bag']`)来专门在脚本中使用,而不是从 sale_type 聚合中获取所有桶

特殊路径

编辑

除了指向指标外,buckets_path 还可以使用特殊的 "_count" 路径。这指示管道聚合使用文档计数作为其输入。例如,可以计算每个桶的文档计数的导数,而不是特定指标

resp = client.search(
    aggs={
        "my_date_histo": {
            "date_histogram": {
                "field": "timestamp",
                "calendar_interval": "day"
            },
            "aggs": {
                "the_deriv": {
                    "derivative": {
                        "buckets_path": "_count"
                    }
                }
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    aggregations: {
      my_date_histo: {
        date_histogram: {
          field: 'timestamp',
          calendar_interval: 'day'
        },
        aggregations: {
          the_deriv: {
            derivative: {
              buckets_path: '_count'
            }
          }
        }
      }
    }
  }
)
puts response
const response = await client.search({
  aggs: {
    my_date_histo: {
      date_histogram: {
        field: "timestamp",
        calendar_interval: "day",
      },
      aggs: {
        the_deriv: {
          derivative: {
            buckets_path: "_count",
          },
        },
      },
    },
  },
});
console.log(response);
POST /_search
{
  "aggs": {
    "my_date_histo": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "day"
      },
      "aggs": {
        "the_deriv": {
          "derivative": { "buckets_path": "_count" } 
        }
      }
    }
  }
}

通过使用 _count 而不是指标名称,我们可以计算直方图中文档计数的导数

buckets_path 还可以使用 "_bucket_count" 并指向一个多桶聚合,以在管道聚合中使用该聚合返回的桶数,而不是指标。例如,这里可以使用 bucket_selector 来过滤掉内部术语聚合中不包含桶的桶

resp = client.search(
    index="sales",
    size=0,
    aggs={
        "histo": {
            "date_histogram": {
                "field": "date",
                "calendar_interval": "day"
            },
            "aggs": {
                "categories": {
                    "terms": {
                        "field": "category"
                    }
                },
                "min_bucket_selector": {
                    "bucket_selector": {
                        "buckets_path": {
                            "count": "categories._bucket_count"
                        },
                        "script": {
                            "source": "params.count != 0"
                        }
                    }
                }
            }
        }
    },
)
print(resp)
response = client.search(
  index: 'sales',
  body: {
    size: 0,
    aggregations: {
      histo: {
        date_histogram: {
          field: 'date',
          calendar_interval: 'day'
        },
        aggregations: {
          categories: {
            terms: {
              field: 'category'
            }
          },
          min_bucket_selector: {
            bucket_selector: {
              buckets_path: {
                count: 'categories._bucket_count'
              },
              script: {
                source: 'params.count != 0'
              }
            }
          }
        }
      }
    }
  }
)
puts response
const response = await client.search({
  index: "sales",
  size: 0,
  aggs: {
    histo: {
      date_histogram: {
        field: "date",
        calendar_interval: "day",
      },
      aggs: {
        categories: {
          terms: {
            field: "category",
          },
        },
        min_bucket_selector: {
          bucket_selector: {
            buckets_path: {
              count: "categories._bucket_count",
            },
            script: {
              source: "params.count != 0",
            },
          },
        },
      },
    },
  },
});
console.log(response);
POST /sales/_search
{
  "size": 0,
  "aggs": {
    "histo": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "day"
      },
      "aggs": {
        "categories": {
          "terms": {
            "field": "category"
          }
        },
        "min_bucket_selector": {
          "bucket_selector": {
            "buckets_path": {
              "count": "categories._bucket_count" 
            },
            "script": {
              "source": "params.count != 0"
            }
          }
        }
      }
    }
  }
}

通过使用 _bucket_count 而不是指标名称,我们可以过滤掉 histo 桶,其中它们不包含 categories 聚合的桶

处理聚合名称中的点

编辑

支持另一种语法来处理名称中有点的聚合或指标,例如第 99.9百分位数。此指标可以称为

"buckets_path": "my_percentile[99.9]"

处理数据中的间隙

编辑

现实世界中的数据通常是嘈杂的,有时包含 间隙,即数据根本不存在的地方。这可能是由于多种原因造成的,最常见的原因是

  • 落入桶的文档不包含必需的字段
  • 一个或多个桶没有匹配查询的文档
  • 计算的指标无法生成值,很可能是因为另一个依赖桶缺少值。一些管道聚合具有必须满足的特定要求(例如,导数无法计算第一个值的指标,因为它没有先前的值,HoltWinters 移动平均线需要“预热”数据才能开始计算等)

间隙策略是一种机制,用于在遇到“有间隙”或缺少数据时通知管道聚合所需的行为。所有管道聚合都接受 gap_policy 参数。目前有两种间隙策略可供选择

跳过
此选项将缺少的数据视为该桶不存在。它将跳过该桶,并继续使用下一个可用值进行计算。
插入零
此选项会将缺失的值替换为零 (0),并且管道聚合计算将照常进行。
保留值
此选项与跳过类似,但如果指标提供非空、非 NaN 值,则使用此值,否则将跳过空桶。