Kafka 输入编辑

使用 kafka 输入从 Kafka 集群中的主题读取数据。

要配置此输入,请在集群中指定一个或多个 hosts 列表以引导连接,一个要跟踪的 topics 列表,以及连接的 group_id

配置示例

filebeat.inputs:
- type: kafka
  hosts:
    - kafka-broker-1:9092
    - kafka-broker-2:9092
  topics: ["my-topic"]
  group_id: "filebeat"

以下示例显示了如何使用 kafka 输入从启用了 Kafka 兼容性的 Microsoft Azure 事件中心提取数据

filebeat.inputs:
- type: kafka
  hosts: ["<your event hub namespace>.servicebus.windows.net:9093"]
  topics: ["<your event hub instance>"]
  group_id: "<your consumer group>"

  username: "$ConnectionString"
  password: "<your connection string>"
  ssl.enabled: true

有关 Kafka 和事件中心配置参数之间映射的更多详细信息,请参阅Azure 文档

兼容性编辑

此输入适用于 0.11 到 2.8.0 之间的所有 Kafka 版本。旧版本也可能有效,但不支持。

配置选项编辑

kafka 输入支持以下配置选项以及稍后描述的通用选项

hosts编辑

此集群的 Kafka 引导主机(代理)列表。

topics编辑

要从中读取数据的主题列表。

group_id编辑

Kafka 消费者组 ID。

client_id编辑

Kafka 客户端 ID(可选)。

version编辑

要使用的 Kafka 协议版本(默认为 "1.0.0")。

initial_offset编辑

开始读取的初始偏移量,可以是“oldest”或“newest”。默认为“oldest”。

connect_backoff编辑

发生致命错误后,尝试重新连接到 kafka 集群之前的等待时间。默认为 30 秒。

consume_backoff编辑

重试失败读取之前的等待时间。默认为 2 秒。

max_wait_time编辑

读取时等待最小输入字节数的时间。默认为 250 毫秒。

wait_close编辑

关闭时,等待正在发送的消息被传递和确认的时间。

isolation_level编辑

这将配置 Kafka 组隔离级别

  • "read_uncommitted" 返回消息通道中的所有消息。
  • "read_committed" 隐藏属于中止事务的消息。

默认为 "read_uncommitted"

fetch编辑

Kafka 获取设置

min
要等待的最小字节数。默认为 1。
default
每个请求读取的默认字节数。默认为 1MB。
max
每个请求读取的最大字节数。默认为 0(无限制)。

expand_event_list_from_field编辑

如果使用此输入的文件集希望在特定字段下接收捆绑在一起的多条消息,则可以为 config 选项 expand_event_list_from_field 的值分配字段名称。例如,在 azure 文件集中,事件位于 json 对象“records”下。

{
"records": [ {event1}, {event2}]
}

此设置将能够将组值(*records*)下的消息拆分为单独的事件。

rebalance编辑

Kafka 重新平衡设置

strategy
可以是 "range""roundrobin"。默认为 "range"
timeout
尝试重新平衡的等待时间。默认为 60 秒。
max_retries
如果重新平衡失败,则重试次数。默认为 4。
retry_backoff
不成功的重新平衡尝试后的等待时间。默认为 2 秒。

sasl.mechanism编辑

连接到 Kafka 时使用的 SASL 机制。可以是以下之一

  • PLAIN 用于 SASL/PLAIN。
  • SCRAM-SHA-256 用于 SCRAM-SHA-256。
  • SCRAM-SHA-512 用于 SCRAM-SHA-512。

如果未设置 sasl.mechanism,则在提供 usernamepassword 时使用 PLAIN。否则,将禁用 SASL 身份验证。

要使用 GSSAPI 机制通过 Kerberos 进行身份验证,必须将此字段留空,并使用 kerberos 选项。

kerberos编辑

此功能处于测试阶段,可能会发生变化。其设计和代码不如正式的 GA 功能成熟,因此按原样提供,不作任何保证。测试功能不受正式 GA 功能的支持 SLA 的约束。

Kerberos 身份验证的配置选项。

有关更多信息,请参阅Kerberos

parsers编辑

此选项需要一个解析器列表,有效负载必须经过这些解析器。

可用的解析器

  • ndjson
  • multiline
ndjson编辑

这些选项使 Filebeat 可以将有效负载解码为 JSON 消息。

配置示例

- ndjson:
  target: ""
  add_error_key: true
  message_key: log
target
应包含已解析键值对的新 JSON 对象的名称。如果将其留空,则新键将位于根目录下。
overwrite_keys
如果发生冲突,则来自解码后的 JSON 对象的值将覆盖 Filebeat 通常添加的字段(类型、来源、偏移量等)。如果要保留以前添加的值,请禁用它。
expand_keys
如果启用此设置,Filebeat 将递归地对解码后的 JSON 中的键进行去点操作,并将它们扩展为分层对象结构。例如,{"a.b.c": 123} 将扩展为 {"a":{"b":{"c":123}}}。当输入由ECS 记录器生成时,应启用此设置。
add_error_key
如果启用此设置,则 Filebeat 会在 JSON 反编组错误或在配置中定义了 message_key 但无法使用时添加“error.message”和“error.type: json”键。
message_key
一个可选的配置设置,用于指定应用行过滤和多行设置的 JSON 键。如果指定,则该键必须位于 JSON 对象的顶层,并且与该键关联的值必须是字符串,否则不会进行任何过滤或多行聚合。
document_id
选项配置设置,用于指定用于设置文档 ID 的 JSON 键。如果配置了,该字段将从原始 JSON 文档中删除并存储在 @metadata._id
ignore_decoding_error
一个可选的配置设置,用于指定是否应记录 JSON 解码错误。如果设置为 true,则不会记录错误。默认为 false。
multiline编辑

控制 Filebeat 如何处理跨越多行的日志消息的选项。有关配置多行选项的更多信息,请参阅多行消息

通用选项编辑

所有输入都支持以下配置选项。

enabled编辑

使用 enabled 选项启用和禁用输入。默认情况下,enabled 设置为 true。

tags编辑

Filebeat 包含在每个已发布事件的 tags 字段中的标签列表。标签可以轻松地在 Kibana 中选择特定事件或在 Logstash 中应用条件过滤。这些标签将附加到常规配置中指定的标签列表中。

示例

filebeat.inputs:
- type: kafka
  . . .
  tags: ["json"]
fields编辑

您可以指定可选字段以向输出添加其他信息。例如,您可以添加可用于过滤日志数据的字段。字段可以是标量值、数组、字典或这些值的任何嵌套组合。默认情况下,您在此处指定的字段将分组在输出文档的 fields 子字典下。要将自定义字段存储为顶级字段,请将 fields_under_root 选项设置为 true。如果在常规配置中声明了重复字段,则其值将被此处声明的值覆盖。

filebeat.inputs:
- type: kafka
  . . .
  fields:
    app_id: query_engine_12
fields_under_root编辑

如果此选项设置为 true,则自定义 字段 将作为顶级字段存储在输出文档中,而不是分组在 fields 子字典下。如果自定义字段名称与 Filebeat 添加的其他字段名称冲突,则自定义字段会覆盖其他字段。

processors编辑

要应用于输入数据的处理器列表。

有关在配置中指定处理器的详细信息,请参阅处理器

pipeline编辑

要为此输入生成的事件设置的摄取管道 ID。

管道 ID 也可以在 Elasticsearch 输出中配置,但此选项通常会生成更简单的配置文件。如果在输入和输出中都配置了管道,则使用输入中的选项。

keep_null编辑

如果此选项设置为 true,则值为 null 的字段将在输出文档中发布。默认情况下,keep_null 设置为 false

index编辑

如果存在,此格式字符串将覆盖此输入事件的索引(对于 elasticsearch 输出),或设置事件元数据的 raw_index 字段(对于其他输出)。此字符串只能引用代理名称和版本以及事件时间戳;要访问动态字段,请使用 output.elasticsearch.index 或处理器。

示例值:"%{[agent.name]}-myindex-%{+yyyy.MM.dd}" 可能会扩展为 "filebeat-myindex-2019.11.01"

publisher_pipeline.disable_host编辑

默认情况下,所有事件都包含 host.name。可以将此选项设置为 true 以禁用将此字段添加到所有事件。默认值为 false