Kafka 输入

编辑

使用 kafka 输入从 Kafka 集群中的主题读取数据。

要配置此输入,请指定集群中一个或多个用于引导连接的 hosts 列表,要跟踪的 topics 列表以及连接的 group_id

示例配置

filebeat.inputs:
- type: kafka
  hosts:
    - kafka-broker-1:9092
    - kafka-broker-2:9092
  topics: ["my-topic"]
  group_id: "filebeat"

以下示例演示如何使用 kafka 输入从启用了 Kafka 兼容性的 Microsoft Azure 事件中心提取数据。

filebeat.inputs:
- type: kafka
  hosts: ["<your event hub namespace>.servicebus.windows.net:9093"]
  topics: ["<your event hub instance>"]
  group_id: "<your consumer group>"

  username: "$ConnectionString"
  password: "<your connection string>"
  ssl.enabled: true

有关 Kafka 和事件中心配置参数之间映射的更多详细信息,请参阅 Azure 文档

兼容性

编辑

此输入适用于 0.11 到 2.8.0 之间的所有 Kafka 版本。较旧的版本也可能有效,但不支持。

配置选项

编辑

kafka 输入支持以下配置选项以及稍后描述的 常用选项

如果您使用的是带有 Kafka 输入的 Elastic Agent 并需要提高吞吐量,我们建议通过额外的 Elastic Agent 水平扩展来读取 Kafka 主题。请注意,每个 Elastic Agent 会并发地从其已分配的每个分区读取数据。

hosts
编辑

此集群的 Kafka 引导主机(代理)列表。

topics
编辑

要从中读取的主题列表。

group_id
编辑

Kafka 消费者组 ID。

client_id
编辑

Kafka 客户端 ID(可选)。

version
编辑

要使用的 Kafka 协议版本(默认为 "1.0.0")。

initial_offset
编辑

开始读取的初始偏移量,可以是 "oldest" 或 "newest"。默认为 "oldest"。

connect_backoff

编辑

致命错误后等待多长时间才能尝试重新连接到 Kafka 集群。默认为 30 秒。

consume_backoff

编辑

在重试失败的读取操作之前等待多长时间。默认为 2 秒。

max_wait_time

编辑

读取时等待最小输入字节数的时间。默认为 250 毫秒。

wait_close

编辑

关闭时,等待多长时间才能传递和确认进行中的消息。

isolation_level

编辑

此配置项配置 Kafka 组隔离级别。

  • "read_uncommitted" 返回消息通道中的所有消息。
  • "read_committed" 隐藏属于已中止事务的消息。

默认为 "read_uncommitted"

fetch

编辑

Kafka 获取设置

min
要等待的最小字节数。默认为 1。
default
每个请求读取的默认字节数。默认为 1MB。
max
每个请求读取的最大字节数。默认为 0(无限制)。

expand_event_list_from_field

编辑

如果使用此输入的文件集预期接收捆绑在特定字段下的多条消息,则可以将配置选项 expand_event_list_from_field 值分配给字段名称。例如,对于 Azure 文件集,事件位于 json 对象 "records" 下。

{
"records": [ {event1}, {event2}]
}

此设置能够将组值 (records) 下的消息拆分成单独的事件。

rebalance

编辑

Kafka 重新平衡设置

strategy
可以是 "range""roundrobin"。默认为 "range"
timeout
等待尝试重新平衡的时间。默认为 60 秒。
max_retries
如果重新平衡失败,要重试的次数。默认为 4。
retry_backoff
不成功重新平衡尝试后等待多长时间。默认为 2 秒。

sasl.mechanism

编辑

连接到 Kafka 时要使用的 SASL 机制。它可以是:

  • PLAIN 用于 SASL/PLAIN。
  • SCRAM-SHA-256 用于 SCRAM-SHA-256。
  • SCRAM-SHA-512 用于 SCRAM-SHA-512。

如果未设置 sasl.mechanism,则如果提供了 usernamepassword,则使用 PLAIN。否则,SASL 身份验证将被禁用。

要使用 GSSAPI 机制通过 Kerberos 进行身份验证,您必须将此字段留空,并使用 kerberos 选项。

kerberos

编辑

此功能处于测试阶段,可能随时更改。其设计和代码不如正式 GA 功能成熟,按原样提供,不提供任何担保。测试版功能不受正式 GA 功能支持服务水平协议的约束。

Kerberos 身份验证的配置选项。

有关更多信息,请参阅 Kerberos

parsers
编辑

此选项需要一个解析器列表,有效负载必须经过这些解析器。

可用的解析器

  • ndjson
  • multiline
ndjson
编辑

这些选项使 Filebeat 能够将有效负载解码为 JSON 消息。

示例配置

- ndjson:
  target: ""
  add_error_key: true
  message_key: log
target
应包含已解析键值对的新 JSON 对象的名称。如果留空,新键将位于根目录下。
overwrite_keys
来自已解码 JSON 对象的值会覆盖 Filebeat 通常在发生冲突时添加的字段(类型、源、偏移量等)。如果要保留先前添加的值,请禁用它。
expand_keys
如果启用此设置,Filebeat 将递归地取消已解码 JSON 中的点分隔键,并将它们扩展为分层对象结构。例如,{"a.b.c": 123} 将扩展为 {"a":{"b":{"c":123}}}。当输入由 ECS 记录器 生成时,应启用此设置。
add_error_key
如果启用此设置,Filebeat 会在 JSON 反序列化错误或在配置中定义了 message_key 但无法使用时添加 "error.message" 和 "error.type: json" 键。
message_key
一个可选的配置设置,用于指定要应用行筛选和多行设置的 JSON 键。如果指定,则该键必须位于 JSON 对象的顶层,并且与该键关联的值必须是字符串,否则不会进行任何筛选或多行聚合。
document_id
指定用于设置文档 ID 的 JSON 键的可选配置设置。如果已配置,则该字段将从原始 JSON 文档中删除并存储在 @metadata._id 中。
ignore_decoding_error
一个可选的配置设置,用于指定是否应记录 JSON 解码错误。如果设置为 true,则不会记录错误。默认为 false。
multiline
编辑

控制 Filebeat 如何处理跨越多行的日志消息的选项。有关配置多行选项的更多信息,请参阅 多行消息

常用选项

编辑

所有输入都支持以下配置选项。

enabled
编辑

使用 enabled 选项启用和禁用输入。默认情况下,enabled 设置为 true。

tags
编辑

Filebeat 将包含在每个发布事件的 tags 字段中的标签列表。标签使您可以轻松地在 Kibana 中选择特定事件或在 Logstash 中应用条件筛选。这些标签将附加到在常规配置中指定的标签列表中。

示例

filebeat.inputs:
- type: kafka
  . . .
  tags: ["json"]
fields
编辑

您可以指定的可选字段,以向输出添加其他信息。例如,您可以添加可用于筛选日志数据的字段。字段可以是标量值、数组、字典或这些值的任何嵌套组合。默认情况下,您在此处指定的字段将在输出文档中的 fields 子字典下分组。要将自定义字段存储为顶级字段,请将 fields_under_root 选项设置为 true。如果在常规配置中声明了重复字段,则其值将被此处声明的值覆盖。

filebeat.inputs:
- type: kafka
  . . .
  fields:
    app_id: query_engine_12
fields_under_root
编辑

如果将此选项设置为 true,则自定义 fields 将作为输出文档中的顶级字段存储,而不是在 fields 子字典下分组。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段会覆盖其他字段。

processors
编辑

要应用于输入数据的处理器列表。

有关在配置中指定处理器的信息,请参阅 处理器

pipeline
编辑

为此输入生成的事件设置的摄取管道 ID。

还可以在 Elasticsearch 输出中配置管道 ID,但这通常会导致更简单的配置文件。如果在输入和输出中都配置了管道,则使用输入中的选项。

keep_null
编辑

如果将此选项设置为 true,则输出文档中将发布值为 null 的字段。默认情况下,keep_null 设置为 false

index
编辑

如果存在,此格式化字符串将覆盖此输入事件的索引(对于 elasticsearch 输出),或设置事件元数据的 raw_index 字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index 或处理器。

示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}" 可能扩展为 "filebeat-myindex-2019.11.01"

publisher_pipeline.disable_host
编辑

默认情况下,所有事件都包含 host.name。可以将此选项设置为 true 以禁用将此字段添加到所有事件。默认值为 false