Watcher 条件上下文

编辑

使用 Painless 脚本作为 监视条件,用于确定是否执行监视或监视中的特定操作。条件脚本返回布尔值以指示条件的状态。

以下变量在所有 Watcher 上下文中可用。

变量

params (Map,只读)
作为查询的一部分传递的用户定义参数。
ctx['watch_id'] (String,只读)
监视的 ID。
ctx['id'] (String,只读)
服务器为运行监视生成的唯一标识符。
ctx['metadata'] (Map,只读)
元数据可以添加到监视定义的顶层。这是用户定义的,通常用于合并监视中的重复值。
ctx['execution_time'] (ZonedDateTime,只读)
监视开始执行的时间。
ctx['trigger']['scheduled_time'] (ZonedDateTime,只读)
监视的计划触发时间。这是监视应执行的时间。
ctx['trigger']['triggered_time'] (ZonedDateTime,只读)
监视的实际触发时间。这是触发监视执行的时间。
ctx['payload'] (Map,只读)
基于 监视输入 的可访问监视数据。

返回值

布尔值
如果条件满足,则期望 true,如果不满足,则期望 false

API

标准的 Painless API 可用。

示例

要运行示例,请首先按照 上下文示例 中的步骤操作。

POST _watcher/watch/_execute
{
  "watch" : {
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "condition" : {
      "script" :
      """
        return ctx.payload.aggregations.theatres.buckets.stream()       
          .filter(theatre -> theatre.money.value < 15000 ||
                             theatre.money.value > 50000)               
          .count() > 0                                                  
      """
    },
    "actions" : {
      "my_log" : {
        "logging" : {
          "text" : "The output of the search was : {{ctx.payload.aggregations.theatres.buckets}}"
        }
      }
    }
  }
}

条件中使用了 Java Stream API。此 API 允许以管道方式操作列表的元素。

流过滤器会移除不满足过滤器条件的项目。

如果列表中至少有一个项目,则条件评估为 true,并且执行监视。

以下操作条件脚本根据数据集中戏剧的售出座位数控制 my_log 操作的执行。该脚本聚合每个戏剧的售出座位总数,如果至少有一部戏剧售出超过 10,000 美元,则返回 true。

POST _watcher/watch/_execute
{
  "watch" : {
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "size": 0,
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": {
                      "field" : "cost",
                      "script": {
                       "source": "doc.cost.value * doc.number.value"
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    },
    "actions" : {
      "my_log" : {
        "condition": {                                                
          "script" :
          """
            return ctx.payload.aggregations.theatres.buckets.stream()
              .anyMatch(theatre -> theatre.money.value > 10000)       
          """
        },
        "logging" : {
          "text" : "At least one play has grossed over $10,000: {{ctx.payload.aggregations.theatres.buckets}}"
        }
      }
    }
  }
}

此示例使用与上一个示例几乎相同的条件。以下差异很细微,值得指出来。

条件的位置不再位于顶层,而是在单个操作内。

使用 anyMatch 而不是过滤器来返回布尔值。