创建转换 API编辑

实例化转换。

请求编辑

PUT _transform/<transform_id>

先决条件编辑

需要以下权限

  • 集群:manage_transformtransform_admin 内置角色授予此权限)
  • 源索引:readview_index_metadata
  • 目标索引:readcreate_indexindex。如果配置了 retention_policy,则还需要 delete 权限。

描述编辑

此 API 定义了一个转换,它从源索引复制数据,对其进行转换,并将其持久化到以实体为中心的目的地索引中。如果您选择将枢轴方法用于您的转换,则实体由 pivot 对象中的 group_by 字段集定义。如果您选择使用最新方法,则实体由 latest 对象中的 unique_key 字段值定义。

您也可以将目标索引视为二维表格数据结构(称为数据帧)。数据帧中每个文档的 ID 是根据实体的哈希值生成的,因此每个实体都有唯一的行。有关更多信息,请参阅 转换数据

创建转换时,会进行一系列验证以确保其成功。例如,会检查源索引是否存在,并检查目标索引是否不是源索引模式的一部分。您可以使用 defer_validation 参数跳过这些检查。

除了权限检查外,延迟验证始终在转换启动时运行。

  • 转换会记住创建它时用户的角色,并使用相同的角色。如果这些角色对源索引和目标索引没有所需的权限,则转换在尝试执行未经授权的操作时会失败。如果您提供 辅助授权标头,则会使用这些凭据。
  • 您必须使用 Kibana 或此 API 来创建转换。不要使用 Elasticsearch 索引 API 将转换直接添加到任何 .transform-internal* 索引中。如果启用了 Elasticsearch 安全功能,请不要向用户授予 .transform-internal* 索引的任何权限。如果您在 7.5 之前使用过转换,也不要向用户授予 .data-frame-internal* 索引的任何权限。

您必须为您的转换选择最新方法或枢轴方法;您不能在单个转换中同时使用两者。

路径参数编辑

<transform_id>
(必需,字符串) 转换的标识符。此标识符可以包含小写字母数字字符 (a-z 和 0-9)、连字符和下划线。它有 64 个字符的限制,并且必须以字母数字字符开头和结尾。

查询参数编辑

defer_validation
(可选,布尔值) 当 true 时,不会运行可延迟验证。如果源索引在创建转换后才存在,则可能需要这种行为。
timeout
(可选,时间) 等待响应的期限。如果在超时时间到期之前没有收到响应,则请求失败并返回错误。默认为 30s

请求正文编辑

description
(可选,字符串) 转换的自由文本描述。
dest

(必需,对象) 转换的目标。

dest 的属性
index
(必需,字符串) 转换的目标索引

pivot 转换的情况下,目标索引的映射会在可能的情况下根据源字段推断。如果需要备用映射,请在启动转换之前使用 创建索引 API

latest 转换的情况下,永远不会推断映射。如果目标索引的动态映射不可取,请在启动转换之前使用 创建索引 API

aliases
(可选,对象数组) 转换的目标索引应该具有的别名。别名使用转换的存储凭据进行操作,这意味着在创建时提供的辅助凭据(如果同时指定了主凭据和辅助凭据)。

目标索引将添加到别名中,无论目标索引是通过转换创建还是由用户预先创建。

+ aliases 的属性

详细信息
alias
(必需,字符串) 别名的名称。
move_on_creation
(可选,布尔值) 目标索引是否应该是此别名中的唯一索引。如果 true,则在将目标索引添加到此别名之前,将从该别名中删除所有其他索引。默认为 false
pipeline
(可选,字符串) 摄取管道 的唯一标识符。
frequency
(可选,时间单位) 转换连续运行时,检查源索引中更改的间隔。最小值为 1s,最大值为 1h。默认值为 1m
latest

(必需*,对象) latest 方法通过查找每个唯一键的最新文档来转换数据。

latest 的属性
sort
(必需,字符串) 指定用于识别最新文档的日期字段。
unique_key
(必需,字符串数组) 指定用于对数据进行分组的一个或多个字段的数组。
_meta
(可选,对象) 定义可选的转换元数据。
pivot

(必需*,对象) pivot 方法通过聚合和分组数据来转换数据。这些对象定义了 group by 字段和用于减少数据的聚合。

pivot 的属性
aggregationsaggs

(必需,对象) 定义如何聚合分组数据。目前支持以下聚合

group_by

(必需,对象) 定义如何对数据进行分组。每个枢轴可以定义多个分组。目前支持以下分组

分组属性可以选择具有 missing_bucket 属性。如果它是 true,则包含在相应 group_by 字段中没有值的文档。默认为 false

retention_policy

(可选,对象) 为转换定义保留策略。满足定义条件的数据将从目标索引中删除。

retention_policy 的属性
time

(必需,对象) 指定转换使用时间字段来设置保留策略。如果保留策略的 time.field 存在且包含比 max.age 更旧的数据,则会删除数据。

time 的属性
field
(必需,字符串) 用于计算文档年龄的日期字段。将 time.field 设置为现有的日期字段。
max_age
(必需,时间单位) 指定目标索引中文档的最大年龄。比配置值更旧的文档将从目标索引中删除。
settings

(可选,对象) 定义可选的转换设置。

settings 的属性
align_checkpoints
(可选,布尔值) 指定转换检查点范围是否应针对性能进行优化。这种优化可以将检查点范围与日期直方图间隔对齐,前提是日期直方图被指定为转换配置中的组源。作为效果,目标索引中的文档更新将减少,从而提高整体性能。默认值为 true,这意味着如果可能,将优化检查点范围。
dates_as_epoch_millis
(可选,布尔值) 定义输出中的日期是否应以 ISO 格式字符串(默认)或自纪元以来的毫秒数编写。 epoch_millis 是在版本 7.11 之前创建的转换的默认值。 为了兼容输出,请将其设置为 true。 默认值为 false
deduce_mappings
(可选,布尔值) 指定转换是否应从转换配置中推断目标索引映射。 默认值为 true,这意味着如果可能,将推断目标索引映射。
docs_per_second
(可选,浮点数) 指定每秒输入文档数量的限制。 此设置通过在搜索请求之间添加等待时间来限制转换。 默认值为 null,这将禁用限制。
max_page_search_size
(可选,整数) 定义用于每个检查点的复合聚合的初始页面大小。 如果发生断路器异常,页面大小将动态调整为较低的值。 最小值为 10,最大值为 65,536。 默认值为 500
num_failure_retries
(可选,整数) 定义在转换任务标记为 failed 之前可恢复故障的重试次数。 最小值为 0,最大值为 100-1 可用于表示无穷大。 在这种情况下,转换永远不会放弃重试可恢复故障。 默认值为集群级设置 num_transform_failure_retries
unattended
(可选,布尔值) 如果为 true,则转换在无人值守模式下运行。 在无人值守模式下,转换在发生错误时无限期地重试,这意味着转换永远不会失败。 将重试次数设置为除无限以外的任何值都会在验证中失败。 默认值为 false
source

(必需,对象) 转换数据的来源。

source 的属性
index

(必需,字符串或数组) 转换的源索引。 它可以是单个索引、索引模式(例如 "my-index-*")、索引数组(例如 ["my-index-000001", "my-index-000002"])或索引模式数组(例如 ["my-index-*", "my-other-index-*"])。 对于远程索引,请使用语法 "remote_name:index_name"

如果任何索引位于远程集群中,则主节点和至少一个转换节点必须具有 remote_cluster_client 节点角色。

query
(可选,对象) 从源索引检索数据子集的查询子句。 请参阅 Query DSL
runtime_mappings
(可选,对象) 转换可以使用搜索时运行时字段的定义。 对于搜索运行时字段,所有数据节点(包括远程节点)必须为 7.12 或更高版本。
sync

(可选,对象) 定义转换持续运行所需的属性。

sync 的属性
time

(必需,对象) 指定转换使用时间字段来同步源索引和目标索引。

time 的属性
delay
(可选,时间单位) 当前时间与最新输入数据时间之间的时间延迟。 默认值为 60s
field

(必需,字符串) 用于识别源中新文档的日期字段。

强烈建议使用包含 摄取时间戳 的字段。 如果使用其他字段,则可能需要设置 delay 以便它考虑数据传输延迟。

示例编辑

以下转换使用 pivot 方法

PUT _transform/ecommerce_transform1
{
  "source": {
    "index": "kibana_sample_data_ecommerce",
    "query": {
      "term": {
        "geoip.continent_name": {
          "value": "Asia"
        }
      }
    }
  },
  "pivot": {
    "group_by": {
      "customer_id": {
        "terms": {
          "field": "customer_id",
          "missing_bucket": true
        }
      }
    },
    "aggregations": {
      "max_price": {
        "max": {
          "field": "taxful_total_price"
        }
      }
    }
  },
  "description": "Maximum priced ecommerce data by customer_id in Asia",
  "dest": {
    "index": "kibana_sample_data_ecommerce_transform1",
    "pipeline": "add_timestamp_pipeline"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "order_date",
      "delay": "60s"
    }
  },
  "retention_policy": {
    "time": {
      "field": "order_date",
      "max_age": "30d"
    }
  }
}

创建转换时,您将收到以下结果

{
  "acknowledged" : true
}

以下转换使用 latest 方法

PUT _transform/ecommerce_transform2
{
  "source": {
    "index": "kibana_sample_data_ecommerce"
  },
  "latest": {
    "unique_key": ["customer_id"],
    "sort": "order_date"
  },
  "description": "Latest order for each customer",
  "dest": {
    "index": "kibana_sample_data_ecommerce_transform2"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "order_date",
      "delay": "60s"
    }
  }
}