Kafka 输出插件

编辑

有关其他版本,请参阅 版本化插件文档

获取帮助

编辑

有关插件的问题,请在 Discuss 论坛中发起主题。对于错误或功能请求,请在 Github 中打开一个问题。有关 Elastic 支持的插件列表,请查阅 Elastic 支持矩阵

描述

编辑

将事件写入 Kafka 主题。

此插件使用 Kafka Client 3.4。有关代理兼容性,请参阅官方的 Kafka 兼容性参考。如果链接的兼容性维基未更新,请与 Kafka 支持/社区联系以确认兼容性。

如果您需要此插件中尚不可用的功能(包括客户端版本升级),请提交一个包含您需要内容详细信息的问题。

此输出支持通过以下方式连接到 Kafka

  • SSL(需要插件版本 3.0.0 或更高版本)
  • Kerberos SASL(需要插件版本 5.1.0 或更高版本)

默认情况下,安全性已禁用,但可以根据需要启用。

唯一必需的配置是 topic_id。

默认编解码器为 plain。Logstash 不仅会使用 message 字段对事件进行编码,还会使用时间戳和主机名进行编码。

如果希望将事件的完整内容作为 json 发送,则应在输出配置中设置编解码器,如下所示

    output {
      kafka {
        codec => json
        topic_id => "mytopic"
      }
    }

有关更多信息,请参阅 https://kafka.apache.org/34/documentation.html#theproducer

Kafka 生产者配置: https://kafka.apache.org/34/documentation.html#producerconfigs

此插件不支持在与 Kafka 代理通信时使用代理。

Kafka 输出配置选项

编辑

此插件支持以下配置选项以及稍后描述的 通用选项

其中一些选项映射到 Kafka 选项。默认值通常反映 Kafka 默认设置,并且如果 Kafka 的生产者默认值发生更改,可能会发生变化。有关更多详细信息,请参阅 https://kafka.apache.org/34/documentation

另请参阅 通用选项,以获取所有输出插件支持的选项列表。

 

acks

编辑
  • 值可以是以下任何一个:01all
  • 默认值为 "1"

生产者在认为请求已完成之前,要求 leader 收到的确认次数。

acks=0。生产者不会等待服务器的任何确认。

acks=1。leader 会将记录写入其本地日志,但会立即响应,而不会等待所有 follower 的完全确认。

acks=all。leader 会等待完整的同步副本集,然后再确认记录。

batch_size

编辑
  • 值类型为 数字
  • 默认值为 16384

当多个记录发送到同一分区时,生产者会尝试将记录批量组合到更少的请求中。这有助于提高客户端和服务器两端的性能。此配置控制默认的批量大小(以字节为单位)。

bootstrap_servers

编辑
  • 值类型为 字符串
  • 默认值为 "localhost:9092"

这是用于引导的,生产者仅将其用于获取元数据(主题、分区和副本)。发送实际数据的套接字连接将根据元数据中返回的代理信息建立。格式为 host1:port1,host2:port2,并且列表可以是代理的子集或指向代理子集的 VIP。

buffer_memory

编辑
  • 值类型为 数字
  • 默认值为 33554432(32MB)。

生产者可以用来缓冲等待发送到服务器的记录的内存总字节数。

client_dns_lookup

编辑
  • 值类型为 字符串
  • 有效选项为 use_all_dns_ipsresolve_canonical_bootstrap_servers_onlydefault
  • 默认值为 "default"

控制如何执行 DNS 查找。如果设置为 use_all_dns_ips,则 Logstash 会尝试主机返回的所有 IP 地址,然后再失败连接。如果设置为 resolve_canonical_bootstrap_servers_only,则每个条目将被解析并扩展为规范名称列表。

从 Kafka 3 开始,已删除 client.dns.lookup 值的 default 值。如果未显式配置,则默认为 use_all_dns_ips

client_id

编辑
  • 值类型为 字符串
  • 默认值为 "logstash"

在发出请求时传递给服务器的 ID 字符串。其目的是能够通过允许在请求中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是 ip/port。

compression_type

编辑
  • 值可以是以下任何一个:nonegzipsnappylz4zstd
  • 默认值为 "none"

生产者生成的所有数据的压缩类型。默认值为 none(表示不压缩)。有效值为 none、gzip、snappy、lz4 或 zstd。

connections_max_idle_ms

编辑
  • 值类型为 数字
  • 默认值为 540000 毫秒(9 分钟)。

在由此配置指定的毫秒数后关闭空闲连接。

jaas_path

编辑
  • 值类型为 路径
  • 此设置没有默认值。

Java 身份验证和授权服务 (JAAS) API 为 Kafka 提供用户身份验证和授权服务。此设置提供 JAAS 文件的路径。Kafka 客户端的示例 JAAS 文件

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="kafka";
  };

请注意,在配置文件中指定 jaas_pathkerberos_config 会将这些添加到全局 JVM 系统属性中。这意味着如果您有多个 Kafka 输入,则所有输入都将共享相同的 jaas_pathkerberos_config。如果这不是您想要的,则必须在不同的 JVM 实例上运行 Logstash 的单独实例。

kerberos_config

编辑
  • 值类型为 路径
  • 此设置没有默认值。

Kerberos 配置文件的可选路径。这与 https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html 中详述的 krb5.conf 样式相同。

key_serializer

编辑
  • 值类型为 字符串
  • 默认值为 "org.apache.kafka.common.serialization.StringSerializer"

消息键的序列化程序类。

linger_ms

编辑
  • 值类型为 数字
  • 默认值为 0

生产者将请求传输之间到达的任何记录组合到单个批量请求中。通常,这仅在记录到达速度快于发送速度时才会在负载下发生。但是,在某些情况下,客户端可能希望即使在中等负载下也减少请求数量。此设置通过添加少量人工延迟来实现此目的,即,生产者不会立即发送记录,而是最多等待给定的延迟,以允许发送其他记录,以便可以将发送批量组合在一起。

max_request_size

编辑
  • 值类型为 数字
  • 默认值为 1048576(1MB)。

请求的最大大小。

message_headers

编辑
  • 值类型为 哈希

    • 键是标头名称,必须为 字符串
    • 值是标头值,必须为 字符串
    • 值支持从事件字段值进行插值。
  • 此设置没有默认值。

键值对的映射,每个映射分别对应于标头名称及其值。示例

    message_headers => { "event_timestamp" => "%{@timestamp}" }

message_key

编辑
  • 值类型为 字符串
  • 此设置没有默认值。

消息的键。

metadata_fetch_timeout_ms

编辑
  • 值类型为 数字
  • 默认值为 60000 毫秒(60 秒)。

初始元数据请求以获取主题元数据的超时设置。

metadata_max_age_ms

编辑
  • 值类型为 数字
  • 默认值为 300000 毫秒(5 分钟)。

强制刷新元数据之前的最大时间(以毫秒为单位)。

partitioner

编辑
  • 值类型为 字符串
  • 此设置没有默认值。

默认行为是将事件的message_key进行哈希以获取分区。如果不存在消息键,插件将以循环方式选择一个分区。

以下是选择分区策略的可用选项

  • default 使用如上所述的默认分区器
  • round_robin 将写入均匀地分配到所有分区,而不管message_key如何。
  • uniform_sticky 在批处理期间坚持使用一个分区,然后随机选择一个新的分区。

receive_buffer_bytes

编辑
  • 值类型为 数字
  • 默认值为32768(32KB)。

读取数据时使用的 TCP 接收缓冲区的大小。

reconnect_backoff_ms

编辑
  • 值类型为 数字
  • 默认值为50

连接失败时,在尝试重新连接到给定主机之前等待的时间。

request_timeout_ms

编辑
  • 值类型为 数字
  • 默认值为40000 毫秒(40 秒)。

此配置控制客户端等待请求响应的最长时间。如果在超时时间到期之前未收到响应,客户端将根据需要重新发送请求,或者如果重试次数用尽则使请求失败。

retries

编辑
  • 值类型为 数字
  • 此设置没有默认值。

默认重试行为是重试直到成功。为了防止数据丢失,不建议更改此设置。

如果选择设置retries,则大于零的值将导致客户端仅重试固定次数。如果传输故障持续时间超过重试次数(网络中断、Kafka 宕机等),这将导致数据丢失。

小于零的值是配置错误。

从版本 10.5.0 开始,此插件将仅重试为RetriableExceptionInterruptException 子类的异常。如果生成消息抛出任何其他异常,则会记录错误并丢弃消息,而不会重试。这可以防止 Logstash 管道无限期挂起。

在 10.5.0 之前的版本中,任何异常都会无限期重试,除非配置了retries选项。

retry_backoff_ms

编辑
  • 值类型为 数字
  • 默认值为100 毫秒。

在尝试将失败的生产请求重新发送到给定主题分区之前等待的时间。

sasl_client_callback_handler_class

编辑
  • 值类型为 字符串
  • 此设置没有默认值。

指定的 SASL 机制应使用的 SASL 客户端回调处理程序类。

sasl_jaas_config

编辑
  • 值类型为 字符串
  • 此设置没有默认值。

此插件实例的本地 JAAS 配置设置,而不是使用jaas_path配置的配置文件中设置,这些设置在整个 JVM 中共享。这允许每个插件实例拥有自己的配置。

如果同时设置了sasl_jaas_configjaas_path配置,则此处的设置优先。

示例(Azure 事件中心设置)

    output {
      kafka {
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='auser'  password='apassword';"
      }
    }

sasl_kerberos_service_name

编辑
  • 值类型为 字符串
  • 此设置没有默认值。

Kafka 代理运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。

sasl_mechanism

编辑
  • 值类型为 字符串
  • 默认值为"GSSAPI"

SASL 机制 用于客户端连接。这可能是任何可使用安全提供程序的机制。GSSAPI 是默认机制。

security_protocol

编辑
  • 值可以是以下任何一个:PLAINTEXTSSLSASL_PLAINTEXTSASL_SSL
  • 默认值为"PLAINTEXT"

要使用的安全协议,可以是 PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL 中的任何一个。

send_buffer_bytes

编辑
  • 值类型为 数字
  • 默认值为131072(128KB)。

发送数据时使用的 TCP 发送缓冲区的大小。

ssl_endpoint_identification_algorithm

编辑

端点标识算法,默认为"https"。设置为空字符串""以禁用。

ssl_key_password

编辑
  • 值类型为password
  • 此设置没有默认值。

密钥存储文件中的私钥密码。

ssl_keystore_location

编辑
  • 值类型为 路径
  • 此设置没有默认值。

如果需要客户端身份验证,此设置将存储密钥库路径。

ssl_keystore_password

编辑
  • 值类型为password
  • 此设置没有默认值。

如果需要客户端身份验证,此设置将存储密钥库密码。

ssl_keystore_type

编辑
  • 值类型为 字符串
  • 此设置没有默认值。

密钥库类型。

ssl_truststore_location

编辑
  • 值类型为 路径
  • 此设置没有默认值。

用于验证 Kafka 代理证书的 JKS 信任库路径。

ssl_truststore_password

编辑
  • 值类型为password
  • 此设置没有默认值。

信任库密码。

ssl_truststore_type

编辑
  • 值类型为 字符串
  • 此设置没有默认值。

信任库类型。

topic_id

编辑
  • 这是必需的设置。
  • 值类型为 字符串
  • 此设置没有默认值。

要向其生成消息的主题。

value_serializer

编辑
  • 值类型为 字符串
  • 默认值为 "org.apache.kafka.common.serialization.StringSerializer"

消息值的序列化程序类。

通用选项

编辑

所有输出插件都支持这些配置选项。

设置 输入类型 必需

codec

codec

enable_metric

boolean

id

字符串

codec

编辑
  • 值类型为codec
  • 默认值为"plain"

用于输出数据的编解码器。输出编解码器是在数据离开输出之前对其进行编码的便捷方法,而无需在 Logstash 管道中使用单独的过滤器。

enable_metric

编辑
  • 值类型为boolean
  • 默认值为true

禁用或启用此特定插件实例的指标日志记录。默认情况下,我们会记录所有可以记录的指标,但您可以为特定插件禁用指标收集。

  • 值类型为string
  • 此设置没有默认值。

向插件配置添加唯一的ID。如果未指定 ID,Logstash 将生成一个。强烈建议在配置中设置此 ID。当您有两个或多个相同类型的插件时,这尤其有用。例如,如果您有两个 kafka 输出。在这种情况下,添加命名的 ID 将有助于在使用监控 API 监控 Logstash 时提供帮助。

output {
  kafka {
    id => "my_plugin_id"
  }
}

id字段中的变量替换仅支持环境变量,不支持使用密钥存储中的值。