教程:转换电子商务示例数据
编辑教程:转换电子商务示例数据
编辑转换 使您能够从 Elasticsearch 索引中检索信息,对其进行转换,并将其存储到另一个索引中。让我们使用 Kibana 示例数据 来演示如何使用转换对数据进行透视和汇总。
- 验证您的环境是否已正确设置以使用转换。如果启用了 Elasticsearch 安全功能,则要完成本教程,您需要一个具有预览和创建转换权限的用户。您还必须拥有源索引和目标索引的特定索引权限。请参阅 设置。
-
选择您的源索引。
在本例中,我们将使用电子商务订单示例数据。如果您还不熟悉
kibana_sample_data_ecommerce
索引,请使用 Kibana 中的收入仪表板来浏览数据。考虑一下您可能希望从这些电子商务数据中获得哪些见解。 -
选择转换的透视类型,并尝试各种数据分组和聚合选项。
有两种类型的转换,但首先我们将尝试透视您的数据,这涉及使用至少一个字段对其进行分组并应用至少一个聚合。您可以预览转换后的数据的外观,因此请继续操作并尝试一下!您还可以启用直方图来更好地了解数据中值的分布。
例如,您可能希望按产品 ID 对数据进行分组,并计算每个产品的总销售额及其平均价格。或者,您可能希望查看单个客户的行为,并计算每个客户的总支出以及他们购买的不同产品类别数量。或者,您可能希望考虑货币或地理位置。您可以用哪些最有趣的方式转换和解释这些数据?
转到 Kibana 中的管理 > 堆栈管理 > 数据 > 转换,并使用向导创建转换
按客户 ID 对数据进行分组,并添加一个或多个聚合以了解每个客户的订单的更多信息。例如,让我们计算他们购买的产品总量、购买总价、他们在单个订单中购买产品的最大数量以及他们的订单总数。我们将通过对
total_quantity
和taxless_total_price
字段使用sum
聚合、对total_quantity
字段使用max
聚合 以及对order_id
字段使用cardinality
聚合 来实现这一点如果您愿意,可以使用 预览转换 API。
API 示例
resp = client.transform.preview_transform( source={ "index": "kibana_sample_data_ecommerce", "query": { "bool": { "filter": { "term": { "currency": "EUR" } } } } }, pivot={ "group_by": { "customer_id": { "terms": { "field": "customer_id" } } }, "aggregations": { "total_quantity.sum": { "sum": { "field": "total_quantity" } }, "taxless_total_price.sum": { "sum": { "field": "taxless_total_price" } }, "total_quantity.max": { "max": { "field": "total_quantity" } }, "order_id.cardinality": { "cardinality": { "field": "order_id" } } } }, ) print(resp)
const response = await client.transform.previewTransform({ source: { index: "kibana_sample_data_ecommerce", query: { bool: { filter: { term: { currency: "EUR", }, }, }, }, }, pivot: { group_by: { customer_id: { terms: { field: "customer_id", }, }, }, aggregations: { "total_quantity.sum": { sum: { field: "total_quantity", }, }, "taxless_total_price.sum": { sum: { field: "taxless_total_price", }, }, "total_quantity.max": { max: { field: "total_quantity", }, }, "order_id.cardinality": { cardinality: { field: "order_id", }, }, }, }, }); console.log(response);
POST _transform/_preview { "source": { "index": "kibana_sample_data_ecommerce", "query": { "bool": { "filter": { "term": {"currency": "EUR"} } } } }, "pivot": { "group_by": { "customer_id": { "terms": { "field": "customer_id" } } }, "aggregations": { "total_quantity.sum": { "sum": { "field": "total_quantity" } }, "taxless_total_price.sum": { "sum": { "field": "taxless_total_price" } }, "total_quantity.max": { "max": { "field": "total_quantity" } }, "order_id.cardinality": { "cardinality": { "field": "order_id" } } } } }
-
当您对预览中看到的内容感到满意时,请创建转换。
- 提供转换 ID、目标索引的名称以及可选说明。如果目标索引不存在,则在您启动转换时会自动创建它。
- 确定您希望转换运行一次还是连续运行。由于此示例数据索引是不变的,因此让我们使用默认行为,只运行转换一次。但是,如果您想尝试一下,请继续并点击连续模式。您必须选择一个转换可以用来检查哪些实体已更改的字段。通常,最好使用摄取时间戳字段。但是,在本例中,您可以使用
order_date
字段。 - 可选地,您可以配置适用于转换的保留策略。选择一个用于识别目标索引中旧文档的日期字段,并提供最大年龄。早于配置值的文档将从目标索引中删除。
在 Kibana 中,在完成创建转换之前,您可以将预览转换 API 请求复制到剪贴板。此信息在您稍后决定是否要手动创建目标索引时很有用。
如果您愿意,可以使用 创建转换 API。
API 示例
resp = client.transform.put_transform( transform_id="ecommerce-customer-transform", source={ "index": [ "kibana_sample_data_ecommerce" ], "query": { "bool": { "filter": { "term": { "currency": "EUR" } } } } }, pivot={ "group_by": { "customer_id": { "terms": { "field": "customer_id" } } }, "aggregations": { "total_quantity.sum": { "sum": { "field": "total_quantity" } }, "taxless_total_price.sum": { "sum": { "field": "taxless_total_price" } }, "total_quantity.max": { "max": { "field": "total_quantity" } }, "order_id.cardinality": { "cardinality": { "field": "order_id" } } } }, dest={ "index": "ecommerce-customers" }, retention_policy={ "time": { "field": "order_date", "max_age": "60d" } }, ) print(resp)
const response = await client.transform.putTransform({ transform_id: "ecommerce-customer-transform", source: { index: ["kibana_sample_data_ecommerce"], query: { bool: { filter: { term: { currency: "EUR", }, }, }, }, }, pivot: { group_by: { customer_id: { terms: { field: "customer_id", }, }, }, aggregations: { "total_quantity.sum": { sum: { field: "total_quantity", }, }, "taxless_total_price.sum": { sum: { field: "taxless_total_price", }, }, "total_quantity.max": { max: { field: "total_quantity", }, }, "order_id.cardinality": { cardinality: { field: "order_id", }, }, }, }, dest: { index: "ecommerce-customers", }, retention_policy: { time: { field: "order_date", max_age: "60d", }, }, }); console.log(response);
PUT _transform/ecommerce-customer-transform { "source": { "index": [ "kibana_sample_data_ecommerce" ], "query": { "bool": { "filter": { "term": { "currency": "EUR" } } } } }, "pivot": { "group_by": { "customer_id": { "terms": { "field": "customer_id" } } }, "aggregations": { "total_quantity.sum": { "sum": { "field": "total_quantity" } }, "taxless_total_price.sum": { "sum": { "field": "taxless_total_price" } }, "total_quantity.max": { "max": { "field": "total_quantity" } }, "order_id.cardinality": { "cardinality": { "field": "order_id" } } } }, "dest": { "index": "ecommerce-customers" }, "retention_policy": { "time": { "field": "order_date", "max_age": "60d" } } }
-
可选:创建目标索引。
如果目标索引不存在,则在您第一次启动转换时会创建它。透视转换会根据源索引和转换聚合推断目标索引的映射。如果目标索引中存在来自脚本派生的字段(例如,如果您使用
scripted_metrics
或bucket_scripts
聚合),则使用 动态映射 创建它们。您可以使用预览转换 API 预览它将用于目标索引的映射。在 Kibana 中,如果您已将 API 请求复制到剪贴板,请将其粘贴到控制台中,然后参考 API 响应中的generated_dest_index
对象。API 示例
{ "preview" : [ { "total_quantity" : { "max" : 2, "sum" : 118.0 }, "taxless_total_price" : { "sum" : 3946.9765625 }, "customer_id" : "10", "order_id" : { "cardinality" : 59 } }, ... ], "generated_dest_index" : { "mappings" : { "_meta" : { "_transform" : { "transform" : "transform-preview", "version" : { "created" : "8.0.0" }, "creation_date_in_millis" : 1621991264061 }, "created_by" : "transform" }, "properties" : { "total_quantity.sum" : { "type" : "double" }, "total_quantity" : { "type" : "object" }, "taxless_total_price" : { "type" : "object" }, "taxless_total_price.sum" : { "type" : "double" }, "order_id.cardinality" : { "type" : "long" }, "customer_id" : { "type" : "keyword" }, "total_quantity.max" : { "type" : "integer" }, "order_id" : { "type" : "object" } } }, "settings" : { "index" : { "number_of_shards" : "1", "auto_expand_replicas" : "0-1" } }, "aliases" : { } } }
在某些情况下,推断的映射可能与实际数据不兼容。例如,可能会发生数字溢出,或者动态映射的字段可能同时包含数字和字符串。为避免此问题,请在启动转换之前创建目标索引。有关更多信息,请参阅 创建索引 API。
API 示例
您可以使用转换预览中的信息来创建目标索引。例如
resp = client.indices.create( index="ecommerce-customers", mappings={ "properties": { "total_quantity.sum": { "type": "double" }, "total_quantity": { "type": "object" }, "taxless_total_price": { "type": "object" }, "taxless_total_price.sum": { "type": "double" }, "order_id.cardinality": { "type": "long" }, "customer_id": { "type": "keyword" }, "total_quantity.max": { "type": "integer" }, "order_id": { "type": "object" } } }, ) print(resp)
response = client.indices.create( index: 'ecommerce-customers', body: { mappings: { properties: { 'total_quantity.sum' => { type: 'double' }, total_quantity: { type: 'object' }, taxless_total_price: { type: 'object' }, 'taxless_total_price.sum' => { type: 'double' }, 'order_id.cardinality' => { type: 'long' }, customer_id: { type: 'keyword' }, 'total_quantity.max' => { type: 'integer' }, order_id: { type: 'object' } } } } ) puts response
const response = await client.indices.create({ index: "ecommerce-customers", mappings: { properties: { "total_quantity.sum": { type: "double", }, total_quantity: { type: "object", }, taxless_total_price: { type: "object", }, "taxless_total_price.sum": { type: "double", }, "order_id.cardinality": { type: "long", }, customer_id: { type: "keyword", }, "total_quantity.max": { type: "integer", }, order_id: { type: "object", }, }, }, }); console.log(response);
PUT /ecommerce-customers { "mappings": { "properties": { "total_quantity.sum" : { "type" : "double" }, "total_quantity" : { "type" : "object" }, "taxless_total_price" : { "type" : "object" }, "taxless_total_price.sum" : { "type" : "double" }, "order_id.cardinality" : { "type" : "long" }, "customer_id" : { "type" : "keyword" }, "total_quantity.max" : { "type" : "integer" }, "order_id" : { "type" : "object" } } } }
-
启动转换。
即使资源利用率会根据集群负载自动调整,但在转换运行期间,它也会增加集群上的搜索和索引负载。但是,如果您遇到过大的负载,则可以停止它。
您可以在 Kibana 中启动、停止、重置和管理转换
或者,您可以使用 启动转换、停止转换 和 重置转换 API。
如果重置转换,则所有检查点、状态和目标索引(如果由转换创建)都将被删除。转换已准备好再次启动,就像刚刚创建一样。
API 示例
resp = client.transform.start_transform( transform_id="ecommerce-customer-transform", ) print(resp)
response = client.transform.start_transform( transform_id: 'ecommerce-customer-transform' ) puts response
const response = await client.transform.startTransform({ transform_id: "ecommerce-customer-transform", }); console.log(response);
POST _transform/ecommerce-customer-transform/_start
如果您选择了批处理转换,则它是一个具有单个检查点的单个操作。完成时无法重新启动它。连续转换的不同之处在于,它们会随着新源数据的摄取不断递增和处理检查点。
-
浏览新索引中的数据。
例如,使用 Kibana 中的发现应用程序
-
可选:创建另一个转换,这次使用
latest
方法。此方法使用每个唯一键值的最新文档填充目标索引。例如,您可能希望查找每个客户或每个国家/地区和地区的最新订单(按
order_date
字段排序)。API 示例
resp = client.transform.preview_transform( source={ "index": "kibana_sample_data_ecommerce", "query": { "bool": { "filter": { "term": { "currency": "EUR" } } } } }, latest={ "unique_key": [ "geoip.country_iso_code", "geoip.region_name" ], "sort": "order_date" }, ) print(resp)
const response = await client.transform.previewTransform({ source: { index: "kibana_sample_data_ecommerce", query: { bool: { filter: { term: { currency: "EUR", }, }, }, }, }, latest: { unique_key: ["geoip.country_iso_code", "geoip.region_name"], sort: "order_date", }, }); console.log(response);
POST _transform/_preview { "source": { "index": "kibana_sample_data_ecommerce", "query": { "bool": { "filter": { "term": {"currency": "EUR"} } } } }, "latest": { "unique_key": ["geoip.country_iso_code", "geoip.region_name"], "sort": "order_date" } }
如果目标索引不存在,则在您第一次启动转换时会创建它。但是,与透视转换不同,最新转换在创建索引时不会推断映射定义。相反,它们使用动态映射。要使用显式映射,请在启动转换之前创建目标索引。
- 如果您不想保留转换,则可以在 Kibana 中删除它或使用 删除转换 API。默认情况下,当您删除转换时,其目标索引和 Kibana 索引模式会保留。
既然您已经为 Kibana 示例数据创建了简单的转换,请考虑您自己的数据的可能用例。有关更多想法,请参阅 何时使用转换 和 示例。