配置 Kafka 输出编辑

Kafka 输出将事件发送到 Apache Kafka。

要使用此输出,请编辑 Packetbeat 配置文件,通过注释掉 Elasticsearch 输出来禁用它,并通过取消注释 Kafka 部分来启用 Kafka 输出。

对于 Kafka 版本 0.10.0.0+,消息创建时间戳由 beats 设置,等于事件的初始时间戳。这会影响 Kafka 中的保留策略:例如,如果 beat 事件是在 2 周前创建的,保留策略设置为 7 天,并且来自 beats 的消息今天到达 Kafka,它将立即被丢弃,因为时间戳值在过去的 7 天之前。可以通过在消息到达时设置时间戳来更改此行为,这样消息就不会被丢弃,而是保留 7 天。为此,请在 Kafka 配置中将 log.message.timestamp.type 设置为 LogAppendTime(默认值为 CreateTime)。

示例配置

output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]

  # message topic selection + partitioning
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

大于 max_message_bytes 的事件将被丢弃。为了避免这个问题,请确保 Packetbeat 不会生成大于 max_message_bytes 的事件。

兼容性编辑

此输出可以连接到 Kafka 版本 0.8.2.0 及更高版本。较旧的版本可能也可以工作,但不支持。

配置选项编辑

您可以在 packetbeat.yml 配置文件中的 kafka 部分指定以下选项

enabled编辑

enabled 配置是一个布尔值设置,用于启用或禁用输出。如果设置为 false,则输出被禁用。

默认值为 true

hosts编辑

要从中获取集群元数据的 Kafka 代理地址列表。集群元数据包含事件发布到的实际 Kafka 代理。

version编辑

Packetbeat 在连接时将请求的 Kafka 协议版本。默认为 1.0.0。

有效值为 0.8.2.02.6.0 之间的所有 kafka 版本。

协议版本控制 Packetbeat 可用的 Kafka 客户端功能;它不会阻止 Packetbeat 连接到比协议版本更新的 Kafka 版本。

有关支持版本的更多信息,请参见 兼容性

username编辑

用于连接到 Kafka 的用户名。如果配置了用户名,则必须配置密码。

password编辑

用于连接到 Kafka 的密码。

sasl.mechanism编辑

连接到 Kafka 时要使用的 SASL 机制。它可以是以下之一

  • PLAIN 用于 SASL/PLAIN。
  • SCRAM-SHA-256 用于 SCRAM-SHA-256。
  • SCRAM-SHA-512 用于 SCRAM-SHA-512。

如果未设置 sasl.mechanism,则如果提供了 usernamepassword,则使用 PLAIN。否则,SASL 身份验证将被禁用。

要使用 GSSAPI 机制使用 Kerberos 进行身份验证,您必须将此字段留空,并使用 kerberos 选项。

topic编辑

用于生成事件的 Kafka 主题。

您可以通过使用格式字符串访问任何事件字段来动态设置主题。例如,此配置使用自定义字段 fields.log_topic 为每个事件设置主题

topic: '%{[fields.log_topic]}'

要了解如何向事件添加自定义字段,请参见 fields 选项。

有关其他动态设置主题的方法,请参见 topics 设置。

topics编辑

主题选择器规则数组。每个规则指定用于与规则匹配的事件的 topic。在发布期间,Packetbeat 基于数组中的第一个匹配规则为每个事件设置 topic。规则可以包含条件、基于格式字符串的字段和名称映射。如果 topics 设置不存在或没有规则匹配,则使用 topic 字段。

规则设置

topic
要使用的主题格式字符串。如果此字符串包含字段引用,例如 %{[fields.name]},则字段必须存在,否则规则将失败。
mappings
一个字典,它接受由 topic 返回的值并将其映射到一个新名称。
default
如果 mappings 未找到匹配项,则要使用的默认字符串值。
when
必须成功的条件才能执行当前规则。处理器支持的所有 条件 也在此处支持。

以下示例基于消息字段是否包含指定的字符串来设置主题

output.kafka:
  hosts: ["localhost:9092"]
  topic: "logs-%{[agent.version]}"
  topics:
    - topic: "critical-%{[agent.version]}"
      when.contains:
        message: "CRITICAL"
    - topic: "error-%{[agent.version]}"
      when.contains:
        message: "ERR"

此配置将导致名为 critical-8.14.3error-8.14.3logs-8.14.3 的主题。

key编辑

可选格式化字符串,指定 Kafka 事件键。如果配置了,则可以使用格式字符串从事件中提取事件键。

有关特定键选择的影响,请参见 Kafka 文档;默认情况下,键由 Kafka 集群选择。

partition编辑

Kafka 输出代理事件分区策略。必须是 randomround_robinhash 之一。默认情况下,使用 hash 分区器。

random.group_events: 设置要发布到同一分区的事件数量,然后分区器通过随机方式选择一个新分区。默认值为 1,这意味着在每个事件之后,都会随机选择一个新分区。

round_robin.group_events: 设置要发布到同一分区的事件数量,然后分区器选择下一个分区。默认值为 1,这意味着在每个事件之后,将选择下一个分区。

hash.hash: 用于从事件中计算分区哈希值的字段列表。如果未配置任何字段,则将使用事件的 key 值。

hash.random: 如果无法计算哈希值或键值,则随机分配事件。

默认情况下,所有分区器都会尝试将事件发布到所有分区。如果分区的领导者对于 beat 变得不可访问,输出可能会阻塞。所有分区器都支持设置 reachable_only 来覆盖此行为。如果将 reachable_only 设置为 true,事件将仅发布到可用的分区。

发布到可用分区的子集可能会增加资源使用量,因为事件可能会变得不均匀地分布。

headers编辑

标头是一个键值对,并且可以使用相同的 key 包含多个标头。仅支持字符串值。这些标头将包含在每个生成的 Kafka 消息中。

output.kafka:
  hosts: ["localhost:9092"]
  topic: "logs-%{[agent.version]}"
  headers:
    - key: "some-key"
      value: "some value"
    - key: "another-key"
      value: "another value"

client_id编辑

用于日志记录、调试和审计目的的可配置 ClientID。默认值为“beats”。

codec编辑

输出编解码器配置。如果缺少 codec 部分,则事件将被 json 编码。

有关更多信息,请参见 更改输出编解码器

metadata编辑

Kafka 元数据更新设置。元数据包含有关代理、主题、分区和用于发布的活动领导者的信息。

refresh_frequency
元数据刷新间隔。默认为 10 分钟。
full
获取元数据时要使用的策略,当此选项为 true 时,客户端将维护所有可用主题的完整元数据集,如果将此选项设置为 false,它将仅刷新配置主题的元数据。默认值为 false。
retry.max
当集群处于领导者选举的中间状态时,元数据更新重试的总次数。默认值为 3。
retry.backoff
领导者选举期间重试之间的等待时间。默认值为 250 毫秒。

max_retries编辑

在发布失败后重试发布事件的次数。在指定的重试次数后,事件通常会被丢弃。

max_retries 设置为小于 0 的值,以重试,直到所有事件都被发布。

默认值为 3。

backoff.init编辑

在网络错误后等待尝试重新发布到 Kafka 的秒数。在等待 backoff.init 秒后,Packetbeat 将尝试重新发布。如果尝试失败,则退避计时器将以指数方式增加到 backoff.max。在成功发布后,退避计时器将重置。默认值为 1 秒。

backoff.maxedit

网络错误后尝试重新发布到 Kafka 的最大等待时间(秒)。默认值为 60 秒。

bulk_max_sizeedit

单个 Kafka 请求中批量发送的最大事件数。默认值为 2048。

bulk_flush_frequencyedit

发送批量 Kafka 请求之前的等待时间。0 表示没有延迟。默认值为 0。

timeoutedit

等待 Kafka 代理响应的最长时间(秒)。默认值为 30 秒。

broker_timeoutedit

代理等待所需 ACK 数量的最长时间。默认值为 10 秒。

channel_buffer_sizeedit

每个 Kafka 代理在输出管道中缓冲的消息数量。默认值为 256。

keep_aliveedit

活动网络连接的保持活动时间。如果为 0 秒,则禁用保持活动。默认值为 0 秒。

compressionedit

设置输出压缩编解码器。必须是 nonesnappylz4gzip 之一。默认值为 gzip

Azure Event Hub for Kafka 的已知问题

如果目标是 Azure Event Hub for Kafka,请将 compression 设置为 none,因为提供的编解码器不受支持。

compression_leveledit

设置 gzip 使用的压缩级别。将此值设置为 0 会禁用压缩。压缩级别必须在 1(最快速度)到 9(最佳压缩)之间。

提高压缩级别会减少网络使用量,但会增加 CPU 使用量。

默认值为 4。

max_message_bytesedit

允许的最大 JSON 编码消息大小。更大的消息将被丢弃。默认值为 1000000 字节。此值应等于或小于代理的 message.max.bytes

required_acksedit

代理所需的 ACK 可靠性级别。0=无响应,1=等待本地提交,-1=等待所有副本提交。默认值为 1。

注意:如果设置为 0,Kafka 不会返回任何 ACK。消息可能会在错误时静默丢失。

ssledit

用于 SSL 参数(如 Kafka 连接的根 CA)的配置选项。Kafka 主机密钥库应该使用 -keyalg RSA 参数创建,以确保它使用 Filebeat 的 Kafka 库 支持的密码。有关详细信息,请参见 SSL

kerberosedit

此功能处于测试阶段,可能会发生变化。设计和代码的成熟度低于正式 GA 功能,按原样提供,不提供任何担保。测试版功能不适用正式 GA 功能的支持服务级别协议。

用于 Kerberos 身份验证的配置选项。

有关详细信息,请参见 Kerberos