配置 Kafka 输出
编辑配置 Kafka 输出
编辑Kafka 输出将事件发送到 Apache Kafka。
要使用此输出,请编辑 Auditbeat 配置文件,通过注释掉 Elasticsearch 输出禁用它,并通过取消注释 Kafka 部分启用 Kafka 输出。
对于 Kafka 版本 0.10.0.0+,消息创建时间戳由 Beats 设置,并等于事件的初始时间戳。 这会影响 Kafka 中的保留策略:例如,如果一个 Beats 事件在 2 周前创建,保留策略设置为 7 天,并且来自 Beats 的消息今天到达 Kafka,则该消息会立即被丢弃,因为时间戳值早于过去 7 天。 可以通过在消息到达时设置时间戳来更改此行为,因此不会丢弃消息,而是保留 7 天。 为此,请在 Kafka 配置中将 log.message.timestamp.type
设置为 LogAppendTime
(默认为 CreateTime
)。
配置示例
output.kafka: # initial brokers for reading cluster metadata hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"] # message topic selection + partitioning topic: '%{[fields.log_topic]}' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
大于 max_message_bytes
的事件将被丢弃。为了避免这个问题,请确保 Auditbeat 不会生成大于 max_message_bytes
的事件。
兼容性
编辑此输出可以连接到 Kafka 版本 0.8.2.0 及更高版本。 较旧的版本也可能有效,但不被支持。
配置选项
编辑您可以在 auditbeat.yml
配置文件的 kafka
部分中指定以下选项
hosts
编辑从中获取集群元数据的 Kafka 代理地址列表。 集群元数据包含事件发布到的实际 Kafka 代理。
version
编辑Auditbeat 连接时将请求的 Kafka 协议版本。 默认为 1.0.0。
有效值是 0.8.2.0
和 2.6.0
之间的所有 Kafka 版本。
协议版本控制 Auditbeat 可用的 Kafka 客户端功能;它不会阻止 Auditbeat 连接到比协议版本新的 Kafka 版本。
有关支持的版本的信息,请参阅 兼容性。
username
编辑用于连接 Kafka 的用户名。 如果配置了用户名,则还必须配置密码。
password
编辑用于连接 Kafka 的密码。
sasl.mechanism
编辑连接到 Kafka 时使用的 SASL 机制。 它可以是以下之一
-
PLAIN
用于 SASL/PLAIN。 -
SCRAM-SHA-256
用于 SCRAM-SHA-256。 -
SCRAM-SHA-512
用于 SCRAM-SHA-512。
如果未设置 sasl.mechanism
,如果提供了 username
和 password
,则使用 PLAIN
。 否则,禁用 SASL 身份验证。
要使用 GSSAPI
机制通过 Kerberos 进行身份验证,您必须将此字段留空,并使用 kerberos
选项。
topic
编辑用于生成事件的 Kafka 主题。
您可以使用格式字符串来访问任何事件字段,从而动态设置主题。 例如,此配置使用自定义字段 fields.log_topic
来设置每个事件的主题
topic: '%{[fields.log_topic]}'
要了解如何向事件添加自定义字段,请参阅 fields
选项。
有关动态设置主题的其他方法,请参阅 topics
设置。
topics
编辑主题选择器规则数组。 每个规则都指定用于匹配规则的事件的 topic
。 在发布期间,Auditbeat 根据数组中第一个匹配的规则为每个事件设置 topic
。 规则可以包含条件、基于格式字符串的字段和名称映射。 如果缺少 topics
设置或没有规则匹配,则使用 topic
字段。
规则设置
-
topic
- 要使用的主题格式字符串。 如果此字符串包含字段引用,例如
%{[fields.name]}
,则这些字段必须存在,否则规则将失败。 -
mappings
- 一个字典,它获取
topic
返回的值并将其映射到新名称。 -
default
- 如果
mappings
未找到匹配项,则要使用的默认字符串值。 -
when
- 为了执行当前规则必须成功的条件。 处理器支持的所有 条件 也在此处支持。
以下示例根据消息字段是否包含指定字符串来设置主题
output.kafka: hosts: ["localhost:9092"] topic: "logs-%{[agent.version]}" topics: - topic: "critical-%{[agent.version]}" when.contains: message: "CRITICAL" - topic: "error-%{[agent.version]}" when.contains: message: "ERR"
此配置将生成名为 critical-8.17.0
、error-8.17.0
和 logs-8.17.0
的主题。
key
编辑可选的格式化字符串,用于指定 Kafka 事件键。 如果配置了,则可以使用格式字符串从事件中提取事件键。
有关特定键选择的含义,请参阅 Kafka 文档;默认情况下,键由 Kafka 集群选择。
partition
编辑Kafka 输出代理事件分区策略。 必须是 random
、round_robin
或 hash
之一。 默认情况下,使用 hash
分区器。
random.group_events
:设置在分区器随机选择新分区之前,要发布到同一分区的事件数。 默认值为 1,表示在每个事件之后随机选择一个新分区。
round_robin.group_events
:设置在分区器选择下一个分区之前,要发布到同一分区的事件数。 默认值为 1,表示在每个事件之后将选择下一个分区。
hash.hash
:用于计算分区哈希值的字段列表。 如果未配置任何字段,则将使用事件 key
值。
hash.random
:如果无法计算哈希值或键值,则随机分发事件。
默认情况下,所有分区器都会尝试将事件发布到所有分区。 如果分区的领导者对于 Beats 不可达,则输出可能会阻塞。 所有分区器都支持设置 reachable_only
以覆盖此行为。 如果 reachable_only
设置为 true
,则事件仅发布到可用分区。
发布到可用分区的子集可能会增加资源使用量,因为事件可能会分布不均。
headers
编辑标头是键值对,并且可以在同一个 key
中包含多个标头。 仅支持字符串值。 这些标头将包含在每个生成的 Kafka 消息中。
output.kafka: hosts: ["localhost:9092"] topic: "logs-%{[agent.version]}" headers: - key: "some-key" value: "some value" - key: "another-key" value: "another value"
client_id
编辑用于日志记录、调试和审核的可配置 ClientID。 默认为“beats”。
metadata
编辑Kafka 元数据更新设置。 元数据包含有关用于发布的代理、主题、分区和活动领导者的信息。
-
refresh_frequency
- 元数据刷新间隔。 默认为 10 分钟。
-
full
- 在获取元数据时使用的策略,当此选项为
true
时,客户端将维护所有可用主题的完整元数据集,如果此选项设置为false
,则仅刷新已配置主题的元数据。 默认值为 false。 -
retry.max
- 当集群处于领导者选举中间时,元数据更新重试的总次数。 默认为 3。
-
retry.backoff
- 领导者选举期间重试之间的等待时间。 默认为 250 毫秒。
backoff.init
编辑在发生网络错误后,尝试重新发布到 Kafka 之前等待的秒数。 等待 backoff.init
秒后,Auditbeat 会尝试重新发布。 如果尝试失败,则回退计时器将以指数方式增加,直到达到 backoff.max
。 成功发布后,回退计时器将重置。 默认为 1 秒。
backoff.max
编辑在发生网络错误后,尝试重新发布到 Kafka 之前等待的最大秒数。 默认为 60 秒。
bulk_max_size
编辑在单个 Kafka 请求中批量处理的最大事件数。 默认值为 2048。
bulk_flush_frequency
编辑发送批量 Kafka 请求之前等待的时间。 0 表示没有延迟。 默认为 0。
timeout
编辑在超时之前等待 Kafka 代理响应的秒数。 默认为 30(秒)。
broker_timeout
编辑代理等待所需 ACK 数的最大持续时间。 默认为 10 秒。
channel_buffer_size
编辑每个 Kafka 代理在输出管道中缓冲的消息数。 默认值为 256。
keep_alive
编辑活动网络连接的保持活动周期。 如果为 0 秒,则禁用保持活动。 默认值为 0 秒。
compression
编辑设置输出压缩编解码器。 必须是 none
、snappy
、lz4
、gzip
和 zstd
之一。 默认为 gzip
。
Azure 事件中心 for Kafka 的已知问题
当以 Azure Event Hub for Kafka 为目标时,请将 compression
设置为 none
,因为提供的编解码器不受支持。
compression_level
编辑设置 gzip 使用的压缩级别。将此值设置为 0 将禁用压缩。压缩级别必须在 1(最快速度)到 9(最佳压缩)的范围内。
增加压缩级别会减少网络使用量,但会增加 CPU 使用量。
默认值为 4。
max_message_bytes
编辑JSON 编码消息允许的最大大小。更大的消息将被丢弃。默认值为 1000000(字节)。此值应等于或小于 broker 的 message.max.bytes
。
required_acks
编辑broker 所需的 ACK 可靠性级别。0=无响应,1=等待本地提交,-1=等待所有副本提交。默认值为 1。
注意:如果设置为 0,Kafka 不会返回任何 ACK。错误时消息可能会静默丢失。
ssl
编辑SSL 参数的配置选项,如 Kafka 连接的根 CA。Kafka 主机密钥库应使用 -keyalg RSA
参数创建,以确保它使用 Filebeat 的 Kafka 库支持的密码。有关更多信息,请参阅 SSL。