Websocket 输入
编辑Websocket 输入编辑
此功能处于技术预览阶段,可能会在未来版本中更改或删除。Elastic 将努力修复任何问题,但技术预览版中的功能不受官方 GA 功能的支持 SLA 的约束。
websocket
输入从 websocket 服务器或 API 端点读取消息。此输入使用 CEL 引擎
和 mito
库来解析和处理消息。支持 CEL
允许您以更灵活的方式解析和处理消息。它在 CEL
程序的编写方式上与 cel
输入有很多相似之处,但在读取和处理消息的方式上有所不同。websocket
输入是一种 流式
输入,只能用于从 websocket 服务器或 API 端点读取消息。
此输入支持
-
身份验证
- 基本
- 承载
- 自定义
websocket
输入目前不支持 XML 消息。目前也不支持自动重新连接,因此重新连接将在输入重新启动时发生。
执行编辑
为输入提供的执行环境包括 mito 库提供的函数、宏和全局变量。单个 JSON 对象作为输入提供,可通过 state
变量访问。state
包含一个 response
映射字段,并且可能包含通过输入的 state
配置配置的任意其他字段。如果 CEL 程序在程序执行之间保存游标状态,则在执行之前,配置的 state.cursor
值将被保存的游标替换。
启动时,state
将类似于以下内容
{ "response": { ... }, "cursor": { ... }, ... }
websocket
输入在状态映射中创建一个 response
字段,并将 websocket 消息附加到此字段。所有编写的 CEL
程序都应对此 response
字段进行操作。其他字段可能出现在对象的根目录下,如果程序允许,则游标值可能不存在。只有游标会在重启后持久化,但状态中的所有字段在处理循环的每次迭代之间都保留,但生成的事件数组除外,如下所示。
如果游标存在,程序应根据其值处理或过滤掉响应。如果游标不存在,则应根据程序的逻辑处理所有响应。
程序执行完成后,它应该返回一个结构如下所示的单个对象
|
|
如果 |
示例配置
filebeat.inputs: # Read and process simple websocket messages from a local websocket server - type: websocket url: ws://127.0.0.1:443/v1/stream program: | bytes(state.response).decode_json().as(inner_body,{ "events": { "message": inner_body.encode_json(), } })
调试状态日志记录编辑
Websocket 输入将在 CEL 评估前后以 DEBUG 级别记录日志时记录完整状态。这将包括保存在 state
对象中的任何敏感或机密信息,因此在将敏感信息保留在 state
对象中时,不应在生产环境中使用 DEBUG 级别日志记录。有关从 DEBUG 日志中排除敏感字段的设置,请参阅 redact
配置参数。
身份验证编辑
Websocket 输入支持通过基本令牌身份验证、承载令牌身份验证和通过自定义身份验证配置进行身份验证。与 REST 输入不同,基本身份验证包含基本身份验证令牌,承载身份验证包含承载令牌,自定义身份验证包含自定义标头和值的任意组合。这些令牌/密钥值将添加到请求标头中,并且不会暴露给 state
对象。自定义身份验证配置对于构建需要自定义标头和值进行身份验证的请求非常有用。基本和承载令牌配置将始终使用 Authorization
标头,并分别在令牌前添加 Basic
或 Bearer
。
使用身份验证的示例配置
filebeat.inputs: - type: websocket auth.basic_token: "dXNlcjpwYXNzd29yZA==" url: wss://127.0.0.1:443/_stream
filebeat.inputs: - type: websocket auth.bearer_token: "dXNlcjpwYXNzd29yZA==" url: wss://127.0.0.1:443/_stream
filebeat.inputs: - type: websocket auth.custom: header: "x-api-key" value: "dXNlcjpwYXNzd29yZA==" url: wss://127.0.0.1:443/_stream
filebeat.inputs: - type: websocket auth.custom: header: "Auth" value: "Bearer dXNlcjpwYXNzd29yZA==" url: wss://127.0.0.1:443/_stream
输入状态编辑
websocket
输入在收到的每条消息之间保留一个运行时状态。CEL 程序可以访问此状态,并且该状态可能包含任意对象。状态必须包含一个 response
映射,并且可以包含用户希望存储在其中的任何对象。所有对象都存储在运行时,但 cursor
除外,它的值在重启后持久化。
配置选项编辑
websocket
输入支持以下配置选项以及稍后描述的 通用选项。
program
编辑
在收到的每条消息上执行的 CEL 程序。理想情况下,此字段应该存在,但如果不存在,则使用下面给出的默认程序。
program: | bytes(state.response).decode_json().as(inner_body,{ "events": { "message": inner_body.encode_json(), } })
state
编辑
state
是一个可选对象,在第一次执行时传递给 CEL 程序。它作为 state
变量提供给正在执行的程序。除了 state.cursor
字段外,state
不会在重启后持久化。
state.cursor
编辑
游标是一个可作为 state.cursor
使用的对象,可以在其中存储任意值。游标状态在输入重启之间保持,并在发布请求的每个事件后更新。使用游标时,CEL 程序必须为程序返回的每个事件创建游标状态,或者创建反映完整事件集完成情况的单个游标。
filebeat.inputs: # Read and process simple websocket messages from a local websocket server - type: websocket url: ws://127.0.0.1:443/v1/stream program: | bytes(state.response).as(body, { "events": [body.decode_json().with({ "last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ? state.cursor.last_requested_at : now })], "cursor": {"last_requested_at": now} })
regexp
编辑
一组命名的正则表达式,可以在 CEL 程序执行期间使用 regexp
扩展库使用。用于正则表达式的语法是 RE2。
filebeat.inputs: - type: websocket # Define two regular expressions, 'products' and 'solutions' for use during CEL program execution. regexp: products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)' solutions: '(?i)(Search|Observability|Security)'
redact
编辑
在调试级别日志记录期间,state
对象和结果评估结果包含在日志中。这可能会导致泄露机密。为了防止这种情况,可以从记录的 state
中编辑或删除字段。redact
配置允许用户配置此字段编辑行为。出于安全原因,如果缺少 redact
配置,则会记录警告。
如果不需要编辑,则应使用空的 redact.fields
配置来抑制记录的警告。
- type: websocket redact: fields: ~
例如,如果在 CEL 程序中使用用户构建的基本身份验证请求,则可以像这样编辑密码
filebeat.inputs: - type: websocket url: ws://127.0.0.1:443/_stream state: user: [email protected] password: P@$$W0₹D redact: fields: - password delete: true
请注意,auth
配置层次结构下的字段不会暴露给 state
,因此不需要编辑。因此,如果可能,最好使用这些字段进行身份验证,而不是使用上面显示的请求构造。
redact.fields
编辑
这指定了在调试日志记录之前要在 state
中编辑的字段。此数组中列出的字段将被替换为 *
或从发送到调试日志的消息中完全删除。
redact.delete
编辑
这指定了是应该将字段替换为 *
还是从发送到调试日志的消息中完全删除。如果 delete 为 true
,则将删除字段而不是替换。
指标编辑
此输入在 HTTP 监控端点 下公开指标。这些指标在 /inputs
路径下公开。它们可用于观察输入的活动。
指标 | 描述 |
---|---|
|
输入资源的 URL。 |
|
cel 程序评估期间遇到的错误数。 |
|
在输入的生命周期内遇到的错误总数。 |
|
接收到的事件数组数。 |
|
发布的事件数组数。 |
|
在输入的生命周期内接收到的字节数。 |
|
接收到的事件数。 |
|
发布的事件数。 |
|
成功的 CEL 程序处理时间(以纳秒为单位)的直方图。 |
|
成功的批处理时间(以纳秒为单位)的直方图(从接收时间到非空批处理的 ACK 时间)。 |
开发者工具编辑
在 Elastic Mito 代码库中,提供了一个独立的 CEL 环境,它实现了 websocket 输入的注释表达式语言 (CEL) 功能的大部分。此工具可用于帮助开发供输入使用的 CEL 程序。可以通过运行 go install github.com/elastic/mito/cmd/mito@latest
从源代码进行安装,并需要 Go 工具链。
通用选项编辑
所有输入都支持以下配置选项。
enabled
编辑
使用 enabled
选项启用和禁用输入。默认情况下,enabled 设置为 true。
tags
编辑
Filebeat 包含在每个已发布事件的 tags
字段中的一系列标签。标签可以轻松地在 Kibana 中选择特定事件或在 Logstash 中应用条件过滤。这些标签将附加到常规配置中指定的标签列表。
示例
filebeat.inputs: - type: websocket . . . tags: ["json"]
fields
编辑
您可以指定可选字段,以向输出添加其他信息。例如,您可以添加可用于过滤日志数据的字段。字段可以是标量值、数组、字典或这些值的任何嵌套组合。默认情况下,您在此处指定的字段将在输出文档的 fields
子字典下分组。要将自定义字段存储为顶级字段,请将 fields_under_root
选项设置为 true。如果在常规配置中声明了重复字段,则其值将被此处声明的值覆盖。
filebeat.inputs: - type: websocket . . . fields: app_id: query_engine_12
fields_under_root
编辑
如果此选项设置为 true,则自定义 字段 将作为顶级字段存储在输出文档中,而不是分组在 fields
子字典下。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段将覆盖其他字段。
processors
编辑
要应用于输入数据的一系列处理器。
有关在配置中指定处理器的信息,请参阅处理器。
pipeline
编辑
要为由此输入生成的事件设置的摄取管道 ID。
管道 ID 也可以在 Elasticsearch 输出中配置,但此选项通常会导致配置文件更简单。如果在输入和输出中都配置了管道,则使用来自输入的选项。
keep_null
编辑
如果此选项设置为 true,则值为 null
的字段将在输出文档中发布。默认情况下,keep_null
设置为 false
。
index
编辑
如果存在,则此格式字符串将覆盖此输入事件的索引(对于 elasticsearch 输出),或设置事件元数据的 raw_index
字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index
或处理器。
示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"
可能会扩展为 "filebeat-myindex-2019.11.01"
。
publisher_pipeline.disable_host
编辑
默认情况下,所有事件都包含 host.name
。可以将此选项设置为 true
以禁用将此字段添加到所有事件。默认值为 false
。
websocket
输入当前标记为实验性,可能存在错误和其他问题。请在 Github 代码库中报告任何问题。