Kafka 输入
编辑Kafka 输入
编辑使用 kafka
输入从 Kafka 集群中的主题读取数据。
要配置此输入,请指定集群中一个或多个 hosts
列表以引导连接、要跟踪的 topics
列表,以及连接的 group_id
。
配置示例
filebeat.inputs: - type: kafka hosts: - kafka-broker-1:9092 - kafka-broker-2:9092 topics: ["my-topic"] group_id: "filebeat"
以下示例展示了如何使用 kafka
输入来摄取来自已启用 Kafka 兼容性的 Microsoft Azure 事件中心的数据
filebeat.inputs: - type: kafka hosts: ["<your event hub namespace>.servicebus.windows.net:9093"] topics: ["<your event hub instance>"] group_id: "<your consumer group>" username: "$ConnectionString" password: "<your connection string>" ssl.enabled: true
有关 Kafka 和事件中心配置参数之间映射的更多详细信息,请参阅 Azure 文档。
兼容性
编辑此输入适用于 0.11 到 2.8.0 之间的所有 Kafka 版本。较旧的版本可能也适用,但不受支持。
配置选项
编辑kafka
输入支持以下配置选项以及稍后描述的 通用选项。
如果您正在将 Elastic Agent 与 Kafka 输入一起使用,并且需要提高吞吐量,我们建议通过额外的 Elastic Agent 水平扩展以从 Kafka 主题读取数据。请注意,每个 Elastic Agent 从其已分配的每个分区并发读取数据。
hosts
编辑此集群的 Kafka 引导主机(代理)列表。
topics
编辑要从中读取的主题列表。
group_id
编辑Kafka 消费者组 ID。
client_id
编辑Kafka 客户端 ID(可选)。
version
编辑要使用的 Kafka 协议的版本(默认为 "1.0.0"
)。
initial_offset
编辑开始读取的初始偏移量,可以是 “oldest” 或 “newest”。默认为 “oldest”。
connect_backoff
编辑在发生致命错误后尝试重新连接到 Kafka 集群之前等待的时间。默认为 30 秒。
consume_backoff
编辑在重试失败的读取之前等待的时间。默认为 2 秒。
max_wait_time
编辑在读取时等待最少输入字节数的时间。默认为 250 毫秒。
wait_close
编辑关闭时,等待传递和确认正在传输中的消息的时间。
isolation_level
编辑这将配置 Kafka 组隔离级别
-
"read_uncommitted"
返回消息通道中的所有消息。 -
"read_committed"
隐藏属于已中止事务的消息。
默认值为 "read_uncommitted"
。
expand_event_list_from_field
编辑如果使用此输入的文件集希望在特定字段下接收多个捆绑的消息,则可以将配置选项 expand_event_list_from_field
值分配给该字段的名称。例如,在 Azure 文件集的情况下,事件在 json 对象 “records” 下找到。
{ "records": [ {event1}, {event2}] }
此设置可以将组值(records)下的消息拆分为单独的事件。
rebalance
编辑Kafka 重新平衡设置
-
strategy
- 可以是
"range"
或"roundrobin"
。默认为"range"
。 -
timeout
- 尝试重新平衡的等待时间。默认为 60 秒。
-
max_retries
- 如果重新平衡失败,则重试的次数。默认为 4。
-
retry_backoff
- 在不成功的重新平衡尝试后等待的时间。默认为 2 秒。
sasl.mechanism
编辑连接到 Kafka 时要使用的 SASL 机制。它可以是以下之一
-
PLAIN
用于 SASL/PLAIN。 -
SCRAM-SHA-256
用于 SCRAM-SHA-256。 -
SCRAM-SHA-512
用于 SCRAM-SHA-512。
如果未设置 sasl.mechanism
,则如果提供了 username
和 password
,则使用 PLAIN
。否则,禁用 SASL 身份验证。
要使用 GSSAPI
机制通过 Kerberos 进行身份验证,您必须将此字段留空,并使用 kerberos
选项。
kerberos
编辑此功能处于测试阶段,可能会发生更改。设计和代码不如官方 GA 功能成熟,并且按原样提供,不提供任何保证。测试功能不受官方 GA 功能的支持 SLA 的约束。
Kerberos 身份验证的配置选项。
有关详细信息,请参阅Kerberos。
parsers
编辑此选项需要一个有效负载必须经过的解析器列表。
可用解析器
-
ndjson
-
multiline
ndjson
编辑这些选项使 Filebeat 可以将有效负载解码为 JSON 消息。
配置示例
- ndjson: target: "" add_error_key: true message_key: log
-
target
- 应包含解析后的键值对的新 JSON 对象的名称。如果将其留空,则新键将放在根目录下。
-
overwrite_keys
- 如果发生冲突,解码后的 JSON 对象中的值会覆盖 Filebeat 通常添加的字段(类型、源、偏移量等)。如果要保留先前添加的值,请禁用它。
-
expand_keys
- 如果启用此设置,Filebeat 将递归地取消解码后的 JSON 中的点号键,并将它们扩展为分层对象结构。例如,
{"a.b.c": 123}
将被扩展为{"a":{"b":{"c":123}}}
。当输入由 ECS 记录器 生成时,应启用此设置。 -
add_error_key
- 如果启用此设置,则 Filebeat 会在发生 JSON 解组错误或在配置中定义了
message_key
但无法使用时添加 “error.message” 和 “error.type: json” 键。 -
message_key
- 一个可选的配置设置,用于指定要应用行过滤和多行设置的 JSON 键。如果指定,则该键必须位于 JSON 对象的顶层,并且与该键关联的值必须是字符串,否则不会发生过滤或多行聚合。
-
document_id
- 可选的配置设置,用于指定要设置文档 ID 的 JSON 键。如果配置,该字段将从原始 JSON 文档中删除并存储在
@metadata._id
中 -
ignore_decoding_error
- 一个可选的配置设置,用于指定是否应记录 JSON 解码错误。如果设置为 true,则不会记录错误。默认值为 false。
multiline
编辑控制 Filebeat 如何处理跨多行的日志消息的选项。有关配置多行选项的更多信息,请参阅多行消息。
通用选项
编辑所有输入都支持以下配置选项。
enabled
编辑使用 enabled
选项启用和禁用输入。默认情况下,enabled 设置为 true。
tags
编辑Filebeat 在每个已发布事件的 tags
字段中包含的标签列表。标签使您可以轻松地在 Kibana 中选择特定事件或在 Logstash 中应用条件过滤。这些标签将附加到常规配置中指定的标签列表。
示例
filebeat.inputs: - type: kafka . . . tags: ["json"]
fields
编辑您可以指定的可选字段,以向输出添加其他信息。例如,您可能添加可用于过滤日志数据的字段。字段可以是标量值、数组、字典或这些值的任何嵌套组合。默认情况下,您在此处指定的字段将分组在输出文档中的 fields
子字典下。要将自定义字段存储为顶级字段,请将 fields_under_root
选项设置为 true。如果在常规配置中声明了重复字段,则其值将被此处声明的值覆盖。
filebeat.inputs: - type: kafka . . . fields: app_id: query_engine_12
fields_under_root
编辑如果将此选项设置为 true,则自定义 字段 将作为输出文档中的顶级字段存储,而不是分组在 fields
子字典下。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段将覆盖其他字段。
processors
编辑要应用于输入数据的处理器列表。
有关在配置中指定处理器的信息,请参阅处理器。
pipeline
编辑为此输入生成的事件设置的摄取管道 ID。
管道 ID 也可以在 Elasticsearch 输出中配置,但此选项通常会产生更简单的配置文件。如果在输入和输出中都配置了管道,则使用来自输入的选项。
keep_null
编辑如果将此选项设置为 true,则具有 null
值的字段将发布在输出文档中。默认情况下,keep_null
设置为 false
。
index
编辑如果存在,此格式化字符串将覆盖来自此输入的事件的索引(对于 Elasticsearch 输出),或者设置事件元数据的 raw_index
字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index
或处理器。
示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"
可能会扩展为 "filebeat-myindex-2019.11.01"
。
publisher_pipeline.disable_host
编辑默认情况下,所有事件都包含 host.name
。可以将此选项设置为 true
来禁用将此字段添加到所有事件。默认值为 false
。