Transform examples

These examples demonstrate how to use transforms to derive useful insights from your data. All of the examples use one of the Kibana sample datasets. For a more detailed, step-by-step example, see Tutorial: Transforming the eCommerce sample data.

Finding your best customers

This example uses the eCommerce orders sample dataset to find the customers who spent the most in a hypothetical web shop. Let's use the pivot type of transform such that the destination index contains the number of orders, the total price of the orders, the number of unique products, the average price per order, and the total amount of ordered products for each customer.

Finding your best customers with transforms in Kibana

Alternatively, you can use the preview transform API and the create transform API.

API example:
resp = client.transform.preview_transform(
    source={
        "index": "kibana_sample_data_ecommerce"
    },
    dest={
        "index": "sample_ecommerce_orders_by_customer"
    },
    pivot={
        "group_by": {
            "user": {
                "terms": {
                    "field": "user"
                }
            },
            "customer_id": {
                "terms": {
                    "field": "customer_id"
                }
            }
        },
        "aggregations": {
            "order_count": {
                "value_count": {
                    "field": "order_id"
                }
            },
            "total_order_amt": {
                "sum": {
                    "field": "taxful_total_price"
                }
            },
            "avg_amt_per_order": {
                "avg": {
                    "field": "taxful_total_price"
                }
            },
            "avg_unique_products_per_order": {
                "avg": {
                    "field": "total_unique_products"
                }
            },
            "total_unique_products": {
                "cardinality": {
                    "field": "products.product_id"
                }
            }
        }
    },
)
print(resp)
const response = await client.transform.previewTransform({
  source: {
    index: "kibana_sample_data_ecommerce",
  },
  dest: {
    index: "sample_ecommerce_orders_by_customer",
  },
  pivot: {
    group_by: {
      user: {
        terms: {
          field: "user",
        },
      },
      customer_id: {
        terms: {
          field: "customer_id",
        },
      },
    },
    aggregations: {
      order_count: {
        value_count: {
          field: "order_id",
        },
      },
      total_order_amt: {
        sum: {
          field: "taxful_total_price",
        },
      },
      avg_amt_per_order: {
        avg: {
          field: "taxful_total_price",
        },
      },
      avg_unique_products_per_order: {
        avg: {
          field: "total_unique_products",
        },
      },
      total_unique_products: {
        cardinality: {
          field: "products.product_id",
        },
      },
    },
  },
});
console.log(response);
POST _transform/_preview
{
  "source": {
    "index": "kibana_sample_data_ecommerce"
  },
  "dest" : { 
    "index" : "sample_ecommerce_orders_by_customer"
  },
  "pivot": {
    "group_by": { 
      "user": { "terms": { "field": "user" }},
      "customer_id": { "terms": { "field": "customer_id" }}
    },
    "aggregations": {
      "order_count": { "value_count": { "field": "order_id" }},
      "total_order_amt": { "sum": { "field": "taxful_total_price" }},
      "avg_amt_per_order": { "avg": { "field": "taxful_total_price" }},
      "avg_unique_products_per_order": { "avg": { "field": "total_unique_products" }},
      "total_unique_products": { "cardinality": { "field": "products.product_id" }}
    }
  }
}

The destination index for the transform. It is ignored by _preview.

Two group_by fields are selected. This means the transform contains a unique row per user and customer_id combination. Within this dataset, both of these fields are unique. By including both in the transform, it gives more context to the final results.

In the example above, condensed JSON formatting is used for easier readability of the pivot object.

The preview transform API enables you to see the layout of the transform in advance, populated with some sample values. For example:

{
  "preview" : [
    {
      "total_order_amt" : 3946.9765625,
      "order_count" : 59.0,
      "total_unique_products" : 116.0,
      "avg_unique_products_per_order" : 2.0,
      "customer_id" : "10",
      "user" : "recip",
      "avg_amt_per_order" : 66.89790783898304
    },
    ...
    ]
  }

This transform makes it easier to answer questions such as:

  • Which customers spend the most?
  • Which customers spend the most per order?
  • Which customers order most often?
  • Which customers ordered the least number of different products?

It's possible to answer these questions using aggregations alone, however transforms allow us to persist this data as a customer-centric index. This enables us to analyze the data at scale and gives more flexibility to explore and navigate the data from a customer-centric perspective. In some cases, it can even make creating visualizations much simpler.
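For example, once the transform above has been created and started, the first of these questions can be answered with an ordinary search on the destination index sorted by total_order_amt. A minimal sketch of the request body, reusing the index and field names from the pivot above:

```python
# Hypothetical request body for
#   GET sample_ecommerce_orders_by_customer/_search
# returning the five customers with the highest total spend.
# Assumes the pivot transform above has already been created and started.
top_spenders = {
    "size": 5,
    "sort": [{"total_order_amt": {"order": "desc"}}],
    "_source": ["user", "customer_id", "order_count", "total_order_amt"],
}

# With the Python client this could be sent as, for example:
# resp = client.search(index="sample_ecommerce_orders_by_customer", **top_spenders)
```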

Finding air carriers with the most delays

This example uses the Flights sample dataset to find out which air carrier had the most delays. First, filter the source data such that it excludes all the cancelled flights by using a query filter. Then transform the data to contain the distinct number of flights, the sum of delayed minutes, and the sum of the flight minutes by air carrier. Finally, use a bucket_script to determine what percentage of the flight time was actually delay.

resp = client.transform.preview_transform(
    source={
        "index": "kibana_sample_data_flights",
        "query": {
            "bool": {
                "filter": [
                    {
                        "term": {
                            "Cancelled": False
                        }
                    }
                ]
            }
        }
    },
    dest={
        "index": "sample_flight_delays_by_carrier"
    },
    pivot={
        "group_by": {
            "carrier": {
                "terms": {
                    "field": "Carrier"
                }
            }
        },
        "aggregations": {
            "flights_count": {
                "value_count": {
                    "field": "FlightNum"
                }
            },
            "delay_mins_total": {
                "sum": {
                    "field": "FlightDelayMin"
                }
            },
            "flight_mins_total": {
                "sum": {
                    "field": "FlightTimeMin"
                }
            },
            "delay_time_percentage": {
                "bucket_script": {
                    "buckets_path": {
                        "delay_time": "delay_mins_total.value",
                        "flight_time": "flight_mins_total.value"
                    },
                    "script": "(params.delay_time / params.flight_time) * 100"
                }
            }
        }
    },
)
print(resp)
const response = await client.transform.previewTransform({
  source: {
    index: "kibana_sample_data_flights",
    query: {
      bool: {
        filter: [
          {
            term: {
              Cancelled: false,
            },
          },
        ],
      },
    },
  },
  dest: {
    index: "sample_flight_delays_by_carrier",
  },
  pivot: {
    group_by: {
      carrier: {
        terms: {
          field: "Carrier",
        },
      },
    },
    aggregations: {
      flights_count: {
        value_count: {
          field: "FlightNum",
        },
      },
      delay_mins_total: {
        sum: {
          field: "FlightDelayMin",
        },
      },
      flight_mins_total: {
        sum: {
          field: "FlightTimeMin",
        },
      },
      delay_time_percentage: {
        bucket_script: {
          buckets_path: {
            delay_time: "delay_mins_total.value",
            flight_time: "flight_mins_total.value",
          },
          script: "(params.delay_time / params.flight_time) * 100",
        },
      },
    },
  },
});
console.log(response);
POST _transform/_preview
{
  "source": {
    "index": "kibana_sample_data_flights",
    "query": { 
      "bool": {
        "filter": [
          { "term":  { "Cancelled": false } }
        ]
      }
    }
  },
  "dest" : { 
    "index" : "sample_flight_delays_by_carrier"
  },
  "pivot": {
    "group_by": { 
      "carrier": { "terms": { "field": "Carrier" }}
    },
    "aggregations": {
      "flights_count": { "value_count": { "field": "FlightNum" }},
      "delay_mins_total": { "sum": { "field": "FlightDelayMin" }},
      "flight_mins_total": { "sum": { "field": "FlightTimeMin" }},
      "delay_time_percentage": { 
        "bucket_script": {
          "buckets_path": {
            "delay_time": "delay_mins_total.value",
            "flight_time": "flight_mins_total.value"
          },
          "script": "(params.delay_time / params.flight_time) * 100"
        }
      }
    }
  }
}

Filter the source data to select only flights that are not cancelled.

The destination index for the transform. It is ignored by _preview.

The data is grouped by the Carrier field, which contains the airline name.

The bucket_script performs calculations on the results that are returned by the aggregation. In this particular example, it calculates what percentage of travel time was taken up by delays.

The preview shows you that the new index would contain data like this for each carrier:

{
  "preview" : [
    {
      "carrier" : "ES-Air",
      "flights_count" : 2802.0,
      "flight_mins_total" : 1436927.5130677223,
      "delay_time_percentage" : 9.335543983955839,
      "delay_mins_total" : 134145.0
    },
    ...
  ]
}

This transform makes it easier to answer questions such as:

  • Which air carrier has the most delays as a percentage of flight time?

This data is fictional and does not reflect actual delays or flight stats for any of the featured destination and origin airports.
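To make the bucket_script concrete, its calculation can be reproduced in plain Python using the ES-Air numbers from the preview response above:

```python
# Reproducing the bucket_script
#   "(params.delay_time / params.flight_time) * 100"
# with the ES-Air values from the preview response above.
delay_mins_total = 134145.0             # sum of FlightDelayMin for ES-Air
flight_mins_total = 1436927.5130677223  # sum of FlightTimeMin for ES-Air

delay_time_percentage = (delay_mins_total / flight_mins_total) * 100
# roughly 9.3355, matching delay_time_percentage in the preview
```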

Finding suspicious client IPs

This example uses the web log sample dataset to identify suspicious client IPs. It transforms the data such that the new index contains the sum of bytes and the number of distinct URLs, agents, incoming requests by location, and geographic destinations for each client IP. It also uses filter aggregations to count the specific types of HTTP responses that each client IP receives. Ultimately, the example below transforms the web log data into an entity-centric index where the entity is clientip.

resp = client.transform.put_transform(
    transform_id="suspicious_client_ips",
    source={
        "index": "kibana_sample_data_logs"
    },
    dest={
        "index": "sample_weblogs_by_clientip"
    },
    sync={
        "time": {
            "field": "timestamp",
            "delay": "60s"
        }
    },
    pivot={
        "group_by": {
            "clientip": {
                "terms": {
                    "field": "clientip"
                }
            }
        },
        "aggregations": {
            "url_dc": {
                "cardinality": {
                    "field": "url.keyword"
                }
            },
            "bytes_sum": {
                "sum": {
                    "field": "bytes"
                }
            },
            "geo.src_dc": {
                "cardinality": {
                    "field": "geo.src"
                }
            },
            "agent_dc": {
                "cardinality": {
                    "field": "agent.keyword"
                }
            },
            "geo.dest_dc": {
                "cardinality": {
                    "field": "geo.dest"
                }
            },
            "responses.total": {
                "value_count": {
                    "field": "timestamp"
                }
            },
            "success": {
                "filter": {
                    "term": {
                        "response": "200"
                    }
                }
            },
            "error404": {
                "filter": {
                    "term": {
                        "response": "404"
                    }
                }
            },
            "error5xx": {
                "filter": {
                    "range": {
                        "response": {
                            "gte": 500,
                            "lt": 600
                        }
                    }
                }
            },
            "timestamp.min": {
                "min": {
                    "field": "timestamp"
                }
            },
            "timestamp.max": {
                "max": {
                    "field": "timestamp"
                }
            },
            "timestamp.duration_ms": {
                "bucket_script": {
                    "buckets_path": {
                        "min_time": "timestamp.min.value",
                        "max_time": "timestamp.max.value"
                    },
                    "script": "(params.max_time - params.min_time)"
                }
            }
        }
    },
)
print(resp)
const response = await client.transform.putTransform({
  transform_id: "suspicious_client_ips",
  source: {
    index: "kibana_sample_data_logs",
  },
  dest: {
    index: "sample_weblogs_by_clientip",
  },
  sync: {
    time: {
      field: "timestamp",
      delay: "60s",
    },
  },
  pivot: {
    group_by: {
      clientip: {
        terms: {
          field: "clientip",
        },
      },
    },
    aggregations: {
      url_dc: {
        cardinality: {
          field: "url.keyword",
        },
      },
      bytes_sum: {
        sum: {
          field: "bytes",
        },
      },
      "geo.src_dc": {
        cardinality: {
          field: "geo.src",
        },
      },
      agent_dc: {
        cardinality: {
          field: "agent.keyword",
        },
      },
      "geo.dest_dc": {
        cardinality: {
          field: "geo.dest",
        },
      },
      "responses.total": {
        value_count: {
          field: "timestamp",
        },
      },
      success: {
        filter: {
          term: {
            response: "200",
          },
        },
      },
      error404: {
        filter: {
          term: {
            response: "404",
          },
        },
      },
      error5xx: {
        filter: {
          range: {
            response: {
              gte: 500,
              lt: 600,
            },
          },
        },
      },
      "timestamp.min": {
        min: {
          field: "timestamp",
        },
      },
      "timestamp.max": {
        max: {
          field: "timestamp",
        },
      },
      "timestamp.duration_ms": {
        bucket_script: {
          buckets_path: {
            min_time: "timestamp.min.value",
            max_time: "timestamp.max.value",
          },
          script: "(params.max_time - params.min_time)",
        },
      },
    },
  },
});
console.log(response);
PUT _transform/suspicious_client_ips
{
  "source": {
    "index": "kibana_sample_data_logs"
  },
  "dest" : { 
    "index" : "sample_weblogs_by_clientip"
  },
  "sync" : { 
    "time": {
      "field": "timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {  
      "clientip": { "terms": { "field": "clientip" } }
      },
    "aggregations": {
      "url_dc": { "cardinality": { "field": "url.keyword" }},
      "bytes_sum": { "sum": { "field": "bytes" }},
      "geo.src_dc": { "cardinality": { "field": "geo.src" }},
      "agent_dc": { "cardinality": { "field": "agent.keyword" }},
      "geo.dest_dc": { "cardinality": { "field": "geo.dest" }},
      "responses.total": { "value_count": { "field": "timestamp" }},
      "success" : { 
         "filter": {
            "term": { "response" : "200"}}
        },
      "error404" : {
         "filter": {
            "term": { "response" : "404"}}
        },
      "error5xx" : {
         "filter": {
            "range": { "response" : { "gte": 500, "lt": 600}}}
        },
      "timestamp.min": { "min": { "field": "timestamp" }},
      "timestamp.max": { "max": { "field": "timestamp" }},
      "timestamp.duration_ms": { 
        "bucket_script": {
          "buckets_path": {
            "min_time": "timestamp.min.value",
            "max_time": "timestamp.max.value"
          },
          "script": "(params.max_time - params.min_time)"
        }
      }
    }
  }
}

The destination index for the transform.

Configures the transform to run continuously. It uses the timestamp field to synchronize the source and destination indices. The worst case ingestion delay is 60 seconds.

The data is grouped by the clientip field.

Filter aggregation that counts the occurrences of successful (200) responses in the response field. The following two aggregations (error404 and error5xx) count the error responses by error code, matching an exact value or a range of response codes.

This bucket_script calculates the duration of the clientip access based on the results of the aggregation.

After you create the transform, you must start it:

resp = client.transform.start_transform(
    transform_id="suspicious_client_ips",
)
print(resp)
response = client.transform.start_transform(
  transform_id: 'suspicious_client_ips'
)
puts response
const response = await client.transform.startTransform({
  transform_id: "suspicious_client_ips",
});
console.log(response);
POST _transform/suspicious_client_ips/_start

Shortly thereafter, the first results should be available in the destination index:

resp = client.search(
    index="sample_weblogs_by_clientip",
)
print(resp)
response = client.search(
  index: 'sample_weblogs_by_clientip'
)
puts response
const response = await client.search({
  index: "sample_weblogs_by_clientip",
});
console.log(response);
GET sample_weblogs_by_clientip/_search

The search result shows you data like this for each client IP:

    "hits" : [
      {
        "_index" : "sample_weblogs_by_clientip",
        "_id" : "MOeHH_cUL5urmartKj-b5UQAAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "geo" : {
            "src_dc" : 2.0,
            "dest_dc" : 2.0
          },
          "success" : 2,
          "error404" : 0,
          "error5xx" : 0,
          "clientip" : "0.72.176.46",
          "agent_dc" : 2.0,
          "bytes_sum" : 4422.0,
          "responses" : {
            "total" : 2.0
          },
          "url_dc" : 2.0,
          "timestamp" : {
            "duration_ms" : 5.2191698E8,
            "min" : "2020-03-16T07:51:57.333Z",
            "max" : "2020-03-22T08:50:34.313Z"
          }
        }
      }
    ]

Like other Kibana sample datasets, the web log sample dataset contains timestamps relative to when you installed it, including timestamps in the future. The continuous transform will pick up the data points once they are in the past. If you installed the web log sample dataset some time ago, you can uninstall and reinstall it and the timestamps will change.

This transform makes it easier to answer questions such as:

  • Which client IPs are transferring the most amounts of data?
  • Which client IPs are interacting with a high number of different URLs?
  • Which client IPs have high error rates?
  • Which client IPs are interacting with a high number of destination countries?
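As an illustration of the error-rate question, a per-IP rate can be derived from the filter-aggregation counts in a destination document. A hedged sketch using the aggregation names defined in the transform above (the document values are made up for the example):

```python
# Hypothetical destination document, using the aggregation names from the
# transform definition above (success, error404, error5xx, responses.total).
doc = {
    "clientip": "0.72.176.46",
    "success": 2,
    "error404": 0,
    "error5xx": 0,
    "responses": {"total": 2.0},
}

# Fraction of counted responses that were 404s or 5xx errors.
error_rate = (doc["error404"] + doc["error5xx"]) / doc["responses"]["total"]
# 0.0 here: both responses from this client IP were successful
```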

Finding the last log event for each IP address

This example uses the web log sample dataset to find the last log from an IP address. Let's use the latest type of transform in continuous mode. It copies the most recent document for each unique key from the source index to the destination index and updates the destination index as new data comes into the source index.

Pick the clientip field as the unique key; the data is grouped by this field. Select timestamp as the date field that sorts the data chronologically. For continuous mode, specify a date field that identifies new documents and an interval between checks for changes in the source index.

Finding the last log event for each IP address with transforms in Kibana

Let's assume that we're interested only in documents for IP addresses that appeared in the logs recently. You can define a retention policy and specify a date field that is used to calculate the age of a document. This example uses the same date field that is used to sort the data. Then set the maximum age of a document; documents that are older than the value you set will be removed from the destination index.

Defining retention policy for transforms in Kibana

This transform creates a destination index that contains the latest log entry for each client IP. As the transform runs in continuous mode, the destination index is updated as new data comes into the source index. Finally, every document older than 30 days is removed from the destination index due to the applied retention policy.

API example:
resp = client.transform.put_transform(
    transform_id="last-log-from-clientip",
    source={
        "index": [
            "kibana_sample_data_logs"
        ]
    },
    latest={
        "unique_key": [
            "clientip"
        ],
        "sort": "timestamp"
    },
    frequency="1m",
    dest={
        "index": "last-log-from-clientip"
    },
    sync={
        "time": {
            "field": "timestamp",
            "delay": "60s"
        }
    },
    retention_policy={
        "time": {
            "field": "timestamp",
            "max_age": "30d"
        }
    },
    settings={
        "max_page_search_size": 500
    },
)
print(resp)
const response = await client.transform.putTransform({
  transform_id: "last-log-from-clientip",
  source: {
    index: ["kibana_sample_data_logs"],
  },
  latest: {
    unique_key: ["clientip"],
    sort: "timestamp",
  },
  frequency: "1m",
  dest: {
    index: "last-log-from-clientip",
  },
  sync: {
    time: {
      field: "timestamp",
      delay: "60s",
    },
  },
  retention_policy: {
    time: {
      field: "timestamp",
      max_age: "30d",
    },
  },
  settings: {
    max_page_search_size: 500,
  },
});
console.log(response);
PUT _transform/last-log-from-clientip
{
  "source": {
    "index": [
      "kibana_sample_data_logs"
    ]
  },
  "latest": {
    "unique_key": [ 
      "clientip"
    ],
    "sort": "timestamp" 
  },
  "frequency": "1m", 
  "dest": {
    "index": "last-log-from-clientip"
  },
  "sync": { 
    "time": {
      "field": "timestamp",
      "delay": "60s"
    }
  },
  "retention_policy": { 
    "time": {
      "field": "timestamp",
      "max_age": "30d"
    }
  },
  "settings": {
    "max_page_search_size": 500
  }
}

Specifies the field that is used as the unique key for grouping the data.

Specifies the date field that is used for sorting the data.

Sets the interval for the transform to check for changes in the source index.

Contains the time field and delay settings that are used to synchronize the source and destination indices.

Specifies the retention policy for the transform. Documents that are older than the configured value are removed from the destination index.

After you create the transform, start it:

resp = client.transform.start_transform(
    transform_id="last-log-from-clientip",
)
print(resp)
response = client.transform.start_transform(
  transform_id: 'last-log-from-clientip'
)
puts response
const response = await client.transform.startTransform({
  transform_id: "last-log-from-clientip",
});
console.log(response);
POST _transform/last-log-from-clientip/_start

After the transform processes the data, search the destination index:

resp = client.search(
    index="last-log-from-clientip",
)
print(resp)
response = client.search(
  index: 'last-log-from-clientip'
)
puts response
const response = await client.search({
  index: "last-log-from-clientip",
});
console.log(response);
GET last-log-from-clientip/_search

The search result shows you data like this for each client IP:

{
  "_index" : "last-log-from-clientip",
  "_id" : "MOeHH_cUL5urmartKj-b5UQAAAAAAAAA",
  "_score" : 1.0,
  "_source" : {
    "referer" : "http://twitter.com/error/don-lind",
    "request" : "/elasticsearch",
    "agent" : "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)",
    "extension" : "",
    "memory" : null,
    "ip" : "0.72.176.46",
    "index" : "kibana_sample_data_logs",
    "message" : "0.72.176.46 - - [2018-09-18T06:31:00.572Z] \"GET /elasticsearch HTTP/1.1\" 200 7065 \"-\" \"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)\"",
    "url" : "https://elastic.ac.cn/downloads/elasticsearch",
    "tags" : [
      "success",
      "info"
    ],
    "geo" : {
      "srcdest" : "IN:PH",
      "src" : "IN",
      "coordinates" : {
        "lon" : -124.1127917,
        "lat" : 40.80338889
      },
      "dest" : "PH"
    },
    "utc_time" : "2021-05-04T06:31:00.572Z",
    "bytes" : 7065,
    "machine" : {
      "os" : "ios",
      "ram" : 12884901888
    },
    "response" : 200,
    "clientip" : "0.72.176.46",
    "host" : "www.elastic.co",
    "event" : {
      "dataset" : "sample_web_logs"
    },
    "phpmemory" : null,
    "timestamp" : "2021-05-04T06:31:00.572Z"
  }
}

This transform makes it easier to answer questions such as:

  • What was the most recent log event associated with a specific IP address?
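Because the latest transform keeps exactly one document per clientip, answering this is a single term query against the destination index. A sketch of the request body, using the IP from the sample result above:

```python
# Hypothetical request body for GET last-log-from-clientip/_search;
# at most one document matches, since clientip is the transform's unique key.
last_event_query = {
    "query": {"term": {"clientip": "0.72.176.46"}}
}
```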

Finding client IPs that sent the most bytes to the server

This example uses the web log sample dataset to find the client IP that sent the most bytes to the server in every hour. The example uses a pivot transform with a top_metrics aggregation.

Group the data by a date histogram on the time field with an interval of one hour. Use a max aggregation on the bytes field to get the maximum amount of data that is sent to the server. Without the max aggregation, the API call still returns the client IP that sent the most bytes, however the amount of bytes that it sent is not returned. In the top_metrics property, specify clientip and geo.src, then sort them by the bytes field in descending order. The transform returns the client IP that sent the biggest amount of data and the 2-letter ISO code of the corresponding location.

resp = client.transform.preview_transform(
    source={
        "index": "kibana_sample_data_logs"
    },
    pivot={
        "group_by": {
            "timestamp": {
                "date_histogram": {
                    "field": "timestamp",
                    "fixed_interval": "1h"
                }
            }
        },
        "aggregations": {
            "bytes.max": {
                "max": {
                    "field": "bytes"
                }
            },
            "top": {
                "top_metrics": {
                    "metrics": [
                        {
                            "field": "clientip"
                        },
                        {
                            "field": "geo.src"
                        }
                    ],
                    "sort": {
                        "bytes": "desc"
                    }
                }
            }
        }
    },
)
print(resp)
const response = await client.transform.previewTransform({
  source: {
    index: "kibana_sample_data_logs",
  },
  pivot: {
    group_by: {
      timestamp: {
        date_histogram: {
          field: "timestamp",
          fixed_interval: "1h",
        },
      },
    },
    aggregations: {
      "bytes.max": {
        max: {
          field: "bytes",
        },
      },
      top: {
        top_metrics: {
          metrics: [
            {
              field: "clientip",
            },
            {
              field: "geo.src",
            },
          ],
          sort: {
            bytes: "desc",
          },
        },
      },
    },
  },
});
console.log(response);
POST _transform/_preview
{
  "source": {
    "index": "kibana_sample_data_logs"
  },
  "pivot": {
    "group_by": { 
      "timestamp": {
        "date_histogram": {
          "field": "timestamp",
          "fixed_interval": "1h"
        }
      }
    },
    "aggregations": {
      "bytes.max": { 
        "max": {
          "field": "bytes"
        }
      },
      "top": {
        "top_metrics": { 
          "metrics": [
            {
              "field": "clientip"
            },
            {
              "field": "geo.src"
            }
          ],
          "sort": {
            "bytes": "desc"
          }
        }
      }
    }
  }
}

The data is grouped by a date histogram of the time field with a one hour interval.

Calculates the maximum value of the bytes field.

Specifies the fields (clientip and geo.src) of the top document to return and the sorting method (the document with the highest bytes value).

The API call above returns a response similar to this:

{
  "preview" : [
    {
      "top" : {
        "clientip" : "223.87.60.27",
        "geo.src" : "IN"
      },
      "bytes" : {
        "max" : 6219
      },
      "timestamp" : "2021-04-25T00:00:00.000Z"
    },
    {
      "top" : {
        "clientip" : "99.74.118.237",
        "geo.src" : "LK"
      },
      "bytes" : {
        "max" : 14113
      },
      "timestamp" : "2021-04-25T03:00:00.000Z"
    },
    {
      "top" : {
        "clientip" : "218.148.135.12",
        "geo.src" : "BR"
      },
      "bytes" : {
        "max" : 4531
      },
      "timestamp" : "2021-04-25T04:00:00.000Z"
    },
    ...
  ]
}
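Since each preview row already carries the hourly maximum, finding the single busiest hour overall is a one-liner over the rows. A sketch using the three rows shown above:

```python
# Post-processing the preview rows above to find the hour (and client IP)
# with the largest single transfer. Rows are copied from the sample
# preview response shown above.
rows = [
    {"top": {"clientip": "223.87.60.27", "geo.src": "IN"},
     "bytes": {"max": 6219}, "timestamp": "2021-04-25T00:00:00.000Z"},
    {"top": {"clientip": "99.74.118.237", "geo.src": "LK"},
     "bytes": {"max": 14113}, "timestamp": "2021-04-25T03:00:00.000Z"},
    {"top": {"clientip": "218.148.135.12", "geo.src": "BR"},
     "bytes": {"max": 4531}, "timestamp": "2021-04-25T04:00:00.000Z"},
]

busiest = max(rows, key=lambda r: r["bytes"]["max"])
# busiest["top"]["clientip"] is "99.74.118.237" (14113 bytes, at 03:00)
```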

Getting customer name and email address by customer ID

This example uses the eCommerce sample dataset to create an entity-centric index based on customer ID, and to get the customer name and email address by using the top_metrics aggregation.

Group the data by customer_id, then add a top_metrics aggregation where the metrics are the email, the customer_first_name.keyword, and the customer_last_name.keyword fields. Sort the top_metrics by order_date in descending order. The API call looks like this:

resp = client.transform.preview_transform(
    source={
        "index": "kibana_sample_data_ecommerce"
    },
    pivot={
        "group_by": {
            "customer_id": {
                "terms": {
                    "field": "customer_id"
                }
            }
        },
        "aggregations": {
            "last": {
                "top_metrics": {
                    "metrics": [
                        {
                            "field": "email"
                        },
                        {
                            "field": "customer_first_name.keyword"
                        },
                        {
                            "field": "customer_last_name.keyword"
                        }
                    ],
                    "sort": {
                        "order_date": "desc"
                    }
                }
            }
        }
    },
)
print(resp)
const response = await client.transform.previewTransform({
  source: {
    index: "kibana_sample_data_ecommerce",
  },
  pivot: {
    group_by: {
      customer_id: {
        terms: {
          field: "customer_id",
        },
      },
    },
    aggregations: {
      last: {
        top_metrics: {
          metrics: [
            {
              field: "email",
            },
            {
              field: "customer_first_name.keyword",
            },
            {
              field: "customer_last_name.keyword",
            },
          ],
          sort: {
            order_date: "desc",
          },
        },
      },
    },
  },
});
console.log(response);
POST _transform/_preview
{
  "source": {
    "index": "kibana_sample_data_ecommerce"
  },
  "pivot": {
    "group_by": { 
      "customer_id": {
        "terms": {
          "field": "customer_id"
        }
      }
    },
    "aggregations": {
      "last": {
        "top_metrics": { 
          "metrics": [
            {
              "field": "email"
            },
            {
              "field": "customer_first_name.keyword"
            },
            {
              "field": "customer_last_name.keyword"
            }
          ],
          "sort": {
            "order_date": "desc"
          }
        }
      }
    }
  }
}

The data is grouped by a terms aggregation on the customer_id field.

Specifies the fields to return (email and name fields) in descending order by the order date.

The API returns a response that is similar to this:

 {
  "preview" : [
    {
      "last" : {
        "customer_last_name.keyword" : "Long",
        "customer_first_name.keyword" : "Recip",
        "email" : "[email protected]"
      },
      "customer_id" : "10"
    },
    {
      "last" : {
        "customer_last_name.keyword" : "Jackson",
        "customer_first_name.keyword" : "Fitzgerald",
        "email" : "[email protected]"
      },
      "customer_id" : "11"
    },
    {
      "last" : {
        "customer_last_name.keyword" : "Cross",
        "customer_first_name.keyword" : "Brigitte",
        "email" : "[email protected]"
      },
      "customer_id" : "12"
    },
    ...
  ]
}
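One way to use such an entity-centric result is as an in-memory lookup from customer ID to contact details. A sketch over the first two preview rows above (the email values are redacted in the sample output and are kept as-is here):

```python
# Building a customer_id -> (first name, last name, email) lookup from
# the preview rows shown above. Rows are copied from the sample response.
rows = [
    {"last": {"customer_last_name.keyword": "Long",
              "customer_first_name.keyword": "Recip",
              "email": "[email protected]"},
     "customer_id": "10"},
    {"last": {"customer_last_name.keyword": "Jackson",
              "customer_first_name.keyword": "Fitzgerald",
              "email": "[email protected]"},
     "customer_id": "11"},
]

contacts = {
    r["customer_id"]: (
        r["last"]["customer_first_name.keyword"],
        r["last"]["customer_last_name.keyword"],
        r["last"]["email"],
    )
    for r in rows
}
# contacts["11"][0] is "Fitzgerald"
```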