配置 Kafka 输出编辑

Kafka 输出将事件发送到 Apache Kafka。

要使用此输出,请编辑 APM Server 配置文件以通过注释掉来禁用 Elasticsearch 输出,并通过取消注释 Kafka 部分来启用 Kafka 输出。

示例配置

output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]

  # message topic selection + partitioning
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

大于 max_message_bytes 的事件将被丢弃。为避免此问题,请确保 APM Server 不会生成大于 max_message_bytes 的事件。

Kibana 配置编辑

APM Server 使用 APM 集成来设置和管理 APM 模板、策略和管道。为了确认集成已安装,APM Server 在启动时会轮询 Elasticsearch 或 Kibana。当使用非 Elasticsearch 输出时,APM Server 需要通过 Kibana 端点 访问 Kibana。

示例配置

apm-server:
  kibana:
    enabled: true
    host: "https://..."
    username: "elastic"
    password: "xxx"
兼容性编辑

此输出适用于 0.11 到 2.2.2 之间的 Kafka 版本。较旧的版本也可能有效,但不受支持。

配置选项编辑

您可以在 apm-server.yml 配置文件中的 kafka 部分指定以下选项

enabled编辑

enabled 配置是一个布尔值设置,用于启用或禁用输出。如果设置为 false,则禁用输出。

默认值为 false

hosts编辑

Kafka 代理地址列表,从中获取集群元数据。集群元数据包含发布事件的实际 Kafka 代理。

version编辑

假设 apm-server 运行的 Kafka 版本。默认为 1.0.0。

如果启用了版本 0.10.0.0+,则会添加事件时间戳。

有效值为 0.8.2.02.0.0 之间的 Kafka 版本。

有关支持版本的详细信息,请参阅 兼容性

username编辑

连接到 Kafka 的用户名。如果配置了用户名,则也必须配置密码。

password编辑

连接到 Kafka 的密码。

sasl.mechanism编辑

此功能处于测试阶段,可能会发生变化。设计和代码不如正式 GA 功能成熟,按原样提供,不提供任何保证。测试版功能不受正式 GA 功能支持 SLA 的约束。

连接到 Kafka 时要使用的 SASL 机制。它可以是以下之一

  • PLAIN 用于 SASL/PLAIN。
  • SCRAM-SHA-256 用于 SCRAM-SHA-256。
  • SCRAM-SHA-512 用于 SCRAM-SHA-512。

如果未设置 sasl.mechanism,则如果提供了 usernamepassword,则使用 PLAIN。否则,SASL 身份验证将被禁用。

topic编辑

用于生成事件的 Kafka 主题。

您可以使用格式字符串访问任何事件字段来动态设置主题。例如,此配置使用自定义字段 fields.log_topic 来设置每个事件的主题

topic: '%{[fields.log_topic]}'

有关其他动态设置主题的方法,请参阅 topics 设置。

topics编辑

主题选择器规则数组。每个规则指定与规则匹配的事件要使用的 topic。在发布期间,APM Server 根据数组中第一个匹配的规则设置每个事件的 topic。规则可以包含条件语句、基于格式字符串的字段和名称映射。如果缺少 topics 设置或没有规则匹配,则使用 topic 字段。

规则设置

topic
要使用的主题格式字符串。如果此字符串包含字段引用,例如 %{[fields.name]},则必须存在这些字段,否则规则将失败。
mappings
一个字典,它接受 topic 返回的值并将其映射到一个新名称。
default
如果 mappings 未找到匹配项,则要使用的默认字符串值。
when
必须成功的条件,才能执行当前规则。

以下示例根据消息字段是否包含指定字符串来设置主题

output.kafka:
  hosts: ["localhost:9092"]
  topic: "logs-%{[agent.version]}"
  topics:
    - topic: "critical-%{[agent.version]}"
      when.contains:
        message: "CRITICAL"
    - topic: "error-%{[agent.version]}"
      when.contains:
        message: "ERR"

此配置将导致名为 critical-8.14.2error-8.14.2logs-8.14.2 的主题。

key编辑

可选的格式化字符串,指定 Kafka 事件键。如果已配置,则可以使用格式字符串从事件中提取事件键。

有关特定键选择的影响,请参阅 Kafka 文档;默认情况下,键由 Kafka 集群选择。

partition编辑

Kafka 输出代理事件分区策略。必须是 randomround_robinhash 之一。默认情况下,使用 hash 分区器。

random.group_events: 设置要发布到同一分区的事件数量,然后分区器通过随机方式选择一个新分区。默认值为 1,表示在每个事件之后,都会随机选择一个新分区。

round_robin.group_events: 设置要发布到同一分区的事件数量,然后分区器选择下一个分区。默认值为 1,表示在每个事件之后,都会选择下一个分区。

hash.hash: 用于从事件中计算分区哈希值的字段列表。如果未配置任何字段,则将使用事件的 key 值。

hash.random: 如果无法计算哈希值或键值,则随机分配事件。

默认情况下,所有分区器都会尝试将事件发布到所有分区。如果分区的领导者变得无法访问,则输出可能会阻塞。所有分区器都支持设置 reachable_only 来覆盖此行为。如果将 reachable_only 设置为 true,则事件将仅发布到可用的分区。

发布到可用分区子集可能会增加资源使用量,因为事件可能会不均匀地分布。

client_id编辑

用于日志记录、调试和审核目的的可配置客户端 ID。默认值为“beats”。

worker编辑

并发负载均衡 Kafka 输出工作程序的数量。

codec编辑

输出编解码器配置。如果缺少 codec 部分,则事件将使用 JSON 编码。

有关详细信息,请参阅 更改输出编解码器

metadata编辑

Kafka 元数据更新设置。元数据包含有关代理、主题、分区和用于发布的活动领导者的信息。

refresh_frequency
元数据刷新间隔。默认为 10 分钟。
full
获取元数据时要使用的策略,当此选项为 true 时,客户端将维护所有可用主题的完整元数据集,如果此选项设置为 false,则它将仅刷新已配置主题的元数据。默认值为 false。
retry.max
当集群处于领导者选举中间状态时,元数据更新重试的总次数。默认值为 3。
retry.backoff
领导选举期间重试之间的等待时间。默认值为 250ms
max_retries编辑

在发布失败后重试发布事件的次数。在指定次数的重试后,事件通常会被丢弃。

max_retries 设置为小于 0 的值,以便在所有事件发布之前一直重试。

默认值为 3。

backoff.init编辑

在网络错误后尝试重新发布到 Kafka 之前等待的秒数。等待 backoff.init 秒后,APM Server 会尝试重新发布。如果尝试失败,则回退计时器会以指数方式增加,直到达到 backoff.max。在成功发布后,回退计时器将重置。默认值为 1s

backoff.max编辑

在网络错误后尝试重新发布到 Kafka 之前等待的最大秒数。默认值为 60s

bulk_max_size编辑

在单个 Kafka 请求中批量处理的事件的最大数量。默认值为 2048。

bulk_flush_frequency编辑

在发送批量 Kafka 请求之前等待的持续时间。0 表示没有延迟。默认值为 0。

timeout编辑

在超时之前等待 Kafka 代理响应的秒数。默认值为 30(秒)。

broker_timeout编辑

代理等待所需 ACK 数量的最大持续时间。默认值为 10s

channel_buffer_size编辑

每个 Kafka 代理在输出管道中缓冲的消息数量。默认值为 256。

keep_alive编辑

活动网络连接的保持活动周期。如果为 0s,则禁用保持活动。默认值为 0s

compression编辑

设置输出压缩编解码器。必须是 nonesnappylz4gzip 之一。默认值为 gzip

Azure Event Hub for Kafka 的已知问题

当目标为 Azure Event Hub for Kafka 时,将 compression 设置为 none,因为提供的编解码器不受支持。

compression_level编辑

设置 gzip 使用的压缩级别。将此值设置为 0 将禁用压缩。压缩级别必须在 1(最佳速度)到 9(最佳压缩)之间。

提高压缩级别将减少网络使用量,但会增加 CPU 使用量。

默认值为 4。

max_message_bytes编辑

允许的 JSON 编码消息的最大大小。更大的消息将被丢弃。默认值为 1000000(字节)。此值应等于或小于代理的 message.max.bytes

required_acks编辑

代理所需的 ACK 可靠性级别。0=无响应,1=等待本地提交,-1=等待所有副本提交。默认值为 1。

注意:如果设置为 0,Kafka 不会返回任何 ACK。消息可能会在错误时静默丢失。

enable_krb5_fast编辑

此功能处于测试阶段,可能会发生变化。设计和代码不如正式 GA 功能成熟,按原样提供,不提供任何保证。测试版功能不受正式 GA 功能支持 SLA 的约束。

启用 Kerberos FAST 身份验证。这可能会与某些 Active Directory 安装冲突。它与标准 Kerberos 设置分开,因为此标志仅适用于 Kafka 输出。默认值为 false

ssl编辑

用于 SSL 参数的配置选项,例如 Kafka 连接的根 CA。Kafka 主机密钥库应使用 -keyalg RSA 参数创建,以确保它使用 Filebeat 的 Kafka 库 支持的密码。有关更多信息,请参见 SSL/TLS 输出设置