使用数据流
编辑使用数据流
编辑在您设置数据流之后,您可以执行以下操作
向数据流添加文档
编辑resp = client.index( index="my-data-stream", document={ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }, ) print(resp)
response = client.index( index: 'my-data-stream', body: { "@timestamp": '2099-03-08T11:06:07.000Z', user: { id: '8a4f500d' }, message: 'Login successful' } ) puts response
const response = await client.index({ index: "my-data-stream", document: { "@timestamp": "2099-03-08T11:06:07.000Z", user: { id: "8a4f500d", }, message: "Login successful", }, }); console.log(response);
POST /my-data-stream/_doc/ { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
您不能使用索引 API 的PUT /<target>/_doc/<_id>
请求格式向数据流添加新文档。要指定文档 ID,请改用PUT /<target>/_create/<_id>
格式。仅支持op_type
为create
。
要使用单个请求添加多个文档,请使用批量 API。仅支持create
操作。
resp = client.bulk( index="my-data-stream", refresh=True, operations=[ { "create": {} }, { "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }, { "create": {} }, { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }, { "create": {} }, { "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" } ], ) print(resp)
response = client.bulk( index: 'my-data-stream', refresh: true, body: [ { create: {} }, { "@timestamp": '2099-03-08T11:04:05.000Z', user: { id: 'vlb44hny' }, message: 'Login attempt failed' }, { create: {} }, { "@timestamp": '2099-03-08T11:06:07.000Z', user: { id: '8a4f500d' }, message: 'Login successful' }, { create: {} }, { "@timestamp": '2099-03-09T11:07:08.000Z', user: { id: 'l7gk7f82' }, message: 'Logout successful' } ] ) puts response
const response = await client.bulk({ index: "my-data-stream", refresh: "true", operations: [ { create: {}, }, { "@timestamp": "2099-03-08T11:04:05.000Z", user: { id: "vlb44hny", }, message: "Login attempt failed", }, { create: {}, }, { "@timestamp": "2099-03-08T11:06:07.000Z", user: { id: "8a4f500d", }, message: "Login successful", }, { create: {}, }, { "@timestamp": "2099-03-09T11:07:08.000Z", user: { id: "l7gk7f82", }, message: "Logout successful", }, ], }); console.log(response);
PUT /my-data-stream/_bulk?refresh {"create":{ }} { "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" } {"create":{ }} { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" } {"create":{ }} { "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
搜索数据流
编辑以下搜索 API 支持数据流
获取数据流的统计信息
编辑使用数据流统计信息 API获取一个或多个数据流的统计信息
resp = client.indices.data_streams_stats( name="my-data-stream", human=True, ) print(resp)
response = client.indices.data_streams_stats( name: 'my-data-stream', human: true ) puts response
const response = await client.indices.dataStreamsStats({ name: "my-data-stream", human: "true", }); console.log(response);
GET /_data_stream/my-data-stream/_stats?human=true
手动滚动数据流
编辑-
立即触发滚动
resp = client.indices.rollover( alias="my-data-stream", ) print(resp)
response = client.indices.rollover( alias: 'my-data-stream' ) puts response
const response = await client.indices.rollover({ alias: "my-data-stream", }); console.log(response);
POST /my-data-stream/_rollover/
-
或将滚动推迟到下一个索引事件发生
resp = client.indices.rollover( alias="my-data-stream", lazy=True, ) print(resp)
response = client.indices.rollover( alias: 'my-data-stream', lazy: true ) puts response
const response = await client.indices.rollover({ alias: "my-data-stream", lazy: "true", }); console.log(response);
POST /my-data-stream/_rollover?lazy
对于不经常更新的数据流,使用第二个选项可以避免出现空的支持索引。
打开已关闭的支持索引
编辑您无法搜索已关闭的支持索引,即使通过搜索其数据流也不行。您也不能更新或删除已关闭索引中的文档。
要重新打开已关闭的支持索引,请直接向索引提交打开索引 API 请求
resp = client.indices.open( index=".ds-my-data-stream-2099.03.07-000001", ) print(resp)
response = client.indices.open( index: '.ds-my-data-stream-2099.03.07-000001' ) puts response
const response = await client.indices.open({ index: ".ds-my-data-stream-2099.03.07-000001", }); console.log(response);
POST /.ds-my-data-stream-2099.03.07-000001/_open/
要重新打开数据流的所有已关闭的支持索引,请向数据流提交打开索引 API 请求
resp = client.indices.open( index="my-data-stream", ) print(resp)
response = client.indices.open( index: 'my-data-stream' ) puts response
const response = await client.indices.open({ index: "my-data-stream", }); console.log(response);
POST /my-data-stream/_open/
使用数据流重新索引
编辑使用重新索引 API将文档从现有索引、别名或数据流复制到数据流。由于数据流是仅追加的,因此重新索引到数据流必须使用op_type
为create
。重新索引不能更新数据流中的现有文档。
resp = client.reindex( source={ "index": "archive" }, dest={ "index": "my-data-stream", "op_type": "create" }, ) print(resp)
response = client.reindex( body: { source: { index: 'archive' }, dest: { index: 'my-data-stream', op_type: 'create' } } ) puts response
const response = await client.reindex({ source: { index: "archive", }, dest: { index: "my-data-stream", op_type: "create", }, }); console.log(response);
POST /_reindex { "source": { "index": "archive" }, "dest": { "index": "my-data-stream", "op_type": "create" } }
通过查询更新数据流中的文档
编辑使用按查询更新 API更新与提供的查询匹配的数据流中的文档
resp = client.update_by_query( index="my-data-stream", query={ "match": { "user.id": "l7gk7f82" } }, script={ "source": "ctx._source.user.id = params.new_id", "params": { "new_id": "XgdX0NoX" } }, ) print(resp)
response = client.update_by_query( index: 'my-data-stream', body: { query: { match: { 'user.id' => 'l7gk7f82' } }, script: { source: 'ctx._source.user.id = params.new_id', params: { new_id: 'XgdX0NoX' } } } ) puts response
const response = await client.updateByQuery({ index: "my-data-stream", query: { match: { "user.id": "l7gk7f82", }, }, script: { source: "ctx._source.user.id = params.new_id", params: { new_id: "XgdX0NoX", }, }, }); console.log(response);
POST /my-data-stream/_update_by_query { "query": { "match": { "user.id": "l7gk7f82" } }, "script": { "source": "ctx._source.user.id = params.new_id", "params": { "new_id": "XgdX0NoX" } } }
通过查询删除数据流中的文档
编辑使用按查询删除 API删除与提供的查询匹配的数据流中的文档
resp = client.delete_by_query( index="my-data-stream", query={ "match": { "user.id": "vlb44hny" } }, ) print(resp)
response = client.delete_by_query( index: 'my-data-stream', body: { query: { match: { 'user.id' => 'vlb44hny' } } } ) puts response
const response = await client.deleteByQuery({ index: "my-data-stream", query: { match: { "user.id": "vlb44hny", }, }, }); console.log(response);
POST /my-data-stream/_delete_by_query { "query": { "match": { "user.id": "vlb44hny" } } }
更新或删除支持索引中的文档
编辑如果需要,您可以通过向包含文档的支持索引发送请求来更新或删除数据流中的文档。您需要
要获取此信息,请使用搜索请求
resp = client.search( index="my-data-stream", seq_no_primary_term=True, query={ "match": { "user.id": "yWIumJd7" } }, ) print(resp)
response = client.search( index: 'my-data-stream', body: { seq_no_primary_term: true, query: { match: { 'user.id' => 'yWIumJd7' } } } ) puts response
const response = await client.search({ index: "my-data-stream", seq_no_primary_term: true, query: { match: { "user.id": "yWIumJd7", }, }, }); console.log(response);
GET /my-data-stream/_search { "seq_no_primary_term": true, "query": { "match": { "user.id": "yWIumJd7" } } }
响应
{ "took": 20, "timed_out": false, "_shards": { "total": 3, "successful": 3, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 0.2876821, "hits": [ { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "_seq_no": 0, "_primary_term": 1, "_score": 0.2876821, "_source": { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "yWIumJd7" }, "message": "Login successful" } } ] } }
要更新文档,请使用带有有效if_seq_no
和if_primary_term
参数的索引 API请求
resp = client.index( index=".ds-my-data-stream-2099-03-08-000003", id="bfspvnIBr7VVZlfp2lqX", if_seq_no="0", if_primary_term="1", document={ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }, ) print(resp)
const response = await client.index({ index: ".ds-my-data-stream-2099-03-08-000003", id: "bfspvnIBr7VVZlfp2lqX", if_seq_no: 0, if_primary_term: 1, document: { "@timestamp": "2099-03-08T11:06:07.000Z", user: { id: "8a4f500d", }, message: "Login successful", }, }); console.log(response);
PUT /.ds-my-data-stream-2099-03-08-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1 { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
要删除文档,请使用删除 API
resp = client.delete( index=".ds-my-data-stream-2099.03.08-000003", id="bfspvnIBr7VVZlfp2lqX", ) print(resp)
response = client.delete( index: '.ds-my-data-stream-2099.03.08-000003', id: 'bfspvnIBr7VVZlfp2lqX' ) puts response
const response = await client.delete({ index: ".ds-my-data-stream-2099.03.08-000003", id: "bfspvnIBr7VVZlfp2lqX", }); console.log(response);
DELETE /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX
要使用单个请求删除或更新多个文档,请使用批量 API的delete
、index
和update
操作。对于index
操作,请包含有效的if_seq_no
和if_primary_term
参数。
resp = client.bulk( refresh=True, operations=[ { "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }, { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" } ], ) print(resp)
response = client.bulk( refresh: true, body: [ { index: { _index: '.ds-my-data-stream-2099.03.08-000003', _id: 'bfspvnIBr7VVZlfp2lqX', if_seq_no: 0, if_primary_term: 1 } }, { "@timestamp": '2099-03-08T11:06:07.000Z', user: { id: '8a4f500d' }, message: 'Login successful' } ] ) puts response
const response = await client.bulk({ refresh: "true", operations: [ { index: { _index: ".ds-my-data-stream-2099.03.08-000003", _id: "bfspvnIBr7VVZlfp2lqX", if_seq_no: 0, if_primary_term: 1, }, }, { "@timestamp": "2099-03-08T11:06:07.000Z", user: { id: "8a4f500d", }, message: "Login successful", }, ], }); console.log(response);
PUT /_bulk?refresh { "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } } { "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }