Watcher 变换上下文

编辑

使用 Painless 脚本作为 watch 变换 将有效负载转换为新的有效负载,以便在 watch 中进一步使用。变换脚本返回新有效负载的 Object 值。

以下变量在所有 watcher 上下文中可用。

变量

params (Map,只读)
作为查询一部分传递的用户定义参数。
ctx['watch_id'] (String,只读)
watch 的 ID。
ctx['id'] (String,只读)
服务器为运行 watch 生成的唯一标识符。
ctx['metadata'] (Map,只读)
元数据可以添加到 watch 定义的顶层。这是用户定义的,通常用于整合 watch 中重复的值。
ctx['execution_time'] (ZonedDateTime,只读)
watch 开始执行的时间。
ctx['trigger']['scheduled_time'] (ZonedDateTime,只读)
watch 的计划触发时间。这是 watch 应该执行的时间。
ctx['trigger']['triggered_time'] (ZonedDateTime,只读)
watch 的实际触发时间。这是 watch 被触发执行的时间。
ctx['payload'] (Map,只读)
基于 watch 输入 的可访问 watch 数据。

返回值

Object
新的有效负载。

API

标准的 Painless API 可用。

示例

要运行示例,请首先按照 上下文示例 中的步骤操作。

POST _watcher/watch/_execute
{
  "watch" : {
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : { "term": { "sold": "true"} },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "transform" : {
      "script":
      """
        return [
          'money_makers': ctx.payload.aggregations.theatres.buckets.stream()  
            .filter(t -> {                                                    
                return t.money.value > 50000
            })
            .map(t -> {                                                       
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList()),                                  
          'duds' : ctx.payload.aggregations.theatres.buckets.stream()         
            .filter(t -> {
                return t.money.value < 15000
            })
            .map(t -> {
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList())
          ]
      """
    },
    "actions" : {
      "my_log" : {
        "logging" : {
          "text" : "The output of the payload was transformed to {{ctx.payload}}"
        }
      }
    }
  }
}

变换中使用了 Java Stream API。此 API 允许以管道方式操作列表的元素。

流过滤器删除不满足过滤器条件的项目。

流映射将每个元素转换为一个新对象。

收集器将流缩减为 java.util.List

这在变换中的第二组值中再次执行。

以下操作变换将 mod_log 操作中的每个值更改为 String。此变换不会更改 unmod_log 操作中的值。

POST _watcher/watch/_execute
{
  "watch" : {
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "actions" : {
      "mod_log" : {
        "transform": {                                                                
          "script" :
          """
          def formatter = NumberFormat.getCurrencyInstance();
          return [
            'msg': ctx.payload.aggregations.theatres.buckets.stream()
              .map(t-> formatter.format(t.money.value) + ' for the play ' + t.key)
              .collect(Collectors.joining(", "))
          ]
          """
        },
        "logging" : {
          "text" : "The output of the payload was transformed to: {{ctx.payload.msg}}"
        }
      },
      "unmod_log" : {                                                                 
        "logging" : {
          "text" : "The output of the payload was not transformed and this value should not exist: {{ctx.payload.msg}}"
        }
      }
    }
  }
}

此示例以非常类似的方式使用流式 API。以下差异很细微,值得指出。

变换的位置不再位于顶层,而是在单个操作中。

为了参考,给出了第二个不转换有效负载的操作。

以下示例显示了完整 watch 上下文中的脚本化 watch 和操作变换。此 watch 还使用脚本化的 条件

POST _watcher/watch/_execute
{
  "watch" : {
    "metadata" : { "high_threshold": 4000, "low_threshold": 1000 },
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": {
                      "field" : "cost",
                      "script": {
                       "source": "doc.cost.value * doc.number.value"
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    },
    "condition" : {
      "script" :
      """
        return ctx.payload.aggregations.theatres.buckets.stream()
          .anyMatch(theatre -> theatre.money.value < ctx.metadata.low_threshold ||
                               theatre.money.value > ctx.metadata.high_threshold)
      """
    },
    "transform" : {
      "script":
      """
        return [
          'money_makers': ctx.payload.aggregations.theatres.buckets.stream()
            .filter(t -> {
                return t.money.value > ctx.metadata.high_threshold
            })
            .map(t -> {
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList()),
          'duds' : ctx.payload.aggregations.theatres.buckets.stream()
            .filter(t -> {
                return t.money.value < ctx.metadata.low_threshold
            })
            .map(t -> {
                return ['play': t.key, 'total_value': t.money.value ]
            }).collect(Collectors.toList())
          ]
      """
    },
    "actions" : {
      "log_money_makers" : {
        "condition": {
          "script" : "return ctx.payload.money_makers.size() > 0"
        },
        "transform": {
          "script" :
          """
          def formatter = NumberFormat.getCurrencyInstance();
          return [
            'plays_value': ctx.payload.money_makers.stream()
              .map(t-> formatter.format(t.total_value) + ' for the play ' + t.play)
              .collect(Collectors.joining(", "))
          ]
          """
        },
        "logging" : {
          "text" : "The following plays contain the highest grossing total income: {{ctx.payload.plays_value}}"
        }
      },
      "log_duds" : {
        "condition": {
          "script" : "return ctx.payload.duds.size() > 0"
        },
        "transform": {
          "script" :
          """
          def formatter = NumberFormat.getCurrencyInstance();
          return [
            'plays_value': ctx.payload.duds.stream()
              .map(t-> formatter.format(t.total_value) + ' for the play ' + t.play)
              .collect(Collectors.joining(", "))
          ]
          """
        },
        "logging" : {
          "text" : "The following plays need more advertising due to their low total income: {{ctx.payload.plays_value}}"
        }
      }
    }
  }
}

以下示例显示了元数据的使用以及将日期转换为可读格式。

POST _watcher/watch/_execute
{
  "watch" : {
    "metadata" : { "min_hits": 10 },
    "trigger" : { "schedule" : { "interval" : "24h" } },
    "input" : {
      "search" : {
        "request" : {
          "indices" : [ "seats" ],
          "body" : {
            "query" : {
              "term": { "sold": "true"}
            },
            "aggs" : {
              "theatres" : {
                "terms" : { "field" : "play" },
                "aggs" : {
                  "money" : {
                    "sum": { "field" : "cost" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "condition" : {
      "script" :
      """
        return ctx.payload.hits.total > ctx.metadata.min_hits
      """
    },
    "transform" : {
      "script" :
      """
        def theDate = ZonedDateTime.ofInstant(ctx.execution_time.toInstant(), ctx.execution_time.getZone());
        return ['human_date': DateTimeFormatter.RFC_1123_DATE_TIME.format(theDate),
                'aggregations': ctx.payload.aggregations]
      """
    },
    "actions" : {
      "my_log" : {
        "logging" : {
          "text" : "The watch was successfully executed on {{ctx.payload.human_date}} and contained {{ctx.payload.aggregations.theatres.buckets.size}} buckets"
        }
      }
    }
  }
}