Watcher 变换上下文
编辑Watcher 变换上下文
编辑使用 Painless 脚本作为 watch 变换 将有效负载转换为新的有效负载,以便在 watch 中进一步使用。变换脚本返回新有效负载的 Object 值。
以下变量在所有 watcher 上下文中可用。
变量
-
params
(Map
,只读) - 作为查询一部分传递的用户定义参数。
-
ctx['watch_id']
(String
,只读) - watch 的 ID。
-
ctx['id']
(String
,只读) - 服务器为运行 watch 生成的唯一标识符。
-
ctx['metadata']
(Map
,只读) - 元数据可以添加到 watch 定义的顶层。这是用户定义的,通常用于整合 watch 中重复的值。
-
ctx['execution_time']
(ZonedDateTime
,只读) - watch 开始执行的时间。
-
ctx['trigger']['scheduled_time']
(ZonedDateTime
,只读) - watch 的计划触发时间。这是 watch 应该执行的时间。
-
ctx['trigger']['triggered_time']
(ZonedDateTime
,只读) - watch 的实际触发时间。这是 watch 被触发执行的时间。
-
ctx['payload']
(Map
,只读) - 基于 watch 输入 的可访问 watch 数据。
返回值
-
Object
- 新的有效负载。
API
标准的 Painless API 可用。
示例
要运行示例,请首先按照 上下文示例 中的步骤操作。
POST _watcher/watch/_execute { "watch" : { "trigger" : { "schedule" : { "interval" : "24h" } }, "input" : { "search" : { "request" : { "indices" : [ "seats" ], "body" : { "query" : { "term": { "sold": "true"} }, "aggs" : { "theatres" : { "terms" : { "field" : "play" }, "aggs" : { "money" : { "sum": { "field" : "cost" } } } } } } } } }, "transform" : { "script": """ return [ 'money_makers': ctx.payload.aggregations.theatres.buckets.stream() .filter(t -> { return t.money.value > 50000 }) .map(t -> { return ['play': t.key, 'total_value': t.money.value ] }).collect(Collectors.toList()), 'duds' : ctx.payload.aggregations.theatres.buckets.stream() .filter(t -> { return t.money.value < 15000 }) .map(t -> { return ['play': t.key, 'total_value': t.money.value ] }).collect(Collectors.toList()) ] """ }, "actions" : { "my_log" : { "logging" : { "text" : "The output of the payload was transformed to {{ctx.payload}}" } } } } }
变换中使用了 Java Stream API。此 API 允许以管道方式操作列表的元素。 |
|
流过滤器删除不满足过滤器条件的项目。 |
|
流映射将每个元素转换为一个新对象。 |
|
收集器将流缩减为 |
|
这在变换中的第二组值中再次执行。 |
以下操作变换将 mod_log 操作中的每个值更改为 String
。此变换不会更改 unmod_log 操作中的值。
POST _watcher/watch/_execute { "watch" : { "trigger" : { "schedule" : { "interval" : "24h" } }, "input" : { "search" : { "request" : { "indices" : [ "seats" ], "body" : { "query" : { "term": { "sold": "true"} }, "aggs" : { "theatres" : { "terms" : { "field" : "play" }, "aggs" : { "money" : { "sum": { "field" : "cost" } } } } } } } } }, "actions" : { "mod_log" : { "transform": { "script" : """ def formatter = NumberFormat.getCurrencyInstance(); return [ 'msg': ctx.payload.aggregations.theatres.buckets.stream() .map(t-> formatter.format(t.money.value) + ' for the play ' + t.key) .collect(Collectors.joining(", ")) ] """ }, "logging" : { "text" : "The output of the payload was transformed to: {{ctx.payload.msg}}" } }, "unmod_log" : { "logging" : { "text" : "The output of the payload was not transformed and this value should not exist: {{ctx.payload.msg}}" } } } } }
此示例以非常类似的方式使用流式 API。以下差异很细微,值得指出。
以下示例显示了完整 watch 上下文中的脚本化 watch 和操作变换。此 watch 还使用脚本化的 条件。
POST _watcher/watch/_execute { "watch" : { "metadata" : { "high_threshold": 4000, "low_threshold": 1000 }, "trigger" : { "schedule" : { "interval" : "24h" } }, "input" : { "search" : { "request" : { "indices" : [ "seats" ], "body" : { "query" : { "term": { "sold": "true"} }, "aggs" : { "theatres" : { "terms" : { "field" : "play" }, "aggs" : { "money" : { "sum": { "field" : "cost", "script": { "source": "doc.cost.value * doc.number.value" } } } } } } } } } }, "condition" : { "script" : """ return ctx.payload.aggregations.theatres.buckets.stream() .anyMatch(theatre -> theatre.money.value < ctx.metadata.low_threshold || theatre.money.value > ctx.metadata.high_threshold) """ }, "transform" : { "script": """ return [ 'money_makers': ctx.payload.aggregations.theatres.buckets.stream() .filter(t -> { return t.money.value > ctx.metadata.high_threshold }) .map(t -> { return ['play': t.key, 'total_value': t.money.value ] }).collect(Collectors.toList()), 'duds' : ctx.payload.aggregations.theatres.buckets.stream() .filter(t -> { return t.money.value < ctx.metadata.low_threshold }) .map(t -> { return ['play': t.key, 'total_value': t.money.value ] }).collect(Collectors.toList()) ] """ }, "actions" : { "log_money_makers" : { "condition": { "script" : "return ctx.payload.money_makers.size() > 0" }, "transform": { "script" : """ def formatter = NumberFormat.getCurrencyInstance(); return [ 'plays_value': ctx.payload.money_makers.stream() .map(t-> formatter.format(t.total_value) + ' for the play ' + t.play) .collect(Collectors.joining(", ")) ] """ }, "logging" : { "text" : "The following plays contain the highest grossing total income: {{ctx.payload.plays_value}}" } }, "log_duds" : { "condition": { "script" : "return ctx.payload.duds.size() > 0" }, "transform": { "script" : """ def formatter = NumberFormat.getCurrencyInstance(); return [ 'plays_value': ctx.payload.duds.stream() .map(t-> formatter.format(t.total_value) + ' for the play ' + t.play) .collect(Collectors.joining(", ")) ] """ }, "logging" : { "text" : "The following plays need more advertising due to their low total income: {{ctx.payload.plays_value}}" } } } } }
以下示例显示了元数据的使用以及将日期转换为可读格式。
POST _watcher/watch/_execute { "watch" : { "metadata" : { "min_hits": 10 }, "trigger" : { "schedule" : { "interval" : "24h" } }, "input" : { "search" : { "request" : { "indices" : [ "seats" ], "body" : { "query" : { "term": { "sold": "true"} }, "aggs" : { "theatres" : { "terms" : { "field" : "play" }, "aggs" : { "money" : { "sum": { "field" : "cost" } } } } } } } } }, "condition" : { "script" : """ return ctx.payload.hits.total > ctx.metadata.min_hits """ }, "transform" : { "script" : """ def theDate = ZonedDateTime.ofInstant(ctx.execution_time.toInstant(), ctx.execution_time.getZone()); return ['human_date': DateTimeFormatter.RFC_1123_DATE_TIME.format(theDate), 'aggregations': ctx.payload.aggregations] """ }, "actions" : { "my_log" : { "logging" : { "text" : "The watch was successfully executed on {{ctx.payload.human_date}} and contained {{ctx.payload.aggregations.theatres.buckets.size}} buckets" } } } } }