批量 API
编辑批量 API
编辑在单个 API 调用中执行多个索引或删除操作。这可以减少开销并大大提高索引速度。
$params = [ 'body' => [ [ 'index' => [ '_index' => 'test', '_id' => '1', ], ], [ 'field1' => 'value1', ], [ 'delete' => [ '_index' => 'test', '_id' => '2', ], ], [ 'create' => [ '_index' => 'test', '_id' => '3', ], ], [ 'field1' => 'value3', ], [ 'update' => [ '_id' => '1', '_index' => 'test', ], ], [ 'doc' => [ 'field2' => 'value2', ], ], ], ]; $response = $client->bulk($params);
resp = client.bulk( operations=[ { "index": { "_index": "test", "_id": "1" } }, { "field1": "value1" }, { "delete": { "_index": "test", "_id": "2" } }, { "create": { "_index": "test", "_id": "3" } }, { "field1": "value3" }, { "update": { "_id": "1", "_index": "test" } }, { "doc": { "field2": "value2" } } ], ) print(resp)
response = client.bulk( body: [ { index: { _index: 'test', _id: '1' } }, { "field1": 'value1' }, { delete: { _index: 'test', _id: '2' } }, { create: { _index: 'test', _id: '3' } }, { "field1": 'value3' }, { update: { _id: '1', _index: 'test' } }, { doc: { "field2": 'value2' } } ] ) puts response
res, err := es.Bulk( strings.NewReader(` { "index" : { "_index" : "test", "_id" : "1" } } { "field1" : "value1" } { "delete" : { "_index" : "test", "_id" : "2" } } { "create" : { "_index" : "test", "_id" : "3" } } { "field1" : "value3" } { "update" : {"_id" : "1", "_index" : "test"} } { "doc" : {"field2" : "value2"} } `), ) fmt.Println(res, err)
const response = await client.bulk({ operations: [ { index: { _index: "test", _id: "1", }, }, { field1: "value1", }, { delete: { _index: "test", _id: "2", }, }, { create: { _index: "test", _id: "3", }, }, { field1: "value3", }, { update: { _id: "1", _index: "test", }, }, { doc: { field2: "value2", }, }, ], }); console.log(response);
POST _bulk { "index" : { "_index" : "test", "_id" : "1" } } { "field1" : "value1" } { "delete" : { "_index" : "test", "_id" : "2" } } { "create" : { "_index" : "test", "_id" : "3" } } { "field1" : "value3" } { "update" : {"_id" : "1", "_index" : "test"} } { "doc" : {"field2" : "value2"} }
先决条件
编辑-
如果启用了 Elasticsearch 安全功能,您必须拥有目标数据流、索引或索引别名的以下索引权限
- 要使用
create
操作,您必须拥有create_doc
、create
、index
或write
索引权限。数据流仅支持create
操作。 - 要使用
index
操作,您必须拥有create
、index
或write
索引权限。 - 要使用
delete
操作,您必须拥有delete
或write
索引权限。 - 要使用
update
操作,您必须拥有index
或write
索引权限。 - 要使用批量 API 请求自动创建数据流或索引,您必须拥有
auto_configure
、create_index
或manage
索引权限。 - 要使批量操作的结果通过
refresh
参数对搜索可见,您必须拥有maintenance
或manage
索引权限。
- 要使用
- 自动数据流创建需要一个启用了数据流的匹配索引模板。请参阅设置数据流。
描述
编辑提供了一种在单个请求中执行多个 index
、create
、delete
和 update
操作的方法。
使用换行符分隔的 JSON (NDJSON) 结构在请求正文中指定操作
action_and_meta_data\n optional_source\n action_and_meta_data\n optional_source\n .... action_and_meta_data\n optional_source\n
index
和 create
操作期望在下一行有源,并且具有与标准索引 API 中的 op_type
参数相同的语义:如果目标中已存在具有相同 ID 的文档,则 create
失败,index
根据需要添加或替换文档。
数据流仅支持 create
操作。 要更新或删除数据流中的文档,您必须以包含该文档的后备索引为目标。 请参阅更新或删除后备索引中的文档。
update
期望在下一行指定部分文档、upsert 和脚本及其选项。
delete
不期望下一行有源,并且具有与标准删除 API 相同的语义。
数据的最后一行必须以换行符 \n
结尾。每个换行符前面可以有一个回车符 \r
。将 NDJSON 数据发送到 _bulk
端点时,请使用 application/json
或 application/x-ndjson
的 Content-Type
标头。
由于此格式使用字面量 \n
作为分隔符,请确保 JSON 操作和源不是经过格式化打印的。
如果您在请求路径中提供 <target>
,则它用于任何未明确指定 _index
参数的操作。
关于格式的说明:这里的想法是使处理速度尽可能快。由于某些操作被重定向到其他节点上的其他分片,因此仅在接收节点端解析 action_meta_data
。
使用此协议的客户端库应尝试在客户端执行类似的操作,并尽可能减少缓冲。
在单个批量请求中执行的操作没有“正确”的数量。尝试使用不同的设置来找到适合您的特定工作负载的最佳大小。请注意,Elasticsearch 默认将 HTTP 请求的最大大小限制为 100mb
,因此客户端必须确保任何请求都不会超过此大小。无法索引超过大小限制的单个文档,因此您必须在将任何此类文档发送到 Elasticsearch 之前将其预处理为较小的片段。例如,在索引文档之前将文档拆分为页或章,或将原始二进制数据存储在 Elasticsearch 之外的系统中,并将原始数据替换为您发送到 Elasticsearch 的文档中指向外部系统的链接。
批量请求的客户端支持
编辑一些官方支持的客户端提供了帮助进行批量请求和重新索引的助手
- Go
- 请参阅 esutil.BulkIndexer
- Perl
- 请参阅 Search::Elasticsearch::Client::5_0::Bulk 和 Search::Elasticsearch::Client::5_0::Scroll
- Python
- 请参阅 elasticsearch.helpers.*
- JavaScript
- 请参阅 client.helpers.*
- .NET
- 请参阅
BulkAllObservable
- PHP
- 请参阅 批量索引
使用 cURL 提交批量请求
编辑如果您要向 curl
提供文本文件输入,则必须使用 --data-binary
标志而不是普通的 -d
。后者不会保留换行符。示例
$ cat requests { "index" : { "_index" : "test", "_id" : "1" } } { "field1" : "value1" } $ curl -s -H "Content-Type: application/x-ndjson" -XPOST localhost:9200/_bulk --data-binary "@requests"; echo {"took":7, "errors": false, "items":[{"index":{"_index":"test","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}
乐观并发控制
编辑批量 API 调用中的每个 index
和 delete
操作都可以在其各自的操作和元数据行中包含 if_seq_no
和 if_primary_term
参数。if_seq_no
和 if_primary_term
参数根据对现有文档的最后修改来控制操作的执行方式。有关更多详细信息,请参阅乐观并发控制。
版本控制
编辑每个批量项都可以使用 version
字段包含版本值。它会自动遵循基于 _version
映射的索引/删除操作的行为。它还支持 version_type
(请参阅版本控制)。
路由
编辑每个批量项都可以使用 routing
字段包含路由值。它会自动遵循基于 _routing
映射的索引/删除操作的行为。
数据流不支持自定义路由,除非它们是在模板中启用了 allow_custom_routing
设置的情况下创建的。
等待活动分片
编辑在进行批量调用时,您可以设置 wait_for_active_shards
参数,以要求在开始处理批量请求之前至少有最少数量的分片副本处于活动状态。有关更多详细信息和使用示例,请参阅此处。
刷新
编辑控制此请求所做的更改何时对搜索可见。请参阅刷新。
只有接收批量请求的分片才会受到 refresh
的影响。假设一个 _bulk?refresh=wait_for
请求中有三个文档,这些文档恰好被路由到一个具有五个分片的索引中的不同分片。该请求将仅等待这三个分片刷新。构成索引的其他两个分片根本不参与 _bulk
请求。
安全性
编辑请参阅基于 URL 的访问控制。
路径参数
编辑-
<target>
- (可选,字符串)要对其执行批量操作的数据流、索引或索引别称的名称。
查询参数
编辑-
list_executed_pipelines
- (可选,布尔值)如果为
true
,则响应将包括为每个index
或create
执行的提取管道。默认为false
。 -
pipeline
- (可选,字符串)用于预处理传入文档的管道的 ID。如果索引指定了默认提取管道,则将该值设置为
_none
将禁用此请求的默认提取管道。如果配置了最终管道,则无论此参数的值如何,它都将始终运行。 -
refresh
- (可选,枚举)如果为
true
,则 Elasticsearch 将刷新受影响的分片以使此操作对搜索可见;如果为wait_for
,则等待刷新以使此操作对搜索可见;如果为false
,则不进行任何刷新。有效值为:true
、false
、wait_for
。默认值:false
。 -
require_alias
- (可选,布尔值)如果为
true
,则请求的操作必须以索引别名为目标。默认为false
。 -
routing
- (可选,字符串)用于将操作路由到特定分片的自定义值。
-
_source
- (可选,字符串)为 true 或 false,以返回或不返回
_source
字段,或要返回的字段列表。 -
_source_excludes
-
(可选,字符串)要从响应中排除的源字段的逗号分隔列表。
您还可以使用此参数从
_source_includes
查询参数中指定的子集中排除字段。如果
_source
参数为false
,则忽略此参数。 -
_source_includes
-
(可选,字符串)要包含在响应中的源字段的逗号分隔列表。
如果指定了此参数,则仅返回这些源字段。您可以使用
_source_excludes
查询参数从此子集中排除字段。如果
_source
参数为false
,则忽略此参数。 -
timeout
-
(可选,时间单位)每个操作等待后续操作的时间段
默认为
1m
(一分钟)。这保证了 Elasticsearch 在失败之前至少等待超时时间。实际等待时间可能会更长,尤其是在发生多次等待时。 -
wait_for_active_shards
-
(可选,字符串)在继续操作之前必须处于活动状态的每个分片的副本数。设置为
all
或任何非负整数,最大值为索引中每个分片的总副本数(number_of_replicas+1
)。默认为1
,表示仅等待每个主分片处于活动状态。请参阅 活动分片。
请求体
编辑请求体包含一个换行符分隔的 create
、delete
、index
和 update
操作及其相关源数据列表。
-
create
-
(可选,字符串)如果指定的文档尚不存在,则对其进行索引。下一行必须包含要索引的源数据。
-
_index
- (可选,字符串)要执行操作的数据流、索引或索引别名的名称。如果请求路径中未指定
<target>
,则此参数是必需的。 -
_id
- (可选,字符串)文档 ID。如果未指定 ID,则会自动生成文档 ID。
-
list_executed_pipelines
- (可选,布尔值)如果为
true
,则响应将包含已执行的摄取管道。默认为false
。 -
require_alias
- (可选,布尔值)如果为
true
,则操作必须以索引别名为目标。默认为false
。 -
dynamic_templates
- (可选,映射)从字段的完整名称到 动态模板 的名称的映射。默认为空映射。如果名称与动态模板匹配,则无论模板中定义的其他匹配谓词如何,都将应用该模板。如果字段已在映射中定义,则不会使用此参数。
-
-
delete
-
(可选,字符串)从索引中删除指定的文档。
-
_index
- (可选,字符串)要对其执行操作的索引或索引别名的名称。如果请求路径中未指定
<target>
,则此参数是必需的。 -
_id
- (必需,字符串)文档 ID。
-
require_alias
- (可选,布尔值)如果为
true
,则操作必须以索引别名为目标。默认为false
。
-
-
index
-
(可选,字符串)索引指定的文档。如果文档存在,则替换文档并递增版本。下一行必须包含要索引的源数据。
-
_index
- (可选,字符串)要对其执行操作的索引或索引别名的名称。如果请求路径中未指定
<target>
,则此参数是必需的。 -
_id
- (可选,字符串)文档 ID。如果未指定 ID,则会自动生成文档 ID。
-
list_executed_pipelines
- (可选,布尔值)如果为
true
,则响应将包含已执行的摄取管道。默认为false
。 -
require_alias
- (可选,布尔值)如果为
true
,则操作必须以索引别名为目标。默认为false
。 -
dynamic_templates
- (可选,映射)从字段的完整名称到 动态模板 的名称的映射。默认为空映射。如果名称与动态模板匹配,则无论模板中定义的其他匹配谓词如何,都将应用该模板。如果字段已在映射中定义,则不会使用此参数。
-
-
update
-
(可选,字符串)执行部分文档更新。下一行必须包含部分文档和更新选项。
-
_index
- (可选,字符串)要对其执行操作的索引或索引别名的名称。如果请求路径中未指定
<target>
,则此参数是必需的。 -
_id
- (必需,字符串)文档 ID。
-
require_alias
- (可选,布尔值)如果为
true
,则操作必须以索引别名为目标。默认为false
。
-
-
doc
- (可选,对象)要索引的部分文档。对于
update
操作是必需的。 -
<fields>
- (可选,对象)要索引的文档源。对于
create
和index
操作是必需的。
响应体
编辑批量 API 的响应包含请求中每个操作的单独结果,按照提交顺序返回。单个操作的成功或失败不会影响请求中的其他操作。
-
took
- (整数)处理批量请求所花费的时间,以毫秒为单位。
-
errors
- (布尔值)如果为
true
,则批量请求中的一个或多个操作未成功完成。 -
items
-
(对象数组)包含批量请求中每个操作的结果,按照提交顺序排列。
items
对象的属性- <action>
-
(对象)参数名称是与操作关联的操作。可能的值为
create
、delete
、index
和update
。参数值是一个对象,其中包含关联操作的信息。
<action>
的属性-
_index
- (字符串)与操作关联的索引的名称。如果操作以数据流为目标,则这是写入文档的后备索引。
-
_id
- (整数)与操作关联的文档 ID。
-
_version
-
(整数)与操作关联的文档版本。每次更新文档时,文档版本都会递增。
此参数仅针对成功的操作返回。
-
result
- (字符串)操作的结果。成功的值为
created
、deleted
和updated
。其他有效值为noop
和not_found
。 -
_shards
-
(对象)包含操作的分片信息。
此参数仅针对成功的操作返回。
_shards
的属性-
total
- (整数)操作尝试在其上执行的分片数。
-
successful
- (整数)操作成功的分片数。
-
failed
- (整数)操作尝试在其上执行但失败的分片数。
-
-
_seq_no
-
(整数)为操作分配给文档的序列号。序列号用于确保较旧版本的文档不会覆盖较新版本的文档。请参阅 乐观并发控制。
此参数仅针对成功的操作返回。
-
_primary_term
-
(整数)为操作分配给文档的主项。请参阅 乐观并发控制。
此参数仅针对成功的操作返回。
-
status
- (整数)为操作返回的 HTTP 状态代码。
-
error
-
(对象)包含有关失败操作的附加信息。
该参数仅针对失败的操作返回。
error
的属性-
type
- (字符串)操作的错误类型。
-
reason
- (字符串)操作失败的原因。
-
index_uuid
- (字符串)与失败操作关联的索引的通用唯一标识符 (UUID)。
-
shard
- (字符串)与失败操作关联的分片 ID。
-
index
- (字符串)与失败操作关联的索引的名称。如果操作以数据流为目标,则这是尝试写入文档的后备索引。
-
-
示例
编辑$params = [ 'body' => [ [ 'index' => [ '_index' => 'test', '_id' => '1', ], ], [ 'field1' => 'value1', ], [ 'delete' => [ '_index' => 'test', '_id' => '2', ], ], [ 'create' => [ '_index' => 'test', '_id' => '3', ], ], [ 'field1' => 'value3', ], [ 'update' => [ '_id' => '1', '_index' => 'test', ], ], [ 'doc' => [ 'field2' => 'value2', ], ], ], ]; $response = $client->bulk($params);
resp = client.bulk( operations=[ { "index": { "_index": "test", "_id": "1" } }, { "field1": "value1" }, { "delete": { "_index": "test", "_id": "2" } }, { "create": { "_index": "test", "_id": "3" } }, { "field1": "value3" }, { "update": { "_id": "1", "_index": "test" } }, { "doc": { "field2": "value2" } } ], ) print(resp)
response = client.bulk( body: [ { index: { _index: 'test', _id: '1' } }, { "field1": 'value1' }, { delete: { _index: 'test', _id: '2' } }, { create: { _index: 'test', _id: '3' } }, { "field1": 'value3' }, { update: { _id: '1', _index: 'test' } }, { doc: { "field2": 'value2' } } ] ) puts response
res, err := es.Bulk( strings.NewReader(` { "index" : { "_index" : "test", "_id" : "1" } } { "field1" : "value1" } { "delete" : { "_index" : "test", "_id" : "2" } } { "create" : { "_index" : "test", "_id" : "3" } } { "field1" : "value3" } { "update" : {"_id" : "1", "_index" : "test"} } { "doc" : {"field2" : "value2"} } `), ) fmt.Println(res, err)
const response = await client.bulk({ operations: [ { index: { _index: "test", _id: "1", }, }, { field1: "value1", }, { delete: { _index: "test", _id: "2", }, }, { create: { _index: "test", _id: "3", }, }, { field1: "value3", }, { update: { _id: "1", _index: "test", }, }, { doc: { field2: "value2", }, }, ], }); console.log(response);
POST _bulk { "index" : { "_index" : "test", "_id" : "1" } } { "field1" : "value1" } { "delete" : { "_index" : "test", "_id" : "2" } } { "create" : { "_index" : "test", "_id" : "3" } } { "field1" : "value3" } { "update" : {"_id" : "1", "_index" : "test"} } { "doc" : {"field2" : "value2"} }
API 返回以下结果
{ "took": 30, "errors": false, "items": [ { "index": { "_index": "test", "_id": "1", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 201, "_seq_no" : 0, "_primary_term": 1 } }, { "delete": { "_index": "test", "_id": "2", "_version": 1, "result": "not_found", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 404, "_seq_no" : 1, "_primary_term" : 2 } }, { "create": { "_index": "test", "_id": "3", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 201, "_seq_no" : 2, "_primary_term" : 3 } }, { "update": { "_index": "test", "_id": "1", "_version": 2, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 200, "_seq_no" : 3, "_primary_term" : 4 } } ] }
批量更新示例
编辑使用 update
操作时,可以将 retry_on_conflict
用作操作本身的字段(而不是在额外的有效负载行中),以指定在发生版本冲突时应重试更新的次数。
update
操作有效负载支持以下选项:doc
(部分文档)、upsert
、doc_as_upsert
、script
、params
(用于脚本)、lang
(用于脚本)和 _source
。有关选项的详细信息,请参阅更新文档。带有更新操作的示例
$params = [ 'body' => [ [ 'update' => [ '_id' => '1', '_index' => 'index1', 'retry_on_conflict' => 3, ], ], [ 'doc' => [ 'field' => 'value', ], ], [ 'update' => [ '_id' => '0', '_index' => 'index1', 'retry_on_conflict' => 3, ], ], [ 'script' => [ 'source' => 'ctx._source.counter += params.param1', 'lang' => 'painless', 'params' => [ 'param1' => 1, ], ], 'upsert' => [ 'counter' => 1, ], ], [ 'update' => [ '_id' => '2', '_index' => 'index1', 'retry_on_conflict' => 3, ], ], [ 'doc' => [ 'field' => 'value', ], 'doc_as_upsert' => true, ], [ 'update' => [ '_id' => '3', '_index' => 'index1', '_source' => true, ], ], [ 'doc' => [ 'field' => 'value', ], ], [ 'update' => [ '_id' => '4', '_index' => 'index1', ], ], [ 'doc' => [ 'field' => 'value', ], '_source' => true, ], ], ]; $response = $client->bulk($params);
resp = client.bulk( operations=[ { "update": { "_id": "1", "_index": "index1", "retry_on_conflict": 3 } }, { "doc": { "field": "value" } }, { "update": { "_id": "0", "_index": "index1", "retry_on_conflict": 3 } }, { "script": { "source": "ctx._source.counter += params.param1", "lang": "painless", "params": { "param1": 1 } }, "upsert": { "counter": 1 } }, { "update": { "_id": "2", "_index": "index1", "retry_on_conflict": 3 } }, { "doc": { "field": "value" }, "doc_as_upsert": True }, { "update": { "_id": "3", "_index": "index1", "_source": True } }, { "doc": { "field": "value" } }, { "update": { "_id": "4", "_index": "index1" } }, { "doc": { "field": "value" }, "_source": True } ], ) print(resp)
response = client.bulk( body: [ { update: { _id: '1', _index: 'index1', retry_on_conflict: 3 } }, { doc: { field: 'value' } }, { update: { _id: '0', _index: 'index1', retry_on_conflict: 3 } }, { script: { source: 'ctx._source.counter += params.param1', lang: 'painless', params: { "param1": 1 } }, upsert: { counter: 1 } }, { update: { _id: '2', _index: 'index1', retry_on_conflict: 3 } }, { doc: { field: 'value' }, doc_as_upsert: true }, { update: { _id: '3', _index: 'index1', _source: true } }, { doc: { field: 'value' } }, { update: { _id: '4', _index: 'index1' } }, { doc: { field: 'value' }, _source: true } ] ) puts response
res, err := es.Bulk( strings.NewReader(` { "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} } { "doc" : {"field" : "value"} } { "update" : { "_id" : "0", "_index" : "index1", "retry_on_conflict" : 3} } { "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}} { "update" : {"_id" : "2", "_index" : "index1", "retry_on_conflict" : 3} } { "doc" : {"field" : "value"}, "doc_as_upsert" : true } { "update" : {"_id" : "3", "_index" : "index1", "_source" : true} } { "doc" : {"field" : "value"} } { "update" : {"_id" : "4", "_index" : "index1"} } { "doc" : {"field" : "value"}, "_source": true} `), ) fmt.Println(res, err)
const response = await client.bulk({ operations: [ { update: { _id: "1", _index: "index1", retry_on_conflict: 3, }, }, { doc: { field: "value", }, }, { update: { _id: "0", _index: "index1", retry_on_conflict: 3, }, }, { script: { source: "ctx._source.counter += params.param1", lang: "painless", params: { param1: 1, }, }, upsert: { counter: 1, }, }, { update: { _id: "2", _index: "index1", retry_on_conflict: 3, }, }, { doc: { field: "value", }, doc_as_upsert: true, }, { update: { _id: "3", _index: "index1", _source: true, }, }, { doc: { field: "value", }, }, { update: { _id: "4", _index: "index1", }, }, { doc: { field: "value", }, _source: true, }, ], }); console.log(response);
POST _bulk { "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} } { "doc" : {"field" : "value"} } { "update" : { "_id" : "0", "_index" : "index1", "retry_on_conflict" : 3} } { "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}} { "update" : {"_id" : "2", "_index" : "index1", "retry_on_conflict" : 3} } { "doc" : {"field" : "value"}, "doc_as_upsert" : true } { "update" : {"_id" : "3", "_index" : "index1", "_source" : true} } { "doc" : {"field" : "value"} } { "update" : {"_id" : "4", "_index" : "index1"} } { "doc" : {"field" : "value"}, "_source": true}
带有失败操作的示例
编辑以下批量 API 请求包括更新不存在文档的操作。
$params = [ 'body' => [ [ 'update' => [ '_id' => '5', '_index' => 'index1', ], ], [ 'doc' => [ 'my_field' => 'foo', ], ], [ 'update' => [ '_id' => '6', '_index' => 'index1', ], ], [ 'doc' => [ 'my_field' => 'foo', ], ], [ 'create' => [ '_id' => '7', '_index' => 'index1', ], ], [ 'my_field' => 'foo', ], ], ]; $response = $client->bulk($params);
resp = client.bulk( operations=[ { "update": { "_id": "5", "_index": "index1" } }, { "doc": { "my_field": "foo" } }, { "update": { "_id": "6", "_index": "index1" } }, { "doc": { "my_field": "foo" } }, { "create": { "_id": "7", "_index": "index1" } }, { "my_field": "foo" } ], ) print(resp)
response = client.bulk( body: [ { update: { _id: '5', _index: 'index1' } }, { doc: { my_field: 'foo' } }, { update: { _id: '6', _index: 'index1' } }, { doc: { my_field: 'foo' } }, { create: { _id: '7', _index: 'index1' } }, { my_field: 'foo' } ] ) puts response
res, err := es.Bulk( strings.NewReader(` { "update": {"_id": "5", "_index": "index1"} } { "doc": {"my_field": "foo"} } { "update": {"_id": "6", "_index": "index1"} } { "doc": {"my_field": "foo"} } { "create": {"_id": "7", "_index": "index1"} } { "my_field": "foo" } `), ) fmt.Println(res, err)
const response = await client.bulk({ operations: [ { update: { _id: "5", _index: "index1", }, }, { doc: { my_field: "foo", }, }, { update: { _id: "6", _index: "index1", }, }, { doc: { my_field: "foo", }, }, { create: { _id: "7", _index: "index1", }, }, { my_field: "foo", }, ], }); console.log(response);
POST /_bulk { "update": {"_id": "5", "_index": "index1"} } { "doc": {"my_field": "foo"} } { "update": {"_id": "6", "_index": "index1"} } { "doc": {"my_field": "foo"} } { "create": {"_id": "7", "_index": "index1"} } { "my_field": "foo" }
由于这些操作无法成功完成,因此 API 返回的响应的 errors
标志为 true
。
响应还包括任何失败操作的 error
对象。error
对象包含有关失败的其他信息,例如错误类型和原因。
{ "took": 486, "errors": true, "items": [ { "update": { "_index": "index1", "_id": "5", "status": 404, "error": { "type": "document_missing_exception", "reason": "[5]: document missing", "index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA", "shard": "0", "index": "index1" } } }, { "update": { "_index": "index1", "_id": "6", "status": 404, "error": { "type": "document_missing_exception", "reason": "[6]: document missing", "index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA", "shard": "0", "index": "index1" } } }, { "create": { "_index": "index1", "_id": "7", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "_seq_no": 0, "_primary_term": 1, "status": 201 } } ] }
要仅返回有关失败操作的信息,请使用 filter_path
查询参数,参数为 items.*.error
。
$params = [ 'body' => [ [ 'update' => [ '_id' => '5', '_index' => 'index1', ], ], [ 'doc' => [ 'my_field' => 'baz', ], ], [ 'update' => [ '_id' => '6', '_index' => 'index1', ], ], [ 'doc' => [ 'my_field' => 'baz', ], ], [ 'update' => [ '_id' => '7', '_index' => 'index1', ], ], [ 'doc' => [ 'my_field' => 'baz', ], ], ], ]; $response = $client->bulk($params);
resp = client.bulk( filter_path="items.*.error", operations=[ { "update": { "_id": "5", "_index": "index1" } }, { "doc": { "my_field": "baz" } }, { "update": { "_id": "6", "_index": "index1" } }, { "doc": { "my_field": "baz" } }, { "update": { "_id": "7", "_index": "index1" } }, { "doc": { "my_field": "baz" } } ], ) print(resp)
response = client.bulk( filter_path: 'items.*.error', body: [ { update: { _id: '5', _index: 'index1' } }, { doc: { my_field: 'baz' } }, { update: { _id: '6', _index: 'index1' } }, { doc: { my_field: 'baz' } }, { update: { _id: '7', _index: 'index1' } }, { doc: { my_field: 'baz' } } ] ) puts response
res, err := es.Bulk( strings.NewReader(` { "update": {"_id": "5", "_index": "index1"} } { "doc": {"my_field": "baz"} } { "update": {"_id": "6", "_index": "index1"} } { "doc": {"my_field": "baz"} } { "update": {"_id": "7", "_index": "index1"} } { "doc": {"my_field": "baz"} } `), es.Bulk.WithFilterPath("items.*.error"), ) fmt.Println(res, err)
const response = await client.bulk({ filter_path: "items.*.error", operations: [ { update: { _id: "5", _index: "index1", }, }, { doc: { my_field: "baz", }, }, { update: { _id: "6", _index: "index1", }, }, { doc: { my_field: "baz", }, }, { update: { _id: "7", _index: "index1", }, }, { doc: { my_field: "baz", }, }, ], }); console.log(response);
POST /_bulk?filter_path=items.*.error { "update": {"_id": "5", "_index": "index1"} } { "doc": {"my_field": "baz"} } { "update": {"_id": "6", "_index": "index1"} } { "doc": {"my_field": "baz"} } { "update": {"_id": "7", "_index": "index1"} } { "doc": {"my_field": "baz"} }
API 返回以下结果。
{ "items": [ { "update": { "error": { "type": "document_missing_exception", "reason": "[5]: document missing", "index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA", "shard": "0", "index": "index1" } } }, { "update": { "error": { "type": "document_missing_exception", "reason": "[6]: document missing", "index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA", "shard": "0", "index": "index1" } } } ] }
带有动态模板参数的示例
编辑以下示例创建一个动态模板,然后执行一个由带有 dynamic_templates
参数的索引/创建请求组成的批量请求。
resp = client.indices.create( index="my-index", mappings={ "dynamic_templates": [ { "geo_point": { "mapping": { "type": "geo_point" } } } ] }, ) print(resp) resp1 = client.bulk( operations=[ { "index": { "_index": "my_index", "_id": "1", "dynamic_templates": { "work_location": "geo_point" } } }, { "field": "value1", "work_location": "41.12,-71.34", "raw_location": "41.12,-71.34" }, { "create": { "_index": "my_index", "_id": "2", "dynamic_templates": { "home_location": "geo_point" } } }, { "field": "value2", "home_location": "41.12,-71.34" } ], ) print(resp1)
response = client.indices.create( index: 'my-index', body: { mappings: { dynamic_templates: [ { geo_point: { mapping: { type: 'geo_point' } } } ] } } ) puts response response = client.bulk( body: [ { index: { _index: 'my_index', _id: '1', dynamic_templates: { work_location: 'geo_point' } } }, { field: 'value1', work_location: '41.12,-71.34', raw_location: '41.12,-71.34' }, { create: { _index: 'my_index', _id: '2', dynamic_templates: { home_location: 'geo_point' } } }, { field: 'value2', home_location: '41.12,-71.34' } ] ) puts response
const response = await client.indices.create({ index: "my-index", mappings: { dynamic_templates: [ { geo_point: { mapping: { type: "geo_point", }, }, }, ], }, }); console.log(response); const response1 = await client.bulk({ operations: [ { index: { _index: "my_index", _id: "1", dynamic_templates: { work_location: "geo_point", }, }, }, { field: "value1", work_location: "41.12,-71.34", raw_location: "41.12,-71.34", }, { create: { _index: "my_index", _id: "2", dynamic_templates: { home_location: "geo_point", }, }, }, { field: "value2", home_location: "41.12,-71.34", }, ], }); console.log(response1);
PUT my-index/ { "mappings": { "dynamic_templates": [ { "geo_point": { "mapping": { "type" : "geo_point" } } } ] } } POST /_bulk { "index" : { "_index" : "my_index", "_id" : "1", "dynamic_templates": {"work_location": "geo_point"}} } { "field" : "value1", "work_location": "41.12,-71.34", "raw_location": "41.12,-71.34"} { "create" : { "_index" : "my_index", "_id" : "2", "dynamic_templates": {"home_location": "geo_point"}} } { "field" : "value2", "home_location": "41.12,-71.34"}
批量请求根据 dynamic_templates
参数创建两个新字段 work_location
和 home_location
,类型为 geo_point
;但是,raw_location
字段是使用默认动态映射规则创建的,在这种情况下,由于它在 JSON 文档中以字符串形式提供,因此它是一个 text
字段。