Kafka 输出设置

编辑

指定这些设置以通过安全连接将数据发送到 Kafka。在 Fleet 输出设置中,确保已选择 Kafka 输出类型。

如果您计划在将 Elastic Agent 输出数据发送到 Kafka 之前使用 Logstash 修改它们,请参阅本页稍后的 指南

常规设置
编辑

Kafka 版本

Elastic Agent 连接时将请求的 Kafka 协议版本。默认为 1.0.0。目前支持 0.8.2.02.6.0 的 Kafka 版本,但选择版本 2.6.0 时,预计最新的 Kafka 版本 (3.x.x) 将兼容。使用 Kafka 4.0 及更高版本时,版本必须设置为至少 2.1.0

主机

您的 Elastic Agent 将用来连接到一个或多个 Kafka 代理的地址。使用格式 host:port(不带任何协议 http://)。单击添加行以指定其他地址。

示例

  • localhost:9092
  • mykafkahost:9092

有关默认端口和其他配置详细信息,请参阅 Fleet Server 文档。

身份验证设置
编辑

选择 Elastic Agent 用于与 Kafka 进行身份验证的机制。

Elastic Agent 和 Kafka 之间不使用身份验证。这是默认选项。在生产环境中,建议选择一种身份验证方法。

纯文本

设置此选项,使 Elastic Agent 和 Kafka 之间的流量以纯文本形式发送,而不进行任何传输层安全保护。

这是未设置身份验证时的默认选项。

加密

设置此选项,使 Elastic Agent 和 Kafka 之间的流量使用传输层安全保护。

选择 加密后,服务器 SSL 证书颁发机构验证模式模式选项将变为可用。

用户名/密码

使用用户名和密码连接到 Kafka。

提供您的用户名和密码,并为您的登录凭据选择一个 SASL(简单身份验证和安全层)机制。

启用 SCRAM 后,Elastic Agent 将使用 SCRAM 机制来验证用户凭据。SCRAM 基于 IETF RFC5802 标准,该标准描述了一种用于验证用户的质询-响应机制。

  • 纯文本 - 不使用 SCRAM 进行身份验证
  • SCRAM-SHA-256 - 使用 SHA-256 哈希函数
  • SCRAM-SHA-512 - 使用 SHA-512 哈希函数

为了防止未经授权的访问,您的 Kafka 密码将存储为机密值。虽然建议使用机密存储,但您可以选择覆盖此设置并将密码作为纯文本存储在代理策略定义中。机密存储需要 Fleet Server 版本 8.12 或更高版本。

请注意,此设置也可以存储为机密值或作为预配置输出的纯文本。有关详细信息,请参阅 Kibana 指南中的 预配置设置

SSL

使用安全套接字层 (SSL) 协议进行身份验证。提供您的 SSL 证书的以下详细信息

客户端 SSL 证书

为客户端生成的证书。复制并粘贴证书的完整内容。这是所有代理将用来连接到 Kafka 的证书。

在每个客户端都有唯一证书的情况下,可以将该证书的本地路径放置在此处。代理在建立与 Kafka 的连接时,将选取该位置中的证书。

客户端 SSL 证书密钥

为客户端生成的私钥。这必须采用 PKCS 8 密钥。复制并粘贴证书密钥的完整内容。这是所有代理将用来连接到 Kafka 的证书密钥。

在每个客户端都有唯一证书密钥的情况下,可以将该证书密钥的本地路径放置在此处。代理在建立与 Kafka 的连接时,将选取该位置中的证书密钥。

为了防止未经授权的访问,证书密钥将存储为机密值。虽然建议使用机密存储,但您可以选择覆盖此设置并将密钥作为纯文本存储在代理策略定义中。机密存储需要 Fleet Server 版本 8.12 或更高版本。

请注意,此设置也可以存储为机密值或作为预配置输出的纯文本。有关详细信息,请参阅 Kibana 指南中的 预配置设置

服务器 SSL 证书颁发机构

用于连接到 Kafka 的 CA 证书。这是用于生成 Kafka 的证书和密钥的 CA。复制并粘贴 CA 证书的完整内容。

此设置是可选的。选择身份验证 纯文本 选项时,此设置不可用。

单击添加行以指定其他证书颁发机构。

验证模式

控制服务器证书的验证。有效值为

完整
验证所提供的证书是否由受信任的颁发机构 (CA) 签名,并验证服务器的主机名(或 IP 地址)是否与证书中标识的名称匹配。
不执行任何服务器证书的验证。此模式禁用了 SSL/TLS 的许多安全优势,应在谨慎考虑后才使用。它主要用作尝试解决 TLS 错误时的临时诊断机制;强烈建议不要在生产环境中使用它。
严格
验证所提供的证书是否由受信任的颁发机构 (CA) 签名,并验证服务器的主机名(或 IP 地址)是否与证书中标识的名称匹配。如果主题备用名称为空,则返回错误。
证书
验证所提供的证书是否由受信任的颁发机构 (CA) 签名,但不执行任何主机名验证。

默认值为 完整。选择身份验证 纯文本 选项时,此设置不可用。

分区设置
编辑

创建的分区数由 Kafka 代理根据主题列表自动设置。然后,记录将随机、按轮询顺序或根据计算的哈希发布到分区。

随机

将记录随机发布到 Kafka 输出代理事件分区。指定在分区程序选择新分区之前要发布到同一分区的事件数。

轮询

以轮询方式将记录发布到 Kafka 输出代理事件分区。指定在分区程序选择新分区之前要发布到同一分区的事件数。

哈希

根据从指定的字段列表计算出的哈希将记录发布到 Kafka 输出代理事件分区。如果未指定字段,则使用 Kafka 事件键值。

主题设置
编辑

使用此选项为每个 Elastic Agent 事件设置 Kafka 主题。

默认主题

设置一个默认主题,用于由 Elastic Agent 发送到 Kafka 输出的事件。

您可以设置一个静态主题,例如 elastic-agent,或者您可以选择根据 Elastic 通用模式 (ECS) 字段动态设置主题。可用字段包括

  • data_stream_type
  • data_stream.dataset
  • data_stream.namespace
  • @timestamp
  • event-dataset

您还可以设置自定义字段。如果您正在将 add_fields 处理器作为 Elastic Agent 输入的一部分,这将非常有用。否则,不建议设置自定义字段。

标头设置
编辑

标头是键值对,并且可以包含多个具有相同键的标头。仅支持字符串值。这些标头将包含在每个生成的 Kafka 消息中。

要在 Kafka 标头中设置的键。

要在 Kafka 标头中设置的值。

单击添加标头以配置要包含在每个 Kafka 消息中的其他标头。

客户端 ID

用于日志记录、调试和审核目的的可配置 ClientID。默认值为 Elastic。客户端 ID 是协议的一部分,用于标识消息的来源。

压缩设置
编辑

您可以启用压缩以减少 Kafka 输出的容量。

编解码器

选择要使用的压缩编解码器。支持的编解码器为 snappylz4gzip

级别

对于 gzip 编解码器,您可以选择压缩级别。级别必须在 1(最快速度)到 9(最佳压缩)的范围内。

提高压缩级别会减少网络使用量,但会增加 CPU 使用量。默认值为 4。

代理设置
编辑

为 Kafka 代理配置超时和缓冲区大小值。

代理超时

Kafka 代理在超时之前等待所需数量的 ACK 的最长时间(请参阅稍后的 ACK 可靠性 设置)。默认值为 30 秒。

代理可达性超时

Elastic Agent 等待 Kafka 代理响应的最长时间,超过此时间将会超时。默认值为 30 秒。

ACK 可靠性

代理所需的 ACK 可靠性级别。选项为

  • 等待本地提交
  • 等待所有副本提交
  • 不等待

默认值为 等待本地提交

请注意,如果 ACK 可靠性设置为 不等待,则 Kafka 不会返回任何 ACK。如果发生错误,消息可能会在无提示的情况下丢失。

其他设置
编辑

一个可选的格式化字符串,指定 Kafka 事件键。如果配置,则可以使用格式字符串从事件中提取事件键。

有关特定密钥选择的影响,请参阅 Kafka 文档;默认情况下,密钥由 Kafka 集群选择。

代理

为 Elastic Agent 选择用于连接 Kafka 的代理 URL。要了解有关代理配置的信息,请参阅将代理服务器与 Elastic Agent 和 Fleet 结合使用

高级 YAML 配置

将添加到每个使用此输出的策略的 Kafka 输出部分的 YAML 设置。请确保指定有效的 YAML。UI 当前不提供验证。

有关可用设置的描述,请参阅 高级 YAML 配置

将此输出设置为代理集成默认值

启用此设置后,如果代理策略中未设置其他输出,则 Elastic Agent 将使用此输出发送数据。

将此输出设置为代理监控默认值

启用此设置后,如果代理策略中未设置其他输出,则 Elastic Agent 将使用此输出发送代理监控数据

高级 YAML 配置

编辑
设置 描述

backoff.init

(字符串) 在发生网络错误后尝试重新连接到 Kafka 之前等待的秒数。等待 backoff.init 秒后,Elastic Agent 会尝试重新连接。如果尝试失败,则回退计时器将呈指数增长,直至达到 backoff.max。成功连接后,回退计时器将重置。

默认值: 1 秒

backoff.max

(字符串) 在发生网络错误后尝试连接到 Kafka 之前等待的最大秒数。

默认值: 60 秒

bulk_max_size

(整数) 单个 Kafka 请求中批量处理的最大事件数。

默认值: 2048

bulk_flush_frequency

(整数) 发送批量 Kafka 请求之前等待的持续时间。0 表示无延迟。

默认值: 0

channel_buffer_size

(整数) 每个 Kafka 代理在输出管道中缓冲的消息数。

默认值: 256

client_id

(字符串) 用于日志记录、调试和审计目的的可配置 ClientID。

默认值: Elastic Agent

codec

输出编解码器配置。您可以指定 jsonformat 编解码器。默认情况下,使用 json 编解码器。

json.pretty:如果将 pretty 设置为 true,则事件将以美观的方式进行格式化。默认值为 false。

json.escape_html:如果将 escape_html 设置为 true,则 HTML 符号将在字符串中进行转义。默认值为 false。

使用启用美观打印的 json 编解码器将事件写入控制台的配置示例

output.console:
  codec.json:
    pretty: true
    escape_html: false

format.string:用于创建自定义格式消息的可配置格式字符串。

使用 format 编解码器将事件时间戳和消息字段打印到控制台的可配置示例

output.console:
  codec.format:
    string: '%{[@timestamp]} %{[message]}'

默认值: json

keep_alive

(字符串) 活动网络连接的保持活动期。如果为 0 秒,则禁用保持活动。

默认值: 0 秒

max_message_bytes

(整数) JSON 编码消息允许的最大大小。较大的消息将被丢弃。此值应等于或小于代理的 message.max.bytes

默认值: 1000000 (字节)

metadata

Kafka 元数据更新设置。元数据包含有关代理、主题、分区和用于发布的活动领导者的信息。

refresh_frequency
元数据刷新间隔。默认为 10 分钟。
full
获取元数据时使用的策略。如果此选项为 true,则客户端将维护所有可用主题的完整元数据集。如果设置为 false,则它只会刷新配置主题的元数据。默认值为 false。
retry.max
元数据更新重试总次数。默认值为 3。
retry.backoff
重试之间的等待时间。默认值为 250 毫秒。

queue.mem.events

队列可以存储的事件数。此值应能被 queue.mem.flush.min_eventsbulk_max_size 中的较小值整除,以避免将部分批次发送到输出。

默认值: 3200 个事件

queue.mem.flush.min_events

flush.min_events 是一个旧参数,新配置应首选使用 bulk_max_size 来控制批次大小。从 8.13 版开始,使用 flush.min_events 而不是 bulk_max_size 来限制批次大小永远没有性能优势

默认值: 1600 个事件

queue.mem.flush.timeout

(整数) 满足 queue.mem.flush.min_events 的最长等待时间。如果设置为 0 秒,则事件会立即提供给输出。

默认值: 10 秒

Kafka 输出和使用 Logstash 将数据索引到 Elasticsearch

编辑

如果您正在考虑使用 Logstash 将数据从 kafka 传输到 Elasticsearch,请注意,从 Elastic Agent 发送到 kafka 的文档结构不得由 Logstash 修改。我们建议在 kafka 输入和 json 编解码器上禁用 ecs_compatibility,以确保输入不编辑字段及其内容。

集成设置的数据流希望接收具有与直接从 Elastic Agent 发送的事件结构和字段名称相同的事件。

从 Elastic Agent 发送到 kafka 的文档结构不得由 Logstash 修改。我们建议在 kafka 输入和 json 编解码器上禁用 ecs_compatibility

有关更多详细信息,请参阅适用于 Elastic Agent 的 Logstash 输出文档。

inputs {
  kafka {
    ...
    ecs_compatibility => "disabled"
    codec => json { ecs_compatibility => "disabled" }
    ...
  }
}
...