Watcher 的工作原理

编辑

添加 Watch 以在满足特定条件时自动执行操作。这些条件通常基于您已加载到 Watch 中的数据,也称为Watch 负载。此负载可以从不同的来源加载 - 来自 Elasticsearch、外部 HTTP 服务,甚至两者的组合。

例如,您可以配置一个 Watch,以便当日志数据中的搜索指示最近 5 分钟内有太多 503 错误时,向系统管理员发送电子邮件。

本主题介绍 Watch 的元素以及 Watch 的运行方式。

Watch 定义

编辑

一个 Watch 由触发器输入条件操作组成。操作定义满足条件后需要执行的操作。此外,您可以定义条件转换,以便在执行操作之前处理和准备 Watch 负载。

触发器
确定何时检查 Watch。Watch 必须有一个触发器。
输入
将数据加载到 Watch 负载中。如果未指定输入,则加载空负载。
条件
控制是否执行 Watch 操作。如果未指定条件,则条件默认为 always
转换
处理 Watch 负载,以便为 Watch 操作做好准备。您可以在 Watch 级别定义转换,也可以定义特定于操作的转换。可选。
操作
指定满足 Watch 条件时发生的操作。

例如,以下代码片段显示了一个创建或更新 Watch 请求,该请求定义了一个查找日志错误事件的 Watch

resp = client.watcher.put_watch(
    id="log_errors",
    metadata={
        "color": "red"
    },
    trigger={
        "schedule": {
            "interval": "5m"
        }
    },
    input={
        "search": {
            "request": {
                "indices": "log-events",
                "body": {
                    "size": 0,
                    "query": {
                        "match": {
                            "status": "error"
                        }
                    }
                }
            }
        }
    },
    condition={
        "compare": {
            "ctx.payload.hits.total": {
                "gt": 5
            }
        }
    },
    transform={
        "search": {
            "request": {
                "indices": "log-events",
                "body": {
                    "query": {
                        "match": {
                            "status": "error"
                        }
                    }
                }
            }
        }
    },
    actions={
        "my_webhook": {
            "webhook": {
                "method": "POST",
                "host": "mylisteninghost",
                "port": 9200,
                "path": "/{{watch_id}}",
                "body": "Encountered {{ctx.payload.hits.total}} errors"
            }
        },
        "email_administrator": {
            "email": {
                "to": "[email protected]",
                "subject": "Encountered {{ctx.payload.hits.total}} errors",
                "body": "Too many error in the system, see attached data",
                "attachments": {
                    "attached_data": {
                        "data": {
                            "format": "json"
                        }
                    }
                },
                "priority": "high"
            }
        }
    },
)
print(resp)
const response = await client.watcher.putWatch({
  id: "log_errors",
  metadata: {
    color: "red",
  },
  trigger: {
    schedule: {
      interval: "5m",
    },
  },
  input: {
    search: {
      request: {
        indices: "log-events",
        body: {
          size: 0,
          query: {
            match: {
              status: "error",
            },
          },
        },
      },
    },
  },
  condition: {
    compare: {
      "ctx.payload.hits.total": {
        gt: 5,
      },
    },
  },
  transform: {
    search: {
      request: {
        indices: "log-events",
        body: {
          query: {
            match: {
              status: "error",
            },
          },
        },
      },
    },
  },
  actions: {
    my_webhook: {
      webhook: {
        method: "POST",
        host: "mylisteninghost",
        port: 9200,
        path: "/{{watch_id}}",
        body: "Encountered {{ctx.payload.hits.total}} errors",
      },
    },
    email_administrator: {
      email: {
        to: "[email protected]",
        subject: "Encountered {{ctx.payload.hits.total}} errors",
        body: "Too many error in the system, see attached data",
        attachments: {
          attached_data: {
            data: {
              format: "json",
            },
          },
        },
        priority: "high",
      },
    },
  },
});
console.log(response);
PUT _watcher/watch/log_errors
{
  "metadata" : { 
    "color" : "red"
  },
  "trigger" : { 
    "schedule" : {
      "interval" : "5m"
    }
  },
  "input" : { 
    "search" : {
      "request" : {
        "indices" : "log-events",
        "body" : {
          "size" : 0,
          "query" : { "match" : { "status" : "error" } }
        }
      }
    }
  },
  "condition" : { 
    "compare" : { "ctx.payload.hits.total" : { "gt" : 5 }}
  },
  "transform" : { 
    "search" : {
        "request" : {
          "indices" : "log-events",
          "body" : {
            "query" : { "match" : { "status" : "error" } }
          }
        }
    }
  },
  "actions" : { 
    "my_webhook" : {
      "webhook" : {
        "method" : "POST",
        "host" : "mylisteninghost",
        "port" : 9200,
        "path" : "/{{watch_id}}",
        "body" : "Encountered {{ctx.payload.hits.total}} errors"
      }
    },
    "email_administrator" : {
      "email" : {
        "to" : "[email protected]",
        "subject" : "Encountered {{ctx.payload.hits.total}} errors",
        "body" : "Too many error in the system, see attached data",
        "attachments" : {
          "attached_data" : {
            "data" : {
              "format" : "json"
            }
          }
        },
        "priority" : "high"
      }
    }
  }
}

元数据 - 您可以将可选的静态元数据附加到 Watch。

触发器 - 此计划触发器每 5 分钟执行一次 Watch。

输入 - 此输入在 log-events 索引中搜索错误,并将响应加载到 Watch 负载中。

条件 - 此条件检查是否有超过 5 个错误事件(搜索响应中的命中)。如果有,则继续执行所有 actions

转换 - 如果满足 Watch 条件,此转换会通过使用默认搜索类型 query_then_fetch 搜索错误,将所有错误加载到 Watch 负载中。所有 Watch 操作都可以访问此负载。

操作 - 此 Watch 有两个操作。my_webhook 操作会通知第三方系统有关该问题。email_administrator 操作会向系统管理员发送高优先级电子邮件。包含错误的 Watch 负载会附加到电子邮件中。

Watch 执行

编辑

当您添加 Watch 时,Watcher 会立即向相应的触发器引擎注册其触发器。具有 schedule 触发器的 Watch 会向 scheduler 触发器引擎注册。

调度程序会跟踪时间,并根据其计划触发 Watch。在包含 .watches 分片之一的每个节点上,都会运行一个绑定到 Watcher 生命周期运行的调度程序。即使考虑所有主分片和副本分片,当触发 Watch 时,Watcher 还会确保每个 Watch 仅在其中一个分片上触发。您添加的副本分片越多,可以执行的 Watch 分布就越广泛。如果您添加或删除副本,则需要重新加载所有 Watch。如果分片被重新定位,则此特定分片的主分片和所有副本都将重新加载。

由于 Watch 在 Watch 分片所在的节点上执行,您可以使用分片分配筛选来创建专用的 Watcher 节点。为此,请使用专用的 node.attr.role: watcher 属性配置节点。

由于 .watches 索引是系统索引,您不能使用普通的 .watcher/_settings 端点来修改其路由分配。相反,您可以使用以下专用端点来调整 .watches 分片到具有 watcher 角色属性的节点的分配

resp = client.perform_request(
    "PUT",
    "/_watcher/settings",
    headers={"Content-Type": "application/json"},
    body={
        "index.routing.allocation.include.role": "watcher"
    },
)
print(resp)
const response = await client.transport.request({
  method: "PUT",
  path: "/_watcher/settings",
  body: {
    "index.routing.allocation.include.role": "watcher",
  },
});
console.log(response);
PUT _watcher/settings
{
  "index.routing.allocation.include.role": "watcher"
}

当 Watcher 服务停止时,调度程序也会随之停止。触发器引擎使用一个独立的线程池,该线程池与用于执行 Watch 的线程池不同。

当触发 Watch 时,Watcher 会将其排队以进行执行。将创建一个 watch_record 文档并添加到 Watch 历史记录中,并将 Watch 的状态设置为 awaits_execution

当执行开始时,Watcher 会为 Watch 创建 Watch 执行上下文。执行上下文为脚本和模板提供对 Watch 元数据、负载、Watch ID、执行时间和触发器信息的访问权限。有关更多信息,请参阅 Watch 执行上下文

在执行过程中,Watcher 会

  1. 将输入数据作为 Watch 执行上下文中的负载加载。这使得数据可用于执行过程中的所有后续步骤。此步骤由 Watch 的输入控制。
  2. 评估 Watch 条件以确定是否继续处理 Watch。如果满足条件(评估为 true),则处理将前进到下一步。如果未满足条件(评估为 false),则停止执行 Watch。
  3. 将转换应用于 Watch 负载(如果需要)。
  4. 执行 Watch 操作,前提是满足条件且 Watch 未被限制

当 Watch 执行完成后,执行结果将作为Watch 记录记录在 Watch 历史记录中。Watch 记录包括执行时间和持续时间、是否满足 Watch 条件以及执行的每个操作的状态。

下图显示了 Watch 执行过程

watch execution

Watch 确认和限制

编辑

Watcher 支持基于时间和基于确认的限制。这使您能够防止针对同一事件重复执行操作。

默认情况下,Watcher 使用基于时间的限制,限制周期为 5 秒。这意味着,如果 Watch 每秒执行一次,则即使始终满足条件,其操作最多每 5 秒执行一次。您可以在每个操作的基础上或在 Watch 级别配置限制周期。

基于确认的限制使您可以告知 Watcher,只要满足 Watch 的条件,就不要发送任何关于 Watch 的更多通知。一旦条件评估为 false,就会清除确认,并且 Watcher 会恢复正常执行 Watch 操作。

有关更多信息,请参阅确认和限制

Watch 活动状态

编辑

默认情况下,当您添加 Watch 时,它会立即设置为活动状态,并向相应的触发器引擎注册,并根据其配置的触发器执行。

您还可以将 Watch 设置为非活动状态。非活动的 Watch 不会向触发器引擎注册,并且永远不会被触发。

要在创建 Watch 时将其设置为非活动状态,请将active参数设置为inactive。要停用现有的 Watch,请使用停用 Watch API。要重新激活非活动的 Watch,请使用激活 Watch API

您可以使用执行 Watch API强制执行 Watch,即使它处于非活动状态。

在各种情况下,停用 Watch 都非常有用。例如,如果您有一个 Watch 用于监视外部系统,并且您需要关闭该系统进行维护,则可以停用该 Watch 以防止它在维护期间错误地报告可用性问题。

停用 Watch 还使您可以将其保留以供将来使用,而无需从系统中删除它。

脚本和模板

编辑

您可以在定义 Watch 时使用脚本和模板。脚本和模板可以引用 Watch 执行上下文中的元素,包括 Watch 负载。执行上下文定义了您可以在脚本中使用的变量和模板中的参数占位符。

Watcher 使用 Elasticsearch 脚本基础结构,该基础结构支持内联存储的。脚本和模板由 Elasticsearch 编译和缓存,以优化重复执行。还支持自动加载。有关更多信息,请参阅脚本如何编写脚本

Watch 执行上下文

编辑

以下代码片段显示了Watch 执行上下文的基本结构

{
  "ctx" : {
    "metadata" : { ... }, 
    "payload" : { ... }, 
    "watch_id" : "<id>", 
    "execution_time" : "20150220T00:00:10Z", 
    "trigger" : { 
      "triggered_time" : "20150220T00:00:10Z",
      "scheduled_time" : "20150220T00:00:00Z"
    },
    "vars" : { ... } 
}

在 Watch 定义中指定的任何静态元数据。

当前的 Watch 负载。

正在执行的 Watch 的 ID。

一个时间戳,显示 Watch 执行开始的时间。

有关触发器事件的信息。对于schedule触发器,这包括triggered_time(Watch 被触发的时间)和scheduled_time(Watch 被计划触发的时间)。

可以在执行期间由不同构造设置和访问的动态变量。这些变量的作用域限定为单个执行(即,它们不会被持久化,并且不能在同一 Watch 的不同执行之间使用)

使用脚本

编辑

您可以使用脚本来定义条件转换。默认的脚本语言是Painless

从 5.0 版本开始,Elasticsearch 搭载了新的 Painless 脚本语言。Painless 专门为在 Elasticsearch 中使用而创建和设计。除了提供丰富的功能集外,它最大的特点是经过适当的沙盒化处理,可以在系统的任何位置(包括 Watcher 中)安全使用,而无需启用动态脚本。

脚本可以引用 watch 执行上下文中的任何值,或者通过脚本参数显式传递的值。

例如,如果 watch 元数据包含一个 color 字段(例如,"metadata" : {"color": "red"}),你可以通过 ctx.metadata.color 变量访问其值。如果你在条件或转换定义中传递一个 color 参数(例如,"params" : {"color": "red"}),你可以通过 color 变量访问其值。

使用模板

编辑

你可以使用模板为 watch 定义动态内容。在执行时,模板会从 watch 执行上下文中提取数据。例如,你可以使用模板来填充 email 操作的 subject 字段,并使用存储在 watch payload 中的数据。模板还可以访问通过模板参数显式传递的值。

你可以使用 Mustache 脚本语言来指定模板。

例如,以下代码片段展示了模板如何实现发送电子邮件时的动态主题

{
  "actions" : {
    "email_notification" : {
      "email" : {
        "subject" : "{{ctx.metadata.color}} alert"
      }
    }
  }
}
内联模板和脚本
编辑

要定义内联模板或脚本,只需直接在字段的值中指定它即可。例如,以下代码片段使用内联模板配置 email 操作的主题,该模板引用上下文中元数据的 color 值。

"actions" : {
  "email_notification" : {
     "email" : {
       "subject" : "{{ctx.metadata.color}} alert"
     }
   }
  }
}

对于脚本,只需将内联脚本指定为 script 字段的值即可。例如

"condition" : {
  "script" : "return true"
}

你还可以通过使用正式的对象定义作为字段值来显式指定内联类型。例如

"actions" : {
  "email_notification" : {
    "email" : {
      "subject" : {
         "source" : "{{ctx.metadata.color}} alert"
      }
    }
  }
}

脚本的正式对象定义如下

"condition" : {
  "script" : {
    "source": "return true"
  }
}
存储的模板和脚本
编辑

如果你 存储 了你的模板和脚本,你可以通过 id 引用它们。

要引用存储的脚本或模板,你需要使用正式的对象定义,并在 id 字段中指定其 id。例如,以下代码片段引用了 email_notification_subject 模板

{
  ...
  "actions" : {
    "email_notification" : {
      "email" : {
        "subject" : {
          "id" : "email_notification_subject",
          "params" : {
            "color" : "red"
          }
        }
      }
    }
  }
}