流式输入
编辑流式输入
编辑此功能处于技术预览阶段,可能会在将来的版本中更改或删除。Elastic 将致力于解决任何问题,但技术预览中的功能不受官方 GA 功能的支持 SLA 的约束。
该 streaming
输入从流式数据源读取消息,例如 Websocket 服务器。此输入在内部使用 CEL 引擎
和 mito
库来解析和处理消息。支持 CEL
允许您以更灵活的方式解析和处理消息。它与 cel
输入在 CEL
程序的编写方式上有很多相似之处,但读取和处理消息的方式有所不同。目前支持 Websocket 服务器或 API 端点以及 Crowdstrike Falcon 流式 API。
Websocket 流式输入支持
-
身份验证
- 基本认证
- Bearer 认证
- 自定义认证
该 streaming
输入 Websocket 处理程序目前不支持 XML 消息。自动重新连接目前也不受支持,因此重新连接将在输入重启时发生。
Crowdstrike 流式输入需要 OAuth2.0,如 Crowdstrike API 文档中所述。使用 Crowdstrike 流式类型时,必须设置 crowdstrike_app_id
配置字段。此字段指定发送到 Crowdstrike API 的 appId
参数。有关详细信息,请参阅 Crowdstrike 文档。
该 stream_type
配置字段指定要使用的流式输入类型,“websocket”或“crowdstrike”。如果未设置,则输入默认为 websocket 流式传输。
执行
编辑为输入提供的执行环境包括 mito 库提供的函数、宏和全局变量。单个 JSON 对象作为输入提供,可以通过 state
变量访问。 state
包含一个 response
映射字段,并且可能包含通过输入的 state
配置配置的任意其他字段。如果 CEL 程序在程序执行之间保存游标状态,则在执行之前,配置的 state.cursor
值将被保存的游标替换。
启动时,state
将类似于以下内容
{ "response": { ... }, "cursor": { ... }, ... }
该 streaming
输入 Websocket 处理程序在 state 映射中创建一个 response
字段,并将 Websocket 消息附加到此字段。所有编写的 CEL
程序都应作用于此 response
字段。对象根部可能存在其他字段,如果程序容忍它,则游标值可能不存在。只有游标在重启后持久化,但在处理循环的每次迭代之间保留 state 中的所有字段,除了生成的 events 数组,如下所示。
如果存在游标,则程序应根据其值处理或过滤掉响应。如果游标不存在,则应根据程序的逻辑处理所有响应。
程序执行完成后,它应该返回一个结构类似于以下内容的单个对象
该 |
|
如果存在 |
示例配置
filebeat.inputs: # Read and process simple websocket messages from a local websocket server - type: streaming url: ws://127.0.0.1:443/v1/stream program: | bytes(state.response).decode_json().as(inner_body,{ "events": { "message": inner_body.encode_json(), } })
filebeat.inputs: # Read and process events from the Crowdstrike Falcon Hose API - type: streaming stream_type: crowdstrike url: https://api.crowdstrike.com/sensors/entities/datafeed/v2 auth: client_id: a23fcea2643868ef1a41565a1a8a1c7c client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK token_url: https://api.crowdstrike.com/oauth2/token crowdstrike_app_id: my_app_id program: | state.response.decode_json().as(body,{ "events": [body], ?"cursor": has(body.?metadata.offset) ? optional.of({"offset": body.metadata.offset}) : optional.none(), })
调试状态日志记录
编辑Websocket 输入将在 DEBUG 级别进行日志记录时,在 CEL 评估之前和之后记录完整的状态。这将包括 state
对象中保留的任何敏感或秘密信息,因此在生产环境中,当 state
对象中保留敏感信息时,不应使用 DEBUG 级别日志记录。请参阅 redact
配置参数以获取排除 DEBUG 日志中敏感字段的设置。
身份验证
编辑Websocket 流式输入支持通过基本令牌身份验证、Bearer 令牌身份验证和通过自定义身份验证配置进行身份验证。与 REST 输入不同,基本身份验证包含基本身份验证令牌,Bearer 身份验证包含 Bearer 令牌,自定义身份验证包含自定义标头和值的任意组合。这些令牌/密钥值将添加到请求标头中,并且不会暴露给 state
对象。自定义身份验证配置对于构造需要自定义标头和值进行身份验证的请求很有用。基本和 Bearer 令牌配置将始终使用 Authorization
标头,并在令牌前分别加上 Basic
或 Bearer
。
带有身份验证的示例配置
filebeat.inputs: - type: streaming auth.basic_token: "dXNlcjpwYXNzd29yZA==" url: wss://127.0.0.1:443/_stream
filebeat.inputs: - type: streaming auth.bearer_token: "dXNlcjpwYXNzd29yZA==" url: wss://127.0.0.1:443/_stream
filebeat.inputs: - type: streaming auth.custom: header: "x-api-key" value: "dXNlcjpwYXNzd29yZA==" url: wss://127.0.0.1:443/_stream
filebeat.inputs: - type: streaming auth.custom: header: "Auth" value: "Bearer dXNlcjpwYXNzd29yZA==" url: wss://127.0.0.1:443/_stream
Crowdstrike 流式输入需要使用客户端 ID、客户端密钥和令牌 URL 进行 OAuth2.0 身份验证。这些值不会暴露给 state
对象。OAuth2.0 范围和端点参数可通过 auth.scopes
和 auth.endpoint_params
配置参数获得。
filebeat.inputs: - type: streaming stream_type: crowdstrike auth: client_id: a23fcea2643868ef1a41565a1a8a1c7c client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK token_url: https://api.crowdstrike.com/oauth2/token
输入状态
编辑该 streaming
输入在接收到的每条消息之间保持运行时状态。CEL 程序可以访问此状态,并且可能包含任意对象。该状态必须包含一个 response
映射,并且可能包含用户希望存储在其中的任何对象。所有对象都存储在运行时,除了 cursor
,其值在重启之间持久化。
配置选项
编辑该 streaming
输入支持以下配置选项以及稍后描述的 通用选项。
stream_type
编辑要使用的流式传输类型。这可以是“websocket”、“crowdstrike”或未设置。如果该字段未设置,则使用 websocket 流式传输。
program
编辑在接收到的每条消息上执行的 CEL 程序。此字段理想情况下应该存在,但如果不存在,则使用下面给出的默认程序。
program: | bytes(state.response).decode_json().as(inner_body,{ "events": { "message": inner_body.encode_json(), } })
url_program
编辑如果存在,则在使用 state
对象(包括任何存储的游标值)建立流式连接之前执行此 CEL 程序。它必须计算为有效的 URL。返回的 URL 用于建立流式连接以进行处理。程序可以使用游标值或其他状态定义值在运行时自定义 URL。
url: ws://testapi:443/v1/streamresults state: initial_start_time: "2022-01-01T00:00:00Z" url_program: | state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time) program: | bytes(state.response).decode_json().as(inner_body,{ "events": { "message": inner_body.encode_json(), }, "cursor": { "since": inner_body.timestamp } })
state
编辑state
是一个可选对象,在第一次执行时传递给 CEL 程序。它可以作为 state
变量供执行程序使用。除了 state.cursor
字段外,state
在重启后不会持久化。
state.cursor
编辑游标是一个可作为 state.cursor
使用的对象,其中可以存储任意值。游标状态在输入重启之间保持不变,并在请求的每个事件发布后更新。当使用游标时,CEL 程序必须为程序返回的每个事件创建一个游标状态,或者创建一个反映完整事件集完成的游标。
filebeat.inputs: # Read and process simple websocket messages from a local websocket server - type: streaming url: ws://127.0.0.1:443/v1/stream program: | bytes(state.response).as(body, { "events": [body.decode_json().with({ "last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ? state.cursor.last_requested_at : now })], "cursor": {"last_requested_at": now} })
regexp
编辑一组命名的正则表达式,可以在 CEL 程序执行期间使用 regexp
扩展库使用。正则表达式的语法是 RE2。
filebeat.inputs: - type: streaming # Define two regular expressions, 'products' and 'solutions' for use during CEL program execution. regexp: products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)' solutions: '(?i)(Search|Observability|Security)'
redact
编辑在调试级别日志记录期间,state
对象和结果评估结果包含在日志中。这可能导致泄漏机密。为了防止这种情况,可以从记录的 state
中删除或编辑字段。该 redact
配置允许用户配置此字段编辑行为。出于安全原因,如果缺少 redact
配置,则会记录警告。
在不需要编辑的情况下,应使用空的 redact.fields
配置来抑制记录的警告。
- type: streaming redact: fields: ~
例如,如果用户构建的基本身份验证请求在 CEL 程序中使用,则密码可以像这样编辑
filebeat.inputs: - type: streaming url: ws://127.0.0.1:443/_stream state: user: [email protected] password: P@$$W0₹D redact: fields: - password delete: true
请注意,auth
配置层次结构下的字段不会暴露给 state
,因此不需要进行脱敏。因此,在可能的情况下,最好使用这些字段进行身份验证,而不是使用上面显示的请求构造方法。
redact.fields
编辑这指定了在调试日志记录之前要从 state
中脱敏的字段。此数组中列出的字段将替换为 *
或从发送到调试日志的消息中完全删除。
redact.delete
编辑这指定了字段是替换为 *
还是从发送到调试日志的消息中完全删除。如果 delete 为 true
,则字段将被删除而不是替换。
retry
编辑retry
配置允许用户指定在连接失败的情况下,输入尝试重新连接到流数据源的次数。默认值为 nil
,表示不会尝试重新连接。它具有 wait_min
和 wait_max
配置,分别指定重试之间等待的最小和最大时间。
filebeat.inputs: - type: streaming url: ws://127.0.0.1:443/_stream program: | bytes(state.response).decode_json().as(inner_body,{ "events": { "message": inner_body.encode_json(), } }) retry: max_attempts: 5 wait_min: 1s wait_max: 10s
retry.max_attempts
编辑在连接失败的情况下,输入尝试重新连接到流数据源的最大次数。默认值为 nil
,表示不会尝试重新连接。
retry.wait_min
编辑重试之间等待的最小时间。这确保了重试间隔足够长,以便系统有时间恢复或解决瞬态问题,而不是用快速的重试轰炸系统。例如,wait_min
可以设置为 1 秒,这意味着即使计算出的回退时间小于此值,客户端也将至少等待 1 秒后再重试。
retry.wait_max
编辑重试之间等待的最大时间。这可以防止重试机制变得太慢,确保客户端不会在重试之间无限期地等待。这在超时或用户体验至关重要的系统中至关重要。例如,wait_max
可以设置为 10 秒,这意味着即使计算出的回退时间大于此值,客户端也将最多等待 10 秒后再重试。
指标
编辑此输入在 HTTP 监控端点 下公开指标。这些指标在 /inputs
路径下公开。它们可用于观察输入的活动。
指标 | 描述 |
---|---|
|
输入资源的 URL。 |
|
在 cel 程序评估期间遇到的错误数量。 |
|
输入生命周期中遇到的错误数量。 |
|
接收到的事件数组数量。 |
|
发布的事件数组数量。 |
|
输入生命周期中接收到的字节数。 |
|
接收到的事件数量。 |
|
发布的事件数量。 |
|
成功 CEL 程序处理时间的直方图(以纳秒为单位)。 |
|
成功批处理时间的直方图(以纳秒为单位)(非空批次的接收时间到 ACK 时间)。 |
开发者工具
编辑在 Elastic Mito 存储库中提供了一个独立的 CEL 环境,它实现了流输入的大部分注释表达式语言功能。此工具可用于帮助开发输入将使用的 CEL 程序。可以通过运行 go install github.com/elastic/mito/cmd/mito@latest
从源代码进行安装,并且需要一个 Go 工具链。
通用选项
编辑以下配置选项受所有输入支持。
enabled
编辑使用 enabled
选项启用和禁用输入。默认情况下,enabled 设置为 true。
tags
编辑Filebeat 在每个发布事件的 tags
字段中包含的一系列标签。标签使在 Kibana 中选择特定事件或在 Logstash 中应用条件过滤变得容易。这些标签将附加到通用配置中指定的标签列表中。
示例
filebeat.inputs: - type: streaming . . . tags: ["json"]
fields
编辑您可以指定的可选字段,以向输出添加其他信息。例如,您可以添加可用于过滤日志数据的字段。字段可以是标量值、数组、字典或这些的任何嵌套组合。默认情况下,您在此处指定的字段将分组在输出文档中的 fields
子字典下。要将自定义字段存储为顶级字段,请将 fields_under_root
选项设置为 true。如果在通用配置中声明了重复的字段,则其值将被此处声明的值覆盖。
filebeat.inputs: - type: streaming . . . fields: app_id: query_engine_12
fields_under_root
编辑如果将此选项设置为 true,则自定义 fields 将存储为输出文档中的顶级字段,而不是分组在 fields
子字典下。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段会覆盖其他字段。
processors
编辑要应用于输入数据的一系列处理器。
有关在配置中指定处理器的信息,请参阅 处理器。
pipeline
编辑为此输入生成的事件设置的摄取管道 ID。
管道 ID 也可以在 Elasticsearch 输出中配置,但此选项通常会导致更简单的配置文件。如果在输入和输出中都配置了管道,则使用输入中的选项。
keep_null
编辑如果将此选项设置为 true,则输出文档中将发布具有 null
值的字段。默认情况下,keep_null
设置为 false
。
index
编辑如果存在,此格式化字符串将覆盖此输入事件的索引(对于 elasticsearch 输出),或设置事件元数据的 raw_index
字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index
或处理器。
示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"
可能扩展为 "filebeat-myindex-2019.11.01"
。
publisher_pipeline.disable_host
编辑默认情况下,所有事件都包含 host.name
。此选项可以设置为 true
以禁用将此字段添加到所有事件中。默认值为 false
。
streaming
输入目前被标记为实验性,可能存在错误和其他问题。请在 Github 存储库中报告任何问题。