Reindex API
编辑Reindex API
编辑将文档从源复制到目标。
源可以是任何现有的索引、别名或数据流。目标必须与源不同。例如,你不能将数据流重新索引到其自身。
resp = client.reindex( source={ "index": "my-index-000001" }, dest={ "index": "my-new-index-000001" }, ) print(resp)
response = client.reindex( body: { source: { index: 'my-index-000001' }, dest: { index: 'my-new-index-000001' } } ) puts response
const response = await client.reindex({ source: { index: "my-index-000001", }, dest: { index: "my-new-index-000001", }, }); console.log(response);
POST _reindex { "source": { "index": "my-index-000001" }, "dest": { "index": "my-new-index-000001" } }
请求
编辑POST /_reindex
前提条件
编辑描述
编辑从源索引中提取 文档源,并将文档索引到目标索引中。你可以将所有文档复制到目标索引,或者重新索引文档的子集。
就像 _update_by_query
一样,_reindex
获取源的快照,但其目标必须 不同,因此不太可能发生版本冲突。可以像 index API 一样配置 dest
元素以控制乐观并发控制。省略 version_type
或将其设置为 internal
会导致 Elasticsearch 将文档盲目地转储到目标中,覆盖任何具有相同 ID 的文档。
将 version_type
设置为 external
会导致 Elasticsearch 保留源中的 version
,创建任何丢失的文档,并更新目标中版本较旧的任何文档。
将 op_type
设置为 create
会导致 _reindex
仅在目标中创建丢失的文档。所有现有文档都将导致版本冲突。
由于数据流是 仅追加 的,因此对目标数据流的任何重新索引请求都必须具有 op_type
of create
。重新索引只能向目标数据流添加新文档。它无法更新目标数据流中的现有文档。
默认情况下,版本冲突会中止 _reindex
进程。要在存在冲突的情况下继续重新索引,请将 "conflicts"
请求主体参数设置为 proceed
。在这种情况下,响应将包括遇到的版本冲突的计数。请注意,其他错误类型的处理不受 "conflicts"
参数的影响。此外,如果你选择计算版本冲突,则该操作可能会尝试从源中重新索引比 max_docs
更多的文档,直到它成功将 max_docs
文档索引到目标中,或者它已遍历了源查询中的每个文档。
异步运行重新索引
编辑如果请求包含 wait_for_completion=false
,则 Elasticsearch 会执行一些预检,启动请求,并返回一个 task
,你可以使用该任务来取消或获取任务的状态。Elasticsearch 会在 _tasks/<task_id>
中创建此任务的记录作为文档。
从多个源重新索引
编辑如果你有多个源要重新索引,通常最好一次重新索引一个源,而不是使用 glob 模式来选取多个源。这样,如果出现任何错误,你可以通过删除部分完成的源并重新开始来恢复该过程。这也使得并行处理过程非常简单:拆分要重新索引的源列表并并行运行每个列表。
一次性 bash 脚本似乎很适合此操作
for index in i1 i2 i3 i4 i5; do curl -HContent-Type:application/json -XPOST localhost:9200/_reindex?pretty -d'{ "source": { "index": "'$index'" }, "dest": { "index": "'$index'-reindexed" } }' done
节流
编辑将 requests_per_second
设置为任何正十进制数(1.4
、6
、1000
等),以限制 _reindex
发出索引操作批次的速度。通过用等待时间填充每个批次来限制请求。要禁用节流,请将 requests_per_second
设置为 -1
。
节流是通过在批次之间等待来完成的,以便可以为 _reindex
在内部使用的 scroll
提供考虑到填充的超时时间。填充时间是批次大小除以 requests_per_second
与写入所花费的时间之间的差值。默认情况下,批次大小为 1000
,因此,如果将 requests_per_second
设置为 500
target_time = 1000 / 500 per second = 2 seconds wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds
由于批次是作为单个 _bulk
请求发出的,因此较大的批次大小会导致 Elasticsearch 创建许多请求,然后在开始下一组之前等待一段时间。这是“突发”而不是“平稳”。
重新节流
编辑可以使用 _rethrottle
API 在正在运行的重新索引上更改 requests_per_second
的值
$params = [ 'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619', ]; $response = $client->reindexRethrottle($params);
resp = client.reindex_rethrottle( task_id="r1A2WoRbTwKZ516z6NEs5A:36619", requests_per_second="-1", ) print(resp)
response = client.reindex_rethrottle( task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619', requests_per_second: -1 ) puts response
res, err := es.ReindexRethrottle( "r1A2WoRbTwKZ516z6NEs5A:36619", esapi.IntPtr(-1), ) fmt.Println(res, err)
const response = await client.reindexRethrottle({ task_id: "r1A2WoRbTwKZ516z6NEs5A:36619", requests_per_second: "-1", }); console.log(response);
POST _reindex/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1
可以使用 任务 API 找到任务 ID。
就像在 Reindex API 上设置它一样,requests_per_second
可以是 -1
以禁用节流,也可以是任何十进制数,例如 1.7
或 12
以将节流限制在该级别。加快查询速度的重新节流会立即生效,但是减慢查询速度的重新节流将在完成当前批次后生效。这样可以防止滚动超时。
切片
编辑Reindex 支持 切片滚动 以并行化重新索引过程。这种并行化可以提高效率,并提供一种将请求分解为较小部分的便捷方式。
手动切片
编辑通过为每个请求提供切片 ID 和切片总数来手动切片重新索引请求
resp = client.reindex( source={ "index": "my-index-000001", "slice": { "id": 0, "max": 2 } }, dest={ "index": "my-new-index-000001" }, ) print(resp) resp1 = client.reindex( source={ "index": "my-index-000001", "slice": { "id": 1, "max": 2 } }, dest={ "index": "my-new-index-000001" }, ) print(resp1)
response = client.reindex( body: { source: { index: 'my-index-000001', slice: { id: 0, max: 2 } }, dest: { index: 'my-new-index-000001' } } ) puts response response = client.reindex( body: { source: { index: 'my-index-000001', slice: { id: 1, max: 2 } }, dest: { index: 'my-new-index-000001' } } ) puts response
const response = await client.reindex({ source: { index: "my-index-000001", slice: { id: 0, max: 2, }, }, dest: { index: "my-new-index-000001", }, }); console.log(response); const response1 = await client.reindex({ source: { index: "my-index-000001", slice: { id: 1, max: 2, }, }, dest: { index: "my-new-index-000001", }, }); console.log(response1);
POST _reindex { "source": { "index": "my-index-000001", "slice": { "id": 0, "max": 2 } }, "dest": { "index": "my-new-index-000001" } } POST _reindex { "source": { "index": "my-index-000001", "slice": { "id": 1, "max": 2 } }, "dest": { "index": "my-new-index-000001" } }
你可以通过以下方式验证此操作是否有效
resp = client.indices.refresh() print(resp) resp1 = client.search( index="my-new-index-000001", size="0", filter_path="hits.total", ) print(resp1)
response = client.indices.refresh puts response response = client.search( index: 'my-new-index-000001', size: 0, filter_path: 'hits.total' ) puts response
const response = await client.indices.refresh(); console.log(response); const response1 = await client.search({ index: "my-new-index-000001", size: 0, filter_path: "hits.total", }); console.log(response1);
GET _refresh POST my-new-index-000001/_search?size=0&filter_path=hits.total
这将产生一个像这样的合理 total
{ "hits": { "total" : { "value": 120, "relation": "eq" } } }
自动切片
编辑你还可以让 _reindex
使用 切片滚动 在 _id
上进行切片来自动并行化。使用 slices
指定要使用的切片数
resp = client.reindex( slices="5", refresh=True, source={ "index": "my-index-000001" }, dest={ "index": "my-new-index-000001" }, ) print(resp)
response = client.reindex( slices: 5, refresh: true, body: { source: { index: 'my-index-000001' }, dest: { index: 'my-new-index-000001' } } ) puts response
const response = await client.reindex({ slices: 5, refresh: "true", source: { index: "my-index-000001", }, dest: { index: "my-new-index-000001", }, }); console.log(response);
POST _reindex?slices=5&refresh { "source": { "index": "my-index-000001" }, "dest": { "index": "my-new-index-000001" } }
你还可以通过以下方式验证此操作是否有效
resp = client.search( index="my-new-index-000001", size="0", filter_path="hits.total", ) print(resp)
response = client.search( index: 'my-new-index-000001', size: 0, filter_path: 'hits.total' ) puts response
const response = await client.search({ index: "my-new-index-000001", size: 0, filter_path: "hits.total", }); console.log(response);
POST my-new-index-000001/_search?size=0&filter_path=hits.total
这将产生一个像这样的合理 total
{ "hits": { "total" : { "value": 120, "relation": "eq" } } }
将 slices
设置为 auto
将让 Elasticsearch 选择要使用的切片数。此设置将为每个分片使用一个切片,最多达到一定限制。如果存在多个源,它将根据分片数最少的索引或 后备索引 来选择切片数。
将 slices
添加到 _reindex
只是自动化了上述部分中使用的手动过程,创建了子请求,这意味着它有一些怪癖
- 你可以在 任务 API 中查看这些请求。这些子请求是具有
slices
的请求任务的“子”任务。 - 获取具有
slices
的请求的任务状态仅包含已完成切片的状态。 - 这些子请求可以单独寻址,以进行诸如取消和重新节流之类的操作。
- 使用
slices
重新节流请求将按比例重新节流未完成的子请求。 - 使用
slices
取消请求将取消每个子请求。 - 由于
slices
的性质,每个子请求都不会获得完全均匀的文档部分。将处理所有文档,但是某些切片可能比其他切片大。期望较大的切片具有更均匀的分布。 - 具有
slices
的请求上的requests_per_second
和max_docs
之类的参数按比例分配给每个子请求。将此与上面有关分布不均匀的观点相结合,你应该得出结论,将max_docs
与slices
一起使用可能不会导致重新索引正好max_docs
个文档。 - 每个子请求都会获得稍微不同的源快照,尽管这些快照都是在大约同一时间拍摄的。
选择切片数
编辑如果自动切片,将 slices
设置为 auto
将为大多数索引选择合理的数字。如果手动切片或以其他方式调整自动切片,请使用以下准则。
当 slices
的数量等于索引中的分片数时,查询性能最有效。如果该数字很大(例如 500),请选择较小的数字,因为过多的 slices
会损害性能。将 slices
设置为高于分片数通常不会提高效率,并且会增加开销。
索引性能会随着切片数量线性扩展到可用资源。
查询性能还是索引性能在运行时中占主导地位取决于要重新索引的文档和集群资源。
重新索引路由
编辑默认情况下,如果 _reindex
看到具有路由的文档,则除非脚本对其进行更改,否则会保留该路由。你可以在 dest
请求上设置 routing
以更改此行为
-
keep
- 将每个匹配项发送的批量请求中的路由设置为匹配项上的路由。这是默认值。
-
discard
- 将每个匹配项发送的批量请求上的路由设置为
null
。 -
=<一些文本>
- 将每个匹配项的批量请求的路由设置为
=
之后的所有文本。
例如,您可以使用以下请求将所有公司名称为 cat
的文档从 source
复制到 dest
,并将路由设置为 cat
。
$params = [ 'body' => [ 'source' => [ 'index' => 'source', 'query' => [ 'match' => [ 'company' => 'cat', ], ], ], 'dest' => [ 'index' => 'dest', 'routing' => '=cat', ], ], ]; $response = $client->reindex($params);
resp = client.reindex( source={ "index": "source", "query": { "match": { "company": "cat" } } }, dest={ "index": "dest", "routing": "=cat" }, ) print(resp)
response = client.reindex( body: { source: { index: 'source', query: { match: { company: 'cat' } } }, dest: { index: 'dest', routing: '=cat' } } ) puts response
res, err := es.Reindex( strings.NewReader(`{ "source": { "index": "source", "query": { "match": { "company": "cat" } } }, "dest": { "index": "dest", "routing": "=cat" } }`)) fmt.Println(res, err)
const response = await client.reindex({ source: { index: "source", query: { match: { company: "cat", }, }, }, dest: { index: "dest", routing: "=cat", }, }); console.log(response);
POST _reindex { "source": { "index": "source", "query": { "match": { "company": "cat" } } }, "dest": { "index": "dest", "routing": "=cat" } }
默认情况下,_reindex
使用 1000 个滚动批次。您可以使用 source
元素中的 size
字段更改批次大小。
$params = [ 'body' => [ 'source' => [ 'index' => 'source', 'size' => 100, ], 'dest' => [ 'index' => 'dest', 'routing' => '=cat', ], ], ]; $response = $client->reindex($params);
resp = client.reindex( source={ "index": "source", "size": 100 }, dest={ "index": "dest", "routing": "=cat" }, ) print(resp)
response = client.reindex( body: { source: { index: 'source', size: 100 }, dest: { index: 'dest', routing: '=cat' } } ) puts response
res, err := es.Reindex( strings.NewReader(`{ "source": { "index": "source", "size": 100 }, "dest": { "index": "dest", "routing": "=cat" } }`)) fmt.Println(res, err)
const response = await client.reindex({ source: { index: "source", size: 100, }, dest: { index: "dest", routing: "=cat", }, }); console.log(response);
POST _reindex { "source": { "index": "source", "size": 100 }, "dest": { "index": "dest", "routing": "=cat" } }
使用 Ingest 管道重新索引
编辑通过指定一个 pipeline
,Reindex 还可以使用 Ingest 管道功能,如下所示
$params = [ 'body' => [ 'source' => [ 'index' => 'source', ], 'dest' => [ 'index' => 'dest', 'pipeline' => 'some_ingest_pipeline', ], ], ]; $response = $client->reindex($params);
resp = client.reindex( source={ "index": "source" }, dest={ "index": "dest", "pipeline": "some_ingest_pipeline" }, ) print(resp)
response = client.reindex( body: { source: { index: 'source' }, dest: { index: 'dest', pipeline: 'some_ingest_pipeline' } } ) puts response
res, err := es.Reindex( strings.NewReader(`{ "source": { "index": "source" }, "dest": { "index": "dest", "pipeline": "some_ingest_pipeline" } }`)) fmt.Println(res, err)
const response = await client.reindex({ source: { index: "source", }, dest: { index: "dest", pipeline: "some_ingest_pipeline", }, }); console.log(response);
POST _reindex { "source": { "index": "source" }, "dest": { "index": "dest", "pipeline": "some_ingest_pipeline" } }
查询参数
编辑-
refresh
- (可选,布尔值)如果为
true
,则请求会刷新受影响的分片,以使此操作对搜索可见。默认为false
。 -
timeout
-
(可选,时间单位)每个索引等待以下操作的周期
默认为
1m
(一分钟)。这保证了 Elasticsearch 在失败前至少等待超时时间。实际等待时间可能更长,尤其是在发生多次等待时。 -
wait_for_active_shards
-
(可选,字符串)在继续操作之前必须处于活动状态的每个分片的副本数。设置为
all
或任何非负整数,直到索引中每个分片的副本总数 (number_of_replicas+1
)。默认为1
,表示仅等待每个主分片处于活动状态。请参阅 活动分片。
-
wait_for_completion
- (可选,布尔值)如果为
true
,则请求将阻塞,直到操作完成。默认为true
。 -
requests_per_second
- (可选,整数)此请求的节流限制,以每秒子请求数为单位。默认为
-1
(不节流)。 -
require_alias
- (可选,布尔值)如果为
true
,则目标必须是 索引别名。默认为false
。 -
scroll
- (可选,时间单位)指定应为滚动搜索维护索引一致视图的时长。
-
slices
- (可选,整数)此任务应划分为的切片数。默认为 1,表示任务未切片为子任务。
-
max_docs
- (可选,整数)要处理的最大文档数。默认为所有文档。当设置为小于或等于
scroll_size
的值时,将不会使用滚动来检索操作结果。
请求体
编辑-
conflicts
- (可选,枚举)设置为
proceed
可继续重新索引,即使存在冲突。默认为abort
。 -
max_docs
- (可选,整数)要重新索引的最大文档数。如果 冲突 等于
proceed
,则重新索引可能会尝试从源重新索引比max_docs
更多的文档,直到它已成功将max_docs
个文档索引到目标中,或者它已遍历源查询中的每个文档。 -
source
-
-
index
- (必需,字符串)您要复制自其中的数据流、索引或别名的名称。也接受以逗号分隔的列表以从多个源重新索引。
-
query
- (可选,查询对象)使用查询 DSL 指定要重新索引的文档。
-
remote
-
size
- (可选,整数)每个批次要索引的文档数。从远程索引时使用,以确保批次适合堆上的缓冲区,该缓冲区默认为最大 100 MB。
-
slice
-
-
id
- (可选,整数)用于手动切片的切片 ID。
-
max
- (可选,整数)切片的总数。
-
-
sort
-
(可选,列表)以逗号分隔的
<field>:<direction>
对列表,用于在索引前进行排序。与max_docs
结合使用,以控制重新索引的文档。在 7.6 中已弃用。
重新索引中的排序已弃用。重新索引中的排序从未保证按顺序索引文档,并且会阻止重新索引的进一步开发,例如弹性性能改进。如果与
max_docs
结合使用,请考虑改用查询过滤器。 -
_source
- (可选,字符串)如果为
true
,则重新索引所有源字段。设置为列表以重新索引选定的字段。默认为true
。
-
-
dest
-
script
-
-
source
- (可选,字符串)在重新索引时运行以更新文档源或元数据的脚本。
-
lang
- (可选,枚举)脚本语言:
painless
、expression
、mustache
、java
。有关详细信息,请参阅 脚本。
-
响应体
编辑-
took
- (整数)整个操作所花费的总毫秒数。
-
timed_out
- (布尔值)如果重新索引期间执行的任何请求超时,则将此标志设置为
true
。 -
total
- (整数)已成功处理的文档数。
-
updated
- (整数)已成功更新的文档数,即重新索引更新它之前已存在的具有相同 ID 的文档。
-
created
- (整数)已成功创建的文档数。
-
deleted
- (整数)已成功删除的文档数。
-
batches
- (整数)重新索引拉回的滚动响应数。
-
noops
- (整数)由于用于重新索引的脚本为
ctx.op
返回noop
值而被忽略的文档数。 -
version_conflicts
- (整数)重新索引命中的版本冲突数。
-
retries
- (整数)重新索引尝试的重试次数。
bulk
是重试的批量操作数,search
是重试的搜索操作数。 -
throttled_millis
- (整数)请求为了符合
requests_per_second
而休眠的毫秒数。 -
requests_per_second
- (整数)重新索引期间有效执行的每秒请求数。
-
throttled_until_millis
- (整数)此字段在
_reindex
响应中应始终等于零。它仅在使用 任务 API 时才有意义,它指示为了符合requests_per_second
,节流请求将再次执行的下一个时间(以自纪元以来的毫秒为单位)。 -
failures
- (数组)如果在处理过程中出现任何无法恢复的错误,则为失败数组。如果此数组不为空,则由于这些失败,请求已中止。重新索引是使用批次实现的,任何失败都会导致整个过程中止,但当前批次中的所有失败都会收集到数组中。您可以使用
conflicts
选项来防止重新索引在版本冲突时中止。
示例
编辑使用查询重新索引选定的文档
编辑您可以通过向 source
添加查询来限制文档。例如,以下请求仅将 user.id
为 kimchy
的文档复制到 my-new-index-000001
resp = client.reindex( source={ "index": "my-index-000001", "query": { "term": { "user.id": "kimchy" } } }, dest={ "index": "my-new-index-000001" }, ) print(resp)
response = client.reindex( body: { source: { index: 'my-index-000001', query: { term: { 'user.id' => 'kimchy' } } }, dest: { index: 'my-new-index-000001' } } ) puts response
const response = await client.reindex({ source: { index: "my-index-000001", query: { term: { "user.id": "kimchy", }, }, }, dest: { index: "my-new-index-000001", }, }); console.log(response);
POST _reindex { "source": { "index": "my-index-000001", "query": { "term": { "user.id": "kimchy" } } }, "dest": { "index": "my-new-index-000001" } }
使用 max_docs
重新索引选定的文档
编辑您可以通过设置 max_docs
来限制处理的文档数。例如,此请求将单个文档从 my-index-000001
复制到 my-new-index-000001
resp = client.reindex( max_docs=1, source={ "index": "my-index-000001" }, dest={ "index": "my-new-index-000001" }, ) print(resp)
response = client.reindex( body: { max_docs: 1, source: { index: 'my-index-000001' }, dest: { index: 'my-new-index-000001' } } ) puts response
const response = await client.reindex({ max_docs: 1, source: { index: "my-index-000001", }, dest: { index: "my-new-index-000001", }, }); console.log(response);
POST _reindex { "max_docs": 1, "source": { "index": "my-index-000001" }, "dest": { "index": "my-new-index-000001" } }
从多个源重新索引
编辑source
中的 index
属性可以是列表,允许您在一个请求中从多个源复制。这将从 my-index-000001
和 my-index-000002
索引复制文档
resp = client.reindex( source={ "index": [ "my-index-000001", "my-index-000002" ] }, dest={ "index": "my-new-index-000002" }, ) print(resp)
response = client.reindex( body: { source: { index: [ 'my-index-000001', 'my-index-000002' ] }, dest: { index: 'my-new-index-000002' } } ) puts response
const response = await client.reindex({ source: { index: ["my-index-000001", "my-index-000002"], }, dest: { index: "my-new-index-000002", }, }); console.log(response);
POST _reindex { "source": { "index": ["my-index-000001", "my-index-000002"] }, "dest": { "index": "my-new-index-000002" } }
Reindex API 不会尽力处理 ID 冲突,因此最后写入的文档将“获胜”,但顺序通常是不可预测的,因此依赖此行为不是一个好主意。相反,请确保使用脚本使 ID 唯一。
使用源过滤器重新索引选定的字段
编辑您可以使用源筛选来重新索引原始文档中的字段子集。例如,以下请求仅重新索引每个文档的 user.id
和 _doc
字段
resp = client.reindex( source={ "index": "my-index-000001", "_source": [ "user.id", "_doc" ] }, dest={ "index": "my-new-index-000001" }, ) print(resp)
response = client.reindex( body: { source: { index: 'my-index-000001', _source: [ 'user.id', '_doc' ] }, dest: { index: 'my-new-index-000001' } } ) puts response
const response = await client.reindex({ source: { index: "my-index-000001", _source: ["user.id", "_doc"], }, dest: { index: "my-new-index-000001", }, }); console.log(response);
POST _reindex { "source": { "index": "my-index-000001", "_source": ["user.id", "_doc"] }, "dest": { "index": "my-new-index-000001" } }
重新索引以更改字段的名称
编辑_reindex
可用于构建具有重命名字段的索引副本。假设您创建了一个索引,其中包含如下所示的文档
resp = client.index( index="my-index-000001", id="1", refresh=True, document={ "text": "words words", "flag": "foo" }, ) print(resp)
response = client.index( index: 'my-index-000001', id: 1, refresh: true, body: { text: 'words words', flag: 'foo' } ) puts response
const response = await client.index({ index: "my-index-000001", id: 1, refresh: "true", document: { text: "words words", flag: "foo", }, }); console.log(response);
POST my-index-000001/_doc/1?refresh { "text": "words words", "flag": "foo" }
但您不喜欢名称 flag
,并且希望将其替换为 tag
。_reindex
可以为您创建另一个索引
resp = client.reindex( source={ "index": "my-index-000001" }, dest={ "index": "my-new-index-000001" }, script={ "source": "ctx._source.tag = ctx._source.remove(\"flag\")" }, ) print(resp)
response = client.reindex( body: { source: { index: 'my-index-000001' }, dest: { index: 'my-new-index-000001' }, script: { source: 'ctx._source.tag = ctx._source.remove("flag")' } } ) puts response
const response = await client.reindex({ source: { index: "my-index-000001", }, dest: { index: "my-new-index-000001", }, script: { source: 'ctx._source.tag = ctx._source.remove("flag")', }, }); console.log(response);
POST _reindex { "source": { "index": "my-index-000001" }, "dest": { "index": "my-new-index-000001" }, "script": { "source": "ctx._source.tag = ctx._source.remove(\"flag\")" } }
现在您可以获取新文档
resp = client.get( index="my-new-index-000001", id="1", ) print(resp)
response = client.get( index: 'my-new-index-000001', id: 1 ) puts response
const response = await client.get({ index: "my-new-index-000001", id: 1, }); console.log(response);
GET my-new-index-000001/_doc/1
这将返回
{ "found": true, "_id": "1", "_index": "my-new-index-000001", "_version": 1, "_seq_no": 44, "_primary_term": 1, "_source": { "text": "words words", "tag": "foo" } }
重新索引每日索引
编辑您可以将 _reindex
与 Painless 结合使用,以重新索引每日索引,从而将新模板应用于现有文档。
假设您的索引包含如下所示的文档
$params = [ 'index' => 'metricbeat-2016.05.30', 'id' => '1', 'body' => [ 'system.cpu.idle.pct' => 0.908, ], ]; $response = $client->index($params); $params = [ 'index' => 'metricbeat-2016.05.31', 'id' => '1', 'body' => [ 'system.cpu.idle.pct' => 0.105, ], ]; $response = $client->index($params);
resp = client.index( index="metricbeat-2016.05.30", id="1", refresh=True, document={ "system.cpu.idle.pct": 0.908 }, ) print(resp) resp1 = client.index( index="metricbeat-2016.05.31", id="1", refresh=True, document={ "system.cpu.idle.pct": 0.105 }, ) print(resp1)
response = client.index( index: 'metricbeat-2016.05.30', id: 1, refresh: true, body: { 'system.cpu.idle.pct' => 0.908 } ) puts response response = client.index( index: 'metricbeat-2016.05.31', id: 1, refresh: true, body: { 'system.cpu.idle.pct' => 0.105 } ) puts response
{ res, err := es.Index( "metricbeat-2016.05.30", strings.NewReader(`{ "system.cpu.idle.pct": 0.908 }`), es.Index.WithDocumentID("1"), es.Index.WithRefresh("true"), es.Index.WithPretty(), ) fmt.Println(res, err) } { res, err := es.Index( "metricbeat-2016.05.31", strings.NewReader(`{ "system.cpu.idle.pct": 0.105 }`), es.Index.WithDocumentID("1"), es.Index.WithRefresh("true"), es.Index.WithPretty(), ) fmt.Println(res, err) }
const response = await client.index({ index: "metricbeat-2016.05.30", id: 1, refresh: "true", document: { "system.cpu.idle.pct": 0.908, }, }); console.log(response); const response1 = await client.index({ index: "metricbeat-2016.05.31", id: 1, refresh: "true", document: { "system.cpu.idle.pct": 0.105, }, }); console.log(response1);
PUT metricbeat-2016.05.30/_doc/1?refresh {"system.cpu.idle.pct": 0.908} PUT metricbeat-2016.05.31/_doc/1?refresh {"system.cpu.idle.pct": 0.105}
metricbeat-*
索引的新模板已加载到 Elasticsearch 中,但它仅适用于新创建的索引。Painless 可用于重新索引现有文档并应用新模板。
以下脚本从索引名称中提取日期,并创建一个附加了 -1
的新索引。所有来自 metricbeat-2016.05.31
的数据都将被重新索引到 metricbeat-2016.05.31-1
中。
$params = [ 'body' => [ 'source' => [ 'index' => 'metricbeat-*', ], 'dest' => [ 'index' => 'metricbeat', ], 'script' => [ 'lang' => 'painless', 'source' => 'ctx._index = \'metricbeat-\' + (ctx._index.substring(\'metricbeat-\'.length(), ctx._index.length())) + \'-1\'', ], ], ]; $response = $client->reindex($params);
resp = client.reindex( source={ "index": "metricbeat-*" }, dest={ "index": "metricbeat" }, script={ "lang": "painless", "source": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'" }, ) print(resp)
response = client.reindex( body: { source: { index: 'metricbeat-*' }, dest: { index: 'metricbeat' }, script: { lang: 'painless', source: "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'" } } ) puts response
res, err := es.Reindex( strings.NewReader(`{ "source": { "index": "metricbeat-*" }, "dest": { "index": "metricbeat" }, "script": { "lang": "painless", "source": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'" } }`)) fmt.Println(res, err)
const response = await client.reindex({ source: { index: "metricbeat-*", }, dest: { index: "metricbeat", }, script: { lang: "painless", source: "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'", }, }); console.log(response);
POST _reindex { "source": { "index": "metricbeat-*" }, "dest": { "index": "metricbeat" }, "script": { "lang": "painless", "source": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'" } }
现在可以在 *-1
索引中找到之前 metricbeat 索引中的所有文档。
$params = [ 'index' => 'metricbeat-2016.05.30-1', 'id' => '1', ]; $response = $client->get($params); $params = [ 'index' => 'metricbeat-2016.05.31-1', 'id' => '1', ]; $response = $client->get($params);
resp = client.get( index="metricbeat-2016.05.30-1", id="1", ) print(resp) resp1 = client.get( index="metricbeat-2016.05.31-1", id="1", ) print(resp1)
response = client.get( index: 'metricbeat-2016.05.30-1', id: 1 ) puts response response = client.get( index: 'metricbeat-2016.05.31-1', id: 1 ) puts response
{ res, err := es.Get("metricbeat-2016.05.30-1", "1", es.Get.WithPretty()) fmt.Println(res, err) } { res, err := es.Get("metricbeat-2016.05.31-1", "1", es.Get.WithPretty()) fmt.Println(res, err) }
const response = await client.get({ index: "metricbeat-2016.05.30-1", id: 1, }); console.log(response); const response1 = await client.get({ index: "metricbeat-2016.05.31-1", id: 1, }); console.log(response1);
GET metricbeat-2016.05.30-1/_doc/1 GET metricbeat-2016.05.31-1/_doc/1
先前的方法还可以与更改字段名称结合使用,以仅将现有数据加载到新索引中,并根据需要重命名字段。
提取源的随机子集
编辑_reindex
可以用来提取源的随机子集以进行测试。
resp = client.reindex( max_docs=10, source={ "index": "my-index-000001", "query": { "function_score": { "random_score": {}, "min_score": 0.9 } } }, dest={ "index": "my-new-index-000001" }, ) print(resp)
response = client.reindex( body: { max_docs: 10, source: { index: 'my-index-000001', query: { function_score: { random_score: {}, min_score: 0.9 } } }, dest: { index: 'my-new-index-000001' } } ) puts response
const response = await client.reindex({ max_docs: 10, source: { index: "my-index-000001", query: { function_score: { random_score: {}, min_score: 0.9, }, }, }, dest: { index: "my-new-index-000001", }, }); console.log(response);
在重新索引期间修改文档
编辑与 _update_by_query
类似,_reindex
支持修改文档的脚本。与 _update_by_query
不同,该脚本允许修改文档的元数据。此示例会增加源文档的版本。
resp = client.reindex( source={ "index": "my-index-000001" }, dest={ "index": "my-new-index-000001", "version_type": "external" }, script={ "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}", "lang": "painless" }, ) print(resp)
response = client.reindex( body: { source: { index: 'my-index-000001' }, dest: { index: 'my-new-index-000001', version_type: 'external' }, script: { source: "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}", lang: 'painless' } } ) puts response
const response = await client.reindex({ source: { index: "my-index-000001", }, dest: { index: "my-new-index-000001", version_type: "external", }, script: { source: "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}", lang: "painless", }, }); console.log(response);
POST _reindex { "source": { "index": "my-index-000001" }, "dest": { "index": "my-new-index-000001", "version_type": "external" }, "script": { "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}", "lang": "painless" } }
就像在 _update_by_query
中一样,您可以设置 ctx.op
来更改在目标上执行的操作。
将 ctx.op
设置为其他任何值将返回错误,设置 ctx
中的任何其他字段也会返回错误。
想想这些可能性!请务必小心;您可以更改
-
_id
-
_index
-
_version
-
_routing
将 _version
设置为 null
或从 ctx
映射中清除它,就像在索引请求中不发送版本一样;这将导致文档在目标中被覆盖,而无论目标上的版本或您在 _reindex
请求中使用的版本类型。
从远程重新索引
编辑重新索引支持从远程 Elasticsearch 集群重新索引
resp = client.reindex( source={ "remote": { "host": "http://otherhost:9200", "username": "user", "password": "pass" }, "index": "my-index-000001", "query": { "match": { "test": "data" } } }, dest={ "index": "my-new-index-000001" }, ) print(resp)
response = client.reindex( body: { source: { remote: { host: 'http://otherhost:9200', username: 'user', password: 'pass' }, index: 'my-index-000001', query: { match: { test: 'data' } } }, dest: { index: 'my-new-index-000001' } } ) puts response
const response = await client.reindex({ source: { remote: { host: "http://otherhost:9200", username: "user", password: "pass", }, index: "my-index-000001", query: { match: { test: "data", }, }, }, dest: { index: "my-new-index-000001", }, }); console.log(response);
POST _reindex { "source": { "remote": { "host": "http://otherhost:9200", "username": "user", "password": "pass" }, "index": "my-index-000001", "query": { "match": { "test": "data" } } }, "dest": { "index": "my-new-index-000001" } }
host
参数必须包含方案、主机、端口(例如 https://otherhost:9200
)和可选路径(例如 https://otherhost:9200/proxy
)。username
和 password
参数是可选的,当它们存在时,_reindex
将使用基本身份验证连接到远程 Elasticsearch 节点。使用基本身份验证时,请务必使用 https
,否则密码将以纯文本形式发送。有一系列的设置可用于配置 https
连接的行为。
使用 Elastic Cloud 时,也可以通过使用有效的 API 密钥对远程集群进行身份验证。
resp = client.reindex( source={ "remote": { "host": "http://otherhost:9200", "headers": { "Authorization": "ApiKey API_KEY_VALUE" } }, "index": "my-index-000001", "query": { "match": { "test": "data" } } }, dest={ "index": "my-new-index-000001" }, ) print(resp)
response = client.reindex( body: { source: { remote: { host: 'http://otherhost:9200', headers: { "Authorization": 'ApiKey API_KEY_VALUE' } }, index: 'my-index-000001', query: { match: { test: 'data' } } }, dest: { index: 'my-new-index-000001' } } ) puts response
const response = await client.reindex({ source: { remote: { host: "http://otherhost:9200", headers: { Authorization: "ApiKey API_KEY_VALUE", }, }, index: "my-index-000001", query: { match: { test: "data", }, }, }, dest: { index: "my-new-index-000001", }, }); console.log(response);
POST _reindex { "source": { "remote": { "host": "http://otherhost:9200", "headers": { "Authorization": "ApiKey API_KEY_VALUE" } }, "index": "my-index-000001", "query": { "match": { "test": "data" } } }, "dest": { "index": "my-new-index-000001" } }
必须在 elasticsearch.yml
中使用 reindex.remote.whitelist
属性显式允许远程主机。它可以设置为允许的远程 host
和 port
组合的逗号分隔列表。忽略方案,仅使用主机和端口。例如
reindex.remote.whitelist: [otherhost:9200, another:9200, 127.0.10.*:9200, localhost:*"]
必须在任何将协调重新索引的节点上配置允许的主机列表。
此功能应适用于您可能找到的任何版本的 Elasticsearch 远程集群。这应该允许您通过从旧版本的集群重新索引来将任何版本的 Elasticsearch 升级到当前版本。
Elasticsearch 不支持跨主要版本的前向兼容性。例如,您不能从 7.x 集群重新索引到 6.x 集群。
为了启用发送到旧版本 Elasticsearch 的查询,query
参数将直接发送到远程主机,而无需验证或修改。
从远程服务器重新索引使用一个默认最大大小为 100MB 的堆上缓冲区。如果远程索引包含非常大的文档,则需要使用较小的批处理大小。下面的示例将批处理大小设置为 10
,这是非常小的。
POST _reindex { "source": { "remote": { "host": "http://otherhost:9200", ... }, "index": "source", "size": 10, "query": { "match": { "test": "data" } } }, "dest": { "index": "dest" } }
也可以使用 socket_timeout
字段设置远程连接上的套接字读取超时,并使用 connect_timeout
字段设置连接超时。两者默认都为 30 秒。此示例将套接字读取超时设置为一分钟,将连接超时设置为 10 秒。
POST _reindex { "source": { "remote": { "host": "http://otherhost:9200", ..., "socket_timeout": "1m", "connect_timeout": "10s" }, "index": "source", "query": { "match": { "test": "data" } } }, "dest": { "index": "dest" } }
配置 SSL 参数
编辑从远程重新索引支持可配置的 SSL 设置。这些必须在 elasticsearch.yml
文件中指定,安全设置除外,安全设置是在 Elasticsearch 密钥库中添加的。无法在 _reindex
请求的正文中配置 SSL。
支持以下设置
-
reindex.ssl.certificate_authorities
- 应信任的 PEM 编码证书文件的路径列表。您不能同时指定
reindex.ssl.certificate_authorities
和reindex.ssl.truststore.path
。 -
reindex.ssl.truststore.path
- 包含要信任的证书的 Java 密钥库文件的路径。此密钥库可以是“JKS”或“PKCS#12”格式。您不能同时指定
reindex.ssl.certificate_authorities
和reindex.ssl.truststore.path
。 -
reindex.ssl.truststore.password
- 密钥库 (
reindex.ssl.truststore.path
) 的密码。 [7.17.0] 在 7.17.0 中已弃用。 请优先使用reindex.ssl.truststore.secure_password
。此设置不能与reindex.ssl.truststore.secure_password
一起使用。 -
reindex.ssl.truststore.secure_password
(安全) - 密钥库 (
reindex.ssl.truststore.path
) 的密码。此设置不能与reindex.ssl.truststore.password
一起使用。 -
reindex.ssl.truststore.type
- 密钥库 (
reindex.ssl.truststore.path
) 的类型。必须为jks
或PKCS12
。如果密钥库路径以“.p12”、“.pfx”或“pkcs12”结尾,则此设置默认为PKCS12
。否则,它默认为jks
。 -
reindex.ssl.verification_mode
- 指示用于防止中间人攻击和证书伪造的验证类型。可以是
full
(验证主机名和证书路径)、certificate
(验证证书路径,但不验证主机名)或none
(不执行任何验证 - 强烈建议不要在生产环境中使用)。默认为full
。 -
reindex.ssl.certificate
- 指定用于 HTTP 客户端身份验证的 PEM 编码证书(或证书链)的路径(如果远程集群需要)。此设置要求还设置
reindex.ssl.key
。您不能同时指定reindex.ssl.certificate
和reindex.ssl.keystore.path
。 -
reindex.ssl.key
- 指定与用于客户端身份验证的证书关联的 PEM 编码私钥的路径(
reindex.ssl.certificate
)。您不能同时指定reindex.ssl.key
和reindex.ssl.keystore.path
。 -
reindex.ssl.key_passphrase
- 指定用于解密 PEM 编码私钥 (
reindex.ssl.key
) 的密码(如果已加密)。 [7.17.0] 在 7.17.0 中已弃用。 请优先使用reindex.ssl.secure_key_passphrase
。不能与reindex.ssl.secure_key_passphrase
一起使用。 -
reindex.ssl.secure_key_passphrase
(安全) - 指定用于解密 PEM 编码私钥 (
reindex.ssl.key
) 的密码(如果已加密)。不能与reindex.ssl.key_passphrase
一起使用。 -
reindex.ssl.keystore.path
- 指定密钥库的路径,该密钥库包含用于 HTTP 客户端身份验证的私钥和证书(如果远程集群需要)。此密钥库可以是“JKS”或“PKCS#12”格式。您不能同时指定
reindex.ssl.key
和reindex.ssl.keystore.path
。 -
reindex.ssl.keystore.type
- 密钥库 (
reindex.ssl.keystore.path
) 的类型。必须为jks
或PKCS12
。如果密钥库路径以“.p12”、“.pfx”或“pkcs12”结尾,则此设置默认为PKCS12
。否则,它默认为jks
。 -
reindex.ssl.keystore.password
- 密钥库 (
reindex.ssl.keystore.path
) 的密码。 [7.17.0] 在 7.17.0 中已弃用。 请优先使用reindex.ssl.keystore.secure_password
。此设置不能与reindex.ssl.keystore.secure_password
一起使用。 -
reindex.ssl.keystore.secure_password
(安全) - 密钥库 (
reindex.ssl.keystore.path
) 的密码。此设置不能与reindex.ssl.keystore.password
一起使用。 -
reindex.ssl.keystore.key_password
- 密钥库 (
reindex.ssl.keystore.path
) 中密钥的密码。默认为密钥库密码。 [7.17.0] 在 7.17.0 中已弃用。 请优先使用reindex.ssl.keystore.secure_key_password
。此设置不能与reindex.ssl.keystore.secure_key_password
一起使用。 -
reindex.ssl.keystore.secure_key_password
(安全) - 密钥库 (
reindex.ssl.keystore.path
) 中密钥的密码。默认为密钥库密码。此设置不能与reindex.ssl.keystore.key_password
一起使用。