教程:转换电子商务示例数据

编辑

转换使您能够从 Elasticsearch 索引检索信息、对其进行转换并将其存储在另一个索引中。让我们使用 Kibana 示例数据来演示如何使用转换来透视和汇总您的数据。

  1. 验证您的环境是否已正确设置以使用转换。如果启用了 Elasticsearch 安全功能,则要完成本教程,您需要一个有权预览和创建转换的用户。您还必须具有源索引和目标索引的特定索引权限。请参阅设置
  2. 选择您的源索引

    在本示例中,我们将使用电子商务订单示例数据。如果您还不熟悉 kibana_sample_data_ecommerce 索引,请使用 Kibana 中的 收入仪表板来探索数据。考虑一下您可能希望从这些电子商务数据中得出哪些见解。

  3. 选择透视类型的转换,并尝试各种用于分组和聚合数据的选项。

    有两种类型的转换,但首先我们将尝试透视您的数据,这涉及使用至少一个字段对其进行分组并应用至少一个聚合。您可以预览转换后的数据外观,所以请继续尝试!您还可以启用直方图图表,以更好地了解数据中值的分布。

    例如,您可能希望按产品 ID 对数据进行分组,并计算每个产品的总销售量及其平均价格。或者,您可能希望查看单个客户的行为,并计算每个客户的总消费额以及他们购买的不同产品类别数量。或者,您可能希望考虑货币或地理位置。您可以转换和解释这些数据的最有趣的方式是什么?

    转到 Kibana 中的管理 > 堆栈管理 > 数据 > 转换,并使用向导创建转换

    Creating a simple transform in Kibana

    按客户 ID 对数据进行分组,并添加一个或多个聚合,以了解有关每个客户订单的更多信息。例如,让我们计算他们购买的产品总数、购买的总价格、他们在单个订单中购买的最大产品数量以及他们的订单总数。我们将通过在 total_quantitytaxless_total_price 字段上使用 sum 聚合,在 total_quantity 字段上使用 max 聚合,以及在 order_id 字段上使用 cardinality 聚合来实现此目的

    Adding multiple aggregations to a transform in Kibana

    如果您对数据的子集感兴趣,您可以选择包含一个 查询元素。在本示例中,我们过滤了数据,因此我们只查看 currencyEUR 的订单。或者,我们也可以按该字段对数据进行分组。如果您想使用更复杂的查询,您可以从 保存的搜索创建数据帧。

    如果您愿意,可以使用预览转换 API

    API 示例
    resp = client.transform.preview_transform(
        source={
            "index": "kibana_sample_data_ecommerce",
            "query": {
                "bool": {
                    "filter": {
                        "term": {
                            "currency": "EUR"
                        }
                    }
                }
            }
        },
        pivot={
            "group_by": {
                "customer_id": {
                    "terms": {
                        "field": "customer_id"
                    }
                }
            },
            "aggregations": {
                "total_quantity.sum": {
                    "sum": {
                        "field": "total_quantity"
                    }
                },
                "taxless_total_price.sum": {
                    "sum": {
                        "field": "taxless_total_price"
                    }
                },
                "total_quantity.max": {
                    "max": {
                        "field": "total_quantity"
                    }
                },
                "order_id.cardinality": {
                    "cardinality": {
                        "field": "order_id"
                    }
                }
            }
        },
    )
    print(resp)
    const response = await client.transform.previewTransform({
      source: {
        index: "kibana_sample_data_ecommerce",
        query: {
          bool: {
            filter: {
              term: {
                currency: "EUR",
              },
            },
          },
        },
      },
      pivot: {
        group_by: {
          customer_id: {
            terms: {
              field: "customer_id",
            },
          },
        },
        aggregations: {
          "total_quantity.sum": {
            sum: {
              field: "total_quantity",
            },
          },
          "taxless_total_price.sum": {
            sum: {
              field: "taxless_total_price",
            },
          },
          "total_quantity.max": {
            max: {
              field: "total_quantity",
            },
          },
          "order_id.cardinality": {
            cardinality: {
              field: "order_id",
            },
          },
        },
      },
    });
    console.log(response);
    POST _transform/_preview
    {
      "source": {
        "index": "kibana_sample_data_ecommerce",
        "query": {
          "bool": {
            "filter": {
              "term": {"currency": "EUR"}
            }
          }
        }
      },
      "pivot": {
        "group_by": {
          "customer_id": {
            "terms": {
              "field": "customer_id"
            }
          }
        },
        "aggregations": {
          "total_quantity.sum": {
            "sum": {
              "field": "total_quantity"
            }
          },
          "taxless_total_price.sum": {
            "sum": {
              "field": "taxless_total_price"
            }
          },
          "total_quantity.max": {
            "max": {
              "field": "total_quantity"
            }
          },
          "order_id.cardinality": {
            "cardinality": {
              "field": "order_id"
            }
          }
        }
      }
    }
  4. 当您对预览中看到的内容感到满意时,创建转换。

    1. 提供转换 ID、目标索引的名称以及可选的描述。如果目标索引不存在,则在启动转换时会自动创建该索引。
    2. 确定您希望转换运行一次还是连续运行。由于此示例数据索引是不变的,因此让我们使用默认行为,仅运行一次转换。但是,如果您想尝试一下,请继续单击连续模式。您必须选择一个转换可以用来检查哪些实体已更改的字段。一般来说,最好使用 ingest 时间戳字段。但是,在本示例中,您可以使用 order_date 字段。
    3. (可选)您可以配置应用于转换的保留策略。选择一个用于标识目标索引中旧文档的日期字段,并提供最大期限。早于配置值的文档将从目标索引中删除。
    Adding transfrom ID and retention policy to a transform in Kibana

    在 Kibana 中,在完成转换创建之前,您可以将预览转换 API 请求复制到剪贴板。当您决定是否要手动创建目标索引时,此信息稍后会很有用。

    Copy the Dev Console statement of the transform preview to the clipboard

    如果您愿意,可以使用创建转换 API

    API 示例
    resp = client.transform.put_transform(
        transform_id="ecommerce-customer-transform",
        source={
            "index": [
                "kibana_sample_data_ecommerce"
            ],
            "query": {
                "bool": {
                    "filter": {
                        "term": {
                            "currency": "EUR"
                        }
                    }
                }
            }
        },
        pivot={
            "group_by": {
                "customer_id": {
                    "terms": {
                        "field": "customer_id"
                    }
                }
            },
            "aggregations": {
                "total_quantity.sum": {
                    "sum": {
                        "field": "total_quantity"
                    }
                },
                "taxless_total_price.sum": {
                    "sum": {
                        "field": "taxless_total_price"
                    }
                },
                "total_quantity.max": {
                    "max": {
                        "field": "total_quantity"
                    }
                },
                "order_id.cardinality": {
                    "cardinality": {
                        "field": "order_id"
                    }
                }
            }
        },
        dest={
            "index": "ecommerce-customers"
        },
        retention_policy={
            "time": {
                "field": "order_date",
                "max_age": "60d"
            }
        },
    )
    print(resp)
    const response = await client.transform.putTransform({
      transform_id: "ecommerce-customer-transform",
      source: {
        index: ["kibana_sample_data_ecommerce"],
        query: {
          bool: {
            filter: {
              term: {
                currency: "EUR",
              },
            },
          },
        },
      },
      pivot: {
        group_by: {
          customer_id: {
            terms: {
              field: "customer_id",
            },
          },
        },
        aggregations: {
          "total_quantity.sum": {
            sum: {
              field: "total_quantity",
            },
          },
          "taxless_total_price.sum": {
            sum: {
              field: "taxless_total_price",
            },
          },
          "total_quantity.max": {
            max: {
              field: "total_quantity",
            },
          },
          "order_id.cardinality": {
            cardinality: {
              field: "order_id",
            },
          },
        },
      },
      dest: {
        index: "ecommerce-customers",
      },
      retention_policy: {
        time: {
          field: "order_date",
          max_age: "60d",
        },
      },
    });
    console.log(response);
    PUT _transform/ecommerce-customer-transform
    {
      "source": {
        "index": [
          "kibana_sample_data_ecommerce"
        ],
        "query": {
          "bool": {
            "filter": {
              "term": {
                "currency": "EUR"
              }
            }
          }
        }
      },
      "pivot": {
        "group_by": {
          "customer_id": {
            "terms": {
              "field": "customer_id"
            }
          }
        },
        "aggregations": {
          "total_quantity.sum": {
            "sum": {
              "field": "total_quantity"
            }
          },
          "taxless_total_price.sum": {
            "sum": {
              "field": "taxless_total_price"
            }
          },
          "total_quantity.max": {
            "max": {
              "field": "total_quantity"
            }
          },
          "order_id.cardinality": {
            "cardinality": {
              "field": "order_id"
            }
          }
        }
      },
      "dest": {
        "index": "ecommerce-customers"
      },
      "retention_policy": {
        "time": {
          "field": "order_date",
          "max_age": "60d"
        }
      }
    }
  5. 可选:创建目标索引。

    如果目标索引不存在,则会在您第一次启动转换时创建该索引。透视转换从源索引和转换聚合中推断出目标索引的映射。如果目标索引中有从脚本派生的字段(例如,如果您使用 scripted_metricsbucket_scripts 聚合),则会使用动态映射创建它们。您可以使用预览转换 API 来预览它将用于目标索引的映射。在 Kibana 中,如果您已将 API 请求复制到剪贴板,请将其粘贴到控制台中,然后参考 API 响应中的 generated_dest_index 对象。

    与 Kibana 中提供的选项相比,转换可能具有 API 提供的更多配置选项。例如,您可以通过调用 创建转换来为 dest 设置 ingest 管道。有关所有转换配置选项,请参阅文档

    API 示例
    {
      "preview" : [
        {
          "total_quantity" : {
            "max" : 2,
            "sum" : 118.0
          },
          "taxless_total_price" : {
            "sum" : 3946.9765625
          },
          "customer_id" : "10",
          "order_id" : {
            "cardinality" : 59
          }
        },
        ...
      ],
      "generated_dest_index" : {
        "mappings" : {
          "_meta" : {
            "_transform" : {
              "transform" : "transform-preview",
              "version" : {
                "created" : "8.0.0"
              },
              "creation_date_in_millis" : 1621991264061
            },
            "created_by" : "transform"
          },
          "properties" : {
            "total_quantity.sum" : {
              "type" : "double"
            },
            "total_quantity" : {
              "type" : "object"
            },
            "taxless_total_price" : {
              "type" : "object"
            },
            "taxless_total_price.sum" : {
              "type" : "double"
            },
            "order_id.cardinality" : {
              "type" : "long"
            },
            "customer_id" : {
              "type" : "keyword"
            },
            "total_quantity.max" : {
              "type" : "integer"
            },
            "order_id" : {
              "type" : "object"
            }
          }
        },
        "settings" : {
          "index" : {
            "number_of_shards" : "1",
            "auto_expand_replicas" : "0-1"
          }
        },
        "aliases" : { }
      }
    }

    在某些情况下,推断出的映射可能与实际数据不兼容。例如,可能会发生数值溢出,或者动态映射的字段可能同时包含数字和字符串。为避免此问题,请在启动转换之前创建目标索引。有关更多信息,请参阅创建索引 API

    API 示例

    您可以使用转换预览中的信息来创建目标索引。例如

    resp = client.indices.create(
        index="ecommerce-customers",
        mappings={
            "properties": {
                "total_quantity.sum": {
                    "type": "double"
                },
                "total_quantity": {
                    "type": "object"
                },
                "taxless_total_price": {
                    "type": "object"
                },
                "taxless_total_price.sum": {
                    "type": "double"
                },
                "order_id.cardinality": {
                    "type": "long"
                },
                "customer_id": {
                    "type": "keyword"
                },
                "total_quantity.max": {
                    "type": "integer"
                },
                "order_id": {
                    "type": "object"
                }
            }
        },
    )
    print(resp)
    response = client.indices.create(
      index: 'ecommerce-customers',
      body: {
        mappings: {
          properties: {
            'total_quantity.sum' => {
              type: 'double'
            },
            total_quantity: {
              type: 'object'
            },
            taxless_total_price: {
              type: 'object'
            },
            'taxless_total_price.sum' => {
              type: 'double'
            },
            'order_id.cardinality' => {
              type: 'long'
            },
            customer_id: {
              type: 'keyword'
            },
            'total_quantity.max' => {
              type: 'integer'
            },
            order_id: {
              type: 'object'
            }
          }
        }
      }
    )
    puts response
    const response = await client.indices.create({
      index: "ecommerce-customers",
      mappings: {
        properties: {
          "total_quantity.sum": {
            type: "double",
          },
          total_quantity: {
            type: "object",
          },
          taxless_total_price: {
            type: "object",
          },
          "taxless_total_price.sum": {
            type: "double",
          },
          "order_id.cardinality": {
            type: "long",
          },
          customer_id: {
            type: "keyword",
          },
          "total_quantity.max": {
            type: "integer",
          },
          order_id: {
            type: "object",
          },
        },
      },
    });
    console.log(response);
    PUT /ecommerce-customers
    {
      "mappings": {
        "properties": {
          "total_quantity.sum" : {
            "type" : "double"
          },
          "total_quantity" : {
            "type" : "object"
          },
          "taxless_total_price" : {
            "type" : "object"
          },
          "taxless_total_price.sum" : {
            "type" : "double"
          },
          "order_id.cardinality" : {
            "type" : "long"
          },
          "customer_id" : {
            "type" : "keyword"
          },
          "total_quantity.max" : {
            "type" : "integer"
          },
          "order_id" : {
            "type" : "object"
          }
        }
      }
    }
  6. 启动转换。

    即使资源利用率会根据集群负载自动调整,但转换在运行时也会增加集群的搜索和索引负载。但是,如果您遇到过多的负载,可以停止它。

    您可以在 Kibana 中启动、停止、重置和管理转换

    Managing transforms in Kibana

    或者,您可以使用启动转换停止转换重置转换API。

    如果您重置转换,则所有检查点、状态和目标索引(如果它是由转换创建的)都将被删除。转换已准备好再次启动,就像刚刚创建一样。

    API 示例
    resp = client.transform.start_transform(
        transform_id="ecommerce-customer-transform",
    )
    print(resp)
    response = client.transform.start_transform(
      transform_id: 'ecommerce-customer-transform'
    )
    puts response
    const response = await client.transform.startTransform({
      transform_id: "ecommerce-customer-transform",
    });
    console.log(response);
    POST _transform/ecommerce-customer-transform/_start

    如果您选择批量转换,则它是具有单个检查点的单个操作。它完成后您无法重新启动它。连续转换的不同之处在于,它们会随着新源数据的提取而不断递增和处理检查点。

  7. 探索新索引中的数据。

    例如,使用 Kibana 中的发现应用程序

    Exploring the new index in Kibana
  8. 可选:创建另一个转换,这次使用 latest 方法。

    此方法使用每个唯一键值的最新文档填充目标索引。例如,您可能想查找每个客户或每个国家/地区的最新订单(按 order_date 字段排序)。

    Creating a latest transform in Kibana
    API 示例
    resp = client.transform.preview_transform(
        source={
            "index": "kibana_sample_data_ecommerce",
            "query": {
                "bool": {
                    "filter": {
                        "term": {
                            "currency": "EUR"
                        }
                    }
                }
            }
        },
        latest={
            "unique_key": [
                "geoip.country_iso_code",
                "geoip.region_name"
            ],
            "sort": "order_date"
        },
    )
    print(resp)
    const response = await client.transform.previewTransform({
      source: {
        index: "kibana_sample_data_ecommerce",
        query: {
          bool: {
            filter: {
              term: {
                currency: "EUR",
              },
            },
          },
        },
      },
      latest: {
        unique_key: ["geoip.country_iso_code", "geoip.region_name"],
        sort: "order_date",
      },
    });
    console.log(response);
    POST _transform/_preview
    {
      "source": {
        "index": "kibana_sample_data_ecommerce",
        "query": {
          "bool": {
            "filter": {
              "term": {"currency": "EUR"}
            }
          }
        }
      },
      "latest": {
        "unique_key": ["geoip.country_iso_code", "geoip.region_name"],
        "sort": "order_date"
      }
    }

    如果目标索引不存在,则会在您第一次启动转换时创建该索引。但是,与透视转换不同,最新转换在创建索引时不会推断映射定义。相反,它们使用动态映射。要使用显式映射,请在启动转换之前创建目标索引。

  9. 如果您不想保留转换,可以在 Kibana 中删除它,或使用删除转换 API。默认情况下,当您删除转换时,其目标索引和 Kibana 索引模式将保留。

现在您已经为 Kibana 示例数据创建了简单的转换,请考虑您自己数据的可能用例。有关更多想法,请参阅何时使用转换示例