脚本度量聚合

编辑

一种使用脚本执行以提供度量输出的度量聚合。

scripted_metric 在 Elastic Cloud Serverless 中不可用。

使用脚本可能会导致搜索速度变慢。请参阅 脚本、缓存和搜索速度

示例

resp = client.search(
    index="ledger",
    size="0",
    query={
        "match_all": {}
    },
    aggs={
        "profit": {
            "scripted_metric": {
                "init_script": "state.transactions = []",
                "map_script": "state.transactions.add(doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)",
                "combine_script": "double profit = 0; for (t in state.transactions) { profit += t } return profit",
                "reduce_script": "double profit = 0; for (a in states) { profit += a } return profit"
            }
        }
    },
)
print(resp)
response = client.search(
  index: 'ledger',
  size: 0,
  body: {
    query: {
      match_all: {}
    },
    aggregations: {
      profit: {
        scripted_metric: {
          init_script: 'state.transactions = []',
          map_script: "state.transactions.add(doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)",
          combine_script: 'double profit = 0; for (t in state.transactions) { profit += t } return profit',
          reduce_script: 'double profit = 0; for (a in states) { profit += a } return profit'
        }
      }
    }
  }
)
puts response
const response = await client.search({
  index: "ledger",
  size: 0,
  query: {
    match_all: {},
  },
  aggs: {
    profit: {
      scripted_metric: {
        init_script: "state.transactions = []",
        map_script:
          "state.transactions.add(doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)",
        combine_script:
          "double profit = 0; for (t in state.transactions) { profit += t } return profit",
        reduce_script:
          "double profit = 0; for (a in states) { profit += a } return profit",
      },
    },
  },
});
console.log(response);
POST ledger/_search?size=0
{
  "query": {
    "match_all": {}
  },
  "aggs": {
    "profit": {
      "scripted_metric": {
        "init_script": "state.transactions = []", 
        "map_script": "state.transactions.add(doc.type.value == 'sale' ? doc.amount.value : -1 * doc.amount.value)",
        "combine_script": "double profit = 0; for (t in state.transactions) { profit += t } return profit",
        "reduce_script": "double profit = 0; for (a in states) { profit += a } return profit"
      }
    }
  }
}

init_script 是一个可选参数,所有其他脚本都是必需的。

上面的聚合演示了如何使用脚本聚合来计算销售和成本交易的总利润。

上面聚合的响应

{
  "took": 218,
  ...
  "aggregations": {
    "profit": {
      "value": 240.0
    }
  }
}

上面的示例也可以使用存储的脚本指定,如下所示

resp = client.search(
    index="ledger",
    size="0",
    aggs={
        "profit": {
            "scripted_metric": {
                "init_script": {
                    "id": "my_init_script"
                },
                "map_script": {
                    "id": "my_map_script"
                },
                "combine_script": {
                    "id": "my_combine_script"
                },
                "params": {
                    "field": "amount"
                },
                "reduce_script": {
                    "id": "my_reduce_script"
                }
            }
        }
    },
)
print(resp)
response = client.search(
  index: 'ledger',
  size: 0,
  body: {
    aggregations: {
      profit: {
        scripted_metric: {
          init_script: {
            id: 'my_init_script'
          },
          map_script: {
            id: 'my_map_script'
          },
          combine_script: {
            id: 'my_combine_script'
          },
          params: {
            field: 'amount'
          },
          reduce_script: {
            id: 'my_reduce_script'
          }
        }
      }
    }
  }
)
puts response
const response = await client.search({
  index: "ledger",
  size: 0,
  aggs: {
    profit: {
      scripted_metric: {
        init_script: {
          id: "my_init_script",
        },
        map_script: {
          id: "my_map_script",
        },
        combine_script: {
          id: "my_combine_script",
        },
        params: {
          field: "amount",
        },
        reduce_script: {
          id: "my_reduce_script",
        },
      },
    },
  },
});
console.log(response);
POST ledger/_search?size=0
{
  "aggs": {
    "profit": {
      "scripted_metric": {
        "init_script": {
          "id": "my_init_script"
        },
        "map_script": {
          "id": "my_map_script"
        },
        "combine_script": {
          "id": "my_combine_script"
        },
        "params": {
          "field": "amount"           
        },
        "reduce_script": {
          "id": "my_reduce_script"
        }
      }
    }
  }
}

initmapcombine 脚本的脚本参数必须在全局 params 对象中指定,以便可以在脚本之间共享。

有关指定脚本的更多详细信息,请参阅 脚本文档

允许的返回类型

编辑

虽然可以在单个脚本中使用任何有效的脚本对象,但脚本必须仅返回或存储在 state 对象中以下类型

  • 原始类型
  • 字符串
  • 映射(仅包含此处列出的类型的键和值)
  • 数组(仅包含此处列出的类型的元素)

脚本范围

编辑

脚本度量聚合在其执行的 4 个阶段中使用脚本

init_script

在收集任何文档之前执行。允许聚合设置任何初始状态。

在上面的示例中,init_scriptstate 对象中创建一个数组 transactions

map_script

每个收集的文档执行一次。这是一个必需的脚本。

在上面的示例中,map_script 检查 type 字段的值。如果值为sale,则将 amount 字段的值添加到 transactions 数组。如果 type 字段的值不是sale,则将 amount 字段的否定值添加到 transactions。

combine_script

在文档收集完成后,在每个分片上执行一次。这是一个必需的脚本。允许聚合合并从每个分片返回的状态。

在上面的示例中,combine_script 遍历所有存储的 transactions,将值求和到 profit 变量中,最后返回 profit

reduce_script

在所有分片返回其结果后,在协调节点上执行一次。这是一个必需的脚本。该脚本可以访问一个变量 states,该变量是每个分片上的 combine_script 结果的数组。

在上面的示例中,reduce_script 遍历每个分片返回的 profit,将值求和,然后返回最终组合利润,该利润将返回到聚合的响应中。

工作示例

编辑

假设您将以下文档索引到具有 2 个分片的索引中

resp = client.bulk(
    index="transactions",
    refresh=True,
    operations=[
        {
            "index": {
                "_id": 1
            }
        },
        {
            "type": "sale",
            "amount": 80
        },
        {
            "index": {
                "_id": 2
            }
        },
        {
            "type": "cost",
            "amount": 10
        },
        {
            "index": {
                "_id": 3
            }
        },
        {
            "type": "cost",
            "amount": 30
        },
        {
            "index": {
                "_id": 4
            }
        },
        {
            "type": "sale",
            "amount": 130
        }
    ],
)
print(resp)
response = client.bulk(
  index: 'transactions',
  refresh: true,
  body: [
    {
      index: {
        _id: 1
      }
    },
    {
      type: 'sale',
      amount: 80
    },
    {
      index: {
        _id: 2
      }
    },
    {
      type: 'cost',
      amount: 10
    },
    {
      index: {
        _id: 3
      }
    },
    {
      type: 'cost',
      amount: 30
    },
    {
      index: {
        _id: 4
      }
    },
    {
      type: 'sale',
      amount: 130
    }
  ]
)
puts response
const response = await client.bulk({
  index: "transactions",
  refresh: "true",
  operations: [
    {
      index: {
        _id: 1,
      },
    },
    {
      type: "sale",
      amount: 80,
    },
    {
      index: {
        _id: 2,
      },
    },
    {
      type: "cost",
      amount: 10,
    },
    {
      index: {
        _id: 3,
      },
    },
    {
      type: "cost",
      amount: 30,
    },
    {
      index: {
        _id: 4,
      },
    },
    {
      type: "sale",
      amount: 130,
    },
  ],
});
console.log(response);
PUT /transactions/_bulk?refresh
{"index":{"_id":1}}
{"type": "sale","amount": 80}
{"index":{"_id":2}}
{"type": "cost","amount": 10}
{"index":{"_id":3}}
{"type": "cost","amount": 30}
{"index":{"_id":4}}
{"type": "sale","amount": 130}

假设文档 1 和 3 最终位于分片 A 上,文档 2 和 4 最终位于分片 B 上。以下是上述示例中每个阶段的聚合结果的分解。

在 init_script 之前

编辑

state 初始化为一个新的空对象。

"state" : {}

在 init_script 之后

编辑

这在每个分片上运行一次,在执行任何文档收集之前,因此我们将在每个分片上都有一个副本

分片 A
"state" : {
    "transactions" : []
}
分片 B
"state" : {
    "transactions" : []
}

在 map_script 之后

编辑

每个分片收集其文档,并在每个收集的文档上运行 map_script

分片 A
"state" : {
    "transactions" : [ 80, -30 ]
}
分片 B
"state" : {
    "transactions" : [ -10, 130 ]
}

在 combine_script 之后

编辑

在文档收集完成后,在每个分片上执行 combine_script,并将所有 transactions 减少为每个分片的单个利润数字(通过对 transactions 数组中的值求和),该数字将传递回协调节点

分片 A
50
分片 B
120

在 reduce_script 之后

编辑

reduce_script 接收一个 states 数组,其中包含每个分片的 combine script 的结果

"states" : [
    50,
    120
]

它将分片响应减少为最终的总体利润数字(通过对值求和),并将其作为聚合的结果返回以生成响应

{
  ...

  "aggregations": {
    "profit": {
      "value": 170
    }
  }
}

其他参数

编辑

params

可选。一个对象,其内容将作为变量传递给 init_scriptmap_scriptcombine_script。这对于允许用户控制聚合的行为以及在脚本之间存储状态非常有用。如果未指定,则默认值等效于提供

"params" : {}

空桶

编辑

如果脚本度量聚合的父存储桶未收集任何文档,则将从分片返回一个空的聚合响应,其值为 null。在这种情况下,reduce_scriptstates 变量将包含来自该分片的 null 作为响应。reduce_script 因此应预期并处理来自分片的 null 响应。