Kafka 输入
编辑Kafka 输入
编辑使用 kafka
输入从 Kafka 集群中的主题读取数据。
要配置此输入,请指定集群中一个或多个用于引导连接的 hosts
列表,要跟踪的 topics
列表以及连接的 group_id
。
示例配置
filebeat.inputs: - type: kafka hosts: - kafka-broker-1:9092 - kafka-broker-2:9092 topics: ["my-topic"] group_id: "filebeat"
以下示例演示如何使用 kafka
输入从启用了 Kafka 兼容性的 Microsoft Azure 事件中心提取数据。
filebeat.inputs: - type: kafka hosts: ["<your event hub namespace>.servicebus.windows.net:9093"] topics: ["<your event hub instance>"] group_id: "<your consumer group>" username: "$ConnectionString" password: "<your connection string>" ssl.enabled: true
有关 Kafka 和事件中心配置参数之间映射的更多详细信息,请参阅 Azure 文档。
兼容性
编辑此输入适用于 0.11 到 2.8.0 之间的所有 Kafka 版本。较旧的版本也可能有效,但不支持。
配置选项
编辑kafka
输入支持以下配置选项以及稍后描述的 常用选项。
如果您使用的是带有 Kafka 输入的 Elastic Agent 并需要提高吞吐量,我们建议通过额外的 Elastic Agent 水平扩展来读取 Kafka 主题。请注意,每个 Elastic Agent 会并发地从其已分配的每个分区读取数据。
hosts
编辑此集群的 Kafka 引导主机(代理)列表。
topics
编辑要从中读取的主题列表。
group_id
编辑Kafka 消费者组 ID。
client_id
编辑Kafka 客户端 ID(可选)。
version
编辑要使用的 Kafka 协议版本(默认为 "1.0.0"
)。
initial_offset
编辑开始读取的初始偏移量,可以是 "oldest" 或 "newest"。默认为 "oldest"。
connect_backoff
编辑致命错误后等待多长时间才能尝试重新连接到 Kafka 集群。默认为 30 秒。
consume_backoff
编辑在重试失败的读取操作之前等待多长时间。默认为 2 秒。
max_wait_time
编辑读取时等待最小输入字节数的时间。默认为 250 毫秒。
wait_close
编辑关闭时,等待多长时间才能传递和确认进行中的消息。
isolation_level
编辑此配置项配置 Kafka 组隔离级别。
-
"read_uncommitted"
返回消息通道中的所有消息。 -
"read_committed"
隐藏属于已中止事务的消息。
默认为 "read_uncommitted"
。
expand_event_list_from_field
编辑如果使用此输入的文件集预期接收捆绑在特定字段下的多条消息,则可以将配置选项 expand_event_list_from_field
值分配给字段名称。例如,对于 Azure 文件集,事件位于 json 对象 "records" 下。
{ "records": [ {event1}, {event2}] }
此设置能够将组值 (records) 下的消息拆分成单独的事件。
rebalance
编辑Kafka 重新平衡设置
-
strategy
- 可以是
"range"
或"roundrobin"
。默认为"range"
。 -
timeout
- 等待尝试重新平衡的时间。默认为 60 秒。
-
max_retries
- 如果重新平衡失败,要重试的次数。默认为 4。
-
retry_backoff
- 不成功重新平衡尝试后等待多长时间。默认为 2 秒。
sasl.mechanism
编辑连接到 Kafka 时要使用的 SASL 机制。它可以是:
-
PLAIN
用于 SASL/PLAIN。 -
SCRAM-SHA-256
用于 SCRAM-SHA-256。 -
SCRAM-SHA-512
用于 SCRAM-SHA-512。
如果未设置 sasl.mechanism
,则如果提供了 username
和 password
,则使用 PLAIN
。否则,SASL 身份验证将被禁用。
要使用 GSSAPI
机制通过 Kerberos 进行身份验证,您必须将此字段留空,并使用 kerberos
选项。
kerberos
编辑此功能处于测试阶段,可能随时更改。其设计和代码不如正式 GA 功能成熟,按原样提供,不提供任何担保。测试版功能不受正式 GA 功能支持服务水平协议的约束。
Kerberos 身份验证的配置选项。
有关更多信息,请参阅 Kerberos。
parsers
编辑此选项需要一个解析器列表,有效负载必须经过这些解析器。
可用的解析器
-
ndjson
-
multiline
ndjson
编辑这些选项使 Filebeat 能够将有效负载解码为 JSON 消息。
示例配置
- ndjson: target: "" add_error_key: true message_key: log
-
target
- 应包含已解析键值对的新 JSON 对象的名称。如果留空,新键将位于根目录下。
-
overwrite_keys
- 来自已解码 JSON 对象的值会覆盖 Filebeat 通常在发生冲突时添加的字段(类型、源、偏移量等)。如果要保留先前添加的值,请禁用它。
-
expand_keys
- 如果启用此设置,Filebeat 将递归地取消已解码 JSON 中的点分隔键,并将它们扩展为分层对象结构。例如,
{"a.b.c": 123}
将扩展为{"a":{"b":{"c":123}}}
。当输入由 ECS 记录器 生成时,应启用此设置。 -
add_error_key
- 如果启用此设置,Filebeat 会在 JSON 反序列化错误或在配置中定义了
message_key
但无法使用时添加 "error.message" 和 "error.type: json" 键。 -
message_key
- 一个可选的配置设置,用于指定要应用行筛选和多行设置的 JSON 键。如果指定,则该键必须位于 JSON 对象的顶层,并且与该键关联的值必须是字符串,否则不会进行任何筛选或多行聚合。
-
document_id
- 指定用于设置文档 ID 的 JSON 键的可选配置设置。如果已配置,则该字段将从原始 JSON 文档中删除并存储在
@metadata._id
中。 -
ignore_decoding_error
- 一个可选的配置设置,用于指定是否应记录 JSON 解码错误。如果设置为 true,则不会记录错误。默认为 false。
multiline
编辑控制 Filebeat 如何处理跨越多行的日志消息的选项。有关配置多行选项的更多信息,请参阅 多行消息。
常用选项
编辑所有输入都支持以下配置选项。
enabled
编辑使用 enabled
选项启用和禁用输入。默认情况下,enabled 设置为 true。
tags
编辑Filebeat 将包含在每个发布事件的 tags
字段中的标签列表。标签使您可以轻松地在 Kibana 中选择特定事件或在 Logstash 中应用条件筛选。这些标签将附加到在常规配置中指定的标签列表中。
示例
filebeat.inputs: - type: kafka . . . tags: ["json"]
fields
编辑您可以指定的可选字段,以向输出添加其他信息。例如,您可以添加可用于筛选日志数据的字段。字段可以是标量值、数组、字典或这些值的任何嵌套组合。默认情况下,您在此处指定的字段将在输出文档中的 fields
子字典下分组。要将自定义字段存储为顶级字段,请将 fields_under_root
选项设置为 true。如果在常规配置中声明了重复字段,则其值将被此处声明的值覆盖。
filebeat.inputs: - type: kafka . . . fields: app_id: query_engine_12
fields_under_root
编辑如果将此选项设置为 true,则自定义 fields 将作为输出文档中的顶级字段存储,而不是在 fields
子字典下分组。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段会覆盖其他字段。
processors
编辑要应用于输入数据的处理器列表。
有关在配置中指定处理器的信息,请参阅 处理器。
pipeline
编辑为此输入生成的事件设置的摄取管道 ID。
还可以在 Elasticsearch 输出中配置管道 ID,但这通常会导致更简单的配置文件。如果在输入和输出中都配置了管道,则使用输入中的选项。
keep_null
编辑如果将此选项设置为 true,则输出文档中将发布值为 null
的字段。默认情况下,keep_null
设置为 false
。
index
编辑如果存在,此格式化字符串将覆盖此输入事件的索引(对于 elasticsearch 输出),或设置事件元数据的 raw_index
字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index
或处理器。
示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"
可能扩展为 "filebeat-myindex-2019.11.01"
。
publisher_pipeline.disable_host
编辑默认情况下,所有事件都包含 host.name
。可以将此选项设置为 true
以禁用将此字段添加到所有事件。默认值为 false
。