使用数据流

编辑

在您设置数据流之后,您可以执行以下操作

向数据流添加文档

编辑

要添加单个文档,请使用索引 API摄取管道受支持。

resp = client.index(
    index="my-data-stream",
    document={
        "@timestamp": "2099-03-08T11:06:07.000Z",
        "user": {
            "id": "8a4f500d"
        },
        "message": "Login successful"
    },
)
print(resp)
response = client.index(
  index: 'my-data-stream',
  body: {
    "@timestamp": '2099-03-08T11:06:07.000Z',
    user: {
      id: '8a4f500d'
    },
    message: 'Login successful'
  }
)
puts response
const response = await client.index({
  index: "my-data-stream",
  document: {
    "@timestamp": "2099-03-08T11:06:07.000Z",
    user: {
      id: "8a4f500d",
    },
    message: "Login successful",
  },
});
console.log(response);
POST /my-data-stream/_doc/
{
  "@timestamp": "2099-03-08T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}

您不能使用索引 API 的PUT /<target>/_doc/<_id>请求格式向数据流添加新文档。要指定文档 ID,请改用PUT /<target>/_create/<_id>格式。仅支持op_typecreate

要使用单个请求添加多个文档,请使用批量 API。仅支持create操作。

resp = client.bulk(
    index="my-data-stream",
    refresh=True,
    operations=[
        {
            "create": {}
        },
        {
            "@timestamp": "2099-03-08T11:04:05.000Z",
            "user": {
                "id": "vlb44hny"
            },
            "message": "Login attempt failed"
        },
        {
            "create": {}
        },
        {
            "@timestamp": "2099-03-08T11:06:07.000Z",
            "user": {
                "id": "8a4f500d"
            },
            "message": "Login successful"
        },
        {
            "create": {}
        },
        {
            "@timestamp": "2099-03-09T11:07:08.000Z",
            "user": {
                "id": "l7gk7f82"
            },
            "message": "Logout successful"
        }
    ],
)
print(resp)
response = client.bulk(
  index: 'my-data-stream',
  refresh: true,
  body: [
    {
      create: {}
    },
    {
      "@timestamp": '2099-03-08T11:04:05.000Z',
      user: {
        id: 'vlb44hny'
      },
      message: 'Login attempt failed'
    },
    {
      create: {}
    },
    {
      "@timestamp": '2099-03-08T11:06:07.000Z',
      user: {
        id: '8a4f500d'
      },
      message: 'Login successful'
    },
    {
      create: {}
    },
    {
      "@timestamp": '2099-03-09T11:07:08.000Z',
      user: {
        id: 'l7gk7f82'
      },
      message: 'Logout successful'
    }
  ]
)
puts response
const response = await client.bulk({
  index: "my-data-stream",
  refresh: "true",
  operations: [
    {
      create: {},
    },
    {
      "@timestamp": "2099-03-08T11:04:05.000Z",
      user: {
        id: "vlb44hny",
      },
      message: "Login attempt failed",
    },
    {
      create: {},
    },
    {
      "@timestamp": "2099-03-08T11:06:07.000Z",
      user: {
        id: "8a4f500d",
      },
      message: "Login successful",
    },
    {
      create: {},
    },
    {
      "@timestamp": "2099-03-09T11:07:08.000Z",
      user: {
        id: "l7gk7f82",
      },
      message: "Logout successful",
    },
  ],
});
console.log(response);
PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }

搜索数据流

编辑

以下搜索 API 支持数据流

获取数据流的统计信息

编辑

使用数据流统计信息 API获取一个或多个数据流的统计信息

resp = client.indices.data_streams_stats(
    name="my-data-stream",
    human=True,
)
print(resp)
response = client.indices.data_streams_stats(
  name: 'my-data-stream',
  human: true
)
puts response
const response = await client.indices.dataStreamsStats({
  name: "my-data-stream",
  human: "true",
});
console.log(response);
GET /_data_stream/my-data-stream/_stats?human=true

手动滚动数据流

编辑

使用滚动 API手动滚动数据流。手动滚动时有两个选项

  1. 立即触发滚动

    resp = client.indices.rollover(
        alias="my-data-stream",
    )
    print(resp)
    response = client.indices.rollover(
      alias: 'my-data-stream'
    )
    puts response
    const response = await client.indices.rollover({
      alias: "my-data-stream",
    });
    console.log(response);
    POST /my-data-stream/_rollover/
  2. 或将滚动推迟到下一个索引事件发生

    resp = client.indices.rollover(
        alias="my-data-stream",
        lazy=True,
    )
    print(resp)
    response = client.indices.rollover(
      alias: 'my-data-stream',
      lazy: true
    )
    puts response
    const response = await client.indices.rollover({
      alias: "my-data-stream",
      lazy: "true",
    });
    console.log(response);
    POST /my-data-stream/_rollover?lazy

    对于不经常更新的数据流,使用第二个选项可以避免出现空的支持索引。

打开已关闭的支持索引

编辑

您无法搜索已关闭的支持索引,即使通过搜索其数据流也不行。您也不能更新删除已关闭索引中的文档。

要重新打开已关闭的支持索引,请直接向索引提交打开索引 API 请求

resp = client.indices.open(
    index=".ds-my-data-stream-2099.03.07-000001",
)
print(resp)
response = client.indices.open(
  index: '.ds-my-data-stream-2099.03.07-000001'
)
puts response
const response = await client.indices.open({
  index: ".ds-my-data-stream-2099.03.07-000001",
});
console.log(response);
POST /.ds-my-data-stream-2099.03.07-000001/_open/

要重新打开数据流的所有已关闭的支持索引,请向数据流提交打开索引 API 请求

resp = client.indices.open(
    index="my-data-stream",
)
print(resp)
response = client.indices.open(
  index: 'my-data-stream'
)
puts response
const response = await client.indices.open({
  index: "my-data-stream",
});
console.log(response);
POST /my-data-stream/_open/

使用数据流重新索引

编辑

使用重新索引 API将文档从现有索引、别名或数据流复制到数据流。由于数据流是仅追加的,因此重新索引到数据流必须使用op_typecreate。重新索引不能更新数据流中的现有文档。

resp = client.reindex(
    source={
        "index": "archive"
    },
    dest={
        "index": "my-data-stream",
        "op_type": "create"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      index: 'archive'
    },
    dest: {
      index: 'my-data-stream',
      op_type: 'create'
    }
  }
)
puts response
const response = await client.reindex({
  source: {
    index: "archive",
  },
  dest: {
    index: "my-data-stream",
    op_type: "create",
  },
});
console.log(response);
POST /_reindex
{
  "source": {
    "index": "archive"
  },
  "dest": {
    "index": "my-data-stream",
    "op_type": "create"
  }
}

通过查询更新数据流中的文档

编辑

使用按查询更新 API更新与提供的查询匹配的数据流中的文档

resp = client.update_by_query(
    index="my-data-stream",
    query={
        "match": {
            "user.id": "l7gk7f82"
        }
    },
    script={
        "source": "ctx._source.user.id = params.new_id",
        "params": {
            "new_id": "XgdX0NoX"
        }
    },
)
print(resp)
response = client.update_by_query(
  index: 'my-data-stream',
  body: {
    query: {
      match: {
        'user.id' => 'l7gk7f82'
      }
    },
    script: {
      source: 'ctx._source.user.id = params.new_id',
      params: {
        new_id: 'XgdX0NoX'
      }
    }
  }
)
puts response
const response = await client.updateByQuery({
  index: "my-data-stream",
  query: {
    match: {
      "user.id": "l7gk7f82",
    },
  },
  script: {
    source: "ctx._source.user.id = params.new_id",
    params: {
      new_id: "XgdX0NoX",
    },
  },
});
console.log(response);
POST /my-data-stream/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  },
  "script": {
    "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}

通过查询删除数据流中的文档

编辑

使用按查询删除 API删除与提供的查询匹配的数据流中的文档

resp = client.delete_by_query(
    index="my-data-stream",
    query={
        "match": {
            "user.id": "vlb44hny"
        }
    },
)
print(resp)
response = client.delete_by_query(
  index: 'my-data-stream',
  body: {
    query: {
      match: {
        'user.id' => 'vlb44hny'
      }
    }
  }
)
puts response
const response = await client.deleteByQuery({
  index: "my-data-stream",
  query: {
    match: {
      "user.id": "vlb44hny",
    },
  },
});
console.log(response);
POST /my-data-stream/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}

更新或删除支持索引中的文档

编辑

如果需要,您可以通过向包含文档的支持索引发送请求来更新或删除数据流中的文档。您需要

要获取此信息,请使用搜索请求

resp = client.search(
    index="my-data-stream",
    seq_no_primary_term=True,
    query={
        "match": {
            "user.id": "yWIumJd7"
        }
    },
)
print(resp)
response = client.search(
  index: 'my-data-stream',
  body: {
    seq_no_primary_term: true,
    query: {
      match: {
        'user.id' => 'yWIumJd7'
      }
    }
  }
)
puts response
const response = await client.search({
  index: "my-data-stream",
  seq_no_primary_term: true,
  query: {
    match: {
      "user.id": "yWIumJd7",
    },
  },
});
console.log(response);
GET /my-data-stream/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "user.id": "yWIumJd7"
    }
  }
}

响应

{
  "took": 20,
  "timed_out": false,
  "_shards": {
    "total": 3,
    "successful": 3,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.2876821,
    "hits": [
      {
        "_index": ".ds-my-data-stream-2099.03.08-000003",      
        "_id": "bfspvnIBr7VVZlfp2lqX",              
        "_seq_no": 0,                               
        "_primary_term": 1,                         
        "_score": 0.2876821,
        "_source": {
          "@timestamp": "2099-03-08T11:06:07.000Z",
          "user": {
            "id": "yWIumJd7"
          },
          "message": "Login successful"
        }
      }
    ]
  }
}

包含匹配文档的支持索引

文档的文档 ID

文档的当前序列号

文档的主项

要更新文档,请使用带有有效if_seq_noif_primary_term参数的索引 API请求

resp = client.index(
    index=".ds-my-data-stream-2099-03-08-000003",
    id="bfspvnIBr7VVZlfp2lqX",
    if_seq_no="0",
    if_primary_term="1",
    document={
        "@timestamp": "2099-03-08T11:06:07.000Z",
        "user": {
            "id": "8a4f500d"
        },
        "message": "Login successful"
    },
)
print(resp)
const response = await client.index({
  index: ".ds-my-data-stream-2099-03-08-000003",
  id: "bfspvnIBr7VVZlfp2lqX",
  if_seq_no: 0,
  if_primary_term: 1,
  document: {
    "@timestamp": "2099-03-08T11:06:07.000Z",
    user: {
      id: "8a4f500d",
    },
    message: "Login successful",
  },
});
console.log(response);
PUT /.ds-my-data-stream-2099-03-08-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2099-03-08T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}

要删除文档,请使用删除 API

resp = client.delete(
    index=".ds-my-data-stream-2099.03.08-000003",
    id="bfspvnIBr7VVZlfp2lqX",
)
print(resp)
response = client.delete(
  index: '.ds-my-data-stream-2099.03.08-000003',
  id: 'bfspvnIBr7VVZlfp2lqX'
)
puts response
const response = await client.delete({
  index: ".ds-my-data-stream-2099.03.08-000003",
  id: "bfspvnIBr7VVZlfp2lqX",
});
console.log(response);
DELETE /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX

要使用单个请求删除或更新多个文档,请使用批量 APIdeleteindexupdate操作。对于index操作,请包含有效的if_seq_noif_primary_term参数。

resp = client.bulk(
    refresh=True,
    operations=[
        {
            "index": {
                "_index": ".ds-my-data-stream-2099.03.08-000003",
                "_id": "bfspvnIBr7VVZlfp2lqX",
                "if_seq_no": 0,
                "if_primary_term": 1
            }
        },
        {
            "@timestamp": "2099-03-08T11:06:07.000Z",
            "user": {
                "id": "8a4f500d"
            },
            "message": "Login successful"
        }
    ],
)
print(resp)
response = client.bulk(
  refresh: true,
  body: [
    {
      index: {
        _index: '.ds-my-data-stream-2099.03.08-000003',
        _id: 'bfspvnIBr7VVZlfp2lqX',
        if_seq_no: 0,
        if_primary_term: 1
      }
    },
    {
      "@timestamp": '2099-03-08T11:06:07.000Z',
      user: {
        id: '8a4f500d'
      },
      message: 'Login successful'
    }
  ]
)
puts response
const response = await client.bulk({
  refresh: "true",
  operations: [
    {
      index: {
        _index: ".ds-my-data-stream-2099.03.08-000003",
        _id: "bfspvnIBr7VVZlfp2lqX",
        if_seq_no: 0,
        if_primary_term: 1,
      },
    },
    {
      "@timestamp": "2099-03-08T11:06:07.000Z",
      user: {
        id: "8a4f500d",
      },
      message: "Login successful",
    },
  ],
});
console.log(response);
PUT /_bulk?refresh
{ "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }