Reindex a time series data stream (TSDS)
Introduction
With reindexing, you can copy documents from an old time series data stream (TSDS) to a new one. Data streams support reindexing in general, with a few restrictions. Still, time series data streams introduce additional challenges due to the tight control on the accepted timestamp range for each backing index they contain. Direct use of the reindex API would likely error out due to attempting to insert documents with timestamps that are outside the current acceptance window.
To avoid these limitations, use the process outlined below:

- Create an index template for the destination data stream that will contain the re-indexed data.
- Update the template to:
  - Set the index.time_series.start_time and index.time_series.end_time index settings to match the lowest and highest @timestamp values in the old data stream.
  - Set the index.number_of_shards index setting to the sum of all primary shards of all backing indices of the old data stream.
  - Set index.number_of_replicas to zero and unset the index.lifecycle.name index setting.
- Run the reindex operation to completion.
- Revert the overridden index settings in the destination index template.
- Invoke the rollover API to create a new backing index that can receive new documents.
This process only applies to time series data streams without a downsampling configuration. Data streams with downsampling can only be re-indexed by re-indexing their backing indices individually and adding them to an empty destination data stream.
In what follows, we elaborate on each step of the process with examples.
Create a TSDS template to accept old documents
Consider a TSDS with the following template:
resp = client.cluster.put_component_template(
    name="source_template",
    template={
        "settings": {
            "index": {
                "number_of_replicas": 2,
                "number_of_shards": 2,
                "mode": "time_series",
                "routing_path": ["metricset"]
            }
        },
        "mappings": {
            "properties": {
                "@timestamp": {"type": "date"},
                "metricset": {
                    "type": "keyword",
                    "time_series_dimension": True
                },
                "k8s": {
                    "properties": {
                        "tx": {"type": "long"},
                        "rx": {"type": "long"}
                    }
                }
            }
        }
    },
)
print(resp)

resp1 = client.indices.put_index_template(
    name="1",
    index_patterns=["k8s*"],
    composed_of=["source_template"],
    data_stream={},
)
print(resp1)
response = client.cluster.put_component_template(
  name: 'source_template',
  body: {
    template: {
      settings: {
        index: {
          number_of_replicas: 2,
          number_of_shards: 2,
          mode: 'time_series',
          routing_path: ['metricset']
        }
      },
      mappings: {
        properties: {
          "@timestamp": { type: 'date' },
          metricset: { type: 'keyword', time_series_dimension: true },
          "k8s": {
            properties: {
              tx: { type: 'long' },
              rx: { type: 'long' }
            }
          }
        }
      }
    }
  }
)
puts response

response = client.indices.put_index_template(
  name: 1,
  body: {
    index_patterns: ['k8s*'],
    composed_of: ['source_template'],
    data_stream: {}
  }
)
puts response
const response = await client.cluster.putComponentTemplate({
  name: "source_template",
  template: {
    settings: {
      index: {
        number_of_replicas: 2,
        number_of_shards: 2,
        mode: "time_series",
        routing_path: ["metricset"],
      },
    },
    mappings: {
      properties: {
        "@timestamp": { type: "date" },
        metricset: { type: "keyword", time_series_dimension: true },
        k8s: {
          properties: {
            tx: { type: "long" },
            rx: { type: "long" },
          },
        },
      },
    },
  },
});
console.log(response);

const response1 = await client.indices.putIndexTemplate({
  name: 1,
  index_patterns: ["k8s*"],
  composed_of: ["source_template"],
  data_stream: {},
});
console.log(response1);
POST /_component_template/source_template
{
  "template": {
    "settings": {
      "index": {
        "number_of_replicas": 2,
        "number_of_shards": 2,
        "mode": "time_series",
        "routing_path": ["metricset"]
      }
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "metricset": { "type": "keyword", "time_series_dimension": true },
        "k8s": {
          "properties": {
            "tx": { "type": "long" },
            "rx": { "type": "long" }
          }
        }
      }
    }
  }
}

POST /_index_template/1
{
  "index_patterns": ["k8s*"],
  "composed_of": ["source_template"],
  "data_stream": {}
}
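For illustration, assume a few documents have already been indexed into the k8s data stream, for example via a bulk request like the minimal Python-client sketch below. The metric values and timestamps are made up to line up with the example output that follows; in a live cluster the timestamps must fall within the accepted time range of the current backing index.

# Index a couple of sample documents into the source data stream (illustrative values only).
resp = client.bulk(
    index="k8s",
    operations=[
        {"create": {}},
        {"@timestamp": "2023-09-01T06:30:00.000Z", "metricset": "pod", "k8s": {"tx": 100, "rx": 200}},
        {"create": {}},
        {"@timestamp": "2023-09-01T11:45:00.000Z", "metricset": "pod", "k8s": {"tx": 150, "rx": 250}},
    ],
    refresh=True,
)
print(resp)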
A possible output of GET /k8s/_settings looks like this:
{
  ".ds-k8s-2023.09.01-000002": {
    "settings": {
      "index": {
        "mode": "time_series",
        "routing": {
          "allocation": {
            "include": {
              "_tier_preference": "data_hot"
            }
          }
        },
        "hidden": "true",
        "number_of_shards": "2",
        "time_series": {
          "end_time": "2023-09-01T14:00:00.000Z",
          "start_time": "2023-09-01T10:00:00.000Z"
        },
        "provided_name": ".ds-k8s-2023.09.01-000002",
        "creation_date": "1694439857608",
        "number_of_replicas": "2",
        "routing_path": [
          "metricset"
        ],
        ...
      }
    }
  },
  ".ds-k8s-2023.09.01-000001": {
    "settings": {
      "index": {
        "mode": "time_series",
        "routing": {
          "allocation": {
            "include": {
              "_tier_preference": "data_hot"
            }
          }
        },
        "hidden": "true",
        "number_of_shards": "2",
        "time_series": {
          "end_time": "2023-09-01T10:00:00.000Z",
          "start_time": "2023-09-01T06:00:00.000Z"
        },
        "provided_name": ".ds-k8s-2023.09.01-000001",
        "creation_date": "1694439837126",
        "number_of_replicas": "2",
        "routing_path": [
          "metricset"
        ],
        ...
      }
    }
  }
}
To re-index this TSDS, do not reuse its index template in the destination data stream, to avoid impacting its functionality. Instead, clone the template of the source TSDS and apply the following modifications:

- Set the index.time_series.start_time and index.time_series.end_time index settings explicitly. Their values should be based on the lowest and highest @timestamp values in the data stream to re-index, so that the initial backing index can load all data contained in the source data stream. (The sketch after this list shows one way to derive these values.)
- Set the index.number_of_shards index setting to the sum of all primary shards of all backing indices of the source data stream. This helps maintain the same level of search parallelism, as each shard is processed in a separate thread (or more).
- Unset the index.lifecycle.name index setting, if any. This prevents ILM from modifying the destination data stream during re-indexing.
- (Optional) Set index.number_of_replicas to zero. This speeds up the re-indexing operation. Since the data gets copied, the risk of data loss due to the missing replicas is limited.
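The values above can be derived from the source data stream itself. The sketch below, using the Python client with k8s as the source, runs a min/max aggregation on @timestamp and sums the primary shard counts of the backing indices; the aggregation names min_ts and max_ts are arbitrary, and you will likely want to round end_time up slightly, since it acts as an exclusive upper bound.

# Lowest and highest @timestamp values in the source data stream.
resp = client.search(
    index="k8s",
    size=0,
    aggs={
        "min_ts": {"min": {"field": "@timestamp"}},
        "max_ts": {"max": {"field": "@timestamp"}},
    },
)
start_time = resp["aggregations"]["min_ts"]["value_as_string"]
end_time = resp["aggregations"]["max_ts"]["value_as_string"]  # widen this bound in practice

# Sum of primary shards across all backing indices of the source data stream.
settings = client.indices.get_settings(index="k8s")
total_shards = sum(
    int(s["settings"]["index"]["number_of_shards"]) for s in settings.values()
)

print(start_time, end_time, total_shards)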
Using the example above as the source TSDS, the template for the destination TSDS would be:
const response = await client.cluster.putComponentTemplate({
  name: "destination_template",
  template: {
    settings: {
      index: {
        number_of_replicas: 0,
        number_of_shards: 4,
        mode: "time_series",
        routing_path: ["metricset"],
        time_series: {
          end_time: "2023-09-01T14:00:00.000Z",
          start_time: "2023-09-01T06:00:00.000Z",
        },
      },
    },
    mappings: {
      properties: {
        "@timestamp": { type: "date" },
        metricset: { type: "keyword", time_series_dimension: true },
        k8s: {
          properties: {
            tx: { type: "long" },
            rx: { type: "long" },
          },
        },
      },
    },
  },
});
console.log(response);

const response1 = await client.indices.putIndexTemplate({
  name: 2,
  index_patterns: ["k9s*"],
  composed_of: ["destination_template"],
  data_stream: {},
});
console.log(response1);
POST /_component_template/destination_template
{
  "template": {
    "settings": {
      "index": {
        "number_of_replicas": 0,
        "number_of_shards": 4,
        "mode": "time_series",
        "routing_path": ["metricset"],
        "time_series": {
          "end_time": "2023-09-01T14:00:00.000Z",
          "start_time": "2023-09-01T06:00:00.000Z"
        }
      }
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "metricset": { "type": "keyword", "time_series_dimension": true },
        "k8s": {
          "properties": {
            "tx": { "type": "long" },
            "rx": { "type": "long" }
          }
        }
      }
    }
  }
}

POST /_index_template/2
{
  "index_patterns": ["k9s*"],
  "composed_of": ["destination_template"],
  "data_stream": {}
}
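Optionally, with the destination template in place, you can create the k9s data stream explicitly before running the reindex. A minimal sketch with the Python client:

# Explicitly create the destination data stream from the matching index template (optional).
resp = client.indices.create_data_stream(name="k9s")
print(resp)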
Reindex
Invoke the reindex API, for instance:
resp = client.reindex(
    source={"index": "k8s"},
    dest={"index": "k9s", "op_type": "create"},
)
print(resp)
response = client.reindex(
  body: {
    source: { index: 'k8s' },
    dest: { index: 'k9s', op_type: 'create' }
  }
)
puts response
const response = await client.reindex({
  source: { index: "k8s" },
  dest: { index: "k9s", op_type: "create" },
});
console.log(response);
POST /_reindex
{
  "source": {
    "index": "k8s"
  },
  "dest": {
    "index": "k9s",
    "op_type": "create"
  }
}
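For large source data streams, it can be more practical to run the reindex asynchronously and track it through the task management API. The Python-client sketch below is one way to do that; the slices and requests_per_second values are illustrative choices, not requirements.

import time

# Start the reindex without waiting for it to finish; the response contains a task id.
resp = client.reindex(
    source={"index": "k8s"},
    dest={"index": "k9s", "op_type": "create"},
    wait_for_completion=False,
    slices="auto",            # parallelize across slices (illustrative)
    requests_per_second=-1,   # no throttling (illustrative)
)
task_id = resp["task"]

# Poll the task until the reindex completes.
while True:
    task = client.tasks.get(task_id=task_id)
    if task.get("completed"):
        break
    time.sleep(10)

print(task["response"])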
Revert the destination index template
Once the reindexing operation completes, revert the index template of the destination TSDS as follows:
- Remove the overrides for index.time_series.start_time and index.time_series.end_time.
- Restore the values of index.number_of_shards, index.number_of_replicas and index.lifecycle.name, as applicable.
Using the previous example, the destination template is modified as follows:
resp = client.cluster.put_component_template(
    name="destination_template",
    template={
        "settings": {
            "index": {
                "number_of_replicas": 2,
                "number_of_shards": 2,
                "mode": "time_series",
                "routing_path": ["metricset"]
            }
        },
        "mappings": {
            "properties": {
                "@timestamp": {"type": "date"},
                "metricset": {
                    "type": "keyword",
                    "time_series_dimension": True
                },
                "k8s": {
                    "properties": {
                        "tx": {"type": "long"},
                        "rx": {"type": "long"}
                    }
                }
            }
        }
    },
)
print(resp)
response = client.cluster.put_component_template(
  name: 'destination_template',
  body: {
    template: {
      settings: {
        index: {
          number_of_replicas: 2,
          number_of_shards: 2,
          mode: 'time_series',
          routing_path: ['metricset']
        }
      },
      mappings: {
        properties: {
          "@timestamp": { type: 'date' },
          metricset: { type: 'keyword', time_series_dimension: true },
          "k8s": {
            properties: {
              tx: { type: 'long' },
              rx: { type: 'long' }
            }
          }
        }
      }
    }
  }
)
puts response
const response = await client.cluster.putComponentTemplate({
  name: "destination_template",
  template: {
    settings: {
      index: {
        number_of_replicas: 2,
        number_of_shards: 2,
        mode: "time_series",
        routing_path: ["metricset"],
      },
    },
    mappings: {
      properties: {
        "@timestamp": { type: "date" },
        metricset: { type: "keyword", time_series_dimension: true },
        k8s: {
          properties: {
            tx: { type: "long" },
            rx: { type: "long" },
          },
        },
      },
    },
  },
});
console.log(response);
POST /_component_template/destination_template
{
  "template": {
    "settings": {
      "index": {
        "number_of_replicas": 2,
        "number_of_shards": 2,
        "mode": "time_series",
        "routing_path": ["metricset"]
      }
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "metricset": { "type": "keyword", "time_series_dimension": true },
        "k8s": {
          "properties": {
            "tx": { "type": "long" },
            "rx": { "type": "long" }
          }
        }
      }
    }
  }
}
Next, invoke the rollover API on the destination data stream, without any conditions set:
resp = client.indices.rollover(
    alias="k9s",
)
print(resp)
response = client.indices.rollover(
  alias: 'k9s'
)
puts response
const response = await client.indices.rollover({
  alias: "k9s",
});
console.log(response);
POST /k9s/_rollover/
This creates a new backing index with the updated index settings. The destination data stream is now ready to accept new documents.
Note that the initial backing index can still accept documents within the timestamp range derived from the source data stream. If this is not desired, mark it explicitly as read-only.
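One way to do this is to add a write block to that backing index. The Python-client sketch below resolves the backing indices of the destination data stream and blocks writes on the oldest one; it assumes the initial backing index is the first entry in the response, which is an assumption worth verifying against your own rollover output.

# Resolve the backing indices of the destination data stream.
ds = client.indices.get_data_stream(name="k9s")
first_backing_index = ds["data_streams"][0]["indices"][0]["index_name"]

# Add a write block so the initial backing index no longer accepts new documents.
resp = client.indices.add_block(index=first_backing_index, block="write")
print(resp)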