Rollup 入门
编辑Rollup 入门
编辑从 8.15.0 版本开始,在没有 Rollup 用法的集群中调用 put job API 将失败,并显示有关 Rollup 已弃用和计划移除的消息。集群必须包含 Rollup 作业或 Rollup 索引才能允许执行 put job API。
要使用 Rollup 功能,您需要创建一个或多个“Rollup 作业”。这些作业在后台持续运行,并汇总您指定的索引,并将汇总后的文档放入辅助索引(您也可以选择)。
假设您有一系列每日索引,其中包含传感器数据(sensor-2017-01-01
、sensor-2017-01-02
等)。一个示例文档可能如下所示:
{ "timestamp": 1516729294000, "temperature": 200, "voltage": 5.2, "node": "a" }
创建 Rollup 作业
编辑我们希望将这些文档汇总成每小时的摘要,这将使我们能够生成任何时间间隔(一小时或更长)的报表和仪表板。Rollup 作业可能如下所示:
resp = client.rollup.put_job( id="sensor", index_pattern="sensor-*", rollup_index="sensor_rollup", cron="*/30 * * * * ?", page_size=1000, groups={ "date_histogram": { "field": "timestamp", "fixed_interval": "60m" }, "terms": { "fields": [ "node" ] } }, metrics=[ { "field": "temperature", "metrics": [ "min", "max", "sum" ] }, { "field": "voltage", "metrics": [ "avg" ] } ], ) print(resp)
const response = await client.rollup.putJob({ id: "sensor", index_pattern: "sensor-*", rollup_index: "sensor_rollup", cron: "*/30 * * * * ?", page_size: 1000, groups: { date_histogram: { field: "timestamp", fixed_interval: "60m", }, terms: { fields: ["node"], }, }, metrics: [ { field: "temperature", metrics: ["min", "max", "sum"], }, { field: "voltage", metrics: ["avg"], }, ], }); console.log(response);
PUT _rollup/job/sensor { "index_pattern": "sensor-*", "rollup_index": "sensor_rollup", "cron": "*/30 * * * * ?", "page_size": 1000, "groups": { "date_histogram": { "field": "timestamp", "fixed_interval": "60m" }, "terms": { "fields": [ "node" ] } }, "metrics": [ { "field": "temperature", "metrics": [ "min", "max", "sum" ] }, { "field": "voltage", "metrics": [ "avg" ] } ] }
我们为作业指定 ID 为“sensor”(在 url 中:PUT _rollup/job/sensor
),并告诉它汇总索引模式"sensor-*"
。此作业将查找并汇总与该模式匹配的任何索引。然后,Rollup 摘要将存储在"sensor_rollup"
索引中。
cron
参数控制作业何时以及多久激活一次。当 Rollup 作业的 cron 计划触发时,它将从上次激活后停止的地方开始汇总。因此,如果您将 cron 配置为每 30 秒运行一次,则该作业将处理索引到sensor-*
索引中的最近 30 秒的数据。
如果 cron 配置为每天午夜运行一次,则该作业将处理过去 24 小时的数据。选择主要取决于您希望 Rollup 的“实时性”,以及您希望连续处理还是将其移至非高峰时段。
接下来,我们定义一组groups
。从本质上讲,我们正在定义稍后查询数据时希望在其上进行透视的维度。此作业中的分组允许我们对timestamp
字段使用date_histogram
聚合,以每小时为间隔进行汇总。它还允许我们对node
字段运行术语聚合。
定义好要为数据生成哪些组后,接下来配置要收集哪些指标。默认情况下,每个组只收集doc_counts
。为了使 Rollup 有用,您通常会添加平均值、最小值、最大值等指标。在此示例中,指标非常简单:我们希望保存temperature
字段的最小值/最大值/总和,以及voltage
字段的平均值。
有关作业语法的更多详细信息,请参阅创建 Rollup 作业。
执行上述命令并创建作业后,您将收到以下响应:
{ "acknowledged": true }
启动作业
编辑创建作业后,它将处于非活动状态。作业需要启动才能开始处理数据(这允许您稍后停止它们作为暂时暂停的一种方式,而无需删除配置)。
要启动作业,请执行以下命令:
resp = client.rollup.start_job( id="sensor", ) print(resp)
response = client.rollup.start_job( id: 'sensor' ) puts response
const response = await client.rollup.startJob({ id: "sensor", }); console.log(response);
POST _rollup/job/sensor/_start
搜索汇总结果
编辑作业运行并处理了一些数据后,我们可以使用Rollup 搜索端点进行一些搜索。Rollup 功能的设计使您可以使用您习惯的相同查询 DSL 语法……它只是恰好运行在汇总的数据上。
例如,请看此查询:
resp = client.rollup.rollup_search( index="sensor_rollup", size=0, aggregations={ "max_temperature": { "max": { "field": "temperature" } } }, ) print(resp)
response = client.rollup.rollup_search( index: 'sensor_rollup', body: { size: 0, aggregations: { max_temperature: { max: { field: 'temperature' } } } } ) puts response
const response = await client.rollup.rollupSearch({ index: "sensor_rollup", size: 0, aggregations: { max_temperature: { max: { field: "temperature", }, }, }, }); console.log(response);
GET /sensor_rollup/_rollup_search { "size": 0, "aggregations": { "max_temperature": { "max": { "field": "temperature" } } } }
这是一个简单的聚合,它计算temperature
字段的最大值。但您会注意到它被发送到sensor_rollup
索引而不是原始sensor-*
索引。您还会注意到它正在使用_rollup_search
端点。否则,语法与您预期的一样。
如果您执行该查询,您将收到一个看起来像正常聚合响应的结果:
{ "took" : 102, "timed_out" : false, "terminated_early" : false, "_shards" : ... , "hits" : { "total" : { "value": 0, "relation": "eq" }, "max_score" : 0.0, "hits" : [ ] }, "aggregations" : { "max_temperature" : { "value" : 202.0 } } }
唯一值得注意的区别是 Rollup 搜索结果没有hits
,因为我们实际上不再搜索原始的实时数据。否则,语法是相同的。
这里有一些有趣的收获。首先,即使数据以每小时为间隔进行汇总并按节点名称进行分区,我们运行的查询只是计算所有文档的最高温度。groups
在作业中配置的并不是查询的必填元素,它们只是您可以根据其进行分区的额外维度。其次,请求和响应语法与普通的 DSL 几乎相同,易于集成到仪表板和应用程序中。
最后,我们可以使用我们定义的那些分组字段来构建更复杂的查询:
resp = client.rollup.rollup_search( index="sensor_rollup", size=0, aggregations={ "timeline": { "date_histogram": { "field": "timestamp", "fixed_interval": "7d" }, "aggs": { "nodes": { "terms": { "field": "node" }, "aggs": { "max_temperature": { "max": { "field": "temperature" } }, "avg_voltage": { "avg": { "field": "voltage" } } } } } } }, ) print(resp)
response = client.rollup.rollup_search( index: 'sensor_rollup', body: { size: 0, aggregations: { timeline: { date_histogram: { field: 'timestamp', fixed_interval: '7d' }, aggregations: { nodes: { terms: { field: 'node' }, aggregations: { max_temperature: { max: { field: 'temperature' } }, avg_voltage: { avg: { field: 'voltage' } } } } } } } } ) puts response
const response = await client.rollup.rollupSearch({ index: "sensor_rollup", size: 0, aggregations: { timeline: { date_histogram: { field: "timestamp", fixed_interval: "7d", }, aggs: { nodes: { terms: { field: "node", }, aggs: { max_temperature: { max: { field: "temperature", }, }, avg_voltage: { avg: { field: "voltage", }, }, }, }, }, }, }, }); console.log(response);
GET /sensor_rollup/_rollup_search { "size": 0, "aggregations": { "timeline": { "date_histogram": { "field": "timestamp", "fixed_interval": "7d" }, "aggs": { "nodes": { "terms": { "field": "node" }, "aggs": { "max_temperature": { "max": { "field": "temperature" } }, "avg_voltage": { "avg": { "field": "voltage" } } } } } } } }
这将返回相应的响应:
{ "took" : 93, "timed_out" : false, "terminated_early" : false, "_shards" : ... , "hits" : { "total" : { "value": 0, "relation": "eq" }, "max_score" : 0.0, "hits" : [ ] }, "aggregations" : { "timeline" : { "buckets" : [ { "key_as_string" : "2018-01-18T00:00:00.000Z", "key" : 1516233600000, "doc_count" : 6, "nodes" : { "doc_count_error_upper_bound" : 0, "sum_other_doc_count" : 0, "buckets" : [ { "key" : "a", "doc_count" : 2, "max_temperature" : { "value" : 202.0 }, "avg_voltage" : { "value" : 5.1499998569488525 } }, { "key" : "b", "doc_count" : 2, "max_temperature" : { "value" : 201.0 }, "avg_voltage" : { "value" : 5.700000047683716 } }, { "key" : "c", "doc_count" : 2, "max_temperature" : { "value" : 202.0 }, "avg_voltage" : { "value" : 4.099999904632568 } } ] } } ] } } }
除了更复杂(日期直方图和术语聚合,加上额外的平均值指标)之外,您还会注意到日期直方图使用7d
间隔而不是60m
。
结论
编辑此快速入门应已提供对 Rollup 提供的核心功能的简明概述。设置 Rollup 时还有更多提示和注意事项,您可以在本节的其余部分找到这些内容。您也可以浏览REST API以了解可用的功能。