Composite aggregation

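The composite aggregation is expensive. Load test your application before deploying a composite aggregation in production.

A multi-bucket aggregation that creates composite buckets from different sources.

Unlike the other multi-bucket aggregations, you can use the composite aggregation to paginate efficiently through all buckets of a multi-level aggregation. This aggregation provides a way to stream all buckets of a specific aggregation, similar to what scroll does for documents.

The composite buckets are built from the combinations of the values extracted or created for each document, and each combination is treated as one composite bucket.

For example, consider the following document: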

{
  "keyword": ["foo", "bar"],
  "number": [23, 65, 76]
}

Using keyword and number as source fields for the aggregation results in the following composite buckets:

{ "keyword": "foo", "number": 23 }
{ "keyword": "foo", "number": 65 }
{ "keyword": "foo", "number": 76 }
{ "keyword": "bar", "number": 23 }
{ "keyword": "bar", "number": 65 }
{ "keyword": "bar", "number": 76 }
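
The way these six buckets arise can be sketched in plain Python as a Cartesian product of the document's values. This is only an illustration of the combination rule, not how Elasticsearch implements it; the helper name composite_keys is hypothetical.

```python
from itertools import product

def composite_keys(doc, source_fields):
    """Return every combination of the document's values for the given fields."""
    # Wrap scalars in a list so single-valued fields behave like one-element arrays.
    values = [doc[f] if isinstance(doc[f], list) else [doc[f]] for f in source_fields]
    return [dict(zip(source_fields, combo)) for combo in product(*values)]

doc = {"keyword": ["foo", "bar"], "number": [23, 65, 76]}
buckets = composite_keys(doc, ["keyword", "number"])
for bucket in buckets:
    print(bucket)
```

Two keyword values combined with three number values give 2 x 3 = 6 composite buckets, in the same order as the list above.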

Value sources

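The sources parameter defines the source fields to use when building composite buckets. The order in which the sources are defined controls the order in which the keys are returned.

You must use a unique name when defining sources.

The sources parameter can be any of the following types:

Terms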

The terms value source is similar to a simple terms aggregation. The values are extracted from a field exactly as in the terms aggregation.

Example:

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "product": {
                            "terms": {
                                "field": "product"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              product: {
                terms: {
                  field: 'product'
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            product: {
              terms: {
                field: "product",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          { "product": { "terms": { "field": "product" } } }
        ]
      }
    }
  }
}

Like the terms aggregation, it is possible to use a runtime field to create values for the composite buckets:

resp = client.search(
    runtime_mappings={
        "day_of_week": {
            "type": "keyword",
            "script": "\n        emit(doc['timestamp'].value.dayOfWeekEnum\n          .getDisplayName(TextStyle.FULL, Locale.ENGLISH))\n      "
        }
    },
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "dow": {
                            "terms": {
                                "field": "day_of_week"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
const response = await client.search({
  runtime_mappings: {
    day_of_week: {
      type: "keyword",
      script:
        "\n        emit(doc['timestamp'].value.dayOfWeekEnum\n          .getDisplayName(TextStyle.FULL, Locale.ENGLISH))\n      ",
    },
  },
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            dow: {
              terms: {
                field: "day_of_week",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "runtime_mappings": {
    "day_of_week": {
      "type": "keyword",
      "script": """
        emit(doc['timestamp'].value.dayOfWeekEnum
          .getDisplayName(TextStyle.FULL, Locale.ENGLISH))
      """
    }
  },
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          {
            "dow": {
              "terms": { "field": "day_of_week" }
            }
          }
        ]
      }
    }
  }
}

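Although similar, the terms value source does not support the same set of parameters as the terms aggregation. For other supported value source parameters, see the Order and Missing bucket sections.

Histogram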

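The histogram value source can be applied to numeric values to build fixed-size intervals over the values. The interval parameter defines how the numeric values are transformed. For instance, an interval of 5 maps each numeric value to the lower bound of the interval that contains it: a value of 101 becomes 100, which is the key for the interval between 100 and 105.

Example: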

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "histo": {
                            "histogram": {
                                "field": "price",
                                "interval": 5
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              histo: {
                histogram: {
                  field: 'price',
                  interval: 5
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            histo: {
              histogram: {
                field: "price",
                interval: 5,
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          { "histo": { "histogram": { "field": "price", "interval": 5 } } }
        ]
      }
    }
  }
}
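
The interval rounding described for the histogram value source can be sketched as a floor operation. This is a minimal illustration, assuming keys are the lower bound of each interval as in the 101 to 100 example; histogram_key is a hypothetical helper, not part of any client library.

```python
import math

def histogram_key(value, interval):
    """Round a numeric value down to the start of its fixed-size interval."""
    return math.floor(value / interval) * interval

print(histogram_key(101, 5))    # 100
print(histogram_key(104.9, 5))  # 100
print(histogram_key(105, 5))    # 105
```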

Like the histogram aggregation, it is possible to use a runtime field to create values for the composite buckets:

resp = client.search(
    runtime_mappings={
        "price.discounted": {
            "type": "double",
            "script": "\n        double price = doc['price'].value;\n        if (doc['product'].value == 'mad max') {\n          price *= 0.8;\n        }\n        emit(price);\n      "
        }
    },
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "price": {
                            "histogram": {
                                "interval": 5,
                                "field": "price.discounted"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    runtime_mappings: {
      'price.discounted' => {
        type: 'double',
        script: "\n        double price = doc['price'].value;\n        if (doc['product'].value == 'mad max') {\n          price *= 0.8;\n        }\n        emit(price);\n      "
      }
    },
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              price: {
                histogram: {
                  interval: 5,
                  field: 'price.discounted'
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  runtime_mappings: {
    "price.discounted": {
      type: "double",
      script:
        "\n        double price = doc['price'].value;\n        if (doc['product'].value == 'mad max') {\n          price *= 0.8;\n        }\n        emit(price);\n      ",
    },
  },
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            price: {
              histogram: {
                interval: 5,
                field: "price.discounted",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "runtime_mappings": {
    "price.discounted": {
      "type": "double",
      "script": """
        double price = doc['price'].value;
        if (doc['product'].value == 'mad max') {
          price *= 0.8;
        }
        emit(price);
      """
    }
  },
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          {
            "price": {
              "histogram": {
                "interval": 5,
                "field": "price.discounted"
              }
            }
          }
        ]
      }
    }
  }
}

Date histogram

The date_histogram is similar to the histogram value source except that the interval is specified by a date/time expression:

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "date": {
                            "date_histogram": {
                                "field": "timestamp",
                                "calendar_interval": "1d"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              date: {
                date_histogram: {
                  field: 'timestamp',
                  calendar_interval: '1d'
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            date: {
              date_histogram: {
                field: "timestamp",
                calendar_interval: "1d",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } }
        ]
      }
    }
  }
}

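The example above creates an interval per day and translates all timestamp values to the start of the interval that contains them. Available expressions for the interval: year, quarter, month, week, day, hour, minute, second.

Time values can also be specified via the abbreviations supported by time units parsing. Note that fractional time values are not supported, but you can work around this by switching to another time unit (for example, 1.5h can instead be specified as 90m).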
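
The unit switch mentioned above (1.5h written as 90m) is simple arithmetic, and a small helper can do it mechanically. This is a sketch only: to_whole_units and the UNIT_SECONDS table are hypothetical names, and the table covers just a few fixed-length units chosen for illustration.

```python
from fractions import Fraction

# Seconds per unit for a few fixed-length units (assumed subset, for illustration).
UNIT_SECONDS = {"d": 86400, "h": 3600, "m": 60, "s": 1}

def to_whole_units(amount, unit):
    """Re-express a fractional duration such as 1.5h in a smaller unit, e.g. 90m."""
    total_seconds = Fraction(str(amount)) * UNIT_SECONDS[unit]
    for name, seconds in UNIT_SECONDS.items():  # largest unit first
        if total_seconds % seconds == 0:
            return f"{total_seconds // seconds}{name}"
    raise ValueError("duration is not a whole number of seconds")

print(to_whole_units(1.5, "h"))  # 90m
```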

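Format

Internally, a date is represented as a 64-bit number holding a timestamp in milliseconds since the epoch. These timestamps are returned as the bucket keys. It is possible to return a formatted date string instead by specifying a pattern with the format parameter: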

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "date": {
                            "date_histogram": {
                                "field": "timestamp",
                                "calendar_interval": "1d",
                                "format": "yyyy-MM-dd"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              date: {
                date_histogram: {
                  field: 'timestamp',
                  calendar_interval: '1d',
                  format: 'yyyy-MM-dd'
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            date: {
              date_histogram: {
                field: "timestamp",
                calendar_interval: "1d",
                format: "yyyy-MM-dd",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          {
            "date": {
              "date_histogram": {
                "field": "timestamp",
                "calendar_interval": "1d",
                "format": "yyyy-MM-dd"         
              }
            }
          }
        ]
      }
    }
  }
}

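The format parameter supports expressive date format patterns.

Time zone

Date-times are stored in Elasticsearch in UTC. By default, all bucketing and rounding is also done in UTC. Use the time_zone parameter to indicate that bucketing should use a different time zone.

Time zones may be specified either as an ISO 8601 UTC offset (for example +01:00 or -08:00) or as a time zone ID, an identifier used in the TZ database such as America/Los_Angeles.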
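
To see why the time zone matters for day buckets, the sketch below computes the start of the calendar day containing a timestamp, first in UTC and then with a fixed -08:00 offset. It uses only the Python standard library and a fixed offset, so it ignores daylight saving time; a time zone ID such as America/Los_Angeles would need TZ database rules. The helper day_bucket_start is hypothetical.

```python
from datetime import datetime, timedelta, timezone

def day_bucket_start(ts_utc, utc_offset_hours=0):
    """Return the start of the local calendar day containing ts_utc, as a UTC datetime."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    local = ts_utc.astimezone(tz)
    local_midnight = local.replace(hour=0, minute=0, second=0, microsecond=0)
    return local_midnight.astimezone(timezone.utc)

ts = datetime(2015, 10, 1, 1, 0, tzinfo=timezone.utc)
print(day_bucket_start(ts).isoformat())      # bucketed in UTC
print(day_bucket_start(ts, -8).isoformat())  # bucketed with a -08:00 offset
```

With the -08:00 offset, 01:00 UTC on October 1 is still the evening of September 30 locally, so the document moves to the previous day's bucket.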

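Offset

Use the offset parameter to change the start value of each bucket by the specified positive (+) or negative (-) duration, such as 1h for an hour or 1d for a day. See Time units for more possible time duration options.

For example, when using an interval of day, each bucket runs from midnight to midnight. Setting the offset parameter to +6h changes each bucket to run from 6am to 6am: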

resp = client.index(
    index="my-index-000001",
    id="1",
    refresh=True,
    document={
        "date": "2015-10-01T05:30:00Z"
    },
)
print(resp)

resp1 = client.index(
    index="my-index-000001",
    id="2",
    refresh=True,
    document={
        "date": "2015-10-01T06:30:00Z"
    },
)
print(resp1)

resp2 = client.search(
    index="my-index-000001",
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "date": {
                            "date_histogram": {
                                "field": "date",
                                "calendar_interval": "day",
                                "offset": "+6h",
                                "format": "iso8601"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp2)
response = client.index(
  index: 'my-index-000001',
  id: 1,
  refresh: true,
  body: {
    date: '2015-10-01T05:30:00Z'
  }
)
puts response

response = client.index(
  index: 'my-index-000001',
  id: 2,
  refresh: true,
  body: {
    date: '2015-10-01T06:30:00Z'
  }
)
puts response

response = client.search(
  index: 'my-index-000001',
  size: 0,
  body: {
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              date: {
                date_histogram: {
                  field: 'date',
                  calendar_interval: 'day',
                  offset: '+6h',
                  format: 'iso8601'
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.index({
  index: "my-index-000001",
  id: 1,
  refresh: "true",
  document: {
    date: "2015-10-01T05:30:00Z",
  },
});
console.log(response);

const response1 = await client.index({
  index: "my-index-000001",
  id: 2,
  refresh: "true",
  document: {
    date: "2015-10-01T06:30:00Z",
  },
});
console.log(response1);

const response2 = await client.search({
  index: "my-index-000001",
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            date: {
              date_histogram: {
                field: "date",
                calendar_interval: "day",
                offset: "+6h",
                format: "iso8601",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response2);
PUT my-index-000001/_doc/1?refresh
{
  "date": "2015-10-01T05:30:00Z"
}

PUT my-index-000001/_doc/2?refresh
{
  "date": "2015-10-01T06:30:00Z"
}

GET my-index-000001/_search?size=0
{
  "aggs": {
    "my_buckets": {
      "composite" : {
        "sources" : [
          {
            "date": {
              "date_histogram" : {
                "field": "date",
                "calendar_interval": "day",
                "offset": "+6h",
                "format": "iso8601"
              }
            }
          }
        ]
      }
    }
  }
}

Instead of a single bucket starting at midnight, the above request groups the documents into buckets starting at 6am:

{
  ...
  "aggregations": {
    "my_buckets": {
      "after_key": { "date": "2015-10-01T06:00:00.000Z" },
      "buckets": [
        {
          "key": { "date": "2015-09-30T06:00:00.000Z" },
          "doc_count": 1
        },
        {
          "key": { "date": "2015-10-01T06:00:00.000Z" },
          "doc_count": 1
        }
      ]
    }
  }
}

The start offset of each bucket is calculated after time_zone adjustments have been made.
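
The two bucket keys in the response above can be reproduced with a short calculation: shift the timestamp back by the offset, round down to midnight, then shift forward again. This is a sketch of the arithmetic for day-long buckets in UTC only; day_bucket_with_offset is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

def day_bucket_with_offset(ts_utc, offset):
    """Start of the day-long bucket containing ts_utc when buckets are shifted by offset."""
    shifted = ts_utc - offset
    midnight = shifted.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + offset

offset = timedelta(hours=6)
for text in ("2015-10-01T05:30:00", "2015-10-01T06:30:00"):
    ts = datetime.fromisoformat(text).replace(tzinfo=timezone.utc)
    print(text, "->", day_bucket_with_offset(ts, offset).isoformat())
```

The 05:30 document lands in the bucket that starts at 2015-09-30T06:00, and the 06:30 document in the bucket that starts at 2015-10-01T06:00.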

GeoTile grid

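The geotile_grid value source works on geo_point fields and groups points into buckets that represent cells in a grid. The resulting grid can be sparse and contains only cells that have matching data. Each cell corresponds to a map tile as used by many online map sites. Each cell is labeled using a "{zoom}/{x}/{y}" format, where zoom is equal to the user-specified precision.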

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "tile": {
                            "geotile_grid": {
                                "field": "location",
                                "precision": 8
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              tile: {
                geotile_grid: {
                  field: 'location',
                  precision: 8
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            tile: {
              geotile_grid: {
                field: "location",
                precision: 8,
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          { "tile": { "geotile_grid": { "field": "location", "precision": 8 } } }
        ]
      }
    }
  }
}

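Precision

The highest-precision geotile, of length 29, produces cells that cover less than 10cm by 10cm of land. This precision is well suited to composite aggregations because the tiles do not all have to be generated and loaded into memory.

See the zoom level documentation for how precision (zoom) correlates to size on the ground. Precision for this aggregation can be between 0 and 29, inclusive.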
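
As a rough illustration of the "{zoom}/{x}/{y}" labels, the sketch below applies the usual Web Mercator tile formula used by online map sites. It assumes that this standard scheme is the one meant here; geotile_key is a hypothetical helper, and results exactly on tile borders may differ from what Elasticsearch returns.

```python
import math

def geotile_key(lat, lon, precision):
    """Map a point to a "{zoom}/{x}/{y}" tile label using Web Mercator tiling."""
    tiles = 1 << precision  # number of tiles along each axis at this zoom
    x = math.floor((lon + 180.0) / 360.0 * tiles)
    lat_rad = math.radians(lat)
    y = math.floor((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * tiles)
    # Clamp so that points on the far edges still land in a valid tile.
    x = min(max(x, 0), tiles - 1)
    y = min(max(y, 0), tiles - 1)
    return f"{precision}/{x}/{y}"

print(geotile_key(52.37, 4.9, 8))
```

Each extra level of precision doubles the number of tiles along both axes, which is why high precisions produce very small cells.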

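Bounding box filtering

The geotile source can optionally be constrained to a specific geo bounding box, which reduces the range of tiles used. These bounds are useful when only a specific part of a geographical area needs high-precision tiling.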

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "tile": {
                            "geotile_grid": {
                                "field": "location",
                                "precision": 22,
                                "bounds": {
                                    "top_left": "POINT (4.9 52.4)",
                                    "bottom_right": "POINT (5.0 52.3)"
                                }
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              tile: {
                geotile_grid: {
                  field: 'location',
                  precision: 22,
                  bounds: {
                    top_left: 'POINT (4.9 52.4)',
                    bottom_right: 'POINT (5.0 52.3)'
                  }
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            tile: {
              geotile_grid: {
                field: "location",
                precision: 22,
                bounds: {
                  top_left: "POINT (4.9 52.4)",
                  bottom_right: "POINT (5.0 52.3)",
                },
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          {
            "tile": {
              "geotile_grid": {
                "field": "location",
                "precision": 22,
                "bounds": {
                  "top_left": "POINT (4.9 52.4)",
                  "bottom_right": "POINT (5.0 52.3)"
                }
              }
            }
          }
        ]
      }
    }
  }
}

Mixing different value sources

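The sources parameter accepts an array of value sources. It is possible to mix different value sources to create composite buckets. For example: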

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "date": {
                            "date_histogram": {
                                "field": "timestamp",
                                "calendar_interval": "1d"
                            }
                        }
                    },
                    {
                        "product": {
                            "terms": {
                                "field": "product"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              date: {
                date_histogram: {
                  field: 'timestamp',
                  calendar_interval: '1d'
                }
              }
            },
            {
              product: {
                terms: {
                  field: 'product'
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            date: {
              date_histogram: {
                field: "timestamp",
                calendar_interval: "1d",
              },
            },
          },
          {
            product: {
              terms: {
                field: "product",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } },
          { "product": { "terms": { "field": "product" } } }
        ]
      }
    }
  }
}

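This creates composite buckets from the values produced by two value sources, a date_histogram and a terms. Each bucket is composed of two values, one for each value source defined in the aggregation. Any combination of types is allowed, and the order in the array is preserved in the composite buckets. For example, the following request combines two terms sources and a date_histogram: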

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "shop": {
                            "terms": {
                                "field": "shop"
                            }
                        }
                    },
                    {
                        "product": {
                            "terms": {
                                "field": "product"
                            }
                        }
                    },
                    {
                        "date": {
                            "date_histogram": {
                                "field": "timestamp",
                                "calendar_interval": "1d"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              shop: {
                terms: {
                  field: 'shop'
                }
              }
            },
            {
              product: {
                terms: {
                  field: 'product'
                }
              }
            },
            {
              date: {
                date_histogram: {
                  field: 'timestamp',
                  calendar_interval: '1d'
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            shop: {
              terms: {
                field: "shop",
              },
            },
          },
          {
            product: {
              terms: {
                field: "product",
              },
            },
          },
          {
            date: {
              date_histogram: {
                field: "timestamp",
                calendar_interval: "1d",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          { "shop": { "terms": { "field": "shop" } } },
          { "product": { "terms": { "field": "product" } } },
          { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } }
        ]
      }
    }
  }
}
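
What a mixed date_histogram and terms key looks like can be sketched in memory by grouping documents on a (day, product) pair and counting them. The three documents below are invented for illustration, and day_key_millis is a hypothetical helper that mimics a one-day interval in UTC.

```python
from collections import Counter
from datetime import datetime, timezone

# Hypothetical documents, invented for illustration only.
docs = [
    {"timestamp": "2017-05-08T10:00:00", "product": "rocky"},
    {"timestamp": "2017-05-09T09:00:00", "product": "mad max"},
    {"timestamp": "2017-05-09T18:30:00", "product": "mad max"},
]

def day_key_millis(iso_timestamp):
    """Epoch milliseconds of the UTC midnight that starts the timestamp's day."""
    ts = datetime.fromisoformat(iso_timestamp).replace(tzinfo=timezone.utc)
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return int(midnight.timestamp() * 1000)

counts = Counter((day_key_millis(d["timestamp"]), d["product"]) for d in docs)
for (date, product), doc_count in sorted(counts.items()):
    print({"key": {"date": date, "product": product}, "doc_count": doc_count})
```

Each distinct (date, product) pair becomes one composite bucket, and the position of each value in the key follows the order of the sources.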

Order

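By default, the composite buckets are sorted by their natural ordering, with values in ascending order. When multiple value sources are requested, the ordering is done per value source: the first value of a composite bucket is compared to the first value of the other composite bucket, and if they are equal, the next values in the composite bucket are used as tie-breakers. This means that the composite bucket [foo, 100] is considered smaller than [foobar, 0] because foo is considered smaller than foobar. You can define the sort direction for each value source by setting order to asc (the default) or desc (descending) directly in the value source definition. For example: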

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "date": {
                            "date_histogram": {
                                "field": "timestamp",
                                "calendar_interval": "1d",
                                "order": "desc"
                            }
                        }
                    },
                    {
                        "product": {
                            "terms": {
                                "field": "product",
                                "order": "asc"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              date: {
                date_histogram: {
                  field: 'timestamp',
                  calendar_interval: '1d',
                  order: 'desc'
                }
              }
            },
            {
              product: {
                terms: {
                  field: 'product',
                  order: 'asc'
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            date: {
              date_histogram: {
                field: "timestamp",
                calendar_interval: "1d",
                order: "desc",
              },
            },
          },
          {
            product: {
              terms: {
                field: "product",
                order: "asc",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } },
          { "product": { "terms": { "field": "product", "order": "asc" } } }
        ]
      }
    }
  }
}

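... sorts the composite buckets in descending order when comparing values from the date_histogram source and in ascending order when comparing values from the terms source.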
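
The per-source comparison with tie-breaking behaves like a lexicographic sort in which each position has its own direction. The sketch below shows one way to express that with stable sorts; sort_composite is a hypothetical helper and the sample keys are made up.

```python
def sort_composite(keys, orders):
    """Sort composite keys source by source; orders holds "asc" or "desc" per source."""
    result = list(keys)
    # Stable sorts applied from the last source to the first give a lexicographic order.
    for index in reversed(range(len(orders))):
        result.sort(key=lambda key: key[index], reverse=(orders[index] == "desc"))
    return result

# [foo, 100] sorts before [foobar, 0] because the first values already differ.
print(sort_composite([("foobar", 0), ("foo", 100)], ["asc", "asc"]))

# First source descending, second source ascending.
keys = [(2, "b"), (1, "a"), (2, "a"), (1, "b")]
print(sort_composite(keys, ["desc", "asc"]))
```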

Missing bucket

By default, documents without a value for a given source are ignored. You can include them in the response by setting missing_bucket to true (defaults to false):

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "product_name": {
                            "terms": {
                                "field": "product",
                                "missing_bucket": True,
                                "missing_order": "last"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              product_name: {
                terms: {
                  field: 'product',
                  missing_bucket: true,
                  missing_order: 'last'
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            product_name: {
              terms: {
                field: "product",
                missing_bucket: true,
                missing_order: "last",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [{
          "product_name": {
            "terms": {
              "field": "product",
              "missing_bucket": true,
              "missing_order": "last"
            }
          }
        }]
      }
    }
  }
}

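In the above example, the product_name source emits an explicit null bucket for documents without a product value. This bucket is placed last.

You can control the position of the null bucket with the optional missing_order parameter. If missing_order is first or last, the null bucket is placed in the first or last position, respectively. If missing_order is omitted or set to default, the source's order determines the bucket's position: if order is asc (ascending), the bucket is in the first position; if order is desc (descending), the bucket is in the last position.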
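
The placement rules for the null bucket can be summarized in a few lines. This sketch models a single source, with None standing in for the null bucket; order_with_missing is a hypothetical helper that follows the rules as stated above.

```python
def order_with_missing(values, order="asc", missing_order="default"):
    """Order source values, placing None (the null bucket) per missing_order."""
    present = sorted((v for v in values if v is not None), reverse=(order == "desc"))
    if None not in values:
        return present
    if missing_order == "default":
        # Per the rule above: first for asc, last for desc.
        missing_order = "first" if order == "asc" else "last"
    return [None] + present if missing_order == "first" else present + [None]

values = ["rocky", None, "mad max"]
print(order_with_missing(values))                        # default with asc
print(order_with_missing(values, order="desc"))          # default with desc
print(order_with_missing(values, missing_order="last"))  # explicit last
```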

Size

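The size parameter can be set to define how many composite buckets should be returned. Each composite bucket is considered a single bucket, so setting a size of 10 returns the first 10 composite buckets created from the value sources. The response contains the values for each composite bucket in an array holding the values extracted from each value source. Defaults to 10.

Pagination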

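If the number of composite buckets is too high (or unknown) to be returned in a single response, it is possible to split the retrieval into multiple requests. Since the composite buckets are flat by nature, the requested size is exactly the number of composite buckets that will be returned in the response (assuming that there are at least size composite buckets to return). If all composite buckets should be retrieved, it is preferable to use a small size (100 or 1000, for example) and then use the after parameter to retrieve the next results. For example: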

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "size": 2,
                "sources": [
                    {
                        "date": {
                            "date_histogram": {
                                "field": "timestamp",
                                "calendar_interval": "1d"
                            }
                        }
                    },
                    {
                        "product": {
                            "terms": {
                                "field": "product"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          size: 2,
          sources: [
            {
              date: {
                date_histogram: {
                  field: 'timestamp',
                  calendar_interval: '1d'
                }
              }
            },
            {
              product: {
                terms: {
                  field: 'product'
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        size: 2,
        sources: [
          {
            date: {
              date_histogram: {
                field: "timestamp",
                calendar_interval: "1d",
              },
            },
          },
          {
            product: {
              terms: {
                field: "product",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "size": 2,
        "sources": [
          { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } },
          { "product": { "terms": { "field": "product" } } }
        ]
      }
    }
  }
}

... returns:

{
  ...
  "aggregations": {
    "my_buckets": {
      "after_key": {
        "date": 1494288000000,
        "product": "mad max"
      },
      "buckets": [
        {
          "key": {
            "date": 1494201600000,
            "product": "rocky"
          },
          "doc_count": 1
        },
        {
          "key": {
            "date": 1494288000000,
            "product": "mad max"
          },
          "doc_count": 2
        }
      ]
    }
  }
}
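
The after_key in a response like the one above can drive a simple loop that keeps requesting pages until none are left. The following is a minimal sketch, not an official client helper: iter_composite_buckets is a hypothetical function, it assumes the search callable accepts the same size and aggs keyword arguments as the client.search calls on this page, and fake_search is an in-memory stand-in so that the example runs without a cluster.

```python
def iter_composite_buckets(search, sources, page_size=100, agg_name="my_buckets"):
    """Yield every composite bucket, following after_key from page to page."""
    after = None
    while True:
        composite = {"size": page_size, "sources": sources}
        if after is not None:
            composite["after"] = after
        resp = search(size=0, aggs={agg_name: {"composite": composite}})
        agg = resp["aggregations"][agg_name]
        yield from agg["buckets"]
        after = agg.get("after_key")
        # Stop when the page is empty or no after_key was returned.
        if after is None or not agg["buckets"]:
            break

def fake_search(size, aggs):
    """Hypothetical in-memory stand-in for client.search, for demonstration only."""
    keys = [{"product": p} for p in ("alien", "mad max", "rocky")]
    composite = aggs["my_buckets"]["composite"]
    start = 0
    if "after" in composite:
        start = keys.index(composite["after"]) + 1
    page = keys[start:start + composite["size"]]
    agg = {"buckets": [{"key": k, "doc_count": 1} for k in page]}
    if page:
        agg["after_key"] = page[-1]
    return {"aggregations": {"my_buckets": agg}}

sources = [{"product": {"terms": {"field": "product"}}}]
all_buckets = list(iter_composite_buckets(fake_search, sources, page_size=2))
print([b["key"]["product"] for b in all_buckets])
```

Against a real cluster, client.search would take the place of fake_search; the sources must stay identical across requests so that each after value continues from the right position.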

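To get the next set of buckets, resend the same aggregation with the after parameter set to the after_key value returned in the response. For example, this request uses the after_key value provided in the previous response: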

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "size": 2,
                "sources": [
                    {
                        "date": {
                            "date_histogram": {
                                "field": "timestamp",
                                "calendar_interval": "1d",
                                "order": "desc"
                            }
                        }
                    },
                    {
                        "product": {
                            "terms": {
                                "field": "product",
                                "order": "asc"
                            }
                        }
                    }
                ],
                "after": {
                    "date": 1494288000000,
                    "product": "mad max"
                }
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          size: 2,
          sources: [
            {
              date: {
                date_histogram: {
                  field: 'timestamp',
                  calendar_interval: '1d'
                }
              }
            },
            {
              product: {
                terms: {
                  field: 'product',
                  order: 'asc'
                }
              }
            }
          ],
          after: {
            date: 1_494_288_000_000,
            product: 'mad max'
          }
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        size: 2,
        sources: [
          {
            date: {
              date_histogram: {
                field: "timestamp",
                calendar_interval: "1d",
              },
            },
          },
          {
            product: {
              terms: {
                field: "product",
                order: "asc",
              },
            },
          },
        ],
        after: {
          date: 1494288000000,
          product: "mad max",
        },
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "size": 2,
        "sources": [
          { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } },
          { "product": { "terms": { "field": "product", "order": "asc" } } }
        ],
        "after": { "date": 1494288000000, "product": "mad max" } 
      }
    }
  }
}

The after parameter restricts the aggregation to buckets that sort after the provided values.

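The after_key is usually the key of the last bucket returned in the response, but that is not guaranteed. Always use the returned after_key instead of deriving it from the buckets.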
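
The pagination loop described above can be sketched as follows. This is a minimal sketch, not an official client helper: the function name iter_composite_buckets and its parameters are hypothetical, and it assumes that paging is finished once a page comes back with no buckets or without an after_key. The search argument is any callable that accepts size and aggs keyword arguments, such as client.search in the Python examples on this page.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional


def iter_composite_buckets(
    search: Callable[..., Dict[str, Any]],
    sources: List[Dict[str, Any]],
    page_size: int = 2,
    agg_name: str = "my_buckets",
) -> Iterator[Dict[str, Any]]:
    """Yield every composite bucket, requesting one page at a time.

    Hypothetical helper: `search` is any callable taking `size` and `aggs`
    keyword arguments and returning a mapping shaped like a search response.
    """
    after: Optional[Dict[str, Any]] = None
    while True:
        composite: Dict[str, Any] = {"size": page_size, "sources": sources}
        if after is not None:
            # Resend the same aggregation, adding only the `after` parameter.
            composite["after"] = after
        resp = search(size=0, aggs={agg_name: {"composite": composite}})
        agg = resp["aggregations"][agg_name]
        yield from agg["buckets"]
        # Continue from the returned after_key, never from the last bucket's key.
        after = agg.get("after_key")
        # Assumption: an empty page or a missing after_key means no more buckets.
        if after is None or not agg["buckets"]:
            break


# Usage (needs a configured client, so it is left as a comment):
# for bucket in iter_composite_buckets(client.search, sources):
#     print(bucket["key"], bucket["doc_count"])
```

Passing the search callable in, rather than creating a client inside the function, keeps the sketch independent of any particular connection setup.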

Early termination

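For optimal performance, set an index sort on the index so that it matches part or all of the source order in the composite aggregation. For example, the following index sort: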

resp = client.indices.create(
    index="my-index-000001",
    settings={
        "index": {
            "sort.field": [
                "user_name",
                "timestamp"
            ],
            "sort.order": [
                "asc",
                "desc"
            ]
        }
    },
    mappings={
        "properties": {
            "user_name": {
                "type": "keyword",
                "doc_values": True
            },
            "timestamp": {
                "type": "date"
            }
        }
    },
)
print(resp)
response = client.indices.create(
  index: 'my-index-000001',
  body: {
    settings: {
      index: {
        'sort.field' => [
          'user_name',
          'timestamp'
        ],
        'sort.order' => [
          'asc',
          'desc'
        ]
      }
    },
    mappings: {
      properties: {
        user_name: {
          type: 'keyword',
          doc_values: true
        },
        timestamp: {
          type: 'date'
        }
      }
    }
  }
)
puts response
const response = await client.indices.create({
  index: "my-index-000001",
  settings: {
    index: {
      "sort.field": ["user_name", "timestamp"],
      "sort.order": ["asc", "desc"],
    },
  },
  mappings: {
    properties: {
      user_name: {
        type: "keyword",
        doc_values: true,
      },
      timestamp: {
        type: "date",
      },
    },
  },
});
console.log(response);
PUT my-index-000001
{
  "settings": {
    "index": {
      "sort.field": [ "user_name", "timestamp" ],
      "sort.order": [ "asc", "desc" ]
    }
  },
  "mappings": {
    "properties": {
      "user_name": {
        "type": "keyword",
        "doc_values": true
      },
      "timestamp": {
        "type": "date"
      }
    }
  }
}

This index is sorted by user_name first, then by timestamp: the user_name field in ascending order and the timestamp field in descending order.

This index sort can be used to optimize the following composite aggregations:
resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "user_name": {
                            "terms": {
                                "field": "user_name"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              user_name: {
                terms: {
                  field: 'user_name'
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            user_name: {
              terms: {
                field: "user_name",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          { "user_name": { "terms": { "field": "user_name" } } }     
        ]
      }
    }
  }
}

user_name is a prefix of the index sort, and its order (asc) matches.

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "user_name": {
                            "terms": {
                                "field": "user_name"
                            }
                        }
                    },
                    {
                        "date": {
                            "date_histogram": {
                                "field": "timestamp",
                                "calendar_interval": "1d",
                                "order": "desc"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              user_name: {
                terms: {
                  field: 'user_name'
                }
              }
            },
            {
              date: {
                date_histogram: {
                  field: 'timestamp',
                  calendar_interval: '1d',
                  order: 'desc'
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            user_name: {
              terms: {
                field: "user_name",
              },
            },
          },
          {
            date: {
              date_histogram: {
                field: "timestamp",
                calendar_interval: "1d",
                order: "desc",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          { "user_name": { "terms": { "field": "user_name" } } }, 
          { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } } 
        ]
      }
    }
  }
}

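user_name is a prefix of the index sort, and its order (asc) matches.

Together with user_name, timestamp also matches the index sort prefix, and its order (desc) matches.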

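To optimize early termination, set track_total_hits to false in the request. The total number of hits matching the request can be retrieved on the first request, and computing it again on every page would be costly: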

resp = client.search(
    size=0,
    track_total_hits=False,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "user_name": {
                            "terms": {
                                "field": "user_name"
                            }
                        }
                    },
                    {
                        "date": {
                            "date_histogram": {
                                "field": "timestamp",
                                "calendar_interval": "1d",
                                "order": "desc"
                            }
                        }
                    }
                ]
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    track_total_hits: false,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              user_name: {
                terms: {
                  field: 'user_name'
                }
              }
            },
            {
              date: {
                date_histogram: {
                  field: 'timestamp',
                  calendar_interval: '1d',
                  order: 'desc'
                }
              }
            }
          ]
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  track_total_hits: false,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            user_name: {
              terms: {
                field: "user_name",
              },
            },
          },
          {
            date: {
              date_histogram: {
                field: "timestamp",
                calendar_interval: "1d",
                order: "desc",
              },
            },
          },
        ],
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "track_total_hits": false,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          { "user_name": { "terms": { "field": "user_name" } } },
          { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } }
        ]
      }
    }
  }
}

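Note that the order of the sources matters. In the example above, swapping user_name with timestamp would deactivate the sort optimization, because that configuration would not match the index sort specification. If the order of the sources does not matter for your use case, you can follow these simple guidelines: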

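  • Put the fields with the highest cardinality first.
  • Make sure that the order of the fields matches the order of the index sort.
  • Put multi-valued fields last, since they cannot be used for early termination.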

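Index sorting can slow down indexing, so it is important to test index sorting with your specific use case and data set to make sure it meets your requirements. If it does not, note that composite aggregations also try to terminate early on unsorted indices when the query matches all documents (match_all query).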
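
The prefix-matching rule above can be approximated with a small client-side check. This is a rough sketch and not Elasticsearch's actual logic: the helper name matching_sort_prefix is hypothetical, it only compares field names and sort orders, and it assumes each source defaults to ascending order when no order is given. It ignores other conditions, such as multi-valued fields.

```python
from typing import Any, Dict, List, Tuple


def matching_sort_prefix(
    index_sort: List[Tuple[str, str]],
    sources: List[Dict[str, Dict[str, Dict[str, Any]]]],
) -> int:
    """Return how many leading sources line up with the index sort.

    Hypothetical helper: `index_sort` is a list of (field, order) pairs and
    `sources` uses the same shape as the composite `sources` parameter.
    """
    matched = 0
    for (sort_field, sort_order), source in zip(index_sort, sources):
        # Each source looks like {name: {source_type: {"field": ..., "order": ...}}}.
        (definition,) = source.values()
        (params,) = definition.values()
        same_field = params.get("field") == sort_field
        same_order = params.get("order", "asc") == sort_order
        if not (same_field and same_order):
            break
        matched += 1
    return matched


index_sort = [("user_name", "asc"), ("timestamp", "desc")]
user = {"user_name": {"terms": {"field": "user_name"}}}
date = {
    "date": {
        "date_histogram": {
            "field": "timestamp",
            "calendar_interval": "1d",
            "order": "desc",
        }
    }
}

print(matching_sort_prefix(index_sort, [user, date]))  # 2: both sources match
print(matching_sort_prefix(index_sort, [date, user]))  # 0: swapped order breaks the prefix
```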

Sub-aggregations

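Like any multi-bucket aggregation, the composite aggregation can hold sub-aggregations. These sub-aggregations can be used to compute other buckets or statistics on each composite bucket created by the parent aggregation. For example, the following request computes the average value of a field for each composite bucket: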

resp = client.search(
    size=0,
    aggs={
        "my_buckets": {
            "composite": {
                "sources": [
                    {
                        "date": {
                            "date_histogram": {
                                "field": "timestamp",
                                "calendar_interval": "1d",
                                "order": "desc"
                            }
                        }
                    },
                    {
                        "product": {
                            "terms": {
                                "field": "product"
                            }
                        }
                    }
                ]
            },
            "aggregations": {
                "the_avg": {
                    "avg": {
                        "field": "price"
                    }
                }
            }
        }
    },
)
print(resp)
response = client.search(
  body: {
    size: 0,
    aggregations: {
      my_buckets: {
        composite: {
          sources: [
            {
              date: {
                date_histogram: {
                  field: 'timestamp',
                  calendar_interval: '1d',
                  order: 'desc'
                }
              }
            },
            {
              product: {
                terms: {
                  field: 'product'
                }
              }
            }
          ]
        },
        aggregations: {
          the_avg: {
            avg: {
              field: 'price'
            }
          }
        }
      }
    }
  }
)
puts response
const response = await client.search({
  size: 0,
  aggs: {
    my_buckets: {
      composite: {
        sources: [
          {
            date: {
              date_histogram: {
                field: "timestamp",
                calendar_interval: "1d",
                order: "desc",
              },
            },
          },
          {
            product: {
              terms: {
                field: "product",
              },
            },
          },
        ],
      },
      aggregations: {
        the_avg: {
          avg: {
            field: "price",
          },
        },
      },
    },
  },
});
console.log(response);
GET /_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } },
          { "product": { "terms": { "field": "product" } } }
        ]
      },
      "aggregations": {
        "the_avg": {
          "avg": { "field": "price" }
        }
      }
    }
  }
}

... returns:

{
  ...
  "aggregations": {
    "my_buckets": {
      "after_key": {
        "date": 1494201600000,
        "product": "rocky"
      },
      "buckets": [
        {
          "key": {
            "date": 1494460800000,
            "product": "apocalypse now"
          },
          "doc_count": 1,
          "the_avg": {
            "value": 10.0
          }
        },
        {
          "key": {
            "date": 1494374400000,
            "product": "mad max"
          },
          "doc_count": 1,
          "the_avg": {
            "value": 27.0
          }
        },
        {
          "key": {
            "date": 1494288000000,
            "product": "mad max"
          },
          "doc_count": 2,
          "the_avg": {
            "value": 22.5
          }
        },
        {
          "key": {
            "date": 1494201600000,
            "product": "rocky"
          },
          "doc_count": 1,
          "the_avg": {
            "value": 10.0
          }
        }
      ]
    }
  }
}
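
A response shaped like the one above can be flattened into one row per composite bucket for further processing. The helper below is a minimal sketch: the name buckets_to_rows is hypothetical, and it assumes every listed sub-aggregation is a single-value metric that exposes a value field, as the_avg does here.

```python
from typing import Any, Dict, List, Sequence


def buckets_to_rows(
    response: Dict[str, Any],
    agg_name: str = "my_buckets",
    metric_names: Sequence[str] = ("the_avg",),
) -> List[Dict[str, Any]]:
    """Flatten composite buckets into rows of key fields, doc_count and metrics."""
    rows = []
    for bucket in response["aggregations"][agg_name]["buckets"]:
        # Start from the composite key, for example {"date": ..., "product": ...}.
        row = dict(bucket["key"])
        row["doc_count"] = bucket["doc_count"]
        for name in metric_names:
            # Assumption: each metric sub-aggregation is single-valued.
            row[name] = bucket[name]["value"]
        rows.append(row)
    return rows
```

Each row then carries the composite key fields next to doc_count and the metric value, which is convenient for writing the results to CSV or loading them into a data frame.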

Pipeline aggregations

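The composite aggregation is not currently compatible with pipeline aggregations, and combining them does not make sense in most cases. For example, because composite aggregations are paginated, a single logical partition (one day, for example) might be spread over multiple pages. Since pipeline aggregations are purely post-processing on the final list of buckets, running something like a derivative on a composite page could lead to inaccurate results, as it only takes into account the "partial" result on that page.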

Pipeline aggregations that are self-contained to a single bucket (such as bucket_selector) might be supported in the future.
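
The pagination problem described in this section can be illustrated with plain Python. This sketch uses made-up doc counts and a simple client-side difference as a stand-in for a derivative pipeline aggregation. It is not how Elasticsearch computes derivatives; it only shows what is lost when each page is processed in isolation.

```python
from typing import List


def derivative(doc_counts: List[int]) -> List[int]:
    """Difference between each bucket's count and the previous bucket's count."""
    return [current - previous for previous, current in zip(doc_counts, doc_counts[1:])]


def paged(values: List[int], page_size: int) -> List[List[int]]:
    """Split the full bucket list into pages, as composite pagination would."""
    return [values[i:i + page_size] for i in range(0, len(values), page_size)]


# Hypothetical doc counts for four consecutive buckets.
all_counts = [1, 2, 1, 1]

full = derivative(all_counts)
per_page = [derivative(page) for page in paged(all_counts, 2)]

print(full)      # [1, -1, 0]
print(per_page)  # [[1], [0]]: the -1 change across the page boundary is lost
```

To post-process composite results, one option is to collect all pages on the client first and then run the computation over the complete bucket list.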