Use Elasticsearch for time series data
Elasticsearch offers features to help you store, manage, and search time series data, such as logs and metrics. Once in Elasticsearch, you can use Kibana and other Elastic Stack features to analyze and visualize your data.
Set up data tiers
Elasticsearch's ILM feature uses data tiers to automatically move older data to nodes with less expensive hardware as it ages. This helps improve performance and reduce storage costs.
The hot and content tiers are required. The warm, cold, and frozen tiers are optional.
Use high-performance nodes in the hot and warm tiers for faster indexing and faster searches on your most recent data. Use slower, less expensive nodes in the cold and frozen tiers to reduce costs.
The content tier is not typically used for time series data. However, it's required to create system indices and other indices that aren't part of a data stream.
The steps for setting up data tiers vary based on your deployment type:
- Log in to the Elasticsearch Service Console.
- Add or select your deployment from the Elasticsearch Service home page or the Deployments page.
- From your deployment menu, select Edit deployment.
- To enable a data tier, click Add capacity.
Enable autoscaling
Autoscaling automatically adjusts your deployment's capacity to meet your storage needs. To enable autoscaling, select Autoscale this deployment on the Edit deployment page. Autoscaling is only available for Elasticsearch Service.
To assign a node to a data tier, add the respective node role to the node's elasticsearch.yml file. Changing an existing node's roles requires a rolling restart.
# Content tier
node.roles: [ data_content ]

# Hot tier
node.roles: [ data_hot ]

# Warm tier
node.roles: [ data_warm ]

# Cold tier
node.roles: [ data_cold ]

# Frozen tier
node.roles: [ data_frozen ]
We recommend using dedicated nodes in the frozen tier. If needed, you can assign other nodes to more than one tier.
node.roles: [ data_content, data_hot, data_warm ]
Assign your nodes any other roles needed for your cluster. For example, a small cluster may have nodes with multiple roles.
node.roles: [ master, ingest, ml, data_hot, transform ]
Register a snapshot repository
The cold and frozen tiers can use searchable snapshots to reduce local storage costs.
To use searchable snapshots, you must register a supported snapshot repository. The steps for registering this repository vary based on your deployment type and storage provider:
When you create a cluster, Elasticsearch Service automatically registers a default found-snapshots repository. This repository supports searchable snapshots.
The found-snapshots repository is specific to your cluster. To use another cluster's default repository, refer to the Cloud Snapshot and restore documentation.
You can also use any of the following custom repository types with searchable snapshots: AWS S3, Google Cloud Storage, Azure Blob Storage, Hadoop Distributed File Store (HDFS), and shared file systems such as NFS.
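As an illustration, on a self-managed cluster you could register a custom repository with the create snapshot repository API. This sketch assumes an S3-backed repository; the repository name and bucket name here are hypothetical, and the settings your storage provider requires may differ:

PUT _snapshot/my-custom-repository
{
  "type": "s3",
  "settings": {
    "bucket": "my-snapshot-bucket"
  }
}

Once registered, the repository name can be referenced from the searchable_snapshot actions in your lifecycle policy.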
Create or edit an index lifecycle policy
A data stream stores your data across multiple backing indices. ILM uses an index lifecycle policy to automatically move these indices through your data tiers.
If you use Fleet or Elastic Agent, edit one of Elasticsearch's built-in lifecycle policies. If you use a custom application, create your own policy. In either case, ensure your policy:
- Includes a phase for each data tier you've configured.
- Calculates the threshold, or min_age, for phase transition from rollover.
- Uses searchable snapshots in the cold and frozen phases, if needed.
- Includes a delete phase, if needed.
Fleet and Elastic Agent use the following built-in lifecycle policies:
- logs
- metrics
- synthetics
You can customize these policies based on your performance, resilience, and retention requirements.
To edit a policy in Kibana, open the main menu and go to Stack Management > Index Lifecycle Policies. Click the policy you'd like to edit.
You can also use the update lifecycle policy API.
resp = client.ilm.put_lifecycle(
    name="logs",
    policy={
        "phases": {
            "hot": {
                "actions": {"rollover": {"max_primary_shard_size": "50gb"}}
            },
            "warm": {
                "min_age": "30d",
                "actions": {
                    "shrink": {"number_of_shards": 1},
                    "forcemerge": {"max_num_segments": 1},
                },
            },
            "cold": {
                "min_age": "60d",
                "actions": {
                    "searchable_snapshot": {"snapshot_repository": "found-snapshots"}
                },
            },
            "frozen": {
                "min_age": "90d",
                "actions": {
                    "searchable_snapshot": {"snapshot_repository": "found-snapshots"}
                },
            },
            "delete": {
                "min_age": "735d",
                "actions": {"delete": {}},
            },
        }
    },
)
print(resp)
const response = await client.ilm.putLifecycle({
  name: "logs",
  policy: {
    phases: {
      hot: {
        actions: { rollover: { max_primary_shard_size: "50gb" } },
      },
      warm: {
        min_age: "30d",
        actions: {
          shrink: { number_of_shards: 1 },
          forcemerge: { max_num_segments: 1 },
        },
      },
      cold: {
        min_age: "60d",
        actions: {
          searchable_snapshot: { snapshot_repository: "found-snapshots" },
        },
      },
      frozen: {
        min_age: "90d",
        actions: {
          searchable_snapshot: { snapshot_repository: "found-snapshots" },
        },
      },
      delete: {
        min_age: "735d",
        actions: { delete: {} },
      },
    },
  },
});
console.log(response);
PUT _ilm/policy/logs
{
  "policy": {
    "phases": {
      "hot": {
        "actions": { "rollover": { "max_primary_shard_size": "50gb" } }
      },
      "warm": {
        "min_age": "30d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 }
        }
      },
      "cold": {
        "min_age": "60d",
        "actions": {
          "searchable_snapshot": { "snapshot_repository": "found-snapshots" }
        }
      },
      "frozen": {
        "min_age": "90d",
        "actions": {
          "searchable_snapshot": { "snapshot_repository": "found-snapshots" }
        }
      },
      "delete": {
        "min_age": "735d",
        "actions": { "delete": {} }
      }
    }
  }
}
To create a policy in Kibana, open the main menu and go to Stack Management > Index Lifecycle Policies. Click Create policy.
You can also use the update lifecycle policy API.
resp = client.ilm.put_lifecycle(
    name="my-lifecycle-policy",
    policy={
        "phases": {
            "hot": {
                "actions": {"rollover": {"max_primary_shard_size": "50gb"}}
            },
            "warm": {
                "min_age": "30d",
                "actions": {
                    "shrink": {"number_of_shards": 1},
                    "forcemerge": {"max_num_segments": 1},
                },
            },
            "cold": {
                "min_age": "60d",
                "actions": {
                    "searchable_snapshot": {"snapshot_repository": "found-snapshots"}
                },
            },
            "frozen": {
                "min_age": "90d",
                "actions": {
                    "searchable_snapshot": {"snapshot_repository": "found-snapshots"}
                },
            },
            "delete": {
                "min_age": "735d",
                "actions": {"delete": {}},
            },
        }
    },
)
print(resp)
const response = await client.ilm.putLifecycle({
  name: "my-lifecycle-policy",
  policy: {
    phases: {
      hot: {
        actions: { rollover: { max_primary_shard_size: "50gb" } },
      },
      warm: {
        min_age: "30d",
        actions: {
          shrink: { number_of_shards: 1 },
          forcemerge: { max_num_segments: 1 },
        },
      },
      cold: {
        min_age: "60d",
        actions: {
          searchable_snapshot: { snapshot_repository: "found-snapshots" },
        },
      },
      frozen: {
        min_age: "90d",
        actions: {
          searchable_snapshot: { snapshot_repository: "found-snapshots" },
        },
      },
      delete: {
        min_age: "735d",
        actions: { delete: {} },
      },
    },
  },
});
console.log(response);
PUT _ilm/policy/my-lifecycle-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": { "rollover": { "max_primary_shard_size": "50gb" } }
      },
      "warm": {
        "min_age": "30d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 }
        }
      },
      "cold": {
        "min_age": "60d",
        "actions": {
          "searchable_snapshot": { "snapshot_repository": "found-snapshots" }
        }
      },
      "frozen": {
        "min_age": "90d",
        "actions": {
          "searchable_snapshot": { "snapshot_repository": "found-snapshots" }
        }
      },
      "delete": {
        "min_age": "735d",
        "actions": { "delete": {} }
      }
    }
  }
}
Create component templates
If you use Fleet or Elastic Agent, skip to Search and visualize your data. Fleet and Elastic Agent use built-in templates to create data streams for you.
If you use a custom application, you need to set up your own data stream. A data stream requires a matching index template. In most cases, you compose this index template using one or more component templates. You typically use separate component templates for mappings and index settings. This lets you reuse the component templates in multiple index templates.
When creating your component templates, include:
- A date or date_nanos mapping for the @timestamp field. If you don't specify a mapping, Elasticsearch maps @timestamp as a date field with default options.
- Your lifecycle policy in the index.lifecycle.name index setting.
Use the Elastic Common Schema (ECS) when mapping your fields. ECS fields integrate with several Elastic Stack features by default.
If you're unsure how to map your fields, use runtime fields to extract fields from unstructured content at search time. For example, you can index a log message to a wildcard field and later extract IP addresses and other data from this field during a search.
To create a component template in Kibana, open the main menu and go to Stack Management > Index Management. In the Index Templates view, click Create component template.
You can also use the create component template API.
resp = client.cluster.put_component_template(
    name="my-mappings",
    template={
        "mappings": {
            "properties": {
                "@timestamp": {
                    "type": "date",
                    "format": "date_optional_time||epoch_millis",
                },
                "message": {"type": "wildcard"},
            }
        }
    },
    meta={
        "description": "Mappings for @timestamp and message fields",
        "my-custom-meta-field": "More arbitrary metadata",
    },
)
print(resp)

resp1 = client.cluster.put_component_template(
    name="my-settings",
    template={"settings": {"index.lifecycle.name": "my-lifecycle-policy"}},
    meta={
        "description": "Settings for ILM",
        "my-custom-meta-field": "More arbitrary metadata",
    },
)
print(resp1)
response = client.cluster.put_component_template(
  name: 'my-mappings',
  body: {
    template: {
      mappings: {
        properties: {
          "@timestamp": {
            type: 'date',
            format: 'date_optional_time||epoch_millis'
          },
          message: {
            type: 'wildcard'
          }
        }
      }
    },
    _meta: {
      description: 'Mappings for @timestamp and message fields',
      "my-custom-meta-field": 'More arbitrary metadata'
    }
  }
)
puts response

response = client.cluster.put_component_template(
  name: 'my-settings',
  body: {
    template: {
      settings: {
        'index.lifecycle.name' => 'my-lifecycle-policy'
      }
    },
    _meta: {
      description: 'Settings for ILM',
      "my-custom-meta-field": 'More arbitrary metadata'
    }
  }
)
puts response
const response = await client.cluster.putComponentTemplate({
  name: "my-mappings",
  template: {
    mappings: {
      properties: {
        "@timestamp": {
          type: "date",
          format: "date_optional_time||epoch_millis",
        },
        message: {
          type: "wildcard",
        },
      },
    },
  },
  _meta: {
    description: "Mappings for @timestamp and message fields",
    "my-custom-meta-field": "More arbitrary metadata",
  },
});
console.log(response);

const response1 = await client.cluster.putComponentTemplate({
  name: "my-settings",
  template: {
    settings: {
      "index.lifecycle.name": "my-lifecycle-policy",
    },
  },
  _meta: {
    description: "Settings for ILM",
    "my-custom-meta-field": "More arbitrary metadata",
  },
});
console.log(response1);
# Creates a component template for mappings
PUT _component_template/my-mappings
{
  "template": {
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "date_optional_time||epoch_millis"
        },
        "message": {
          "type": "wildcard"
        }
      }
    }
  },
  "_meta": {
    "description": "Mappings for @timestamp and message fields",
    "my-custom-meta-field": "More arbitrary metadata"
  }
}

# Creates a component template for index settings
PUT _component_template/my-settings
{
  "template": {
    "settings": {
      "index.lifecycle.name": "my-lifecycle-policy"
    }
  },
  "_meta": {
    "description": "Settings for ILM",
    "my-custom-meta-field": "More arbitrary metadata"
  }
}
Create an index template
Use your component templates to create an index template. Specify:
- One or more index patterns that match the data stream's name.
- That the template is data stream enabled.
- Any component templates that contain your mappings and index settings.
- A priority higher than 200 to avoid collisions with built-in templates.
To create an index template in Kibana, open the main menu and go to Stack Management > Index Management. In the Index Templates view, click Create template.
You can also use the create index template API. Include the data_stream object to enable data streams.
resp = client.indices.put_index_template(
    name="my-index-template",
    index_patterns=["my-data-stream*"],
    data_stream={},
    composed_of=["my-mappings", "my-settings"],
    priority=500,
    meta={
        "description": "Template for my time series data",
        "my-custom-meta-field": "More arbitrary metadata",
    },
)
print(resp)
response = client.indices.put_index_template(
  name: 'my-index-template',
  body: {
    index_patterns: ['my-data-stream*'],
    data_stream: {},
    composed_of: ['my-mappings', 'my-settings'],
    priority: 500,
    _meta: {
      description: 'Template for my time series data',
      "my-custom-meta-field": 'More arbitrary metadata'
    }
  }
)
puts response
const response = await client.indices.putIndexTemplate({
  name: "my-index-template",
  index_patterns: ["my-data-stream*"],
  data_stream: {},
  composed_of: ["my-mappings", "my-settings"],
  priority: 500,
  _meta: {
    description: "Template for my time series data",
    "my-custom-meta-field": "More arbitrary metadata",
  },
});
console.log(response);
PUT _index_template/my-index-template
{
  "index_patterns": ["my-data-stream*"],
  "data_stream": {},
  "composed_of": ["my-mappings", "my-settings"],
  "priority": 500,
  "_meta": {
    "description": "Template for my time series data",
    "my-custom-meta-field": "More arbitrary metadata"
  }
}
Add data to a data stream
Indexing requests add documents to a data stream. These requests must use an op_type of create. Documents must include a @timestamp field.
To automatically create your data stream, submit an indexing request that targets the stream's name. This name must match one of your index template's index patterns.
resp = client.bulk(
    index="my-data-stream",
    operations=[
        {"create": {}},
        {
            "@timestamp": "2099-05-06T16:21:15.000Z",
            "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736",
        },
        {"create": {}},
        {
            "@timestamp": "2099-05-06T16:25:42.000Z",
            "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638",
        },
    ],
)
print(resp)

resp1 = client.index(
    index="my-data-stream",
    document={
        "@timestamp": "2099-05-06T16:21:15.000Z",
        "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736",
    },
)
print(resp1)
response = client.bulk(
  index: 'my-data-stream',
  body: [
    { create: {} },
    { "@timestamp": '2099-05-06T16:21:15.000Z', message: '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736' },
    { create: {} },
    { "@timestamp": '2099-05-06T16:25:42.000Z', message: '192.0.2.255 - - [06/May/2099:16:25:42 +0000] "GET /favicon.ico HTTP/1.0" 200 3638' }
  ]
)
puts response

response = client.index(
  index: 'my-data-stream',
  body: {
    "@timestamp": '2099-05-06T16:21:15.000Z',
    message: '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736'
  }
)
puts response
const response = await client.bulk({
  index: "my-data-stream",
  operations: [
    { create: {} },
    {
      "@timestamp": "2099-05-06T16:21:15.000Z",
      message: '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736',
    },
    { create: {} },
    {
      "@timestamp": "2099-05-06T16:25:42.000Z",
      message: '192.0.2.255 - - [06/May/2099:16:25:42 +0000] "GET /favicon.ico HTTP/1.0" 200 3638',
    },
  ],
});
console.log(response);

const response1 = await client.index({
  index: "my-data-stream",
  document: {
    "@timestamp": "2099-05-06T16:21:15.000Z",
    message: '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736',
  },
});
console.log(response1);
PUT my-data-stream/_bulk
{ "create": {} }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }
{ "create": {} }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638" }

POST my-data-stream/_doc
{
  "@timestamp": "2099-05-06T16:21:15.000Z",
  "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
}
Search and visualize your data
To explore and search your data in Kibana, open the main menu and select Discover. See Kibana's Discover documentation.
Use Kibana's Dashboard feature to visualize your data in a chart, table, map, and more. See Kibana's Dashboard documentation.
You can also search and aggregate your data using the search API. Use runtime fields and grok patterns to dynamically extract data from log messages and other unstructured content at search time.
resp = client.search(
    index="my-data-stream",
    runtime_mappings={
        "source.ip": {
            "type": "ip",
            "script": "\n String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ \"message\" ].value)?.sourceip;\n if (sourceip != null) emit(sourceip);\n ",
        }
    },
    query={
        "bool": {
            "filter": [
                {"range": {"@timestamp": {"gte": "now-1d/d", "lt": "now/d"}}},
                {"range": {"source.ip": {"gte": "192.0.2.0", "lte": "192.0.2.255"}}},
            ]
        }
    },
    fields=["*"],
    source=False,
    sort=[{"@timestamp": "desc"}, {"source.ip": "desc"}],
)
print(resp)
const response = await client.search({
  index: "my-data-stream",
  runtime_mappings: {
    "source.ip": {
      type: "ip",
      script:
        "\n String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ \"message\" ].value)?.sourceip;\n if (sourceip != null) emit(sourceip);\n ",
    },
  },
  query: {
    bool: {
      filter: [
        { range: { "@timestamp": { gte: "now-1d/d", lt: "now/d" } } },
        { range: { "source.ip": { gte: "192.0.2.0", lte: "192.0.2.255" } } },
      ],
    },
  },
  fields: ["*"],
  _source: false,
  sort: [{ "@timestamp": "desc" }, { "source.ip": "desc" }],
});
console.log(response);
GET my-data-stream/_search
{
  "runtime_mappings": {
    "source.ip": {
      "type": "ip",
      "script": """
        String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ "message" ].value)?.sourceip;
        if (sourceip != null) emit(sourceip);
      """
    }
  },
  "query": {
    "bool": {
      "filter": [
        { "range": { "@timestamp": { "gte": "now-1d/d", "lt": "now/d" } } },
        { "range": { "source.ip": { "gte": "192.0.2.0", "lte": "192.0.2.255" } } }
      ]
    }
  },
  "fields": [ "*" ],
  "_source": false,
  "sort": [
    { "@timestamp": "desc" },
    { "source.ip": "desc" }
  ]
}
Elasticsearch searches are synchronous by default. Searches across frozen data, long time ranges, or large datasets may take longer. Use the async search API to run searches in the background. For more search options, see the search API documentation.
resp = client.async_search.submit(
    index="my-data-stream",
    runtime_mappings={
        "source.ip": {
            "type": "ip",
            "script": "\n String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ \"message\" ].value)?.sourceip;\n if (sourceip != null) emit(sourceip);\n ",
        }
    },
    query={
        "bool": {
            "filter": [
                {"range": {"@timestamp": {"gte": "now-2y/d", "lt": "now/d"}}},
                {"range": {"source.ip": {"gte": "192.0.2.0", "lte": "192.0.2.255"}}},
            ]
        }
    },
    fields=["*"],
    source=False,
    sort=[{"@timestamp": "desc"}, {"source.ip": "desc"}],
)
print(resp)
const response = await client.asyncSearch.submit({
  index: "my-data-stream",
  runtime_mappings: {
    "source.ip": {
      type: "ip",
      script:
        "\n String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ \"message\" ].value)?.sourceip;\n if (sourceip != null) emit(sourceip);\n ",
    },
  },
  query: {
    bool: {
      filter: [
        { range: { "@timestamp": { gte: "now-2y/d", lt: "now/d" } } },
        { range: { "source.ip": { gte: "192.0.2.0", lte: "192.0.2.255" } } },
      ],
    },
  },
  fields: ["*"],
  _source: false,
  sort: [{ "@timestamp": "desc" }, { "source.ip": "desc" }],
});
console.log(response);
POST my-data-stream/_async_search
{
  "runtime_mappings": {
    "source.ip": {
      "type": "ip",
      "script": """
        String sourceip=grok('%{IPORHOST:sourceip} .*').extract(doc[ "message" ].value)?.sourceip;
        if (sourceip != null) emit(sourceip);
      """
    }
  },
  "query": {
    "bool": {
      "filter": [
        { "range": { "@timestamp": { "gte": "now-2y/d", "lt": "now/d" } } },
        { "range": { "source.ip": { "gte": "192.0.2.0", "lte": "192.0.2.255" } } }
      ]
    }
  },
  "fields": [ "*" ],
  "_source": false,
  "sort": [
    { "@timestamp": "desc" },
    { "source.ip": "desc" }
  ]
}