正在加载

Common Expression Language 输入

使用 cel 输入,可以使用 Common Expression Language (CEL)mito CEL 扩展库从文件路径或 HTTP API 读取具有各种有效负载的消息。

CEL 是一种非图灵完备语言,可以对输入中的表达式进行评估,使用 mito 扩展库时可以包含文件和 API 端点。cel 输入会定期运行一个 CEL 程序,该程序被赋予一个可由用户配置的执行环境,并发布程序评估产生的一组事件。CEL 程序可以选择返回光标状态,这些状态将提供给下一次 CEL 程序的执行。光标状态可用于控制程序的行为。

此输入支持

  • 身份验证

    • Basic
    • Digest
    • OAuth2
  • 按可配置的时间间隔检索

  • 分页

  • 重试

  • 速率限制

  • 代理

配置示例

filebeat.inputs:
# Fetch your public IP every minute.
- type: cel
  interval: 1m
  resource.url: https://api.ipify.org/?format=json
  program: |
    get(state.url).Body.as(body, {
        "events": [body.decode_json()]
    })

或者等效地使用 ipify.org 的文本格式

filebeat.inputs:
# Fetch your public IP every minute.
- type: cel
  interval: 1m
  resource.url: https://api.ipify.org/?format=text
  program: |
    {
        "events": [{"ip": string(get(state.url).Body)}]
    }
filebeat.inputs:
- type: cel
  resource.url: http://localhost:9200/_search
  state:
    scroll: 5m
  program: |
    (
        !has(state.cursor) || !has(state.cursor.scroll_id) ?
            post(state.url+"?scroll=5m", "", "")
        :
            post(
                state.url+"/scroll?"+{"scroll_id": [state.cursor.scroll_id]}.format_query(),
                "application/json",
                {"scroll": state.scroll}.encode_json()
            )
    ).as(resp, resp.Body.decode_json().as(body, {
            "events": body.hits.hits,
            "cursor": {"scroll_id": body._scroll_id},
    }))

为输入提供的执行环境包括 mito 库提供的函数、宏和全局变量。一个 JSON 对象作为输入,通过 state 变量访问。state 包含一个字符串 url 字段,并且可以包含通过输入的 state 配置配置的任意其他字段。如果 CEL 程序在程序执行之间保存光标状态,则配置的 state.cursor 值将在执行之前被保存的光标替换。

启动时,state 将是这样的

{
    "url": <resource address>,
    "cursor": { ... },
    ...
}

state.url 字段将存在,并且可以是 HTTP 端点或文件路径。如果文件 URL 中存在方案,CEL 程序有责任处理移除该方案。state.url 字段可以在程序执行期间被改变,但改变后的状态不会在重新启动之间持久化。返回的值中必须包含 state.url 字段,以确保在下次评估中可用,除非程序中硬编码了资源地址或可以从光标中获取。

对象根目录中可能存在其他字段,如果程序允许,光标值可能不存在。只有光标会在重新启动后持久化,但状态中的所有字段在处理循环的迭代之间都会保留,除了产生的事件数组,如下所示。

如果光标存在,程序应根据其值执行并处理请求。如果光标不存在,程序必须有备用逻辑来确定要发出哪些请求。

程序执行完成后,应返回一个单一对象,其结构如下所示

{
    "events": [
        {...},
        ...
    ],
    "cursor": [
        {...},
        ...
    ],
    "url": <resource address>,
    "status_code": <HTTP request status code if a network request>,
    "header": <HTTP response headers if a network request>,
    "rate_limit": <HTTP rate limit map if required by API>,
    "want_more": false
}
  1. 必须存在 events 字段,但可以为空或 null。如果它不为空,则其元素必须仅为对象。该字段应为数组,但在 CEL 程序中出现错误条件时,可以返回单个对象而不是数组;这将作为数组进行包装以供发布,并且会记录错误。如果单个对象包含键“error”,则错误值将用于更新输入的狀態 以报告给 Elastic Agent。这可用于更快速地响应 API 故障。
  2. 如果存在 cursor,它必须是一个单个对象或一个与 events 长度相同的数组;cursor 的每个元素 i 将是获取 events 数组中事件 i 及之后事件的详细信息。如果 cursor 是一个单个对象,它将是获取 events 数组中最后一个事件之后事件的详细信息,并且仅在成功发布 events 数组中的所有事件后才会保留。
  3. 如果存在 rate_limit,它必须是一个包含数字字段 rateburst 的映射。rate_limit 字段也可能包含一个字符串 error 字段和将被记录的其他字段。如果它包含一个 error 字段,则 rateburst 将不会用于设置速率限制行为。 LimitOkta Rate Limit policyDraft Rate Limit policy 文档展示了如何构建此字段。
  4. 如果存在“want_more”字段且为 true,并且返回一个非零的 events 数组,则在移除 events 字段后,使用新状态重复评估。如果在评估失败后存在“want_more”字段,则将其设置为 false。

如果程序不与 HTTP API 端点交互,则可以省略 status_codeheaderrate_limit 值,因此不需要它们来辅助程序控制。

在 DEBUG 级别记录日志时,CEL 输入将在评估后记录完整的状态。这将包括 state 对象中保留的任何敏感或秘密信息,因此在生产环境中 state 对象保留敏感信息时,不应使用 DEBUG 级别的日志记录。有关用于从 DEBUG 日志中排除敏感字段的设置,请参阅 redact 配置参数。

如上所述,cel 输入提供了函数、宏和全局变量来扩展语言。

除了上述软件包中提供的扩展外,还提供了一个全局变量 useragent,它允许用户 CEL 程序访问 filebeat 用户代理字符串。默认情况下,此值会分配给所有请求的用户代理头,除非 CEL 程序已设置了用户代理头值。不希望提供用户代理的程序应将此头设置为空字符串 ""

主机环境变量通过全局映射 env 提供。只有通过 allowed_environment 配置列表允许列出的环境变量对 CEL 程序可见。

CEL 环境使用 此处 定义的版本启用 可选类型 库。

此外,它支持通过 Basic Authentication、Digest Authentication 或 OAuth2 进行身份验证。

带身份验证的配置示例

filebeat.inputs:
- type: cel
  auth.basic:
    user: user@domain.tld
    password: P@$$W0₹D
  resource.url: http://localhost
filebeat.inputs:
- type: cel
  auth.digest:
    user: user@domain.tld
    password: P@$$W0₹D
  resource.url: http://localhost
filebeat.inputs:
- type: cel
  auth.oauth2:
    client.id: 12345678901234567890abcdef
    client.secret: abcdef12345678901234567890
    token_url: http://localhost/oauth2/token
  resource.url: http://localhost
filebeat.inputs:
- type: cel
  auth.oauth2:
    client.id: 12345678901234567890abcdef
    client.secret: abcdef12345678901234567890
    token_url: http://localhost/oauth2/token
    user: user@domain.tld
    password: P@$$W0₹D
  resource.url: http://localhost

cel 输入在请求之间保留运行时状态。此状态可由 CEL 程序访问,并可能包含任意对象。

状态必须包含一个 url 字符串,并且可以包含用户希望存储在其中的任何对象。

所有对象都在运行时存储,除了 cursor,其值在重新启动之间会持久化。

cel 输入支持以下配置选项以及后面描述的 通用选项

重复请求之间的持续时间。如果启用分页,它可能会对初始请求做出额外的分页请求。默认值:60s

每个轮询周期执行的 CEL 程序。此字段是必需的。

max_executions 是 CEL 程序可以请求通过 want_more 字段重新运行的最大次数。这用于确保意外的无限循环不会停止处理。当执行预算超出时,执行将在下一个时间间隔重新开始,并且会在日志中写入警告。默认值:1000。

state 是一个可选对象,在首次执行时传递给 CEL 程序。它作为 state 变量对正在执行的程序可用。在输入的生命周期内,它作为上一次执行的返回值提供给程序的后续执行,但移除了 state.events 字段。除了 state.cursor 字段外,state 不会在重新启动后持久化。

光标是一个对象,作为 state.cursor 可用,可以在其中存储任意值。光标状态在输入重新启动之间保留,并在请求的每个事件发布后更新。使用光标时,CEL 程序必须为程序返回的每个事件创建一个光标状态,或者创建一个反映完整事件集完成情况的单个光标。

filebeat.inputs:
# Fetch your public IP every minute and note when the last request was made.
- type: cel
  interval: 1m
  resource.url: https://api.ipify.org/?format=json
  program: |
    get(state.url).Body.as(body, {
        "events": [body.decode_json().with({
            "last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ?
                state.cursor.last_requested_at
            :
                now
        })],
        "cursor": {"last_requested_at": now}
    })

将对 CEL 执行环境可见的主机环境变量列表。默认情况下,没有环境变量可见。

filebeat.inputs:
# Publish the list of files in $PATH every minute.
- type: cel
  interval: 1m
  resource.url: ""
  allowed_environment:
    - PATH
  program: |
{
  "events": {
    "message": env.?PATH.orValue("").split(":")
      .map(p, try(dir(p)))
      .filter(d, type(d) != type(""))
      .flatten()
      .collate("name")
  }
}

一组命名正则表达式,可以在 CEL 程序执行期间使用 regexp 扩展库。正则表达式使用的语法是 RE2

filebeat.inputs:
- type: cel
  # Define two regular expressions, 'products' and 'solutions' for use during CEL execution.
  regexp:
    products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
    solutions: '(?i)(Search|Observability|Security)'

XML 文档可能需要额外的类型信息才能正确解析和摄取。此信息可以通过 xsd 选项作为 XML 文档的 XML Schema Definitions (XSD) 提供。提供 XSD 信息的键通过 decode_xml CEL 扩展访问。

filebeat.inputs:
- type: cel
  # Provide an XSD, 'order', for use during CEL execution (truncated for example).
  xsd:
    order: |
       <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
         <xs:element name="order">
           <xs:complexType>
             <xs:sequence>
               <xs:element name="sender" type="xs:string"/>
               <xs:element name="address">
                 <xs:complexType>
                   <xs:sequence>
                     <xs:element name="name" type="xs:string"/>
                     <xs:element name="company" type="xs:string"/>

设置为 false 时,禁用基本身份验证配置。默认值:true

注意

如果 enabled 设置为 falseauth.basic 部分缺失,则禁用基本身份验证设置。

用于身份验证的用户。

要使用的密码。

设置为 false 时,禁用摘要身份验证配置。默认值:true

注意

如果 enabled 设置为 falseauth.digest 部分缺失,则禁用摘要身份验证设置。

用于身份验证的用户。

要使用的密码。

设置为 true 时,摘要身份验证质询不会被重用。

设置为 false 时,禁用 oauth2 配置。默认值:true

注意

如果 enabled 设置为 falseauth.oauth2 部分缺失,则禁用 OAuth2 设置。

用于配置支持的 oauth2 提供商。每个支持的提供商都需要特定的设置。默认情况下未设置。支持的提供商有:azuregoogleokta

作为身份验证流程的一部分使用的客户端 ID。除了使用 google 作为提供商外,始终需要此字段。对于提供商 defaultazureokta 必需。

作为身份验证流程的一部分使用的客户端 Secret。除了使用 googleokta 作为提供商外,始终需要此字段。对于提供商 defaultazure 必需。

作为身份验证流程的一部分使用的用户。对于身份验证 - 授权类型 password 必需。仅适用于提供商 default

作为身份验证流程的一部分使用的密码。对于身份验证 - 授权类型 password 必需。仅适用于提供商 default

注意

grant_type password 需要用户和密码。如果不使用用户和密码,则将自动使用 token_urlclient credential 方法。

在 oauth2 流程中将请求的范围列表。对于所有提供商都是可选的。

在 oauth2 流程中用于生成令牌的端点。如果未指定提供商,则必需。

注意

对于 azure 提供商,token_urlazure.tenant_id 是必需的。

每次请求 token_url 时将发送的值集合。每个参数键可以有多个值。除了 google 之外的所有提供商都可以设置。

- type: cel
  auth.oauth2:
    endpoint_params:
      Param1:
        - ValueA
        - ValueB
      Param2:
        - Value

在使用 azure 提供商时用于身份验证。由于它用于生成 token_url 的过程,因此不能与其组合使用。它不是必需的。

有关在哪里找到它的信息,您可以参考 https://docs.microsoft.com/en-us/azure/active-directory/develop/howto-create-service-principal-portal

使用 azure 提供商时访问的 WebAPI 资源。它不是必需的。

Google 的凭据文件。

注意

凭据设置只能同时设置一个。如果未提供任何凭据,将尝试通过 ADC 从环境加载默认凭据。有关如何提供 Google 凭据的更多信息,请参阅 https://cloud.google.com/docs/authentication

您的凭据信息以原始 JSON 格式。

注意

凭据设置只能同时设置一个。如果未提供任何凭据,将尝试通过 ADC 从环境加载默认凭据。有关如何提供 Google 凭据的更多信息,请参阅 https://cloud.google.com/docs/authentication

Google 的 JWT 账户密钥文件。

注意

凭据设置只能同时设置一个。如果未提供任何凭据,将尝试通过 ADC 从环境加载默认凭据。有关如何提供 Google 凭据的更多信息,请参阅 https://cloud.google.com/docs/authentication

JWT 账户密钥文件以原始 JSON 格式。

注意

凭据设置只能同时设置一个。如果未提供任何凭据,将尝试通过 ADC 从环境加载默认凭据。有关如何提供 Google 凭据的更多信息,请参阅 https://cloud.google.com/docs/authentication

用于创建凭据的委托账户的电子邮件(通常是管理员)。与 auth.oauth2.google.jwt_fileauth.oauth2.google.jwt_json 结合使用。

您的 Okta Service App 的 RSA JWK 私钥文件,用于与 Okta Org Auth Server 交互以铸造具有 okta.* 范围的令牌。

注意

凭据设置只能同时设置一个。更多信息请参阅 https://developer.okta.com/docs/guides/implement-oauth-for-okta-serviceapp/main/

您的 Okta Service App 的 RSA JWK 私钥 JSON,用于与 Okta Org Auth Server 交互以铸造具有 okta.* 范围的令牌。

注意

凭据设置只能同时设置一个。更多信息请参阅 https://developer.okta.com/docs/guides/implement-oauth-for-okta-serviceapp/main/

您的 Okta Service App 的 RSA JWK 私钥 PEM 块,用于与 Okta Org Auth Server 交互以铸造具有 okta.* 范围的令牌。

注意

凭据设置只能同时设置一个。更多信息请参阅 https://developer.okta.com/docs/guides/implement-oauth-for-okta-serviceapp/main/

HTTP API 的 URL。必需。

API 端点可以通过在 URL 方案中添加 +unix+npipe 来通过 Unix socket 和 Windows 命名管道访问,例如 http+unix:///var/socket/

要添加到所有请求的头部。头部在身份验证头部之前添加,因此此配置中的头部与身份验证头部之间的任何冲突将导致此处的冲突头部不包含在请求中。头部值必须作为数组提供。

filebeat.inputs:
# Fetch your public IP every minute.
- type: cel
  interval: 1m
  resource.url: https://api.ipify.org/?format=text
  resource.headers:
    Custom-Header:
      - Value
    Other-Custom-Header:
      - Other value
  resource.proxy_url: http://proxy.example:8080
  program: |
    {
        "events": [{"ip": string(get(state.url).Body)}]
    }

在声明 HTTP 客户端连接超时之前的时间持续时间。有效的时间单位包括 ns, us, ms, s, m, h。默认值:30s

这指定了 SSL/TLS 配置。如果 ssl 部分缺失,则使用主机的 CA 进行 HTTPS 连接。有关更多信息,请参阅 SSL

这指定了代理配置,格式为 http[s]://<user>:<password>@<server name/ip>:<port>。可以使用 resource.proxy_headers 字段配置代理头部,该字段接受一组键/值对。

filebeat.inputs:
# Fetch your public IP every minute.
- type: cel
  interval: 1m
  resource.url: https://api.ipify.org/?format=text
  resource.proxy_url: http://proxy.example:8080
  program: |
    {
        "events": [{"ip": string(get(state.url).Body)}]
    }

这指定是否禁用 HTTP 端点的 keep-alives。默认值:true

所有主机上的最大空闲连接数。零表示没有限制。默认值:0

每个主机保持的最大空闲连接数。如果为零,则默认为两个。默认值:0

空闲连接在关闭之前保持空闲的最大时间量。有效的时间单位包括 ns, us, ms, s, m, h。零表示没有限制。默认值:0s

HTTP 客户端的最大重试次数。默认值:5

尝试重试前的最短等待时间。默认值:1s

尝试重试前的最长等待时间。默认值:60s

设置为 true 时,在重定向情况下转发请求头部。默认值:false

redirect.forward_headers 设置为 true 时,将转发所有头部,除了此列表中定义的头部。默认值:[]

请求遵循的最大重定向次数。默认值:10

如果非零,客户端将接受的响应正文的最大大小。过大的正文将导致错误,“response body too big”(响应正文过大)。默认值:0

指定最大总体资源请求速率的响应值。

最大突发大小。突发是指在总体速率限制之上可以进行的资源请求的最大数量。

可以在 CEL 程序中将 HTTP 请求和响应记录到本地文件系统以调试配置。通过将 resource.tracer.enabled 设置为 true 并设置 resource.tracer.filename 值来启用此选项。还有其他选项可用于调整日志轮换行为。要删除现有日志,请在不取消设置文件名选项的情况下将 resource.tracer.enabled 设置为 false。

启用此选项会损害安全性,仅应用于调试。

为了区分不同输入实例生成的跟踪文件,可以在文件名中添加占位符 *,它将被输入实例 ID 替换。例如,http-request-trace-*.ndjson。在将 resource.tracer.enable 设置为 false 的情况下设置 resource.tracer.filename 将导致删除任何与文件名选项匹配的现有跟踪日志。

此值设置日志文件在轮换前达到的最大大小(以兆字节为单位)。默认情况下,日志在轮换前允许达到 1MB。单个请求/响应正文将截断到此大小的 10%。

这指定了保留轮换日志文件的天数。如果未设置,则日志文件会无限期保留。

保留旧日志的数量。如果未设置,则根据 resource.tracer.maxage 设置保留所有旧日志。

轮换日志文件名时间戳是使用主机本地时间还是 UTC 时间。

这决定了轮换日志是否应使用 gzip 压缩。

在调试级别日志记录期间,state 对象和评估结果将包含在日志中。这可能导致秘密泄露。为了防止这种情况,可以从记录的 state 中编辑或删除字段。redact 配置允许用户配置此字段编辑行为。出于安全原因,如果 redact 配置缺失,将记录警告。

在不需要编辑的情况下,应使用空的 redact.fields 配置来消除日志警告。

- type: cel
  redact:
    fields: ~

例如,如果在 CEL 程序中使用用户构建的基本身份验证请求,可以这样编辑密码

filebeat.inputs:
- type: cel
  resource.url: http://localhost:9200/_search
  state:
    user: user@domain.tld
    password: P@$$W0₹D
  redact:
    fields:
      - password
    delete: true

请注意,auth 配置层级下的字段不会暴露给 state,因此不需要编辑。因此,如果可能,优先使用这些字段进行身份验证,而不是上面所示的请求构建。

这指定了在调试日志记录之前要从 state 中编辑的字段。此数组中列出的字段将替换为 * 或完全从发送到调试日志的消息中删除。

这指定字段应替换为 * 还是完全从发送到调试日志的消息中删除。如果 delete 为 true,则字段将被删除而不是替换。

可以在本地文件系统记录 CEL 程序评估失败以调试配置。通过将 failure_dump.enabled 设置为 true 并设置 failure_dump.filename 值来启用此选项。要删除现有失败转储,请在不取消设置文件名选项的情况下将 failure_dump.enabled 设置为 false。

启用此选项会损害安全性,仅应用于调试。

这指定了写入失败转储的目录路径。如果它不为空且 CEL 程序评估失败,CEL 程序评估的完整状态集将作为 JSON 文件写入,同时记录报告的错误。此选项仅应用于调试失败,因为它会对输入产生显著的性能影响,并且可能占用大量内存来保存完整状态集。如果配置了失败转储,建议减小数据输入大小以避免过度内存消耗,并生成难以分析的转储。要删除现有失败转储,请在不取消设置文件名选项的情况下将 failure_dump.enabled 设置为 false。

这指定应记录 CEL 代码评估覆盖率并将其记录在调试日志中。这是一个仅限开发人员的选项。

此输入在 HTTP 监控端点 下公开指标。这些指标在 /inputs 路径下公开。它们可用于观察输入的活动。

指标 说明
resource 输入资源的 URL 或路径。
cel_executions CEL 程序已执行的次数。
batches_received_total 接收到的事件数组数量。
events_received_total 接收到的事件数量。
batches_published_total 已发布的事件数组数量。
events_published_total 已发布的事件数量。
cel_processing_time 成功 CEL 程序处理时间的纳秒级直方图。
batch_processing_time 成功批处理时间的纳秒级直方图(非空批次从接收时间到 ACK 时间)。
http_request_total 已处理请求的总数。
http_request_errors_total 请求错误的总数。
http_request_delete_total DELETE 请求的总数。
http_request_get_total GET 请求的总数。
http_request_head_total HEAD 请求的总数。
http_request_options_total OPTIONS 请求的总数。
http_request_patch_total PATCH 请求的总数。
http_request_post_total POST 请求的总数。
http_request_put_total PUT 请求的总数。
http_request_body_bytes_total 请求正文总大小。
http_request_body_bytes 请求正文大小的直方图。
http_response_total 接收到的响应总数。
http_response_errors_total 响应错误的总数。
http_response_1xx_total 1xx 响应的总数。
http_response_2xx_total 2xx 响应的总数。
http_response_3xx_total 3xx 响应的总数。
http_response_4xx_total 4xx 响应的总数。
http_response_5xx_total 5xx 响应的总数。
http_response_body_bytes_total 响应正文总大小。
http_response_body_bytes 响应正文大小的直方图。
http_round_trip_time 往返时间的直方图。

Elastic Mito 仓库中提供了一个独立的 CEL 环境,它实现了 CEL 输入的大部分 Comment Expression Language 功能。此工具可用于帮助开发供输入使用的 CEL 程序。可以通过运行 go install github.com/elastic/mito/cmd/mito@latest 从源代码安装,这需要 Go 工具链。

所有输入都支持以下配置选项。

使用 enabled 选项启用和禁用输入。默认情况下,enabled 设置为 true。

Filebeat 在每个发布的事件的 tags 字段中包含的标签列表。标签使得在 Kibana 中选择特定事件或在 Logstash 中应用条件过滤变得容易。这些标签将附加到通用配置中指定的标签列表。

示例

filebeat.inputs:
- type: cel
  . . .
  tags: ["json"]

可选字段,您可以指定这些字段向输出添加附加信息。例如,您可以添加可用于过滤日志数据的字段。字段可以是标量值、数组、字典或它们的任何嵌套组合。默认情况下,您在此处指定的字段将分组到输出文档的 fields 子字典下。要将自定义字段存储为顶级字段,请将 fields_under_root 选项设置为 true。如果在通用配置中声明了重复字段,则其值将由此处声明的值覆盖。

filebeat.inputs:
- type: cel
  . . .
  fields:
    app_id: query_engine_12

如果此选项设置为 true,则自定义 fields 将作为顶级字段存储在输出文档中,而不是分组在 fields 子字典下。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段将覆盖其他字段。

要应用于输入数据的处理器列表。

有关在配置中指定处理器的信息,请参阅 Processors

为此输入生成的事件设置的 ingest pipeline ID。

注意

pipeline ID 也可以在 Elasticsearch 输出中配置,但此选项通常会使配置文件更简单。如果 pipeline 在输入和输出中都配置了,则使用输入中的选项。

重要

pipeline 始终是小写的。如果 pipeline: Foo-Bar,则 Elasticsearch 中的 pipeline 名称需要定义为 foo-bar

如果此选项设置为 true,则具有 null 值的字段将在输出文档中发布。默认情况下,keep_null 设置为 false

如果存在,此格式化字符串将覆盖此输入事件的索引(对于 elasticsearch 输出),或设置事件元数据的 raw_index 字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index 或处理器。

示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}" 可能会扩展为 "filebeat-myindex-2019.11.01"

默认情况下,所有事件都包含 host.name。可以将此选项设置为 true 以禁用将此字段添加到所有事件。默认值为 false

© . All rights reserved.