Websocket 输入编辑

此功能处于技术预览阶段,可能会在未来版本中更改或删除。Elastic 将努力修复任何问题,但技术预览版中的功能不受官方 GA 功能的支持 SLA 的约束。

websocket 输入从 websocket 服务器或 API 端点读取消息。此输入使用 CEL 引擎mito 库来解析和处理消息。支持 CEL 允许您以更灵活的方式解析和处理消息。它在 CEL 程序的编写方式上与 cel 输入有很多相似之处,但在读取和处理消息的方式上有所不同。websocket 输入是一种 流式 输入,只能用于从 websocket 服务器或 API 端点读取消息。

此输入支持

  • 身份验证

    • 基本
    • 承载
    • 自定义

websocket 输入目前不支持 XML 消息。目前也不支持自动重新连接,因此重新连接将在输入重新启动时发生。

执行编辑

为输入提供的执行环境包括 mito 库提供的函数、宏和全局变量。单个 JSON 对象作为输入提供,可通过 state 变量访问。state 包含一个 response 映射字段,并且可能包含通过输入的 state 配置配置的任意其他字段。如果 CEL 程序在程序执行之间保存游标状态,则在执行之前,配置的 state.cursor 值将被保存的游标替换。

启动时,state 将类似于以下内容

{
    "response": { ... },
    "cursor": { ... },
    ...
}

websocket 输入在状态映射中创建一个 response 字段,并将 websocket 消息附加到此字段。所有编写的 CEL 程序都应对此 response 字段进行操作。其他字段可能出现在对象的根目录下,如果程序允许,则游标值可能不存在。只有游标会在重启后持久化,但状态中的所有字段在处理循环的每次迭代之间都保留,但生成的事件数组除外,如下所示。

如果游标存在,程序应根据其值处理或过滤掉响应。如果游标不存在,则应根据程序的逻辑处理所有响应。

程序执行完成后,它应该返回一个结构如下所示的单个对象

{
    "events": [ 
        {...},
        ...
    ],
    "cursor": [ 
        {...},
        ...
    ]
}

events 字段必须存在,但可以为空或 null。如果它不为空,则它必须只有对象作为元素。该字段可以是数组或单个对象,它们将被视为具有单个元素的数组。这完全取决于 websocket 服务器或 API 端点。events 字段是要发布到输出的事件数组。每个事件都必须是一个 JSON 对象。

如果 cursor 存在,则它必须是单个对象或与事件长度相同的数组;cursor 的每个元素 *i* 将是在 events 数组中获取事件 i 及其之后事件的详细信息。如果 cursor 是单个对象,则它将是在 events 数组中最后一个事件之后获取事件的详细信息,并且仅在成功发布 events 数组中的所有事件后才会保留。

示例配置

filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: websocket
  url: ws://127.0.0.1:443/v1/stream
  program: |
    bytes(state.response).decode_json().as(inner_body,{
      "events": {
        "message":  inner_body.encode_json(),
      }
    })

调试状态日志记录编辑

Websocket 输入将在 CEL 评估前后以 DEBUG 级别记录日志时记录完整状态。这将包括保存在 state 对象中的任何敏感或机密信息,因此在将敏感信息保留在 state 对象中时,不应在生产环境中使用 DEBUG 级别日志记录。有关从 DEBUG 日志中排除敏感字段的设置,请参阅 redact 配置参数。

身份验证编辑

Websocket 输入支持通过基本令牌身份验证、承载令牌身份验证和通过自定义身份验证配置进行身份验证。与 REST 输入不同,基本身份验证包含基本身份验证令牌,承载身份验证包含承载令牌,自定义身份验证包含自定义标头和值的任意组合。这些令牌/密钥值将添加到请求标头中,并且不会暴露给 state 对象。自定义身份验证配置对于构建需要自定义标头和值进行身份验证的请求非常有用。基本和承载令牌配置将始终使用 Authorization 标头,并分别在令牌前添加 BasicBearer

使用身份验证的示例配置

filebeat.inputs:
- type: websocket
  auth.basic_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://127.0.0.1:443/_stream
filebeat.inputs:
- type: websocket
  auth.bearer_token: "dXNlcjpwYXNzd29yZA=="
  url: wss://127.0.0.1:443/_stream
filebeat.inputs:
- type: websocket
  auth.custom:
    header: "x-api-key"
    value: "dXNlcjpwYXNzd29yZA=="
  url: wss://127.0.0.1:443/_stream
filebeat.inputs:
- type: websocket
  auth.custom:
    header: "Auth"
    value: "Bearer dXNlcjpwYXNzd29yZA=="
  url: wss://127.0.0.1:443/_stream

输入状态编辑

websocket 输入在收到的每条消息之间保留一个运行时状态。CEL 程序可以访问此状态,并且该状态可能包含任意对象。状态必须包含一个 response 映射,并且可以包含用户希望存储在其中的任何对象。所有对象都存储在运行时,但 cursor 除外,它的值在重启后持久化。

配置选项编辑

websocket 输入支持以下配置选项以及稍后描述的 通用选项

program编辑

在收到的每条消息上执行的 CEL 程序。理想情况下,此字段应该存在,但如果不存在,则使用下面给出的默认程序。

program: |
  bytes(state.response).decode_json().as(inner_body,{
    "events": {
      "message":  inner_body.encode_json(),
    }
  })

state编辑

state 是一个可选对象,在第一次执行时传递给 CEL 程序。它作为 state 变量提供给正在执行的程序。除了 state.cursor 字段外,state 不会在重启后持久化。

state.cursor编辑

游标是一个可作为 state.cursor 使用的对象,可以在其中存储任意值。游标状态在输入重启之间保持,并在发布请求的每个事件后更新。使用游标时,CEL 程序必须为程序返回的每个事件创建游标状态,或者创建反映完整事件集完成情况的单个游标。

filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: websocket
  url: ws://127.0.0.1:443/v1/stream
  program: |
    bytes(state.response).as(body, {
      "events": [body.decode_json().with({
        "last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ?
          state.cursor.last_requested_at
        :
          now
      })],
      "cursor": {"last_requested_at": now}
    })

regexp编辑

一组命名的正则表达式,可以在 CEL 程序执行期间使用 regexp 扩展库使用。用于正则表达式的语法是 RE2

filebeat.inputs:
- type: websocket
  # Define two regular expressions, 'products' and 'solutions' for use during CEL program execution.
  regexp:
    products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
    solutions: '(?i)(Search|Observability|Security)'

redact编辑

在调试级别日志记录期间,state 对象和结果评估结果包含在日志中。这可能会导致泄露机密。为了防止这种情况,可以从记录的 state 中编辑或删除字段。redact 配置允许用户配置此字段编辑行为。出于安全原因,如果缺少 redact 配置,则会记录警告。

如果不需要编辑,则应使用空的 redact.fields 配置来抑制记录的警告。

- type: websocket
  redact:
    fields: ~

例如,如果在 CEL 程序中使用用户构建的基本身份验证请求,则可以像这样编辑密码

filebeat.inputs:
- type: websocket
  url: ws://127.0.0.1:443/_stream
  state:
    user: [email protected]
    password: P@$$W0₹D
  redact:
    fields:
      - password
    delete: true

请注意,auth 配置层次结构下的字段不会暴露给 state,因此不需要编辑。因此,如果可能,最好使用这些字段进行身份验证,而不是使用上面显示的请求构造。

redact.fields编辑

这指定了在调试日志记录之前要在 state 中编辑的字段。此数组中列出的字段将被替换为 * 或从发送到调试日志的消息中完全删除。

redact.delete编辑

这指定了是应该将字段替换为 * 还是从发送到调试日志的消息中完全删除。如果 delete 为 true,则将删除字段而不是替换。

指标编辑

此输入在 HTTP 监控端点 下公开指标。这些指标在 /inputs 路径下公开。它们可用于观察输入的活动。

指标 描述

url

输入资源的 URL。

cel_eval_errors

cel 程序评估期间遇到的错误数。

errors_total

在输入的生命周期内遇到的错误总数。

batches_received_total

接收到的事件数组数。

batches_published_total

发布的事件数组数。

received_bytes_total

在输入的生命周期内接收到的字节数。

events_received_total

接收到的事件数。

events_published_total

发布的事件数。

cel_processing_time

成功的 CEL 程序处理时间(以纳秒为单位)的直方图。

batch_processing_time

成功的批处理时间(以纳秒为单位)的直方图(从接收时间到非空批处理的 ACK 时间)。

开发者工具编辑

Elastic Mito 代码库中,提供了一个独立的 CEL 环境,它实现了 websocket 输入的注释表达式语言 (CEL) 功能的大部分。此工具可用于帮助开发供输入使用的 CEL 程序。可以通过运行 go install github.com/elastic/mito/cmd/mito@latest 从源代码进行安装,并需要 Go 工具链。

通用选项编辑

所有输入都支持以下配置选项。

enabled编辑

使用 enabled 选项启用和禁用输入。默认情况下,enabled 设置为 true。

tags编辑

Filebeat 包含在每个已发布事件的 tags 字段中的一系列标签。标签可以轻松地在 Kibana 中选择特定事件或在 Logstash 中应用条件过滤。这些标签将附加到常规配置中指定的标签列表。

示例

filebeat.inputs:
- type: websocket
  . . .
  tags: ["json"]
fields编辑

您可以指定可选字段,以向输出添加其他信息。例如,您可以添加可用于过滤日志数据的字段。字段可以是标量值、数组、字典或这些值的任何嵌套组合。默认情况下,您在此处指定的字段将在输出文档的 fields 子字典下分组。要将自定义字段存储为顶级字段,请将 fields_under_root 选项设置为 true。如果在常规配置中声明了重复字段,则其值将被此处声明的值覆盖。

filebeat.inputs:
- type: websocket
  . . .
  fields:
    app_id: query_engine_12
fields_under_root编辑

如果此选项设置为 true,则自定义 字段 将作为顶级字段存储在输出文档中,而不是分组在 fields 子字典下。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段将覆盖其他字段。

processors编辑

要应用于输入数据的一系列处理器。

有关在配置中指定处理器的信息,请参阅处理器

pipeline编辑

要为由此输入生成的事件设置的摄取管道 ID。

管道 ID 也可以在 Elasticsearch 输出中配置,但此选项通常会导致配置文件更简单。如果在输入和输出中都配置了管道,则使用来自输入的选项。

keep_null编辑

如果此选项设置为 true,则值为 null 的字段将在输出文档中发布。默认情况下,keep_null 设置为 false

index编辑

如果存在,则此格式字符串将覆盖此输入事件的索引(对于 elasticsearch 输出),或设置事件元数据的 raw_index 字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index 或处理器。

示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}" 可能会扩展为 "filebeat-myindex-2019.11.01"

publisher_pipeline.disable_host编辑

默认情况下,所有事件都包含 host.name。可以将此选项设置为 true 以禁用将此字段添加到所有事件。默认值为 false

websocket 输入当前标记为实验性,可能存在错误和其他问题。请在 Github 代码库中报告任何问题。