Kafka 输入

编辑

使用 kafka 输入从 Kafka 集群中的主题读取数据。

要配置此输入,请指定集群中一个或多个 hosts 列表以引导连接、要跟踪的 topics 列表,以及连接的 group_id

配置示例

filebeat.inputs:
- type: kafka
  hosts:
    - kafka-broker-1:9092
    - kafka-broker-2:9092
  topics: ["my-topic"]
  group_id: "filebeat"

以下示例展示了如何使用 kafka 输入来摄取来自已启用 Kafka 兼容性的 Microsoft Azure 事件中心的数据

filebeat.inputs:
- type: kafka
  hosts: ["<your event hub namespace>.servicebus.windows.net:9093"]
  topics: ["<your event hub instance>"]
  group_id: "<your consumer group>"

  username: "$ConnectionString"
  password: "<your connection string>"
  ssl.enabled: true

有关 Kafka 和事件中心配置参数之间映射的更多详细信息,请参阅 Azure 文档

兼容性

编辑

此输入适用于 0.11 到 2.8.0 之间的所有 Kafka 版本。较旧的版本可能也适用,但不受支持。

配置选项

编辑

kafka 输入支持以下配置选项以及稍后描述的 通用选项

如果您正在将 Elastic Agent 与 Kafka 输入一起使用,并且需要提高吞吐量,我们建议通过额外的 Elastic Agent 水平扩展以从 Kafka 主题读取数据。请注意,每个 Elastic Agent 从其已分配的每个分区并发读取数据。

hosts
编辑

此集群的 Kafka 引导主机(代理)列表。

topics
编辑

要从中读取的主题列表。

group_id
编辑

Kafka 消费者组 ID。

client_id
编辑

Kafka 客户端 ID(可选)。

version
编辑

要使用的 Kafka 协议的版本(默认为 "1.0.0")。

initial_offset
编辑

开始读取的初始偏移量,可以是 “oldest” 或 “newest”。默认为 “oldest”。

connect_backoff

编辑

在发生致命错误后尝试重新连接到 Kafka 集群之前等待的时间。默认为 30 秒。

consume_backoff

编辑

在重试失败的读取之前等待的时间。默认为 2 秒。

max_wait_time

编辑

在读取时等待最少输入字节数的时间。默认为 250 毫秒。

wait_close

编辑

关闭时,等待传递和确认正在传输中的消息的时间。

isolation_level

编辑

这将配置 Kafka 组隔离级别

  • "read_uncommitted" 返回消息通道中的所有消息。
  • "read_committed" 隐藏属于已中止事务的消息。

默认值为 "read_uncommitted"

fetch

编辑

Kafka 获取设置

min
要等待的最小字节数。默认为 1。
default
每个请求读取的默认字节数。默认为 1MB。
max
每个请求读取的最大字节数。默认为 0(无限制)。

expand_event_list_from_field

编辑

如果使用此输入的文件集希望在特定字段下接收多个捆绑的消息,则可以将配置选项 expand_event_list_from_field 值分配给该字段的名称。例如,在 Azure 文件集的情况下,事件在 json 对象 “records” 下找到。

{
"records": [ {event1}, {event2}]
}

此设置可以将组值(records)下的消息拆分为单独的事件。

rebalance

编辑

Kafka 重新平衡设置

strategy
可以是 "range""roundrobin"。默认为 "range"
timeout
尝试重新平衡的等待时间。默认为 60 秒。
max_retries
如果重新平衡失败,则重试的次数。默认为 4。
retry_backoff
在不成功的重新平衡尝试后等待的时间。默认为 2 秒。

sasl.mechanism

编辑

连接到 Kafka 时要使用的 SASL 机制。它可以是以下之一

  • PLAIN 用于 SASL/PLAIN。
  • SCRAM-SHA-256 用于 SCRAM-SHA-256。
  • SCRAM-SHA-512 用于 SCRAM-SHA-512。

如果未设置 sasl.mechanism,则如果提供了 usernamepassword,则使用 PLAIN。否则,禁用 SASL 身份验证。

要使用 GSSAPI 机制通过 Kerberos 进行身份验证,您必须将此字段留空,并使用 kerberos 选项。

kerberos

编辑

此功能处于测试阶段,可能会发生更改。设计和代码不如官方 GA 功能成熟,并且按原样提供,不提供任何保证。测试功能不受官方 GA 功能的支持 SLA 的约束。

Kerberos 身份验证的配置选项。

有关详细信息,请参阅Kerberos

parsers
编辑

此选项需要一个有效负载必须经过的解析器列表。

可用解析器

  • ndjson
  • multiline
ndjson
编辑

这些选项使 Filebeat 可以将有效负载解码为 JSON 消息。

配置示例

- ndjson:
  target: ""
  add_error_key: true
  message_key: log
target
应包含解析后的键值对的新 JSON 对象的名称。如果将其留空,则新键将放在根目录下。
overwrite_keys
如果发生冲突,解码后的 JSON 对象中的值会覆盖 Filebeat 通常添加的字段(类型、源、偏移量等)。如果要保留先前添加的值,请禁用它。
expand_keys
如果启用此设置,Filebeat 将递归地取消解码后的 JSON 中的点号键,并将它们扩展为分层对象结构。例如,{"a.b.c": 123} 将被扩展为 {"a":{"b":{"c":123}}}。当输入由 ECS 记录器 生成时,应启用此设置。
add_error_key
如果启用此设置,则 Filebeat 会在发生 JSON 解组错误或在配置中定义了 message_key 但无法使用时添加 “error.message” 和 “error.type: json” 键。
message_key
一个可选的配置设置,用于指定要应用行过滤和多行设置的 JSON 键。如果指定,则该键必须位于 JSON 对象的顶层,并且与该键关联的值必须是字符串,否则不会发生过滤或多行聚合。
document_id
可选的配置设置,用于指定要设置文档 ID 的 JSON 键。如果配置,该字段将从原始 JSON 文档中删除并存储在 @metadata._id
ignore_decoding_error
一个可选的配置设置,用于指定是否应记录 JSON 解码错误。如果设置为 true,则不会记录错误。默认值为 false。
multiline
编辑

控制 Filebeat 如何处理跨多行的日志消息的选项。有关配置多行选项的更多信息,请参阅多行消息

通用选项

编辑

所有输入都支持以下配置选项。

enabled
编辑

使用 enabled 选项启用和禁用输入。默认情况下,enabled 设置为 true。

tags
编辑

Filebeat 在每个已发布事件的 tags 字段中包含的标签列表。标签使您可以轻松地在 Kibana 中选择特定事件或在 Logstash 中应用条件过滤。这些标签将附加到常规配置中指定的标签列表。

示例

filebeat.inputs:
- type: kafka
  . . .
  tags: ["json"]
fields
编辑

您可以指定的可选字段,以向输出添加其他信息。例如,您可能添加可用于过滤日志数据的字段。字段可以是标量值、数组、字典或这些值的任何嵌套组合。默认情况下,您在此处指定的字段将分组在输出文档中的 fields 子字典下。要将自定义字段存储为顶级字段,请将 fields_under_root 选项设置为 true。如果在常规配置中声明了重复字段,则其值将被此处声明的值覆盖。

filebeat.inputs:
- type: kafka
  . . .
  fields:
    app_id: query_engine_12
fields_under_root
编辑

如果将此选项设置为 true,则自定义 字段 将作为输出文档中的顶级字段存储,而不是分组在 fields 子字典下。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段将覆盖其他字段。

processors
编辑

要应用于输入数据的处理器列表。

有关在配置中指定处理器的信息,请参阅处理器

pipeline
编辑

为此输入生成的事件设置的摄取管道 ID。

管道 ID 也可以在 Elasticsearch 输出中配置,但此选项通常会产生更简单的配置文件。如果在输入和输出中都配置了管道,则使用来自输入的选项。

keep_null
编辑

如果将此选项设置为 true,则具有 null 值的字段将发布在输出文档中。默认情况下,keep_null 设置为 false

index
编辑

如果存在,此格式化字符串将覆盖来自此输入的事件的索引(对于 Elasticsearch 输出),或者设置事件元数据的 raw_index 字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index 或处理器。

示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}" 可能会扩展为 "filebeat-myindex-2019.11.01"

publisher_pipeline.disable_host
编辑

默认情况下,所有事件都包含 host.name。可以将此选项设置为 true 来禁用将此字段添加到所有事件。默认值为 false