Watcher 条件上下文

编辑

使用 Painless 脚本作为watch 条件,以确定是否执行 watch 或 watch 中的特定操作。条件脚本返回一个布尔值,以指示条件的状态。

以下变量在所有 watcher 上下文中都可用。

变量

params ( Map,只读)
作为查询一部分传入的用户定义的参数。
ctx['watch_id'] ( String,只读)
watch 的 ID。
ctx['id'] ( String,只读)
服务器为运行 watch 生成的唯一标识符。
ctx['metadata'] ( Map,只读)
元数据可以添加到 watch 定义的顶层。这是用户定义的,通常用于合并 watch 中的重复值。
ctx['execution_time'] ( ZonedDateTime,只读)
watch 开始执行的时间。
ctx['trigger']['scheduled_time'] ( ZonedDateTime,只读)
watch 的计划触发时间。这是应该执行 watch 的时间。
ctx['trigger']['triggered_time'] ( ZonedDateTime,只读)
watch 的实际触发时间。这是触发执行 watch 的时间。
ctx['payload'] ( Map,只读)
基于watch 输入的可访问 watch 数据。

返回

boolean
如果满足条件,则期望返回 true,如果不满足条件,则返回 false

API

可以使用标准的 Painless API

示例

要运行示例,请首先按照上下文示例中的步骤操作。

POST _watcher/watch/_execute
{
  "watch" : {
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "condition" : {
      "script" :
      """
        return ctx.payload.aggregations.theatres.buckets.stream()       
          .filter(theatre -> theatre.money.value < 15000 ||
                             theatre.money.value > 50000)               
          .count() > 0                                                  
      """
    },
    "actions" : {
      "my_log" : {
        "logging" : {
          "text" : "The output of the search was : {{ctx.payload.aggregations.theatres.buckets}}"
        }
      }
    }
  }
}

Java Stream API 用于条件中。 此 API 允许在管道中操作列表的元素。

流过滤器删除不符合过滤条件的项。

如果列表中至少有一个项,则条件评估为 true,并且执行 watch。

以下操作条件脚本根据数据集中戏剧的售票数量控制 my_log 操作的执行。 该脚本汇总每个戏剧的总售票数,如果至少有一个戏剧的售票额超过 $10,000,则返回 true。

POST _watcher/watch/_execute
{
  "watch" : {
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "size": 0,
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": {
                      "field" : "cost",
                      "script": {
                       "source": "doc.cost.value * doc.number.value"
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    },
    "actions" : {
      "my_log" : {
        "condition": {                                                
          "script" :
          """
            return ctx.payload.aggregations.theatres.buckets.stream()
              .anyMatch(theatre -> theatre.money.value > 10000)       
          """
        },
        "logging" : {
          "text" : "At least one play has grossed over $10,000: {{ctx.payload.aggregations.theatres.buckets}}"
        }
      }
    }
  }
}

此示例使用了与上一个示例几乎相同的条件。 以下差异很细微,值得指出。

条件的位置不再位于顶层,而是在单个操作内。

不是使用过滤器,而是使用 anyMatch 来返回布尔值