流式输入

编辑

此功能为技术预览版,可能会在未来的版本中更改或删除。Elastic 将努力修复任何问题,但技术预览版的功能不受官方 GA 功能的支持 SLA 的约束。

streaming 输入从流式数据源(例如 websocket 服务器)读取消息。此输入在内部使用 CEL 引擎mito 库来解析和处理消息。支持 CEL 使您能够以更灵活的方式解析和处理消息。它在 CEL 程序的编写方式上与 cel 输入有很多相似之处,但在消息的读取和处理方式上有所不同。目前支持 websocket 服务器或 API 端点以及 Crowdstrike Falcon 流式 API。

websocket 流式输入支持

  • 身份验证

    • 基本
    • 持有者
    • 自定义

streaming 输入 websocket 处理程序目前不支持 XML 消息。目前也不支持自动重新连接,因此重新连接将在输入重启时发生。

Crowdstrike 流式输入需要 OAuth2.0,如 Crowdstrike API 文档中所述。使用 Crowdstrike 流式类型时,必须设置 crowdstrike_app_id 配置字段。此字段指定发送到 Crowdstrike API 的 appId 参数。有关详细信息,请参阅 Crowdstrike 文档。

stream_type 配置字段指定要使用的流式输入类型,“websocket”或“crowdstrike”。如果未设置,则输入默认为 websocket 流式传输。

执行

编辑

为输入提供的执行环境包括 mito 库提供的函数、宏和全局变量。单个 JSON 对象作为可通过 state 变量访问的输入提供。state 包含一个 response 映射字段,并且可能包含通过输入的 state 配置配置的任意其他字段。如果 CEL 程序在程序执行之间保存游标状态,则配置的 state.cursor 值将在执行之前被保存的游标替换。

启动时,state 将如下所示

{
    "response": { ... },
    "cursor": { ... },
    ...
}

streaming 输入 websocket 处理程序在状态映射中创建一个 response 字段,并将 websocket 消息附加到此字段。所有编写的 CEL 程序都应作用于此 response 字段。其他字段可能存在于对象的根目录,并且如果程序可以容忍,则游标值可能不存在。只有游标会在重启时持久化,但状态中的所有字段都会在处理循环的迭代之间保留,除了生成的事件数组,请参见下文。

如果存在游标,则程序应根据其值处理或筛选响应。如果不存在游标,则应根据程序的逻辑处理所有响应。

程序执行完成后,应返回一个结构如下的单个对象

{
    "events": [ 
        {...},
        ...
    ],
    "cursor": [ 
        {...},
        ...
    ]
}

events 字段必须存在,但可能为空或 null。如果它不为空,则它必须只包含对象作为元素。该字段可以是数组或单个对象,它将被视为具有单个元素的数组。这完全取决于流式数据源。events 字段是要发布到输出的事件数组。每个事件都必须是 JSON 对象。

如果存在 cursor,则它必须是单个对象或与事件长度相同的数组;cursor 的每个元素 *i* 将是用于获取 events 数组中事件 *i* 及其之后事件的详细信息。如果 cursor 是单个对象,它将是用于获取 events 数组中最后一个事件之后事件的详细信息,并且仅在成功发布 events 数组中的所有事件时才会保留。

配置示例

filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: streaming
  url: ws://127.0.0.1:443/v1/stream
  program: |
    bytes(state.response).decode_json().as(inner_body,{
      "events": {
        "message":  inner_body.encode_json(),
      }
    })
filebeat.inputs:
# Read and process events from the Crowdstrike Falcon Hose API
- type: streaming
  stream_type: crowdstrike
  url: https://api.crowdstrike.com/sensors/entities/datafeed/v2
  auth:
    client_id: a23fcea2643868ef1a41565a1a8a1c7c
    client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
    token_url: https://api.crowdstrike.com/oauth2/token
  crowdstrike_app_id: my_app_id
  program: |
    state.response.decode_json().as(body,{
      "events": [body],
      ?"cursor": has(body.?metadata.offset) ?
        optional.of({"offset": body.metadata.offset})
      :
        optional.none(),
    })

调试状态日志记录

编辑

当以 DEBUG 级别记录时,Websocket 输入将在 CEL 求值之前和之后记录完整状态。这将包括保存在 state 对象中的任何敏感或机密信息,因此当敏感信息保留在 state 对象中时,不应在生产中使用 DEBUG 级别日志记录。有关从 DEBUG 日志中排除敏感字段的设置,请参见 redact 配置参数。

身份验证

编辑

websocket 流式输入支持通过基本令牌身份验证、持有者令牌身份验证和通过自定义身份验证配置进行身份验证。与 REST 输入不同,基本身份验证包含基本身份验证令牌,持有者身份验证包含持有者令牌,自定义身份验证包含自定义标头和值的任意组合。这些令牌/密钥值将添加到请求标头,并且不会暴露给 state 对象。自定义身份验证配置对于构建需要自定义标头和值进行身份验证的请求非常有用。基本和持有者令牌配置将始终使用 Authorization 标头,并在令牌前面分别添加 BasicBearer

具有身份验证的配置示例

filebeat.inputs:
- type: streaming
  auth.basic_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://127.0.0.1:443/_stream
filebeat.inputs:
- type: streaming
  auth.bearer_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://127.0.0.1:443/_stream
filebeat.inputs:
- type: streaming
  auth.custom:
    header: "x-api-key"
    value: "dXNlcjpwYXNzd29yZA=="
  url: wss://127.0.0.1:443/_stream
filebeat.inputs:
- type: streaming
  auth.custom:
    header: "Auth"
    value: "Bearer dXNlcjpwYXNzd29yZA=="
  url: wss://127.0.0.1:443/_stream

crowdstrike 流式输入需要使用客户端 ID、客户端密钥和令牌 URL 进行 OAuth2.0 身份验证。这些值不会暴露给 state 对象。OAuth2.0 范围和端点参数可通过 auth.scopesauth.endpoint_params 配置参数获得。

filebeat.inputs:
- type: streaming
  stream_type: crowdstrike
  auth:
    client_id: a23fcea2643868ef1a41565a1a8a1c7c
    client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
    token_url: https://api.crowdstrike.com/oauth2/token

输入状态

编辑

streaming 输入在接收到的每条消息之间保持运行时状态。此状态可由 CEL 程序访问,并且可能包含任意对象。该状态必须包含一个 response 映射,并且可能包含用户希望存储在其中的任何对象。所有对象都在运行时存储,除了 cursor,它的值在重启之间持久化。

配置选项

编辑

streaming 输入支持以下配置选项以及稍后描述的 通用选项

stream_type

编辑

要使用的流式传输类型。这可以是“websocket”、“crowdstrike”或未设置。如果未设置该字段,则使用 websocket 流式传输。

program

编辑

在接收到的每条消息上执行的 CEL 程序。理想情况下,此字段应存在,但如果不存在,则使用下面给出的默认程序。

program: |
  bytes(state.response).decode_json().as(inner_body,{
    "events": {
      "message":  inner_body.encode_json(),
    }
  })

url_program

编辑

如果存在,则此 CEL 程序在建立流式连接之前,使用 state 对象(包括任何存储的游标值)执行。它必须求值为有效的 URL。返回的 URL 用于建立流式连接以进行处理。该程序可以使用游标值或其他定义的状态值来自定义运行时 URL。

url: ws://testapi:443/v1/streamresults
state:
  initial_start_time: "2022-01-01T00:00:00Z"
url_program: |
  state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time)
program: |
  bytes(state.response).decode_json().as(inner_body,{
    "events": {
      "message":  inner_body.encode_json(),
    },
    "cursor": {
      "since": inner_body.timestamp
    }
  })

state

编辑

state 是一个可选对象,它在第一次执行时传递给 CEL 程序。它作为 state 变量提供给执行程序。除了 state.cursor 字段外,state 不会在重启时持久化。

state.cursor

编辑

游标是一个对象,可以作为 state.cursor 访问,可以在其中存储任意值。游标状态在输入重启之间保持,并在请求的每个事件发布后更新。使用游标时,CEL 程序必须为程序返回的每个事件创建一个游标状态,或者创建一个反映完整事件集的游标的单个游标。

filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: streaming
  url: ws://127.0.0.1:443/v1/stream
  program: |
    bytes(state.response).as(body, {
      "events": [body.decode_json().with({
        "last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ?
          state.cursor.last_requested_at
        :
          now
      })],
      "cursor": {"last_requested_at": now}
    })

regexp

编辑

一组命名的正则表达式,可以在 CEL 程序的执行期间使用 regexp 扩展库使用。用于正则表达式的语法为 RE2

filebeat.inputs:
- type: streaming
  # Define two regular expressions, 'products' and 'solutions' for use during CEL program execution.
  regexp:
    products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
    solutions: '(?i)(Search|Observability|Security)'

redact

编辑

在调试级别日志记录期间,日志中会包含 state 对象和生成的求值结果。这可能会导致泄露机密信息。为了防止这种情况,可以从记录的 state 中删除或删除字段。redact 配置允许用户配置此字段删除行为。出于安全原因,如果缺少 redact 配置,则会记录警告。

在不需要删除的情况下,应使用空的 redact.fields 配置来消除记录的警告。

- type: streaming
  redact:
    fields: ~

例如,如果 CEL 程序中使用了用户构建的基本身份验证请求,则可以按如下方式删除密码

filebeat.inputs:
- type: streaming
  url: ws://127.0.0.1:443/_stream
  state:
    user: [email protected]
    password: P@$$W0₹D
  redact:
    fields:
      - password
    delete: true

请注意,auth 配置层次结构下的字段不会暴露给 state,因此不需要删除。因此,尽可能优先使用这些字段进行身份验证,而不是上面显示的请求构建。

redact.fields

编辑

这指定在调试日志记录之前要删除的 state 中的字段。此数组中列出的字段将被替换为 * 或完全从发送到调试日志的消息中删除。

redact.delete

编辑

这指定字段应替换为 * 还是完全从发送到调试日志的消息中删除。如果 delete 为 true,则将删除字段,而不是替换。

retry

编辑

retry 配置允许用户指定在连接失败的情况下,输入应尝试重新连接到流式数据源的次数。默认值为 nil,表示不尝试重试。它具有 wait_minwait_max 配置,用于指定重试之间的最小和最大等待时间。

filebeat.inputs:
- type: streaming
  url: ws://127.0.0.1:443/_stream
  program: |
    bytes(state.response).decode_json().as(inner_body,{
      "events": {
        "message":  inner_body.encode_json(),
      }
    })
  retry:
    max_attempts: 5
    wait_min: 1s
    wait_max: 10s

retry.max_attempts

编辑

在连接失败的情况下,输入应尝试重新连接到流式数据源的最大次数。默认值为 5,表示最多尝试 5 次重试。

retry.wait_min

编辑

重试之间的最小等待时间。这确保重试之间有足够的间隔,以便系统有时间恢复或解决瞬时问题,而不是用快速重试轰炸系统。例如,wait_min 可以设置为 1 秒,这意味着即使计算出的退避时间小于此值,客户端也会至少等待 1 秒再重试。默认值为 1 秒。

retry.wait_max

编辑

重试之间的最大等待时间。这可以防止重试机制变得太慢,确保客户端不会在重试之间无限期地等待。这在超时或用户体验至关重要的系统中至关重要。例如,wait_max 可以设置为 10 秒,这意味着即使计算出的退避时间大于此值,客户端也最多等待 10 秒再重试。默认值为 30 秒。

timeout

编辑

超时是 websocket 连接器等待建立连接的最大时间。默认值为 180 秒。

proxy_url

编辑

这指定用于连接的正向代理 URL。proxy_url 配置是可选的,可用于配置连接的代理设置。proxy_url 的默认值由 http.ProxyFromEnvironment 设置,它读取 HTTP_PROXYHTTPS_PROXYNO_PROXY 环境变量。

proxy_headers

编辑

这指定要发送到代理服务器的标头。proxy_headers 配置是可选的,可用于配置要发送到代理服务器的标头。

这指定连接的 SSL 配置。ssl 配置是可选的,可用于配置连接的 SSL 设置。ssl 配置具有以下子字段

  • certificate_authorities:用于验证服务器证书的根证书列表。
  • certificate:用于客户端身份验证的(PEM 编码的)证书。
  • key:用于客户端身份验证的(PEM 编码的)私钥。

如果这是自签名证书,则应将 certificate_authorities 字段设置为证书本身。

指标

编辑

此输入在 HTTP 监控端点下公开指标。这些指标在 /inputs 路径下公开。它们可用于观察输入的活动。

指标 描述

url

输入资源的 URL。

cel_eval_errors

CEL 程序评估期间遇到的错误数。

errors_total

输入生命周期中遇到的错误总数。

batches_received_total

接收到的事件数组数。

batches_published_total

发布的事件数组数。

received_bytes_total

在输入生命周期中接收到的字节总数。

events_received_total

接收到的事件数。

events_published_total

发布的事件数。

cel_processing_time

成功 CEL 程序处理时间(以纳秒为单位)的直方图。

batch_processing_time

成功批处理时间(以纳秒为单位)的直方图(从接收到非空批次的 ACK 的时间)。

开发人员工具

编辑

一个独立的 CEL 环境,实现了流式输入的大部分注释表达式语言功能,可在 Elastic Mito 存储库中找到。此工具可用于帮助开发输入使用的 CEL 程序。可以通过运行 go install github.com/elastic/mito/cmd/mito@latest 从源代码进行安装,并且需要 Go 工具链。

通用选项

编辑

所有输入都支持以下配置选项。

enabled
编辑

使用 enabled 选项启用和禁用输入。默认情况下,enabled 设置为 true。

tags
编辑

Filebeat 在每个已发布事件的 tags 字段中包含的标签列表。标签使您可以轻松地在 Kibana 中选择特定事件或在 Logstash 中应用条件筛选。这些标签将附加到常规配置中指定的标签列表中。

示例

filebeat.inputs:
- type: streaming
  . . .
  tags: ["json"]
fields
编辑

您可以指定的用于向输出添加其他信息的可选字段。例如,您可以添加可用于筛选日志数据的字段。字段可以是标量值、数组、字典或这些的任何嵌套组合。默认情况下,您在此处指定的字段将分组在输出文档中的 fields 子字典下。要将自定义字段存储为顶级字段,请将 fields_under_root 选项设置为 true。如果在常规配置中声明了重复字段,则其值将被此处声明的值覆盖。

filebeat.inputs:
- type: streaming
  . . .
  fields:
    app_id: query_engine_12
fields_under_root
编辑

如果此选项设置为 true,则自定义 字段将作为输出文档中的顶级字段存储,而不是分组在 fields 子字典下。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段将覆盖其他字段。

processors
编辑

要应用于输入数据的处理器列表。

有关在配置中指定处理器的信息,请参阅 处理器

pipeline
编辑

为此输入生成的事件设置的摄取管道 ID。

也可以在 Elasticsearch 输出中配置管道 ID,但此选项通常会产生更简单的配置文件。如果在输入和输出中都配置了管道,则使用输入中的选项。

keep_null
编辑

如果此选项设置为 true,则具有 null 值的字段将发布在输出文档中。默认情况下,keep_null 设置为 false

index
编辑

如果存在,则此格式化的字符串会覆盖来自此输入的事件的索引(对于 elasticsearch 输出),或设置事件元数据的 raw_index 字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index 或处理器。

示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}" 可能会展开为 "filebeat-myindex-2019.11.01"

publisher_pipeline.disable_host
编辑

默认情况下,所有事件都包含 host.name。此选项可以设置为 true 以禁用向所有事件添加此字段。默认值为 false

streaming 输入目前被标记为实验性,可能存在错误和其他问题。请在 Github 存储库上报告任何问题。