Rollup 入门

编辑

在 8.11.0 版本中已弃用。

Rollup 功能将在未来版本中移除。请迁移降采样

从 8.15.0 版本开始,在没有 Rollup 用法的集群中调用 put job API 将失败,并显示有关 Rollup 已弃用和计划移除的消息。集群必须包含 Rollup 作业或 Rollup 索引才能允许执行 put job API。

要使用 Rollup 功能,您需要创建一个或多个“Rollup 作业”。这些作业在后台持续运行,并汇总您指定的索引,并将汇总后的文档放入辅助索引(您也可以选择)。

假设您有一系列每日索引,其中包含传感器数据(sensor-2017-01-01sensor-2017-01-02 等)。一个示例文档可能如下所示:

{
  "timestamp": 1516729294000,
  "temperature": 200,
  "voltage": 5.2,
  "node": "a"
}

创建 Rollup 作业

编辑

我们希望将这些文档汇总成每小时的摘要,这将使我们能够生成任何时间间隔(一小时或更长)的报表和仪表板。Rollup 作业可能如下所示:

resp = client.rollup.put_job(
    id="sensor",
    index_pattern="sensor-*",
    rollup_index="sensor_rollup",
    cron="*/30 * * * * ?",
    page_size=1000,
    groups={
        "date_histogram": {
            "field": "timestamp",
            "fixed_interval": "60m"
        },
        "terms": {
            "fields": [
                "node"
            ]
        }
    },
    metrics=[
        {
            "field": "temperature",
            "metrics": [
                "min",
                "max",
                "sum"
            ]
        },
        {
            "field": "voltage",
            "metrics": [
                "avg"
            ]
        }
    ],
)
print(resp)
const response = await client.rollup.putJob({
  id: "sensor",
  index_pattern: "sensor-*",
  rollup_index: "sensor_rollup",
  cron: "*/30 * * * * ?",
  page_size: 1000,
  groups: {
    date_histogram: {
      field: "timestamp",
      fixed_interval: "60m",
    },
    terms: {
      fields: ["node"],
    },
  },
  metrics: [
    {
      field: "temperature",
      metrics: ["min", "max", "sum"],
    },
    {
      field: "voltage",
      metrics: ["avg"],
    },
  ],
});
console.log(response);
PUT _rollup/job/sensor
{
  "index_pattern": "sensor-*",
  "rollup_index": "sensor_rollup",
  "cron": "*/30 * * * * ?",
  "page_size": 1000,
  "groups": {
    "date_histogram": {
      "field": "timestamp",
      "fixed_interval": "60m"
    },
    "terms": {
      "fields": [ "node" ]
    }
  },
  "metrics": [
    {
      "field": "temperature",
      "metrics": [ "min", "max", "sum" ]
    },
    {
      "field": "voltage",
      "metrics": [ "avg" ]
    }
  ]
}

我们为作业指定 ID 为“sensor”(在 url 中:PUT _rollup/job/sensor),并告诉它汇总索引模式"sensor-*"。此作业将查找并汇总与该模式匹配的任何索引。然后,Rollup 摘要将存储在"sensor_rollup"索引中。

cron参数控制作业何时以及多久激活一次。当 Rollup 作业的 cron 计划触发时,它将从上次激活后停止的地方开始汇总。因此,如果您将 cron 配置为每 30 秒运行一次,则该作业将处理索引到sensor-*索引中的最近 30 秒的数据。

如果 cron 配置为每天午夜运行一次,则该作业将处理过去 24 小时的数据。选择主要取决于您希望 Rollup 的“实时性”,以及您希望连续处理还是将其移至非高峰时段。

接下来,我们定义一组groups。从本质上讲,我们正在定义稍后查询数据时希望在其上进行透视的维度。此作业中的分组允许我们对timestamp字段使用date_histogram聚合,以每小时为间隔进行汇总。它还允许我们对node字段运行术语聚合。

定义好要为数据生成哪些组后,接下来配置要收集哪些指标。默认情况下,每个组只收集doc_counts。为了使 Rollup 有用,您通常会添加平均值、最小值、最大值等指标。在此示例中,指标非常简单:我们希望保存temperature字段的最小值/最大值/总和,以及voltage字段的平均值。

有关作业语法的更多详细信息,请参阅创建 Rollup 作业

执行上述命令并创建作业后,您将收到以下响应:

{
  "acknowledged": true
}

启动作业

编辑

创建作业后,它将处于非活动状态。作业需要启动才能开始处理数据(这允许您稍后停止它们作为暂时暂停的一种方式,而无需删除配置)。

要启动作业,请执行以下命令:

resp = client.rollup.start_job(
    id="sensor",
)
print(resp)
response = client.rollup.start_job(
  id: 'sensor'
)
puts response
const response = await client.rollup.startJob({
  id: "sensor",
});
console.log(response);
POST _rollup/job/sensor/_start

搜索汇总结果

编辑

作业运行并处理了一些数据后,我们可以使用Rollup 搜索端点进行一些搜索。Rollup 功能的设计使您可以使用您习惯的相同查询 DSL 语法……它只是恰好运行在汇总的数据上。

例如,请看此查询:

resp = client.rollup.rollup_search(
    index="sensor_rollup",
    size=0,
    aggregations={
        "max_temperature": {
            "max": {
                "field": "temperature"
            }
        }
    },
)
print(resp)
response = client.rollup.rollup_search(
  index: 'sensor_rollup',
  body: {
    size: 0,
    aggregations: {
      max_temperature: {
        max: {
          field: 'temperature'
        }
      }
    }
  }
)
puts response
const response = await client.rollup.rollupSearch({
  index: "sensor_rollup",
  size: 0,
  aggregations: {
    max_temperature: {
      max: {
        field: "temperature",
      },
    },
  },
});
console.log(response);
GET /sensor_rollup/_rollup_search
{
  "size": 0,
  "aggregations": {
    "max_temperature": {
      "max": {
        "field": "temperature"
      }
    }
  }
}

这是一个简单的聚合,它计算temperature字段的最大值。但您会注意到它被发送到sensor_rollup索引而不是原始sensor-*索引。您还会注意到它正在使用_rollup_search端点。否则,语法与您预期的一样。

如果您执行该查询,您将收到一个看起来像正常聚合响应的结果:

{
  "took" : 102,
  "timed_out" : false,
  "terminated_early" : false,
  "_shards" : ... ,
  "hits" : {
    "total" : {
        "value": 0,
        "relation": "eq"
    },
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "max_temperature" : {
      "value" : 202.0
    }
  }
}

唯一值得注意的区别是 Rollup 搜索结果没有hits,因为我们实际上不再搜索原始的实时数据。否则,语法是相同的。

这里有一些有趣的收获。首先,即使数据以每小时为间隔进行汇总并按节点名称进行分区,我们运行的查询只是计算所有文档的最高温度。groups在作业中配置的并不是查询的必填元素,它们只是您可以根据其进行分区的额外维度。其次,请求和响应语法与普通的 DSL 几乎相同,易于集成到仪表板和应用程序中。

最后,我们可以使用我们定义的那些分组字段来构建更复杂的查询:

resp = client.rollup.rollup_search(
    index="sensor_rollup",
    size=0,
    aggregations={
        "timeline": {
            "date_histogram": {
                "field": "timestamp",
                "fixed_interval": "7d"
            },
            "aggs": {
                "nodes": {
                    "terms": {
                        "field": "node"
                    },
                    "aggs": {
                        "max_temperature": {
                            "max": {
                                "field": "temperature"
                            }
                        },
                        "avg_voltage": {
                            "avg": {
                                "field": "voltage"
                            }
                        }
                    }
                }
            }
        }
    },
)
print(resp)
response = client.rollup.rollup_search(
  index: 'sensor_rollup',
  body: {
    size: 0,
    aggregations: {
      timeline: {
        date_histogram: {
          field: 'timestamp',
          fixed_interval: '7d'
        },
        aggregations: {
          nodes: {
            terms: {
              field: 'node'
            },
            aggregations: {
              max_temperature: {
                max: {
                  field: 'temperature'
                }
              },
              avg_voltage: {
                avg: {
                  field: 'voltage'
                }
              }
            }
          }
        }
      }
    }
  }
)
puts response
const response = await client.rollup.rollupSearch({
  index: "sensor_rollup",
  size: 0,
  aggregations: {
    timeline: {
      date_histogram: {
        field: "timestamp",
        fixed_interval: "7d",
      },
      aggs: {
        nodes: {
          terms: {
            field: "node",
          },
          aggs: {
            max_temperature: {
              max: {
                field: "temperature",
              },
            },
            avg_voltage: {
              avg: {
                field: "voltage",
              },
            },
          },
        },
      },
    },
  },
});
console.log(response);
GET /sensor_rollup/_rollup_search
{
  "size": 0,
  "aggregations": {
    "timeline": {
      "date_histogram": {
        "field": "timestamp",
        "fixed_interval": "7d"
      },
      "aggs": {
        "nodes": {
          "terms": {
            "field": "node"
          },
          "aggs": {
            "max_temperature": {
              "max": {
                "field": "temperature"
              }
            },
            "avg_voltage": {
              "avg": {
                "field": "voltage"
              }
            }
          }
        }
      }
    }
  }
}

这将返回相应的响应:

{
   "took" : 93,
   "timed_out" : false,
   "terminated_early" : false,
   "_shards" : ... ,
   "hits" : {
     "total" : {
        "value": 0,
        "relation": "eq"
     },
     "max_score" : 0.0,
     "hits" : [ ]
   },
   "aggregations" : {
     "timeline" : {
       "buckets" : [
         {
           "key_as_string" : "2018-01-18T00:00:00.000Z",
           "key" : 1516233600000,
           "doc_count" : 6,
           "nodes" : {
             "doc_count_error_upper_bound" : 0,
             "sum_other_doc_count" : 0,
             "buckets" : [
               {
                 "key" : "a",
                 "doc_count" : 2,
                 "max_temperature" : {
                   "value" : 202.0
                 },
                 "avg_voltage" : {
                   "value" : 5.1499998569488525
                 }
               },
               {
                 "key" : "b",
                 "doc_count" : 2,
                 "max_temperature" : {
                   "value" : 201.0
                 },
                 "avg_voltage" : {
                   "value" : 5.700000047683716
                 }
               },
               {
                 "key" : "c",
                 "doc_count" : 2,
                 "max_temperature" : {
                   "value" : 202.0
                 },
                 "avg_voltage" : {
                   "value" : 4.099999904632568
                 }
               }
             ]
           }
         }
       ]
     }
   }
}

除了更复杂(日期直方图和术语聚合,加上额外的平均值指标)之外,您还会注意到日期直方图使用7d间隔而不是60m

结论

编辑

此快速入门应已提供对 Rollup 提供的核心功能的简明概述。设置 Rollup 时还有更多提示和注意事项,您可以在本节的其余部分找到这些内容。您也可以浏览REST API以了解可用的功能。