Kafka 输入插件
编辑Kafka 输入插件
编辑- kafka 集成插件 的一个组件
- 集成版本:v11.5.2
- 发布日期:2024-10-04
- 变更日志
对于其他版本,请参阅 版本化插件文档。
获取帮助
编辑有关插件的问题,请在 Discuss 论坛中发起主题讨论。对于错误或功能请求,请在 Github 中提交问题。有关 Elastic 支持的插件列表,请查阅 Elastic 支持矩阵。
描述
编辑此输入将从 Kafka 主题读取事件。
此插件使用 Kafka 客户端 3.4。有关代理兼容性,请参阅官方的 Kafka 兼容性参考。如果链接的兼容性 wiki 未更新,请联系 Kafka 支持/社区以确认兼容性。
如果您需要此插件中尚不可用的功能(包括客户端版本升级),请提交一个包含您需求详细信息的问题。
此输入支持通过以下方式连接到 Kafka
- SSL(需要插件版本 3.0.0 或更高版本)
- Kerberos SASL(需要插件版本 5.1.0 或更高版本)
默认情况下,安全功能已禁用,但可以根据需要启用。
此插件不支持在与 Kafka 代理通信时使用代理。
此插件支持在使用 schema_registry_proxy
选项与 Schema Registry 通信时使用代理。
Logstash Kafka 消费者处理组管理并使用 Kafka 主题使用默认的偏移量管理策略。
默认情况下,Logstash 实例形成一个订阅 Kafka 主题的单个逻辑组。每个 Logstash Kafka 消费者可以运行多个线程以提高读取吞吐量。或者,您可以运行多个具有相同 group_id
的 Logstash 实例以将负载分散到物理机器上。主题中的消息将分发到所有具有相同 group_id
的 Logstash 实例。
理想情况下,您应该拥有与分区数量相同的线程数以实现完美的平衡——线程数超过分区数意味着某些线程将处于空闲状态。
有关更多信息,请参阅 https://kafka.apache.org/34/documentation.html#theconsumer
Kafka 消费者配置:https://kafka.apache.org/34/documentation.html#consumerconfigs
元数据字段
编辑以下来自 Kafka 代理的元数据将添加到 [@metadata]
字段下
-
[@metadata][kafka][topic]
:消费消息的原始 Kafka 主题。 -
[@metadata][kafka][consumer_group]
:消费者组。 -
[@metadata][kafka][partition]
:此消息的分区信息。 -
[@metadata][kafka][offset]
:此消息的原始记录偏移量。 -
[@metadata][kafka][key]
:记录键(如果有)。 -
[@metadata][kafka][timestamp]
:记录中的时间戳。根据您的代理配置,这可能是记录创建时(默认)或代理接收到记录时。有关属性 log.message.timestamp.type 的更多信息,请参阅 https://kafka.apache.org/34/documentation.html#brokerconfigs
仅当 decorate_events
选项设置为 basic
或 extended
时,才会将元数据添加到事件中(默认为 none
)。
请注意,@metadata
字段不是输出时任何事件的一部分。如果您需要将这些信息插入到原始事件中,则必须使用 mutate
过滤器手动将所需字段复制到您的 event
中。
Kafka 输入配置选项
编辑此插件支持这些配置选项以及稍后描述的 常用选项。
其中一些选项映射到 Kafka 选项。默认值通常反映 Kafka 默认设置,并且如果 Kafka 的消费者默认值发生更改,可能会发生变化。有关更多详细信息,请参阅 https://kafka.apache.org/34/documentation。
设置 | 输入类型 | 必填 |
---|---|---|
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
有效的文件系统路径 |
否 |
|
有效的文件系统路径 |
否 |
|
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
有效的文件系统路径 |
否 |
|
否 |
||
字符串, |
否 |
|
有效的文件系统路径 |
否 |
|
否 |
||
字符串, |
否 |
|
否 |
||
否 |
||
字符串, |
否 |
|
否 |
||
否 |
||
否 |
||
否 |
||
有效的文件系统路径 |
否 |
|
否 |
||
字符串, |
否 |
|
有效的文件系统路径 |
否 |
|
否 |
||
字符串, |
否 |
|
否 |
||
否 |
||
否 |
另请参阅 常用选项,以获取所有输入插件支持的选项列表。
auto_offset_reset
编辑- 值类型为 字符串
- 此设置没有默认值。
在 Kafka 中没有初始偏移量或偏移量超出范围时该怎么办
- earliest:自动将偏移量重置为最早的偏移量。
- latest:自动将偏移量重置为最新的偏移量。
- none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
- 其他任何内容:向消费者抛出异常。
bootstrap_servers
编辑- 值类型为 字符串
- 默认值为
"localhost:9092"
用于建立与集群的初始连接的 Kafka 实例的 URL 列表。此列表应采用 host1:port1,host2:port2
的形式。这些 URL 仅用于初始连接以发现完整的集群成员资格(可能会动态更改),因此此列表不必包含完整的服务器集(尽管您可能希望有多个服务器,以防某个服务器出现故障)。
client_dns_lookup
编辑- 值类型为 字符串
- 默认值为
"default"
如何执行 DNS 查找。如果设置为 use_all_dns_ips
,则当查找返回主机名的多个 IP 地址时,将在连接失败之前尝试连接所有这些地址。如果值为 resolve_canonical_bootstrap_servers_only
,则每个条目都将解析并扩展为规范名称列表。
从 Kafka 3 开始,已删除 client.dns.lookup
值的 default
值。如果明确配置,它将回退到 use_all_dns_ips
。
client_id
编辑- 值类型为 字符串
- 默认值为
"logstash"
在发出请求时传递给服务器的 ID 字符串。这样做是为了能够跟踪请求的来源,而不仅仅是 ip/port,而是允许包含逻辑应用程序名称。
client_rack
编辑- 值类型为 字符串
- 此设置没有默认值。
Kafka 消费者的机架标识符。用于选择消费者的物理最近机架以从中读取。该设置对应于 Kafka 的 broker.rack
配置。
仅适用于 Kafka 2.4.0 及更高版本。请参阅 KIP-392。
decorate_events
编辑- 值类型为 字符串
-
接受的值为
-
none
:不添加元数据。 -
basic
:添加记录的属性。 -
extended
:添加记录的属性和标头(仅限于使用 UTF-8 编码的值的标头)。 -
false
:none
的已弃用别名。 -
true
:basic
的已弃用别名。
-
- 默认值为
none
选项,用于将 Kafka 元数据(如主题、消息大小和标头键值)添加到事件中。这将在包含以下属性的 logstash 事件中添加一个名为 kafka
的字段。
-
topic
:此消息关联的主题。 -
consumer_group
:用于读取此事件的消费者组。 -
partition
: 此消息关联的分区 -
offset
: 此消息关联的分区中的偏移量 -
key
: 包含消息键的 ByteBuffer
auto_create_topics
编辑 * 值类型为 布尔值 * 默认值为 true
控制在订阅不存在的主题时是否自动创建主题。仅当此配置设置为 true
且在代理上使用 auto.create.topics.enable
启用了自动主题创建时,才会自动创建主题;否则不允许自动创建主题。
enable_auto_commit
编辑- 值类型为 布尔值
- 默认值为
true
此提交的偏移量将在进程失败时用作消费开始的位置。
如果为 true,则定期向 Kafka 提交消费者已返回的消息的偏移量。但是,如果值为 false
,则每次消费者将从主题获取的数据写入内存或持久队列时都会提交偏移量。
exclude_internal_topics
编辑- 值类型为 字符串
- 此设置没有默认值。
是否应向消费者公开来自内部主题(如偏移量)的记录。如果设置为 true,则接收来自内部主题的记录的唯一方法是订阅它。
fetch_max_bytes
编辑- 值类型为 数字
- 默认值为
52428800
(50MB)
服务器应为获取请求返回的最大数据量。这不是绝对最大值,如果获取请求的第一个非空分区的第一个消息大于此值,则仍会返回该消息,以确保消费者可以继续进行。
fetch_max_wait_ms
编辑- 值类型为 数字
- 默认值为
500
毫秒。
如果数据不足以立即满足 fetch_min_bytes
,则服务器在回复获取请求之前将阻塞的最长时间。这应该小于或等于 poll_timeout_ms
中使用的超时时间
group_id
编辑- 值类型为 字符串
- 默认值为
"logstash"
此消费者所属组的标识符。消费者组是一个单个逻辑订阅者,恰好由多个处理器组成。主题中的消息将分发到具有相同 group_id
的所有 Logstash 实例。
当在单个管道中使用多个输入读取不同主题时,必须为每个输入设置不同的 group_id => ...
。还建议设置唯一的 client_id => ...
。
group_instance_id
编辑- 值类型为 字符串
- 此设置没有默认值。
此 Logstash Kafka 消费者的静态成员身份标识符。静态成员身份功能是在 KIP-345 中引入的,可在 Kafka 属性 group.instance.id
下使用。其目的是在消费者离线后必须转发大量数据的情况下避免重新平衡。此功能可以缓解服务状态繁重且从实例 A 到 B 的一个主题分区的重新平衡会导致大量数据传输的情况。通过使用此选项,经常离线/在线的客户端可以避免频繁且繁重的重新平衡。
对于属于同一 group_id
的所有客户端,group_instance_id
设置必须是唯一的。否则,另一个使用相同 group.instance.id
值连接的客户端会导致最旧的实例断开连接。您可以设置此值以使用主机名、IP 或任何唯一标识客户端应用程序的信息。
当配置了多个线程且 consumer_threads
大于 1 时,会将后缀附加到 group_instance_id
以避免冲突。
heartbeat_interval_ms
编辑- 值类型为 数字
- 默认值为
3000
毫秒(3 秒)。
到消费者协调器的预期心跳间隔时间。心跳用于确保消费者的会话保持活动状态,并在新的消费者加入或离开组时促进重新平衡。该值必须设置为小于 session.timeout.ms
,但通常不应高于该值的 1/3。它可以调整得更低以控制正常重新平衡的预期时间。
isolation_level
编辑- 值类型为 字符串
- 默认值为
"read_uncommitted"
控制如何读取以事务方式写入的消息。如果设置为 read_committed
,则轮询消息只会返回已提交的事务消息。如果设置为 read_uncommitted
(默认值),则轮询消息将返回所有消息,即使是已中止的事务消息。在任何一种模式下,非事务消息都将无条件返回。
jaas_path
编辑- 值类型为 路径
- 此设置没有默认值。
Java 身份验证和授权服务 (JAAS) API 为 Kafka 提供用户身份验证和授权服务。此设置提供 JAAS 文件的路径。Kafka 客户端的示例 JAAS 文件
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true renewTicket=true serviceName="kafka"; };
请注意,在配置文件中指定 jaas_path
和 kerberos_config
会将这些属性添加到全局 JVM 系统属性中。这意味着如果您有多个 Kafka 输入,则所有这些输入都将共享相同的 jaas_path
和 kerberos_config
。如果不需要这样做,则必须在不同的 JVM 实例上运行 Logstash 的单独实例。
kerberos_config
编辑- 值类型为 路径
- 此设置没有默认值。
Kerberos 配置文件的可选路径。这与 https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html 中详细说明的 krb5.conf 样式相同
key_deserializer_class
编辑- 值类型为 字符串
- 默认值为
"org.apache.kafka.common.serialization.StringDeserializer"
用于反序列化记录键的 Java 类
max_partition_fetch_bytes
编辑- 值类型为 数字
- 默认值为
1048576
(1MB)。
服务器将返回的每个分区中的最大数据量。请求使用的最大总内存将为 #partitions * max.partition.fetch.bytes
。此大小必须至少与服务器允许的最大消息大小一样大,否则生产者可能会发送大于消费者可以获取的消息。如果发生这种情况,消费者可能会卡在尝试获取某个分区上的大消息。
max_poll_interval_ms
编辑- 值类型为 数字
- 默认值为
300000
毫秒(5 分钟)。
使用消费者组管理时,poll() 调用之间最大延迟。这为消费者在获取更多记录之前可以处于空闲状态的时间设置了上限。如果在该超时时间到期之前未调用 poll(),则消费者将被视为失败,并且组将重新平衡以将分区重新分配给另一个成员。
metadata_max_age_ms
编辑- 值类型为 数字
- 默认值为
300000
毫秒(5 分钟)。
以毫秒为单位的时间段,在此时间段后,即使我们没有看到任何分区领导权的变化,我们也会强制刷新元数据,以主动发现任何新的代理或分区
partition_assignment_strategy
编辑- 值类型为 字符串
- 此设置没有默认值。
客户端用于在消费者实例之间分配分区所有权的分区分配策略的名称,支持的选项为
-
range
-
round_robin
-
sticky
-
cooperative_sticky
这些映射到 Kafka 对应的 ConsumerPartitionAssignor
实现。
poll_timeout_ms
编辑- 值类型为 数字
- 默认值为
100
毫秒。
Kafka 消费者等待从主题接收新消息的时间。
订阅一组主题后,Kafka 消费者在轮询时会自动加入组。插件循环轮询确保消费者活动。在底层,Kafka 客户端会定期向服务器发送心跳。超时指定在每次轮询中等待输入阻塞的时间。
reconnect_backoff_ms
编辑- 值类型为 数字
- 默认值为
50
毫秒。
尝试重新连接到给定主机之前等待的时间。这避免了在紧密循环中重复连接到主机。此回退适用于消费者发送到代理的所有请求。
request_timeout_ms
编辑- 值类型为 数字
- 默认值为
40000
毫秒(40 秒)。
配置控制客户端等待请求响应的最长时间。如果在超时时间到期之前未收到响应,则客户端将根据需要重新发送请求,或者如果重试次数用尽则使请求失败。
sasl_jaas_config
编辑- 值类型为 字符串
- 此设置没有默认值。
此插件实例的本地 JAAS 配置设置,而不是使用 jaas_path
配置的配置文件设置(在整个 JVM 中共享)。这允许每个插件实例具有自己的配置。
如果同时设置了 sasl_jaas_config
和 jaas_path
配置,则此处设置优先。
示例(Azure 事件中心的设置)
input { kafka { sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='auser' password='apassword';" } }
sasl_kerberos_service_name
编辑- 值类型为 字符串
- 此设置没有默认值。
Kafka 代理运行时的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。
schema_registry_ssl_keystore_password
编辑- 值类型为 password
- 此设置没有默认值。
如果需要 Schema Registry 身份验证,此设置存储密钥库密码。
schema_registry_ssl_truststore_type
编辑- 值类型为 字符串
- 此设置没有默认值。
Schema Registry 信任库文件的格式。它必须是 jks
或 PKCS12
。
schema_registry_url
编辑- 值类型为 uri
指向 Schema Registry 服务实例的 URI,用于管理 Avro 模式。请确保已将用于从指定主题反序列化数据的 Avro 模式上传到 Schema Registry 服务。模式必须遵循模式为 <主题名称>-value 的命名约定。
使用 Schema Registry 配置选项或 value_deserializer_class
配置选项,但不能同时使用两者。
schema_registry_validation
编辑- 值可以是以下之一:
auto
、skip
- 默认值为
"auto"
在大多数情况下,不需要更改默认设置 auto
。
使用 Schema Registry 时,默认情况下,插件会在插件注册期间(在事件处理之前)检查连接并验证 Schema Registry。在某些情况下,当尝试验证已认证的 Schema Registry 时,此过程可能会失败,导致插件崩溃。此设置允许插件跳过注册期间的验证,从而允许插件继续并处理事件。请注意,配置错误的 Schema Registry 仍会阻止插件处理事件。
security_protocol
编辑- 值可以是以下任何一个:
PLAINTEXT
、SSL
、SASL_PLAINTEXT
、SASL_SSL
- 默认值为
"PLAINTEXT"
要使用的安全协议,可以是 PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL 中的任何一个。
session_timeout_ms
编辑- 值类型为 数字
- 默认值为
10000
毫秒(10 秒)。
如果未调用 poll_timeout_ms
,则在此超时后,消费者将被标记为已失效,并且将为由 group_id
标识的组触发重新平衡操作。
ssl_endpoint_identification_algorithm
编辑- 值类型为 字符串
- 默认值为
"https"
端点识别算法,默认为 "https"
。设置为空字符串 ""
以禁用端点验证。
topics_pattern
编辑- 值类型为 字符串
- 此设置没有默认值。
要订阅的主题正则表达式模式。
通过正则表达式进行过滤是通过从代理检索完整主题名称列表并在本地应用模式来完成的。当与具有大量主题的代理一起使用时,此操作可能非常慢,尤其是在存在大量消费者时。
当代理配置了一些具有 ACL 规则的主题并且它们缺少 DESCRIBE 权限时,订阅会发生,但在代理端会记录某些主题的订阅被拒绝给配置的用户。
value_deserializer_class
编辑- 值类型为 字符串
- 默认值为
"org.apache.kafka.common.serialization.StringDeserializer"
用于反序列化记录值的 Java 类。仅当您未使用 Schema Registry 时,才能使用自定义值反序列化器。使用 value_deserializer_class
配置选项或 schema_registry_url
配置选项,但不能同时使用两者。