Reindex API

编辑

将文档从源复制到目标。

源可以是任何现有的索引、别名或数据流。目标必须与源不同。例如,你不能将数据流重新索引到其自身。

Reindex 要求为源中的所有文档启用 _source

在调用 _reindex 之前,应根据需要配置目标。Reindex 不会复制源或其关联模板的设置。

必须提前配置映射、分片计数、副本等。

resp = client.reindex(
    source={
        "index": "my-index-000001"
    },
    dest={
        "index": "my-new-index-000001"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      index: 'my-index-000001'
    },
    dest: {
      index: 'my-new-index-000001'
    }
  }
)
puts response
const response = await client.reindex({
  source: {
    index: "my-index-000001",
  },
  dest: {
    index: "my-new-index-000001",
  },
});
console.log(response);
POST _reindex
{
  "source": {
    "index": "my-index-000001"
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

请求

编辑

POST /_reindex

前提条件

编辑
  • 如果启用了 Elasticsearch 安全功能,你必须拥有以下安全权限

    • 源数据流、索引或别名的 read 索引权限
    • 目标数据流、索引或索引别名的 write 索引权限。
    • 要使用 reindex API 请求自动创建数据流或索引,你必须拥有目标数据流、索引或别名的 auto_configurecreate_indexmanage 索引权限。
    • 如果从远程集群重新索引,source.remote.user 必须拥有源数据流、索引或别名的 monitor 集群权限read 索引权限。
  • 如果从远程集群重新索引,你必须在 elasticsearch.ymlreindex.remote.whitelist 设置中显式允许远程主机。请参阅 从远程重新索引
  • 自动创建数据流需要启用数据流的匹配索引模板。请参阅 设置数据流

描述

编辑

从源索引中提取 文档源,并将文档索引到目标索引中。你可以将所有文档复制到目标索引,或者重新索引文档的子集。

就像 _update_by_query 一样,_reindex 获取源的快照,但其目标必须 不同,因此不太可能发生版本冲突。可以像 index API 一样配置 dest 元素以控制乐观并发控制。省略 version_type 或将其设置为 internal 会导致 Elasticsearch 将文档盲目地转储到目标中,覆盖任何具有相同 ID 的文档。

version_type 设置为 external 会导致 Elasticsearch 保留源中的 version,创建任何丢失的文档,并更新目标中版本较旧的任何文档。

op_type 设置为 create 会导致 _reindex 仅在目标中创建丢失的文档。所有现有文档都将导致版本冲突。

由于数据流是 仅追加 的,因此对目标数据流的任何重新索引请求都必须具有 op_type of create。重新索引只能向目标数据流添加新文档。它无法更新目标数据流中的现有文档。

默认情况下,版本冲突会中止 _reindex 进程。要在存在冲突的情况下继续重新索引,请将 "conflicts" 请求主体参数设置为 proceed。在这种情况下,响应将包括遇到的版本冲突的计数。请注意,其他错误类型的处理不受 "conflicts" 参数的影响。此外,如果你选择计算版本冲突,则该操作可能会尝试从源中重新索引比 max_docs 更多的文档,直到它成功将 max_docs 文档索引到目标中,或者它已遍历了源查询中的每个文档。

异步运行重新索引

编辑

如果请求包含 wait_for_completion=false,则 Elasticsearch 会执行一些预检,启动请求,并返回一个 task,你可以使用该任务来取消或获取任务的状态。Elasticsearch 会在 _tasks/<task_id> 中创建此任务的记录作为文档。

从多个源重新索引

编辑

如果你有多个源要重新索引,通常最好一次重新索引一个源,而不是使用 glob 模式来选取多个源。这样,如果出现任何错误,你可以通过删除部分完成的源并重新开始来恢复该过程。这也使得并行处理过程非常简单:拆分要重新索引的源列表并并行运行每个列表。

一次性 bash 脚本似乎很适合此操作

for index in i1 i2 i3 i4 i5; do
  curl -HContent-Type:application/json -XPOST localhost:9200/_reindex?pretty -d'{
    "source": {
      "index": "'$index'"
    },
    "dest": {
      "index": "'$index'-reindexed"
    }
  }'
done

节流

编辑

requests_per_second 设置为任何正十进制数(1.461000 等),以限制 _reindex 发出索引操作批次的速度。通过用等待时间填充每个批次来限制请求。要禁用节流,请将 requests_per_second 设置为 -1

节流是通过在批次之间等待来完成的,以便可以为 _reindex 在内部使用的 scroll 提供考虑到填充的超时时间。填充时间是批次大小除以 requests_per_second 与写入所花费的时间之间的差值。默认情况下,批次大小为 1000,因此,如果将 requests_per_second 设置为 500

target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds

由于批次是作为单个 _bulk 请求发出的,因此较大的批次大小会导致 Elasticsearch 创建许多请求,然后在开始下一组之前等待一段时间。这是“突发”而不是“平稳”。

重新节流

编辑

可以使用 _rethrottle API 在正在运行的重新索引上更改 requests_per_second 的值

$params = [
    'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
];
$response = $client->reindexRethrottle($params);
resp = client.reindex_rethrottle(
    task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
    requests_per_second="-1",
)
print(resp)
response = client.reindex_rethrottle(
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619',
  requests_per_second: -1
)
puts response
res, err := es.ReindexRethrottle(
	"r1A2WoRbTwKZ516z6NEs5A:36619",
	esapi.IntPtr(-1),
)
fmt.Println(res, err)
const response = await client.reindexRethrottle({
  task_id: "r1A2WoRbTwKZ516z6NEs5A:36619",
  requests_per_second: "-1",
});
console.log(response);
POST _reindex/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1

可以使用 任务 API 找到任务 ID。

就像在 Reindex API 上设置它一样,requests_per_second 可以是 -1 以禁用节流,也可以是任何十进制数,例如 1.712 以将节流限制在该级别。加快查询速度的重新节流会立即生效,但是减慢查询速度的重新节流将在完成当前批次后生效。这样可以防止滚动超时。

切片

编辑

Reindex 支持 切片滚动 以并行化重新索引过程。这种并行化可以提高效率,并提供一种将请求分解为较小部分的便捷方式。

从远程集群重新索引不支持 手动自动切片

手动切片
编辑

通过为每个请求提供切片 ID 和切片总数来手动切片重新索引请求

resp = client.reindex(
    source={
        "index": "my-index-000001",
        "slice": {
            "id": 0,
            "max": 2
        }
    },
    dest={
        "index": "my-new-index-000001"
    },
)
print(resp)

resp1 = client.reindex(
    source={
        "index": "my-index-000001",
        "slice": {
            "id": 1,
            "max": 2
        }
    },
    dest={
        "index": "my-new-index-000001"
    },
)
print(resp1)
response = client.reindex(
  body: {
    source: {
      index: 'my-index-000001',
      slice: {
        id: 0,
        max: 2
      }
    },
    dest: {
      index: 'my-new-index-000001'
    }
  }
)
puts response

response = client.reindex(
  body: {
    source: {
      index: 'my-index-000001',
      slice: {
        id: 1,
        max: 2
      }
    },
    dest: {
      index: 'my-new-index-000001'
    }
  }
)
puts response
const response = await client.reindex({
  source: {
    index: "my-index-000001",
    slice: {
      id: 0,
      max: 2,
    },
  },
  dest: {
    index: "my-new-index-000001",
  },
});
console.log(response);

const response1 = await client.reindex({
  source: {
    index: "my-index-000001",
    slice: {
      id: 1,
      max: 2,
    },
  },
  dest: {
    index: "my-new-index-000001",
  },
});
console.log(response1);
POST _reindex
{
  "source": {
    "index": "my-index-000001",
    "slice": {
      "id": 0,
      "max": 2
    }
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}
POST _reindex
{
  "source": {
    "index": "my-index-000001",
    "slice": {
      "id": 1,
      "max": 2
    }
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

你可以通过以下方式验证此操作是否有效

resp = client.indices.refresh()
print(resp)

resp1 = client.search(
    index="my-new-index-000001",
    size="0",
    filter_path="hits.total",
)
print(resp1)
response = client.indices.refresh
puts response

response = client.search(
  index: 'my-new-index-000001',
  size: 0,
  filter_path: 'hits.total'
)
puts response
const response = await client.indices.refresh();
console.log(response);

const response1 = await client.search({
  index: "my-new-index-000001",
  size: 0,
  filter_path: "hits.total",
});
console.log(response1);
GET _refresh
POST my-new-index-000001/_search?size=0&filter_path=hits.total

这将产生一个像这样的合理 total

{
  "hits": {
    "total" : {
        "value": 120,
        "relation": "eq"
    }
  }
}
自动切片
编辑

你还可以让 _reindex 使用 切片滚动_id 上进行切片来自动并行化。使用 slices 指定要使用的切片数

resp = client.reindex(
    slices="5",
    refresh=True,
    source={
        "index": "my-index-000001"
    },
    dest={
        "index": "my-new-index-000001"
    },
)
print(resp)
response = client.reindex(
  slices: 5,
  refresh: true,
  body: {
    source: {
      index: 'my-index-000001'
    },
    dest: {
      index: 'my-new-index-000001'
    }
  }
)
puts response
const response = await client.reindex({
  slices: 5,
  refresh: "true",
  source: {
    index: "my-index-000001",
  },
  dest: {
    index: "my-new-index-000001",
  },
});
console.log(response);
POST _reindex?slices=5&refresh
{
  "source": {
    "index": "my-index-000001"
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

你还可以通过以下方式验证此操作是否有效

resp = client.search(
    index="my-new-index-000001",
    size="0",
    filter_path="hits.total",
)
print(resp)
response = client.search(
  index: 'my-new-index-000001',
  size: 0,
  filter_path: 'hits.total'
)
puts response
const response = await client.search({
  index: "my-new-index-000001",
  size: 0,
  filter_path: "hits.total",
});
console.log(response);
POST my-new-index-000001/_search?size=0&filter_path=hits.total

这将产生一个像这样的合理 total

{
  "hits": {
    "total" : {
        "value": 120,
        "relation": "eq"
    }
  }
}

slices 设置为 auto 将让 Elasticsearch 选择要使用的切片数。此设置将为每个分片使用一个切片,最多达到一定限制。如果存在多个源,它将根据分片数最少的索引或 后备索引 来选择切片数。

slices 添加到 _reindex 只是自动化了上述部分中使用的手动过程,创建了子请求,这意味着它有一些怪癖

  • 你可以在 任务 API 中查看这些请求。这些子请求是具有 slices 的请求任务的“子”任务。
  • 获取具有 slices 的请求的任务状态仅包含已完成切片的状态。
  • 这些子请求可以单独寻址,以进行诸如取消和重新节流之类的操作。
  • 使用 slices 重新节流请求将按比例重新节流未完成的子请求。
  • 使用 slices 取消请求将取消每个子请求。
  • 由于 slices 的性质,每个子请求都不会获得完全均匀的文档部分。将处理所有文档,但是某些切片可能比其他切片大。期望较大的切片具有更均匀的分布。
  • 具有 slices 的请求上的 requests_per_secondmax_docs 之类的参数按比例分配给每个子请求。将此与上面有关分布不均匀的观点相结合,你应该得出结论,将 max_docsslices 一起使用可能不会导致重新索引正好 max_docs 个文档。
  • 每个子请求都会获得稍微不同的源快照,尽管这些快照都是在大约同一时间拍摄的。
选择切片数
编辑

如果自动切片,将 slices 设置为 auto 将为大多数索引选择合理的数字。如果手动切片或以其他方式调整自动切片,请使用以下准则。

slices 的数量等于索引中的分片数时,查询性能最有效。如果该数字很大(例如 500),请选择较小的数字,因为过多的 slices 会损害性能。将 slices 设置为高于分片数通常不会提高效率,并且会增加开销。

索引性能会随着切片数量线性扩展到可用资源。

查询性能还是索引性能在运行时中占主导地位取决于要重新索引的文档和集群资源。

重新索引路由

编辑

默认情况下,如果 _reindex 看到具有路由的文档,则除非脚本对其进行更改,否则会保留该路由。你可以在 dest 请求上设置 routing 以更改此行为

keep
将每个匹配项发送的批量请求中的路由设置为匹配项上的路由。这是默认值。
discard
将每个匹配项发送的批量请求上的路由设置为 null
=<一些文本>
将每个匹配项的批量请求的路由设置为 = 之后的所有文本。

例如,您可以使用以下请求将所有公司名称为 cat 的文档从 source 复制到 dest,并将路由设置为 cat

$params = [
    'body' => [
        'source' => [
            'index' => 'source',
            'query' => [
                'match' => [
                    'company' => 'cat',
                ],
            ],
        ],
        'dest' => [
            'index' => 'dest',
            'routing' => '=cat',
        ],
    ],
];
$response = $client->reindex($params);
resp = client.reindex(
    source={
        "index": "source",
        "query": {
            "match": {
                "company": "cat"
            }
        }
    },
    dest={
        "index": "dest",
        "routing": "=cat"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      index: 'source',
      query: {
        match: {
          company: 'cat'
        }
      }
    },
    dest: {
      index: 'dest',
      routing: '=cat'
    }
  }
)
puts response
res, err := es.Reindex(
	strings.NewReader(`{
	  "source": {
	    "index": "source",
	    "query": {
	      "match": {
	        "company": "cat"
	      }
	    }
	  },
	  "dest": {
	    "index": "dest",
	    "routing": "=cat"
	  }
	}`))
fmt.Println(res, err)
const response = await client.reindex({
  source: {
    index: "source",
    query: {
      match: {
        company: "cat",
      },
    },
  },
  dest: {
    index: "dest",
    routing: "=cat",
  },
});
console.log(response);
POST _reindex
{
  "source": {
    "index": "source",
    "query": {
      "match": {
        "company": "cat"
      }
    }
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
}

默认情况下,_reindex 使用 1000 个滚动批次。您可以使用 source 元素中的 size 字段更改批次大小。

$params = [
    'body' => [
        'source' => [
            'index' => 'source',
            'size' => 100,
        ],
        'dest' => [
            'index' => 'dest',
            'routing' => '=cat',
        ],
    ],
];
$response = $client->reindex($params);
resp = client.reindex(
    source={
        "index": "source",
        "size": 100
    },
    dest={
        "index": "dest",
        "routing": "=cat"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      index: 'source',
      size: 100
    },
    dest: {
      index: 'dest',
      routing: '=cat'
    }
  }
)
puts response
res, err := es.Reindex(
	strings.NewReader(`{
	  "source": {
	    "index": "source",
	    "size": 100
	  },
	  "dest": {
	    "index": "dest",
	    "routing": "=cat"
	  }
	}`))
fmt.Println(res, err)
const response = await client.reindex({
  source: {
    index: "source",
    size: 100,
  },
  dest: {
    index: "dest",
    routing: "=cat",
  },
});
console.log(response);
POST _reindex
{
  "source": {
    "index": "source",
    "size": 100
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
}

使用 Ingest 管道重新索引

编辑

通过指定一个 pipeline,Reindex 还可以使用 Ingest 管道功能,如下所示

$params = [
    'body' => [
        'source' => [
            'index' => 'source',
        ],
        'dest' => [
            'index' => 'dest',
            'pipeline' => 'some_ingest_pipeline',
        ],
    ],
];
$response = $client->reindex($params);
resp = client.reindex(
    source={
        "index": "source"
    },
    dest={
        "index": "dest",
        "pipeline": "some_ingest_pipeline"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      index: 'source'
    },
    dest: {
      index: 'dest',
      pipeline: 'some_ingest_pipeline'
    }
  }
)
puts response
res, err := es.Reindex(
	strings.NewReader(`{
	  "source": {
	    "index": "source"
	  },
	  "dest": {
	    "index": "dest",
	    "pipeline": "some_ingest_pipeline"
	  }
	}`))
fmt.Println(res, err)
const response = await client.reindex({
  source: {
    index: "source",
  },
  dest: {
    index: "dest",
    pipeline: "some_ingest_pipeline",
  },
});
console.log(response);
POST _reindex
{
  "source": {
    "index": "source"
  },
  "dest": {
    "index": "dest",
    "pipeline": "some_ingest_pipeline"
  }
}

查询参数

编辑
refresh
(可选,布尔值)如果为 true,则请求会刷新受影响的分片,以使此操作对搜索可见。默认为 false
timeout

(可选,时间单位)每个索引等待以下操作的周期

默认为 1m(一分钟)。这保证了 Elasticsearch 在失败前至少等待超时时间。实际等待时间可能更长,尤其是在发生多次等待时。

wait_for_active_shards

(可选,字符串)在继续操作之前必须处于活动状态的每个分片的副本数。设置为 all 或任何非负整数,直到索引中每个分片的副本总数 (number_of_replicas+1)。默认为 1,表示仅等待每个主分片处于活动状态。

请参阅 活动分片

wait_for_completion
(可选,布尔值)如果为 true,则请求将阻塞,直到操作完成。默认为 true
requests_per_second
(可选,整数)此请求的节流限制,以每秒子请求数为单位。默认为 -1(不节流)。
require_alias
(可选,布尔值)如果为 true,则目标必须是 索引别名。默认为 false
scroll
(可选,时间单位)指定应为滚动搜索维护索引一致视图的时长。
slices
(可选,整数)此任务应划分为的切片数。默认为 1,表示任务未切片为子任务。
max_docs
(可选,整数)要处理的最大文档数。默认为所有文档。当设置为小于或等于 scroll_size 的值时,将不会使用滚动来检索操作结果。

请求体

编辑
conflicts
(可选,枚举)设置为 proceed 可继续重新索引,即使存在冲突。默认为 abort
max_docs
(可选,整数)要重新索引的最大文档数。如果 冲突 等于 proceed,则重新索引可能会尝试从源重新索引比 max_docs 更多的文档,直到它已成功将 max_docs 个文档索引到目标中,或者它已遍历源查询中的每个文档。
source
index
(必需,字符串)您要复制其中的数据流、索引或别名的名称。也接受以逗号分隔的列表以从多个源重新索引。
query
(可选,查询对象)使用查询 DSL 指定要重新索引的文档。
remote
host
(可选,字符串)您要从中索引的 Elasticsearch 远程实例的 URL。从远程索引时必需。
username
(可选,字符串)用于与远程主机进行身份验证的用户名。
password
(可选,字符串)用于与远程主机进行身份验证的密码。
socket_timeout
(可选,时间单位)远程套接字读取超时。默认为 30 秒。
connect_timeout
(可选,时间单位)远程连接超时。默认为 30 秒。
headers
(可选,对象)包含请求标头的对象。
size
(可选,整数)每个批次要索引的文档数。从远程索引时使用,以确保批次适合堆上的缓冲区,该缓冲区默认为最大 100 MB。
slice
id
(可选,整数)用于手动切片的切片 ID。
max
(可选,整数)切片的总数。
sort

(可选,列表)以逗号分隔的 <field>:<direction> 对列表,用于在索引前进行排序。与 max_docs 结合使用,以控制重新索引的文档。

在 7.6 中已弃用。

重新索引中的排序已弃用。重新索引中的排序从未保证按顺序索引文档,并且会阻止重新索引的进一步开发,例如弹性性能改进。如果与 max_docs 结合使用,请考虑改用查询过滤器。

_source
(可选,字符串)如果为 true,则重新索引所有源字段。设置为列表以重新索引选定的字段。默认为 true
dest
index
(必需,字符串)您要复制其中的数据流、索引或索引别名的名称。
version_type
(可选,枚举)用于索引操作的版本控制。有效值:internalexternalexternal_gtexternal_gte。有关详细信息,请参阅 版本类型
op_type

(可选,枚举)设置为 create 可仅索引尚不存在的文档(如果不存在则放入)。有效值:indexcreate。默认为 index

要重新索引到数据流目标,此参数必须为 create

pipeline
(可选,字符串)要使用的 管道 的名称。
script
source
(可选,字符串)在重新索引时运行以更新文档源或元数据的脚本。
lang
(可选,枚举)脚本语言:painlessexpressionmustachejava。有关详细信息,请参阅 脚本

响应体

编辑
took
(整数)整个操作所花费的总毫秒数。
timed_out
(布尔值)如果重新索引期间执行的任何请求超时,则将此标志设置为 true
total
(整数)已成功处理的文档数。
updated
(整数)已成功更新的文档数,即重新索引更新它之前已存在的具有相同 ID 的文档。
created
(整数)已成功创建的文档数。
deleted
(整数)已成功删除的文档数。
batches
(整数)重新索引拉回的滚动响应数。
noops
(整数)由于用于重新索引的脚本为 ctx.op 返回 noop 值而被忽略的文档数。
version_conflicts
(整数)重新索引命中的版本冲突数。
retries
(整数)重新索引尝试的重试次数。bulk 是重试的批量操作数,search 是重试的搜索操作数。
throttled_millis
(整数)请求为了符合 requests_per_second 而休眠的毫秒数。
requests_per_second
(整数)重新索引期间有效执行的每秒请求数。
throttled_until_millis
(整数)此字段在 _reindex 响应中应始终等于零。它仅在使用 任务 API 时才有意义,它指示为了符合 requests_per_second,节流请求将再次执行的下一个时间(以自纪元以来的毫秒为单位)。
failures
(数组)如果在处理过程中出现任何无法恢复的错误,则为失败数组。如果此数组不为空,则由于这些失败,请求已中止。重新索引是使用批次实现的,任何失败都会导致整个过程中止,但当前批次中的所有失败都会收集到数组中。您可以使用 conflicts 选项来防止重新索引在版本冲突时中止。

示例

编辑

使用查询重新索引选定的文档

编辑

您可以通过向 source 添加查询来限制文档。例如,以下请求仅将 user.idkimchy 的文档复制到 my-new-index-000001

resp = client.reindex(
    source={
        "index": "my-index-000001",
        "query": {
            "term": {
                "user.id": "kimchy"
            }
        }
    },
    dest={
        "index": "my-new-index-000001"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      index: 'my-index-000001',
      query: {
        term: {
          'user.id' => 'kimchy'
        }
      }
    },
    dest: {
      index: 'my-new-index-000001'
    }
  }
)
puts response
const response = await client.reindex({
  source: {
    index: "my-index-000001",
    query: {
      term: {
        "user.id": "kimchy",
      },
    },
  },
  dest: {
    index: "my-new-index-000001",
  },
});
console.log(response);
POST _reindex
{
  "source": {
    "index": "my-index-000001",
    "query": {
      "term": {
        "user.id": "kimchy"
      }
    }
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

使用 max_docs 重新索引选定的文档

编辑

您可以通过设置 max_docs 来限制处理的文档数。例如,此请求将单个文档从 my-index-000001 复制到 my-new-index-000001

resp = client.reindex(
    max_docs=1,
    source={
        "index": "my-index-000001"
    },
    dest={
        "index": "my-new-index-000001"
    },
)
print(resp)
response = client.reindex(
  body: {
    max_docs: 1,
    source: {
      index: 'my-index-000001'
    },
    dest: {
      index: 'my-new-index-000001'
    }
  }
)
puts response
const response = await client.reindex({
  max_docs: 1,
  source: {
    index: "my-index-000001",
  },
  dest: {
    index: "my-new-index-000001",
  },
});
console.log(response);
POST _reindex
{
  "max_docs": 1,
  "source": {
    "index": "my-index-000001"
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

从多个源重新索引

编辑

source 中的 index 属性可以是列表,允许您在一个请求中从多个源复制。这将从 my-index-000001my-index-000002 索引复制文档

resp = client.reindex(
    source={
        "index": [
            "my-index-000001",
            "my-index-000002"
        ]
    },
    dest={
        "index": "my-new-index-000002"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      index: [
        'my-index-000001',
        'my-index-000002'
      ]
    },
    dest: {
      index: 'my-new-index-000002'
    }
  }
)
puts response
const response = await client.reindex({
  source: {
    index: ["my-index-000001", "my-index-000002"],
  },
  dest: {
    index: "my-new-index-000002",
  },
});
console.log(response);
POST _reindex
{
  "source": {
    "index": ["my-index-000001", "my-index-000002"]
  },
  "dest": {
    "index": "my-new-index-000002"
  }
}

Reindex API 不会尽力处理 ID 冲突,因此最后写入的文档将“获胜”,但顺序通常是不可预测的,因此依赖此行为不是一个好主意。相反,请确保使用脚本使 ID 唯一。

使用源过滤器重新索引选定的字段

编辑

您可以使用源筛选来重新索引原始文档中的字段子集。例如,以下请求仅重新索引每个文档的 user.id_doc 字段

resp = client.reindex(
    source={
        "index": "my-index-000001",
        "_source": [
            "user.id",
            "_doc"
        ]
    },
    dest={
        "index": "my-new-index-000001"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      index: 'my-index-000001',
      _source: [
        'user.id',
        '_doc'
      ]
    },
    dest: {
      index: 'my-new-index-000001'
    }
  }
)
puts response
const response = await client.reindex({
  source: {
    index: "my-index-000001",
    _source: ["user.id", "_doc"],
  },
  dest: {
    index: "my-new-index-000001",
  },
});
console.log(response);
POST _reindex
{
  "source": {
    "index": "my-index-000001",
    "_source": ["user.id", "_doc"]
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

重新索引以更改字段的名称

编辑

_reindex 可用于构建具有重命名字段的索引副本。假设您创建了一个索引,其中包含如下所示的文档

resp = client.index(
    index="my-index-000001",
    id="1",
    refresh=True,
    document={
        "text": "words words",
        "flag": "foo"
    },
)
print(resp)
response = client.index(
  index: 'my-index-000001',
  id: 1,
  refresh: true,
  body: {
    text: 'words words',
    flag: 'foo'
  }
)
puts response
const response = await client.index({
  index: "my-index-000001",
  id: 1,
  refresh: "true",
  document: {
    text: "words words",
    flag: "foo",
  },
});
console.log(response);
POST my-index-000001/_doc/1?refresh
{
  "text": "words words",
  "flag": "foo"
}

但您不喜欢名称 flag,并且希望将其替换为 tag_reindex 可以为您创建另一个索引

resp = client.reindex(
    source={
        "index": "my-index-000001"
    },
    dest={
        "index": "my-new-index-000001"
    },
    script={
        "source": "ctx._source.tag = ctx._source.remove(\"flag\")"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      index: 'my-index-000001'
    },
    dest: {
      index: 'my-new-index-000001'
    },
    script: {
      source: 'ctx._source.tag = ctx._source.remove("flag")'
    }
  }
)
puts response
const response = await client.reindex({
  source: {
    index: "my-index-000001",
  },
  dest: {
    index: "my-new-index-000001",
  },
  script: {
    source: 'ctx._source.tag = ctx._source.remove("flag")',
  },
});
console.log(response);
POST _reindex
{
  "source": {
    "index": "my-index-000001"
  },
  "dest": {
    "index": "my-new-index-000001"
  },
  "script": {
    "source": "ctx._source.tag = ctx._source.remove(\"flag\")"
  }
}

现在您可以获取新文档

resp = client.get(
    index="my-new-index-000001",
    id="1",
)
print(resp)
response = client.get(
  index: 'my-new-index-000001',
  id: 1
)
puts response
const response = await client.get({
  index: "my-new-index-000001",
  id: 1,
});
console.log(response);
GET my-new-index-000001/_doc/1

这将返回

{
  "found": true,
  "_id": "1",
  "_index": "my-new-index-000001",
  "_version": 1,
  "_seq_no": 44,
  "_primary_term": 1,
  "_source": {
    "text": "words words",
    "tag": "foo"
  }
}

重新索引每日索引

编辑

您可以将 _reindexPainless 结合使用,以重新索引每日索引,从而将新模板应用于现有文档。

假设您的索引包含如下所示的文档

$params = [
    'index' => 'metricbeat-2016.05.30',
    'id' => '1',
    'body' => [
        'system.cpu.idle.pct' => 0.908,
    ],
];
$response = $client->index($params);
$params = [
    'index' => 'metricbeat-2016.05.31',
    'id' => '1',
    'body' => [
        'system.cpu.idle.pct' => 0.105,
    ],
];
$response = $client->index($params);
resp = client.index(
    index="metricbeat-2016.05.30",
    id="1",
    refresh=True,
    document={
        "system.cpu.idle.pct": 0.908
    },
)
print(resp)

resp1 = client.index(
    index="metricbeat-2016.05.31",
    id="1",
    refresh=True,
    document={
        "system.cpu.idle.pct": 0.105
    },
)
print(resp1)
response = client.index(
  index: 'metricbeat-2016.05.30',
  id: 1,
  refresh: true,
  body: {
    'system.cpu.idle.pct' => 0.908
  }
)
puts response

response = client.index(
  index: 'metricbeat-2016.05.31',
  id: 1,
  refresh: true,
  body: {
    'system.cpu.idle.pct' => 0.105
  }
)
puts response
{
	res, err := es.Index(
		"metricbeat-2016.05.30",
		strings.NewReader(`{
	  "system.cpu.idle.pct": 0.908
	}`),
		es.Index.WithDocumentID("1"),
		es.Index.WithRefresh("true"),
		es.Index.WithPretty(),
	)
	fmt.Println(res, err)
}

{
	res, err := es.Index(
		"metricbeat-2016.05.31",
		strings.NewReader(`{
	  "system.cpu.idle.pct": 0.105
	}`),
		es.Index.WithDocumentID("1"),
		es.Index.WithRefresh("true"),
		es.Index.WithPretty(),
	)
	fmt.Println(res, err)
}
const response = await client.index({
  index: "metricbeat-2016.05.30",
  id: 1,
  refresh: "true",
  document: {
    "system.cpu.idle.pct": 0.908,
  },
});
console.log(response);

const response1 = await client.index({
  index: "metricbeat-2016.05.31",
  id: 1,
  refresh: "true",
  document: {
    "system.cpu.idle.pct": 0.105,
  },
});
console.log(response1);
PUT metricbeat-2016.05.30/_doc/1?refresh
{"system.cpu.idle.pct": 0.908}
PUT metricbeat-2016.05.31/_doc/1?refresh
{"system.cpu.idle.pct": 0.105}

metricbeat-* 索引的新模板已加载到 Elasticsearch 中,但它仅适用于新创建的索引。Painless 可用于重新索引现有文档并应用新模板。

以下脚本从索引名称中提取日期,并创建一个附加了 -1 的新索引。所有来自 metricbeat-2016.05.31 的数据都将被重新索引到 metricbeat-2016.05.31-1 中。

$params = [
    'body' => [
        'source' => [
            'index' => 'metricbeat-*',
        ],
        'dest' => [
            'index' => 'metricbeat',
        ],
        'script' => [
            'lang' => 'painless',
            'source' => 'ctx._index = \'metricbeat-\' + (ctx._index.substring(\'metricbeat-\'.length(), ctx._index.length())) + \'-1\'',
        ],
    ],
];
$response = $client->reindex($params);
resp = client.reindex(
    source={
        "index": "metricbeat-*"
    },
    dest={
        "index": "metricbeat"
    },
    script={
        "lang": "painless",
        "source": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      index: 'metricbeat-*'
    },
    dest: {
      index: 'metricbeat'
    },
    script: {
      lang: 'painless',
      source: "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"
    }
  }
)
puts response
res, err := es.Reindex(
	strings.NewReader(`{
	  "source": {
	    "index": "metricbeat-*"
	  },
	  "dest": {
	    "index": "metricbeat"
	  },
	  "script": {
	    "lang": "painless",
	    "source": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"
	  }
	}`))
fmt.Println(res, err)
const response = await client.reindex({
  source: {
    index: "metricbeat-*",
  },
  dest: {
    index: "metricbeat",
  },
  script: {
    lang: "painless",
    source:
      "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'",
  },
});
console.log(response);
POST _reindex
{
  "source": {
    "index": "metricbeat-*"
  },
  "dest": {
    "index": "metricbeat"
  },
  "script": {
    "lang": "painless",
    "source": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"
  }
}

现在可以在 *-1 索引中找到之前 metricbeat 索引中的所有文档。

$params = [
    'index' => 'metricbeat-2016.05.30-1',
    'id' => '1',
];
$response = $client->get($params);
$params = [
    'index' => 'metricbeat-2016.05.31-1',
    'id' => '1',
];
$response = $client->get($params);
resp = client.get(
    index="metricbeat-2016.05.30-1",
    id="1",
)
print(resp)

resp1 = client.get(
    index="metricbeat-2016.05.31-1",
    id="1",
)
print(resp1)
response = client.get(
  index: 'metricbeat-2016.05.30-1',
  id: 1
)
puts response

response = client.get(
  index: 'metricbeat-2016.05.31-1',
  id: 1
)
puts response
{
	res, err := es.Get("metricbeat-2016.05.30-1", "1", es.Get.WithPretty())
	fmt.Println(res, err)
}

{
	res, err := es.Get("metricbeat-2016.05.31-1", "1", es.Get.WithPretty())
	fmt.Println(res, err)
}
const response = await client.get({
  index: "metricbeat-2016.05.30-1",
  id: 1,
});
console.log(response);

const response1 = await client.get({
  index: "metricbeat-2016.05.31-1",
  id: 1,
});
console.log(response1);
GET metricbeat-2016.05.30-1/_doc/1
GET metricbeat-2016.05.31-1/_doc/1

先前的方法还可以与更改字段名称结合使用,以仅将现有数据加载到新索引中,并根据需要重命名字段。

提取源的随机子集

编辑

_reindex 可以用来提取源的随机子集以进行测试。

resp = client.reindex(
    max_docs=10,
    source={
        "index": "my-index-000001",
        "query": {
            "function_score": {
                "random_score": {},
                "min_score": 0.9
            }
        }
    },
    dest={
        "index": "my-new-index-000001"
    },
)
print(resp)
response = client.reindex(
  body: {
    max_docs: 10,
    source: {
      index: 'my-index-000001',
      query: {
        function_score: {
          random_score: {},
          min_score: 0.9
        }
      }
    },
    dest: {
      index: 'my-new-index-000001'
    }
  }
)
puts response
const response = await client.reindex({
  max_docs: 10,
  source: {
    index: "my-index-000001",
    query: {
      function_score: {
        random_score: {},
        min_score: 0.9,
      },
    },
  },
  dest: {
    index: "my-new-index-000001",
  },
});
console.log(response);
POST _reindex
{
  "max_docs": 10,
  "source": {
    "index": "my-index-000001",
    "query": {
      "function_score" : {
        "random_score" : {},
        "min_score" : 0.9    
      }
    }
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

您可能需要根据从源中提取的相对数据量来调整 min_score

在重新索引期间修改文档

编辑

_update_by_query 类似,_reindex 支持修改文档的脚本。与 _update_by_query 不同,该脚本允许修改文档的元数据。此示例会增加源文档的版本。

resp = client.reindex(
    source={
        "index": "my-index-000001"
    },
    dest={
        "index": "my-new-index-000001",
        "version_type": "external"
    },
    script={
        "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
        "lang": "painless"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      index: 'my-index-000001'
    },
    dest: {
      index: 'my-new-index-000001',
      version_type: 'external'
    },
    script: {
      source: "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
      lang: 'painless'
    }
  }
)
puts response
const response = await client.reindex({
  source: {
    index: "my-index-000001",
  },
  dest: {
    index: "my-new-index-000001",
    version_type: "external",
  },
  script: {
    source:
      "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
    lang: "painless",
  },
});
console.log(response);
POST _reindex
{
  "source": {
    "index": "my-index-000001"
  },
  "dest": {
    "index": "my-new-index-000001",
    "version_type": "external"
  },
  "script": {
    "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
    "lang": "painless"
  }
}

就像在 _update_by_query 中一样,您可以设置 ctx.op 来更改在目标上执行的操作。

noop
如果您的脚本确定文档不必在目标中索引,则设置 ctx.op = "noop"。此空操作将在响应正文中的 noop 计数器中报告。
删除
如果您的脚本确定必须从目标中删除文档,则设置 ctx.op = "delete"。删除将在响应正文中的 deleted 计数器中报告。

ctx.op 设置为其他任何值将返回错误,设置 ctx 中的任何其他字段也会返回错误。

想想这些可能性!请务必小心;您可以更改

  • _id
  • _index
  • _version
  • _routing

_version 设置为 null 或从 ctx 映射中清除它,就像在索引请求中不发送版本一样;这将导致文档在目标中被覆盖,而无论目标上的版本或您在 _reindex 请求中使用的版本类型。

从远程重新索引

编辑

重新索引支持从远程 Elasticsearch 集群重新索引

resp = client.reindex(
    source={
        "remote": {
            "host": "http://otherhost:9200",
            "username": "user",
            "password": "pass"
        },
        "index": "my-index-000001",
        "query": {
            "match": {
                "test": "data"
            }
        }
    },
    dest={
        "index": "my-new-index-000001"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      remote: {
        host: 'http://otherhost:9200',
        username: 'user',
        password: 'pass'
      },
      index: 'my-index-000001',
      query: {
        match: {
          test: 'data'
        }
      }
    },
    dest: {
      index: 'my-new-index-000001'
    }
  }
)
puts response
const response = await client.reindex({
  source: {
    remote: {
      host: "http://otherhost:9200",
      username: "user",
      password: "pass",
    },
    index: "my-index-000001",
    query: {
      match: {
        test: "data",
      },
    },
  },
  dest: {
    index: "my-new-index-000001",
  },
});
console.log(response);
POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      "username": "user",
      "password": "pass"
    },
    "index": "my-index-000001",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

host 参数必须包含方案、主机、端口(例如 https://otherhost:9200)和可选路径(例如 https://otherhost:9200/proxy)。usernamepassword 参数是可选的,当它们存在时,_reindex 将使用基本身份验证连接到远程 Elasticsearch 节点。使用基本身份验证时,请务必使用 https,否则密码将以纯文本形式发送。有一系列的设置可用于配置 https 连接的行为。

使用 Elastic Cloud 时,也可以通过使用有效的 API 密钥对远程集群进行身份验证。

resp = client.reindex(
    source={
        "remote": {
            "host": "http://otherhost:9200",
            "headers": {
                "Authorization": "ApiKey API_KEY_VALUE"
            }
        },
        "index": "my-index-000001",
        "query": {
            "match": {
                "test": "data"
            }
        }
    },
    dest={
        "index": "my-new-index-000001"
    },
)
print(resp)
response = client.reindex(
  body: {
    source: {
      remote: {
        host: 'http://otherhost:9200',
        headers: {
          "Authorization": 'ApiKey API_KEY_VALUE'
        }
      },
      index: 'my-index-000001',
      query: {
        match: {
          test: 'data'
        }
      }
    },
    dest: {
      index: 'my-new-index-000001'
    }
  }
)
puts response
const response = await client.reindex({
  source: {
    remote: {
      host: "http://otherhost:9200",
      headers: {
        Authorization: "ApiKey API_KEY_VALUE",
      },
    },
    index: "my-index-000001",
    query: {
      match: {
        test: "data",
      },
    },
  },
  dest: {
    index: "my-new-index-000001",
  },
});
console.log(response);
POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      "headers": {
        "Authorization": "ApiKey API_KEY_VALUE"
      }
    },
    "index": "my-index-000001",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

必须在 elasticsearch.yml 中使用 reindex.remote.whitelist 属性显式允许远程主机。它可以设置为允许的远程 hostport 组合的逗号分隔列表。忽略方案,仅使用主机和端口。例如

reindex.remote.whitelist: [otherhost:9200, another:9200, 127.0.10.*:9200, localhost:*"]

必须在任何将协调重新索引的节点上配置允许的主机列表。

此功能应适用于您可能找到的任何版本的 Elasticsearch 远程集群。这应该允许您通过从旧版本的集群重新索引来将任何版本的 Elasticsearch 升级到当前版本。

Elasticsearch 不支持跨主要版本的前向兼容性。例如,您不能从 7.x 集群重新索引到 6.x 集群。

为了启用发送到旧版本 Elasticsearch 的查询,query 参数将直接发送到远程主机,而无需验证或修改。

从远程集群重新索引不支持 手动自动切片

从远程服务器重新索引使用一个默认最大大小为 100MB 的堆上缓冲区。如果远程索引包含非常大的文档,则需要使用较小的批处理大小。下面的示例将批处理大小设置为 10,这是非常小的。

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      ...
    },
    "index": "source",
    "size": 10,
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}

也可以使用 socket_timeout 字段设置远程连接上的套接字读取超时,并使用 connect_timeout 字段设置连接超时。两者默认都为 30 秒。此示例将套接字读取超时设置为一分钟,将连接超时设置为 10 秒。

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      ...,
      "socket_timeout": "1m",
      "connect_timeout": "10s"
    },
    "index": "source",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}

配置 SSL 参数

编辑

从远程重新索引支持可配置的 SSL 设置。这些必须在 elasticsearch.yml 文件中指定,安全设置除外,安全设置是在 Elasticsearch 密钥库中添加的。无法在 _reindex 请求的正文中配置 SSL。

支持以下设置

reindex.ssl.certificate_authorities
应信任的 PEM 编码证书文件的路径列表。您不能同时指定 reindex.ssl.certificate_authoritiesreindex.ssl.truststore.path
reindex.ssl.truststore.path
包含要信任的证书的 Java 密钥库文件的路径。此密钥库可以是“JKS”或“PKCS#12”格式。您不能同时指定 reindex.ssl.certificate_authoritiesreindex.ssl.truststore.path
reindex.ssl.truststore.password
密钥库 (reindex.ssl.truststore.path) 的密码。 [7.17.0] 在 7.17.0 中已弃用。 请优先使用 reindex.ssl.truststore.secure_password。此设置不能与 reindex.ssl.truststore.secure_password 一起使用。
reindex.ssl.truststore.secure_password (安全)
密钥库 (reindex.ssl.truststore.path) 的密码。此设置不能与 reindex.ssl.truststore.password 一起使用。
reindex.ssl.truststore.type
密钥库 (reindex.ssl.truststore.path) 的类型。必须为 jksPKCS12。如果密钥库路径以“.p12”、“.pfx”或“pkcs12”结尾,则此设置默认为 PKCS12。否则,它默认为 jks
reindex.ssl.verification_mode
指示用于防止中间人攻击和证书伪造的验证类型。可以是 full(验证主机名和证书路径)、certificate(验证证书路径,但不验证主机名)或 none(不执行任何验证 - 强烈建议不要在生产环境中使用)。默认为 full
reindex.ssl.certificate
指定用于 HTTP 客户端身份验证的 PEM 编码证书(或证书链)的路径(如果远程集群需要)。此设置要求还设置 reindex.ssl.key。您不能同时指定 reindex.ssl.certificatereindex.ssl.keystore.path
reindex.ssl.key
指定与用于客户端身份验证的证书关联的 PEM 编码私钥的路径(reindex.ssl.certificate)。您不能同时指定 reindex.ssl.keyreindex.ssl.keystore.path
reindex.ssl.key_passphrase
指定用于解密 PEM 编码私钥 (reindex.ssl.key) 的密码(如果已加密)。 [7.17.0] 在 7.17.0 中已弃用。 请优先使用 reindex.ssl.secure_key_passphrase。不能与 reindex.ssl.secure_key_passphrase 一起使用。
reindex.ssl.secure_key_passphrase (安全)
指定用于解密 PEM 编码私钥 (reindex.ssl.key) 的密码(如果已加密)。不能与 reindex.ssl.key_passphrase 一起使用。
reindex.ssl.keystore.path
指定密钥库的路径,该密钥库包含用于 HTTP 客户端身份验证的私钥和证书(如果远程集群需要)。此密钥库可以是“JKS”或“PKCS#12”格式。您不能同时指定 reindex.ssl.keyreindex.ssl.keystore.path
reindex.ssl.keystore.type
密钥库 (reindex.ssl.keystore.path) 的类型。必须为 jksPKCS12。如果密钥库路径以“.p12”、“.pfx”或“pkcs12”结尾,则此设置默认为 PKCS12。否则,它默认为 jks
reindex.ssl.keystore.password
密钥库 (reindex.ssl.keystore.path) 的密码。 [7.17.0] 在 7.17.0 中已弃用。 请优先使用 reindex.ssl.keystore.secure_password。此设置不能与 reindex.ssl.keystore.secure_password 一起使用。
reindex.ssl.keystore.secure_password (安全)
密钥库 (reindex.ssl.keystore.path) 的密码。此设置不能与 reindex.ssl.keystore.password 一起使用。
reindex.ssl.keystore.key_password
密钥库 (reindex.ssl.keystore.path) 中密钥的密码。默认为密钥库密码。 [7.17.0] 在 7.17.0 中已弃用。 请优先使用 reindex.ssl.keystore.secure_key_password。此设置不能与 reindex.ssl.keystore.secure_key_password 一起使用。
reindex.ssl.keystore.secure_key_password (安全)
密钥库 (reindex.ssl.keystore.path) 中密钥的密码。默认为密钥库密码。此设置不能与 reindex.ssl.keystore.key_password 一起使用。