Kafka 输出

编辑

Kafka 输出将事件发送到 Apache Kafka。

兼容性:此输出可以连接到 Kafka 版本 0.8.2.0 及更高版本。旧版本也可能有效,但不受支持。

以下示例在 Elastic Agent elastic-agent.yml 文件中配置一个名为 kafka-output 的 Kafka 输出,其设置如下所述:

outputs:
  kafka-output:
    type: kafka
    hosts:
      - 'kafka1:9092'
      - 'kafka2:9092'
      - 'kafka3:9092'
    client_id: Elastic
    version: 1.0.0
    compression: gzip
    compression_level: 4
    username: <my-kafka-username>
    password: <my-kakfa-password>
    sasl:
      mechanism: SCRAM-SHA-256
    partition:
      round_robin:
        group_events: 1
    topic: 'elastic-agent'
    headers: []
    timeout: 30
    broker_timeout: 30
    required_acks: 1
    ssl:
      verification_mode: full

Kafka 输出和使用 Logstash 将数据索引到 Elasticsearch

编辑

如果您正在考虑使用 Logstash 将数据从 kafka 发送到 Elasticsearch,请注意从 Elastic Agent 发送到 kafka 的文档结构不得被 Logstash 修改。我们建议禁用 kafka 输入和 json 编解码器上的 ecs_compatibility,以确保输入不会编辑字段及其内容。

集成设置的数据流期望接收与直接从 Elastic Agent 发送的事件具有相同结构和字段名称的事件。

有关更多详细信息,请参阅 Elastic Agent 的 Logstash 输出文档。

inputs {
  kafka {
    ...
    ecs_compatibility => "disabled"
    codec => json { ecs_compatibility => "disabled" }
    ...
  }
}
...

Kafka 输出配置设置

编辑

kafka 输出支持以下按类别分组的设置。其中许多设置具有合理的默认值,允许您以最少的配置运行 Elastic Agent。

设置 描述

enabled

(布尔值)启用或禁用输出。如果设置为 false,则禁用输出。

hosts

您的 Elastic Agent 将用于连接到一个或多个 Kafka 代理的地址。

以下是一个示例 hosts 设置,其中定义了三个主机

    hosts:
      - 'localhost:9092'
      - 'mykafkahost01:9092'
      - 'mykafkahost02:9092'

version

Elastic Agent 在连接时将请求的 Kafka 协议版本。默认为 1.0.0。

协议版本控制可供 Elastic Agent 使用的 Kafka 客户端功能;它不会阻止 Elastic Agent 连接到比协议版本更新的 Kafka 版本。

身份验证设置

编辑
设置 描述

username

连接到 Kafka 的用户名。如果配置了用户名,则必须同时配置密码。

password

连接到 Kafka 的密码。

sasl.mechanism

连接到 Kafka 时要使用的 SASL 机制。它可以是以下之一:

  • PLAIN 用于 SASL/PLAIN。
  • SCRAM-SHA-256 用于 SCRAM-SHA-256。
  • SCRAM-SHA-512 用于 SCRAM-SHA-512。如果未设置 sasl.mechanism,则如果提供 usernamepassword,则使用 PLAIN。否则,禁用 SASL 身份验证。

ssl

当通过 kafka 输出将数据发送到安全集群时,Elastic Agent 可以使用 SSL/TLS。有关可用设置的列表,请参阅 SSL/TLS,特别是 表 7,“常用配置选项”表 8,“客户端配置选项”下的设置。

内存队列设置

编辑

内存队列将所有事件保存在内存中。

内存队列等待输出确认或丢弃事件。如果队列已满,则无法将新事件插入内存队列。只有在收到输出的信号后,队列才会释放空间以接受更多事件。

内存队列由参数 flush.min_eventsflush.timeout 控制。flush.min_events 限制了单个批次中可以包含的事件数量,而 flush.timeout 指定队列应等待多长时间才能完全填充事件请求。如果输出支持 bulk_max_size 参数,则最大批次大小将是 bulk_max_sizeflush.min_events 中较小的值。

flush.min_events 是一个旧参数,新配置应优先使用 bulk_max_size 来控制批次大小。从 8.13 开始,使用 flush.min_events 而不是 bulk_max_size 来限制批次大小永远不会有性能优势。

在同步模式下,事件请求始终在事件可用时立即填充,即使没有足够的事件来填充请求的批次。这在必须最小化延迟时很有用。要使用同步模式,请将 flush.timeout 设置为 0。

为了向后兼容,还可以通过将 flush.min_events 设置为 0 或 1 来激活同步模式。在这种情况下,批次大小将被限制为队列容量的 1/2。

在异步模式下,事件请求将等待指定超时时间,以尝试完全填充请求的批次。如果超时到期,队列将返回包含所有可用事件的部分批次。要使用异步模式,请将 flush.timeout 设置为正持续时间,例如 5 秒。

此示例配置在有足够的事件来填充输出的请求(通常由 bulk_max_size 控制,并且最多受 flush.min_events 限制为 512 个事件)时,或者在事件已等待一段时间的情况下,将事件转发到输出

  queue.mem.events: 4096
  queue.mem.flush.min_events: 512
  queue.mem.flush.timeout: 5s
设置 描述

queue.mem.events

队列可以存储的事件数。此值应可被 queue.mem.flush.min_eventsbulk_max_size 中较小的值整除,以避免将部分批次发送到输出。

默认值: 3200 个事件

queue.mem.flush.min_events

flush.min_events 是一个旧参数,新配置应优先使用 bulk_max_size 来控制批次大小。从 8.13 开始,使用 flush.min_events 而不是 bulk_max_size 来限制批次大小永远不会有性能优势

默认值: 1600 个事件

queue.mem.flush.timeout

(整数)满足 queue.mem.flush.min_events 的最长等待时间。如果设置为 0 秒,则事件立即可用于输出。

默认值: 10 秒

主题设置

编辑

使用这些选项为每个 Elastic Agent 事件设置 Kafka 主题。

设置 描述

topic

用于生成事件的默认 Kafka 主题。

分区设置

编辑

创建的分区数由 Kafka 代理根据主题列表自动设置。然后,记录将随机、以循环方式或根据计算的哈希值发布到分区。

在以下示例中,每个事件发布到分区后,分区器将以循环方式选择下一个分区。

    partition:
      round_robin:
        group_events: 1
设置 描述

random.group_events

设置在分区器随机选择新分区之前,要发布到同一分区的事件数。默认值为 1,表示在每个事件之后,随机选择一个新分区。

round_robin.group_events

设置在分区器选择下一个分区之前,要发布到同一分区的事件数。默认值为 1,表示在每个事件之后,将选择下一个分区。

hash.hash

用于计算分区哈希值的字段列表。如果未配置任何字段,则将使用事件键值。

hash.random

如果无法计算哈希值或键值,则随机分发事件。

标头设置

编辑

标头是键值对,多个标头可以包含相同的键。仅支持字符串值。这些标头将包含在每个生成的 Kafka 消息中。

设置 描述

key

要在 Kafka 标头中设置的键。

value

要在 Kafka 标头中设置的值。

client_id

用于日志记录、调试和审计目的的可配置 ClientID。默认值为 Elastic。客户端 ID 是协议的一部分,用于标识消息的来源。

其他配置设置

编辑

您可以在代理配置文件的 kafka-output 部分中指定这些各种其他选项。

设置 描述

backoff.init

(字符串)在网络错误后尝试重新连接到 Kafka 之前等待的秒数。等待 backoff.init 秒后,Elastic Agent 尝试重新连接。如果尝试失败,回退计时器将呈指数增长,直到达到 backoff.max。成功连接后,回退计时器将重置。

默认值: 1秒

backoff.max

(字符串) 网络错误后尝试连接 Kafka 之前等待的最大秒数。

默认值: 60秒

broker_timeout

Kafka broker 等待所需数量的 ACK 确认的最大时长,然后超时(请参阅后面的 required_acks 设置)。

默认值: 30 (秒)

bulk_flush_frequency

(整数) 发送批量 Kafka 请求之前等待的时长。 0 表示不延迟。

默认值: 0

bulk_max_size

(整数) 单个 Kafka 请求中批量处理的最大事件数。

默认值: 2048

channel_buffer_size

(整数) 每个 Kafka broker 在输出管道中缓冲的消息数量。

默认值: 256

codec

输出编解码器配置。您可以指定 jsonformat 编解码器。默认情况下使用 json 编解码器。

json.pretty: 如果 pretty 设置为 true,事件将格式化为易读的格式。默认值为 false。

json.escape_html: 如果 escape_html 设置为 true,HTML 符号将在字符串中被转义。默认值为 false。

以下示例配置使用启用美化打印的 json 编解码器将事件写入控制台

output.console:
  codec.json:
    pretty: true
    escape_html: false

format.string: 用于创建自定义格式化消息的可配置格式字符串。

以下示例配置使用 format 编解码器将事件的时间戳和消息字段打印到控制台

output.console:
  codec.format:
    string: '%{[@timestamp]} %{[message]}'

compression

选择要使用的压缩编解码器。支持的编解码器有 snappylz4gzip

compression_level

对于 gzip 编解码器,您可以选择压缩级别。级别必须在 1 (最佳速度)到 9 (最佳压缩)的范围内。

增加压缩级别会减少网络使用,但会增加 CPU 使用率。

默认值: 4

keep_alive

(字符串) 活动网络连接的保持活动周期。如果为 0秒,则禁用保持活动。

默认值: 0秒

max_message_bytes

(整数) JSON 编码消息的最大允许大小。更大的消息将被丢弃。该值应等于或小于 broker 的 message.max.bytes

默认值: 1000000 (字节)

metadata

Kafka 元数据更新设置。元数据包含有关 brokers、topics、分区和用于发布的活动 leader 的信息。

refresh_frequency
元数据刷新间隔。默认为 10 分钟。
full
获取元数据时使用的策略。当此选项为 true 时,客户端将维护所有可用主题的完整元数据集。当设置为 false 时,它将仅刷新已配置主题的元数据。默认值为 false。
retry.max
元数据更新重试的总次数。默认值为 3。
retry.backoff
重试之间的等待时间。默认值为 250 毫秒。

required_acks

从 broker 收到的 ACK 可靠性级别。0=无响应,1=等待本地提交,-1=等待所有副本提交。默认值为 1。

注意:如果设置为 0,Kafka 不会返回任何 ACK。错误时消息可能会静默丢失。

默认值: 1 (等待本地提交)

timeout

等待 Kafka brokers 响应超时前的秒数。默认值为 30 (秒)。

默认值: 1000000 (字节)