New

The executive guide to generative AI

Read more

Kafka 输出插件

编辑

对于其他版本,请参阅版本化插件文档

获取帮助

编辑

有关插件的问题,请在Discuss论坛中开启一个主题。对于错误或功能请求,请在Github中开启一个 issue。有关 Elastic 支持的插件列表,请查阅Elastic 支持矩阵

描述

编辑

将事件写入 Kafka 主题。

此插件使用 Kafka 客户端 3.4。有关 broker 兼容性,请参阅官方的Kafka 兼容性参考。如果链接的兼容性 wiki 不是最新的,请联系 Kafka 支持/社区以确认兼容性。

如果您需要此插件中尚未提供的功能(包括客户端版本升级),请提交一个包含您所需内容的详细信息的 issue。

此输出支持通过以下方式连接到 Kafka:

  • SSL(需要插件版本 3.0.0 或更高版本)
  • Kerberos SASL(需要插件版本 5.1.0 或更高版本)

默认情况下,安全性被禁用,但可以根据需要启用。

唯一必需的配置是 topic_id。

默认编解码器是 plain。Logstash 不仅会使用消息字段对您的事件进行编码,还会使用时间戳和主机名进行编码。

如果您希望将事件的完整内容以 json 格式发送,您应该像这样在输出配置中设置编解码器

    output {
      kafka {
        codec => json
        topic_id => "mytopic"
      }
    }

有关更多信息,请参阅 https://kafka.apache.org/34/documentation.html#theproducer

Kafka 生产者配置:https://kafka.apache.org/34/documentation.html#producerconfigs

此插件不支持在与 Kafka broker 通信时使用代理。

Kafka 输出配置选项

编辑

此插件支持以下配置选项以及稍后描述的通用选项

其中一些选项映射到 Kafka 选项。默认值通常反映 Kafka 的默认设置,如果 Kafka 的生产者默认值发生更改,则可能会更改。有关更多详细信息,请参阅https://kafka.apache.org/34/documentation

另请参阅通用选项,其中列出了所有输出插件支持的选项。

 

acks

编辑
  • 值可以是以下任意值:01all
  • 默认值为 "1"

生产者需要领导者在认为请求完成之前收到的确认数。

acks=0。生产者不会等待服务器的任何确认。

acks=1。领导者会将记录写入其本地日志,但在未收到所有跟随者的完全确认之前会进行响应。

acks=all。领导者将等待所有同步副本的完全确认,然后再确认该记录。

batch_size

编辑
  • 值类型为数字
  • 默认值为 16384

当多个记录被发送到同一分区时,生产者将尝试将记录批量处理为更少的请求。这有助于客户端和服务器上的性能。此配置控制默认的批处理大小(以字节为单位)。

bootstrap_servers

编辑
  • 值类型为字符串
  • 默认值为 "localhost:9092"

这是用于引导的,生产者将仅使用它来获取元数据(主题、分区和副本)。发送实际数据的套接字连接将基于元数据中返回的 broker 信息建立。格式为 host1:port1,host2:port2,并且列表可以是 broker 的子集或指向 broker 子集的 VIP。

buffer_memory

编辑
  • 值类型为数字
  • 默认值为 33554432 (32MB)。

生产者可用于缓冲等待发送到服务器的记录的总内存字节数。

client_dns_lookup

编辑
  • 值类型为字符串
  • 有效选项为 use_all_dns_ipsresolve_canonical_bootstrap_servers_onlydefault
  • 默认值为 "default"

控制 DNS 查找的执行方式。如果设置为 use_all_dns_ips,则 Logstash 会尝试主机名返回的所有 IP 地址,然后再使连接失败。如果设置为 resolve_canonical_bootstrap_servers_only,则会解析并扩展每个条目为规范名称列表。

从 Kafka 3 开始,client.dns.lookup 值的 default 值已被删除。如果未显式配置,则默认为 use_all_dns_ips

client_id

编辑
  • 值类型为字符串
  • 默认值为 "logstash"

发出请求时传递给服务器的 ID 字符串。这样做的目的是能够通过允许在请求中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是 ip/端口

compression_type

编辑
  • 值可以是以下任意值:nonegzipsnappylz4zstd
  • 默认值为 "none"

生产者生成的所有数据的压缩类型。默认值为 none(表示不压缩)。有效值为 none、gzip、snappy、lz4 或 zstd。

connections_max_idle_ms

编辑
  • 值类型为数字
  • 默认值为 540000 毫秒(9 分钟)。

在此配置指定的毫秒数之后关闭空闲连接。

jaas_path

编辑
  • 值类型为路径
  • 此设置没有默认值。

Java 身份验证和授权服务 (JAAS) API 为 Kafka 提供用户身份验证和授权服务。此设置提供 JAAS 文件的路径。Kafka 客户端的示例 JAAS 文件

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="kafka";
  };

请注意,在配置文件中指定 jaas_pathkerberos_config 会将它们添加到全局 JVM 系统属性中。这意味着如果您有多个 Kafka 输入,则所有这些输入都将共享相同的 jaas_pathkerberos_config。如果这不是您想要的,您必须在不同的 JVM 实例上运行 Logstash 的单独实例。

kerberos_config

编辑
  • 值类型为路径
  • 此设置没有默认值。

Kerberos 配置文件的可选路径。这是 https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html 中详述的 krb5.conf 样式

key_serializer

编辑
  • 值类型为字符串
  • 默认值为 "org.apache.kafka.common.serialization.StringSerializer"

消息键的序列化程序类

linger_ms

编辑
  • 值类型为数字
  • 默认值为 0

生产者将请求传输之间到达的任何记录组合成一个批处理请求。通常,这种情况仅在负载下发生,即记录到达的速度快于发送速度。但是在某些情况下,即使在适度负载下,客户端也可能希望减少请求的数量。此设置通过添加少量的人为延迟来完成此目的,即,生产者不是立即发送记录,而是会等待最多给定的延迟,以允许发送其他记录,以便可以将发送批量处理在一起。

max_request_size

编辑
  • 值类型为数字
  • 默认值为 1048576 (1MB)。

请求的最大大小

message_headers

编辑
  • 值类型为哈希

    • 键是标头名称,必须是字符串
    • 值是头部值,并且必须是字符串
    • 值支持从事件字段值进行插值
  • 此设置没有默认值。

键值对的映射,每个键值对分别对应一个头部名称及其值。示例

    message_headers => { "event_timestamp" => "%{@timestamp}" }

message_key

编辑
  • 值类型为字符串
  • 此设置没有默认值。

消息的键。

metadata_fetch_timeout_ms

编辑
  • 值类型为数字
  • 默认值是 60000 毫秒 (60 秒)。

用于获取主题元数据的初始元数据请求的超时设置。

metadata_max_age_ms

编辑
  • 值类型为数字
  • 默认值是 300000 毫秒 (5 分钟)。

强制刷新元数据之前允许的最大时间(以毫秒为单位)。

partitioner

编辑
  • 值类型为字符串
  • 此设置没有默认值。

默认行为是对事件的 message_key 进行哈希以获取分区。当不存在消息键时,插件会以轮询方式选择分区。

选择分区策略的可用选项如下:

  • default 使用上述默认分区器
  • round_robin 将写入平均分配到所有分区,而与 message_key 无关
  • uniform_sticky 在批处理期间坚持使用一个分区,然后随机选择一个新的分区

receive_buffer_bytes

编辑
  • 值类型为数字
  • 默认值是 32768 (32KB)。

读取数据时要使用的 TCP 接收缓冲区的大小

reconnect_backoff_ms

编辑
  • 值类型为数字
  • 默认值是 50

当连接失败时,尝试重新连接到给定主机之前等待的时间。

request_timeout_ms

编辑
  • 值类型为数字
  • 默认值是 40000 毫秒 (40 秒)。

此配置控制客户端等待请求响应的最大时间。如果在超时时间到期之前未收到响应,则客户端将在必要时重新发送请求;如果重试次数耗尽,则请求将失败。

retries

编辑
  • 值类型为数字
  • 此设置没有默认值。

默认的重试行为是重试直到成功。为防止数据丢失,不建议更改此设置。

如果您选择设置 retries,则大于零的值将导致客户端仅重试固定次数。如果传输故障存在的时间长于您的重试次数(网络中断、Kafka 宕机等),则会导致数据丢失。

小于零的值是配置错误。

从 10.5.0 版本开始,此插件将仅重试 RetriableExceptionInterruptException 的子类的异常。如果生成消息抛出任何其他异常,则会记录错误并且消息将被丢弃,而不会重试。这可以防止 Logstash 管道无限期挂起。

在 10.5.0 之前的版本中,任何异常都会无限期重试,除非配置了 retries 选项。

retry_backoff_ms

编辑
  • 值类型为数字
  • 默认值是 100 毫秒。

尝试重试向给定主题分区发送失败的生成请求之前等待的时间。

sasl_client_callback_handler_class

编辑
  • 值类型为字符串
  • 此设置没有默认值。

指定的 SASL 机制应使用的 SASL 客户端回调处理程序类。

sasl_jaas_config

编辑
  • 值类型为字符串
  • 此设置没有默认值。

此插件实例本地的 JAAS 配置设置,而不是使用 jaas_path 配置的配置文件设置,这些设置在 JVM 中共享。这允许每个插件实例拥有自己的配置。

如果同时设置了 sasl_jaas_configjaas_path 配置,则此处的设置优先。

示例(Azure Event Hub 的设置)

    output {
      kafka {
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='auser'  password='apassword';"
      }
    }

sasl_kerberos_service_name

编辑
  • 值类型为字符串
  • 此设置没有默认值。

Kafka broker 运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。

sasl_mechanism

编辑
  • 值类型为字符串
  • 默认值是 "GSSAPI"

用于客户端连接的 SASL 机制。这可以是任何有安全提供程序的机制。GSSAPI 是默认机制。

security_protocol

编辑
  • 值可以是以下任意值:PLAINTEXTSSLSASL_PLAINTEXTSASL_SSL
  • 默认值是 "PLAINTEXT"

要使用的安全协议,可以是 PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL 中的任何一种

send_buffer_bytes

编辑
  • 值类型为数字
  • 默认值是 131072 (128KB)。

发送数据时要使用的 TCP 发送缓冲区的大小。

ssl_endpoint_identification_algorithm

编辑

端点识别算法,默认为 "https"。设置为空字符串 "" 以禁用

ssl_key_password

编辑
  • 值类型是 密码
  • 此设置没有默认值。

密钥库文件中私钥的密码。

ssl_keystore_location

编辑
  • 值类型为路径
  • 此设置没有默认值。

如果需要客户端身份验证,则此设置存储密钥库路径。

ssl_keystore_password

编辑
  • 值类型是 密码
  • 此设置没有默认值。

如果需要客户端身份验证,则此设置存储密钥库密码

ssl_keystore_type

编辑
  • 值类型为字符串
  • 此设置没有默认值。

密钥库类型。

ssl_truststore_location

编辑
  • 值类型为路径
  • 此设置没有默认值。

用于验证 Kafka broker 证书的 JKS 信任库路径。

ssl_truststore_password

编辑
  • 值类型是 密码
  • 此设置没有默认值。

信任库密码

ssl_truststore_type

编辑
  • 值类型为字符串
  • 此设置没有默认值。

信任库类型。

topic_id

编辑
  • 这是必需的设置。
  • 值类型为字符串
  • 此设置没有默认值。

要将消息生成到的主题

value_serializer

编辑
  • 值类型为字符串
  • 默认值为 "org.apache.kafka.common.serialization.StringSerializer"

消息值的序列化程序类

通用选项

编辑

所有输出插件都支持以下配置选项

设置 输入类型 必需

编解码器

编解码器

启用指标

布尔值

ID

字符串

codec

编辑

用于输出数据的编解码器。输出编解码器是一种方便的方法,可以在数据离开输出之前对其进行编码,而无需在 Logstash 管道中使用单独的过滤器。

enable_metric

编辑

禁用或启用此特定插件实例的指标日志记录。默认情况下,我们会记录所有可以记录的指标,但是您可以禁用特定插件的指标收集。

  • 值类型是 字符串
  • 此设置没有默认值。

向插件配置添加唯一的 ID。如果未指定 ID,Logstash 将生成一个 ID。强烈建议在您的配置中设置此 ID。当您有两个或多个相同类型的插件时,这尤其有用。例如,如果您有 2 个 kafka 输出。在这种情况下添加命名 ID 将在使用监视 API 时帮助监视 Logstash。

output {
  kafka {
    id => "my_plugin_id"
  }
}

id 字段中的变量替换仅支持环境变量,不支持使用来自密钥存储的值。

Was this helpful?
Feedback