Kafka 输入插件编辑

有关其他版本,请参阅 版本化插件文档

获取帮助编辑

如果您对插件有任何疑问,请在 Discuss 论坛中发布主题。对于错误或功能请求,请在 Github 中创建问题。有关 Elastic 支持的插件列表,请参阅 Elastic 支持矩阵

描述编辑

此输入将从 Kafka 主题读取事件。

此插件使用 Kafka 客户端 3.4。有关代理兼容性,请参阅官方 Kafka 兼容性参考。如果链接的兼容性维基百科未更新,请联系 Kafka 支持/社区以确认兼容性。

如果您需要此插件中尚未提供的功能(包括客户端版本升级),请提交一个包含您需求详细信息的问题。

此输入支持通过以下方式连接到 Kafka

  • SSL(需要插件版本 3.0.0 或更高版本)
  • Kerberos SASL(需要插件版本 5.1.0 或更高版本)

默认情况下,安全功能已禁用,但可以根据需要启用。

此插件不支持在与 Kafka 代理通信时使用代理。

此插件支持在使用 schema_registry_proxy 选项与 Schema Registry 通信时使用代理。

Logstash Kafka 消费者处理组管理并使用使用 Kafka 主题的默认偏移量管理策略。

默认情况下,Logstash 实例形成一个单一的逻辑组来订阅 Kafka 主题。每个 Logstash Kafka 消费者可以运行多个线程以提高读取吞吐量。或者,您可以运行多个具有相同 group_id 的 Logstash 实例,以将负载分散到物理机器上。主题中的消息将分发到具有相同 group_id 的所有 Logstash 实例。

理想情况下,您应该拥有与分区数量相同的线程,以实现完美的平衡——线程数量超过分区数量意味着某些线程将处于空闲状态。

有关更多信息,请参阅 https://kafka.apache.org/34/documentation.html#theconsumer

Kafka 消费者配置:https://kafka.apache.org/34/documentation.html#consumerconfigs

元数据字段编辑

来自 Kafka 代理的以下元数据将添加到 [@metadata] 字段下

  • [@metadata][kafka][topic]:消费消息的原始 Kafka 主题。
  • [@metadata][kafka][consumer_group]:消费者组
  • [@metadata][kafka][partition]:此消息的分区信息。
  • [@metadata][kafka][offset]:此消息的原始记录偏移量。
  • [@metadata][kafka][key]:记录键(如果有)。
  • [@metadata][kafka][timestamp]:记录中的时间戳。根据您的代理配置,这可以是记录创建的时间(默认值)或记录被代理接收的时间。有关属性 log.message.timestamp.type 的更多信息,请参阅 https://kafka.apache.org/34/documentation.html#brokerconfigs

仅当 decorate_events 选项设置为 basicextended 时(默认值为 none),元数据才会添加到事件中。

请注意,@metadata 字段不是输出时任何事件的一部分。如果您需要将这些信息插入到原始事件中,则必须使用 mutate 过滤器将所需字段手动复制到您的 event 中。

Kafka 输入配置选项编辑

此插件支持这些配置选项以及后面描述的 通用选项

其中一些选项映射到 Kafka 选项。默认值通常反映 Kafka 默认设置,并且可能会在 Kafka 的消费者默认值更改时发生变化。有关更多详细信息,请参阅 https://kafka.apache.org/34/documentation

设置 输入类型 必需

auto_commit_interval_ms

数字

auto_offset_reset

字符串

bootstrap_servers

字符串

check_crcs

布尔值

client_dns_lookup

字符串

client_id

字符串

client_rack

字符串

connections_max_idle_ms

数字

consumer_threads

数字

decorate_events

字符串

enable_auto_commit

布尔值

exclude_internal_topics

字符串

fetch_max_bytes

数字

fetch_max_wait_ms

数字

fetch_min_bytes

数字

group_id

字符串

group_instance_id

字符串

heartbeat_interval_ms

数字

isolation_level

字符串

jaas_path

有效的系统文件路径

kerberos_config

有效的系统文件路径

key_deserializer_class

字符串

max_partition_fetch_bytes

数字

max_poll_interval_ms

数字

max_poll_records

数字

metadata_max_age_ms

数字

partition_assignment_strategy

字符串

poll_timeout_ms

数字

receive_buffer_bytes

数字

reconnect_backoff_ms

数字

request_timeout_ms

数字

retry_backoff_ms

数字

sasl_jaas_config

字符串

sasl_kerberos_service_name

字符串

sasl_mechanism

字符串

schema_registry_key

字符串

schema_registry_proxy

uri

schema_registry_secret

字符串

schema_registry_ssl_keystore_location

有效的系统文件路径

schema_registry_ssl_keystore_password

密码

schema_registry_ssl_keystore_type

字符串,其中之一为 ["jks", "PKCS12"]

schema_registry_ssl_truststore_location

有效的系统文件路径

schema_registry_ssl_truststore_password

密码

schema_registry_ssl_truststore_type

字符串,其中之一为 ["jks", "PKCS12"]

schema_registry_url

uri

schema_registry_validation

字符串

security_protocol

字符串,其中之一为 ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]

send_buffer_bytes

数字

session_timeout_ms

数字

ssl_endpoint_identification_algorithm

字符串

ssl_key_password

密码

ssl_keystore_location

有效的系统文件路径

ssl_keystore_password

密码

ssl_keystore_type

字符串,其中之一为 ["jks", "PKCS12"]

ssl_truststore_location

有效的系统文件路径

ssl_truststore_password

密码

ssl_truststore_type

字符串,其中之一为 ["jks", "PKCS12"]

topics

数组

topics_pattern

字符串

value_deserializer_class

字符串

另请参阅 通用选项,了解所有输入插件支持的选项列表。

 

auto_commit_interval_ms编辑

  • 值类型为 数字
  • 默认值为 5000

消费者偏移量提交到 Kafka 的频率(以毫秒为单位)。

auto_offset_reset编辑

  • 值类型为 字符串
  • 此设置没有默认值。

在 Kafka 中没有初始偏移量或偏移量超出范围时该怎么办

  • earliest:自动将偏移量重置为最早的偏移量
  • latest:自动将偏移量重置为最新的偏移量
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
  • 其他任何内容:向消费者抛出异常。

bootstrap_servers编辑

  • 值类型为 字符串
  • 默认值为 "localhost:9092"

要用于建立与集群的初始连接的 Kafka 实例的 URL 列表。此列表应采用 host1:port1,host2:port2 的形式。这些 URL 仅用于初始连接以发现完整的集群成员资格(可能会动态更改),因此此列表不必包含完整的服务器集(您可能需要多个服务器,以防服务器出现故障)。

check_crcs编辑

自动检查所消费记录的 CRC32。这确保消息没有发生在线或磁盘上的损坏。此检查会增加一些开销,因此可以在寻求极端性能的情况下禁用它。

client_dns_lookup编辑

  • 值类型为 字符串
  • 默认值为 "default"

如何执行 DNS 查找。如果设置为 use_all_dns_ips,则当查找返回主机名的多个 IP 地址时,将在连接失败之前尝试连接到所有这些地址。如果值为 resolve_canonical_bootstrap_servers_only,则将解析每个条目并将其扩展为规范名称列表。

从 Kafka 3 开始,已删除 client.dns.lookup 值的 default 值。如果显式配置,它将回退到 use_all_dns_ips

client_id编辑

  • 值类型为 字符串
  • 默认值为 "logstash"

在进行请求时传递给服务器的 ID 字符串。这样做的目的是能够通过允许包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是 ip/port。

client_rack编辑

  • 值类型为 字符串
  • 此设置没有默认值。

Kafka 消费者的机架标识符。用于选择与消费者物理上最近的机架进行读取。此设置对应于 Kafka 的 broker.rack 配置。

仅适用于 Kafka 2.4.0 及更高版本。请参阅 KIP-392

connections_max_idle_msedit

  • 值类型为 数字
  • 默认值为 540000 毫秒(9 分钟)。

在由此配置指定的毫秒数后关闭空闲连接。

consumer_threadsedit

  • 值类型为 数字
  • 默认值为 1

理想情况下,您应该拥有与分区数量相同的线程,以实现完美的平衡——线程数量超过分区数量意味着某些线程将处于空闲状态。

decorate_eventsedit

  • 值类型为 字符串
  • 接受的值为

    • none: 不添加元数据
    • basic: 添加记录的属性
    • extended: 添加记录的属性和头信息
    • false: none 的弃用别名
    • true: basic 的弃用别名
  • 默认值为 none

选项,用于将 Kafka 元数据(如主题、消息大小和头信息键值)添加到事件。这将在包含以下属性的 logstash 事件中添加一个名为 kafka 的字段

  • topic: 此消息关联的主题
  • consumer_group: 用于读取此事件的消费者组
  • partition: 此消息关联的分区
  • offset: 此消息关联的分区中的偏移量
  • key: 包含消息键的 ByteBuffer

enable_auto_commitedit

此已提交的偏移量将在进程失败时用作消费开始位置。

如果为 true,则定期向 Kafka 提交消费者已返回的消息的偏移量。如果值为 false,则每次消费者将从主题获取的数据写入内存或持久队列时都会提交偏移量。

exclude_internal_topicsedit

  • 值类型为 字符串
  • 此设置没有默认值。

是否将来自内部主题(如偏移量)的记录公开给消费者。如果设置为 true,则接收来自内部主题的记录的唯一方法是订阅该主题。

fetch_max_bytesedit

  • 值类型为 数字
  • 默认值为 52428800(50 MB)。

服务器为获取请求返回的最大数据量。这不是绝对最大值,如果第一个非空分区的第一个消息大于此值,则仍会返回该消息,以确保消费者可以取得进展。

fetch_max_wait_msedit

  • 值类型为 数字
  • 默认值为 500 毫秒。

如果无法立即满足 fetch_min_bytes 的数据,服务器在回答获取请求之前阻塞的最大时间。这应该小于或等于 poll_timeout_ms 中使用的超时时间。

fetch_min_bytesedit

  • 值类型为 数字
  • 此设置没有默认值。

服务器为获取请求返回的最小数据量。如果可用数据不足,请求将等待这些数据累积到足够数量后才回答请求。

group_idedit

  • 值类型为 字符串
  • 默认值为 "logstash"

此消费者所属组的标识符。消费者组是一个单个逻辑订阅者,它恰好由多个处理器组成。主题中的消息将分发到具有相同 group_id 的所有 Logstash 实例。

在单个管道中使用多个输入从不同主题读取数据的情况下,必须为每个输入设置不同的 group_id => ...。还建议设置唯一的 client_id => ...

group_instance_idedit

  • 值类型为 字符串
  • 此设置没有默认值。

此 Logstash Kafka 消费者的静态成员标识符。静态成员资格功能在 KIP-345 中引入,在 Kafka 属性 group.instance.id 下可用。其目的是在消费者离线后必须转发大量数据的情况下避免重新平衡。此功能缓解了服务状态繁重且将一个主题分区从实例 A 重新平衡到 B 会导致大量数据传输的情况。通过使用此选项,频繁离线/在线的客户端可以避免频繁且繁重的重新平衡。

group_instance_id 设置必须在属于同一 group_id 的所有客户端中唯一。否则,另一个使用相同 group.instance.id 值连接的客户端会导致最旧的实例断开连接。您可以将此值设置为使用主机名、IP 或任何唯一标识客户端应用程序的信息。

在配置多个线程且 consumer_threads 大于 1 的情况下,会在 group_instance_id 后面追加一个后缀以避免冲突。

heartbeat_interval_msedit

  • 值类型为 数字
  • 默认值为 3000 毫秒(3 秒)。

消费者协调器之间心跳的预期时间。心跳用于确保消费者的会话保持活动状态,并在新的消费者加入或离开组时促进重新平衡。该值必须设置为低于 session.timeout.ms,但通常不应高于该值的 1/3。它可以调整得更低以控制正常重新平衡的预期时间。

isolation_leveledit

  • 值类型为 字符串
  • 默认值为 "read_uncommitted"

控制如何读取以事务方式写入的消息。如果设置为 read_committed,则轮询消息将仅返回已提交的事务消息。如果设置为 read_uncommitted(默认值),则轮询消息将返回所有消息,即使是已中止的事务消息。非事务消息将在两种模式下无条件返回。

jaas_pathedit

  • 值类型为 path
  • 此设置没有默认值。

Java 身份验证和授权服务 (JAAS) API 为 Kafka 提供用户身份验证和授权服务。此设置提供 JAAS 文件的路径。Kafka 客户端的示例 JAAS 文件

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="kafka";
  };

请注意,在配置文件中指定 jaas_pathkerberos_config 将将其添加到全局 JVM 系统属性中。这意味着如果您有多个 Kafka 输入,它们将共享相同的 jaas_pathkerberos_config。如果这不是您想要的,您将不得不分别在不同的 JVM 实例上运行 Logstash 实例。

kerberos_configedit

  • 值类型为 path
  • 此设置没有默认值。

Kerberos 配置文件的可选路径。这是 https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html 中详细说明的 krb5.conf 样式。

key_deserializer_classedit

  • 值类型为 字符串
  • 默认值为 "org.apache.kafka.common.serialization.StringDeserializer"

用于反序列化记录键的 Java 类

max_partition_fetch_bytesedit

  • 值类型为 数字
  • 默认值为 1048576(1 MB)。

服务器将返回的每个分区中的最大数据量。请求使用的最大总内存将为 #partitions * max.partition.fetch.bytes。此大小必须至少与服务器允许的最大消息大小一样大,否则生产者可能会发送大于消费者可以获取的消息。如果发生这种情况,消费者可能会卡在尝试获取某个分区上的大型消息。

max_poll_interval_msedit

  • 值类型为 数字
  • 默认值为 300000 毫秒(5 分钟)。

使用消费者组管理时,调用 poll() 之间的最大延迟。这为消费者在获取更多记录之前可以处于空闲状态的时间设置了上限。如果在超时到期之前未调用 poll(),则消费者将被视为失败,并且组将重新平衡以将分区重新分配给另一个成员。

max_poll_recordsedit

  • 值类型为 数字
  • 默认值为 500

一次调用 poll() 返回的最大记录数。

metadata_max_age_msedit

  • 值类型为 数字
  • 默认值为 300000 毫秒(5 分钟)。

即使我们没有看到任何分区领导权变更,我们也会强制刷新元数据的毫秒时间段,以主动发现任何新的代理或分区。

partition_assignment_strategyedit

  • 值类型为 字符串
  • 此设置没有默认值。

客户端用于在消费者实例之间分配分区所有权的分区分配策略的名称,支持的选项为

  • range
  • round_robin
  • sticky
  • cooperative_sticky

这些映射到 Kafka 对应的 ConsumerPartitionAssignor 实现。

poll_timeout_msedit

  • 值类型为 数字
  • 默认值为 100 毫秒。

Kafka 消费者等待从主题接收新消息的时间。

订阅一组主题后,Kafka 消费者在轮询时会自动加入组。插件循环轮询确保消费者存活。在底层,Kafka 客户端会定期向服务器发送心跳。超时时间指定了在每次轮询时阻塞等待输入的时间。

receive_buffer_bytesedit

  • 值类型为 数字
  • 默认值为 32768 (32KB)。

读取数据时使用的 TCP 接收缓冲区 (SO_RCVBUF) 大小。

reconnect_backoff_msedit

  • 值类型为 数字
  • 默认值为 50 毫秒。

尝试重新连接到给定主机之前等待的时间。这避免了在紧密循环中反复连接到主机。此回退适用于消费者发送到代理的所有请求。

request_timeout_msedit

  • 值类型为 数字
  • 默认值为 40000 毫秒 (40 秒)。

该配置控制客户端等待请求响应的最长时间。如果在超时时间到期之前未收到响应,客户端将根据需要重新发送请求,或者如果重试次数用尽则失败请求。

retry_backoff_msedit

  • 值类型为 数字
  • 默认值为 100 毫秒。

尝试重新尝试对给定主题分区失败的获取请求之前等待的时间。这避免了在紧密循环中反复获取和失败。

sasl_jaas_configedit

  • 值类型为 字符串
  • 此设置没有默认值。

JAAS 配置设置特定于此插件实例,而不是使用 jaas_path 配置的配置文件设置,这些设置在整个 JVM 中共享。这允许每个插件实例都有自己的配置。

如果同时设置了 sasl_jaas_configjaas_path 配置,则此处的设置优先。

示例(Azure 事件中心的设置)

    input {
      kafka {
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='auser'  password='apassword';"
      }
    }

sasl_kerberos_service_nameedit

  • 值类型为 字符串
  • 此设置没有默认值。

Kafka 代理运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。

sasl_mechanismedit

  • 值类型为 字符串
  • 默认值为 "GSSAPI"

SASL 机制 用于客户端连接。这可以是任何安全提供程序可用的机制。GSSAPI 是默认机制。

schema_registry_keyedit

  • 值类型为 字符串
  • 此设置没有默认值。

设置用于基本授权以访问远程 Schema Registry 的用户名。

schema_registry_proxyedit

  • 值类型为 uri
  • 此设置没有默认值。

设置转发 HTTP 代理的地址。空字符串被视为未设置代理。

schema_registry_secretedit

  • 值类型为 字符串
  • 此设置没有默认值。

设置用于基本授权以访问远程 Schema Registry 的密码。

schema_registry_ssl_keystore_locationedit

  • 值类型为 path
  • 此设置没有默认值。

如果需要 Schema Registry 客户端身份验证,此设置将存储密钥库路径。

schema_registry_ssl_keystore_passwordedit

  • 值类型为 password
  • 此设置没有默认值。

如果需要 Schema Registry 身份验证,此设置将存储密钥库密码。

schema_registry_ssl_keystore_typeedit

  • 值类型为 字符串
  • 此设置没有默认值。

密钥库文件格式。它必须是 jksPKCS12

schema_registry_ssl_truststore_locationedit

  • 值类型为 path
  • 此设置没有默认值。

用于验证 Schema Registry 证书的信任库路径。

schema_registry_ssl_truststore_passwordedit

  • 值类型为 password
  • 此设置没有默认值。

Schema Registry 信任库密码。

schema_registry_ssl_truststore_typeedit

  • 值类型为 字符串
  • 此设置没有默认值。

Schema Registry 信任库文件的格式。它必须是 jksPKCS12

schema_registry_urledit

  • 值类型为 uri

指向 Schema Registry 服务实例的 URI,用于管理 Avro 架构。确保已将用于从指定主题反序列化数据的 Avro 架构上传到 Schema Registry 服务。架构必须遵循模式 <主题名称>-value 的命名约定。

使用 Schema Registry 配置选项或 value_deserializer_class 配置选项,但不要同时使用两者。

schema_registry_validationedit

  • 值可以是以下之一:autoskip
  • 默认值为 "auto"

在大多数情况下,不需要更改 auto 的默认设置。

使用 Schema Registry 时,默认情况下,插件会在插件注册期间检查连接并验证 Schema Registry,然后再处理事件。在某些情况下,此过程可能会在尝试验证经过身份验证的 Schema Registry 时失败,导致插件崩溃。此设置允许插件在注册期间跳过验证,从而允许插件继续并处理事件。请注意,配置错误的 Schema Registry 仍然会阻止插件处理事件。

security_protocoledit

  • 值可以是以下任何一个:PLAINTEXTSSLSASL_PLAINTEXTSASL_SSL
  • 默认值为 "PLAINTEXT"

要使用的安全协议,可以是 PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL 中的任何一个。

send_buffer_bytesedit

  • 值类型为 数字
  • 默认值为 131072 (128KB)。

发送数据时使用的 TCP 发送缓冲区 (SO_SNDBUF) 大小

session_timeout_msedit

  • 值类型为 数字
  • 默认值为 10000 毫秒 (10 秒)。

如果 poll_timeout_ms 未被调用,则此超时时间过后,消费者将被标记为已死,并且将为由 group_id 标识的组触发重新平衡操作。

ssl_endpoint_identification_algorithmedit

  • 值类型为 字符串
  • 默认值为 "https"

端点标识算法,默认为 "https"。设置为空字符串 "" 以禁用端点验证

ssl_key_passwordedit

  • 值类型为 password
  • 此设置没有默认值。

密钥库文件中私钥的密码。

ssl_keystore_locationedit

  • 值类型为 path
  • 此设置没有默认值。

如果需要客户端身份验证,此设置将存储密钥库路径。

ssl_keystore_passwordedit

  • 值类型为 password
  • 此设置没有默认值。

如果需要客户端身份验证,此设置将存储密钥库密码

ssl_keystore_typeedit

  • 值类型为 字符串
  • 此设置没有默认值。

密钥库文件格式。它必须是 jksPKCS12

ssl_truststore_locationedit

  • 值类型为 path
  • 此设置没有默认值。

用于验证 Kafka 代理证书的 JKS 信任库路径。

ssl_truststore_passwordedit

  • 值类型为 password
  • 此设置没有默认值。

信任库密码。

ssl_truststore_typeedit

  • 值类型为 字符串
  • 此设置没有默认值。

信任库文件的格式。它必须是 jksPKCS12

topicsedit

  • 值类型为 数组
  • 默认值为 ["logstash"]

要订阅的主题列表,默认为 ["logstash"]。

topics_patternedit

  • 值类型为 字符串
  • 此设置没有默认值。

要订阅的主题正则表达式模式。

通过正则表达式进行过滤是通过从代理获取完整的主题名称列表并在本地应用模式来完成的。当与具有大量主题的代理一起使用时,此操作可能非常慢,尤其是在有大量消费者的情况下。

当代理配置了一些具有 ACL 规则的主题并且它们缺少 DESCRIBE 权限时,订阅会发生,但在代理端会记录一些主题的订阅被拒绝给配置的用户。

value_deserializer_classedit

  • 值类型为 字符串
  • 默认值为 "org.apache.kafka.common.serialization.StringDeserializer"

用于反序列化记录值的 Java 类。只有在不使用 Schema Registry 时才能使用自定义值反序列化器。使用 value_deserializer_class 配置选项或 schema_registry_url 配置选项,但不能同时使用两者。

通用选项edit

以下配置选项受所有输入插件支持

设置 输入类型 必需

add_field

hash

codec

codec

enable_metric

布尔值

id

字符串

tags

数组

type

字符串

详情edit

 

add_fieldedit

  • 值类型为 哈希
  • 默认值为 {}

向事件添加字段

codecedit

用于输入数据的编解码器。输入编解码器是在数据进入输入之前对其进行解码的便捷方法,无需在 Logstash 管道中使用单独的过滤器。

enable_metricedit

禁用或启用此特定插件实例的指标日志记录,默认情况下我们会记录所有可以记录的指标,但您可以禁用特定插件的指标收集。

idedit

  • 值类型为 字符串
  • 此设置没有默认值。

向插件配置添加唯一的 ID。如果未指定 ID,Logstash 将生成一个。强烈建议在配置中设置此 ID。当您有两个或多个相同类型的插件时,这尤其有用,例如,如果您有两个 kafka 输入。在这种情况下,添加一个命名 ID 将有助于在使用监控 API 时监控 Logstash。

input {
  kafka {
    id => "my_plugin_id"
  }
}

id 字段中的变量替换仅支持环境变量,不支持使用来自密钥存储的值。

tagsedit

  • 值类型为 数组
  • 此设置没有默认值。

向您的事件添加任意数量的任意标签。

这可以帮助以后处理。

typeedit

  • 值类型为 字符串
  • 此设置没有默认值。

向此输入处理的所有事件添加 type 字段。

类型主要用于过滤器激活。

类型存储为事件本身的一部分,因此您也可以使用类型在 Kibana 中搜索它。

如果您尝试在已经具有类型的事件上设置类型(例如,当您从发送器发送事件到索引器时),那么新的输入将不会覆盖现有的类型。在发送器处设置的类型会一直保留在该事件中,即使发送到另一个 Logstash 服务器也是如此。