Update By Query API
编辑Update By Query API
编辑更新匹配指定查询的文档。如果未指定查询,则对数据流或索引中的每个文档执行更新,而不修改源,这对于获取映射更改很有用。
resp = client.update_by_query( index="my-index-000001", conflicts="proceed", ) print(resp)
response = client.update_by_query( index: 'my-index-000001', conflicts: 'proceed' ) puts response
const response = await client.updateByQuery({ index: "my-index-000001", conflicts: "proceed", }); console.log(response);
POST my-index-000001/_update_by_query?conflicts=proceed
请求
编辑POST /<目标>/_update_by_query
描述
编辑您可以使用与搜索 API相同的语法,在请求 URI 或请求正文中指定查询条件。
当您提交按查询更新请求时,Elasticsearch 会在开始处理请求时获取数据流或索引的快照,并使用 internal
版本控制更新匹配的文档。当版本匹配时,文档将被更新,并且版本号会递增。如果在快照拍摄时间和更新操作处理之间文档发生更改,则会导致版本冲突并且操作失败。您可以选择计数版本冲突而不是停止并返回,方法是将 conflicts
设置为 proceed
。请注意,如果您选择计数版本冲突,则该操作可能会尝试从源更新比 max_docs
更多的文档,直到它成功更新了 max_docs
个文档,或者它已经遍历了源查询中的每个文档。
版本等于 0 的文档不能使用按查询更新进行更新,因为 internal
版本控制不支持 0 作为有效的版本号。
在处理按查询更新请求时,Elasticsearch 会依次执行多个搜索请求以查找所有匹配的文档。对每批匹配的文档执行批量更新请求。任何查询或更新失败都会导致按查询更新请求失败,并且失败会显示在响应中。任何成功完成的更新请求仍然会保留,它们不会被回滚。
刷新分片
编辑指定 refresh
参数会在请求完成后刷新所有分片。这与更新 API 的 refresh
参数不同,后者仅导致接收请求的分片被刷新。与更新 API 不同,它不支持 wait_for
。
异步运行按查询更新
编辑如果请求包含 wait_for_completion=false
,Elasticsearch 会执行一些预检,启动请求,并返回一个 task
,您可以使用该任务来取消或获取任务的状态。Elasticsearch 会在 .tasks/task/${taskId}
处以文档形式创建此任务的记录。
等待活动分片
编辑wait_for_active_shards
控制在继续执行请求之前必须有多少分片副本处于活动状态。有关详细信息,请参阅活动分片。timeout
控制每个写入请求等待不可用分片变为可用的时间。它们的工作方式与在批量 API中完全相同。按查询更新使用滚动搜索,因此您也可以指定 scroll
参数来控制它保持搜索上下文活动的时间,例如 ?scroll=10m
。默认值为 5 分钟。
限制更新请求
编辑要控制按查询更新发出批量更新操作的速率,您可以将 requests_per_second
设置为任何正十进制数。这会在每个批次中添加等待时间以限制速率。将 requests_per_second
设置为 -1
可禁用限制。
限制使用批次之间的等待时间,以便可以为内部滚动请求提供一个超时时间,该超时时间会将请求填充考虑在内。填充时间是批次大小除以 requests_per_second
与写入所花费时间之间的差值。默认情况下,批次大小为 1000
,因此如果 requests_per_second
设置为 500
target_time = 1000 / 500 per second = 2 seconds wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds
由于批次是作为单个 _bulk
请求发出的,因此较大的批次大小会导致 Elasticsearch 创建许多请求并在启动下一组请求之前等待。这是“突发”而不是“平滑”。
路径参数
编辑-
<目标>
- (可选,字符串)要搜索的数据流、索引和别名的逗号分隔列表。支持通配符(
*
)。要搜索所有数据流或索引,请省略此参数或使用*
或_all
。
查询参数
编辑-
allow_no_indices
-
(可选,布尔值)如果为
false
,则当任何通配符表达式、索引别名或_all
值仅以缺失或关闭的索引为目标时,请求会返回错误。即使请求以其他打开的索引为目标,此行为也适用。例如,如果一个索引以foo
开头,但没有索引以bar
开头,则以foo*,bar*
为目标的请求会返回错误。默认为
true
。 -
analyzer
-
(可选,字符串)用于查询字符串的分析器。
此参数只能在指定
q
查询字符串参数时使用。 -
analyze_wildcard
-
(可选,布尔值)如果为
true
,则分析通配符和前缀查询。默认为false
。此参数只能在指定
q
查询字符串参数时使用。 -
conflicts
- (可选,字符串)如果按查询更新遇到版本冲突,则执行的操作:
abort
或proceed
。默认为abort
。 -
default_operator
-
(可选,字符串)查询字符串查询的默认运算符:AND 或 OR。默认为
OR
。此参数只能在指定
q
查询字符串参数时使用。 -
df
-
(可选,字符串)用作默认字段的字段,其中查询字符串中没有给出字段前缀。
此参数只能在指定
q
查询字符串参数时使用。 -
expand_wildcards
-
(可选,字符串)通配符模式可以匹配的索引类型。如果请求可以以数据流为目标,则此参数确定通配符表达式是否匹配隐藏的数据流。支持逗号分隔的值,例如
open,hidden
。有效值为-
all
- 匹配任何数据流或索引,包括隐藏的数据流或索引。
-
open
- 匹配打开的非隐藏索引。也匹配任何非隐藏的数据流。
-
closed
- 匹配关闭的非隐藏索引。也匹配任何非隐藏的数据流。数据流无法关闭。
-
hidden
- 匹配隐藏的数据流和隐藏的索引。必须与
open
、closed
或两者结合使用。 -
none
- 不接受通配符模式。
默认为
open
。 -
-
ignore_unavailable
- (可选,布尔值)如果为
false
,则如果请求以缺失或关闭的索引为目标,则会返回错误。默认为false
。 -
lenient
-
(可选,布尔值)如果为
true
,则会忽略查询字符串中基于格式的查询失败(例如,向数字字段提供文本)。默认为false
。此参数只能在指定
q
查询字符串参数时使用。 -
max_docs
- (可选,整数)要处理的最大文档数。默认为所有文档。当设置为小于或等于
scroll_size
的值时,将不会使用滚动来检索操作的结果。 -
pipeline
- (可选,字符串)用于预处理传入文档的管道的 ID。如果索引指定了默认的摄取管道,则将值设置为
_none
将禁用此请求的默认摄取管道。如果配置了最终管道,则无论此参数的值如何,它都将始终运行。 -
preference
- (可选,字符串)指定应在其上执行操作的节点或分片。默认为随机。
-
q
- (可选,字符串)Lucene 查询字符串语法中的查询。
-
request_cache
- (可选,布尔值)如果为
true
,则将对此请求使用请求缓存。默认为索引级别的设置。 -
refresh
- (可选,布尔值)如果为
true
,则 Elasticsearch 会刷新受影响的分片以使操作对搜索可见。默认为false
。 -
requests_per_second
- (可选,整数)此请求的限制,以每秒子请求数表示。默认为
-1
(无限制)。 -
routing
- (可选,字符串)用于将操作路由到特定分片的自定义值。
-
scroll
- (可选,时间值)保留搜索上下文以进行滚动的时间段。请参阅滚动搜索结果。
-
scroll_size
- (可选,整数)支持操作的滚动请求的大小。默认为 1000。
-
search_type
-
(可选,字符串)搜索操作的类型。可用选项
-
query_then_fetch
-
dfs_query_then_fetch
-
-
search_timeout
- (可选,时间单位)每个搜索请求的显式超时。默认为无超时。
-
slices
- (可选,整数)此任务应分为的切片数。默认为 1,表示任务未切片为子任务。
-
sort
- (可选,字符串)<字段>:<方向> 对的逗号分隔列表。
-
stats
- (可选,字符串)用于记录和统计目的的请求的特定
tag
。 -
terminate_after
-
(可选,整数)每个分片要收集的最大文档数。如果查询达到此限制,Elasticsearch 会提前终止查询。Elasticsearch 会在排序之前收集文档。
请谨慎使用。Elasticsearch 会将此参数应用于处理请求的每个分片。如果可能,请让 Elasticsearch 自动执行提前终止。避免为以跨多个数据层支持索引的数据流为目标的请求指定此参数。
-
timeout
-
(可选,时间单位)每个更新请求等待以下操作的周期
- 动态映射更新
- 等待活动分片
默认为
1m
(一分钟)。这保证了 Elasticsearch 在失败之前至少等待超时时间。实际等待时间可能会更长,尤其是在发生多次等待时。 -
version
- (可选,布尔值)如果为
true
,则返回文档版本作为命中的一部分。 -
wait_for_active_shards
-
(可选,字符串)在继续操作之前必须处于活动状态的每个分片的副本数。设置为
all
或任何非负整数,直到索引中每个分片的副本总数(number_of_replicas+1
)。默认为1
,表示仅等待每个主分片处于活动状态。请参阅 活动分片。
响应正文
编辑-
took
- 整个操作从开始到结束的毫秒数。
-
timed_out
- 如果在按查询执行更新期间执行的任何请求超时,则此标志设置为
true
。 -
total
- 已成功处理的文档数。
-
updated
- 已成功更新的文档数。
-
deleted
- 已成功删除的文档数。
-
batches
- 按查询更新拉回的滚动响应数。
-
version_conflicts
- 按查询更新命中的版本冲突数。
-
noops
- 由于用于按查询更新的脚本为
ctx.op
返回了noop
值而被忽略的文档数。 -
retries
- 按查询更新尝试的重试次数。
bulk
是重试的批量操作数,而search
是重试的搜索操作数。 -
throttled_millis
- 请求为了符合
requests_per_second
而休眠的毫秒数。 -
requests_per_second
- 按查询更新期间实际执行的每秒请求数。
-
throttled_until_millis
- 在
_update_by_query
响应中,此字段应始终等于零。它仅在使用 任务 API 时才有意义,在任务 API 中,它表示为了符合requests_per_second
,再次执行限制请求的下一个时间(自 epoch 以来的毫秒数)。 -
failures
- 如果在过程中存在任何不可恢复的错误,则会显示错误数组。如果此数组非空,则表示由于这些错误,请求中止。按查询更新使用批处理实现。任何失败都会导致整个过程中止,但当前批次中的所有失败都将收集到数组中。您可以使用
conflicts
选项来防止重新索引因版本冲突而中止。
示例
编辑_update_by_query
的最简单用法只是在数据流或索引中的每个文档上执行更新,而不更改源。这对于获取新属性或某些其他在线映射更改非常有用。
要更新所选文档,请在请求正文中指定查询
resp = client.update_by_query( index="my-index-000001", conflicts="proceed", query={ "term": { "user.id": "kimchy" } }, ) print(resp)
response = client.update_by_query( index: 'my-index-000001', conflicts: 'proceed', body: { query: { term: { 'user.id' => 'kimchy' } } } ) puts response
const response = await client.updateByQuery({ index: "my-index-000001", conflicts: "proceed", query: { term: { "user.id": "kimchy", }, }, }); console.log(response);
POST my-index-000001/_update_by_query?conflicts=proceed { "query": { "term": { "user.id": "kimchy" } } }
必须将查询作为值传递给 |
更新多个数据流或索引中的文档
resp = client.update_by_query( index="my-index-000001,my-index-000002", ) print(resp)
response = client.update_by_query( index: 'my-index-000001,my-index-000002' ) puts response
const response = await client.updateByQuery({ index: "my-index-000001,my-index-000002", }); console.log(response);
POST my-index-000001,my-index-000002/_update_by_query
将按查询更新操作限制为具有特定路由值的分片
resp = client.update_by_query( index="my-index-000001", routing="1", ) print(resp)
response = client.update_by_query( index: 'my-index-000001', routing: 1 ) puts response
const response = await client.updateByQuery({ index: "my-index-000001", routing: 1, }); console.log(response);
POST my-index-000001/_update_by_query?routing=1
默认情况下,按查询更新使用 1000 个滚动批次。您可以使用 scroll_size
参数更改批次大小
resp = client.update_by_query( index="my-index-000001", scroll_size="100", ) print(resp)
response = client.update_by_query( index: 'my-index-000001', scroll_size: 100 ) puts response
const response = await client.updateByQuery({ index: "my-index-000001", scroll_size: 100, }); console.log(response);
POST my-index-000001/_update_by_query?scroll_size=100
使用唯一属性更新文档
resp = client.update_by_query( index="my-index-000001", query={ "term": { "user.id": "kimchy" } }, max_docs=1, ) print(resp)
response = client.update_by_query( index: 'my-index-000001', body: { query: { term: { 'user.id' => 'kimchy' } }, max_docs: 1 } ) puts response
const response = await client.updateByQuery({ index: "my-index-000001", query: { term: { "user.id": "kimchy", }, }, max_docs: 1, }); console.log(response);
POST my-index-000001/_update_by_query { "query": { "term": { "user.id": "kimchy" } }, "max_docs": 1 }
更新文档源
编辑按查询更新支持使用脚本来更新文档源。例如,以下请求会递增 my-index-000001
中 user.id
为 kimchy
的所有文档的 count
字段
resp = client.update_by_query( index="my-index-000001", script={ "source": "ctx._source.count++", "lang": "painless" }, query={ "term": { "user.id": "kimchy" } }, ) print(resp)
response = client.update_by_query( index: 'my-index-000001', body: { script: { source: 'ctx._source.count++', lang: 'painless' }, query: { term: { 'user.id' => 'kimchy' } } } ) puts response
const response = await client.updateByQuery({ index: "my-index-000001", script: { source: "ctx._source.count++", lang: "painless", }, query: { term: { "user.id": "kimchy", }, }, }); console.log(response);
POST my-index-000001/_update_by_query { "script": { "source": "ctx._source.count++", "lang": "painless" }, "query": { "term": { "user.id": "kimchy" } } }
请注意,此示例中未指定 conflicts=proceed
。在这种情况下,版本冲突应停止该过程,以便您可以处理失败。
与 更新 API 一样,您可以设置 ctx.op
来更改执行的操作
|
如果您的脚本确定它无需进行任何更改,请设置 |
|
如果您的脚本确定应删除文档,请设置 |
按查询更新仅支持 index
、noop
和 delete
。将 ctx.op
设置为其他任何内容都是错误的。设置 ctx
中的任何其他字段都是错误的。此 API 仅允许您修改匹配文档的源,而不能移动它们。
使用引入管道更新文档
编辑按查询更新可以通过指定 pipeline
来使用 引入管道功能
resp = client.ingest.put_pipeline( id="set-foo", description="sets foo", processors=[ { "set": { "field": "foo", "value": "bar" } } ], ) print(resp) resp1 = client.update_by_query( index="my-index-000001", pipeline="set-foo", ) print(resp1)
response = client.ingest.put_pipeline( id: 'set-foo', body: { description: 'sets foo', processors: [ { set: { field: 'foo', value: 'bar' } } ] } ) puts response response = client.update_by_query( index: 'my-index-000001', pipeline: 'set-foo' ) puts response
const response = await client.ingest.putPipeline({ id: "set-foo", description: "sets foo", processors: [ { set: { field: "foo", value: "bar", }, }, ], }); console.log(response); const response1 = await client.updateByQuery({ index: "my-index-000001", pipeline: "set-foo", }); console.log(response1);
PUT _ingest/pipeline/set-foo { "description" : "sets foo", "processors" : [ { "set" : { "field": "foo", "value": "bar" } } ] } POST my-index-000001/_update_by_query?pipeline=set-foo
获取按查询更新操作的状态
编辑您可以使用 任务 API 获取所有正在运行的按查询更新请求的状态
$response = $client->tasks()->list();
resp = client.tasks.list( detailed=True, actions="*byquery", ) print(resp)
response = client.tasks.list( detailed: true, actions: '*byquery' ) puts response
res, err := es.Tasks.List( es.Tasks.List.WithActions("*byquery"), es.Tasks.List.WithDetailed(true), ) fmt.Println(res, err)
const response = await client.tasks.list({ detailed: "true", actions: "*byquery", }); console.log(response);
GET _tasks?detailed=true&actions=*byquery
响应如下所示
{ "nodes" : { "r1A2WoRbTwKZ516z6NEs5A" : { "name" : "r1A2WoR", "transport_address" : "127.0.0.1:9300", "host" : "127.0.0.1", "ip" : "127.0.0.1:9300", "attributes" : { "testattr" : "test", "portsfile" : "true" }, "tasks" : { "r1A2WoRbTwKZ516z6NEs5A:36619" : { "node" : "r1A2WoRbTwKZ516z6NEs5A", "id" : 36619, "type" : "transport", "action" : "indices:data/write/update/byquery", "status" : { "total" : 6154, "updated" : 3500, "created" : 0, "deleted" : 0, "batches" : 4, "version_conflicts" : 0, "noops" : 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0 }, "description" : "" } } } } }
此对象包含实际状态。它类似于响应 JSON,但重要增加了 |
使用任务 ID,您可以直接查找任务。以下示例检索有关任务 r1A2WoRbTwKZ516z6NEs5A:36619
的信息
$params = [ 'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619', ]; $response = $client->tasks()->get($params);
resp = client.tasks.get( task_id="r1A2WoRbTwKZ516z6NEs5A:36619", ) print(resp)
response = client.tasks.get( task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619' ) puts response
res, err := es.Tasks.Get( "r1A2WoRbTwKZ516z6NEs5A:36619", ) fmt.Println(res, err)
const response = await client.tasks.get({ task_id: "r1A2WoRbTwKZ516z6NEs5A:36619", }); console.log(response);
GET /_tasks/r1A2WoRbTwKZ516z6NEs5A:36619
此 API 的优势在于,它与 wait_for_completion=false
集成,以透明地返回已完成任务的状态。如果任务已完成,并且在其上设置了 wait_for_completion=false
,则它将返回 results
或 error
字段。此功能的代价是 wait_for_completion=false
在 .tasks/task/${taskId}
创建的文档。您需要删除该文档。
取消按查询更新操作
编辑可以使用 任务取消 API 取消任何按查询更新
$params = [ 'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619', ]; $response = $client->tasks()->cancel($params);
resp = client.tasks.cancel( task_id="r1A2WoRbTwKZ516z6NEs5A:36619", ) print(resp)
response = client.tasks.cancel( task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619' ) puts response
res, err := es.Tasks.Cancel( es.Tasks.Cancel.WithTaskID("r1A2WoRbTwKZ516z6NEs5A:36619"), ) fmt.Println(res, err)
const response = await client.tasks.cancel({ task_id: "r1A2WoRbTwKZ516z6NEs5A:36619", }); console.log(response);
POST _tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel
可以使用 任务 API 找到任务 ID。
取消操作应该会很快完成,但可能需要几秒钟。上面的任务状态 API 将继续列出按查询更新任务,直到此任务检查到它已被取消并自行终止。
更改请求的限制
编辑可以使用 _rethrottle
API 更改正在运行的按查询更新的 requests_per_second
值
$params = [ 'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619', ]; $response = $client->updateByQueryRethrottle($params);
resp = client.update_by_query_rethrottle( task_id="r1A2WoRbTwKZ516z6NEs5A:36619", requests_per_second="-1", ) print(resp)
response = client.update_by_query_rethrottle( task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619', requests_per_second: -1 ) puts response
res, err := es.UpdateByQueryRethrottle( "r1A2WoRbTwKZ516z6NEs5A:36619", esapi.IntPtr(-1), ) fmt.Println(res, err)
const response = await client.updateByQueryRethrottle({ task_id: "r1A2WoRbTwKZ516z6NEs5A:36619", requests_per_second: "-1", }); console.log(response);
POST _update_by_query/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1
可以使用 任务 API 找到任务 ID。
就像在 _update_by_query
API 上设置它一样,requests_per_second
可以是 -1
以禁用限制,也可以是任何十进制数,如 1.7
或 12
以将限制设置为该级别。加速查询的重新限制会立即生效,但减慢查询的重新限制会在完成当前批次后生效。这可以防止滚动超时。
手动切片
编辑通过为每个请求提供切片 ID 和切片总数,手动对按查询更新进行切片
resp = client.update_by_query( index="my-index-000001", slice={ "id": 0, "max": 2 }, script={ "source": "ctx._source['extra'] = 'test'" }, ) print(resp) resp1 = client.update_by_query( index="my-index-000001", slice={ "id": 1, "max": 2 }, script={ "source": "ctx._source['extra'] = 'test'" }, ) print(resp1)
response = client.update_by_query( index: 'my-index-000001', body: { slice: { id: 0, max: 2 }, script: { source: "ctx._source['extra'] = 'test'" } } ) puts response response = client.update_by_query( index: 'my-index-000001', body: { slice: { id: 1, max: 2 }, script: { source: "ctx._source['extra'] = 'test'" } } ) puts response
const response = await client.updateByQuery({ index: "my-index-000001", slice: { id: 0, max: 2, }, script: { source: "ctx._source['extra'] = 'test'", }, }); console.log(response); const response1 = await client.updateByQuery({ index: "my-index-000001", slice: { id: 1, max: 2, }, script: { source: "ctx._source['extra'] = 'test'", }, }); console.log(response1);
POST my-index-000001/_update_by_query { "slice": { "id": 0, "max": 2 }, "script": { "source": "ctx._source['extra'] = 'test'" } } POST my-index-000001/_update_by_query { "slice": { "id": 1, "max": 2 }, "script": { "source": "ctx._source['extra'] = 'test'" } }
您可以通过以下方式验证是否正常工作
resp = client.indices.refresh() print(resp) resp1 = client.search( index="my-index-000001", size="0", q="extra:test", filter_path="hits.total", ) print(resp1)
response = client.indices.refresh puts response response = client.search( index: 'my-index-000001', size: 0, q: 'extra:test', filter_path: 'hits.total' ) puts response
const response = await client.indices.refresh(); console.log(response); const response1 = await client.search({ index: "my-index-000001", size: 0, q: "extra:test", filter_path: "hits.total", }); console.log(response1);
GET _refresh POST my-index-000001/_search?size=0&q=extra:test&filter_path=hits.total
这将产生一个合理的 total
,如下所示
{ "hits": { "total": { "value": 120, "relation": "eq" } } }
使用自动切片
编辑您还可以让按查询更新使用 切片滚动在 _id
上进行切片,从而自动并行化。使用 slices
指定要使用的切片数
resp = client.update_by_query( index="my-index-000001", refresh=True, slices="5", script={ "source": "ctx._source['extra'] = 'test'" }, ) print(resp)
response = client.update_by_query( index: 'my-index-000001', refresh: true, slices: 5, body: { script: { source: "ctx._source['extra'] = 'test'" } } ) puts response
const response = await client.updateByQuery({ index: "my-index-000001", refresh: "true", slices: 5, script: { source: "ctx._source['extra'] = 'test'", }, }); console.log(response);
POST my-index-000001/_update_by_query?refresh&slices=5 { "script": { "source": "ctx._source['extra'] = 'test'" } }
您也可以通过以下方式验证是否正常工作
resp = client.search( index="my-index-000001", size="0", q="extra:test", filter_path="hits.total", ) print(resp)
response = client.search( index: 'my-index-000001', size: 0, q: 'extra:test', filter_path: 'hits.total' ) puts response
const response = await client.search({ index: "my-index-000001", size: 0, q: "extra:test", filter_path: "hits.total", }); console.log(response);
POST my-index-000001/_search?size=0&q=extra:test&filter_path=hits.total
这将产生一个合理的 total
,如下所示
{ "hits": { "total": { "value": 120, "relation": "eq" } } }
将 slices
设置为 auto
将使 Elasticsearch 选择要使用的切片数。此设置将每个分片使用一个切片,直到达到某个限制。如果有多个源数据流或索引,它将根据分片数最少的索引或支持索引选择切片数。
将 slices
添加到 _update_by_query
只是自动化了上面部分中使用的手动过程,创建子请求,这意味着它有一些怪癖
- 您可以在 任务 API 中看到这些请求。这些子请求是具有
slices
的请求任务的“子”任务。 - 获取具有
slices
的请求的任务状态仅包含已完成切片的状态。 - 这些子请求可以通过以下方式单独寻址,例如取消和重新限制。
- 重新限制具有
slices
的请求将按比例重新限制未完成的子请求。 - 取消具有
slices
的请求将取消每个子请求。 - 由于
slices
的性质,每个子请求都不会获得完全均匀的文档部分。所有文档都将被寻址,但某些切片可能比其他切片大。预期较大的切片具有更均匀的分布。 - 具有
slices
的请求上的requests_per_second
和max_docs
等参数会按比例分配给每个子请求。将其与上面关于分布不均匀的点相结合,您应该得出结论,将max_docs
与slices
一起使用可能不会导致更新正好max_docs
个文档。 - 每个子请求都会获得源数据流或索引的稍微不同的快照,尽管这些快照都是在近似相同的时间拍摄的。
获取新属性
编辑假设您创建了一个没有动态映射的索引,用数据填充了它,然后添加了一个映射值以从数据中获取更多字段
$params = [ 'index' => 'test', 'body' => [ 'mappings' => [ 'dynamic' => false, 'properties' => [ 'text' => [ 'type' => 'text', ], ], ], ], ]; $response = $client->indices()->create($params); $params = [ 'index' => 'test', 'body' => [ 'text' => 'words words', 'flag' => 'bar', ], ]; $response = $client->index($params); $params = [ 'index' => 'test', 'body' => [ 'text' => 'words words', 'flag' => 'foo', ], ]; $response = $client->index($params); $params = [ 'index' => 'test', 'body' => [ 'properties' => [ 'text' => [ 'type' => 'text', ], 'flag' => [ 'type' => 'text', 'analyzer' => 'keyword', ], ], ], ]; $response = $client->indices()->putMapping($params);
resp = client.indices.create( index="test", mappings={ "dynamic": False, "properties": { "text": { "type": "text" } } }, ) print(resp) resp1 = client.index( index="test", refresh=True, document={ "text": "words words", "flag": "bar" }, ) print(resp1) resp2 = client.index( index="test", refresh=True, document={ "text": "words words", "flag": "foo" }, ) print(resp2) resp3 = client.indices.put_mapping( index="test", properties={ "text": { "type": "text" }, "flag": { "type": "text", "analyzer": "keyword" } }, ) print(resp3)
response = client.indices.create( index: 'test', body: { mappings: { dynamic: false, properties: { text: { type: 'text' } } } } ) puts response response = client.index( index: 'test', refresh: true, body: { text: 'words words', flag: 'bar' } ) puts response response = client.index( index: 'test', refresh: true, body: { text: 'words words', flag: 'foo' } ) puts response response = client.indices.put_mapping( index: 'test', body: { properties: { text: { type: 'text' }, flag: { type: 'text', analyzer: 'keyword' } } } ) puts response
{ res, err := es.Indices.Create( "test", es.Indices.Create.WithBody(strings.NewReader(`{ "mappings": { "dynamic": false, "properties": { "text": { "type": "text" } } } }`)), ) fmt.Println(res, err) } { res, err := es.Index( "test", strings.NewReader(`{ "text": "words words", "flag": "bar" }`), es.Index.WithRefresh("true"), es.Index.WithPretty(), ) fmt.Println(res, err) } { res, err := es.Index( "test", strings.NewReader(`{ "text": "words words", "flag": "foo" }`), es.Index.WithRefresh("true"), es.Index.WithPretty(), ) fmt.Println(res, err) } { res, err := es.Indices.PutMapping( []string{"test"}, strings.NewReader(`{ "properties": { "text": { "type": "text" }, "flag": { "type": "text", "analyzer": "keyword" } } }`), ) fmt.Println(res, err) }
const response = await client.indices.create({ index: "test", mappings: { dynamic: false, properties: { text: { type: "text", }, }, }, }); console.log(response); const response1 = await client.index({ index: "test", refresh: "true", document: { text: "words words", flag: "bar", }, }); console.log(response1); const response2 = await client.index({ index: "test", refresh: "true", document: { text: "words words", flag: "foo", }, }); console.log(response2); const response3 = await client.indices.putMapping({ index: "test", properties: { text: { type: "text", }, flag: { type: "text", analyzer: "keyword", }, }, }); console.log(response3);
PUT test { "mappings": { "dynamic": false, "properties": { "text": {"type": "text"} } } } POST test/_doc?refresh { "text": "words words", "flag": "bar" } POST test/_doc?refresh { "text": "words words", "flag": "foo" } PUT test/_mapping { "properties": { "text": {"type": "text"}, "flag": {"type": "text", "analyzer": "keyword"} } }
搜索数据不会找到任何内容
$params = [ 'index' => 'test', 'body' => [ 'query' => [ 'match' => [ 'flag' => 'foo', ], ], ], ]; $response = $client->search($params);
resp = client.search( index="test", filter_path="hits.total", query={ "match": { "flag": "foo" } }, ) print(resp)
response = client.search( index: 'test', filter_path: 'hits.total', body: { query: { match: { flag: 'foo' } } } ) puts response
res, err := es.Search( es.Search.WithIndex("test"), es.Search.WithBody(strings.NewReader(`{ "query": { "match": { "flag": "foo" } } }`)), es.Search.WithFilterPath("hits.total"), es.Search.WithPretty(), ) fmt.Println(res, err)
const response = await client.search({ index: "test", filter_path: "hits.total", query: { match: { flag: "foo", }, }, }); console.log(response);
POST test/_search?filter_path=hits.total { "query": { "match": { "flag": "foo" } } }
{ "hits" : { "total": { "value": 0, "relation": "eq" } } }
但是您可以发出一个 _update_by_query
请求来获取新的映射
$params = [ 'index' => 'test', ]; $response = $client->updateByQuery($params); $params = [ 'index' => 'test', 'body' => [ 'query' => [ 'match' => [ 'flag' => 'foo', ], ], ], ]; $response = $client->search($params);
resp = client.update_by_query( index="test", refresh=True, conflicts="proceed", ) print(resp) resp1 = client.search( index="test", filter_path="hits.total", query={ "match": { "flag": "foo" } }, ) print(resp1)
response = client.update_by_query( index: 'test', refresh: true, conflicts: 'proceed' ) puts response response = client.search( index: 'test', filter_path: 'hits.total', body: { query: { match: { flag: 'foo' } } } ) puts response
{ res, err := es.UpdateByQuery( []string{"test"}, es.UpdateByQuery.WithConflicts("proceed"), es.UpdateByQuery.WithRefresh(true), ) fmt.Println(res, err) } { res, err := es.Search( es.Search.WithIndex("test"), es.Search.WithBody(strings.NewReader(`{ "query": { "match": { "flag": "foo" } } }`)), es.Search.WithFilterPath("hits.total"), es.Search.WithPretty(), ) fmt.Println(res, err) }
const response = await client.updateByQuery({ index: "test", refresh: "true", conflicts: "proceed", }); console.log(response); const response1 = await client.search({ index: "test", filter_path: "hits.total", query: { match: { flag: "foo", }, }, }); console.log(response1);
POST test/_update_by_query?refresh&conflicts=proceed POST test/_search?filter_path=hits.total { "query": { "match": { "flag": "foo" } } }
{ "hits" : { "total": { "value": 1, "relation": "eq" } } }
当向多字段添加字段时,您可以执行完全相同的操作。