创建转换 API
编辑创建转换 API
编辑实例化一个转换。
请求
编辑PUT _transform/<transform_id>
先决条件
编辑需要以下权限
- 集群:
manage_transform
(transform_admin
内置角色授予此权限) - 源索引:
read
,view_index_metadata
- 目标索引:
read
,create_index
,index
。如果配置了retention_policy
,则还需要delete
权限。
描述
编辑此 API 定义了一个转换,它从源索引复制数据、转换数据并将其持久化到以实体为中心的目标索引中。如果您选择对转换使用 pivot 方法,则实体由 pivot
对象中的 group_by
字段集定义。如果您选择使用 latest 方法,则实体由 latest
对象中的 unique_key
字段值定义。
您也可以将目标索引视为二维表格数据结构(称为数据帧)。数据帧中每个文档的 ID 由实体的哈希值生成,因此每个实体都有一行唯一行。有关更多信息,请参阅转换数据。
创建转换时,会进行一系列验证以确保其成功。例如,会检查源索引是否存在,并检查目标索引是否不是源索引模式的一部分。您可以使用 defer_validation
参数来跳过这些检查。
在启动转换时,始终会运行延迟验证,特权检查除外。
- 转换会记住创建它的用户在创建时拥有的角色,并使用相同的角色。如果这些角色在源索引和目标索引上没有所需的权限,则当转换尝试未经授权的操作时会失败。如果您提供辅助授权标头,则会使用这些凭据。
- 您必须使用 Kibana 或此 API 创建转换。不要使用 Elasticsearch 索引 API 将转换直接添加到任何
.transform-internal*
索引中。如果启用了 Elasticsearch 安全功能,请不要向用户授予任何.transform-internal*
索引的权限。如果您在 7.5 之前使用过转换,也不要向用户授予任何.data-frame-internal*
索引的权限。
您必须为转换选择 latest 或 pivot 方法;您不能在单个转换中同时使用两者。
路径参数
编辑-
<transform_id>
- (必需,字符串)转换的标识符。此标识符可以包含小写字母数字字符(a-z 和 0-9)、连字符和下划线。它具有 64 个字符的限制,并且必须以字母数字字符开头和结尾。
查询参数
编辑-
defer_validation
- (可选,布尔值)如果为
true
,则不会运行可延迟的验证。如果源索引在创建转换后才存在,则可能需要此行为。 -
timeout
- (可选,时间)等待响应的时间段。如果在超时到期之前未收到任何响应,则请求失败并返回错误。默认为
30s
。
请求正文
编辑-
description
- (可选,字符串)转换的自由文本描述。
-
dest
-
(必需,对象)转换的目标。
dest
的属性-
index
- (必需,字符串)转换的目标索引。
在
pivot
转换的情况下,目标索引的映射在可能的情况下根据源字段推断出来。如果需要替代映射,请在启动转换之前使用创建索引 API。在
latest
转换的情况下,永远不会推断映射。如果目标索引的动态映射是不可取的,请在启动转换之前使用创建索引 API。-
aliases
- (可选,对象数组)转换的目标索引应具有的别名。别名是使用转换的存储凭据操作的,这意味着在创建时提供的辅助凭据(如果同时指定了主凭据和辅助凭据)。
无论目标索引是由转换创建还是用户预先创建的,都会将目标索引添加到别名。
+
aliases
的属性详情
-
alias
- (必需,字符串)别名的名称。
-
move_on_creation
- (可选,布尔值)目标索引是否应为此别名中唯一的索引。如果为
true
,则在将目标索引添加到此别名之前,将从此别名中删除所有其他索引。默认为false
。
-
pipeline
- (可选,字符串)Ingest管道的唯一标识符。
-
-
frequency
- (可选,时间单位)当转换持续运行时,检查源索引中更改的间隔。最小值是
1s
,最大值是1h
。默认值为1m
。
-
latest
-
(必需*,对象)
latest
方法通过查找每个唯一键的最新文档来转换数据。latest
的属性-
sort
- (必需,字符串)指定用于标识最新文档的日期字段。
-
unique_key
- (必需,字符串数组)指定用于对数据进行分组的一个或多个字段的数组。
-
-
_meta
- (可选,对象)定义可选的转换元数据。
-
pivot
-
(必需*,对象)
pivot
方法通过聚合和分组数据来转换数据。这些对象定义group by
字段和用于减少数据的聚合。pivot
的属性
-
retention_policy
-
(可选,对象)定义转换的保留策略。满足定义的条件的数据将从目标索引中删除。
retention_policy
的属性-
time
-
(必需,对象)指定转换使用时间字段来设置保留策略。如果存在保留策略的
time.field
并且包含早于max.age
的数据,则会删除数据。time
的属性-
field
- (必需,字符串)用于计算文档年龄的日期字段。将
time.field
设置为现有的日期字段。 -
max_age
- (必需,时间单位)指定目标索引中文档的最大年龄。早于配置值的文档将从目标索引中删除。
-
-
-
settings
-
(可选,对象)定义可选的转换设置。
settings
的属性-
align_checkpoints
- (可选,布尔值)指定是否应优化转换检查点范围以提高性能。当在转换配置中将日期直方图指定为组源时,此类优化可以将检查点范围与日期直方图间隔对齐。这样做的效果是,在目标索引中执行的文档更新更少,从而提高了整体性能。默认值为
true
,这意味着如果可能,将优化检查点范围。 -
dates_as_epoch_millis
- (可选,布尔值)定义输出中的日期应写入为 ISO 格式的字符串(默认)还是自 epoch 以来的毫秒数。
epoch_millis
一直是7.11
版本之前创建的转换的默认值。对于兼容的输出,请将其设置为true
。默认值为false
。 -
deduce_mappings
- (可选,布尔值)指定是否应从转换配置中推断目标索引映射。默认值为
true
,表示如果可能,将推断目标索引映射。 -
docs_per_second
- (可选,浮点数)指定每秒输入文档数量的限制。此设置通过在搜索请求之间添加等待时间来限制转换。默认值为
null
,表示禁用限制。 -
max_page_search_size
- (可选,整数)定义每个检查点用于复合聚合的初始页面大小。如果发生断路器异常,则页面大小会动态调整为较低的值。最小值是
10
,最大值是65,536
。默认值为500
。 -
num_failure_retries
- (可选,整数)定义在将转换任务标记为
failed
之前,可恢复的失败的重试次数。最小值是0
,最大值是100
。-1
可用于表示无限。在这种情况下,转换永远不会放弃重试可恢复的失败。默认值是集群级别的设置num_transform_failure_retries
。 -
unattended
- (可选,布尔值)如果为
true
,则转换在无人值守模式下运行。在无人值守模式下,如果发生错误,转换会无限期重试,这意味着转换永远不会失败。将重试次数设置为无限次以外的值将在验证中失败。默认为false
。
-
-
source
-
(必需,对象)转换的数据源。
source
的属性-
index
-
(必需,字符串或数组)转换的源索引。它可以是单个索引、索引模式(例如,
"my-index-*"
)、索引数组(例如,["my-index-000001", "my-index-000002"]
)或索引模式数组(例如,["my-index-*", "my-other-index-*"]
)。对于远程索引,请使用语法"remote_name:index_name"
。如果任何索引位于远程集群中,则主节点和至少一个转换节点必须具有
remote_cluster_client
节点角色。 -
query
- (可选,对象)一个查询子句,用于从源索引检索数据子集。请参阅 查询 DSL。
-
runtime_mappings
- (可选,对象)转换可以使用的搜索时运行时字段的定义。对于搜索运行时字段,所有数据节点(包括远程节点)必须为 7.12 或更高版本。
-
示例
编辑以下转换使用 pivot
方法
resp = client.transform.put_transform( transform_id="ecommerce_transform1", source={ "index": "kibana_sample_data_ecommerce", "query": { "term": { "geoip.continent_name": { "value": "Asia" } } } }, pivot={ "group_by": { "customer_id": { "terms": { "field": "customer_id", "missing_bucket": True } } }, "aggregations": { "max_price": { "max": { "field": "taxful_total_price" } } } }, description="Maximum priced ecommerce data by customer_id in Asia", dest={ "index": "kibana_sample_data_ecommerce_transform1", "pipeline": "add_timestamp_pipeline" }, frequency="5m", sync={ "time": { "field": "order_date", "delay": "60s" } }, retention_policy={ "time": { "field": "order_date", "max_age": "30d" } }, ) print(resp)
const response = await client.transform.putTransform({ transform_id: "ecommerce_transform1", source: { index: "kibana_sample_data_ecommerce", query: { term: { "geoip.continent_name": { value: "Asia", }, }, }, }, pivot: { group_by: { customer_id: { terms: { field: "customer_id", missing_bucket: true, }, }, }, aggregations: { max_price: { max: { field: "taxful_total_price", }, }, }, }, description: "Maximum priced ecommerce data by customer_id in Asia", dest: { index: "kibana_sample_data_ecommerce_transform1", pipeline: "add_timestamp_pipeline", }, frequency: "5m", sync: { time: { field: "order_date", delay: "60s", }, }, retention_policy: { time: { field: "order_date", max_age: "30d", }, }, }); console.log(response);
PUT _transform/ecommerce_transform1 { "source": { "index": "kibana_sample_data_ecommerce", "query": { "term": { "geoip.continent_name": { "value": "Asia" } } } }, "pivot": { "group_by": { "customer_id": { "terms": { "field": "customer_id", "missing_bucket": true } } }, "aggregations": { "max_price": { "max": { "field": "taxful_total_price" } } } }, "description": "Maximum priced ecommerce data by customer_id in Asia", "dest": { "index": "kibana_sample_data_ecommerce_transform1", "pipeline": "add_timestamp_pipeline" }, "frequency": "5m", "sync": { "time": { "field": "order_date", "delay": "60s" } }, "retention_policy": { "time": { "field": "order_date", "max_age": "30d" } } }
创建转换时,您会收到以下结果
{ "acknowledged" : true }
以下转换使用 latest
方法
resp = client.transform.put_transform( transform_id="ecommerce_transform2", source={ "index": "kibana_sample_data_ecommerce" }, latest={ "unique_key": [ "customer_id" ], "sort": "order_date" }, description="Latest order for each customer", dest={ "index": "kibana_sample_data_ecommerce_transform2" }, frequency="5m", sync={ "time": { "field": "order_date", "delay": "60s" } }, ) print(resp)
const response = await client.transform.putTransform({ transform_id: "ecommerce_transform2", source: { index: "kibana_sample_data_ecommerce", }, latest: { unique_key: ["customer_id"], sort: "order_date", }, description: "Latest order for each customer", dest: { index: "kibana_sample_data_ecommerce_transform2", }, frequency: "5m", sync: { time: { field: "order_date", delay: "60s", }, }, }); console.log(response);
PUT _transform/ecommerce_transform2 { "source": { "index": "kibana_sample_data_ecommerce" }, "latest": { "unique_key": ["customer_id"], "sort": "order_date" }, "description": "Latest order for each customer", "dest": { "index": "kibana_sample_data_ecommerce_transform2" }, "frequency": "5m", "sync": { "time": { "field": "order_date", "delay": "60s" } } }