流式输入
编辑流式输入
编辑此功能为技术预览版,可能会在未来的版本中更改或删除。Elastic 将努力修复任何问题,但技术预览版的功能不受官方 GA 功能的支持 SLA 的约束。
streaming
输入从流式数据源(例如 websocket 服务器)读取消息。此输入在内部使用 CEL 引擎
和 mito
库来解析和处理消息。支持 CEL
使您能够以更灵活的方式解析和处理消息。它在 CEL
程序的编写方式上与 cel
输入有很多相似之处,但在消息的读取和处理方式上有所不同。目前支持 websocket 服务器或 API 端点以及 Crowdstrike Falcon 流式 API。
websocket 流式输入支持
-
身份验证
- 基本
- 持有者
- 自定义
streaming
输入 websocket 处理程序目前不支持 XML 消息。目前也不支持自动重新连接,因此重新连接将在输入重启时发生。
Crowdstrike 流式输入需要 OAuth2.0,如 Crowdstrike API 文档中所述。使用 Crowdstrike 流式类型时,必须设置 crowdstrike_app_id
配置字段。此字段指定发送到 Crowdstrike API 的 appId
参数。有关详细信息,请参阅 Crowdstrike 文档。
stream_type
配置字段指定要使用的流式输入类型,“websocket”或“crowdstrike”。如果未设置,则输入默认为 websocket 流式传输。
执行
编辑为输入提供的执行环境包括 mito 库提供的函数、宏和全局变量。单个 JSON 对象作为可通过 state
变量访问的输入提供。state
包含一个 response
映射字段,并且可能包含通过输入的 state
配置配置的任意其他字段。如果 CEL 程序在程序执行之间保存游标状态,则配置的 state.cursor
值将在执行之前被保存的游标替换。
启动时,state
将如下所示
{ "response": { ... }, "cursor": { ... }, ... }
streaming
输入 websocket 处理程序在状态映射中创建一个 response
字段,并将 websocket 消息附加到此字段。所有编写的 CEL
程序都应作用于此 response
字段。其他字段可能存在于对象的根目录,并且如果程序可以容忍,则游标值可能不存在。只有游标会在重启时持久化,但状态中的所有字段都会在处理循环的迭代之间保留,除了生成的事件数组,请参见下文。
如果存在游标,则程序应根据其值处理或筛选响应。如果不存在游标,则应根据程序的逻辑处理所有响应。
程序执行完成后,应返回一个结构如下的单个对象
|
|
如果存在 |
配置示例
filebeat.inputs: # Read and process simple websocket messages from a local websocket server - type: streaming url: ws://127.0.0.1:443/v1/stream program: | bytes(state.response).decode_json().as(inner_body,{ "events": { "message": inner_body.encode_json(), } })
filebeat.inputs: # Read and process events from the Crowdstrike Falcon Hose API - type: streaming stream_type: crowdstrike url: https://api.crowdstrike.com/sensors/entities/datafeed/v2 auth: client_id: a23fcea2643868ef1a41565a1a8a1c7c client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK token_url: https://api.crowdstrike.com/oauth2/token crowdstrike_app_id: my_app_id program: | state.response.decode_json().as(body,{ "events": [body], ?"cursor": has(body.?metadata.offset) ? optional.of({"offset": body.metadata.offset}) : optional.none(), })
调试状态日志记录
编辑当以 DEBUG 级别记录时,Websocket 输入将在 CEL 求值之前和之后记录完整状态。这将包括保存在 state
对象中的任何敏感或机密信息,因此当敏感信息保留在 state
对象中时,不应在生产中使用 DEBUG 级别日志记录。有关从 DEBUG 日志中排除敏感字段的设置,请参见 redact
配置参数。
身份验证
编辑websocket 流式输入支持通过基本令牌身份验证、持有者令牌身份验证和通过自定义身份验证配置进行身份验证。与 REST 输入不同,基本身份验证包含基本身份验证令牌,持有者身份验证包含持有者令牌,自定义身份验证包含自定义标头和值的任意组合。这些令牌/密钥值将添加到请求标头,并且不会暴露给 state
对象。自定义身份验证配置对于构建需要自定义标头和值进行身份验证的请求非常有用。基本和持有者令牌配置将始终使用 Authorization
标头,并在令牌前面分别添加 Basic
或 Bearer
。
具有身份验证的配置示例
filebeat.inputs: - type: streaming auth.basic_token: "dXNlcjpwYXNzd29yZA==" url: wss://127.0.0.1:443/_stream
filebeat.inputs: - type: streaming auth.bearer_token: "dXNlcjpwYXNzd29yZA==" url: wss://127.0.0.1:443/_stream
filebeat.inputs: - type: streaming auth.custom: header: "x-api-key" value: "dXNlcjpwYXNzd29yZA==" url: wss://127.0.0.1:443/_stream
filebeat.inputs: - type: streaming auth.custom: header: "Auth" value: "Bearer dXNlcjpwYXNzd29yZA==" url: wss://127.0.0.1:443/_stream
crowdstrike 流式输入需要使用客户端 ID、客户端密钥和令牌 URL 进行 OAuth2.0 身份验证。这些值不会暴露给 state
对象。OAuth2.0 范围和端点参数可通过 auth.scopes
和 auth.endpoint_params
配置参数获得。
filebeat.inputs: - type: streaming stream_type: crowdstrike auth: client_id: a23fcea2643868ef1a41565a1a8a1c7c client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK token_url: https://api.crowdstrike.com/oauth2/token
输入状态
编辑streaming
输入在接收到的每条消息之间保持运行时状态。此状态可由 CEL 程序访问,并且可能包含任意对象。该状态必须包含一个 response
映射,并且可能包含用户希望存储在其中的任何对象。所有对象都在运行时存储,除了 cursor
,它的值在重启之间持久化。
配置选项
编辑streaming
输入支持以下配置选项以及稍后描述的 通用选项。
stream_type
编辑要使用的流式传输类型。这可以是“websocket”、“crowdstrike”或未设置。如果未设置该字段,则使用 websocket 流式传输。
program
编辑在接收到的每条消息上执行的 CEL 程序。理想情况下,此字段应存在,但如果不存在,则使用下面给出的默认程序。
program: | bytes(state.response).decode_json().as(inner_body,{ "events": { "message": inner_body.encode_json(), } })
url_program
编辑如果存在,则此 CEL 程序在建立流式连接之前,使用 state
对象(包括任何存储的游标值)执行。它必须求值为有效的 URL。返回的 URL 用于建立流式连接以进行处理。该程序可以使用游标值或其他定义的状态值来自定义运行时 URL。
url: ws://testapi:443/v1/streamresults state: initial_start_time: "2022-01-01T00:00:00Z" url_program: | state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time) program: | bytes(state.response).decode_json().as(inner_body,{ "events": { "message": inner_body.encode_json(), }, "cursor": { "since": inner_body.timestamp } })
state
编辑state
是一个可选对象,它在第一次执行时传递给 CEL 程序。它作为 state
变量提供给执行程序。除了 state.cursor
字段外,state
不会在重启时持久化。
state.cursor
编辑游标是一个对象,可以作为 state.cursor
访问,可以在其中存储任意值。游标状态在输入重启之间保持,并在请求的每个事件发布后更新。使用游标时,CEL 程序必须为程序返回的每个事件创建一个游标状态,或者创建一个反映完整事件集的游标的单个游标。
filebeat.inputs: # Read and process simple websocket messages from a local websocket server - type: streaming url: ws://127.0.0.1:443/v1/stream program: | bytes(state.response).as(body, { "events": [body.decode_json().with({ "last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ? state.cursor.last_requested_at : now })], "cursor": {"last_requested_at": now} })
regexp
编辑一组命名的正则表达式,可以在 CEL 程序的执行期间使用 regexp
扩展库使用。用于正则表达式的语法为 RE2。
filebeat.inputs: - type: streaming # Define two regular expressions, 'products' and 'solutions' for use during CEL program execution. regexp: products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)' solutions: '(?i)(Search|Observability|Security)'
redact
编辑在调试级别日志记录期间,日志中会包含 state
对象和生成的求值结果。这可能会导致泄露机密信息。为了防止这种情况,可以从记录的 state
中删除或删除字段。redact
配置允许用户配置此字段删除行为。出于安全原因,如果缺少 redact
配置,则会记录警告。
在不需要删除的情况下,应使用空的 redact.fields
配置来消除记录的警告。
- type: streaming redact: fields: ~
例如,如果 CEL 程序中使用了用户构建的基本身份验证请求,则可以按如下方式删除密码
filebeat.inputs: - type: streaming url: ws://127.0.0.1:443/_stream state: user: [email protected] password: P@$$W0₹D redact: fields: - password delete: true
请注意,auth
配置层次结构下的字段不会暴露给 state
,因此不需要删除。因此,尽可能优先使用这些字段进行身份验证,而不是上面显示的请求构建。
redact.fields
编辑这指定在调试日志记录之前要删除的 state
中的字段。此数组中列出的字段将被替换为 *
或完全从发送到调试日志的消息中删除。
redact.delete
编辑这指定字段应替换为 *
还是完全从发送到调试日志的消息中删除。如果 delete 为 true
,则将删除字段,而不是替换。
retry
编辑retry
配置允许用户指定在连接失败的情况下,输入应尝试重新连接到流式数据源的次数。默认值为 nil
,表示不尝试重试。它具有 wait_min
和 wait_max
配置,用于指定重试之间的最小和最大等待时间。
filebeat.inputs: - type: streaming url: ws://127.0.0.1:443/_stream program: | bytes(state.response).decode_json().as(inner_body,{ "events": { "message": inner_body.encode_json(), } }) retry: max_attempts: 5 wait_min: 1s wait_max: 10s
retry.max_attempts
编辑在连接失败的情况下,输入应尝试重新连接到流式数据源的最大次数。默认值为 5
,表示最多尝试 5 次重试。
retry.wait_min
编辑重试之间的最小等待时间。这确保重试之间有足够的间隔,以便系统有时间恢复或解决瞬时问题,而不是用快速重试轰炸系统。例如,wait_min
可以设置为 1 秒,这意味着即使计算出的退避时间小于此值,客户端也会至少等待 1 秒再重试。默认值为 1
秒。
retry.wait_max
编辑重试之间的最大等待时间。这可以防止重试机制变得太慢,确保客户端不会在重试之间无限期地等待。这在超时或用户体验至关重要的系统中至关重要。例如,wait_max
可以设置为 10 秒,这意味着即使计算出的退避时间大于此值,客户端也最多等待 10 秒再重试。默认值为 30
秒。
timeout
编辑超时是 websocket 连接器等待建立连接的最大时间。默认值为 180
秒。
proxy_url
编辑这指定用于连接的正向代理 URL。proxy_url
配置是可选的,可用于配置连接的代理设置。proxy_url
的默认值由 http.ProxyFromEnvironment
设置,它读取 HTTP_PROXY
、HTTPS_PROXY
和 NO_PROXY
环境变量。
proxy_headers
编辑这指定要发送到代理服务器的标头。proxy_headers
配置是可选的,可用于配置要发送到代理服务器的标头。
ssl
编辑这指定连接的 SSL 配置。ssl
配置是可选的,可用于配置连接的 SSL 设置。ssl
配置具有以下子字段
-
certificate_authorities
:用于验证服务器证书的根证书列表。 -
certificate
:用于客户端身份验证的(PEM 编码的)证书。 -
key
:用于客户端身份验证的(PEM 编码的)私钥。
如果这是自签名证书,则应将 certificate_authorities
字段设置为证书本身。
指标
编辑此输入在 HTTP 监控端点下公开指标。这些指标在 /inputs
路径下公开。它们可用于观察输入的活动。
指标 | 描述 |
---|---|
|
输入资源的 URL。 |
|
CEL 程序评估期间遇到的错误数。 |
|
输入生命周期中遇到的错误总数。 |
|
接收到的事件数组数。 |
|
发布的事件数组数。 |
|
在输入生命周期中接收到的字节总数。 |
|
接收到的事件数。 |
|
发布的事件数。 |
|
成功 CEL 程序处理时间(以纳秒为单位)的直方图。 |
|
成功批处理时间(以纳秒为单位)的直方图(从接收到非空批次的 ACK 的时间)。 |
开发人员工具
编辑一个独立的 CEL 环境,实现了流式输入的大部分注释表达式语言功能,可在 Elastic Mito 存储库中找到。此工具可用于帮助开发输入使用的 CEL 程序。可以通过运行 go install github.com/elastic/mito/cmd/mito@latest
从源代码进行安装,并且需要 Go 工具链。
通用选项
编辑所有输入都支持以下配置选项。
enabled
编辑使用 enabled
选项启用和禁用输入。默认情况下,enabled 设置为 true。
tags
编辑Filebeat 在每个已发布事件的 tags
字段中包含的标签列表。标签使您可以轻松地在 Kibana 中选择特定事件或在 Logstash 中应用条件筛选。这些标签将附加到常规配置中指定的标签列表中。
示例
filebeat.inputs: - type: streaming . . . tags: ["json"]
fields
编辑您可以指定的用于向输出添加其他信息的可选字段。例如,您可以添加可用于筛选日志数据的字段。字段可以是标量值、数组、字典或这些的任何嵌套组合。默认情况下,您在此处指定的字段将分组在输出文档中的 fields
子字典下。要将自定义字段存储为顶级字段,请将 fields_under_root
选项设置为 true。如果在常规配置中声明了重复字段,则其值将被此处声明的值覆盖。
filebeat.inputs: - type: streaming . . . fields: app_id: query_engine_12
fields_under_root
编辑如果此选项设置为 true,则自定义 字段将作为输出文档中的顶级字段存储,而不是分组在 fields
子字典下。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段将覆盖其他字段。
processors
编辑要应用于输入数据的处理器列表。
有关在配置中指定处理器的信息,请参阅 处理器。
pipeline
编辑为此输入生成的事件设置的摄取管道 ID。
也可以在 Elasticsearch 输出中配置管道 ID,但此选项通常会产生更简单的配置文件。如果在输入和输出中都配置了管道,则使用输入中的选项。
keep_null
编辑如果此选项设置为 true,则具有 null
值的字段将发布在输出文档中。默认情况下,keep_null
设置为 false
。
index
编辑如果存在,则此格式化的字符串会覆盖来自此输入的事件的索引(对于 elasticsearch 输出),或设置事件元数据的 raw_index
字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index
或处理器。
示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"
可能会展开为 "filebeat-myindex-2019.11.01"
。
publisher_pipeline.disable_host
编辑默认情况下,所有事件都包含 host.name
。此选项可以设置为 true
以禁用向所有事件添加此字段。默认值为 false
。
streaming
输入目前被标记为实验性,可能存在错误和其他问题。请在 Github 存储库上报告任何问题。