Update By Query API

编辑

更新匹配指定查询的文档。如果未指定查询,则对数据流或索引中的每个文档执行更新,而不修改源,这对于获取映射更改很有用。

resp = client.update_by_query(
    index="my-index-000001",
    conflicts="proceed",
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  conflicts: 'proceed'
)
puts response
const response = await client.updateByQuery({
  index: "my-index-000001",
  conflicts: "proceed",
});
console.log(response);
POST my-index-000001/_update_by_query?conflicts=proceed

请求

编辑

POST /<目标>/_update_by_query

先决条件

编辑
  • 如果启用了 Elasticsearch 安全功能,则您必须对目标数据流、索引或别名具有以下索引权限

    • 读取
    • indexwrite

描述

编辑

您可以使用与搜索 API相同的语法,在请求 URI 或请求正文中指定查询条件。

当您提交按查询更新请求时,Elasticsearch 会在开始处理请求时获取数据流或索引的快照,并使用 internal 版本控制更新匹配的文档。当版本匹配时,文档将被更新,并且版本号会递增。如果在快照拍摄时间和更新操作处理之间文档发生更改,则会导致版本冲突并且操作失败。您可以选择计数版本冲突而不是停止并返回,方法是将 conflicts 设置为 proceed。请注意,如果您选择计数版本冲突,则该操作可能会尝试从源更新比 max_docs 更多的文档,直到它成功更新了 max_docs 个文档,或者它已经遍历了源查询中的每个文档。

版本等于 0 的文档不能使用按查询更新进行更新,因为 internal 版本控制不支持 0 作为有效的版本号。

在处理按查询更新请求时,Elasticsearch 会依次执行多个搜索请求以查找所有匹配的文档。对每批匹配的文档执行批量更新请求。任何查询或更新失败都会导致按查询更新请求失败,并且失败会显示在响应中。任何成功完成的更新请求仍然会保留,它们不会被回滚。

刷新分片

编辑

指定 refresh 参数会在请求完成后刷新所有分片。这与更新 API 的 refresh 参数不同,后者仅导致接收请求的分片被刷新。与更新 API 不同,它不支持 wait_for

异步运行按查询更新

编辑

如果请求包含 wait_for_completion=false,Elasticsearch 会执行一些预检,启动请求,并返回一个 task,您可以使用该任务来取消或获取任务的状态。Elasticsearch 会在 .tasks/task/${taskId} 处以文档形式创建此任务的记录。

等待活动分片

编辑

wait_for_active_shards 控制在继续执行请求之前必须有多少分片副本处于活动状态。有关详细信息,请参阅活动分片timeout 控制每个写入请求等待不可用分片变为可用的时间。它们的工作方式与在批量 API中完全相同。按查询更新使用滚动搜索,因此您也可以指定 scroll 参数来控制它保持搜索上下文活动的时间,例如 ?scroll=10m。默认值为 5 分钟。

限制更新请求

编辑

要控制按查询更新发出批量更新操作的速率,您可以将 requests_per_second 设置为任何正十进制数。这会在每个批次中添加等待时间以限制速率。将 requests_per_second 设置为 -1 可禁用限制。

限制使用批次之间的等待时间,以便可以为内部滚动请求提供一个超时时间,该超时时间会将请求填充考虑在内。填充时间是批次大小除以 requests_per_second 与写入所花费时间之间的差值。默认情况下,批次大小为 1000,因此如果 requests_per_second 设置为 500

target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds

由于批次是作为单个 _bulk 请求发出的,因此较大的批次大小会导致 Elasticsearch 创建许多请求并在启动下一组请求之前等待。这是“突发”而不是“平滑”。

切片

编辑

按查询更新支持切片滚动来并行化更新过程。这可以提高效率并提供一种将请求分解为较小部分的便捷方法。

slices 设置为 auto 会为大多数数据流和索引选择一个合理的数字。如果您要手动切片或以其他方式调整自动切片,请记住

  • slices 的数量等于索引或后备索引中的分片数量时,查询性能最高效。如果该数字很大(例如,500),请选择一个较低的数字,因为太多的 slices 会损害性能。将 slices 设置为高于分片数量通常不会提高效率,反而会增加开销。
  • 更新性能随着切片数量的增加,在可用资源上线性扩展。

查询或更新性能是否在运行时占主导地位取决于正在重新索引的文档和集群资源。

路径参数

编辑
<目标>
(可选,字符串)要搜索的数据流、索引和别名的逗号分隔列表。支持通配符(*)。要搜索所有数据流或索引,请省略此参数或使用 *_all

查询参数

编辑
allow_no_indices

(可选,布尔值)如果为 false,则当任何通配符表达式、索引别名_all 值仅以缺失或关闭的索引为目标时,请求会返回错误。即使请求以其他打开的索引为目标,此行为也适用。例如,如果一个索引以 foo 开头,但没有索引以 bar 开头,则以 foo*,bar* 为目标的请求会返回错误。

默认为 true

analyzer

(可选,字符串)用于查询字符串的分析器。

此参数只能在指定 q 查询字符串参数时使用。

analyze_wildcard

(可选,布尔值)如果为 true,则分析通配符和前缀查询。默认为 false

此参数只能在指定 q 查询字符串参数时使用。

conflicts
(可选,字符串)如果按查询更新遇到版本冲突,则执行的操作:abortproceed。默认为 abort
default_operator

(可选,字符串)查询字符串查询的默认运算符:AND 或 OR。默认为 OR

此参数只能在指定 q 查询字符串参数时使用。

df

(可选,字符串)用作默认字段的字段,其中查询字符串中没有给出字段前缀。

此参数只能在指定 q 查询字符串参数时使用。

expand_wildcards

(可选,字符串)通配符模式可以匹配的索引类型。如果请求可以以数据流为目标,则此参数确定通配符表达式是否匹配隐藏的数据流。支持逗号分隔的值,例如 open,hidden。有效值为

all
匹配任何数据流或索引,包括隐藏的数据流或索引。
open
匹配打开的非隐藏索引。也匹配任何非隐藏的数据流。
closed
匹配关闭的非隐藏索引。也匹配任何非隐藏的数据流。数据流无法关闭。
hidden
匹配隐藏的数据流和隐藏的索引。必须与 openclosed 或两者结合使用。
none
不接受通配符模式。

默认为 open

ignore_unavailable
(可选,布尔值)如果为 false,则如果请求以缺失或关闭的索引为目标,则会返回错误。默认为 false
lenient

(可选,布尔值)如果为 true,则会忽略查询字符串中基于格式的查询失败(例如,向数字字段提供文本)。默认为 false

此参数只能在指定 q 查询字符串参数时使用。

max_docs
(可选,整数)要处理的最大文档数。默认为所有文档。当设置为小于或等于 scroll_size 的值时,将不会使用滚动来检索操作的结果。
pipeline
(可选,字符串)用于预处理传入文档的管道的 ID。如果索引指定了默认的摄取管道,则将值设置为 _none 将禁用此请求的默认摄取管道。如果配置了最终管道,则无论此参数的值如何,它都将始终运行。
preference
(可选,字符串)指定应在其上执行操作的节点或分片。默认为随机。
q
(可选,字符串)Lucene 查询字符串语法中的查询。
request_cache
(可选,布尔值)如果为 true,则将对此请求使用请求缓存。默认为索引级别的设置。
refresh
(可选,布尔值)如果为 true,则 Elasticsearch 会刷新受影响的分片以使操作对搜索可见。默认为 false
requests_per_second
(可选,整数)此请求的限制,以每秒子请求数表示。默认为 -1(无限制)。
routing
(可选,字符串)用于将操作路由到特定分片的自定义值。
scroll
(可选,时间值)保留搜索上下文以进行滚动的时间段。请参阅滚动搜索结果
scroll_size
(可选,整数)支持操作的滚动请求的大小。默认为 1000。
search_type

(可选,字符串)搜索操作的类型。可用选项

  • query_then_fetch
  • dfs_query_then_fetch
search_timeout
(可选,时间单位)每个搜索请求的显式超时。默认为无超时。
slices
(可选,整数)此任务应分为的切片数。默认为 1,表示任务未切片为子任务。
sort
(可选,字符串)<字段>:<方向> 对的逗号分隔列表。
stats
(可选,字符串)用于记录和统计目的的请求的特定 tag
terminate_after

(可选,整数)每个分片要收集的最大文档数。如果查询达到此限制,Elasticsearch 会提前终止查询。Elasticsearch 会在排序之前收集文档。

请谨慎使用。Elasticsearch 会将此参数应用于处理请求的每个分片。如果可能,请让 Elasticsearch 自动执行提前终止。避免为以跨多个数据层支持索引的数据流为目标的请求指定此参数。

timeout

(可选,时间单位)每个更新请求等待以下操作的周期

默认为 1m(一分钟)。这保证了 Elasticsearch 在失败之前至少等待超时时间。实际等待时间可能会更长,尤其是在发生多次等待时。

version
(可选,布尔值)如果为 true,则返回文档版本作为命中的一部分。
wait_for_active_shards

(可选,字符串)在继续操作之前必须处于活动状态的每个分片的副本数。设置为 all 或任何非负整数,直到索引中每个分片的副本总数(number_of_replicas+1)。默认为 1,表示仅等待每个主分片处于活动状态。

请参阅 活动分片

请求正文

编辑
query
(可选,查询对象)使用 查询 DSL 指定要更新的文档。

响应正文

编辑
took
整个操作从开始到结束的毫秒数。
timed_out
如果在按查询执行更新期间执行的任何请求超时,则此标志设置为 true
total
已成功处理的文档数。
updated
已成功更新的文档数。
deleted
已成功删除的文档数。
batches
按查询更新拉回的滚动响应数。
version_conflicts
按查询更新命中的版本冲突数。
noops
由于用于按查询更新的脚本为 ctx.op 返回了 noop 值而被忽略的文档数。
retries
按查询更新尝试的重试次数。bulk 是重试的批量操作数,而 search 是重试的搜索操作数。
throttled_millis
请求为了符合 requests_per_second 而休眠的毫秒数。
requests_per_second
按查询更新期间实际执行的每秒请求数。
throttled_until_millis
_update_by_query 响应中,此字段应始终等于零。它仅在使用 任务 API 时才有意义,在任务 API 中,它表示为了符合 requests_per_second,再次执行限制请求的下一个时间(自 epoch 以来的毫秒数)。
failures
如果在过程中存在任何不可恢复的错误,则会显示错误数组。如果此数组非空,则表示由于这些错误,请求中止。按查询更新使用批处理实现。任何失败都会导致整个过程中止,但当前批次中的所有失败都将收集到数组中。您可以使用 conflicts 选项来防止重新索引因版本冲突而中止。

示例

编辑

_update_by_query 的最简单用法只是在数据流或索引中的每个文档上执行更新,而不更改源。这对于获取新属性或某些其他在线映射更改非常有用。

要更新所选文档,请在请求正文中指定查询

resp = client.update_by_query(
    index="my-index-000001",
    conflicts="proceed",
    query={
        "term": {
            "user.id": "kimchy"
        }
    },
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  conflicts: 'proceed',
  body: {
    query: {
      term: {
        'user.id' => 'kimchy'
      }
    }
  }
)
puts response
const response = await client.updateByQuery({
  index: "my-index-000001",
  conflicts: "proceed",
  query: {
    term: {
      "user.id": "kimchy",
    },
  },
});
console.log(response);
POST my-index-000001/_update_by_query?conflicts=proceed
{
  "query": { 
    "term": {
      "user.id": "kimchy"
    }
  }
}

必须将查询作为值传递给 query 键,方式与 搜索 API 相同。您也可以像搜索 API 一样使用 q 参数。

更新多个数据流或索引中的文档

resp = client.update_by_query(
    index="my-index-000001,my-index-000002",
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001,my-index-000002'
)
puts response
const response = await client.updateByQuery({
  index: "my-index-000001,my-index-000002",
});
console.log(response);
POST my-index-000001,my-index-000002/_update_by_query

将按查询更新操作限制为具有特定路由值的分片

resp = client.update_by_query(
    index="my-index-000001",
    routing="1",
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  routing: 1
)
puts response
const response = await client.updateByQuery({
  index: "my-index-000001",
  routing: 1,
});
console.log(response);
POST my-index-000001/_update_by_query?routing=1

默认情况下,按查询更新使用 1000 个滚动批次。您可以使用 scroll_size 参数更改批次大小

resp = client.update_by_query(
    index="my-index-000001",
    scroll_size="100",
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  scroll_size: 100
)
puts response
const response = await client.updateByQuery({
  index: "my-index-000001",
  scroll_size: 100,
});
console.log(response);
POST my-index-000001/_update_by_query?scroll_size=100

使用唯一属性更新文档

resp = client.update_by_query(
    index="my-index-000001",
    query={
        "term": {
            "user.id": "kimchy"
        }
    },
    max_docs=1,
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  body: {
    query: {
      term: {
        'user.id' => 'kimchy'
      }
    },
    max_docs: 1
  }
)
puts response
const response = await client.updateByQuery({
  index: "my-index-000001",
  query: {
    term: {
      "user.id": "kimchy",
    },
  },
  max_docs: 1,
});
console.log(response);
POST my-index-000001/_update_by_query
{
  "query": {
    "term": {
      "user.id": "kimchy"
    }
  },
  "max_docs": 1
}

更新文档源

编辑

按查询更新支持使用脚本来更新文档源。例如,以下请求会递增 my-index-000001user.idkimchy 的所有文档的 count 字段

resp = client.update_by_query(
    index="my-index-000001",
    script={
        "source": "ctx._source.count++",
        "lang": "painless"
    },
    query={
        "term": {
            "user.id": "kimchy"
        }
    },
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  body: {
    script: {
      source: 'ctx._source.count++',
      lang: 'painless'
    },
    query: {
      term: {
        'user.id' => 'kimchy'
      }
    }
  }
)
puts response
const response = await client.updateByQuery({
  index: "my-index-000001",
  script: {
    source: "ctx._source.count++",
    lang: "painless",
  },
  query: {
    term: {
      "user.id": "kimchy",
    },
  },
});
console.log(response);
POST my-index-000001/_update_by_query
{
  "script": {
    "source": "ctx._source.count++",
    "lang": "painless"
  },
  "query": {
    "term": {
      "user.id": "kimchy"
    }
  }
}

请注意,此示例中未指定 conflicts=proceed。在这种情况下,版本冲突应停止该过程,以便您可以处理失败。

更新 API 一样,您可以设置 ctx.op 来更改执行的操作

noop

如果您的脚本确定它无需进行任何更改,请设置 ctx.op = "noop"。按查询更新操作将跳过更新文档并递增 noop 计数器。

delete

如果您的脚本确定应删除文档,请设置 ctx.op = "delete"。按查询更新操作将删除文档并递增 deleted 计数器。

按查询更新仅支持 indexnoopdelete。将 ctx.op 设置为其他任何内容都是错误的。设置 ctx 中的任何其他字段都是错误的。此 API 仅允许您修改匹配文档的源,而不能移动它们。

使用引入管道更新文档

编辑

按查询更新可以通过指定 pipeline 来使用 引入管道功能

resp = client.ingest.put_pipeline(
    id="set-foo",
    description="sets foo",
    processors=[
        {
            "set": {
                "field": "foo",
                "value": "bar"
            }
        }
    ],
)
print(resp)

resp1 = client.update_by_query(
    index="my-index-000001",
    pipeline="set-foo",
)
print(resp1)
response = client.ingest.put_pipeline(
  id: 'set-foo',
  body: {
    description: 'sets foo',
    processors: [
      {
        set: {
          field: 'foo',
          value: 'bar'
        }
      }
    ]
  }
)
puts response

response = client.update_by_query(
  index: 'my-index-000001',
  pipeline: 'set-foo'
)
puts response
const response = await client.ingest.putPipeline({
  id: "set-foo",
  description: "sets foo",
  processors: [
    {
      set: {
        field: "foo",
        value: "bar",
      },
    },
  ],
});
console.log(response);

const response1 = await client.updateByQuery({
  index: "my-index-000001",
  pipeline: "set-foo",
});
console.log(response1);
PUT _ingest/pipeline/set-foo
{
  "description" : "sets foo",
  "processors" : [ {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
  } ]
}
POST my-index-000001/_update_by_query?pipeline=set-foo
获取按查询更新操作的状态
编辑

您可以使用 任务 API 获取所有正在运行的按查询更新请求的状态

$response = $client->tasks()->list();
resp = client.tasks.list(
    detailed=True,
    actions="*byquery",
)
print(resp)
response = client.tasks.list(
  detailed: true,
  actions: '*byquery'
)
puts response
res, err := es.Tasks.List(
	es.Tasks.List.WithActions("*byquery"),
	es.Tasks.List.WithDetailed(true),
)
fmt.Println(res, err)
const response = await client.tasks.list({
  detailed: "true",
  actions: "*byquery",
});
console.log(response);
GET _tasks?detailed=true&actions=*byquery

响应如下所示

{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/update/byquery",
          "status" : {    
            "total" : 6154,
            "updated" : 3500,
            "created" : 0,
            "deleted" : 0,
            "batches" : 4,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": {
              "bulk": 0,
              "search": 0
            },
            "throttled_millis": 0
          },
          "description" : ""
        }
      }
    }
  }
}

此对象包含实际状态。它类似于响应 JSON,但重要增加了 total 字段。total 是重新索引预期执行的操作总数。您可以通过添加 updatedcreateddeleted 字段来估计进度。当它们的总和等于 total 字段时,请求将完成。

使用任务 ID,您可以直接查找任务。以下示例检索有关任务 r1A2WoRbTwKZ516z6NEs5A:36619 的信息

$params = [
    'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
];
$response = $client->tasks()->get($params);
resp = client.tasks.get(
    task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
)
print(resp)
response = client.tasks.get(
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
)
puts response
res, err := es.Tasks.Get(
	"r1A2WoRbTwKZ516z6NEs5A:36619",
)
fmt.Println(res, err)
const response = await client.tasks.get({
  task_id: "r1A2WoRbTwKZ516z6NEs5A:36619",
});
console.log(response);
GET /_tasks/r1A2WoRbTwKZ516z6NEs5A:36619

此 API 的优势在于,它与 wait_for_completion=false 集成,以透明地返回已完成任务的状态。如果任务已完成,并且在其上设置了 wait_for_completion=false,则它将返回 resultserror 字段。此功能的代价是 wait_for_completion=false.tasks/task/${taskId} 创建的文档。您需要删除该文档。

取消按查询更新操作
编辑

可以使用 任务取消 API 取消任何按查询更新

$params = [
    'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
];
$response = $client->tasks()->cancel($params);
resp = client.tasks.cancel(
    task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
)
print(resp)
response = client.tasks.cancel(
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
)
puts response
res, err := es.Tasks.Cancel(
	es.Tasks.Cancel.WithTaskID("r1A2WoRbTwKZ516z6NEs5A:36619"),
)
fmt.Println(res, err)
const response = await client.tasks.cancel({
  task_id: "r1A2WoRbTwKZ516z6NEs5A:36619",
});
console.log(response);
POST _tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel

可以使用 任务 API 找到任务 ID。

取消操作应该会很快完成,但可能需要几秒钟。上面的任务状态 API 将继续列出按查询更新任务,直到此任务检查到它已被取消并自行终止。

更改请求的限制
编辑

可以使用 _rethrottle API 更改正在运行的按查询更新的 requests_per_second

$params = [
    'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
];
$response = $client->updateByQueryRethrottle($params);
resp = client.update_by_query_rethrottle(
    task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
    requests_per_second="-1",
)
print(resp)
response = client.update_by_query_rethrottle(
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619',
  requests_per_second: -1
)
puts response
res, err := es.UpdateByQueryRethrottle(
	"r1A2WoRbTwKZ516z6NEs5A:36619",
	esapi.IntPtr(-1),
)
fmt.Println(res, err)
const response = await client.updateByQueryRethrottle({
  task_id: "r1A2WoRbTwKZ516z6NEs5A:36619",
  requests_per_second: "-1",
});
console.log(response);
POST _update_by_query/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1

可以使用 任务 API 找到任务 ID。

就像在 _update_by_query API 上设置它一样,requests_per_second 可以是 -1 以禁用限制,也可以是任何十进制数,如 1.712 以将限制设置为该级别。加速查询的重新限制会立即生效,但减慢查询的重新限制会在完成当前批次后生效。这可以防止滚动超时。

手动切片
编辑

通过为每个请求提供切片 ID 和切片总数,手动对按查询更新进行切片

resp = client.update_by_query(
    index="my-index-000001",
    slice={
        "id": 0,
        "max": 2
    },
    script={
        "source": "ctx._source['extra'] = 'test'"
    },
)
print(resp)

resp1 = client.update_by_query(
    index="my-index-000001",
    slice={
        "id": 1,
        "max": 2
    },
    script={
        "source": "ctx._source['extra'] = 'test'"
    },
)
print(resp1)
response = client.update_by_query(
  index: 'my-index-000001',
  body: {
    slice: {
      id: 0,
      max: 2
    },
    script: {
      source: "ctx._source['extra'] = 'test'"
    }
  }
)
puts response

response = client.update_by_query(
  index: 'my-index-000001',
  body: {
    slice: {
      id: 1,
      max: 2
    },
    script: {
      source: "ctx._source['extra'] = 'test'"
    }
  }
)
puts response
const response = await client.updateByQuery({
  index: "my-index-000001",
  slice: {
    id: 0,
    max: 2,
  },
  script: {
    source: "ctx._source['extra'] = 'test'",
  },
});
console.log(response);

const response1 = await client.updateByQuery({
  index: "my-index-000001",
  slice: {
    id: 1,
    max: 2,
  },
  script: {
    source: "ctx._source['extra'] = 'test'",
  },
});
console.log(response1);
POST my-index-000001/_update_by_query
{
  "slice": {
    "id": 0,
    "max": 2
  },
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}
POST my-index-000001/_update_by_query
{
  "slice": {
    "id": 1,
    "max": 2
  },
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}

您可以通过以下方式验证是否正常工作

resp = client.indices.refresh()
print(resp)

resp1 = client.search(
    index="my-index-000001",
    size="0",
    q="extra:test",
    filter_path="hits.total",
)
print(resp1)
response = client.indices.refresh
puts response

response = client.search(
  index: 'my-index-000001',
  size: 0,
  q: 'extra:test',
  filter_path: 'hits.total'
)
puts response
const response = await client.indices.refresh();
console.log(response);

const response1 = await client.search({
  index: "my-index-000001",
  size: 0,
  q: "extra:test",
  filter_path: "hits.total",
});
console.log(response1);
GET _refresh
POST my-index-000001/_search?size=0&q=extra:test&filter_path=hits.total

这将产生一个合理的 total,如下所示

{
  "hits": {
    "total": {
        "value": 120,
        "relation": "eq"
    }
  }
}
使用自动切片
编辑

您还可以让按查询更新使用 切片滚动_id 上进行切片,从而自动并行化。使用 slices 指定要使用的切片数

resp = client.update_by_query(
    index="my-index-000001",
    refresh=True,
    slices="5",
    script={
        "source": "ctx._source['extra'] = 'test'"
    },
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  refresh: true,
  slices: 5,
  body: {
    script: {
      source: "ctx._source['extra'] = 'test'"
    }
  }
)
puts response
const response = await client.updateByQuery({
  index: "my-index-000001",
  refresh: "true",
  slices: 5,
  script: {
    source: "ctx._source['extra'] = 'test'",
  },
});
console.log(response);
POST my-index-000001/_update_by_query?refresh&slices=5
{
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}

您也可以通过以下方式验证是否正常工作

resp = client.search(
    index="my-index-000001",
    size="0",
    q="extra:test",
    filter_path="hits.total",
)
print(resp)
response = client.search(
  index: 'my-index-000001',
  size: 0,
  q: 'extra:test',
  filter_path: 'hits.total'
)
puts response
const response = await client.search({
  index: "my-index-000001",
  size: 0,
  q: "extra:test",
  filter_path: "hits.total",
});
console.log(response);
POST my-index-000001/_search?size=0&q=extra:test&filter_path=hits.total

这将产生一个合理的 total,如下所示

{
  "hits": {
    "total": {
        "value": 120,
        "relation": "eq"
    }
  }
}

slices 设置为 auto 将使 Elasticsearch 选择要使用的切片数。此设置将每个分片使用一个切片,直到达到某个限制。如果有多个源数据流或索引,它将根据分片数最少的索引或支持索引选择切片数。

slices 添加到 _update_by_query 只是自动化了上面部分中使用的手动过程,创建子请求,这意味着它有一些怪癖

  • 您可以在 任务 API 中看到这些请求。这些子请求是具有 slices 的请求任务的“子”任务。
  • 获取具有 slices 的请求的任务状态仅包含已完成切片的状态。
  • 这些子请求可以通过以下方式单独寻址,例如取消和重新限制。
  • 重新限制具有 slices 的请求将按比例重新限制未完成的子请求。
  • 取消具有 slices 的请求将取消每个子请求。
  • 由于 slices 的性质,每个子请求都不会获得完全均匀的文档部分。所有文档都将被寻址,但某些切片可能比其他切片大。预期较大的切片具有更均匀的分布。
  • 具有 slices 的请求上的 requests_per_secondmax_docs 等参数会按比例分配给每个子请求。将其与上面关于分布不均匀的点相结合,您应该得出结论,将 max_docsslices 一起使用可能不会导致更新正好 max_docs 个文档。
  • 每个子请求都会获得源数据流或索引的稍微不同的快照,尽管这些快照都是在近似相同的时间拍摄的。
获取新属性
编辑

假设您创建了一个没有动态映射的索引,用数据填充了它,然后添加了一个映射值以从数据中获取更多字段

$params = [
    'index' => 'test',
    'body' => [
        'mappings' => [
            'dynamic' => false,
            'properties' => [
                'text' => [
                    'type' => 'text',
                ],
            ],
        ],
    ],
];
$response = $client->indices()->create($params);
$params = [
    'index' => 'test',
    'body' => [
        'text' => 'words words',
        'flag' => 'bar',
    ],
];
$response = $client->index($params);
$params = [
    'index' => 'test',
    'body' => [
        'text' => 'words words',
        'flag' => 'foo',
    ],
];
$response = $client->index($params);
$params = [
    'index' => 'test',
    'body' => [
        'properties' => [
            'text' => [
                'type' => 'text',
            ],
            'flag' => [
                'type' => 'text',
                'analyzer' => 'keyword',
            ],
        ],
    ],
];
$response = $client->indices()->putMapping($params);
resp = client.indices.create(
    index="test",
    mappings={
        "dynamic": False,
        "properties": {
            "text": {
                "type": "text"
            }
        }
    },
)
print(resp)

resp1 = client.index(
    index="test",
    refresh=True,
    document={
        "text": "words words",
        "flag": "bar"
    },
)
print(resp1)

resp2 = client.index(
    index="test",
    refresh=True,
    document={
        "text": "words words",
        "flag": "foo"
    },
)
print(resp2)

resp3 = client.indices.put_mapping(
    index="test",
    properties={
        "text": {
            "type": "text"
        },
        "flag": {
            "type": "text",
            "analyzer": "keyword"
        }
    },
)
print(resp3)
response = client.indices.create(
  index: 'test',
  body: {
    mappings: {
      dynamic: false,
      properties: {
        text: {
          type: 'text'
        }
      }
    }
  }
)
puts response

response = client.index(
  index: 'test',
  refresh: true,
  body: {
    text: 'words words',
    flag: 'bar'
  }
)
puts response

response = client.index(
  index: 'test',
  refresh: true,
  body: {
    text: 'words words',
    flag: 'foo'
  }
)
puts response

response = client.indices.put_mapping(
  index: 'test',
  body: {
    properties: {
      text: {
        type: 'text'
      },
      flag: {
        type: 'text',
        analyzer: 'keyword'
      }
    }
  }
)
puts response
{
	res, err := es.Indices.Create(
		"test",
		es.Indices.Create.WithBody(strings.NewReader(`{
	  "mappings": {
	    "dynamic": false,
	    "properties": {
	      "text": {
	        "type": "text"
	      }
	    }
	  }
	}`)),
	)
	fmt.Println(res, err)
}

{
	res, err := es.Index(
		"test",
		strings.NewReader(`{
	  "text": "words words",
	  "flag": "bar"
	}`),
		es.Index.WithRefresh("true"),
		es.Index.WithPretty(),
	)
	fmt.Println(res, err)
}

{
	res, err := es.Index(
		"test",
		strings.NewReader(`{
	  "text": "words words",
	  "flag": "foo"
	}`),
		es.Index.WithRefresh("true"),
		es.Index.WithPretty(),
	)
	fmt.Println(res, err)
}

{
	res, err := es.Indices.PutMapping(
		[]string{"test"},
		strings.NewReader(`{
	  "properties": {
	    "text": {
	      "type": "text"
	    },
	    "flag": {
	      "type": "text",
	      "analyzer": "keyword"
	    }
	  }
	}`),
	)
	fmt.Println(res, err)
}
const response = await client.indices.create({
  index: "test",
  mappings: {
    dynamic: false,
    properties: {
      text: {
        type: "text",
      },
    },
  },
});
console.log(response);

const response1 = await client.index({
  index: "test",
  refresh: "true",
  document: {
    text: "words words",
    flag: "bar",
  },
});
console.log(response1);

const response2 = await client.index({
  index: "test",
  refresh: "true",
  document: {
    text: "words words",
    flag: "foo",
  },
});
console.log(response2);

const response3 = await client.indices.putMapping({
  index: "test",
  properties: {
    text: {
      type: "text",
    },
    flag: {
      type: "text",
      analyzer: "keyword",
    },
  },
});
console.log(response3);
PUT test
{
  "mappings": {
    "dynamic": false,   
    "properties": {
      "text": {"type": "text"}
    }
  }
}

POST test/_doc?refresh
{
  "text": "words words",
  "flag": "bar"
}
POST test/_doc?refresh
{
  "text": "words words",
  "flag": "foo"
}
PUT test/_mapping   
{
  "properties": {
    "text": {"type": "text"},
    "flag": {"type": "text", "analyzer": "keyword"}
  }
}

这意味着新字段不会被索引,只会存储在 _source 中。

这将更新映射以添加新的 flag 字段。要获取新字段,您必须重新索引所有包含该字段的文档。

搜索数据不会找到任何内容

$params = [
    'index' => 'test',
    'body' => [
        'query' => [
            'match' => [
                'flag' => 'foo',
            ],
        ],
    ],
];
$response = $client->search($params);
resp = client.search(
    index="test",
    filter_path="hits.total",
    query={
        "match": {
            "flag": "foo"
        }
    },
)
print(resp)
response = client.search(
  index: 'test',
  filter_path: 'hits.total',
  body: {
    query: {
      match: {
        flag: 'foo'
      }
    }
  }
)
puts response
res, err := es.Search(
	es.Search.WithIndex("test"),
	es.Search.WithBody(strings.NewReader(`{
	  "query": {
	    "match": {
	      "flag": "foo"
	    }
	  }
	}`)),
	es.Search.WithFilterPath("hits.total"),
	es.Search.WithPretty(),
)
fmt.Println(res, err)
const response = await client.search({
  index: "test",
  filter_path: "hits.total",
  query: {
    match: {
      flag: "foo",
    },
  },
});
console.log(response);
POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total": {
        "value": 0,
        "relation": "eq"
    }
  }
}

但是您可以发出一个 _update_by_query 请求来获取新的映射

$params = [
    'index' => 'test',
];
$response = $client->updateByQuery($params);
$params = [
    'index' => 'test',
    'body' => [
        'query' => [
            'match' => [
                'flag' => 'foo',
            ],
        ],
    ],
];
$response = $client->search($params);
resp = client.update_by_query(
    index="test",
    refresh=True,
    conflicts="proceed",
)
print(resp)

resp1 = client.search(
    index="test",
    filter_path="hits.total",
    query={
        "match": {
            "flag": "foo"
        }
    },
)
print(resp1)
response = client.update_by_query(
  index: 'test',
  refresh: true,
  conflicts: 'proceed'
)
puts response

response = client.search(
  index: 'test',
  filter_path: 'hits.total',
  body: {
    query: {
      match: {
        flag: 'foo'
      }
    }
  }
)
puts response
{
	res, err := es.UpdateByQuery(
		[]string{"test"},
		es.UpdateByQuery.WithConflicts("proceed"),
		es.UpdateByQuery.WithRefresh(true),
	)
	fmt.Println(res, err)
}

{
	res, err := es.Search(
		es.Search.WithIndex("test"),
		es.Search.WithBody(strings.NewReader(`{
	  "query": {
	    "match": {
	      "flag": "foo"
	    }
	  }
	}`)),
		es.Search.WithFilterPath("hits.total"),
		es.Search.WithPretty(),
	)
	fmt.Println(res, err)
}
const response = await client.updateByQuery({
  index: "test",
  refresh: "true",
  conflicts: "proceed",
});
console.log(response);

const response1 = await client.search({
  index: "test",
  filter_path: "hits.total",
  query: {
    match: {
      flag: "foo",
    },
  },
});
console.log(response1);
POST test/_update_by_query?refresh&conflicts=proceed
POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total": {
        "value": 1,
        "relation": "eq"
    }
  }
}

当向多字段添加字段时,您可以执行完全相同的操作。