使用数据流
编辑使用数据流
编辑在您设置数据流之后,您可以执行以下操作
向数据流添加文档
编辑resp = client.index( index="my-data-stream", document={ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }, ) print(resp)
response = client.index( index: 'my-data-stream', body: { "@timestamp": '2099-03-08T11:06:07.000Z', user: { id: '8a4f500d' }, message: 'Login successful' } ) puts response
const response = await client.index({ index: "my-data-stream", document: { "@timestamp": "2099-03-08T11:06:07.000Z", user: { id: "8a4f500d", }, message: "Login successful", }, }); console.log(response);
POST /my-data-stream/_doc/ { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
您不能使用索引 API 的 PUT /<target>/_doc/<_id>
请求格式向数据流添加新文档。要指定文档 ID,请改用 PUT /<target>/_create/<_id>
格式。仅支持 op_type
为 create
。
要通过单个请求添加多个文档,请使用批量 API。 仅支持 create
操作。
resp = client.bulk( index="my-data-stream", refresh=True, operations=[ { "create": {} }, { "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }, { "create": {} }, { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }, { "create": {} }, { "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" } ], ) print(resp)
response = client.bulk( index: 'my-data-stream', refresh: true, body: [ { create: {} }, { "@timestamp": '2099-03-08T11:04:05.000Z', user: { id: 'vlb44hny' }, message: 'Login attempt failed' }, { create: {} }, { "@timestamp": '2099-03-08T11:06:07.000Z', user: { id: '8a4f500d' }, message: 'Login successful' }, { create: {} }, { "@timestamp": '2099-03-09T11:07:08.000Z', user: { id: 'l7gk7f82' }, message: 'Logout successful' } ] ) puts response
const response = await client.bulk({ index: "my-data-stream", refresh: "true", operations: [ { create: {}, }, { "@timestamp": "2099-03-08T11:04:05.000Z", user: { id: "vlb44hny", }, message: "Login attempt failed", }, { create: {}, }, { "@timestamp": "2099-03-08T11:06:07.000Z", user: { id: "8a4f500d", }, message: "Login successful", }, { create: {}, }, { "@timestamp": "2099-03-09T11:07:08.000Z", user: { id: "l7gk7f82", }, message: "Logout successful", }, ], }); console.log(response);
PUT /my-data-stream/_bulk?refresh {"create":{ }} { "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" } {"create":{ }} { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" } {"create":{ }} { "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
搜索数据流
编辑以下搜索 API 支持数据流
获取数据流的统计信息
编辑使用数据流统计 API来获取一个或多个数据流的统计信息
resp = client.indices.data_streams_stats( name="my-data-stream", human=True, ) print(resp)
response = client.indices.data_streams_stats( name: 'my-data-stream', human: true ) puts response
const response = await client.indices.dataStreamsStats({ name: "my-data-stream", human: "true", }); console.log(response);
GET /_data_stream/my-data-stream/_stats?human=true
手动滚动更新数据流
编辑使用滚动更新 API手动滚动更新数据流。 手动滚动更新时,您有两个选择
-
立即触发滚动更新
resp = client.indices.rollover( alias="my-data-stream", ) print(resp)
response = client.indices.rollover( alias: 'my-data-stream' ) puts response
const response = await client.indices.rollover({ alias: "my-data-stream", }); console.log(response);
POST /my-data-stream/_rollover/
-
或者推迟滚动更新,直到下一个索引事件发生
resp = client.indices.rollover( alias="my-data-stream", lazy=True, ) print(resp)
response = client.indices.rollover( alias: 'my-data-stream', lazy: true ) puts response
const response = await client.indices.rollover({ alias: "my-data-stream", lazy: "true", }); console.log(response);
POST /my-data-stream/_rollover?lazy
使用第二个选项可以避免在不经常更新的数据流中出现空的后备索引。
打开已关闭的后备索引
编辑您无法搜索已关闭的后备索引,即使通过搜索其数据流也不行。您也无法更新或删除已关闭索引中的文档。
要重新打开已关闭的后备索引,请直接向该索引提交一个打开索引 API 请求
resp = client.indices.open( index=".ds-my-data-stream-2099.03.07-000001", ) print(resp)
response = client.indices.open( index: '.ds-my-data-stream-2099.03.07-000001' ) puts response
const response = await client.indices.open({ index: ".ds-my-data-stream-2099.03.07-000001", }); console.log(response);
POST /.ds-my-data-stream-2099.03.07-000001/_open/
要重新打开数据流的所有已关闭后备索引,请向该流提交一个打开索引 API 请求
resp = client.indices.open( index="my-data-stream", ) print(resp)
response = client.indices.open( index: 'my-data-stream' ) puts response
const response = await client.indices.open({ index: "my-data-stream", }); console.log(response);
POST /my-data-stream/_open/
使用数据流重新索引
编辑使用重新索引 API将文档从现有索引、别名或数据流复制到数据流。由于数据流是仅追加的,因此重新索引到数据流必须使用 op_type
为 create
。重新索引无法更新数据流中的现有文档。
resp = client.reindex( source={ "index": "archive" }, dest={ "index": "my-data-stream", "op_type": "create" }, ) print(resp)
response = client.reindex( body: { source: { index: 'archive' }, dest: { index: 'my-data-stream', op_type: 'create' } } ) puts response
const response = await client.reindex({ source: { index: "archive", }, dest: { index: "my-data-stream", op_type: "create", }, }); console.log(response);
POST /_reindex { "source": { "index": "archive" }, "dest": { "index": "my-data-stream", "op_type": "create" } }
通过查询更新数据流中的文档
编辑使用按查询更新 API来更新数据流中与提供的查询匹配的文档
resp = client.update_by_query( index="my-data-stream", query={ "match": { "user.id": "l7gk7f82" } }, script={ "source": "ctx._source.user.id = params.new_id", "params": { "new_id": "XgdX0NoX" } }, ) print(resp)
response = client.update_by_query( index: 'my-data-stream', body: { query: { match: { 'user.id' => 'l7gk7f82' } }, script: { source: 'ctx._source.user.id = params.new_id', params: { new_id: 'XgdX0NoX' } } } ) puts response
const response = await client.updateByQuery({ index: "my-data-stream", query: { match: { "user.id": "l7gk7f82", }, }, script: { source: "ctx._source.user.id = params.new_id", params: { new_id: "XgdX0NoX", }, }, }); console.log(response);
POST /my-data-stream/_update_by_query { "query": { "match": { "user.id": "l7gk7f82" } }, "script": { "source": "ctx._source.user.id = params.new_id", "params": { "new_id": "XgdX0NoX" } } }
通过查询删除数据流中的文档
编辑使用按查询删除 API来删除数据流中与提供的查询匹配的文档
resp = client.delete_by_query( index="my-data-stream", query={ "match": { "user.id": "vlb44hny" } }, ) print(resp)
response = client.delete_by_query( index: 'my-data-stream', body: { query: { match: { 'user.id' => 'vlb44hny' } } } ) puts response
const response = await client.deleteByQuery({ index: "my-data-stream", query: { match: { "user.id": "vlb44hny", }, }, }); console.log(response);
POST /my-data-stream/_delete_by_query { "query": { "match": { "user.id": "vlb44hny" } } }
更新或删除后备索引中的文档
编辑如果需要,您可以通过将请求发送到包含文档的后备索引来更新或删除数据流中的文档。您需要
要获取此信息,请使用搜索请求
resp = client.search( index="my-data-stream", seq_no_primary_term=True, query={ "match": { "user.id": "yWIumJd7" } }, ) print(resp)
response = client.search( index: 'my-data-stream', body: { seq_no_primary_term: true, query: { match: { 'user.id' => 'yWIumJd7' } } } ) puts response
const response = await client.search({ index: "my-data-stream", seq_no_primary_term: true, query: { match: { "user.id": "yWIumJd7", }, }, }); console.log(response);
GET /my-data-stream/_search { "seq_no_primary_term": true, "query": { "match": { "user.id": "yWIumJd7" } } }
响应
{ "took": 20, "timed_out": false, "_shards": { "total": 3, "successful": 3, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 0.2876821, "hits": [ { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "_seq_no": 0, "_primary_term": 1, "_score": 0.2876821, "_source": { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "yWIumJd7" }, "message": "Login successful" } } ] } }
要更新文档,请使用具有有效 if_seq_no
和 if_primary_term
参数的索引 API 请求
resp = client.index( index=".ds-my-data-stream-2099-03-08-000003", id="bfspvnIBr7VVZlfp2lqX", if_seq_no="0", if_primary_term="1", document={ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }, ) print(resp)
const response = await client.index({ index: ".ds-my-data-stream-2099-03-08-000003", id: "bfspvnIBr7VVZlfp2lqX", if_seq_no: 0, if_primary_term: 1, document: { "@timestamp": "2099-03-08T11:06:07.000Z", user: { id: "8a4f500d", }, message: "Login successful", }, }); console.log(response);
PUT /.ds-my-data-stream-2099-03-08-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1 { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
要删除文档,请使用删除 API
resp = client.delete( index=".ds-my-data-stream-2099.03.08-000003", id="bfspvnIBr7VVZlfp2lqX", ) print(resp)
response = client.delete( index: '.ds-my-data-stream-2099.03.08-000003', id: 'bfspvnIBr7VVZlfp2lqX' ) puts response
const response = await client.delete({ index: ".ds-my-data-stream-2099.03.08-000003", id: "bfspvnIBr7VVZlfp2lqX", }); console.log(response);
DELETE /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX
要通过单个请求删除或更新多个文档,请使用批量 API 的 delete
、index
和 update
操作。对于 index
操作,请包含有效的if_seq_no
和 if_primary_term
参数。
resp = client.bulk( refresh=True, operations=[ { "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }, { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" } ], ) print(resp)
response = client.bulk( refresh: true, body: [ { index: { _index: '.ds-my-data-stream-2099.03.08-000003', _id: 'bfspvnIBr7VVZlfp2lqX', if_seq_no: 0, if_primary_term: 1 } }, { "@timestamp": '2099-03-08T11:06:07.000Z', user: { id: '8a4f500d' }, message: 'Login successful' } ] ) puts response
const response = await client.bulk({ refresh: "true", operations: [ { index: { _index: ".ds-my-data-stream-2099.03.08-000003", _id: "bfspvnIBr7VVZlfp2lqX", if_seq_no: 0, if_primary_term: 1, }, }, { "@timestamp": "2099-03-08T11:06:07.000Z", user: { id: "8a4f500d", }, message: "Login successful", }, ], }); console.log(response);
PUT /_bulk?refresh { "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } } { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }