批量 API
编辑批量 API编辑
在单个 API 调用中执行多个索引或删除操作。这减少了开销,可以大大提高索引速度。
$params = [ 'body' => [ [ 'index' => [ '_index' => 'test', '_id' => '1', ], ], [ 'field1' => 'value1', ], [ 'delete' => [ '_index' => 'test', '_id' => '2', ], ], [ 'create' => [ '_index' => 'test', '_id' => '3', ], ], [ 'field1' => 'value3', ], [ 'update' => [ '_id' => '1', '_index' => 'test', ], ], [ 'doc' => [ 'field2' => 'value2', ], ], ], ]; $response = $client->bulk($params);
resp = client.bulk( body=[ {"index": {"_index": "test", "_id": "1"}}, {"field1": "value1"}, {"delete": {"_index": "test", "_id": "2"}}, {"create": {"_index": "test", "_id": "3"}}, {"field1": "value3"}, {"update": {"_id": "1", "_index": "test"}}, {"doc": {"field2": "value2"}}, ], ) print(resp)
response = client.bulk( body: [ { index: { _index: 'test', _id: '1' } }, { "field1": 'value1' }, { delete: { _index: 'test', _id: '2' } }, { create: { _index: 'test', _id: '3' } }, { "field1": 'value3' }, { update: { _id: '1', _index: 'test' } }, { doc: { "field2": 'value2' } } ] ) puts response
res, err := es.Bulk( strings.NewReader(` { "index" : { "_index" : "test", "_id" : "1" } } { "field1" : "value1" } { "delete" : { "_index" : "test", "_id" : "2" } } { "create" : { "_index" : "test", "_id" : "3" } } { "field1" : "value3" } { "update" : {"_id" : "1", "_index" : "test"} } { "doc" : {"field2" : "value2"} } `), ) fmt.Println(res, err)
const response = await client.bulk({ body: [ { index: { _index: 'test', _id: '1' } }, { field1: 'value1' }, { delete: { _index: 'test', _id: '2' } }, { create: { _index: 'test', _id: '3' } }, { field1: 'value3' }, { update: { _id: '1', _index: 'test' } }, { doc: { field2: 'value2' } } ] }) console.log(response)
POST _bulk { "index" : { "_index" : "test", "_id" : "1" } } { "field1" : "value1" } { "delete" : { "_index" : "test", "_id" : "2" } } { "create" : { "_index" : "test", "_id" : "3" } } { "field1" : "value3" } { "update" : {"_id" : "1", "_index" : "test"} } { "doc" : {"field2" : "value2"} }
先决条件编辑
-
如果启用了 Elasticsearch 安全功能,则您必须对目标数据流、索引或索引别名拥有以下 索引权限
- 要使用
create
操作,您必须拥有create_doc
、create
、index
或write
索引权限。数据流仅支持create
操作。 - 要使用
index
操作,您必须拥有create
、index
或write
索引权限。 - 要使用
delete
操作,您必须拥有delete
或write
索引权限。 - 要使用
update
操作,您必须拥有index
或write
索引权限。 - 要使用批量 API 请求自动创建数据流或索引,您必须拥有
auto_configure
、create_index
或manage
索引权限。 - 要使用
refresh
参数使批量操作的结果对搜索可见,您必须拥有maintenance
或manage
索引权限。
- 要使用
- 自动数据流创建需要匹配的索引模板,并启用数据流。请参阅 设置数据流。
描述编辑
提供了一种在单个请求中执行多个 index
、create
、delete
和 update
操作的方法。
使用换行符分隔的 JSON (NDJSON) 结构在请求正文中指定操作
action_and_meta_data\n optional_source\n action_and_meta_data\n optional_source\n .... action_and_meta_data\n optional_source\n
index
和 create
操作期望在下一行提供源,并且与标准索引 API 中的 op_type
参数具有相同的语义:如果目标中已存在具有相同 ID 的文档,则 create
失败,index
根据需要添加或替换文档。
数据流 仅支持 create
操作。要更新或删除数据流中的文档,您必须定位包含该文档的支持索引。请参阅 更新或删除支持索引中的文档。
update
期望在下一行指定部分文档、upsert 以及脚本及其选项。
delete
不期望在下一行提供源,并且与标准删除 API 具有相同的语义。
数据的最后一行必须以换行符 \n
结尾。每个换行符前面可以有一个回车符 \r
。将 NDJSON 数据发送到 _bulk
端点时,请使用 Content-Type
标头为 application/json
或 application/x-ndjson
。
因为此格式使用文字 \n
作为分隔符,所以请确保 JSON 操作和源没有进行漂亮的打印。
如果您在请求路径中提供 <target>
,则它将用于未明确指定 _index
参数的任何操作。
关于格式的说明:这里的想法是尽可能快地处理它。由于某些操作被重定向到其他节点上的其他分片,因此在接收节点侧只解析 action_meta_data
。
使用此协议的客户端库应尝试在客户端执行类似的操作,并尽可能减少缓冲。
在单个批量请求中执行的操作数量没有“正确”的数字。尝试使用不同的设置来找到适合您特定工作负载的最佳大小。请注意,Elasticsearch 默认将 HTTP 请求的最大大小限制为 100mb
,因此客户端必须确保没有请求超过此大小。无法索引超过大小限制的单个文档,因此您必须在将任何此类文档发送到 Elasticsearch 之前将其预处理为较小的部分。例如,在索引文档之前将文档拆分为页面或章节,或者将原始二进制数据存储在 Elasticsearch 之外的系统中,并在发送到 Elasticsearch 的文档中将原始数据替换为指向外部系统的链接。
客户端对批量请求的支持编辑
一些官方支持的客户端提供了帮助程序来协助处理批量请求和重新索引
- Go
- 请参阅 esutil.BulkIndexer
- Perl
- 请参阅 Search::Elasticsearch::Client::5_0::Bulk 和 Search::Elasticsearch::Client::5_0::Scroll
- Python
- 请参阅 elasticsearch.helpers.*
- JavaScript
- 请参阅 client.helpers.*
- .NET
- 请参阅
BulkAllObservable
- PHP
- 请参阅 批量索引
使用 cURL 提交批量请求编辑
如果您要向 curl
提供文本文件输入,则 必须 使用 --data-binary
标志而不是普通的 -d
。后者不保留换行符。示例
$ cat requests { "index" : { "_index" : "test", "_id" : "1" } } { "field1" : "value1" } $ curl -s -H "Content-Type: application/x-ndjson" -XPOST localhost:9200/_bulk --data-binary "@requests"; echo {"took":7, "errors": false, "items":[{"index":{"_index":"test","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}
乐观并发控制编辑
批量 API 调用中的每个 index
和 delete
操作都可以在其各自的操作和元数据行中包含 if_seq_no
和 if_primary_term
参数。if_seq_no
和 if_primary_term
参数根据对现有文档的最后一次修改来控制操作的执行方式。有关更多详细信息,请参阅 乐观并发控制。
版本控制编辑
每个批量项目都可以使用 version
字段包含版本值。它会根据 _version
映射自动遵循索引/删除操作的行为。它还支持 version_type
(请参阅 版本控制)。
路由编辑
每个批量项目都可以使用 routing
字段包含路由值。它会根据 _routing
映射自动遵循索引/删除操作的行为。
数据流不支持自定义路由,除非它们是在模板中启用了 allow_custom_routing
设置的情况下创建的。
等待活动分片编辑
进行批量调用时,您可以设置 wait_for_active_shards
参数,以要求在开始处理批量请求之前至少有指定数量的分片副本处于活动状态。有关更多详细信息和使用示例,请参阅 此处。
刷新编辑
控制此请求所做的更改何时对搜索可见。请参阅 刷新。
只有接收批量请求的分片才会受到 refresh
的影响。假设有一个 _bulk?refresh=wait_for
请求,其中包含三个文档,这些文档恰好被路由到具有五个分片的索引中的不同分片。该请求将只等待这三个分片刷新。构成索引的其他两个分片根本不参与 _bulk
请求。
安全编辑
请参阅 基于 URL 的访问控制。
路径参数编辑
-
<target>
- (可选,字符串)要对其执行批量操作的数据流、索引或索引别名的名称。
查询参数编辑
-
list_executed_pipelines
- (可选,布尔值)如果为
true
,则响应将包含为每个index
或create
执行的摄取管道。默认为false
。 -
pipeline
- (可选,字符串)用于预处理传入文档的管道的 ID。如果索引指定了默认摄取管道,则将值设置为
_none
将禁用此请求的默认摄取管道。如果配置了最终管道,则它将始终运行,而不管此参数的值如何。 -
refresh
- (可选,枚举)如果为
true
,则 Elasticsearch 会刷新受影响的分片,以使此操作对搜索可见;如果为wait_for
,则等待刷新以使此操作对搜索可见;如果为false
,则不对刷新执行任何操作。有效值:true
、false
、wait_for
。默认值:false
。 -
require_alias
- (可选,布尔值)如果为
true
,则请求的操作必须以索引别名为目标。默认为false
。 -
routing
- (可选,字符串)用于将操作路由到特定分片的自定义值。
-
_source
- (可选,字符串)返回
_source
字段或不返回的真或假,或要返回的字段列表。 -
_source_excludes
-
(可选,字符串)以逗号分隔的 源字段 列表,用于从响应中排除。
您还可以使用此参数从
_source_includes
查询参数指定的子集中排除字段。如果
_source
参数为false
,则忽略此参数。 -
_source_includes
-
(可选,字符串)以逗号分隔的 源字段 列表,用于包含在响应中。
如果指定了此参数,则仅返回这些源字段。您可以使用
_source_excludes
查询参数从此子集中排除字段。如果
_source
参数为false
,则忽略此参数。 -
timeout
-
(可选,时间单位)每个操作等待以下操作的时间段
默认为
1m
(一分钟)。这保证 Elasticsearch 在失败之前至少等待超时时间。实际等待时间可能会更长,尤其是在发生多次等待时。 -
wait_for_active_shards
-
(可选,字符串)在继续操作之前必须处于活动状态的分片副本数。设置为
all
或任何正整数,最大值为索引中的分片总数 (number_of_replicas+1
)。默认值:1,即主分片。请参阅 活动分片。
请求正文编辑
请求正文包含以换行符分隔的 create
、delete
、index
和 update
操作及其关联的源数据列表。
-
create
-
(可选,字符串)如果指定的文档不存在,则对其进行索引。下一行必须包含要索引的源数据。
-
_index
- (可选,字符串)要对其执行操作的数据流、索引或索引别名的名称。如果在请求路径中未指定
<target>
,则此参数是必需的。 -
_id
- (可选,字符串)文档 ID。如果未指定 ID,则会自动生成文档 ID。
-
list_executed_pipelines
- (可选,布尔值)如果为
true
,则响应将包含已执行的摄取管道。默认为false
。 -
require_alias
- (可选,布尔值)如果为
true
,则操作必须针对 索引别名。默认为false
。 -
dynamic_templates
- (可选,映射)从字段的全名到 动态模板 名称的映射。默认为空映射。如果名称与动态模板匹配,则无论模板中定义的其他匹配谓词如何,都将应用该模板。如果映射中已定义了字段,则不会使用此参数。
-
-
delete
-
(可选,字符串)从索引中删除指定的文档。
-
_index
- (可选,字符串)要对其执行操作的索引或索引别名的名称。如果在请求路径中未指定
<target>
,则此参数是必需的。 -
_id
- (必填,字符串)文档 ID。
-
require_alias
- (可选,布尔值)如果为
true
,则操作必须针对 索引别名。默认为false
。
-
-
index
-
(可选,字符串)对指定的文档进行索引。如果文档存在,则替换文档并增加版本号。下一行必须包含要索引的源数据。
-
_index
- (可选,字符串)要对其执行操作的索引或索引别名的名称。如果在请求路径中未指定
<target>
,则此参数是必需的。 -
_id
- (可选,字符串)文档 ID。如果未指定 ID,则会自动生成文档 ID。
-
list_executed_pipelines
- (可选,布尔值)如果为
true
,则响应将包含已执行的摄取管道。默认为false
。 -
require_alias
- (可选,布尔值)如果为
true
,则操作必须针对 索引别名。默认为false
。 -
dynamic_templates
- (可选,映射)从字段的全名到 动态模板 名称的映射。默认为空映射。如果名称与动态模板匹配,则无论模板中定义的其他匹配谓词如何,都将应用该模板。如果映射中已定义了字段,则不会使用此参数。
-
-
update
-
(可选,字符串)执行部分文档更新。下一行必须包含部分文档和更新选项。
-
_index
- (可选,字符串)要对其执行操作的索引或索引别名的名称。如果在请求路径中未指定
<target>
,则此参数是必需的。 -
_id
- (必填,字符串)文档 ID。
-
require_alias
- (可选,布尔值)如果为
true
,则操作必须针对 索引别名。默认为false
。
-
-
doc
- (可选,对象)要索引的部分文档。对于
update
操作是必需的。 -
<fields>
- (可选,对象)要索引的文档源。对于
create
和index
操作是必需的。
响应正文编辑
批量 API 的响应包含请求中每个操作的单独结果,按提交顺序返回。单个操作的成功或失败不会影响请求中的其他操作。
-
took
- (整数)处理批量请求所花费的时间(以毫秒为单位)。
-
errors
- (布尔值)如果为
true
,则批量请求中的一个或多个操作未成功完成。 -
items
-
(对象数组)包含批量请求中每个操作的结果,按提交顺序排列。
items
对象的属性- <action>
-
(对象)参数名称是与操作关联的操作。可能的值为
create
、delete
、index
和update
。参数值是一个对象,其中包含关联操作的信息。
<action>
的属性-
_index
- (字符串)与操作关联的索引的名称。如果操作针对的是数据流,则这是写入文档的支持索引。
-
_id
- (整数)与操作关联的文档 ID。
-
_version
-
(整数)与操作关联的文档版本。每次更新文档时,文档版本都会递增。
此参数仅针对成功的操作返回。
-
result
- (字符串)操作的结果。成功的值为
created
、deleted
和updated
。其他有效值为noop
和not_found
。 -
_shards
-
(对象)包含操作的分片信息。
此参数仅针对成功的操作返回。
_shards
的属性-
total
- (整数)操作尝试在其上执行的分片数。
-
successful
- (整数)操作在其上成功的分片数。
-
failed
- (整数)操作尝试在其上执行但失败的分片数。
-
-
_seq_no
-
(整数)为操作分配给文档的序列号。序列号用于确保文档的旧版本不会覆盖新版本。请参阅 乐观并发控制。
此参数仅针对成功的操作返回。
-
_primary_term
-
(整数)为操作分配给文档的主项。请参阅 乐观并发控制。
此参数仅针对成功的操作返回。
-
status
- (整数)为操作返回的 HTTP 状态代码。
-
error
-
(对象)包含有关失败操作的附加信息。
该参数仅针对失败的操作返回。
error
的属性-
type
- (字符串)操作的错误类型。
-
reason
- (字符串)操作失败的原因。
-
index_uuid
- (字符串)与失败操作关联的索引的通用唯一标识符 (UUID)。
-
shard
- (字符串)与失败操作关联的分片的 ID。
-
index
- (字符串)与失败操作关联的索引的名称。如果操作针对的是数据流,则这是尝试写入文档的支持索引。
-
-
示例编辑
$params = [ 'body' => [ [ 'index' => [ '_index' => 'test', '_id' => '1', ], ], [ 'field1' => 'value1', ], [ 'delete' => [ '_index' => 'test', '_id' => '2', ], ], [ 'create' => [ '_index' => 'test', '_id' => '3', ], ], [ 'field1' => 'value3', ], [ 'update' => [ '_id' => '1', '_index' => 'test', ], ], [ 'doc' => [ 'field2' => 'value2', ], ], ], ]; $response = $client->bulk($params);
resp = client.bulk( body=[ {"index": {"_index": "test", "_id": "1"}}, {"field1": "value1"}, {"delete": {"_index": "test", "_id": "2"}}, {"create": {"_index": "test", "_id": "3"}}, {"field1": "value3"}, {"update": {"_id": "1", "_index": "test"}}, {"doc": {"field2": "value2"}}, ], ) print(resp)
response = client.bulk( body: [ { index: { _index: 'test', _id: '1' } }, { "field1": 'value1' }, { delete: { _index: 'test', _id: '2' } }, { create: { _index: 'test', _id: '3' } }, { "field1": 'value3' }, { update: { _id: '1', _index: 'test' } }, { doc: { "field2": 'value2' } } ] ) puts response
res, err := es.Bulk( strings.NewReader(` { "index" : { "_index" : "test", "_id" : "1" } } { "field1" : "value1" } { "delete" : { "_index" : "test", "_id" : "2" } } { "create" : { "_index" : "test", "_id" : "3" } } { "field1" : "value3" } { "update" : {"_id" : "1", "_index" : "test"} } { "doc" : {"field2" : "value2"} } `), ) fmt.Println(res, err)
const response = await client.bulk({ body: [ { index: { _index: 'test', _id: '1' } }, { field1: 'value1' }, { delete: { _index: 'test', _id: '2' } }, { create: { _index: 'test', _id: '3' } }, { field1: 'value3' }, { update: { _id: '1', _index: 'test' } }, { doc: { field2: 'value2' } } ] }) console.log(response)
POST _bulk { "index" : { "_index" : "test", "_id" : "1" } } { "field1" : "value1" } { "delete" : { "_index" : "test", "_id" : "2" } } { "create" : { "_index" : "test", "_id" : "3" } } { "field1" : "value3" } { "update" : {"_id" : "1", "_index" : "test"} } { "doc" : {"field2" : "value2"} }
API 返回以下结果
{ "took": 30, "errors": false, "items": [ { "index": { "_index": "test", "_id": "1", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 201, "_seq_no" : 0, "_primary_term": 1 } }, { "delete": { "_index": "test", "_id": "2", "_version": 1, "result": "not_found", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 404, "_seq_no" : 1, "_primary_term" : 2 } }, { "create": { "_index": "test", "_id": "3", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 201, "_seq_no" : 2, "_primary_term" : 3 } }, { "update": { "_index": "test", "_id": "1", "_version": 2, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 200, "_seq_no" : 3, "_primary_term" : 4 } } ] }
批量更新示例编辑
使用 update
操作时,retry_on_conflict
可以用作操作本身中的字段(而不是在额外的有效负载行中),以指定在发生版本冲突时应重试更新的次数。
update
操作有效负载支持以下选项:doc
(部分文档)、upsert
、doc_as_upsert
、script
、params
(用于脚本)、lang
(用于脚本)和 _source
。有关选项的详细信息,请参阅更新文档。包含更新操作的示例
$params = [ 'body' => [ [ 'update' => [ '_id' => '1', '_index' => 'index1', 'retry_on_conflict' => 3, ], ], [ 'doc' => [ 'field' => 'value', ], ], [ 'update' => [ '_id' => '0', '_index' => 'index1', 'retry_on_conflict' => 3, ], ], [ 'script' => [ 'source' => 'ctx._source.counter += params.param1', 'lang' => 'painless', 'params' => [ 'param1' => 1, ], ], 'upsert' => [ 'counter' => 1, ], ], [ 'update' => [ '_id' => '2', '_index' => 'index1', 'retry_on_conflict' => 3, ], ], [ 'doc' => [ 'field' => 'value', ], 'doc_as_upsert' => true, ], [ 'update' => [ '_id' => '3', '_index' => 'index1', '_source' => true, ], ], [ 'doc' => [ 'field' => 'value', ], ], [ 'update' => [ '_id' => '4', '_index' => 'index1', ], ], [ 'doc' => [ 'field' => 'value', ], '_source' => true, ], ], ]; $response = $client->bulk($params);
resp = client.bulk( body=[ { "update": { "_id": "1", "_index": "index1", "retry_on_conflict": 3, } }, {"doc": {"field": "value"}}, { "update": { "_id": "0", "_index": "index1", "retry_on_conflict": 3, } }, { "script": { "source": "ctx._source.counter += params.param1", "lang": "painless", "params": {"param1": 1}, }, "upsert": {"counter": 1}, }, { "update": { "_id": "2", "_index": "index1", "retry_on_conflict": 3, } }, {"doc": {"field": "value"}, "doc_as_upsert": True}, {"update": {"_id": "3", "_index": "index1", "_source": True}}, {"doc": {"field": "value"}}, {"update": {"_id": "4", "_index": "index1"}}, {"doc": {"field": "value"}, "_source": True}, ], ) print(resp)
response = client.bulk( body: [ { update: { _id: '1', _index: 'index1', retry_on_conflict: 3 } }, { doc: { field: 'value' } }, { update: { _id: '0', _index: 'index1', retry_on_conflict: 3 } }, { script: { source: 'ctx._source.counter += params.param1', lang: 'painless', params: { "param1": 1 } }, upsert: { counter: 1 } }, { update: { _id: '2', _index: 'index1', retry_on_conflict: 3 } }, { doc: { field: 'value' }, doc_as_upsert: true }, { update: { _id: '3', _index: 'index1', _source: true } }, { doc: { field: 'value' } }, { update: { _id: '4', _index: 'index1' } }, { doc: { field: 'value' }, _source: true } ] ) puts response
res, err := es.Bulk( strings.NewReader(` { "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} } { "doc" : {"field" : "value"} } { "update" : { "_id" : "0", "_index" : "index1", "retry_on_conflict" : 3} } { "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}} { "update" : {"_id" : "2", "_index" : "index1", "retry_on_conflict" : 3} } { "doc" : {"field" : "value"}, "doc_as_upsert" : true } { "update" : {"_id" : "3", "_index" : "index1", "_source" : true} } { "doc" : {"field" : "value"} } { "update" : {"_id" : "4", "_index" : "index1"} } { "doc" : {"field" : "value"}, "_source": true} `), ) fmt.Println(res, err)
const response = await client.bulk({ body: [ { update: { _id: '1', _index: 'index1', retry_on_conflict: 3 } }, { doc: { field: 'value' } }, { update: { _id: '0', _index: 'index1', retry_on_conflict: 3 } }, { script: { source: 'ctx._source.counter += params.param1', lang: 'painless', params: { param1: 1 } }, upsert: { counter: 1 } }, { update: { _id: '2', _index: 'index1', retry_on_conflict: 3 } }, { doc: { field: 'value' }, doc_as_upsert: true }, { update: { _id: '3', _index: 'index1', _source: true } }, { doc: { field: 'value' } }, { update: { _id: '4', _index: 'index1' } }, { doc: { field: 'value' }, _source: true } ] }) console.log(response)
POST _bulk { "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} } { "doc" : {"field" : "value"} } { "update" : { "_id" : "0", "_index" : "index1", "retry_on_conflict" : 3} } { "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}} { "update" : {"_id" : "2", "_index" : "index1", "retry_on_conflict" : 3} } { "doc" : {"field" : "value"}, "doc_as_upsert" : true } { "update" : {"_id" : "3", "_index" : "index1", "_source" : true} } { "doc" : {"field" : "value"} } { "update" : {"_id" : "4", "_index" : "index1"} } { "doc" : {"field" : "value"}, "_source": true}
包含失败操作的示例编辑
以下批量 API 请求包含更新不存在的文档的操作。
$params = [ 'body' => [ [ 'update' => [ '_id' => '5', '_index' => 'index1', ], ], [ 'doc' => [ 'my_field' => 'foo', ], ], [ 'update' => [ '_id' => '6', '_index' => 'index1', ], ], [ 'doc' => [ 'my_field' => 'foo', ], ], [ 'create' => [ '_id' => '7', '_index' => 'index1', ], ], [ 'my_field' => 'foo', ], ], ]; $response = $client->bulk($params);
resp = client.bulk( body=[ {"update": {"_id": "5", "_index": "index1"}}, {"doc": {"my_field": "foo"}}, {"update": {"_id": "6", "_index": "index1"}}, {"doc": {"my_field": "foo"}}, {"create": {"_id": "7", "_index": "index1"}}, {"my_field": "foo"}, ], ) print(resp)
response = client.bulk( body: [ { update: { _id: '5', _index: 'index1' } }, { doc: { my_field: 'foo' } }, { update: { _id: '6', _index: 'index1' } }, { doc: { my_field: 'foo' } }, { create: { _id: '7', _index: 'index1' } }, { my_field: 'foo' } ] ) puts response
res, err := es.Bulk( strings.NewReader(` { "update": {"_id": "5", "_index": "index1"} } { "doc": {"my_field": "foo"} } { "update": {"_id": "6", "_index": "index1"} } { "doc": {"my_field": "foo"} } { "create": {"_id": "7", "_index": "index1"} } { "my_field": "foo" } `), ) fmt.Println(res, err)
const response = await client.bulk({ body: [ { update: { _id: '5', _index: 'index1' } }, { doc: { my_field: 'foo' } }, { update: { _id: '6', _index: 'index1' } }, { doc: { my_field: 'foo' } }, { create: { _id: '7', _index: 'index1' } }, { my_field: 'foo' } ] }) console.log(response)
POST /_bulk { "update": {"_id": "5", "_index": "index1"} } { "doc": {"my_field": "foo"} } { "update": {"_id": "6", "_index": "index1"} } { "doc": {"my_field": "foo"} } { "create": {"_id": "7", "_index": "index1"} } { "my_field": "foo" }
由于这些操作无法成功完成,因此 API 返回的响应的 errors
标志为 true
。
响应还包括任何失败操作的 error
对象。error
对象包含有关失败的附加信息,例如错误类型和原因。
{ "took": 486, "errors": true, "items": [ { "update": { "_index": "index1", "_id": "5", "status": 404, "error": { "type": "document_missing_exception", "reason": "[5]: document missing", "index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA", "shard": "0", "index": "index1" } } }, { "update": { "_index": "index1", "_id": "6", "status": 404, "error": { "type": "document_missing_exception", "reason": "[6]: document missing", "index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA", "shard": "0", "index": "index1" } } }, { "create": { "_index": "index1", "_id": "7", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "_seq_no": 0, "_primary_term": 1, "status": 201 } } ] }
要仅返回有关失败操作的信息,请使用参数为 items.*.error
的 filter_path
查询参数。
$params = [ 'body' => [ [ 'update' => [ '_id' => '5', '_index' => 'index1', ], ], [ 'doc' => [ 'my_field' => 'baz', ], ], [ 'update' => [ '_id' => '6', '_index' => 'index1', ], ], [ 'doc' => [ 'my_field' => 'baz', ], ], [ 'update' => [ '_id' => '7', '_index' => 'index1', ], ], [ 'doc' => [ 'my_field' => 'baz', ], ], ], ]; $response = $client->bulk($params);
resp = client.bulk( filter_path="items.*.error", body=[ {"update": {"_id": "5", "_index": "index1"}}, {"doc": {"my_field": "baz"}}, {"update": {"_id": "6", "_index": "index1"}}, {"doc": {"my_field": "baz"}}, {"update": {"_id": "7", "_index": "index1"}}, {"doc": {"my_field": "baz"}}, ], ) print(resp)
response = client.bulk( filter_path: 'items.*.error', body: [ { update: { _id: '5', _index: 'index1' } }, { doc: { my_field: 'baz' } }, { update: { _id: '6', _index: 'index1' } }, { doc: { my_field: 'baz' } }, { update: { _id: '7', _index: 'index1' } }, { doc: { my_field: 'baz' } } ] ) puts response
res, err := es.Bulk( strings.NewReader(` { "update": {"_id": "5", "_index": "index1"} } { "doc": {"my_field": "baz"} } { "update": {"_id": "6", "_index": "index1"} } { "doc": {"my_field": "baz"} } { "update": {"_id": "7", "_index": "index1"} } { "doc": {"my_field": "baz"} } `), es.Bulk.WithFilterPath("items.*.error"), ) fmt.Println(res, err)
const response = await client.bulk({ filter_path: 'items.*.error', body: [ { update: { _id: '5', _index: 'index1' } }, { doc: { my_field: 'baz' } }, { update: { _id: '6', _index: 'index1' } }, { doc: { my_field: 'baz' } }, { update: { _id: '7', _index: 'index1' } }, { doc: { my_field: 'baz' } } ] }) console.log(response)
POST /_bulk?filter_path=items.*.error { "update": {"_id": "5", "_index": "index1"} } { "doc": {"my_field": "baz"} } { "update": {"_id": "6", "_index": "index1"} } { "doc": {"my_field": "baz"} } { "update": {"_id": "7", "_index": "index1"} } { "doc": {"my_field": "baz"} }
API 返回以下结果。
{ "items": [ { "update": { "error": { "type": "document_missing_exception", "reason": "[5]: document missing", "index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA", "shard": "0", "index": "index1" } } }, { "update": { "error": { "type": "document_missing_exception", "reason": "[6]: document missing", "index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA", "shard": "0", "index": "index1" } } } ] }
包含动态模板参数的示例编辑
以下示例创建一个动态模板,然后执行一个包含 dynamic_templates
参数的索引/创建请求的批量请求。
resp = client.indices.create( index="my-index", body={ "mappings": { "dynamic_templates": [ {"geo_point": {"mapping": {"type": "geo_point"}}} ] } }, ) print(resp) resp = client.bulk( body=[ { "index": { "_index": "my_index", "_id": "1", "dynamic_templates": {"work_location": "geo_point"}, } }, { "field": "value1", "work_location": "41.12,-71.34", "raw_location": "41.12,-71.34", }, { "create": { "_index": "my_index", "_id": "2", "dynamic_templates": {"home_location": "geo_point"}, } }, {"field": "value2", "home_location": "41.12,-71.34"}, ], ) print(resp)
response = client.indices.create( index: 'my-index', body: { mappings: { dynamic_templates: [ { geo_point: { mapping: { type: 'geo_point' } } } ] } } ) puts response response = client.bulk( body: [ { index: { _index: 'my_index', _id: '1', dynamic_templates: { work_location: 'geo_point' } } }, { field: 'value1', work_location: '41.12,-71.34', raw_location: '41.12,-71.34' }, { create: { _index: 'my_index', _id: '2', dynamic_templates: { home_location: 'geo_point' } } }, { field: 'value2', home_location: '41.12,-71.34' } ] ) puts response
PUT my-index/ { "mappings": { "dynamic_templates": [ { "geo_point": { "mapping": { "type" : "geo_point" } } } ] } } POST /_bulk { "index" : { "_index" : "my_index", "_id" : "1", "dynamic_templates": {"work_location": "geo_point"}} } { "field" : "value1", "work_location": "41.12,-71.34", "raw_location": "41.12,-71.34"} { "create" : { "_index" : "my_index", "_id" : "2", "dynamic_templates": {"home_location": "geo_point"}} } { "field" : "value2", "home_location": "41.12,-71.34"}
批量请求根据 dynamic_templates
参数创建两个类型为 geo_point
的新字段 work_location
和 home_location
;但是,raw_location
字段是使用默认动态映射规则创建的,在这种情况下,由于它在 JSON 文档中作为字符串提供,因此它是一个 text
字段。