示例
Elastic Stack Serverless
这些示例演示了如何使用转换从数据中获得有用的见解。所有示例都使用其中一个 Kibana 示例数据集。有关更详细的、逐步的示例,请参阅教程:转换电子商务示例数据。
此示例使用电子商务订单示例数据集来查找在假设的网上商店中花费最多的客户。 让我们使用 pivot
类型的转换,以便目标索引包含每个客户的订单数量、订单总价、唯一产品数量、每个订单的平均价格以及订购产品的总数。

API 示例
POST _transform/_preview
{
"source": {
"index": "kibana_sample_data_ecommerce"
},
"dest" : {
"index" : "sample_ecommerce_orders_by_customer"
},
"pivot": {
"group_by": {
"user": { "terms": { "field": "user" }},
"customer_id": { "terms": { "field": "customer_id" }}
},
"aggregations": {
"order_count": { "value_count": { "field": "order_id" }},
"total_order_amt": { "sum": { "field": "taxful_total_price" }},
"avg_amt_per_order": { "avg": { "field": "taxful_total_price" }},
"avg_unique_products_per_order": { "avg": { "field": "total_unique_products" }},
"total_unique_products": { "cardinality": { "field": "products.product_id" }}
}
}
}
- 转换的目标索引。它被
_preview
忽略。 - 选择了两个
group_by
字段。 这意味着转换包含每个user
和customer_id
组合的唯一行。 在此数据集中,这两个字段都是唯一的。 通过在转换中包含两者,可以为最终结果提供更多上下文。
在上面的示例中,使用了简化的 JSON 格式,以便更容易阅读 pivot 对象。
预览转换 API 使您可以提前查看转换的布局,其中填充了一些示例值。例如
{
"preview" : [
{
"total_order_amt" : 3946.9765625,
"order_count" : 59.0,
"total_unique_products" : 116.0,
"avg_unique_products_per_order" : 2.0,
"customer_id" : "10",
"user" : "recip",
"avg_amt_per_order" : 66.89790783898304
},
...
]
}
此转换可以更轻松地回答以下问题:
- 哪些客户花费最多?
- 哪些客户每次订单花费最多?
- 哪些客户订购最频繁?
- 哪些客户订购的不同产品数量最少?
可以使用聚合单独回答这些问题,但是,转换允许我们将此数据持久化为以客户为中心的索引。 这使我们能够大规模分析数据,并提供更大的灵活性,可以从以客户为中心的角度探索和导航数据。 在某些情况下,它甚至可以使创建可视化效果更加简单。
此示例使用 Flights 示例数据集来找出哪个航空公司的延误最多。 首先,过滤源数据,通过使用查询过滤器来排除所有取消的航班。 然后转换数据以包含不同数量的航班、延误分钟数的总和以及航空公司飞行分钟数的总和。 最后,使用 bucket_script
来确定实际延误的飞行时间百分比。
POST _transform/_preview
{
"source": {
"index": "kibana_sample_data_flights",
"query": {
"bool": {
"filter": [
{ "term": { "Cancelled": false } }
]
}
}
},
"dest" : {
"index" : "sample_flight_delays_by_carrier"
},
"pivot": {
"group_by": {
"carrier": { "terms": { "field": "Carrier" }}
},
"aggregations": {
"flights_count": { "value_count": { "field": "FlightNum" }},
"delay_mins_total": { "sum": { "field": "FlightDelayMin" }},
"flight_mins_total": { "sum": { "field": "FlightTimeMin" }},
"delay_time_percentage": {
"bucket_script": {
"buckets_path": {
"delay_time": "delay_mins_total.value",
"flight_time": "flight_mins_total.value"
},
"script": "(params.delay_time / params.flight_time) * 100"
}
}
}
}
}
- 过滤源数据以仅选择未取消的航班。
- 转换的目标索引。它被
_preview
忽略。 - 数据按
Carrier
字段分组,该字段包含航空公司名称。 - 此
bucket_script
对聚合返回的结果执行计算。 在此特定示例中,它计算旅行时间中被延误占用的百分比。
预览向您显示新索引将包含每个运营商的此类数据
{
"preview" : [
{
"carrier" : "ES-Air",
"flights_count" : 2802.0,
"flight_mins_total" : 1436927.5130677223,
"delay_time_percentage" : 9.335543983955839,
"delay_mins_total" : 134145.0
},
...
]
}
此转换可以更轻松地回答以下问题:
- 哪个航空公司的延误占飞行时间的百分比最高?
此数据是虚构的,并不反映任何特色目的地或出发机场的实际延误或飞行统计数据。
此示例使用 Web 日志示例数据集来识别可疑的客户端 IP。 它转换数据,以便新索引包含每个客户端 IP 的字节总和以及不同 URL、代理、按位置的传入请求和地理目标的数量。 它还使用过滤器聚合来计算每个客户端 IP 收到的特定类型的 HTTP 响应。 最终,以下示例将 Web 日志数据转换为以实体为中心的索引,其中实体为 clientip
。
PUT _transform/suspicious_client_ips
{
"source": {
"index": "kibana_sample_data_logs"
},
"dest" : {
"index" : "sample_weblogs_by_clientip"
},
"sync" : {
"time": {
"field": "timestamp",
"delay": "60s"
}
},
"pivot": {
"group_by": {
"clientip": { "terms": { "field": "clientip" } }
},
"aggregations": {
"url_dc": { "cardinality": { "field": "url.keyword" }},
"bytes_sum": { "sum": { "field": "bytes" }},
"geo.src_dc": { "cardinality": { "field": "geo.src" }},
"agent_dc": { "cardinality": { "field": "agent.keyword" }},
"geo.dest_dc": { "cardinality": { "field": "geo.dest" }},
"responses.total": { "value_count": { "field": "timestamp" }},
"success" : {
"filter": {
"term": { "response" : "200"}}
},
"error404" : {
"filter": {
"term": { "response" : "404"}}
},
"error5xx" : {
"filter": {
"range": { "response" : { "gte": 500, "lt": 600}}}
},
"timestamp.min": { "min": { "field": "timestamp" }},
"timestamp.max": { "max": { "field": "timestamp" }},
"timestamp.duration_ms": {
"bucket_script": {
"buckets_path": {
"min_time": "timestamp.min.value",
"max_time": "timestamp.max.value"
},
"script": "(params.max_time - params.min_time)"
}
}
}
}
}
- 转换的目标索引。
- 配置转换以持续运行。 它使用
timestamp
字段来同步源索引和目标索引。 最坏情况下的摄取延迟为 60 秒。 - 数据按
clientip
字段分组。 - 过滤器聚合,用于计算
response
字段中成功 (200
) 响应的发生次数。 以下两个聚合 (error404
和error5xx
) 通过错误代码来计数错误响应,从而匹配确切的值或一系列响应代码。 - 此
bucket_script
根据聚合的结果计算clientip
访问的持续时间。
创建转换后,必须启动它
POST _transform/suspicious_client_ips/_start
此后不久,第一个结果应在目标索引中可用
GET sample_weblogs_by_clientip/_search
搜索结果显示每个客户端 IP 的此类数据
"hits" : [
{
"_index" : "sample_weblogs_by_clientip",
"_id" : "MOeHH_cUL5urmartKj-b5UQAAAAAAAAA",
"_score" : 1.0,
"_source" : {
"geo" : {
"src_dc" : 2.0,
"dest_dc" : 2.0
},
"success" : 2,
"error404" : 0,
"error503" : 0,
"clientip" : "0.72.176.46",
"agent_dc" : 2.0,
"bytes_sum" : 4422.0,
"responses" : {
"total" : 2.0
},
"url_dc" : 2.0,
"timestamp" : {
"duration_ms" : 5.2191698E8,
"min" : "2020-03-16T07:51:57.333Z",
"max" : "2020-03-22T08:50:34.313Z"
}
}
}
]
与其他 Kibana 示例数据集一样,Web 日志示例数据集包含相对于您安装它时的时间戳,包括未来的时间戳。 持续转换将拾取过去的数据点。 如果您在一段时间前安装了 Web 日志示例数据集,则可以卸载并重新安装它,并且时间戳将会更改。
此转换可以更轻松地回答以下问题:
- 哪些客户端 IP 正在传输最多的数据量?
- 哪些客户端 IP 正在与大量不同的 URL 交互?
- 哪些客户端 IP 具有高错误率?
- 哪些客户端 IP 正在与大量目标国家/地区交互?
此示例使用 Web 日志示例数据集来查找来自 IP 地址的最后一个日志。 让我们在连续模式下使用 latest
类型的转换。 它将每个唯一键的最新文档从源索引复制到目标索引,并在新数据进入源索引时更新目标索引。
选择 clientip
字段作为唯一键; 数据按此字段分组。 选择 timestamp
作为按时间顺序对数据进行排序的日期字段。 对于连续模式,请指定一个日期字段,该字段用于识别新文档,以及检查源索引中更改的间隔。

让我们假设我们只对保留最近出现在日志中的 IP 地址的文档感兴趣。 您可以定义保留策略,并指定用于计算文档年龄的日期字段。 此示例使用与用于对数据进行排序的日期字段相同的日期字段。 然后设置文档的最长年龄; 比您设置的值旧的文档将从目标索引中删除。

此转换创建目标索引,其中包含每个客户端 IP 的最新登录日期。 由于转换在连续模式下运行,因此目标索引将在新数据进入源索引时进行更新。 最后,由于应用了保留策略,因此将从目标索引中删除每个超过 30 天的文档。
API 示例
PUT _transform/last-log-from-clientip
{
"source": {
"index": [
"kibana_sample_data_logs"
]
},
"latest": {
"unique_key": [
"clientip"
],
"sort": "timestamp"
},
"frequency": "1m",
"dest": {
"index": "last-log-from-clientip"
},
"sync": {
"time": {
"field": "timestamp",
"delay": "60s"
}
},
"retention_policy": {
"time": {
"field": "timestamp",
"max_age": "30d"
}
},
"settings": {
"max_page_search_size": 500
}
}
- 指定用于对数据进行分组的字段。
- 指定用于对数据进行排序的日期字段。
- 设置转换检查源索引中更改的间隔。
- 包含用于同步源索引和目标索引的时间字段和延迟设置。
- 指定转换的保留策略。 将从目标索引中删除旧于配置值的文档。
创建转换后,启动它
POST _transform/last-log-from-clientip/_start
转换处理数据后,搜索目标索引
GET last-log-from-clientip/_search
搜索结果显示每个客户端 IP 的此类数据
{
"_index" : "last-log-from-clientip",
"_id" : "MOeHH_cUL5urmartKj-b5UQAAAAAAAAA",
"_score" : 1.0,
"_source" : {
"referer" : "http://twitter.com/error/don-lind",
"request" : "/elasticsearch",
"agent" : "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)",
"extension" : "",
"memory" : null,
"ip" : "0.72.176.46",
"index" : "kibana_sample_data_logs",
"message" : "0.72.176.46 - - [2018-09-18T06:31:00.572Z] \"GET /elasticsearch HTTP/1.1\" 200 7065 \"-\" \"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)\"",
"url" : "https://elastic.ac.cn/downloads/elasticsearch",
"tags" : [
"success",
"info"
],
"geo" : {
"srcdest" : "IN:PH",
"src" : "IN",
"coordinates" : {
"lon" : -124.1127917,
"lat" : 40.80338889
},
"dest" : "PH"
},
"utc_time" : "2021-05-04T06:31:00.572Z",
"bytes" : 7065,
"machine" : {
"os" : "ios",
"ram" : 12884901888
},
"response" : 200,
"clientip" : "0.72.176.46",
"host" : "www.elastic.co",
"event" : {
"dataset" : "sample_web_logs"
},
"phpmemory" : null,
"timestamp" : "2021-05-04T06:31:00.572Z"
}
}
此转换可以更轻松地回答以下问题:
- 与特定 IP 地址关联的最新日志事件是什么?
此示例使用 Web 日志示例数据集来查找每个小时向服务器发送最多字节的客户端 IP。 此示例使用带有 top_metrics
聚合的 pivot
转换。
通过 date histogram 按时间字段分组数据,间隔为一个小时。 使用 bytes
字段上的 max aggregation 来获取发送到服务器的最大数据量。 如果没有 max
聚合,API 调用仍然返回发送最多字节的客户端 IP,但是,不会返回它发送的字节数。 在 top_metrics
属性中,指定 clientip
和 geo.src
,然后按降序按 bytes
字段对它们进行排序。 转换将返回发送最大数据量的客户端 IP 和相应位置的 2 字母 ISO 代码。
POST _transform/_preview
{
"source": {
"index": "kibana_sample_data_logs"
},
"pivot": {
"group_by": {
"timestamp": {
"date_histogram": {
"field": "timestamp",
"fixed_interval": "1h"
}
}
},
"aggregations": {
"bytes.max": {
"max": {
"field": "bytes"
}
},
"top": {
"top_metrics": {
"metrics": [
{
"field": "clientip"
},
{
"field": "geo.src"
}
],
"sort": {
"bytes": "desc"
}
}
}
}
}
}
- 数据按时间字段的 date histogram 分组,间隔为一个小时。
- 计算
bytes
字段的最大值。 - 指定要返回的顶部文档的字段(
clientip
和geo.src
)以及排序方法(具有最高bytes
值的文档)。
上面的 API 调用返回类似于以下的响应
{
"preview" : [
{
"top" : {
"clientip" : "223.87.60.27",
"geo.src" : "IN"
},
"bytes" : {
"max" : 6219
},
"timestamp" : "2021-04-25T00:00:00.000Z"
},
{
"top" : {
"clientip" : "99.74.118.237",
"geo.src" : "LK"
},
"bytes" : {
"max" : 14113
},
"timestamp" : "2021-04-25T03:00:00.000Z"
},
{
"top" : {
"clientip" : "218.148.135.12",
"geo.src" : "BR"
},
"bytes" : {
"max" : 4531
},
"timestamp" : "2021-04-25T04:00:00.000Z"
},
...
]
}
此示例使用电子商务示例数据集来创建基于客户 ID 的以实体为中心的索引,并通过使用 top_metrics
聚合来获取客户姓名和电子邮件地址。
按 customer_id
对数据进行分组,然后添加一个 top_metrics
聚合,其中 metrics
是 email
、customer_first_name.keyword
和 customer_last_name.keyword
字段。 按降序按 order_date
对 top_metrics
进行排序。 API 调用如下所示
POST _transform/_preview
{
"source": {
"index": "kibana_sample_data_ecommerce"
},
"pivot": {
"group_by": {
"customer_id": {
"terms": {
"field": "customer_id"
}
}
},
"aggregations": {
"last": {
"top_metrics": {
"metrics": [
{
"field": "email"
},
{
"field": "customer_first_name.keyword"
},
{
"field": "customer_last_name.keyword"
}
],
"sort": {
"order_date": "desc"
}
}
}
}
}
}
- 数据按
customer_id
字段上的terms
聚合分组。 - 指定要按订单日期降序返回的字段(电子邮件和名称字段)。
API 返回的响应类似于以下内容
{
"preview" : [
{
"last" : {
"customer_last_name.keyword" : "Long",
"customer_first_name.keyword" : "Recip",
"email" : "recip@long-family.zzz"
},
"customer_id" : "10"
},
{
"last" : {
"customer_last_name.keyword" : "Jackson",
"customer_first_name.keyword" : "Fitzgerald",
"email" : "fitzgerald@jackson-family.zzz"
},
"customer_id" : "11"
},
{
"last" : {
"customer_last_name.keyword" : "Cross",
"customer_first_name.keyword" : "Brigitte",
"email" : "brigitte@cross-family.zzz"
},
"customer_id" : "12"
},
...
]
}