配置 Kafka 输出
编辑配置 Kafka 输出编辑
Kafka 输出将事件发送到 Apache Kafka。
要使用此输出,请编辑 Auditbeat 配置文件,通过注释掉 Elasticsearch 输出以禁用它,并通过取消注释 Kafka 部分以启用 Kafka 输出。
对于 Kafka 0.10.0.0+ 版本,消息创建时间戳由 Beats 设置,并等于事件的初始时间戳。这会影响 Kafka 中的保留策略:例如,如果一个 Beat 事件创建于 2 周前,保留策略设置为 7 天,而来自 Beats 的消息今天才到达 Kafka,则该消息将立即被丢弃,因为时间戳值早于最近 7 天。可以通过在消息到达时设置时间戳来更改此行为,这样消息就不会被丢弃,而是再保留 7 天。为此,请在 Kafka 配置中将 log.message.timestamp.type
设置为 LogAppendTime
(默认为 CreateTime
)。
配置示例
output.kafka: # initial brokers for reading cluster metadata hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"] # message topic selection + partitioning topic: '%{[fields.log_topic]}' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
大于 max_message_bytes
的事件将被丢弃。为避免此问题,请确保 Auditbeat 生成的事件不超过 max_message_bytes
。
兼容性编辑
此输出可以连接到 Kafka 0.8.2.0 及更高版本。旧版本可能也可以工作,但不支持。
配置选项编辑
您可以在 auditbeat.yml
配置文件的 kafka
部分中指定以下选项
hosts
编辑
Kafka 代理地址列表,用于从中获取集群元数据。集群元数据包含发布事件的实际 Kafka 代理。
version
编辑
Auditbeat 在连接时将请求的 Kafka 协议版本。默认为 1.0.0。
有效值为 0.8.2.0
和 2.6.0
之间的所有 Kafka 版本。
协议版本控制 Auditbeat 可用的 Kafka 客户端功能;它不会阻止 Auditbeat 连接到比协议版本更新的 Kafka 版本。
有关支持版本的更多信息,请参阅兼容性。
username
编辑
连接到 Kafka 的用户名。如果配置了用户名,则还必须配置密码。
password
编辑
连接到 Kafka 的密码。
sasl.mechanism
编辑
连接到 Kafka 时使用的 SASL 机制。可以是以下之一
-
PLAIN
用于 SASL/PLAIN。 -
SCRAM-SHA-256
用于 SCRAM-SHA-256。 -
SCRAM-SHA-512
用于 SCRAM-SHA-512。
如果未设置 sasl.mechanism
,则在提供 username
和 password
时将使用 PLAIN
。否则,将禁用 SASL 身份验证。
要使用 GSSAPI
机制通过 Kerberos 进行身份验证,您必须将此字段留空,并使用 kerberos
选项。
topic
编辑
用于生成事件的 Kafka 主题。
您可以使用格式字符串动态设置主题,以访问任何事件字段。例如,此配置使用自定义字段 fields.log_topic
为每个事件设置主题
topic: '%{[fields.log_topic]}'
要了解如何向事件添加自定义字段,请参阅 fields
选项。
有关动态设置主题的其他方法,请参阅 topics
设置。
topics
编辑
主题选择器规则数组。每个规则都指定用于匹配规则的事件的 topic
。在发布期间,Auditbeat 会根据数组中第一个匹配的规则为每个事件设置 topic
。规则可以包含条件、基于格式字符串的字段和名称映射。如果缺少 topics
设置或没有规则匹配,则使用 topic
字段。
规则设置
-
topic
- 要使用的主题格式字符串。如果此字符串包含字段引用(例如
%{[fields.name]}
),则这些字段必须存在,否则规则将失败。 -
mappings
- 一个字典,它接受
topic
返回的值并将其映射到新名称。 -
default
- 如果
mappings
未找到匹配项,则使用的默认字符串值。 -
when
- 为了执行当前规则必须满足的条件。处理器支持的所有 条件 在此处也受支持。
以下示例根据消息字段是否包含指定的字符串来设置主题
output.kafka: hosts: ["localhost:9092"] topic: "logs-%{[agent.version]}" topics: - topic: "critical-%{[agent.version]}" when.contains: message: "CRITICAL" - topic: "error-%{[agent.version]}" when.contains: message: "ERR"
此配置将生成名为 critical-8.14.3
、error-8.14.3
和 logs-8.14.3
的主题。
partition
编辑
Kafka 输出代理事件分区策略。必须是 random
、round_robin
或 hash
之一。默认情况下,使用 hash
分区器。
random.group_events
:设置要发布到同一分区的事件数,然后分区器将随机选择一个新分区。默认值为 1,表示在每个事件后随机选择一个新分区。
round_robin.group_events
:设置要发布到同一分区的事件数,然后分区器将选择下一个分区。默认值为 1,表示在每个事件后将选择下一个分区。
hash.hash
:用于计算分区哈希值的字段列表。如果未配置字段,则将使用事件的 key
值。
hash.random
:如果无法计算哈希值或键值,则随机分配事件。
默认情况下,所有分区器都将尝试将事件发布到所有分区。如果分区的领导者对 Beat 变得不可达,则输出可能会阻塞。所有分区器都支持设置 reachable_only
来覆盖此行为。如果将 reachable_only
设置为 true
,则事件将仅发布到可用的分区。
发布到可用分区子集可能会增加资源使用量,因为事件可能会变得分布不均。
headers
编辑
标头是一个键值对,并且可以使用相同的 key
包含多个标头。仅支持字符串值。这些标头将包含在每个生成的 Kafka 消息中。
output.kafka: hosts: ["localhost:9092"] topic: "logs-%{[agent.version]}" headers: - key: "some-key" value: "some value" - key: "another-key" value: "another value"
client_id
编辑
用于日志记录、调试和审计目的的可配置 ClientID。默认值为“beats”。
metadata
编辑
Kafka 元数据更新设置。元数据包含有关代理、主题、分区和用于发布的活动领导者的信息。
-
refresh_frequency
- 元数据刷新间隔。默认为 10 分钟。
-
full
- 获取元数据时使用的策略,当此选项为
true
时,客户端将维护所有可用主题的完整元数据集,如果此选项设置为false
,它将仅刷新已配置主题的元数据。默认值为 false。 -
retry.max
- 集群处于领导者选举期间时,元数据更新重试的总次数。默认值为 3。
-
retry.backoff
- 领导者选举期间重试之间的等待时间。默认值为 250 毫秒。
backoff.init
编辑
网络错误后尝试重新发布到 Kafka 之前的等待秒数。等待 backoff.init
秒后,Auditbeat 尝试重新发布。如果尝试失败,则退避计时器会呈指数级增长,直至达到 backoff.max
。成功发布后,退避计时器将重置。默认值为 1 秒。
backoff.max
编辑
网络错误后尝试重新发布到 Kafka 之前的最长等待秒数。默认值为 60 秒。
bulk_max_size
编辑
单个 Kafka 请求中批量处理的最大事件数。默认值为 2048。
bulk_flush_frequency
编辑
发送批量 Kafka 请求之前的等待时间。0 表示无延迟。默认值为 0。
timeout
编辑
等待 Kafka 代理响应的秒数,超时后将超时。默认值为 30(秒)。
broker_timeout
编辑
代理等待所需 ACK 数量的最长时间。默认值为 10 秒。
channel_buffer_size
编辑
每个 Kafka 代理在输出管道中缓冲的消息数。默认值为 256。
keep_alive
编辑
活动网络连接的保持活动时间。如果为 0 秒,则禁用保持活动状态。默认值为 0 秒。
compression
编辑
设置输出压缩编解码器。必须是 none
、snappy
、lz4
和 gzip
之一。默认值为 gzip
。
Azure Event Hub for Kafka 的已知问题
当目标为 Azure Event Hub for Kafka 时,请将 compression
设置为 none
,因为不支持提供的编解码器。
compression_level
编辑
设置 gzip 使用的压缩级别。将此值设置为 0 将禁用压缩。压缩级别必须在 1(速度最快)到 9(压缩率最高)的范围内。
提高压缩级别会减少网络使用量,但会增加 CPU 使用率。
默认值为 4。
max_message_bytes
编辑
JSON 编码消息允许的最大大小。更大的消息将被丢弃。默认值为 1000000(字节)。此值应等于或小于代理的 message.max.bytes
。
required_acks
编辑
代理所需的 ACK 可靠性级别。0=无响应,1=等待本地提交,-1=等待所有副本提交。默认值为 1。
注意:如果设置为 0,Kafka 不会返回任何 ACK。消息可能会在出错时静默丢失。
ssl
编辑
SSL 参数的配置选项,例如用于 Kafka 连接的根 CA。Kafka 主机密钥库应使用 -keyalg RSA
参数创建,以确保其使用 Filebeat 的 Kafka 库 支持的密码。有关更多信息,请参阅SSL。