通过查询 API 更新编辑

更新与指定查询匹配的文档。如果未指定查询,则对数据流或索引中的每个文档执行更新,而不会修改源,这对于获取映射更改很有用。

resp = client.update_by_query(
    index="my-index-000001",
    conflicts="proceed",
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  conflicts: 'proceed'
)
puts response
POST my-index-000001/_update_by_query?conflicts=proceed

请求编辑

POST /<target>/_update_by_query

先决条件编辑

  • 如果启用了 Elasticsearch 安全功能,则必须对目标数据流、索引或别名具有以下 索引权限

    • 读取
    • 索引写入

描述编辑

您可以在请求 URI 或请求正文中指定查询条件,使用与 搜索 API 相同的语法。

当您提交通过查询更新请求时,Elasticsearch 会在开始处理请求时获取数据流或索引的快照,并使用 内部 版本控制更新匹配的文档。当版本匹配时,文档将被更新,版本号将递增。如果文档在快照拍摄时间和更新操作处理时间之间发生更改,则会导致版本冲突,操作失败。您可以选择计算版本冲突而不是停止并返回,方法是将 conflicts 设置为 proceed。请注意,如果您选择计算版本冲突,则操作可能会尝试从源中更新比 max_docs 更多的文档,直到它成功更新了 max_docs 文档,或者它已经遍历了源查询中的所有文档。

版本等于 0 的文档无法使用通过查询更新进行更新,因为 内部 版本控制不支持 0 作为有效版本号。

在处理通过查询更新请求时,Elasticsearch 会顺序执行多个搜索请求以查找所有匹配的文档。对每批匹配的文档执行批量更新请求。任何查询或更新失败都会导致通过查询更新请求失败,并且错误将显示在响应中。任何成功完成的更新请求仍然有效,不会回滚。

刷新分片编辑

指定 refresh 参数会在请求完成后刷新所有分片。这与更新 API 的 refresh 参数不同,后者会导致仅接收请求的分片被刷新。与更新 API 不同,它不支持 wait_for

异步运行通过查询更新编辑

如果请求包含 wait_for_completion=false,Elasticsearch 会执行一些预检,启动请求,并返回一个 任务,您可以使用它来取消或获取任务的状态。Elasticsearch 会在 .tasks/task/${taskId} 处创建一个此任务的记录作为文档。

等待活动分片编辑

wait_for_active_shards 控制在继续处理请求之前必须有多少个分片副本处于活动状态。有关详细信息,请参阅 活动分片timeout 控制每个写入请求等待不可用分片变为可用的时间。两者都与它们在 批量 API 中的工作方式完全相同。通过查询更新使用滚动搜索,因此您也可以指定 scroll 参数来控制它保持搜索上下文处于活动状态的时间,例如 ?scroll=10m。默认值为 5 分钟。

限制更新请求编辑

要控制通过查询更新发出更新操作批次的速率,您可以将 requests_per_second 设置为任何正小数。这会在每个批次中添加等待时间以限制速率。将 requests_per_second 设置为 -1 以禁用限制。

限制使用批次之间的等待时间,以便内部滚动请求可以被赋予一个考虑请求填充时间的超时时间。填充时间是批次大小除以 requests_per_second 与写入时间之差。默认情况下,批次大小为 1000,因此如果 requests_per_second 设置为 500

target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds

由于批次作为单个 _bulk 请求发出,因此较大的批次大小会导致 Elasticsearch 创建许多请求并在开始下一组之前等待。这是一种“突发式”而不是“平滑式”的方式。

切片编辑

通过查询更新支持 切片滚动 以并行化更新过程。这可以提高效率,并提供一种方便的方法将请求分解成更小的部分。

slices 设置为 auto 会为大多数数据流和索引选择一个合理的值。如果您正在手动切片或以其他方式调整自动切片,请记住

  • slices 的数量等于索引或后备索引中的分片数量时,查询性能最有效。如果该数字很大(例如,500),请选择一个较小的数字,因为过多的 slices 会影响性能。将 slices 设置为高于分片数量通常不会提高效率,还会增加开销。
  • 更新性能随着切片数量的增加而线性扩展到可用资源。

查询性能或更新性能在运行时占主导地位取决于正在重新索引的文档和集群资源。

路径参数编辑

<target>
(可选,字符串) 要搜索的数据流、索引和别名的逗号分隔列表。支持通配符 (*)。要搜索所有数据流或索引,请省略此参数或使用 *_all

查询参数编辑

allow_no_indices

(可选,布尔值) 如果为 false,则如果任何通配符表达式、索引别名_all 值仅针对缺失或关闭的索引,则请求将返回错误。即使请求针对其他打开的索引,此行为也适用。例如,如果索引以 foo 开头,但没有索引以 bar 开头,则针对 foo*,bar* 的请求将返回错误。

默认为 true

analyzer

(可选,字符串) 用于查询字符串的分析器。

此参数只能在指定 q 查询字符串参数时使用。

analyze_wildcard

(可选,布尔值) 如果为 true,则分析通配符和前缀查询。默认为 false

此参数只能在指定 q 查询字符串参数时使用。

conflicts
(可选,字符串) 如果通过查询更新遇到版本冲突,该怎么办:abortproceed。默认为 abort
default_operator

(可选,字符串) 查询字符串查询的默认运算符:AND 或 OR。默认为 OR

此参数只能在指定 q 查询字符串参数时使用。

df

(可选,字符串) 用作默认值的字段,在查询字符串中没有给出字段前缀。

此参数只能在指定 q 查询字符串参数时使用。

expand_wildcards

(可选,字符串) 通配符模式可以匹配的索引类型。如果请求可以针对数据流,则此参数决定通配符表达式是否匹配隐藏的数据流。支持逗号分隔的值,例如 open,hidden。有效值为

all
匹配任何数据流或索引,包括 隐藏 的数据流或索引。
open
匹配打开的、非隐藏的索引。还匹配任何非隐藏的数据流。
closed
匹配关闭的、非隐藏的索引。还匹配任何非隐藏的数据流。数据流无法关闭。
hidden
匹配隐藏的数据流和隐藏的索引。必须与 openclosed 或两者结合使用。
none
不接受通配符模式。

默认为 open

ignore_unavailable
(可选,布尔值) 如果为 false,则如果请求针对缺失或关闭的索引,则请求将返回错误。默认为 false
lenient

(可选,布尔值) 如果为 true,则会忽略查询字符串中基于格式的查询失败(例如,向数字字段提供文本)。默认为 false

此参数只能在指定 q 查询字符串参数时使用。

max_docs
(可选,整数) 要处理的文档的最大数量。默认值为所有文档。如果设置为小于或等于 scroll_size 的值,则不会使用滚动来检索操作的结果。
pipeline
(可选,字符串) 要用于预处理传入文档的管道的 ID。如果索引指定了默认的摄取管道,则将该值设置为 _none 将禁用此请求的默认摄取管道。如果配置了最终管道,它将始终运行,无论此参数的值如何。
preference
(可选,字符串) 指定应执行操作的节点或分片。默认情况下为随机。
q
(可选,字符串) Lucene 查询字符串语法中的查询。
request_cache
(可选,布尔值) 如果为 true,则此请求将使用请求缓存。默认值为索引级设置。
refresh
(可选,布尔值) 如果为 true,则 Elasticsearch 将刷新受影响的分片,以使操作对搜索可见。默认值为 false
requests_per_second
(可选,整数) 此请求的节流,以每秒子请求数表示。默认值为 -1(无节流)。
routing
(可选,字符串) 用于将操作路由到特定分片的自定义值。
scroll
(可选,时间值) 用于保留 搜索上下文 以进行滚动的周期。请参阅 滚动搜索结果
scroll_size
(可选,整数) 为操作提供支持的滚动请求的大小。默认值为 1000。
search_type

(可选,字符串) 搜索操作的类型。可用选项

  • query_then_fetch
  • dfs_query_then_fetch
search_timeout
(可选,时间单位) 每个搜索请求的显式超时。默认值为无超时。
slices
(可选,整数) 此任务应划分的切片数量。默认值为 1,这意味着任务不会被切分成子任务。
sort
(可选,字符串) <field>:<direction> 对的逗号分隔列表。
stats
(可选,字符串) 用于日志记录和统计目的的请求的特定 tag
terminate_after

(可选,整数) 为每个分片收集的文档的最大数量。如果查询达到此限制,Elasticsearch 将提前终止查询。Elasticsearch 在排序之前收集文档。

谨慎使用。Elasticsearch 将此参数应用于处理请求的每个分片。如果可能,让 Elasticsearch 自动执行提前终止。避免为针对跨多个数据层级具有后备索引的数据流的请求指定此参数。

timeout

(可选,时间单位) 每个更新请求等待以下操作的周期

默认值为 1m(一分钟)。这保证 Elasticsearch 在失败之前至少等待超时时间。实际等待时间可能更长,尤其是在发生多次等待时。

version
(可选,布尔值) 如果为 true,则将文档版本作为命中的一部分返回。
wait_for_active_shards

(可选,字符串) 在继续执行操作之前必须处于活动状态的分片副本数量。设置为 all 或任何正整数,直到索引中的分片总数 (number_of_replicas+1)。默认值:1,主分片。

请参阅 活动分片

请求正文edit

query
(可选,查询对象) 使用 查询 DSL 指定要更新的文档。

响应正文edit

took
整个操作从开始到结束的毫秒数。
timed_out
如果在更新查询执行期间执行的任何请求超时,则此标志将设置为 true
total
成功处理的文档数量。
updated
成功更新的文档数量。
deleted
成功删除的文档数量。
batches
更新查询拉取的滚动响应数量。
version_conflicts
更新查询遇到的版本冲突数量。
noops
由于用于更新查询的脚本为 ctx.op 返回了 noop 值而被忽略的文档数量。
retries
更新查询尝试的重试次数。 bulk 是重试的批量操作数量,search 是重试的搜索操作数量。
throttled_millis
请求为了符合 requests_per_second 而休眠的毫秒数。
requests_per_second
在更新查询期间实际执行的每秒请求数。
throttled_until_millis
此字段在 _update_by_query 响应中应始终等于零。它只有在使用 任务 API 时才有意义,它指示下次(以自纪元以来的毫秒数表示)节流请求将再次执行以符合 requests_per_second
failures
如果在过程中出现任何不可恢复的错误,则为错误数组。如果此数组不为空,则请求由于这些错误而中止。更新查询是使用批次实现的。任何错误都会导致整个过程中止,但当前批次中的所有错误都会被收集到数组中。您可以使用 conflicts 选项来防止重新索引在版本冲突时中止。

示例edit

_update_by_query 的最简单用法只是在数据流或索引中的每个文档上执行更新,而不会更改源。这对于 获取新属性 或其他一些在线映射更改很有用。

要更新选定的文档,请在请求正文中指定查询

resp = client.update_by_query(
    index="my-index-000001",
    conflicts="proceed",
    body={"query": {"term": {"user.id": "kimchy"}}},
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  conflicts: 'proceed',
  body: {
    query: {
      term: {
        'user.id' => 'kimchy'
      }
    }
  }
)
puts response
POST my-index-000001/_update_by_query?conflicts=proceed
{
  "query": { 
    "term": {
      "user.id": "kimchy"
    }
  }
}

查询必须作为 query 键的值传递,与 搜索 API 相同。您也可以使用 q 参数,与搜索 API 相同。

更新多个数据流或索引中的文档

resp = client.update_by_query(
    index=["my-index-000001", "my-index-000002"],
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001,my-index-000002'
)
puts response
POST my-index-000001,my-index-000002/_update_by_query

将更新查询操作限制为具有特定路由值的 shard

resp = client.update_by_query(
    index="my-index-000001",
    routing="1",
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  routing: 1
)
puts response
POST my-index-000001/_update_by_query?routing=1

默认情况下,更新查询使用 1000 的滚动批次。您可以使用 scroll_size 参数更改批次大小

resp = client.update_by_query(
    index="my-index-000001",
    scroll_size="100",
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  scroll_size: 100
)
puts response
POST my-index-000001/_update_by_query?scroll_size=100

使用唯一属性更新文档

resp = client.update_by_query(
    index="my-index-000001",
    body={"query": {"term": {"user.id": "kimchy"}}, "max_docs": 1},
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  body: {
    query: {
      term: {
        'user.id' => 'kimchy'
      }
    },
    max_docs: 1
  }
)
puts response
POST my-index-000001/_update_by_query
{
  "query": {
    "term": {
      "user.id": "kimchy"
    }
  },
  "max_docs": 1
}

更新文档源edit

更新查询支持脚本来更新文档源。例如,以下请求会为 my-index-000001 中所有 user.idkimchy 的文档增加 count 字段

resp = client.update_by_query(
    index="my-index-000001",
    body={
        "script": {"source": "ctx._source.count++", "lang": "painless"},
        "query": {"term": {"user.id": "kimchy"}},
    },
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  body: {
    script: {
      source: 'ctx._source.count++',
      lang: 'painless'
    },
    query: {
      term: {
        'user.id' => 'kimchy'
      }
    }
  }
)
puts response
POST my-index-000001/_update_by_query
{
  "script": {
    "source": "ctx._source.count++",
    "lang": "painless"
  },
  "query": {
    "term": {
      "user.id": "kimchy"
    }
  }
}

请注意,此示例中未指定 conflicts=proceed。在这种情况下,版本冲突应停止该过程,以便您可以处理错误。

更新 API 一样,您可以设置 ctx.op 来更改执行的操作

noop

如果您的脚本决定不需要进行任何更改,请设置 ctx.op = "noop"。更新查询操作将跳过更新文档并增加 noop 计数器。

delete

如果您的脚本决定应该删除文档,请设置 ctx.op = "delete"。更新查询操作将删除文档并增加 deleted 计数器。

更新查询仅支持 indexnoopdelete。将 ctx.op 设置为任何其他值都是错误。在 ctx 中设置任何其他字段都是错误。此 API 仅允许您修改匹配文档的源,您无法移动它们。

使用摄取管道更新文档edit

更新查询可以通过指定 pipeline 来使用 摄取管道 功能

resp = client.ingest.put_pipeline(
    id="set-foo",
    body={
        "description": "sets foo",
        "processors": [{"set": {"field": "foo", "value": "bar"}}],
    },
)
print(resp)

resp = client.update_by_query(
    index="my-index-000001",
    pipeline="set-foo",
)
print(resp)
response = client.ingest.put_pipeline(
  id: 'set-foo',
  body: {
    description: 'sets foo',
    processors: [
      {
        set: {
          field: 'foo',
          value: 'bar'
        }
      }
    ]
  }
)
puts response

response = client.update_by_query(
  index: 'my-index-000001',
  pipeline: 'set-foo'
)
puts response
PUT _ingest/pipeline/set-foo
{
  "description" : "sets foo",
  "processors" : [ {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
  } ]
}
POST my-index-000001/_update_by_query?pipeline=set-foo
获取更新查询操作的状态edit

您可以使用 任务 API 获取所有正在运行的更新查询请求的状态

$response = $client->tasks()->list();
resp = client.tasks.list(
    detailed="true",
    actions="*byquery",
)
print(resp)
response = client.tasks.list(
  detailed: true,
  actions: '*byquery'
)
puts response
res, err := es.Tasks.List(
	es.Tasks.List.WithActions("*byquery"),
	es.Tasks.List.WithDetailed(true),
)
fmt.Println(res, err)
const response = await client.tasks.list({
  detailed: 'true',
  actions: '*byquery'
})
console.log(response)
GET _tasks?detailed=true&actions=*byquery

响应看起来像

{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/update/byquery",
          "status" : {    
            "total" : 6154,
            "updated" : 3500,
            "created" : 0,
            "deleted" : 0,
            "batches" : 4,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": {
              "bulk": 0,
              "search": 0
            },
            "throttled_millis": 0
          },
          "description" : ""
        }
      }
    }
  }
}

此对象包含实际状态。它与响应 JSON 相同,重要的补充是 total 字段。 total 是重新索引预计执行的操作总数。您可以通过添加 updatedcreateddeleted 字段来估计进度。当它们的总和等于 total 字段时,请求将完成。

使用任务 ID,您可以直接查找任务。以下示例检索有关任务 r1A2WoRbTwKZ516z6NEs5A:36619 的信息

$params = [
    'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
];
$response = $client->tasks()->get($params);
resp = client.tasks.get(
    task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
)
print(resp)
response = client.tasks.get(
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
)
puts response
res, err := es.Tasks.Get(
	"r1A2WoRbTwKZ516z6NEs5A:36619",
)
fmt.Println(res, err)
const response = await client.tasks.get({
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
})
console.log(response)
GET /_tasks/r1A2WoRbTwKZ516z6NEs5A:36619

此 API 的优点是它与 wait_for_completion=false 集成,以透明地返回已完成任务的状态。如果任务已完成并且在该任务上设置了 wait_for_completion=false,则它将返回一个 resultserror 字段。此功能的代价是在 .tasks/task/${taskId} 处创建的 wait_for_completion=false 文档。您需要删除该文档。

取消更新查询操作edit

可以使用 任务取消 API 取消任何更新查询

$params = [
    'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
];
$response = $client->tasks()->cancel($params);
resp = client.tasks.cancel(
    task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
)
print(resp)
response = client.tasks.cancel(
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
)
puts response
res, err := es.Tasks.Cancel(
	es.Tasks.Cancel.WithTaskID("r1A2WoRbTwKZ516z6NEs5A:36619"),
)
fmt.Println(res, err)
const response = await client.tasks.cancel({
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
})
console.log(response)
POST _tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel

可以使用 任务 API 找到任务 ID。

取消应该很快发生,但可能需要几秒钟。上面的任务状态 API 将继续列出更新查询任务,直到此任务检查它是否已取消并自行终止。

更改请求的节流edit

在运行的更新中,可以使用 _rethrottle API 通过查询更改 requests_per_second 的值。

$params = [
    'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
];
$response = $client->updateByQueryRethrottle($params);
resp = client.update_by_query_rethrottle(
    task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
    requests_per_second="-1",
)
print(resp)
response = client.update_by_query_rethrottle(
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619',
  requests_per_second: -1
)
puts response
res, err := es.UpdateByQueryRethrottle(
	"r1A2WoRbTwKZ516z6NEs5A:36619",
	esapi.IntPtr(-1),
)
fmt.Println(res, err)
const response = await client.updateByQueryRethrottle({
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619',
  requests_per_second: '-1'
})
console.log(response)
POST _update_by_query/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1

可以使用 任务 API 找到任务 ID。

与在 _update_by_query API 上设置它一样,requests_per_second 可以是 -1 来禁用节流,也可以是任何十进制数字,例如 1.712 来节流到该级别。加速查询的重新节流会立即生效,但减速查询的重新节流将在完成当前批次后生效。这可以防止滚动超时。

手动切片编辑

通过为每个请求提供切片 ID 和切片总数来手动切片更新查询。

resp = client.update_by_query(
    index="my-index-000001",
    body={
        "slice": {"id": 0, "max": 2},
        "script": {"source": "ctx._source['extra'] = 'test'"},
    },
)
print(resp)

resp = client.update_by_query(
    index="my-index-000001",
    body={
        "slice": {"id": 1, "max": 2},
        "script": {"source": "ctx._source['extra'] = 'test'"},
    },
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  body: {
    slice: {
      id: 0,
      max: 2
    },
    script: {
      source: "ctx._source['extra'] = 'test'"
    }
  }
)
puts response

response = client.update_by_query(
  index: 'my-index-000001',
  body: {
    slice: {
      id: 1,
      max: 2
    },
    script: {
      source: "ctx._source['extra'] = 'test'"
    }
  }
)
puts response
POST my-index-000001/_update_by_query
{
  "slice": {
    "id": 0,
    "max": 2
  },
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}
POST my-index-000001/_update_by_query
{
  "slice": {
    "id": 1,
    "max": 2
  },
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}

您可以验证它是否有效。

resp = client.indices.refresh()
print(resp)

resp = client.search(
    index="my-index-000001",
    size="0",
    q="extra:test",
    filter_path="hits.total",
)
print(resp)
response = client.indices.refresh
puts response

response = client.search(
  index: 'my-index-000001',
  size: 0,
  q: 'extra:test',
  filter_path: 'hits.total'
)
puts response
GET _refresh
POST my-index-000001/_search?size=0&q=extra:test&filter_path=hits.total

这将导致一个合理的 total,如下所示。

{
  "hits": {
    "total": {
        "value": 120,
        "relation": "eq"
    }
  }
}
使用自动切片编辑

您也可以让更新查询使用 切片滚动 自动并行化,以在 _id 上进行切片。使用 slices 指定要使用的切片数量。

resp = client.update_by_query(
    index="my-index-000001",
    refresh=True,
    slices="5",
    body={"script": {"source": "ctx._source['extra'] = 'test'"}},
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  refresh: true,
  slices: 5,
  body: {
    script: {
      source: "ctx._source['extra'] = 'test'"
    }
  }
)
puts response
POST my-index-000001/_update_by_query?refresh&slices=5
{
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}

您也可以验证它是否有效。

resp = client.search(
    index="my-index-000001",
    size="0",
    q="extra:test",
    filter_path="hits.total",
)
print(resp)
response = client.search(
  index: 'my-index-000001',
  size: 0,
  q: 'extra:test',
  filter_path: 'hits.total'
)
puts response
POST my-index-000001/_search?size=0&q=extra:test&filter_path=hits.total

这将导致一个合理的 total,如下所示。

{
  "hits": {
    "total": {
        "value": 120,
        "relation": "eq"
    }
  }
}

slices 设置为 auto 将让 Elasticsearch 选择要使用的切片数量。此设置将使用每个分片一个切片,最多到一定限制。如果有多个源数据流或索引,它将根据具有最小分片数量的索引或后备索引选择切片数量。

slices 添加到 _update_by_query 只是自动执行了上面部分中使用的手动过程,创建子请求,这意味着它有一些怪癖。

  • 您可以在 任务 API 中看到这些请求。这些子请求是带有 slices 的请求任务的“子”任务。
  • 获取带有 slices 的请求的任务状态只包含已完成切片的状态。
  • 这些子请求可以单独寻址,用于取消和重新节流等操作。
  • 重新节流带有 slices 的请求将按比例重新节流未完成的子请求。
  • 取消带有 slices 的请求将取消每个子请求。
  • 由于 slices 的性质,每个子请求都不会获得完全均匀的文档部分。所有文档都将被处理,但一些切片可能比其他切片更大。预计较大的切片将具有更均匀的分布。
  • 带有 slices 的请求上的参数,如 requests_per_secondmax_docs,将按比例分配给每个子请求。将这一点与上面关于分布不均匀的观点结合起来,您应该得出结论,使用 max_docsslices 可能不会导致正好更新 max_docs 个文档。
  • 每个子请求都会获得源数据流或索引的略微不同的快照,尽管这些快照都是在大致相同的时间拍摄的。
获取新属性编辑

假设您创建了一个没有动态映射的索引,用数据填充它,然后添加了一个映射值来从数据中获取更多字段。

$params = [
    'index' => 'test',
    'body' => [
        'mappings' => [
            'dynamic' => false,
            'properties' => [
                'text' => [
                    'type' => 'text',
                ],
            ],
        ],
    ],
];
$response = $client->indices()->create($params);
$params = [
    'index' => 'test',
    'body' => [
        'text' => 'words words',
        'flag' => 'bar',
    ],
];
$response = $client->index($params);
$params = [
    'index' => 'test',
    'body' => [
        'text' => 'words words',
        'flag' => 'foo',
    ],
];
$response = $client->index($params);
$params = [
    'index' => 'test',
    'body' => [
        'properties' => [
            'text' => [
                'type' => 'text',
            ],
            'flag' => [
                'type' => 'text',
                'analyzer' => 'keyword',
            ],
        ],
    ],
];
$response = $client->indices()->putMapping($params);
resp = client.indices.create(
    index="test",
    body={
        "mappings": {
            "dynamic": False,
            "properties": {"text": {"type": "text"}},
        }
    },
)
print(resp)

resp = client.index(
    index="test",
    refresh=True,
    body={"text": "words words", "flag": "bar"},
)
print(resp)

resp = client.index(
    index="test",
    refresh=True,
    body={"text": "words words", "flag": "foo"},
)
print(resp)

resp = client.indices.put_mapping(
    index="test",
    body={
        "properties": {
            "text": {"type": "text"},
            "flag": {"type": "text", "analyzer": "keyword"},
        }
    },
)
print(resp)
response = client.indices.create(
  index: 'test',
  body: {
    mappings: {
      dynamic: false,
      properties: {
        text: {
          type: 'text'
        }
      }
    }
  }
)
puts response

response = client.index(
  index: 'test',
  refresh: true,
  body: {
    text: 'words words',
    flag: 'bar'
  }
)
puts response

response = client.index(
  index: 'test',
  refresh: true,
  body: {
    text: 'words words',
    flag: 'foo'
  }
)
puts response

response = client.indices.put_mapping(
  index: 'test',
  body: {
    properties: {
      text: {
        type: 'text'
      },
      flag: {
        type: 'text',
        analyzer: 'keyword'
      }
    }
  }
)
puts response
{
	res, err := es.Indices.Create(
		"test",
		es.Indices.Create.WithBody(strings.NewReader(`{
	  "mappings": {
	    "dynamic": false,
	    "properties": {
	      "text": {
	        "type": "text"
	      }
	    }
	  }
	}`)),
	)
	fmt.Println(res, err)
}

{
	res, err := es.Index(
		"test",
		strings.NewReader(`{
	  "text": "words words",
	  "flag": "bar"
	}`),
		es.Index.WithRefresh("true"),
		es.Index.WithPretty(),
	)
	fmt.Println(res, err)
}

{
	res, err := es.Index(
		"test",
		strings.NewReader(`{
	  "text": "words words",
	  "flag": "foo"
	}`),
		es.Index.WithRefresh("true"),
		es.Index.WithPretty(),
	)
	fmt.Println(res, err)
}

{
	res, err := es.Indices.PutMapping(
		[]string{"test"},
		strings.NewReader(`{
	  "properties": {
	    "text": {
	      "type": "text"
	    },
	    "flag": {
	      "type": "text",
	      "analyzer": "keyword"
	    }
	  }
	}`),
	)
	fmt.Println(res, err)
}
const response0 = await client.indices.create({
  index: 'test',
  body: {
    mappings: {
      dynamic: false,
      properties: {
        text: {
          type: 'text'
        }
      }
    }
  }
})
console.log(response0)

const response1 = await client.index({
  index: 'test',
  refresh: true,
  body: {
    text: 'words words',
    flag: 'bar'
  }
})
console.log(response1)

const response2 = await client.index({
  index: 'test',
  refresh: true,
  body: {
    text: 'words words',
    flag: 'foo'
  }
})
console.log(response2)

const response3 = await client.indices.putMapping({
  index: 'test',
  body: {
    properties: {
      text: {
        type: 'text'
      },
      flag: {
        type: 'text',
        analyzer: 'keyword'
      }
    }
  }
})
console.log(response3)
PUT test
{
  "mappings": {
    "dynamic": false,   
    "properties": {
      "text": {"type": "text"}
    }
  }
}

POST test/_doc?refresh
{
  "text": "words words",
  "flag": "bar"
}
POST test/_doc?refresh
{
  "text": "words words",
  "flag": "foo"
}
PUT test/_mapping   
{
  "properties": {
    "text": {"type": "text"},
    "flag": {"type": "text", "analyzer": "keyword"}
  }
}

这意味着新字段不会被索引,只会存储在 _source 中。

这将更新映射以添加新的 flag 字段。要获取新字段,您必须重新索引所有包含它的文档。

搜索数据将找不到任何内容。

$params = [
    'index' => 'test',
    'body' => [
        'query' => [
            'match' => [
                'flag' => 'foo',
            ],
        ],
    ],
];
$response = $client->search($params);
resp = client.search(
    index="test",
    filter_path="hits.total",
    body={"query": {"match": {"flag": "foo"}}},
)
print(resp)
response = client.search(
  index: 'test',
  filter_path: 'hits.total',
  body: {
    query: {
      match: {
        flag: 'foo'
      }
    }
  }
)
puts response
res, err := es.Search(
	es.Search.WithIndex("test"),
	es.Search.WithBody(strings.NewReader(`{
	  "query": {
	    "match": {
	      "flag": "foo"
	    }
	  }
	}`)),
	es.Search.WithFilterPath("hits.total"),
	es.Search.WithPretty(),
)
fmt.Println(res, err)
const response = await client.search({
  index: 'test',
  filter_path: 'hits.total',
  body: {
    query: {
      match: {
        flag: 'foo'
      }
    }
  }
})
console.log(response)
POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total": {
        "value": 0,
        "relation": "eq"
    }
  }
}

但是,您可以发出一个 _update_by_query 请求来获取新的映射。

$params = [
    'index' => 'test',
];
$response = $client->updateByQuery($params);
$params = [
    'index' => 'test',
    'body' => [
        'query' => [
            'match' => [
                'flag' => 'foo',
            ],
        ],
    ],
];
$response = $client->search($params);
resp = client.update_by_query(
    index="test",
    refresh=True,
    conflicts="proceed",
)
print(resp)

resp = client.search(
    index="test",
    filter_path="hits.total",
    body={"query": {"match": {"flag": "foo"}}},
)
print(resp)
response = client.update_by_query(
  index: 'test',
  refresh: true,
  conflicts: 'proceed'
)
puts response

response = client.search(
  index: 'test',
  filter_path: 'hits.total',
  body: {
    query: {
      match: {
        flag: 'foo'
      }
    }
  }
)
puts response
{
	res, err := es.UpdateByQuery(
		[]string{"test"},
		es.UpdateByQuery.WithConflicts("proceed"),
		es.UpdateByQuery.WithRefresh(true),
	)
	fmt.Println(res, err)
}

{
	res, err := es.Search(
		es.Search.WithIndex("test"),
		es.Search.WithBody(strings.NewReader(`{
	  "query": {
	    "match": {
	      "flag": "foo"
	    }
	  }
	}`)),
		es.Search.WithFilterPath("hits.total"),
		es.Search.WithPretty(),
	)
	fmt.Println(res, err)
}
const response0 = await client.updateByQuery({
  index: 'test',
  refresh: true,
  conflicts: 'proceed'
})
console.log(response0)

const response1 = await client.search({
  index: 'test',
  filter_path: 'hits.total',
  body: {
    query: {
      match: {
        flag: 'foo'
      }
    }
  }
})
console.log(response1)
POST test/_update_by_query?refresh&conflicts=proceed
POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total": {
        "value": 1,
        "relation": "eq"
    }
  }
}

当您将字段添加到多字段时,您可以执行完全相同的操作。