Kafka 输出
编辑Kafka 输出
编辑Kafka 输出将事件发送到 Apache Kafka。
兼容性:此输出可以连接到 Kafka 版本 0.8.2.0 及更高版本。旧版本也可能有效,但不受支持。
以下示例在 Elastic Agent elastic-agent.yml
文件中配置一个名为 kafka-output
的 Kafka 输出,其设置如下所述:
outputs: kafka-output: type: kafka hosts: - 'kafka1:9092' - 'kafka2:9092' - 'kafka3:9092' client_id: Elastic version: 1.0.0 compression: gzip compression_level: 4 username: <my-kafka-username> password: <my-kakfa-password> sasl: mechanism: SCRAM-SHA-256 partition: round_robin: group_events: 1 topic: 'elastic-agent' headers: [] timeout: 30 broker_timeout: 30 required_acks: 1 ssl: verification_mode: full
Kafka 输出和使用 Logstash 将数据索引到 Elasticsearch
编辑如果您正在考虑使用 Logstash 将数据从 kafka
发送到 Elasticsearch,请注意从 Elastic Agent 发送到 kafka
的文档结构不得被 Logstash 修改。我们建议禁用 kafka
输入和 json
编解码器上的 ecs_compatibility
,以确保输入不会编辑字段及其内容。
集成设置的数据流期望接收与直接从 Elastic Agent 发送的事件具有相同结构和字段名称的事件。
有关更多详细信息,请参阅 Elastic Agent 的 Logstash 输出文档。
inputs { kafka { ... ecs_compatibility => "disabled" codec => json { ecs_compatibility => "disabled" } ... } } ...
Kafka 输出配置设置
编辑kafka
输出支持以下按类别分组的设置。其中许多设置具有合理的默认值,允许您以最少的配置运行 Elastic Agent。
设置 | 描述 |
---|---|
(布尔值)启用或禁用输出。如果设置为 |
|
您的 Elastic Agent 将用于连接到一个或多个 Kafka 代理的地址。 以下是一个示例 hosts: - 'localhost:9092' - 'mykafkahost01:9092' - 'mykafkahost02:9092' |
|
Elastic Agent 在连接时将请求的 Kafka 协议版本。默认为 1.0.0。 协议版本控制可供 Elastic Agent 使用的 Kafka 客户端功能;它不会阻止 Elastic Agent 连接到比协议版本更新的 Kafka 版本。 |
身份验证设置
编辑设置 | 描述 |
---|---|
连接到 Kafka 的用户名。如果配置了用户名,则必须同时配置密码。 |
|
连接到 Kafka 的密码。 |
|
连接到 Kafka 时要使用的 SASL 机制。它可以是以下之一:
|
|
当通过 |
内存队列设置
编辑内存队列将所有事件保存在内存中。
内存队列等待输出确认或丢弃事件。如果队列已满,则无法将新事件插入内存队列。只有在收到输出的信号后,队列才会释放空间以接受更多事件。
内存队列由参数 flush.min_events
和 flush.timeout
控制。flush.min_events
限制了单个批次中可以包含的事件数量,而 flush.timeout
指定队列应等待多长时间才能完全填充事件请求。如果输出支持 bulk_max_size
参数,则最大批次大小将是 bulk_max_size
和 flush.min_events
中较小的值。
flush.min_events
是一个旧参数,新配置应优先使用 bulk_max_size
来控制批次大小。从 8.13 开始,使用 flush.min_events
而不是 bulk_max_size
来限制批次大小永远不会有性能优势。
在同步模式下,事件请求始终在事件可用时立即填充,即使没有足够的事件来填充请求的批次。这在必须最小化延迟时很有用。要使用同步模式,请将 flush.timeout
设置为 0。
为了向后兼容,还可以通过将 flush.min_events
设置为 0 或 1 来激活同步模式。在这种情况下,批次大小将被限制为队列容量的 1/2。
在异步模式下,事件请求将等待指定超时时间,以尝试完全填充请求的批次。如果超时到期,队列将返回包含所有可用事件的部分批次。要使用异步模式,请将 flush.timeout
设置为正持续时间,例如 5 秒。
此示例配置在有足够的事件来填充输出的请求(通常由 bulk_max_size
控制,并且最多受 flush.min_events
限制为 512 个事件)时,或者在事件已等待一段时间的情况下,将事件转发到输出
queue.mem.events: 4096 queue.mem.flush.min_events: 512 queue.mem.flush.timeout: 5s
设置 | 描述 |
---|---|
队列可以存储的事件数。此值应可被 默认值: |
|
默认值: |
|
(整数)满足 默认值: |
分区设置
编辑创建的分区数由 Kafka 代理根据主题列表自动设置。然后,记录将随机、以循环方式或根据计算的哈希值发布到分区。
在以下示例中,每个事件发布到分区后,分区器将以循环方式选择下一个分区。
partition: round_robin: group_events: 1
标头设置
编辑标头是键值对,多个标头可以包含相同的键。仅支持字符串值。这些标头将包含在每个生成的 Kafka 消息中。
其他配置设置
编辑您可以在代理配置文件的 kafka-output
部分中指定这些各种其他选项。
设置 | 描述 |
---|---|
(字符串)在网络错误后尝试重新连接到 Kafka 之前等待的秒数。等待 默认值: |
|
(字符串) 网络错误后尝试连接 Kafka 之前等待的最大秒数。 默认值: |
|
Kafka broker 等待所需数量的 ACK 确认的最大时长,然后超时(请参阅后面的 默认值: |
|
(整数) 发送批量 Kafka 请求之前等待的时长。 默认值: |
|
(整数) 单个 Kafka 请求中批量处理的最大事件数。 默认值: |
|
(整数) 每个 Kafka broker 在输出管道中缓冲的消息数量。 默认值: |
|
输出编解码器配置。您可以指定
以下示例配置使用启用美化打印的 output.console: codec.json: pretty: true escape_html: false
以下示例配置使用 output.console: codec.format: string: '%{[@timestamp]} %{[message]}' |
|
选择要使用的压缩编解码器。支持的编解码器有 |
|
对于 增加压缩级别会减少网络使用,但会增加 CPU 使用率。 默认值: |
|
(字符串) 活动网络连接的保持活动周期。如果为 默认值: |
|
(整数) JSON 编码消息的最大允许大小。更大的消息将被丢弃。该值应等于或小于 broker 的 默认值: |
|
Kafka 元数据更新设置。元数据包含有关 brokers、topics、分区和用于发布的活动 leader 的信息。
|
|
从 broker 收到的 ACK 可靠性级别。0=无响应,1=等待本地提交,-1=等待所有副本提交。默认值为 1。 注意:如果设置为 0,Kafka 不会返回任何 ACK。错误时消息可能会静默丢失。 默认值: |
|
等待 Kafka brokers 响应超时前的秒数。默认值为 30 (秒)。 默认值: |