Kafka 输出插件
编辑Kafka 输出插件
编辑- kafka 集成插件 的一个组件
- 集成版本:v11.5.2
- 发布日期:2024-10-04
- 更新日志
对于其他版本,请参阅版本化插件文档。
获取帮助
编辑有关插件的问题,请在Discuss论坛中开启一个主题。对于错误或功能请求,请在Github中开启一个 issue。有关 Elastic 支持的插件列表,请查阅Elastic 支持矩阵。
描述
编辑将事件写入 Kafka 主题。
此插件使用 Kafka 客户端 3.4。有关 broker 兼容性,请参阅官方的Kafka 兼容性参考。如果链接的兼容性 wiki 不是最新的,请联系 Kafka 支持/社区以确认兼容性。
如果您需要此插件中尚未提供的功能(包括客户端版本升级),请提交一个包含您所需内容的详细信息的 issue。
此输出支持通过以下方式连接到 Kafka:
- SSL(需要插件版本 3.0.0 或更高版本)
- Kerberos SASL(需要插件版本 5.1.0 或更高版本)
默认情况下,安全性被禁用,但可以根据需要启用。
唯一必需的配置是 topic_id。
默认编解码器是 plain。Logstash 不仅会使用消息字段对您的事件进行编码,还会使用时间戳和主机名进行编码。
如果您希望将事件的完整内容以 json 格式发送,您应该像这样在输出配置中设置编解码器
output { kafka { codec => json topic_id => "mytopic" } }
有关更多信息,请参阅 https://kafka.apache.org/34/documentation.html#theproducer
Kafka 生产者配置:https://kafka.apache.org/34/documentation.html#producerconfigs
此插件不支持在与 Kafka broker 通信时使用代理。
Kafka 输出配置选项
编辑此插件支持以下配置选项以及稍后描述的通用选项。
其中一些选项映射到 Kafka 选项。默认值通常反映 Kafka 的默认设置,如果 Kafka 的生产者默认值发生更改,则可能会更改。有关更多详细信息,请参阅https://kafka.apache.org/34/documentation。
设置 | 输入类型 | 必需 |
---|---|---|
字符串,可以是 |
否 |
|
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
字符串,可以是 |
否 |
|
否 |
||
有效的filesystem路径 |
否 |
|
有效的filesystem路径 |
否 |
|
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
字符串,可以是 |
否 |
|
否 |
||
否 |
||
否 |
||
有效的filesystem路径 |
否 |
|
否 |
||
否 |
||
有效的filesystem路径 |
否 |
|
否 |
||
否 |
||
是 |
||
否 |
另请参阅通用选项,其中列出了所有输出插件支持的选项。
acks
编辑- 值可以是以下任意值:
0
、1
、all
- 默认值为
"1"
生产者需要领导者在认为请求完成之前收到的确认数。
acks=0
。生产者不会等待服务器的任何确认。
acks=1
。领导者会将记录写入其本地日志,但在未收到所有跟随者的完全确认之前会进行响应。
acks=all
。领导者将等待所有同步副本的完全确认,然后再确认该记录。
batch_size
编辑- 值类型为数字
- 默认值为
16384
。
当多个记录被发送到同一分区时,生产者将尝试将记录批量处理为更少的请求。这有助于客户端和服务器上的性能。此配置控制默认的批处理大小(以字节为单位)。
bootstrap_servers
编辑- 值类型为字符串
- 默认值为
"localhost:9092"
这是用于引导的,生产者将仅使用它来获取元数据(主题、分区和副本)。发送实际数据的套接字连接将基于元数据中返回的 broker 信息建立。格式为 host1:port1,host2:port2
,并且列表可以是 broker 的子集或指向 broker 子集的 VIP。
client_dns_lookup
编辑- 值类型为字符串
- 有效选项为
use_all_dns_ips
、resolve_canonical_bootstrap_servers_only
、default
- 默认值为
"default"
控制 DNS 查找的执行方式。如果设置为 use_all_dns_ips
,则 Logstash 会尝试主机名返回的所有 IP 地址,然后再使连接失败。如果设置为 resolve_canonical_bootstrap_servers_only
,则会解析并扩展每个条目为规范名称列表。
从 Kafka 3 开始,client.dns.lookup
值的 default
值已被删除。如果未显式配置,则默认为 use_all_dns_ips
。
client_id
编辑- 值类型为字符串
- 默认值为
"logstash"
发出请求时传递给服务器的 ID 字符串。这样做的目的是能够通过允许在请求中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是 ip/端口
compression_type
编辑- 值可以是以下任意值:
none
、gzip
、snappy
、lz4
、zstd
- 默认值为
"none"
生产者生成的所有数据的压缩类型。默认值为 none(表示不压缩)。有效值为 none、gzip、snappy、lz4 或 zstd。
jaas_path
编辑- 值类型为路径
- 此设置没有默认值。
Java 身份验证和授权服务 (JAAS) API 为 Kafka 提供用户身份验证和授权服务。此设置提供 JAAS 文件的路径。Kafka 客户端的示例 JAAS 文件
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true renewTicket=true serviceName="kafka"; };
请注意,在配置文件中指定 jaas_path
和 kerberos_config
会将它们添加到全局 JVM 系统属性中。这意味着如果您有多个 Kafka 输入,则所有这些输入都将共享相同的 jaas_path
和 kerberos_config
。如果这不是您想要的,您必须在不同的 JVM 实例上运行 Logstash 的单独实例。
kerberos_config
编辑- 值类型为路径
- 此设置没有默认值。
Kerberos 配置文件的可选路径。这是 https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html 中详述的 krb5.conf 样式
linger_ms
编辑- 值类型为数字
- 默认值为
0
生产者将请求传输之间到达的任何记录组合成一个批处理请求。通常,这种情况仅在负载下发生,即记录到达的速度快于发送速度。但是在某些情况下,即使在适度负载下,客户端也可能希望减少请求的数量。此设置通过添加少量的人为延迟来完成此目的,即,生产者不是立即发送记录,而是会等待最多给定的延迟,以允许发送其他记录,以便可以将发送批量处理在一起。
message_headers
编辑键值对的映射,每个键值对分别对应一个头部名称及其值。示例
message_headers => { "event_timestamp" => "%{@timestamp}" }
partitioner
编辑- 值类型为字符串
- 此设置没有默认值。
默认行为是对事件的 message_key
进行哈希以获取分区。当不存在消息键时,插件会以轮询方式选择分区。
选择分区策略的可用选项如下:
-
default
使用上述默认分区器 -
round_robin
将写入平均分配到所有分区,而与message_key
无关 -
uniform_sticky
在批处理期间坚持使用一个分区,然后随机选择一个新的分区
request_timeout_ms
编辑- 值类型为数字
- 默认值是
40000
毫秒 (40 秒)。
此配置控制客户端等待请求响应的最大时间。如果在超时时间到期之前未收到响应,则客户端将在必要时重新发送请求;如果重试次数耗尽,则请求将失败。
retries
编辑- 值类型为数字
- 此设置没有默认值。
默认的重试行为是重试直到成功。为防止数据丢失,不建议更改此设置。
如果您选择设置 retries
,则大于零的值将导致客户端仅重试固定次数。如果传输故障存在的时间长于您的重试次数(网络中断、Kafka 宕机等),则会导致数据丢失。
小于零的值是配置错误。
从 10.5.0 版本开始,此插件将仅重试 RetriableException 和 InterruptException 的子类的异常。如果生成消息抛出任何其他异常,则会记录错误并且消息将被丢弃,而不会重试。这可以防止 Logstash 管道无限期挂起。
在 10.5.0 之前的版本中,任何异常都会无限期重试,除非配置了 retries
选项。
sasl_jaas_config
编辑- 值类型为字符串
- 此设置没有默认值。
此插件实例本地的 JAAS 配置设置,而不是使用 jaas_path
配置的配置文件设置,这些设置在 JVM 中共享。这允许每个插件实例拥有自己的配置。
如果同时设置了 sasl_jaas_config
和 jaas_path
配置,则此处的设置优先。
示例(Azure Event Hub 的设置)
output { kafka { sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='auser' password='apassword';" } }
sasl_kerberos_service_name
编辑- 值类型为字符串
- 此设置没有默认值。
Kafka broker 运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。
security_protocol
编辑- 值可以是以下任意值:
PLAINTEXT
、SSL
、SASL_PLAINTEXT
、SASL_SSL
- 默认值是
"PLAINTEXT"
要使用的安全协议,可以是 PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL 中的任何一种