使用数据流

编辑

在您设置数据流之后,您可以执行以下操作

向数据流添加文档

编辑

要添加单个文档,请使用索引 API。 支持摄取管道

resp = client.index(
    index="my-data-stream",
    document={
        "@timestamp": "2099-03-08T11:06:07.000Z",
        "user": {
            "id": "8a4f500d"
        },
        "message": "Login successful"
    },
)
print(resp)
response = client.index(
  index: 'my-data-stream',
  body: {
    "@timestamp": '2099-03-08T11:06:07.000Z',
    user: {
      id: '8a4f500d'
    },
    message: 'Login successful'
  }
)
puts response
const response = await client.index({
  index: "my-data-stream",
  document: {
    "@timestamp": "2099-03-08T11:06:07.000Z",
    user: {
      id: "8a4f500d",
    },
    message: "Login successful",
  },
});
console.log(response);
POST /my-data-stream/_doc/
{
  "@timestamp": "2099-03-08T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}

您不能使用索引 API 的 PUT /<target>/_doc/<_id> 请求格式向数据流添加新文档。要指定文档 ID,请改用 PUT /<target>/_create/<_id> 格式。仅支持 op_typecreate

要通过单个请求添加多个文档,请使用批量 API。 仅支持 create 操作。

resp = client.bulk(
    index="my-data-stream",
    refresh=True,
    operations=[
        {
            "create": {}
        },
        {
            "@timestamp": "2099-03-08T11:04:05.000Z",
            "user": {
                "id": "vlb44hny"
            },
            "message": "Login attempt failed"
        },
        {
            "create": {}
        },
        {
            "@timestamp": "2099-03-08T11:06:07.000Z",
            "user": {
                "id": "8a4f500d"
            },
            "message": "Login successful"
        },
        {
            "create": {}
        },
        {
            "@timestamp": "2099-03-09T11:07:08.000Z",
            "user": {
                "id": "l7gk7f82"
            },
            "message": "Logout successful"
        }
    ],
)
print(resp)
response = client.bulk(
  index: 'my-data-stream',
  refresh: true,
  body: [
    {
      create: {}
    },
    {
      "@timestamp": '2099-03-08T11:04:05.000Z',
      user: {
        id: 'vlb44hny'
      },
      message: 'Login attempt failed'
    },
    {
      create: {}
    },
    {
      "@timestamp": '2099-03-08T11:06:07.000Z',
      user: {
        id: '8a4f500d'
      },
      message: 'Login successful'
    },
    {
      create: {}
    },
    {
      "@timestamp": '2099-03-09T11:07:08.000Z',
      user: {
        id: 'l7gk7f82'
      },
      message: 'Logout successful'
    }
  ]
)
puts response
const response = await client.bulk({
  index: "my-data-stream",
  refresh: "true",
  operations: [
    {
      create: {},
    },
    {
      "@timestamp": "2099-03-08T11:04:05.000Z",
      user: {
        id: "vlb44hny",
      },
      message: "Login attempt failed",
    },
    {
      create: {},
    },
    {
      "@timestamp": "2099-03-08T11:06:07.000Z",
      user: {
        id: "8a4f500d",
      },
      message: "Login successful",
    },
    {
      create: {},
    },
    {
      "@timestamp": "2099-03-09T11:07:08.000Z",
      user: {
        id: "l7gk7f82",
      },
      message: "Logout successful",
    },
  ],
});
console.log(response);
PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }

搜索数据流

编辑

以下搜索 API 支持数据流

获取数据流的统计信息

编辑

使用数据流统计 API来获取一个或多个数据流的统计信息

resp = client.indices.data_streams_stats(
    name="my-data-stream",
    human=True,
)
print(resp)
response = client.indices.data_streams_stats(
  name: 'my-data-stream',
  human: true
)
puts response
const response = await client.indices.dataStreamsStats({
  name: "my-data-stream",
  human: "true",
});
console.log(response);
GET /_data_stream/my-data-stream/_stats?human=true

手动滚动更新数据流

编辑

使用滚动更新 API手动滚动更新数据流。 手动滚动更新时,您有两个选择

  1. 立即触发滚动更新

    resp = client.indices.rollover(
        alias="my-data-stream",
    )
    print(resp)
    response = client.indices.rollover(
      alias: 'my-data-stream'
    )
    puts response
    const response = await client.indices.rollover({
      alias: "my-data-stream",
    });
    console.log(response);
    POST /my-data-stream/_rollover/
  2. 或者推迟滚动更新,直到下一个索引事件发生

    resp = client.indices.rollover(
        alias="my-data-stream",
        lazy=True,
    )
    print(resp)
    response = client.indices.rollover(
      alias: 'my-data-stream',
      lazy: true
    )
    puts response
    const response = await client.indices.rollover({
      alias: "my-data-stream",
      lazy: "true",
    });
    console.log(response);
    POST /my-data-stream/_rollover?lazy

    使用第二个选项可以避免在不经常更新的数据流中出现空的后备索引。

打开已关闭的后备索引

编辑

您无法搜索已关闭的后备索引,即使通过搜索其数据流也不行。您也无法更新删除已关闭索引中的文档。

要重新打开已关闭的后备索引,请直接向该索引提交一个打开索引 API 请求

resp = client.indices.open(
    index=".ds-my-data-stream-2099.03.07-000001",
)
print(resp)
response = client.indices.open(
  index: '.ds-my-data-stream-2099.03.07-000001'
)
puts response
const response = await client.indices.open({
  index: ".ds-my-data-stream-2099.03.07-000001",
});
console.log(response);
POST /.ds-my-data-stream-2099.03.07-000001/_open/

要重新打开数据流的所有已关闭后备索引,请向该流提交一个打开索引 API 请求

resp = client.indices.open(
    index="my-data-stream",
)
print(resp)
response = client.indices.open(
  index: 'my-data-stream'
)
puts response
const response = await client.indices.open({
  index: "my-data-stream",
});
console.log(response);
POST /my-data-stream/_open/

使用数据流重新索引

编辑

使用重新索引 API将文档从现有索引、别名或数据流复制到数据流。由于数据流是仅追加的,因此重新索引到数据流必须使用 op_typecreate。重新索引无法更新数据流中的现有文档。

resp = client.reindex(
    source={
        "index": "archive"
    },
    dest={
        "index": "my-data-stream",
        "op_type": "create"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      index: 'archive'
    },
    dest: {
      index: 'my-data-stream',
      op_type: 'create'
    }
  }
)
puts response
const response = await client.reindex({
  source: {
    index: "archive",
  },
  dest: {
    index: "my-data-stream",
    op_type: "create",
  },
});
console.log(response);
POST /_reindex
{
  "source": {
    "index": "archive"
  },
  "dest": {
    "index": "my-data-stream",
    "op_type": "create"
  }
}

通过查询更新数据流中的文档

编辑

使用按查询更新 API来更新数据流中与提供的查询匹配的文档

resp = client.update_by_query(
    index="my-data-stream",
    query={
        "match": {
            "user.id": "l7gk7f82"
        }
    },
    script={
        "source": "ctx._source.user.id = params.new_id",
        "params": {
            "new_id": "XgdX0NoX"
        }
    },
)
print(resp)
response = client.update_by_query(
  index: 'my-data-stream',
  body: {
    query: {
      match: {
        'user.id' => 'l7gk7f82'
      }
    },
    script: {
      source: 'ctx._source.user.id = params.new_id',
      params: {
        new_id: 'XgdX0NoX'
      }
    }
  }
)
puts response
const response = await client.updateByQuery({
  index: "my-data-stream",
  query: {
    match: {
      "user.id": "l7gk7f82",
    },
  },
  script: {
    source: "ctx._source.user.id = params.new_id",
    params: {
      new_id: "XgdX0NoX",
    },
  },
});
console.log(response);
POST /my-data-stream/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  },
  "script": {
    "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}

通过查询删除数据流中的文档

编辑

使用按查询删除 API来删除数据流中与提供的查询匹配的文档

resp = client.delete_by_query(
    index="my-data-stream",
    query={
        "match": {
            "user.id": "vlb44hny"
        }
    },
)
print(resp)
response = client.delete_by_query(
  index: 'my-data-stream',
  body: {
    query: {
      match: {
        'user.id' => 'vlb44hny'
      }
    }
  }
)
puts response
const response = await client.deleteByQuery({
  index: "my-data-stream",
  query: {
    match: {
      "user.id": "vlb44hny",
    },
  },
});
console.log(response);
POST /my-data-stream/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}

更新或删除后备索引中的文档

编辑

如果需要,您可以通过将请求发送到包含文档的后备索引来更新或删除数据流中的文档。您需要

要获取此信息,请使用搜索请求

resp = client.search(
    index="my-data-stream",
    seq_no_primary_term=True,
    query={
        "match": {
            "user.id": "yWIumJd7"
        }
    },
)
print(resp)
response = client.search(
  index: 'my-data-stream',
  body: {
    seq_no_primary_term: true,
    query: {
      match: {
        'user.id' => 'yWIumJd7'
      }
    }
  }
)
puts response
const response = await client.search({
  index: "my-data-stream",
  seq_no_primary_term: true,
  query: {
    match: {
      "user.id": "yWIumJd7",
    },
  },
});
console.log(response);
GET /my-data-stream/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "user.id": "yWIumJd7"
    }
  }
}

响应

{
  "took": 20,
  "timed_out": false,
  "_shards": {
    "total": 3,
    "successful": 3,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.2876821,
    "hits": [
      {
        "_index": ".ds-my-data-stream-2099.03.08-000003",      
        "_id": "bfspvnIBr7VVZlfp2lqX",              
        "_seq_no": 0,                               
        "_primary_term": 1,                         
        "_score": 0.2876821,
        "_source": {
          "@timestamp": "2099-03-08T11:06:07.000Z",
          "user": {
            "id": "yWIumJd7"
          },
          "message": "Login successful"
        }
      }
    ]
  }
}

包含匹配文档的后备索引

文档的文档 ID

文档的当前序列号

文档的主项

要更新文档,请使用具有有效 if_seq_noif_primary_term 参数的索引 API 请求

resp = client.index(
    index=".ds-my-data-stream-2099-03-08-000003",
    id="bfspvnIBr7VVZlfp2lqX",
    if_seq_no="0",
    if_primary_term="1",
    document={
        "@timestamp": "2099-03-08T11:06:07.000Z",
        "user": {
            "id": "8a4f500d"
        },
        "message": "Login successful"
    },
)
print(resp)
const response = await client.index({
  index: ".ds-my-data-stream-2099-03-08-000003",
  id: "bfspvnIBr7VVZlfp2lqX",
  if_seq_no: 0,
  if_primary_term: 1,
  document: {
    "@timestamp": "2099-03-08T11:06:07.000Z",
    user: {
      id: "8a4f500d",
    },
    message: "Login successful",
  },
});
console.log(response);
PUT /.ds-my-data-stream-2099-03-08-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2099-03-08T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}

要删除文档,请使用删除 API

resp = client.delete(
    index=".ds-my-data-stream-2099.03.08-000003",
    id="bfspvnIBr7VVZlfp2lqX",
)
print(resp)
response = client.delete(
  index: '.ds-my-data-stream-2099.03.08-000003',
  id: 'bfspvnIBr7VVZlfp2lqX'
)
puts response
const response = await client.delete({
  index: ".ds-my-data-stream-2099.03.08-000003",
  id: "bfspvnIBr7VVZlfp2lqX",
});
console.log(response);
DELETE /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX

要通过单个请求删除或更新多个文档,请使用批量 APIdeleteindexupdate 操作。对于 index 操作,请包含有效的if_seq_noif_primary_term 参数。

resp = client.bulk(
    refresh=True,
    operations=[
        {
            "index": {
                "_index": ".ds-my-data-stream-2099.03.08-000003",
                "_id": "bfspvnIBr7VVZlfp2lqX",
                "if_seq_no": 0,
                "if_primary_term": 1
            }
        },
        {
            "@timestamp": "2099-03-08T11:06:07.000Z",
            "user": {
                "id": "8a4f500d"
            },
            "message": "Login successful"
        }
    ],
)
print(resp)
response = client.bulk(
  refresh: true,
  body: [
    {
      index: {
        _index: '.ds-my-data-stream-2099.03.08-000003',
        _id: 'bfspvnIBr7VVZlfp2lqX',
        if_seq_no: 0,
        if_primary_term: 1
      }
    },
    {
      "@timestamp": '2099-03-08T11:06:07.000Z',
      user: {
        id: '8a4f500d'
      },
      message: 'Login successful'
    }
  ]
)
puts response
const response = await client.bulk({
  refresh: "true",
  operations: [
    {
      index: {
        _index: ".ds-my-data-stream-2099.03.08-000003",
        _id: "bfspvnIBr7VVZlfp2lqX",
        if_seq_no: 0,
        if_primary_term: 1,
      },
    },
    {
      "@timestamp": "2099-03-08T11:06:07.000Z",
      user: {
        id: "8a4f500d",
      },
      message: "Login successful",
    },
  ],
});
console.log(response);
PUT /_bulk?refresh
{ "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }