流式输入

编辑

此功能处于技术预览阶段,可能会在将来的版本中更改或删除。Elastic 将致力于解决任何问题,但技术预览中的功能不受官方 GA 功能的支持 SLA 的约束。

streaming 输入从流式数据源读取消息,例如 Websocket 服务器。此输入在内部使用 CEL 引擎mito 库来解析和处理消息。支持 CEL 允许您以更灵活的方式解析和处理消息。它与 cel 输入在 CEL 程序的编写方式上有很多相似之处,但读取和处理消息的方式有所不同。目前支持 Websocket 服务器或 API 端点以及 Crowdstrike Falcon 流式 API。

Websocket 流式输入支持

  • 身份验证

    • 基本认证
    • Bearer 认证
    • 自定义认证

streaming 输入 Websocket 处理程序目前不支持 XML 消息。自动重新连接目前也不受支持,因此重新连接将在输入重启时发生。

Crowdstrike 流式输入需要 OAuth2.0,如 Crowdstrike API 文档中所述。使用 Crowdstrike 流式类型时,必须设置 crowdstrike_app_id 配置字段。此字段指定发送到 Crowdstrike API 的 appId 参数。有关详细信息,请参阅 Crowdstrike 文档。

stream_type 配置字段指定要使用的流式输入类型,“websocket”或“crowdstrike”。如果未设置,则输入默认为 websocket 流式传输。

执行

编辑

为输入提供的执行环境包括 mito 库提供的函数、宏和全局变量。单个 JSON 对象作为输入提供,可以通过 state 变量访问。 state 包含一个 response 映射字段,并且可能包含通过输入的 state 配置配置的任意其他字段。如果 CEL 程序在程序执行之间保存游标状态,则在执行之前,配置的 state.cursor 值将被保存的游标替换。

启动时,state 将类似于以下内容

{
    "response": { ... },
    "cursor": { ... },
    ...
}

streaming 输入 Websocket 处理程序在 state 映射中创建一个 response 字段,并将 Websocket 消息附加到此字段。所有编写的 CEL 程序都应作用于此 response 字段。对象根部可能存在其他字段,如果程序容忍它,则游标值可能不存在。只有游标在重启后持久化,但在处理循环的每次迭代之间保留 state 中的所有字段,除了生成的 events 数组,如下所示。

如果存在游标,则程序应根据其值处理或过滤掉响应。如果游标不存在,则应根据程序的逻辑处理所有响应。

程序执行完成后,它应该返回一个结构类似于以下内容的单个对象

{
    "events": [ 
        {...},
        ...
    ],
    "cursor": [ 
        {...},
        ...
    ]
}

events 字段必须存在,但可以为空或 null。如果它不为空,则它只能具有对象作为元素。该字段可以是数组或单个对象,它将被视为具有单个元素的数组。这完全取决于流式数据源。该 events 字段是要发布到输出的事件数组。每个事件必须是 JSON 对象。

如果存在 cursor,它必须是单个对象或与 events 长度相同的数组;cursor 的每个元素 i 将是获取事件的详细信息,并在 events 数组中事件 i 及其之后。如果 cursor 是单个对象,它将是获取 events 数组中最后一个事件之后的事件的详细信息,并且只有在成功发布 events 数组中的所有事件后才会保留。

示例配置

filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: streaming
  url: ws://127.0.0.1:443/v1/stream
  program: |
    bytes(state.response).decode_json().as(inner_body,{
      "events": {
        "message":  inner_body.encode_json(),
      }
    })
filebeat.inputs:
# Read and process events from the Crowdstrike Falcon Hose API
- type: streaming
  stream_type: crowdstrike
  url: https://api.crowdstrike.com/sensors/entities/datafeed/v2
  auth:
    client_id: a23fcea2643868ef1a41565a1a8a1c7c
    client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
    token_url: https://api.crowdstrike.com/oauth2/token
  crowdstrike_app_id: my_app_id
  program: |
    state.response.decode_json().as(body,{
      "events": [body],
      ?"cursor": has(body.?metadata.offset) ?
        optional.of({"offset": body.metadata.offset})
      :
        optional.none(),
    })

调试状态日志记录

编辑

Websocket 输入将在 DEBUG 级别进行日志记录时,在 CEL 评估之前和之后记录完整的状态。这将包括 state 对象中保留的任何敏感或秘密信息,因此在生产环境中,当 state 对象中保留敏感信息时,不应使用 DEBUG 级别日志记录。请参阅 redact 配置参数以获取排除 DEBUG 日志中敏感字段的设置。

身份验证

编辑

Websocket 流式输入支持通过基本令牌身份验证、Bearer 令牌身份验证和通过自定义身份验证配置进行身份验证。与 REST 输入不同,基本身份验证包含基本身份验证令牌,Bearer 身份验证包含 Bearer 令牌,自定义身份验证包含自定义标头和值的任意组合。这些令牌/密钥值将添加到请求标头中,并且不会暴露给 state 对象。自定义身份验证配置对于构造需要自定义标头和值进行身份验证的请求很有用。基本和 Bearer 令牌配置将始终使用 Authorization 标头,并在令牌前分别加上 BasicBearer

带有身份验证的示例配置

filebeat.inputs:
- type: streaming
  auth.basic_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://127.0.0.1:443/_stream
filebeat.inputs:
- type: streaming
  auth.bearer_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://127.0.0.1:443/_stream
filebeat.inputs:
- type: streaming
  auth.custom:
    header: "x-api-key"
    value: "dXNlcjpwYXNzd29yZA=="
  url: wss://127.0.0.1:443/_stream
filebeat.inputs:
- type: streaming
  auth.custom:
    header: "Auth"
    value: "Bearer dXNlcjpwYXNzd29yZA=="
  url: wss://127.0.0.1:443/_stream

Crowdstrike 流式输入需要使用客户端 ID、客户端密钥和令牌 URL 进行 OAuth2.0 身份验证。这些值不会暴露给 state 对象。OAuth2.0 范围和端点参数可通过 auth.scopesauth.endpoint_params 配置参数获得。

filebeat.inputs:
- type: streaming
  stream_type: crowdstrike
  auth:
    client_id: a23fcea2643868ef1a41565a1a8a1c7c
    client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
    token_url: https://api.crowdstrike.com/oauth2/token

输入状态

编辑

streaming 输入在接收到的每条消息之间保持运行时状态。CEL 程序可以访问此状态,并且可能包含任意对象。该状态必须包含一个 response 映射,并且可能包含用户希望存储在其中的任何对象。所有对象都存储在运行时,除了 cursor,其值在重启之间持久化。

配置选项

编辑

streaming 输入支持以下配置选项以及稍后描述的 通用选项

stream_type

编辑

要使用的流式传输类型。这可以是“websocket”、“crowdstrike”或未设置。如果该字段未设置,则使用 websocket 流式传输。

program

编辑

在接收到的每条消息上执行的 CEL 程序。此字段理想情况下应该存在,但如果不存在,则使用下面给出的默认程序。

program: |
  bytes(state.response).decode_json().as(inner_body,{
    "events": {
      "message":  inner_body.encode_json(),
    }
  })

url_program

编辑

如果存在,则在使用 state 对象(包括任何存储的游标值)建立流式连接之前执行此 CEL 程序。它必须计算为有效的 URL。返回的 URL 用于建立流式连接以进行处理。程序可以使用游标值或其他状态定义值在运行时自定义 URL。

url: ws://testapi:443/v1/streamresults
state:
  initial_start_time: "2022-01-01T00:00:00Z"
url_program: |
  state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time)
program: |
  bytes(state.response).decode_json().as(inner_body,{
    "events": {
      "message":  inner_body.encode_json(),
    },
    "cursor": {
      "since": inner_body.timestamp
    }
  })

state

编辑

state 是一个可选对象,在第一次执行时传递给 CEL 程序。它可以作为 state 变量供执行程序使用。除了 state.cursor 字段外,state 在重启后不会持久化。

state.cursor

编辑

游标是一个可作为 state.cursor 使用的对象,其中可以存储任意值。游标状态在输入重启之间保持不变,并在请求的每个事件发布后更新。当使用游标时,CEL 程序必须为程序返回的每个事件创建一个游标状态,或者创建一个反映完整事件集完成的游标。

filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: streaming
  url: ws://127.0.0.1:443/v1/stream
  program: |
    bytes(state.response).as(body, {
      "events": [body.decode_json().with({
        "last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ?
          state.cursor.last_requested_at
        :
          now
      })],
      "cursor": {"last_requested_at": now}
    })

regexp

编辑

一组命名的正则表达式,可以在 CEL 程序执行期间使用 regexp 扩展库使用。正则表达式的语法是 RE2

filebeat.inputs:
- type: streaming
  # Define two regular expressions, 'products' and 'solutions' for use during CEL program execution.
  regexp:
    products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
    solutions: '(?i)(Search|Observability|Security)'

redact

编辑

在调试级别日志记录期间,state 对象和结果评估结果包含在日志中。这可能导致泄漏机密。为了防止这种情况,可以从记录的 state 中删除或编辑字段。该 redact 配置允许用户配置此字段编辑行为。出于安全原因,如果缺少 redact 配置,则会记录警告。

在不需要编辑的情况下,应使用空的 redact.fields 配置来抑制记录的警告。

- type: streaming
  redact:
    fields: ~

例如,如果用户构建的基本身份验证请求在 CEL 程序中使用,则密码可以像这样编辑

filebeat.inputs:
- type: streaming
  url: ws://127.0.0.1:443/_stream
  state:
    user: [email protected]
    password: P@$$W0₹D
  redact:
    fields:
      - password
    delete: true

请注意,auth 配置层次结构下的字段不会暴露给 state,因此不需要进行脱敏。因此,在可能的情况下,最好使用这些字段进行身份验证,而不是使用上面显示的请求构造方法。

redact.fields

编辑

这指定了在调试日志记录之前要从 state 中脱敏的字段。此数组中列出的字段将替换为 * 或从发送到调试日志的消息中完全删除。

redact.delete

编辑

这指定了字段是替换为 * 还是从发送到调试日志的消息中完全删除。如果 delete 为 true,则字段将被删除而不是替换。

retry

编辑

retry 配置允许用户指定在连接失败的情况下,输入尝试重新连接到流数据源的次数。默认值为 nil,表示不会尝试重新连接。它具有 wait_minwait_max 配置,分别指定重试之间等待的最小和最大时间。

filebeat.inputs:
- type: streaming
  url: ws://127.0.0.1:443/_stream
  program: |
    bytes(state.response).decode_json().as(inner_body,{
      "events": {
        "message":  inner_body.encode_json(),
      }
    })
  retry:
    max_attempts: 5
    wait_min: 1s
    wait_max: 10s

retry.max_attempts

编辑

在连接失败的情况下,输入尝试重新连接到流数据源的最大次数。默认值为 nil,表示不会尝试重新连接。

retry.wait_min

编辑

重试之间等待的最小时间。这确保了重试间隔足够长,以便系统有时间恢复或解决瞬态问题,而不是用快速的重试轰炸系统。例如,wait_min 可以设置为 1 秒,这意味着即使计算出的回退时间小于此值,客户端也将至少等待 1 秒后再重试。

retry.wait_max

编辑

重试之间等待的最大时间。这可以防止重试机制变得太慢,确保客户端不会在重试之间无限期地等待。这在超时或用户体验至关重要的系统中至关重要。例如,wait_max 可以设置为 10 秒,这意味着即使计算出的回退时间大于此值,客户端也将最多等待 10 秒后再重试。

指标

编辑

此输入在 HTTP 监控端点 下公开指标。这些指标在 /inputs 路径下公开。它们可用于观察输入的活动。

指标 描述

url

输入资源的 URL。

cel_eval_errors

在 cel 程序评估期间遇到的错误数量。

errors_total

输入生命周期中遇到的错误数量。

batches_received_total

接收到的事件数组数量。

batches_published_total

发布的事件数组数量。

received_bytes_total

输入生命周期中接收到的字节数。

events_received_total

接收到的事件数量。

events_published_total

发布的事件数量。

cel_processing_time

成功 CEL 程序处理时间的直方图(以纳秒为单位)。

batch_processing_time

成功批处理时间的直方图(以纳秒为单位)(非空批次的接收时间到 ACK 时间)。

开发者工具

编辑

Elastic Mito 存储库中提供了一个独立的 CEL 环境,它实现了流输入的大部分注释表达式语言功能。此工具可用于帮助开发输入将使用的 CEL 程序。可以通过运行 go install github.com/elastic/mito/cmd/mito@latest 从源代码进行安装,并且需要一个 Go 工具链。

通用选项

编辑

以下配置选项受所有输入支持。

enabled
编辑

使用 enabled 选项启用和禁用输入。默认情况下,enabled 设置为 true。

tags
编辑

Filebeat 在每个发布事件的 tags 字段中包含的一系列标签。标签使在 Kibana 中选择特定事件或在 Logstash 中应用条件过滤变得容易。这些标签将附加到通用配置中指定的标签列表中。

示例

filebeat.inputs:
- type: streaming
  . . .
  tags: ["json"]
fields
编辑

您可以指定的可选字段,以向输出添加其他信息。例如,您可以添加可用于过滤日志数据的字段。字段可以是标量值、数组、字典或这些的任何嵌套组合。默认情况下,您在此处指定的字段将分组在输出文档中的 fields 子字典下。要将自定义字段存储为顶级字段,请将 fields_under_root 选项设置为 true。如果在通用配置中声明了重复的字段,则其值将被此处声明的值覆盖。

filebeat.inputs:
- type: streaming
  . . .
  fields:
    app_id: query_engine_12
fields_under_root
编辑

如果将此选项设置为 true,则自定义 fields 将存储为输出文档中的顶级字段,而不是分组在 fields 子字典下。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段会覆盖其他字段。

processors
编辑

要应用于输入数据的一系列处理器。

有关在配置中指定处理器的信息,请参阅 处理器

pipeline
编辑

为此输入生成的事件设置的摄取管道 ID。

管道 ID 也可以在 Elasticsearch 输出中配置,但此选项通常会导致更简单的配置文件。如果在输入和输出中都配置了管道,则使用输入中的选项。

keep_null
编辑

如果将此选项设置为 true,则输出文档中将发布具有 null 值的字段。默认情况下,keep_null 设置为 false

index
编辑

如果存在,此格式化字符串将覆盖此输入事件的索引(对于 elasticsearch 输出),或设置事件元数据的 raw_index 字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index 或处理器。

示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}" 可能扩展为 "filebeat-myindex-2019.11.01"

publisher_pipeline.disable_host
编辑

默认情况下,所有事件都包含 host.name。此选项可以设置为 true 以禁用将此字段添加到所有事件中。默认值为 false

streaming 输入目前被标记为实验性,可能存在错误和其他问题。请在 Github 存储库中报告任何问题。