Set up a data stream

To set up a data stream, follow these steps:

  • Create an index lifecycle policy
  • Create component templates
  • Create an index template
  • Create the data stream
  • Secure the data stream

You can also convert an index alias into a data stream.

If you use Fleet, Elastic Agent, or Logstash, skip this tutorial. They all set up data streams for you.

For Fleet and Elastic Agent, check out the data streams documentation. For Logstash, check out the data stream settings for the elasticsearch output plugin.

Create an index lifecycle policy

While optional, we recommend using ILM to automate the management of your data stream's backing indices. ILM requires an index lifecycle policy.

To create an index lifecycle policy in Kibana, open the main menu and go to Stack Management > Index Lifecycle Policies. Click Create policy.

You can also use the create lifecycle policy API.

resp = client.ilm.put_lifecycle(
    name="my-lifecycle-policy",
    policy={
        "phases": {
            "hot": {
                "actions": {
                    "rollover": {
                        "max_primary_shard_size": "50gb"
                    }
                }
            },
            "warm": {
                "min_age": "30d",
                "actions": {
                    "shrink": {
                        "number_of_shards": 1
                    },
                    "forcemerge": {
                        "max_num_segments": 1
                    }
                }
            },
            "cold": {
                "min_age": "60d",
                "actions": {
                    "searchable_snapshot": {
                        "snapshot_repository": "found-snapshots"
                    }
                }
            },
            "frozen": {
                "min_age": "90d",
                "actions": {
                    "searchable_snapshot": {
                        "snapshot_repository": "found-snapshots"
                    }
                }
            },
            "delete": {
                "min_age": "735d",
                "actions": {
                    "delete": {}
                }
            }
        }
    },
)
print(resp)
const response = await client.ilm.putLifecycle({
  name: "my-lifecycle-policy",
  policy: {
    phases: {
      hot: {
        actions: {
          rollover: {
            max_primary_shard_size: "50gb",
          },
        },
      },
      warm: {
        min_age: "30d",
        actions: {
          shrink: {
            number_of_shards: 1,
          },
          forcemerge: {
            max_num_segments: 1,
          },
        },
      },
      cold: {
        min_age: "60d",
        actions: {
          searchable_snapshot: {
            snapshot_repository: "found-snapshots",
          },
        },
      },
      frozen: {
        min_age: "90d",
        actions: {
          searchable_snapshot: {
            snapshot_repository: "found-snapshots",
          },
        },
      },
      delete: {
        min_age: "735d",
        actions: {
          delete: {},
        },
      },
    },
  },
});
console.log(response);
PUT _ilm/policy/my-lifecycle-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_primary_shard_size": "50gb"
          }
        }
      },
      "warm": {
        "min_age": "30d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          }
        }
      },
      "cold": {
        "min_age": "60d",
        "actions": {
          "searchable_snapshot": {
            "snapshot_repository": "found-snapshots"
          }
        }
      },
      "frozen": {
        "min_age": "90d",
        "actions": {
          "searchable_snapshot": {
            "snapshot_repository": "found-snapshots"
          }
        }
      },
      "delete": {
        "min_age": "735d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

Create component templates

A data stream requires a matching index template. In most cases, you compose this index template using one or more component templates. You typically use separate component templates for mappings and index settings. This lets you reuse the component templates in multiple index templates.

When creating your component templates, include:

  • A date or date_nanos mapping for the @timestamp field. If you don't specify a mapping, Elasticsearch maps @timestamp as a date field with default options.
  • Your lifecycle policy in the index.lifecycle.name index setting.

Use the Elastic Common Schema (ECS) when mapping your fields. ECS fields integrate with several Elastic Stack features by default.

If you're unsure how to map your fields, use runtime fields to extract fields from unstructured content at search time. For example, you can index your log messages to a wildcard field and later extract IP addresses and other data from this field during a search.
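
As a minimal sketch of this approach (not part of the original tutorial), the search below defines a hypothetical client_ip runtime field that parses an IP address out of the message wildcard field at search time. The field name and the %{COMMONAPACHELOG} grok pattern are assumptions you would adapt to your own data.

resp = client.search(
    index="my-data-stream",
    # Hypothetical runtime field: extract the client IP from the unstructured
    # message at search time instead of mapping it up front.
    runtime_mappings={
        "client_ip": {
            "type": "ip",
            "script": {
                "source": "String clientip = grok('%{COMMONAPACHELOG}').extract(doc['message'].value)?.clientip; if (clientip != null) emit(clientip);"
            }
        }
    },
    fields=[
        "client_ip"
    ],
)
print(resp)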

To create a component template in Kibana, open the main menu and go to Stack Management > Index Management. In the Index Templates view, click Create component template.

You can also use the create component template API.

resp = client.cluster.put_component_template(
    name="my-mappings",
    template={
        "mappings": {
            "properties": {
                "@timestamp": {
                    "type": "date",
                    "format": "date_optional_time||epoch_millis"
                },
                "message": {
                    "type": "wildcard"
                }
            }
        }
    },
    meta={
        "description": "Mappings for @timestamp and message fields",
        "my-custom-meta-field": "More arbitrary metadata"
    },
)
print(resp)

resp1 = client.cluster.put_component_template(
    name="my-settings",
    template={
        "settings": {
            "index.lifecycle.name": "my-lifecycle-policy"
        }
    },
    meta={
        "description": "Settings for ILM",
        "my-custom-meta-field": "More arbitrary metadata"
    },
)
print(resp1)
response = client.cluster.put_component_template(
  name: 'my-mappings',
  body: {
    template: {
      mappings: {
        properties: {
          "@timestamp": {
            type: 'date',
            format: 'date_optional_time||epoch_millis'
          },
          message: {
            type: 'wildcard'
          }
        }
      }
    },
    _meta: {
      description: 'Mappings for @timestamp and message fields',
      "my-custom-meta-field": 'More arbitrary metadata'
    }
  }
)
puts response

response = client.cluster.put_component_template(
  name: 'my-settings',
  body: {
    template: {
      settings: {
        'index.lifecycle.name' => 'my-lifecycle-policy'
      }
    },
    _meta: {
      description: 'Settings for ILM',
      "my-custom-meta-field": 'More arbitrary metadata'
    }
  }
)
puts response
const response = await client.cluster.putComponentTemplate({
  name: "my-mappings",
  template: {
    mappings: {
      properties: {
        "@timestamp": {
          type: "date",
          format: "date_optional_time||epoch_millis",
        },
        message: {
          type: "wildcard",
        },
      },
    },
  },
  _meta: {
    description: "Mappings for @timestamp and message fields",
    "my-custom-meta-field": "More arbitrary metadata",
  },
});
console.log(response);

const response1 = await client.cluster.putComponentTemplate({
  name: "my-settings",
  template: {
    settings: {
      "index.lifecycle.name": "my-lifecycle-policy",
    },
  },
  _meta: {
    description: "Settings for ILM",
    "my-custom-meta-field": "More arbitrary metadata",
  },
});
console.log(response1);
# Creates a component template for mappings
PUT _component_template/my-mappings
{
  "template": {
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "date_optional_time||epoch_millis"
        },
        "message": {
          "type": "wildcard"
        }
      }
    }
  },
  "_meta": {
    "description": "Mappings for @timestamp and message fields",
    "my-custom-meta-field": "More arbitrary metadata"
  }
}

# Creates a component template for index settings
PUT _component_template/my-settings
{
  "template": {
    "settings": {
      "index.lifecycle.name": "my-lifecycle-policy"
    }
  },
  "_meta": {
    "description": "Settings for ILM",
    "my-custom-meta-field": "More arbitrary metadata"
  }
}

Create an index template

Use your component templates to create an index template. Specify:

  • One or more index patterns that match the data stream's name. We recommend using our data stream naming scheme.
  • That the template is data stream enabled.
  • Any component templates that contain your mappings and index settings.
  • A priority higher than 200 to avoid collisions with built-in templates. See Avoid index pattern collisions.

To create an index template in Kibana, open the main menu and go to Stack Management > Index Management. In the Index Templates view, click Create template.

You can also use the create index template API. Include a data_stream object to enable data streams.

resp = client.indices.put_index_template(
    name="my-index-template",
    index_patterns=[
        "my-data-stream*"
    ],
    data_stream={},
    composed_of=[
        "my-mappings",
        "my-settings"
    ],
    priority=500,
    meta={
        "description": "Template for my time series data",
        "my-custom-meta-field": "More arbitrary metadata"
    },
)
print(resp)
response = client.indices.put_index_template(
  name: 'my-index-template',
  body: {
    index_patterns: [
      'my-data-stream*'
    ],
    data_stream: {},
    composed_of: [
      'my-mappings',
      'my-settings'
    ],
    priority: 500,
    _meta: {
      description: 'Template for my time series data',
      "my-custom-meta-field": 'More arbitrary metadata'
    }
  }
)
puts response
const response = await client.indices.putIndexTemplate({
  name: "my-index-template",
  index_patterns: ["my-data-stream*"],
  data_stream: {},
  composed_of: ["my-mappings", "my-settings"],
  priority: 500,
  _meta: {
    description: "Template for my time series data",
    "my-custom-meta-field": "More arbitrary metadata",
  },
});
console.log(response);
PUT _index_template/my-index-template
{
  "index_patterns": ["my-data-stream*"],
  "data_stream": { },
  "composed_of": [ "my-mappings", "my-settings" ],
  "priority": 500,
  "_meta": {
    "description": "Template for my time series data",
    "my-custom-meta-field": "More arbitrary metadata"
  }
}

Create the data stream

Indexing requests add documents to a data stream. These requests must use an op_type of create. Documents must include a @timestamp field.

To automatically create your data stream, submit an indexing request that targets the stream's name. This name must match one of your index template's index patterns.

resp = client.bulk(
    index="my-data-stream",
    operations=[
        {
            "create": {}
        },
        {
            "@timestamp": "2099-05-06T16:21:15.000Z",
            "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
        },
        {
            "create": {}
        },
        {
            "@timestamp": "2099-05-06T16:25:42.000Z",
            "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638"
        }
    ],
)
print(resp)

resp1 = client.index(
    index="my-data-stream",
    document={
        "@timestamp": "2099-05-06T16:21:15.000Z",
        "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
    },
)
print(resp1)
response = client.bulk(
  index: 'my-data-stream',
  body: [
    {
      create: {}
    },
    {
      "@timestamp": '2099-05-06T16:21:15.000Z',
      message: '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736'
    },
    {
      create: {}
    },
    {
      "@timestamp": '2099-05-06T16:25:42.000Z',
      message: '192.0.2.255 - - [06/May/2099:16:25:42 +0000] "GET /favicon.ico HTTP/1.0" 200 3638'
    }
  ]
)
puts response

response = client.index(
  index: 'my-data-stream',
  body: {
    "@timestamp": '2099-05-06T16:21:15.000Z',
    message: '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736'
  }
)
puts response
const response = await client.bulk({
  index: "my-data-stream",
  operations: [
    {
      create: {},
    },
    {
      "@timestamp": "2099-05-06T16:21:15.000Z",
      message:
        '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736',
    },
    {
      create: {},
    },
    {
      "@timestamp": "2099-05-06T16:25:42.000Z",
      message:
        '192.0.2.255 - - [06/May/2099:16:25:42 +0000] "GET /favicon.ico HTTP/1.0" 200 3638',
    },
  ],
});
console.log(response);

const response1 = await client.index({
  index: "my-data-stream",
  document: {
    "@timestamp": "2099-05-06T16:21:15.000Z",
    message:
      '192.0.2.42 - - [06/May/2099:16:21:15 +0000] "GET /images/bg.jpg HTTP/1.0" 200 24736',
  },
});
console.log(response1);
PUT my-data-stream/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:25:42.000Z", "message": "192.0.2.255 - - [06/May/2099:16:25:42 +0000] \"GET /favicon.ico HTTP/1.0\" 200 3638" }

POST my-data-stream/_doc
{
  "@timestamp": "2099-05-06T16:21:15.000Z",
  "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"
}

You can also manually create the stream using the create data stream API. The stream's name must still match one of your template's index patterns.

resp = client.indices.create_data_stream(
    name="my-data-stream",
)
print(resp)
response = client.indices.create_data_stream(
  name: 'my-data-stream'
)
puts response
const response = await client.indices.createDataStream({
  name: "my-data-stream",
});
console.log(response);
PUT _data_stream/my-data-stream

Secure the data stream

Use index privileges to control access to a data stream. Granting privileges on a data stream grants the same privileges on its backing indices.

For an example, see Data stream privileges.
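
As a rough, hypothetical illustration (the my-data-stream-reader role name and the privilege list are assumptions, not part of this tutorial), a role granting read-only access to the stream could look like this:

resp = client.security.put_role(
    name="my-data-stream-reader",
    # Index privileges granted on the data stream also apply to its hidden
    # backing indices.
    indices=[
        {
            "names": ["my-data-stream"],
            "privileges": ["read", "view_index_metadata"]
        }
    ],
)
print(resp)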

Convert an index alias to a data stream

Prior to Elasticsearch 7.9, you'd typically use an index alias with a write index to manage time series data. Data streams replace this functionality, require less maintenance, and automatically integrate with data tiers.

To convert an index alias with a write index to a data stream with the same name, use the migrate to data stream API. During conversion, the alias's indices become hidden backing indices for the stream. The alias's write index becomes the stream's write index. The stream still requires a matching index template with data stream enabled.

resp = client.indices.migrate_to_data_stream(
    name="my-time-series-data",
)
print(resp)
const response = await client.indices.migrateToDataStream({
  name: "my-time-series-data",
});
console.log(response);
POST _data_stream/_migrate/my-time-series-data

Get information about a data stream

To get information about a data stream in Kibana, open the main menu and go to Stack Management > Index Management. In the Data Streams view, click the data stream's name.

You can also use the get data stream API.

resp = client.indices.get_data_stream(
    name="my-data-stream",
)
print(resp)
response = client.indices.get_data_stream(
  name: 'my-data-stream'
)
puts response
const response = await client.indices.getDataStream({
  name: "my-data-stream",
});
console.log(response);
GET _data_stream/my-data-stream

Delete a data stream

To delete a data stream and its backing indices in Kibana, open the main menu and go to Stack Management > Index Management. In the Data Streams view, click the trash icon. The icon only displays if you have the delete_index security privilege for the data stream.

You can also use the delete data stream API.

resp = client.indices.delete_data_stream(
    name="my-data-stream",
)
print(resp)
response = client.indices.delete_data_stream(
  name: 'my-data-stream'
)
puts response
const response = await client.indices.deleteDataStream({
  name: "my-data-stream",
});
console.log(response);
DELETE _data_stream/my-data-stream