创建转换 API

编辑

实例化一个转换。

请求

编辑

PUT _transform/<transform_id>

先决条件

编辑

需要以下权限

  • 集群:manage_transformtransform_admin 内置角色授予此权限)
  • 源索引:readview_index_metadata
  • 目标索引:readcreate_indexindex。如果配置了 retention_policy,则还需要 delete 权限。

描述

编辑

此 API 定义了一个转换,它从源索引复制数据、转换数据并将其持久化到以实体为中心的目标索引中。如果您选择对转换使用 pivot 方法,则实体由 pivot 对象中的 group_by 字段集定义。如果您选择使用 latest 方法,则实体由 latest 对象中的 unique_key 字段值定义。

您也可以将目标索引视为二维表格数据结构(称为数据帧)。数据帧中每个文档的 ID 由实体的哈希值生成,因此每个实体都有一行唯一行。有关更多信息,请参阅转换数据

创建转换时,会进行一系列验证以确保其成功。例如,会检查源索引是否存在,并检查目标索引是否不是源索引模式的一部分。您可以使用 defer_validation 参数来跳过这些检查。

在启动转换时,始终会运行延迟验证,特权检查除外。

  • 转换会记住创建它的用户在创建时拥有的角色,并使用相同的角色。如果这些角色在源索引和目标索引上没有所需的权限,则当转换尝试未经授权的操作时会失败。如果您提供辅助授权标头,则会使用这些凭据。
  • 您必须使用 Kibana 或此 API 创建转换。不要使用 Elasticsearch 索引 API 将转换直接添加到任何 .transform-internal* 索引中。如果启用了 Elasticsearch 安全功能,请不要向用户授予任何 .transform-internal* 索引的权限。如果您在 7.5 之前使用过转换,也不要向用户授予任何 .data-frame-internal* 索引的权限。

您必须为转换选择 latest 或 pivot 方法;您不能在单个转换中同时使用两者。

路径参数

编辑
<transform_id>
(必需,字符串)转换的标识符。此标识符可以包含小写字母数字字符(a-z 和 0-9)、连字符和下划线。它具有 64 个字符的限制,并且必须以字母数字字符开头和结尾。

查询参数

编辑
defer_validation
(可选,布尔值)如果为 true,则不会运行可延迟的验证。如果源索引在创建转换后才存在,则可能需要此行为。
timeout
(可选,时间)等待响应的时间段。如果在超时到期之前未收到任何响应,则请求失败并返回错误。默认为 30s

请求正文

编辑
description
(可选,字符串)转换的自由文本描述。
dest

(必需,对象)转换的目标。

dest 的属性
index
(必需,字符串)转换的目标索引

pivot 转换的情况下,目标索引的映射在可能的情况下根据源字段推断出来。如果需要替代映射,请在启动转换之前使用创建索引 API

latest 转换的情况下,永远不会推断映射。如果目标索引的动态映射是不可取的,请在启动转换之前使用创建索引 API

aliases
(可选,对象数组)转换的目标索引应具有的别名。别名是使用转换的存储凭据操作的,这意味着在创建时提供的辅助凭据(如果同时指定了主凭据和辅助凭据)。

无论目标索引是由转换创建还是用户预先创建的,都会将目标索引添加到别名。

+ aliases 的属性

详情
alias
(必需,字符串)别名的名称。
move_on_creation
(可选,布尔值)目标索引是否应为此别名中唯一的索引。如果为 true,则在将目标索引添加到此别名之前,将从此别名中删除所有其他索引。默认为 false
pipeline
(可选,字符串)Ingest管道的唯一标识符。
frequency
(可选,时间单位)当转换持续运行时,检查源索引中更改的间隔。最小值是 1s,最大值是 1h。默认值为 1m
latest

(必需*,对象)latest 方法通过查找每个唯一键的最新文档来转换数据。

latest 的属性
sort
(必需,字符串)指定用于标识最新文档的日期字段。
unique_key
(必需,字符串数组)指定用于对数据进行分组的一个或多个字段的数组。
_meta
(可选,对象)定义可选的转换元数据。
pivot

(必需*,对象)pivot 方法通过聚合和分组数据来转换数据。这些对象定义 group by 字段和用于减少数据的聚合。

pivot 的属性
aggregationsaggs

(必需,对象)定义如何聚合分组数据。当前支持以下聚合

group_by

(必需,对象)定义如何分组数据。每个透视可以定义多个分组。当前支持以下分组

分组属性可以选择具有 missing_bucket 属性。如果为 true,则包含在相应的 group_by 字段中没有值的文档。默认为 false

retention_policy

(可选,对象)定义转换的保留策略。满足定义的条件的数据将从目标索引中删除。

retention_policy 的属性
time

(必需,对象)指定转换使用时间字段来设置保留策略。如果存在保留策略的 time.field 并且包含早于 max.age 的数据,则会删除数据。

time 的属性
field
(必需,字符串)用于计算文档年龄的日期字段。将 time.field 设置为现有的日期字段。
max_age
(必需,时间单位)指定目标索引中文档的最大年龄。早于配置值的文档将从目标索引中删除。
settings

(可选,对象)定义可选的转换设置。

settings 的属性
align_checkpoints
(可选,布尔值)指定是否应优化转换检查点范围以提高性能。当在转换配置中将日期直方图指定为组源时,此类优化可以将检查点范围与日期直方图间隔对齐。这样做的效果是,在目标索引中执行的文档更新更少,从而提高了整体性能。默认值为 true,这意味着如果可能,将优化检查点范围。
dates_as_epoch_millis
(可选,布尔值)定义输出中的日期应写入为 ISO 格式的字符串(默认)还是自 epoch 以来的毫秒数。epoch_millis 一直是 7.11 版本之前创建的转换的默认值。对于兼容的输出,请将其设置为 true。默认值为 false
deduce_mappings
(可选,布尔值)指定是否应从转换配置中推断目标索引映射。默认值为 true,表示如果可能,将推断目标索引映射。
docs_per_second
(可选,浮点数)指定每秒输入文档数量的限制。此设置通过在搜索请求之间添加等待时间来限制转换。默认值为 null,表示禁用限制。
max_page_search_size
(可选,整数)定义每个检查点用于复合聚合的初始页面大小。如果发生断路器异常,则页面大小会动态调整为较低的值。最小值是 10,最大值是 65,536。默认值为 500
num_failure_retries
(可选,整数)定义在将转换任务标记为 failed 之前,可恢复的失败的重试次数。最小值是 0,最大值是 100-1 可用于表示无限。在这种情况下,转换永远不会放弃重试可恢复的失败。默认值是集群级别的设置 num_transform_failure_retries
unattended
(可选,布尔值)如果为 true,则转换在无人值守模式下运行。在无人值守模式下,如果发生错误,转换会无限期重试,这意味着转换永远不会失败。将重试次数设置为无限次以外的值将在验证中失败。默认为 false
source

(必需,对象)转换的数据源。

source 的属性
index

(必需,字符串或数组)转换的源索引。它可以是单个索引、索引模式(例如,"my-index-*")、索引数组(例如,["my-index-000001", "my-index-000002"])或索引模式数组(例如,["my-index-*", "my-other-index-*"])。对于远程索引,请使用语法 "remote_name:index_name"

如果任何索引位于远程集群中,则主节点和至少一个转换节点必须具有 remote_cluster_client 节点角色。

query
(可选,对象)一个查询子句,用于从源索引检索数据子集。请参阅 查询 DSL
runtime_mappings
(可选,对象)转换可以使用的搜索时运行时字段的定义。对于搜索运行时字段,所有数据节点(包括远程节点)必须为 7.12 或更高版本。
sync

(可选,对象)定义转换连续运行所需的属性。

sync 的属性
time

(必需,对象)指定转换使用时间字段来同步源索引和目标索引。

time 的属性
delay
(可选,时间单位)当前时间和最新输入数据时间之间的时间延迟。默认值为 60s
field

(必需,字符串)用于标识源中新文档的日期字段。

强烈建议使用包含 摄取时间戳的字段。如果使用其他字段,则可能需要设置 delay,使其考虑数据传输延迟。

示例

编辑

以下转换使用 pivot 方法

resp = client.transform.put_transform(
    transform_id="ecommerce_transform1",
    source={
        "index": "kibana_sample_data_ecommerce",
        "query": {
            "term": {
                "geoip.continent_name": {
                    "value": "Asia"
                }
            }
        }
    },
    pivot={
        "group_by": {
            "customer_id": {
                "terms": {
                    "field": "customer_id",
                    "missing_bucket": True
                }
            }
        },
        "aggregations": {
            "max_price": {
                "max": {
                    "field": "taxful_total_price"
                }
            }
        }
    },
    description="Maximum priced ecommerce data by customer_id in Asia",
    dest={
        "index": "kibana_sample_data_ecommerce_transform1",
        "pipeline": "add_timestamp_pipeline"
    },
    frequency="5m",
    sync={
        "time": {
            "field": "order_date",
            "delay": "60s"
        }
    },
    retention_policy={
        "time": {
            "field": "order_date",
            "max_age": "30d"
        }
    },
)
print(resp)
const response = await client.transform.putTransform({
  transform_id: "ecommerce_transform1",
  source: {
    index: "kibana_sample_data_ecommerce",
    query: {
      term: {
        "geoip.continent_name": {
          value: "Asia",
        },
      },
    },
  },
  pivot: {
    group_by: {
      customer_id: {
        terms: {
          field: "customer_id",
          missing_bucket: true,
        },
      },
    },
    aggregations: {
      max_price: {
        max: {
          field: "taxful_total_price",
        },
      },
    },
  },
  description: "Maximum priced ecommerce data by customer_id in Asia",
  dest: {
    index: "kibana_sample_data_ecommerce_transform1",
    pipeline: "add_timestamp_pipeline",
  },
  frequency: "5m",
  sync: {
    time: {
      field: "order_date",
      delay: "60s",
    },
  },
  retention_policy: {
    time: {
      field: "order_date",
      max_age: "30d",
    },
  },
});
console.log(response);
PUT _transform/ecommerce_transform1
{
  "source": {
    "index": "kibana_sample_data_ecommerce",
    "query": {
      "term": {
        "geoip.continent_name": {
          "value": "Asia"
        }
      }
    }
  },
  "pivot": {
    "group_by": {
      "customer_id": {
        "terms": {
          "field": "customer_id",
          "missing_bucket": true
        }
      }
    },
    "aggregations": {
      "max_price": {
        "max": {
          "field": "taxful_total_price"
        }
      }
    }
  },
  "description": "Maximum priced ecommerce data by customer_id in Asia",
  "dest": {
    "index": "kibana_sample_data_ecommerce_transform1",
    "pipeline": "add_timestamp_pipeline"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "order_date",
      "delay": "60s"
    }
  },
  "retention_policy": {
    "time": {
      "field": "order_date",
      "max_age": "30d"
    }
  }
}

创建转换时,您会收到以下结果

{
  "acknowledged" : true
}

以下转换使用 latest 方法

resp = client.transform.put_transform(
    transform_id="ecommerce_transform2",
    source={
        "index": "kibana_sample_data_ecommerce"
    },
    latest={
        "unique_key": [
            "customer_id"
        ],
        "sort": "order_date"
    },
    description="Latest order for each customer",
    dest={
        "index": "kibana_sample_data_ecommerce_transform2"
    },
    frequency="5m",
    sync={
        "time": {
            "field": "order_date",
            "delay": "60s"
        }
    },
)
print(resp)
const response = await client.transform.putTransform({
  transform_id: "ecommerce_transform2",
  source: {
    index: "kibana_sample_data_ecommerce",
  },
  latest: {
    unique_key: ["customer_id"],
    sort: "order_date",
  },
  description: "Latest order for each customer",
  dest: {
    index: "kibana_sample_data_ecommerce_transform2",
  },
  frequency: "5m",
  sync: {
    time: {
      field: "order_date",
      delay: "60s",
    },
  },
});
console.log(response);
PUT _transform/ecommerce_transform2
{
  "source": {
    "index": "kibana_sample_data_ecommerce"
  },
  "latest": {
    "unique_key": ["customer_id"],
    "sort": "order_date"
  },
  "description": "Latest order for each customer",
  "dest": {
    "index": "kibana_sample_data_ecommerce_transform2"
  },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "order_date",
      "delay": "60s"
    }
  }
}