教程:转换电子商务示例数据
编辑教程:转换电子商务示例数据
编辑转换使您能够从 Elasticsearch 索引检索信息、对其进行转换并将其存储在另一个索引中。让我们使用 Kibana 示例数据来演示如何使用转换来透视和汇总您的数据。
- 验证您的环境是否已正确设置以使用转换。如果启用了 Elasticsearch 安全功能,则要完成本教程,您需要一个有权预览和创建转换的用户。您还必须具有源索引和目标索引的特定索引权限。请参阅设置。
-
选择您的源索引。
在本示例中,我们将使用电子商务订单示例数据。如果您还不熟悉
kibana_sample_data_ecommerce
索引,请使用 Kibana 中的 收入仪表板来探索数据。考虑一下您可能希望从这些电子商务数据中得出哪些见解。 -
选择透视类型的转换,并尝试各种用于分组和聚合数据的选项。
有两种类型的转换,但首先我们将尝试透视您的数据,这涉及使用至少一个字段对其进行分组并应用至少一个聚合。您可以预览转换后的数据外观,所以请继续尝试!您还可以启用直方图图表,以更好地了解数据中值的分布。
例如,您可能希望按产品 ID 对数据进行分组,并计算每个产品的总销售量及其平均价格。或者,您可能希望查看单个客户的行为,并计算每个客户的总消费额以及他们购买的不同产品类别数量。或者,您可能希望考虑货币或地理位置。您可以转换和解释这些数据的最有趣的方式是什么?
转到 Kibana 中的管理 > 堆栈管理 > 数据 > 转换,并使用向导创建转换
按客户 ID 对数据进行分组,并添加一个或多个聚合,以了解有关每个客户订单的更多信息。例如,让我们计算他们购买的产品总数、购买的总价格、他们在单个订单中购买的最大产品数量以及他们的订单总数。我们将通过在
total_quantity
和taxless_total_price
字段上使用sum
聚合,在total_quantity
字段上使用max
聚合,以及在order_id
字段上使用cardinality
聚合来实现此目的如果您愿意,可以使用预览转换 API。
API 示例
resp = client.transform.preview_transform( source={ "index": "kibana_sample_data_ecommerce", "query": { "bool": { "filter": { "term": { "currency": "EUR" } } } } }, pivot={ "group_by": { "customer_id": { "terms": { "field": "customer_id" } } }, "aggregations": { "total_quantity.sum": { "sum": { "field": "total_quantity" } }, "taxless_total_price.sum": { "sum": { "field": "taxless_total_price" } }, "total_quantity.max": { "max": { "field": "total_quantity" } }, "order_id.cardinality": { "cardinality": { "field": "order_id" } } } }, ) print(resp)
const response = await client.transform.previewTransform({ source: { index: "kibana_sample_data_ecommerce", query: { bool: { filter: { term: { currency: "EUR", }, }, }, }, }, pivot: { group_by: { customer_id: { terms: { field: "customer_id", }, }, }, aggregations: { "total_quantity.sum": { sum: { field: "total_quantity", }, }, "taxless_total_price.sum": { sum: { field: "taxless_total_price", }, }, "total_quantity.max": { max: { field: "total_quantity", }, }, "order_id.cardinality": { cardinality: { field: "order_id", }, }, }, }, }); console.log(response);
POST _transform/_preview { "source": { "index": "kibana_sample_data_ecommerce", "query": { "bool": { "filter": { "term": {"currency": "EUR"} } } } }, "pivot": { "group_by": { "customer_id": { "terms": { "field": "customer_id" } } }, "aggregations": { "total_quantity.sum": { "sum": { "field": "total_quantity" } }, "taxless_total_price.sum": { "sum": { "field": "taxless_total_price" } }, "total_quantity.max": { "max": { "field": "total_quantity" } }, "order_id.cardinality": { "cardinality": { "field": "order_id" } } } } }
-
当您对预览中看到的内容感到满意时,创建转换。
- 提供转换 ID、目标索引的名称以及可选的描述。如果目标索引不存在,则在启动转换时会自动创建该索引。
- 确定您希望转换运行一次还是连续运行。由于此示例数据索引是不变的,因此让我们使用默认行为,仅运行一次转换。但是,如果您想尝试一下,请继续单击连续模式。您必须选择一个转换可以用来检查哪些实体已更改的字段。一般来说,最好使用 ingest 时间戳字段。但是,在本示例中,您可以使用
order_date
字段。 - (可选)您可以配置应用于转换的保留策略。选择一个用于标识目标索引中旧文档的日期字段,并提供最大期限。早于配置值的文档将从目标索引中删除。
在 Kibana 中,在完成转换创建之前,您可以将预览转换 API 请求复制到剪贴板。当您决定是否要手动创建目标索引时,此信息稍后会很有用。
如果您愿意,可以使用创建转换 API。
API 示例
resp = client.transform.put_transform( transform_id="ecommerce-customer-transform", source={ "index": [ "kibana_sample_data_ecommerce" ], "query": { "bool": { "filter": { "term": { "currency": "EUR" } } } } }, pivot={ "group_by": { "customer_id": { "terms": { "field": "customer_id" } } }, "aggregations": { "total_quantity.sum": { "sum": { "field": "total_quantity" } }, "taxless_total_price.sum": { "sum": { "field": "taxless_total_price" } }, "total_quantity.max": { "max": { "field": "total_quantity" } }, "order_id.cardinality": { "cardinality": { "field": "order_id" } } } }, dest={ "index": "ecommerce-customers" }, retention_policy={ "time": { "field": "order_date", "max_age": "60d" } }, ) print(resp)
const response = await client.transform.putTransform({ transform_id: "ecommerce-customer-transform", source: { index: ["kibana_sample_data_ecommerce"], query: { bool: { filter: { term: { currency: "EUR", }, }, }, }, }, pivot: { group_by: { customer_id: { terms: { field: "customer_id", }, }, }, aggregations: { "total_quantity.sum": { sum: { field: "total_quantity", }, }, "taxless_total_price.sum": { sum: { field: "taxless_total_price", }, }, "total_quantity.max": { max: { field: "total_quantity", }, }, "order_id.cardinality": { cardinality: { field: "order_id", }, }, }, }, dest: { index: "ecommerce-customers", }, retention_policy: { time: { field: "order_date", max_age: "60d", }, }, }); console.log(response);
PUT _transform/ecommerce-customer-transform { "source": { "index": [ "kibana_sample_data_ecommerce" ], "query": { "bool": { "filter": { "term": { "currency": "EUR" } } } } }, "pivot": { "group_by": { "customer_id": { "terms": { "field": "customer_id" } } }, "aggregations": { "total_quantity.sum": { "sum": { "field": "total_quantity" } }, "taxless_total_price.sum": { "sum": { "field": "taxless_total_price" } }, "total_quantity.max": { "max": { "field": "total_quantity" } }, "order_id.cardinality": { "cardinality": { "field": "order_id" } } } }, "dest": { "index": "ecommerce-customers" }, "retention_policy": { "time": { "field": "order_date", "max_age": "60d" } } }
-
可选:创建目标索引。
如果目标索引不存在,则会在您第一次启动转换时创建该索引。透视转换从源索引和转换聚合中推断出目标索引的映射。如果目标索引中有从脚本派生的字段(例如,如果您使用
scripted_metrics
或bucket_scripts
聚合),则会使用动态映射创建它们。您可以使用预览转换 API 来预览它将用于目标索引的映射。在 Kibana 中,如果您已将 API 请求复制到剪贴板,请将其粘贴到控制台中,然后参考 API 响应中的generated_dest_index
对象。API 示例
{ "preview" : [ { "total_quantity" : { "max" : 2, "sum" : 118.0 }, "taxless_total_price" : { "sum" : 3946.9765625 }, "customer_id" : "10", "order_id" : { "cardinality" : 59 } }, ... ], "generated_dest_index" : { "mappings" : { "_meta" : { "_transform" : { "transform" : "transform-preview", "version" : { "created" : "8.0.0" }, "creation_date_in_millis" : 1621991264061 }, "created_by" : "transform" }, "properties" : { "total_quantity.sum" : { "type" : "double" }, "total_quantity" : { "type" : "object" }, "taxless_total_price" : { "type" : "object" }, "taxless_total_price.sum" : { "type" : "double" }, "order_id.cardinality" : { "type" : "long" }, "customer_id" : { "type" : "keyword" }, "total_quantity.max" : { "type" : "integer" }, "order_id" : { "type" : "object" } } }, "settings" : { "index" : { "number_of_shards" : "1", "auto_expand_replicas" : "0-1" } }, "aliases" : { } } }
在某些情况下,推断出的映射可能与实际数据不兼容。例如,可能会发生数值溢出,或者动态映射的字段可能同时包含数字和字符串。为避免此问题,请在启动转换之前创建目标索引。有关更多信息,请参阅创建索引 API。
API 示例
您可以使用转换预览中的信息来创建目标索引。例如
resp = client.indices.create( index="ecommerce-customers", mappings={ "properties": { "total_quantity.sum": { "type": "double" }, "total_quantity": { "type": "object" }, "taxless_total_price": { "type": "object" }, "taxless_total_price.sum": { "type": "double" }, "order_id.cardinality": { "type": "long" }, "customer_id": { "type": "keyword" }, "total_quantity.max": { "type": "integer" }, "order_id": { "type": "object" } } }, ) print(resp)
response = client.indices.create( index: 'ecommerce-customers', body: { mappings: { properties: { 'total_quantity.sum' => { type: 'double' }, total_quantity: { type: 'object' }, taxless_total_price: { type: 'object' }, 'taxless_total_price.sum' => { type: 'double' }, 'order_id.cardinality' => { type: 'long' }, customer_id: { type: 'keyword' }, 'total_quantity.max' => { type: 'integer' }, order_id: { type: 'object' } } } } ) puts response
const response = await client.indices.create({ index: "ecommerce-customers", mappings: { properties: { "total_quantity.sum": { type: "double", }, total_quantity: { type: "object", }, taxless_total_price: { type: "object", }, "taxless_total_price.sum": { type: "double", }, "order_id.cardinality": { type: "long", }, customer_id: { type: "keyword", }, "total_quantity.max": { type: "integer", }, order_id: { type: "object", }, }, }, }); console.log(response);
PUT /ecommerce-customers { "mappings": { "properties": { "total_quantity.sum" : { "type" : "double" }, "total_quantity" : { "type" : "object" }, "taxless_total_price" : { "type" : "object" }, "taxless_total_price.sum" : { "type" : "double" }, "order_id.cardinality" : { "type" : "long" }, "customer_id" : { "type" : "keyword" }, "total_quantity.max" : { "type" : "integer" }, "order_id" : { "type" : "object" } } } }
-
启动转换。
即使资源利用率会根据集群负载自动调整,但转换在运行时也会增加集群的搜索和索引负载。但是,如果您遇到过多的负载,可以停止它。
您可以在 Kibana 中启动、停止、重置和管理转换
如果您重置转换,则所有检查点、状态和目标索引(如果它是由转换创建的)都将被删除。转换已准备好再次启动,就像刚刚创建一样。
API 示例
resp = client.transform.start_transform( transform_id="ecommerce-customer-transform", ) print(resp)
response = client.transform.start_transform( transform_id: 'ecommerce-customer-transform' ) puts response
const response = await client.transform.startTransform({ transform_id: "ecommerce-customer-transform", }); console.log(response);
POST _transform/ecommerce-customer-transform/_start
如果您选择批量转换,则它是具有单个检查点的单个操作。它完成后您无法重新启动它。连续转换的不同之处在于,它们会随着新源数据的提取而不断递增和处理检查点。
-
探索新索引中的数据。
例如,使用 Kibana 中的发现应用程序
-
可选:创建另一个转换,这次使用
latest
方法。此方法使用每个唯一键值的最新文档填充目标索引。例如,您可能想查找每个客户或每个国家/地区的最新订单(按
order_date
字段排序)。API 示例
resp = client.transform.preview_transform( source={ "index": "kibana_sample_data_ecommerce", "query": { "bool": { "filter": { "term": { "currency": "EUR" } } } } }, latest={ "unique_key": [ "geoip.country_iso_code", "geoip.region_name" ], "sort": "order_date" }, ) print(resp)
const response = await client.transform.previewTransform({ source: { index: "kibana_sample_data_ecommerce", query: { bool: { filter: { term: { currency: "EUR", }, }, }, }, }, latest: { unique_key: ["geoip.country_iso_code", "geoip.region_name"], sort: "order_date", }, }); console.log(response);
POST _transform/_preview { "source": { "index": "kibana_sample_data_ecommerce", "query": { "bool": { "filter": { "term": {"currency": "EUR"} } } } }, "latest": { "unique_key": ["geoip.country_iso_code", "geoip.region_name"], "sort": "order_date" } }
如果目标索引不存在,则会在您第一次启动转换时创建该索引。但是,与透视转换不同,最新转换在创建索引时不会推断映射定义。相反,它们使用动态映射。要使用显式映射,请在启动转换之前创建目标索引。
- 如果您不想保留转换,可以在 Kibana 中删除它,或使用删除转换 API。默认情况下,当您删除转换时,其目标索引和 Kibana 索引模式将保留。
现在您已经为 Kibana 示例数据创建了简单的转换,请考虑您自己数据的可能用例。有关更多想法,请参阅何时使用转换和示例。