转换示例编辑

这些示例演示了如何使用 transforms 从您的数据中得出有用的见解。所有示例都使用一个 Kibana 示例数据集。有关更详细的分步示例,请参阅 教程:转换电子商务示例数据

查找您的最佳客户编辑

此示例使用电子商务订单示例数据集来查找在假设的网上商店中花费最多的客户。让我们使用 pivot 类型的 transform,以便目标索引包含每个客户的订单数量、订单总价、唯一产品数量、每笔订单的平均价格以及订购产品的总数量。

Finding your best customers with transforms in Kibana

或者,您可以使用 预览 transform创建 transform API

API 示例
POST _transform/_preview
{
  "source": {
    "index": "kibana_sample_data_ecommerce"
  },
  "dest" : { 
    "index" : "sample_ecommerce_orders_by_customer"
  },
  "pivot": {
    "group_by": { 
      "user": { "terms": { "field": "user" }},
      "customer_id": { "terms": { "field": "customer_id" }}
    },
    "aggregations": {
      "order_count": { "value_count": { "field": "order_id" }},
      "total_order_amt": { "sum": { "field": "taxful_total_price" }},
      "avg_amt_per_order": { "avg": { "field": "taxful_total_price" }},
      "avg_unique_products_per_order": { "avg": { "field": "total_unique_products" }},
      "total_unique_products": { "cardinality": { "field": "products.product_id" }}
    }
  }
}

transform 的目标索引。它被 _preview 忽略。

选择了两个 group_by 字段。这意味着 transform 每个 usercustomer_id 组合包含一个唯一的行。在此数据集中,这两个字段都是唯一的。通过将它们都包含在 transform 中,它为最终结果提供了更多上下文。

在上面的示例中,使用压缩的 JSON 格式以提高 pivot 对象的可读性。

预览 transforms API 使您能够提前查看 transform 的布局,并填充一些示例值。例如

{
  "preview" : [
    {
      "total_order_amt" : 3946.9765625,
      "order_count" : 59.0,
      "total_unique_products" : 116.0,
      "avg_unique_products_per_order" : 2.0,
      "customer_id" : "10",
      "user" : "recip",
      "avg_amt_per_order" : 66.89790783898304
    },
    ...
    ]
  }

此 transform 使回答以下问题变得更容易

  • 哪些客户花费最多?
  • 哪些客户每笔订单花费最多?
  • 哪些客户最常订购?
  • 哪些客户订购的不同产品数量最少?

可以使用聚合来回答这些问题,但是 transforms 允许我们将这些数据持久化为以客户为中心的索引。这使我们能够大规模分析数据,并提供更多灵活性来从以客户为中心的视角探索和浏览数据。在某些情况下,它甚至可以使创建可视化变得更加简单。

查找延误最多的航空公司编辑

此示例使用航班示例数据集来找出哪家航空公司延误最多。首先,使用查询过滤器过滤源数据,以便它排除所有取消的航班。然后转换数据,使其包含每个航空公司的航班数量、延误分钟数之和以及飞行分钟数之和。最后,使用 bucket_script 来确定实际延误的飞行时间百分比。

POST _transform/_preview
{
  "source": {
    "index": "kibana_sample_data_flights",
    "query": { 
      "bool": {
        "filter": [
          { "term":  { "Cancelled": false } }
        ]
      }
    }
  },
  "dest" : { 
    "index" : "sample_flight_delays_by_carrier"
  },
  "pivot": {
    "group_by": { 
      "carrier": { "terms": { "field": "Carrier" }}
    },
    "aggregations": {
      "flights_count": { "value_count": { "field": "FlightNum" }},
      "delay_mins_total": { "sum": { "field": "FlightDelayMin" }},
      "flight_mins_total": { "sum": { "field": "FlightTimeMin" }},
      "delay_time_percentage": { 
        "bucket_script": {
          "buckets_path": {
            "delay_time": "delay_mins_total.value",
            "flight_time": "flight_mins_total.value"
          },
          "script": "(params.delay_time / params.flight_time) * 100"
        }
      }
    }
  }
}

过滤源数据以仅选择未取消的航班。

transform 的目标索引。它被 _preview 忽略。

数据按包含航空公司名称的 Carrier 字段分组。

bucket_script 对聚合返回的结果执行计算。在此特定示例中,它计算延误占旅行时间的百分比。

预览显示您新的索引将包含类似于此的数据,适用于每家航空公司

{
  "preview" : [
    {
      "carrier" : "ES-Air",
      "flights_count" : 2802.0,
      "flight_mins_total" : 1436927.5130677223,
      "delay_time_percentage" : 9.335543983955839,
      "delay_mins_total" : 134145.0
    },
    ...
  ]
}

此 transform 使回答以下问题变得更容易

  • 哪家航空公司延误占飞行时间的百分比最高?

此数据是虚构的,不反映任何特色目的地或始发机场的实际延误或飞行统计数据。

查找可疑的客户端 IP编辑

此示例使用 Web 日志示例数据集来识别可疑的客户端 IP。它转换数据,以便新索引包含每个客户端 IP 的字节数之和、唯一 URL 数量、代理、按位置的传入请求以及地理目的地。它还使用过滤器聚合来统计每个客户端 IP 收到的特定类型的 HTTP 响应。最终,以下示例将 Web 日志数据转换为以实体为中心的索引,其中实体是 clientip

PUT _transform/suspicious_client_ips
{
  "source": {
    "index": "kibana_sample_data_logs"
  },
  "dest" : { 
    "index" : "sample_weblogs_by_clientip"
  },
  "sync" : { 
    "time": {
      "field": "timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {  
      "clientip": { "terms": { "field": "clientip" } }
      },
    "aggregations": {
      "url_dc": { "cardinality": { "field": "url.keyword" }},
      "bytes_sum": { "sum": { "field": "bytes" }},
      "geo.src_dc": { "cardinality": { "field": "geo.src" }},
      "agent_dc": { "cardinality": { "field": "agent.keyword" }},
      "geo.dest_dc": { "cardinality": { "field": "geo.dest" }},
      "responses.total": { "value_count": { "field": "timestamp" }},
      "success" : { 
         "filter": {
            "term": { "response" : "200"}}
        },
      "error404" : {
         "filter": {
            "term": { "response" : "404"}}
        },
      "error5xx" : {
         "filter": {
            "range": { "response" : { "gte": 500, "lt": 600}}}
        },
      "timestamp.min": { "min": { "field": "timestamp" }},
      "timestamp.max": { "max": { "field": "timestamp" }},
      "timestamp.duration_ms": { 
        "bucket_script": {
          "buckets_path": {
            "min_time": "timestamp.min.value",
            "max_time": "timestamp.max.value"
          },
          "script": "(params.max_time - params.min_time)"
        }
      }
    }
  }
}

transform 的目标索引。

配置 transform 以连续运行。它使用 timestamp 字段来同步源索引和目标索引。最坏情况下的摄取延迟为 60 秒。

数据按 clientip 字段分组。

过滤器聚合,统计 response 字段中成功 (200) 响应的出现次数。以下两个聚合 (error404error5xx) 按错误代码统计错误响应,匹配确切值或响应代码范围。

bucket_script 根据聚合的结果计算 clientip 访问的持续时间。

创建 transform 后,您必须启动它

response = client.transform.start_transform(
  transform_id: 'suspicious_client_ips'
)
puts response
POST _transform/suspicious_client_ips/_start

此后不久,第一个结果应在目标索引中可用

response = client.search(
  index: 'sample_weblogs_by_clientip'
)
puts response
GET sample_weblogs_by_clientip/_search

搜索结果显示您类似于此的数据,适用于每个客户端 IP

    "hits" : [
      {
        "_index" : "sample_weblogs_by_clientip",
        "_id" : "MOeHH_cUL5urmartKj-b5UQAAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "geo" : {
            "src_dc" : 2.0,
            "dest_dc" : 2.0
          },
          "success" : 2,
          "error404" : 0,
          "error503" : 0,
          "clientip" : "0.72.176.46",
          "agent_dc" : 2.0,
          "bytes_sum" : 4422.0,
          "responses" : {
            "total" : 2.0
          },
          "url_dc" : 2.0,
          "timestamp" : {
            "duration_ms" : 5.2191698E8,
            "min" : "2020-03-16T07:51:57.333Z",
            "max" : "2020-03-22T08:50:34.313Z"
          }
        }
      }
    ]

与其他 Kibana 示例数据集一样,Web 日志示例数据集包含相对于您安装它的时间戳,包括将来的时间戳。连续 transform 将在这些数据点过去后将其拾取。如果您在一段时间前安装了 Web 日志示例数据集,您可以卸载并重新安装它,时间戳将发生变化。

此 transform 使回答以下问题变得更容易

  • 哪些客户端 IP 传输了最多的数据量?
  • 哪些客户端 IP 与大量不同的 URL 交互?
  • 哪些客户端 IP 具有较高的错误率?
  • 哪些客户端 IP 与大量目标国家/地区交互?

查找每个 IP 地址的最后一个日志事件编辑

此示例使用 Web 日志示例数据集来查找来自 IP 地址的最后一个日志。让我们在连续模式下使用 latest 类型的 transform。它将每个唯一键的最新文档从源索引复制到目标索引,并在新数据进入源索引时更新目标索引。

选择 clientip 字段作为唯一键;数据按此字段分组。选择 timestamp 作为按时间顺序排序数据的日期字段。对于连续模式,请指定一个用于识别新文档的日期字段,以及在源索引中检查更改的间隔。

Finding the last log event for each IP address with transforms in Kibana

假设我们只对最近出现在日志中的 IP 地址保留文档。您可以定义一个保留策略,并指定一个用于计算文档年龄的日期字段。此示例使用与用于排序数据的日期字段相同的日期字段。然后设置文档的最大年龄;比您设置的值更旧的文档将从目标索引中删除。

Defining retention policy for transforms in Kibana

此 transform 创建目标索引,其中包含每个客户端 IP 的最新登录日期。由于 transform 在连续模式下运行,因此目标索引将在新数据进入源索引时更新。最后,由于应用了保留策略,因此所有比 30 天更旧的文档都将从目标索引中删除。

API 示例
PUT _transform/last-log-from-clientip
{
  "source": {
    "index": [
      "kibana_sample_data_logs"
    ]
  },
  "latest": {
    "unique_key": [ 
      "clientip"
    ],
    "sort": "timestamp" 
  },
  "frequency": "1m", 
  "dest": {
    "index": "last-log-from-clientip"
  },
  "sync": { 
    "time": {
      "field": "timestamp",
      "delay": "60s"
    }
  },
  "retention_policy": { 
    "time": {
      "field": "timestamp",
      "max_age": "30d"
    }
  },
  "settings": {
    "max_page_search_size": 500
  }
}

指定用于对数据进行分组的字段。

指定用于排序数据的日期字段。

设置 transform 检查源索引中更改的间隔。

包含用于同步源索引和目标索引的时间字段和延迟设置。

指定 transform 的保留策略。比配置值更旧的文档将从目标索引中删除。

创建 transform 后,启动它

response = client.transform.start_transform(
  transform_id: 'last-log-from-clientip'
)
puts response
POST _transform/last-log-from-clientip/_start

transform 处理完数据后,搜索目标索引

response = client.search(
  index: 'last-log-from-clientip'
)
puts response
GET last-log-from-clientip/_search

搜索结果显示您类似于此的数据,适用于每个客户端 IP

{
  "_index" : "last-log-from-clientip",
  "_id" : "MOeHH_cUL5urmartKj-b5UQAAAAAAAAA",
  "_score" : 1.0,
  "_source" : {
    "referer" : "http://twitter.com/error/don-lind",
    "request" : "/elasticsearch",
    "agent" : "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)",
    "extension" : "",
    "memory" : null,
    "ip" : "0.72.176.46",
    "index" : "kibana_sample_data_logs",
    "message" : "0.72.176.46 - - [2018-09-18T06:31:00.572Z] \"GET /elasticsearch HTTP/1.1\" 200 7065 \"-\" \"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)\"",
    "url" : "https://elastic.ac.cn/downloads/elasticsearch",
    "tags" : [
      "success",
      "info"
    ],
    "geo" : {
      "srcdest" : "IN:PH",
      "src" : "IN",
      "coordinates" : {
        "lon" : -124.1127917,
        "lat" : 40.80338889
      },
      "dest" : "PH"
    },
    "utc_time" : "2021-05-04T06:31:00.572Z",
    "bytes" : 7065,
    "machine" : {
      "os" : "ios",
      "ram" : 12884901888
    },
    "response" : 200,
    "clientip" : "0.72.176.46",
    "host" : "www.elastic.co",
    "event" : {
      "dataset" : "sample_web_logs"
    },
    "phpmemory" : null,
    "timestamp" : "2021-05-04T06:31:00.572Z"
  }
}

此 transform 使回答以下问题变得更容易

  • 与特定 IP 地址关联的最新的日志事件是什么?

查找向服务器发送最多字节的客户端 IP编辑

此示例使用 Web 日志示例数据集来查找每小时向服务器发送最多字节的客户端 IP。此示例使用 pivot transform,并使用 top_metrics 聚合。

按时间字段上的 日期直方图 对数据进行分组,间隔为一小时。在 bytes 字段上使用 最大聚合 来获取发送到服务器的最大数据量。如果没有 max 聚合,API 调用仍然会返回发送最多字节的客户端 IP,但是,它发送的字节数不会返回。在 top_metrics 属性中,指定 clientipgeo.src,然后按 bytes 字段降序排序。transform 返回发送最大数据量的客户端 IP 以及相应位置的 2 个字母的 ISO 代码。

POST _transform/_preview
{
  "source": {
    "index": "kibana_sample_data_logs"
  },
  "pivot": {
    "group_by": { 
      "timestamp": {
        "date_histogram": {
          "field": "timestamp",
          "fixed_interval": "1h"
        }
      }
    },
    "aggregations": {
      "bytes.max": { 
        "max": {
          "field": "bytes"
        }
      },
      "top": {
        "top_metrics": { 
          "metrics": [
            {
              "field": "clientip"
            },
            {
              "field": "geo.src"
            }
          ],
          "sort": {
            "bytes": "desc"
          }
        }
      }
    }
  }
}

数据按时间字段的日期直方图分组,间隔为一小时。

计算 bytes 字段的最大值。

指定要返回的顶部文档的字段 (clientipgeo.src) 以及排序方法(具有最高 bytes 值的文档)。

上面的 API 调用返回类似于此的响应

{
  "preview" : [
    {
      "top" : {
        "clientip" : "223.87.60.27",
        "geo.src" : "IN"
      },
      "bytes" : {
        "max" : 6219
      },
      "timestamp" : "2021-04-25T00:00:00.000Z"
    },
    {
      "top" : {
        "clientip" : "99.74.118.237",
        "geo.src" : "LK"
      },
      "bytes" : {
        "max" : 14113
      },
      "timestamp" : "2021-04-25T03:00:00.000Z"
    },
    {
      "top" : {
        "clientip" : "218.148.135.12",
        "geo.src" : "BR"
      },
      "bytes" : {
        "max" : 4531
      },
      "timestamp" : "2021-04-25T04:00:00.000Z"
    },
    ...
  ]
}

通过客户 ID 获取客户姓名和电子邮件地址编辑

此示例使用电子商务示例数据集来创建基于客户 ID 的以实体为中心的索引,并使用 top_metrics 聚合来获取客户姓名和电子邮件地址。

将数据按 customer_id 分组,然后添加一个 top_metrics 聚合,其中 metricsemailcustomer_first_name.keywordcustomer_last_name.keyword 字段。按 order_date 降序排列 top_metrics。API 调用如下所示

POST _transform/_preview
{
  "source": {
    "index": "kibana_sample_data_ecommerce"
  },
  "pivot": {
    "group_by": { 
      "customer_id": {
        "terms": {
          "field": "customer_id"
        }
      }
    },
    "aggregations": {
      "last": {
        "top_metrics": { 
          "metrics": [
            {
              "field": "email"
            },
            {
              "field": "customer_first_name.keyword"
            },
            {
              "field": "customer_last_name.keyword"
            }
          ],
          "sort": {
            "order_date": "desc"
          }
        }
      }
    }
  }
}

数据按 customer_id 字段上的 terms 聚合进行分组。

指定要返回的字段(电子邮件和姓名字段),按订单日期降序排列。

API 返回的响应类似于以下内容

 {
  "preview" : [
    {
      "last" : {
        "customer_last_name.keyword" : "Long",
        "customer_first_name.keyword" : "Recip",
        "email" : "[email protected]"
      },
      "customer_id" : "10"
    },
    {
      "last" : {
        "customer_last_name.keyword" : "Jackson",
        "customer_first_name.keyword" : "Fitzgerald",
        "email" : "[email protected]"
      },
      "customer_id" : "11"
    },
    {
      "last" : {
        "customer_last_name.keyword" : "Cross",
        "customer_first_name.keyword" : "Brigitte",
        "email" : "[email protected]"
      },
      "customer_id" : "12"
    },
    ...
  ]
}