Use Elasticsearch for time series data

Elasticsearch offers features to help you store, manage, and search time series data, such as logs and metrics. Once in Elasticsearch, you can analyze and visualize your data using Kibana and other Elastic Stack features.

Set up data tiers

Elasticsearch's ILM feature uses data tiers to automatically move older data to nodes with less expensive hardware as it ages. This helps improve performance and reduce storage costs.

The hot and content tiers are required. The warm, cold, and frozen tiers are optional.

Use high-performance nodes in the hot and warm tiers for faster indexing and faster searches on your most recent data. Use slower, less expensive nodes in the cold and frozen tiers to reduce costs.

The content tier is not typically used for time series data. However, it's required to create system indices and other indices that aren't part of a data stream.

The steps for setting up data tiers vary based on your deployment type:

  1. Log in to the Elasticsearch Service Console.
  2. Add or select your deployment from the Elasticsearch Service home page or the Deployments page.
  3. From your deployment menu, select Edit deployment.
  4. To enable a data tier, click Add capacity.

Enable autoscaling

Autoscaling automatically adjusts your deployment's capacity to meet your storage needs. To enable autoscaling, select Autoscale this deployment on the Edit deployment page. Autoscaling is only available for Elasticsearch Service.

Register a snapshot repository

The cold and frozen tiers can use searchable snapshots to reduce local storage costs.

To use searchable snapshots, you must register a supported snapshot repository. The steps for registering this repository vary based on your deployment type and storage provider:

When you create a cluster, Elasticsearch Service automatically registers a default found-snapshots repository. This repository supports searchable snapshots.

The found-snapshots repository is specific to your cluster. To use another cluster's default repository, refer to the Cloud Snapshot and restore documentation.

You can also use several custom repository types with searchable snapshots.

Create or edit an index lifecycle policy

A data stream stores your data across multiple backing indices. ILM uses an index lifecycle policy to automatically move these indices through your data tiers.

If you use Fleet or Elastic Agent, edit one of Elasticsearch's built-in lifecycle policies. If you use a custom application, create your own policy. In either case, ensure your policy:

  • Includes a phase for each data tier you've configured.
  • Calculates the threshold, or min_age, for phase transition from rollover.
  • Uses searchable snapshots in the cold and frozen phases, if needed.
  • Includes a delete phase, if needed.
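
The example policies later in this section use a delete min_age of 735d. Because min_age is measured from rollover, a retention target has to add the time an index keeps receiving writes before it rolls over. A minimal sketch of that arithmetic, assuming a hypothetical two-year retention target and a five-day rollover buffer:

```python
# Sketch: deriving a delete phase's min_age from a retention target.
# min_age counts from rollover, so add a buffer covering the time an
# index keeps receiving writes before it rolls over. The two-year
# retention and five-day buffer here are hypothetical figures.

def delete_min_age_days(retention_days: int, rollover_buffer_days: int) -> int:
    return retention_days + rollover_buffer_days

print(delete_min_age_days(2 * 365, 5))  # -> 735
```

Swap in your own retention requirement and expected rollover cadence; the same reasoning applies to the warm, cold, and frozen min_age thresholds.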

Fleet and Elastic Agent use the following built-in lifecycle policies:

  • logs
  • metrics
  • synthetics

You can customize these policies based on your performance, resilience, and retention requirements.

To edit a policy in Kibana, open the main menu and go to Stack Management > Index Lifecycle Policies. Click the policy you'd like to edit.

You can also use the update lifecycle policy API:

Python:

resp = client.ilm.put_lifecycle(
    name="logs",
    policy={
        "phases": {
            "hot": {
                "actions": {
                    "rollover": {
                        "max_primary_shard_size": "50gb"
                    }
                }
            },
            "warm": {
                "min_age": "30d",
                "actions": {
                    "shrink": {
                        "number_of_shards": 1
                    },
                    "forcemerge": {
                        "max_num_segments": 1
                    }
                }
            },
            "cold": {
                "min_age": "60d",
                "actions": {
                    "searchable_snapshot": {
                        "snapshot_repository": "found-snapshots"
                    }
                }
            },
            "frozen": {
                "min_age": "90d",
                "actions": {
                    "searchable_snapshot": {
                        "snapshot_repository": "found-snapshots"
                    }
                }
            },
            "delete": {
                "min_age": "735d",
                "actions": {
                    "delete": {}
                }
            }
        }
    },
)
print(resp)

JavaScript:

const response = await client.ilm.putLifecycle({
  name: "logs",
  policy: {
    phases: {
      hot: {
        actions: {
          rollover: {
            max_primary_shard_size: "50gb",
          },
        },
      },
      warm: {
        min_age: "30d",
        actions: {
          shrink: {
            number_of_shards: 1,
          },
          forcemerge: {
            max_num_segments: 1,
          },
        },
      },
      cold: {
        min_age: "60d",
        actions: {
          searchable_snapshot: {
            snapshot_repository: "found-snapshots",
          },
        },
      },
      frozen: {
        min_age: "90d",
        actions: {
          searchable_snapshot: {
            snapshot_repository: "found-snapshots",
          },
        },
      },
      delete: {
        min_age: "735d",
        actions: {
          delete: {},
        },
      },
    },
  },
});
console.log(response);

Console:

PUT _ilm/policy/logs
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_primary_shard_size": "50gb"
          }
        }
      },
      "warm": {
        "min_age": "30d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          }
        }
      },
      "cold": {
        "min_age": "60d",
        "actions": {
          "searchable_snapshot": {
            "snapshot_repository": "found-snapshots"
          }
        }
      },
      "frozen": {
        "min_age": "90d",
        "actions": {
          "searchable_snapshot": {
            "snapshot_repository": "found-snapshots"
          }
        }
      },
      "delete": {
        "min_age": "735d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

Create component templates

If you use Fleet or Elastic Agent, skip to Search and visualize your data. Fleet and Elastic Agent use built-in templates to create data streams for you.

If you use a custom application, you need to set up your own data stream. A data stream requires a matching index template. In most cases, you compose this index template using one or more component templates. You typically use separate component templates for mappings and index settings. This lets you reuse the component templates in multiple index templates.

When creating your component templates, include:

  • A date or date_nanos mapping for the @timestamp field. If you don't specify a mapping, Elasticsearch maps @timestamp as a date field with default options.
  • Your lifecycle policy in the index.lifecycle.name index setting.
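
The mapping format used in the examples below, date_optional_time||epoch_millis, accepts either an ISO 8601 string or a count of milliseconds since the epoch. A self-contained Python illustration of that equivalence (this mimics the idea, not Elasticsearch's internal date parsing):

```python
# Illustration (not Elasticsearch internals): the same instant can be
# written as an ISO 8601 string or as epoch milliseconds; a field
# mapped with "date_optional_time||epoch_millis" accepts both forms.
from datetime import datetime, timezone

iso = "2099-05-06T16:21:15.000Z"
from_iso = datetime.fromisoformat(iso.replace("Z", "+00:00"))

millis = int(from_iso.timestamp() * 1000)  # the epoch_millis form
from_millis = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)

assert from_iso == from_millis  # both forms denote the same instant
```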

Use the Elastic Common Schema (ECS) when mapping your fields. ECS fields integrate with several Elastic Stack features by default.

If you're unsure how to map your fields, use runtime fields to extract fields from unstructured content at search time. For example, you can index a log message to a wildcard field and later extract IP addresses and other data from this field during a search.
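
The search examples later on this page use the grok pattern '%{IPORHOST:sourceip} .*' to pull the source IP out of such a log message at search time. A simplified, self-contained Python analogue of that extraction (grok's IPORHOST also matches hostnames and IPv6; this sketch keeps only the leading-IPv4 case):

```python
# Simplified analogue of grok's '%{IPORHOST:sourceip} .*': extract the
# leading IPv4 address from a raw log line, as a runtime field would
# at search time.
import re

message = ('192.0.2.42 - - [06/May/2099:16:21:15 +0000] '
           '"GET /images/bg.jpg HTTP/1.0" 200 24736')

match = re.match(r"(?P<sourceip>\d{1,3}(?:\.\d{1,3}){3}) ", message)
source_ip = match.group("sourceip") if match else None
print(source_ip)  # -> 192.0.2.42
```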

To create a component template in Kibana, open the main menu and go to Stack Management > Index Management. In the Index Templates view, click Create component template.

You can also use the create component template API:

Python:

resp = client.cluster.put_component_template(
    name="my-mappings",
    template={
        "mappings": {
            "properties": {
                "@timestamp": {
                    "type": "date",
                    "format": "date_optional_time||epoch_millis"
                },
                "message": {
                    "type": "wildcard"
                }
            }
        }
    },
    meta={
        "description": "Mappings for @timestamp and message fields",
        "my-custom-meta-field": "More arbitrary metadata"
    },
)
print(resp)

resp1 = client.cluster.put_component_template(
    name="my-settings",
    template={
        "settings": {
            "index.lifecycle.name": "my-lifecycle-policy"
        }
    },
    meta={
        "description": "Settings for ILM",
        "my-custom-meta-field": "More arbitrary metadata"
    },
)
print(resp1)

Ruby:

response = client.cluster.put_component_template(
  name: 'my-mappings',
  body: {
    template: {
      mappings: {
        properties: {
          "@timestamp": {
            type: 'date',
            format: 'date_optional_time||epoch_millis'
          },
          message: {
            type: 'wildcard'
          }
        }
      }
    },
    _meta: {
      description: 'Mappings for @timestamp and message fields',
      "my-custom-meta-field": 'More arbitrary metadata'
    }
  }
)
puts response

response = client.cluster.put_component_template(
  name: 'my-settings',
  body: {
    template: {
      settings: {
        'index.lifecycle.name' => 'my-lifecycle-policy'
      }
    },
    _meta: {
      description: 'Settings for ILM',
      "my-custom-meta-field": 'More arbitrary metadata'
    }
  }
)
puts response

JavaScript:

const response = await client.cluster.putComponentTemplate({
  name: "my-mappings",
  template: {
    mappings: {
      properties: {
        "@timestamp": {
          type: "date",
          format: "date_optional_time||epoch_millis",
        },
        message: {
          type: "wildcard",
        },
      },
    },
  },
  _meta: {
    description: "Mappings for @timestamp and message fields",
    "my-custom-meta-field": "More arbitrary metadata",
  },
});
console.log(response);

const response1 = await client.cluster.putComponentTemplate({
  name: "my-settings",
  template: {
    settings: {
      "index.lifecycle.name": "my-lifecycle-policy",
    },
  },
  _meta: {
    description: "Settings for ILM",
    "my-custom-meta-field": "More arbitrary metadata",
  },
});
console.log(response1);

Console:

# Creates a component template for mappings
PUT _component_template/my-mappings
{
  "template": {
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "date_optional_time||epoch_millis"
        },
        "message": {
          "type": "wildcard"
        }
      }
    }
  },
  "_meta": {
    "description": "Mappings for @timestamp and message fields",
    "my-custom-meta-field": "More arbitrary metadata"
  }
}

# Creates a component template for index settings
PUT _component_template/my-settings
{
  "template": {
    "settings": {
      "index.lifecycle.name": "my-lifecycle-policy"
    }
  },
  "_meta": {
    "description": "Settings for ILM",
    "my-custom-meta-field": "More arbitrary metadata"
  }
}

Create an index template

Use your component templates to create an index template. Specify:

  • One or more index patterns that match the data stream's name. We recommend using our data stream naming scheme.
  • That the template is data stream enabled.
  • Any component templates containing your mappings and index settings.
  • A priority higher than 200 to avoid collisions with built-in templates. See Avoid index pattern collisions.

To create an index template in Kibana, open the main menu and go to Stack Management > Index Management. In the Index Templates view, click Create template.

You can also use the create index template API. Include the data_stream object to enable data streams:

Python:

resp = client.indices.put_index_template(
    name="my-index-template",
    index_patterns=[
        "my-data-stream*"
    ],
    data_stream={},
    composed_of=[
        "my-mappings",
        "my-settings"
    ],
    priority=500,
    meta={
        "description": "Template for my time series data",
        "my-custom-meta-field": "More arbitrary metadata"
    },
)
print(resp)

Ruby:

response = client.indices.put_index_template(
  name: 'my-index-template',
  body: {
    index_patterns: [
      'my-data-stream*'
    ],
    data_stream: {},
    composed_of: [
      'my-mappings',
      'my-settings'
    ],
    priority: 500,
    _meta: {
      description: 'Template for my time series data',
      "my-custom-meta-field": 'More arbitrary metadata'
    }
  }
)
puts response

JavaScript:

const response = await client.indices.putIndexTemplate({
  name: "my-index-template",
  index_patterns: ["my-data-stream*"],
  data_stream: {},
  composed_of: ["my-mappings", "my-settings"],
  priority: 500,
  _meta: {
    description: "Template for my time series data",
    "my-custom-meta-field": "More arbitrary metadata",
  },
});
console.log(response);

Console:

PUT _index_template/my-index-template
{
  "index_patterns": ["my-data-stream*"],
  "data_stream": { },
  "composed_of": [ "my-mappings", "my-settings" ],
  "priority": 500,
  "_meta": {
    "description": "Template for my time series data",
    "my-custom-meta-field": "More arbitrary metadata"
  }
}

Add data to a data stream

Indexing requests add documents to a data stream. These requests must use an op_type of create. Documents must include a @timestamp field.

To automatically create your data stream, submit an indexing request that targets the stream's name. This name must match one of your index template's index patterns:
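
The pattern match that triggers auto-creation can be sketched locally. Here fnmatchcase stands in for Elasticsearch's wildcard matching (a close-enough illustration, not the actual implementation), using the my-data-stream* pattern from this page's index template:

```python
# Sketch: how a stream name is checked against an index template's
# index patterns before a data stream is auto-created. fnmatchcase
# mirrors the wildcard semantics closely enough for illustration.
from fnmatch import fnmatchcase

pattern = "my-data-stream*"

assert fnmatchcase("my-data-stream", pattern)         # exact name matches
assert fnmatchcase("my-data-stream-alt", pattern)     # suffixed name matches
assert not fnmatchcase("other-data-stream", pattern)  # no match: not auto-created
```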

Python:

resp = client.bulk(
    index="my-data-stream",
    operations=[
        {
            "create": {}
        },
        {
            "@timestamp": "2099-05-06T16:21:15.000Z",
            "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
        },
        {
            "create": {}
        },
        {
            "@timestamp": "2099-05-06T16:25:42.000Z",
            "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638"
        }
    ],
)
print(resp)

resp1 = client.index(
    index="my-data-stream",
    document={
        "@timestamp": "2099-05-06T16:21:15.000Z",
        "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
    },
)
print(resp1)

Ruby:

response = client.bulk(
  index: 'my-data-stream',
  body: [
    {
      create: {}
    },
    {
      "@timestamp": '2099-05-06T16:21:15.000Z',
      message: '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736'
    },
    {
      create: {}
    },
    {
      "@timestamp": '2099-05-06T16:25:42.000Z',
      message: '192.0.2.255 - - [06/May/2099:16:25:42 +0000] "GET /favicon.ico HTTP/1.0" 200 3638'
    }
  ]
)
puts response

response = client.index(
  index: 'my-data-stream',
  body: {
    "@timestamp": '2099-05-06T16:21:15.000Z',
    message: '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736'
  }
)
puts response

JavaScript:

const response = await client.bulk({
  index: "my-data-stream",
  operations: [
    {
      create: {},
    },
    {
      "@timestamp": "2099-05-06T16:21:15.000Z",
      message:
        '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736',
    },
    {
      create: {},
    },
    {
      "@timestamp": "2099-05-06T16:25:42.000Z",
      message:
        '192.0.2.255 - - [06/May/2099:16:25:42 +0000] "GET /favicon.ico HTTP/1.0" 200 3638',
    },
  ],
});
console.log(response);

const response1 = await client.index({
  index: "my-data-stream",
  document: {
    "@timestamp": "2099-05-06T16:21:15.000Z",
    message:
      '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736',
  },
});
console.log(response1);

Console:

PUT my-data-stream/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638" }

POST my-data-stream/_doc
{
  "@timestamp": "2099-05-06T16:21:15.000Z",
  "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
}

Search and visualize your data

To explore and search your data in Kibana, open the main menu and select Discover. See Kibana's Discover documentation.

Use Kibana's Dashboard feature to visualize your data in a chart, table, map, and more. See Kibana's Dashboard documentation.

You can also use the search API to search and aggregate your data. Use runtime fields and grok patterns to dynamically extract data from log messages and other unstructured content at search time:

Python:

resp = client.search(
    index="my-data-stream",
    runtime_mappings={
        "source.ip": {
            "type": "ip",
            "script": "\n        String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ \"message\" ].value)?.sourceip;\n        if (sourceip != null) emit(sourceip);\n      "
        }
    },
    query={
        "bool": {
            "filter": [
                {
                    "range": {
                        "@timestamp": {
                            "gte": "now-1d/d",
                            "lt": "now/d"
                        }
                    }
                },
                {
                    "range": {
                        "source.ip": {
                            "gte": "192.0.2.0",
                            "lte": "192.0.2.255"
                        }
                    }
                }
            ]
        }
    },
    fields=[
        "*"
    ],
    source=False,
    sort=[
        {
            "@timestamp": "desc"
        },
        {
            "source.ip": "desc"
        }
    ],
)
print(resp)

JavaScript:

const response = await client.search({
  index: "my-data-stream",
  runtime_mappings: {
    "source.ip": {
      type: "ip",
      script:
        "\n        String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ \"message\" ].value)?.sourceip;\n        if (sourceip != null) emit(sourceip);\n      ",
    },
  },
  query: {
    bool: {
      filter: [
        {
          range: {
            "@timestamp": {
              gte: "now-1d/d",
              lt: "now/d",
            },
          },
        },
        {
          range: {
            "source.ip": {
              gte: "192.0.2.0",
              lte: "192.0.2.255",
            },
          },
        },
      ],
    },
  },
  fields: ["*"],
  _source: false,
  sort: [
    {
      "@timestamp": "desc",
    },
    {
      "source.ip": "desc",
    },
  ],
});
console.log(response);

Console:

GET my-data-stream/_search
{
  "runtime_mappings": {
    "source.ip": {
      "type": "ip",
      "script": """
        String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ "message" ].value)?.sourceip;
        if (sourceip != null) emit(sourceip);
      """
    }
  },
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "now-1d/d",
              "lt": "now/d"
            }
          }
        },
        {
          "range": {
            "source.ip": {
              "gte": "192.0.2.0",
              "lte": "192.0.2.255"
            }
          }
        }
      ]
    }
  },
  "fields": [
    "*"
  ],
  "_source": false,
  "sort": [
    {
      "@timestamp": "desc"
    },
    {
      "source.ip": "desc"
    }
  ]
}

Elasticsearch searches are synchronous by default. Searches across frozen data, long time ranges, or large datasets may take longer. Use the async search API to run searches in the background. For more search options, see the search APIs:

Python:

resp = client.async_search.submit(
    index="my-data-stream",
    runtime_mappings={
        "source.ip": {
            "type": "ip",
            "script": "\n        String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ \"message\" ].value)?.sourceip;\n        if (sourceip != null) emit(sourceip);\n      "
        }
    },
    query={
        "bool": {
            "filter": [
                {
                    "range": {
                        "@timestamp": {
                            "gte": "now-2y/d",
                            "lt": "now/d"
                        }
                    }
                },
                {
                    "range": {
                        "source.ip": {
                            "gte": "192.0.2.0",
                            "lte": "192.0.2.255"
                        }
                    }
                }
            ]
        }
    },
    fields=[
        "*"
    ],
    source=False,
    sort=[
        {
            "@timestamp": "desc"
        },
        {
            "source.ip": "desc"
        }
    ],
)
print(resp)

JavaScript:

const response = await client.asyncSearch.submit({
  index: "my-data-stream",
  runtime_mappings: {
    "source.ip": {
      type: "ip",
      script:
        "\n        String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ \"message\" ].value)?.sourceip;\n        if (sourceip != null) emit(sourceip);\n      ",
    },
  },
  query: {
    bool: {
      filter: [
        {
          range: {
            "@timestamp": {
              gte: "now-2y/d",
              lt: "now/d",
            },
          },
        },
        {
          range: {
            "source.ip": {
              gte: "192.0.2.0",
              lte: "192.0.2.255",
            },
          },
        },
      ],
    },
  },
  fields: ["*"],
  _source: false,
  sort: [
    {
      "@timestamp": "desc",
    },
    {
      "source.ip": "desc",
    },
  ],
});
console.log(response);

Console:

POST my-data-stream/_async_search
{
  "runtime_mappings": {
    "source.ip": {
      "type": "ip",
      "script": """
        String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ "message" ].value)?.sourceip;
        if (sourceip != null) emit(sourceip);
      """
    }
  },
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "now-2y/d",
              "lt": "now/d"
            }
          }
        },
        {
          "range": {
            "source.ip": {
              "gte": "192.0.2.0",
              "lte": "192.0.2.255"
            }
          }
        }
      ]
    }
  },
  "fields": [
    "*"
  ],
  "_source": false,
  "sort": [
    {
      "@timestamp": "desc"
    },
    {
      "source.ip": "desc"
    }
  ]
}