Kafka 集成

编辑

版本

1.16.0 (查看全部)

兼容的 Kibana 版本

8.13.0 或更高版本

支持的 Serverless 项目类型
这是什么?

安全
可观测性

订阅级别
这是什么?

基础

支持级别
这是什么?

Elastic

此集成从 Kafka 服务器收集日志和指标。

兼容性

编辑

log 数据集已使用 Kafka 0.9、1.1.0 和 2.0.0 的日志进行测试。

brokerconsumergrouppartitionproducer 指标集已使用 Kafka 0.10.2.1、1.1.0、2.1.1 和 2.2.2 进行测试。

broker 指标集需要 Jolokia 来获取 JMX 指标。有关更多信息,请参阅有关 Jolokia 的 Metricbeat 文档。

日志

编辑

log 数据集从 Kafka 服务器收集和解析日志。

ECS 字段参考

有关 ECS 字段的详细信息,请参阅以下文档

导出的字段
字段 描述 类型

@timestamp

事件时间戳。

日期

cloud.image.id

云实例的映像 ID。

keyword

data_stream.dataset

数据流数据集。

constant_keyword

data_stream.namespace

数据流命名空间。

constant_keyword

data_stream.type

数据流类型。

constant_keyword

event.dataset

事件数据集

constant_keyword

event.module

事件模块

constant_keyword

host.containerized

主机是否为容器。

布尔值

host.os.build

操作系统构建信息。

keyword

host.os.codename

操作系统代号(如果有)。

keyword

kafka.log.class

日志来源的 Java 类。

keyword

kafka.log.component

日志来源的组件。

keyword

kafka.log.thread

日志来源的线程名称。

keyword

kafka.log.trace.class

跟踪来源的 Java 类。

keyword

kafka.log.trace.message

跟踪的消息部分。

文本

指标

编辑
broker
编辑

broker 数据集使用 Jolokia 从 Kafka brokers 收集 JMX 指标。

示例

broker 的一个示例事件如下所示

{
    "@timestamp": "2020-05-15T15:12:12.270Z",
    "agent": {
        "ephemeral_id": "178ff0e9-e3dd-4bdf-8e3d-8f67a6bd72ef",
        "id": "5aba67f2-2050-4d19-8953-ba20f0a5483c",
        "name": "kafka-01",
        "type": "metricbeat",
        "version": "8.0.0"
    },
    "ecs": {
        "version": "8.11.0"
    },
    "event": {
        "dataset": "kafka.broker",
        "duration": 4572918,
        "module": "kafka"
    },
    "kafka": {
        "broker": {
            "mbean": "kafka.server:name=BytesOutPerSec,topic=messages,type=BrokerTopicMetrics",
            "topic": {
                "net": {
                    "out": {
                        "bytes_per_sec": 0.6089809926927563
                    }
                }
            }
        }
    },
    "metricset": {
        "name": "broker",
        "period": 10000
    },
    "service": {
        "address": "localhost:8778",
        "type": "kafka"
    }
}

ECS 字段参考

有关 ECS 字段的详细信息,请参阅以下文档

导出的字段
字段 描述 类型 指标类型

@timestamp

事件时间戳。

日期

agent.id

keyword

cloud.account.id

用于在多租户环境中识别不同实体的云帐户或组织 ID。示例:AWS 帐户 ID、Google Cloud ORG ID 或其他唯一标识符。

keyword

cloud.availability_zone

此主机运行所在的可用区。

keyword

cloud.image.id

云实例的映像 ID。

keyword

cloud.instance.id

主机实例 ID。

keyword

cloud.provider

云提供商的名称。示例值包括 aws、azure、gcp 或 digitalocean。

keyword

cloud.region

此主机运行所在的区域。

keyword

container.id

唯一容器 ID。

keyword

data_stream.dataset

数据流数据集。

constant_keyword

data_stream.namespace

数据流命名空间。

constant_keyword

data_stream.type

数据流类型。

constant_keyword

event.dataset

事件数据集

constant_keyword

event.module

事件模块

constant_keyword

host.containerized

主机是否为容器。

布尔值

host.name

主机名称。它可以包含 Unix 系统上 hostname 返回的内容、完全限定域名或用户指定的名称。发送方决定使用哪个值。

keyword

host.os.build

操作系统构建信息。

keyword

host.os.codename

操作系统代号(如果有)。

keyword

kafka.broker.address

Broker 通告地址

keyword

kafka.broker.id

Broker ID

long

kafka.broker.log.flush_rate

日志刷新率

浮点数

gauge

kafka.broker.mbean

此事件相关的 Mbean

keyword

kafka.broker.messages_in

传入消息速率

浮点数

gauge

kafka.broker.net.in.bytes_per_sec

传入字节速率

浮点数

gauge

kafka.broker.net.out.bytes_per_sec

传出字节速率

浮点数

gauge

kafka.broker.net.rejected.bytes_per_sec

拒绝字节速率

浮点数

gauge

kafka.broker.replication.leader_elections

leader 选举速率

浮点数

gauge

kafka.broker.replication.unclean_leader_elections

不干净的 leader 选举速率

浮点数

gauge

kafka.broker.request.channel.queue.size

请求队列的大小

long

gauge

kafka.broker.request.fetch.failed

客户端获取请求失败的次数

浮点数

计数器

kafka.broker.request.fetch.failed_per_second

每秒客户端获取请求失败的速率

浮点数

gauge

kafka.broker.request.produce.failed

失败的生成请求数

浮点数

计数器

kafka.broker.request.produce.failed_per_second

每秒失败的生成请求的速率

浮点数

gauge

kafka.broker.session.zookeeper.disconnect

每秒 ZooKeeper 关闭的会话数

浮点数

gauge

kafka.broker.session.zookeeper.expire

每秒 ZooKeeper 过期的会话数

浮点数

gauge

kafka.broker.session.zookeeper.readonly

每秒 ZooKeeper 只读会话数

浮点数

gauge

kafka.broker.session.zookeeper.sync

每秒 ZooKeeper 客户端连接数

浮点数

gauge

kafka.broker.topic.messages_in

每个主题的传入消息速率

浮点数

gauge

kafka.broker.topic.net.in.bytes_per_sec

每个主题的传入字节速率

浮点数

gauge

kafka.broker.topic.net.out.bytes_per_sec

每个主题的传出字节速率

浮点数

gauge

kafka.broker.topic.net.rejected.bytes_per_sec

每个主题的拒绝字节速率

浮点数

gauge

kafka.partition.id

分区 ID。

long

kafka.partition.topic_broker_id

主题和 Broker 中分区的唯一 ID。

keyword

kafka.partition.topic_id

主题中分区的唯一 ID。

keyword

kafka.topic.error.code

主题错误代码。

long

kafka.topic.name

主题名称

keyword

service.address

从中收集有关此服务的数据的地址。这应该是一个 URI、网络地址(ipv4:port 或 [ipv6]:port)或资源路径(套接字)。

keyword

consumergroup
编辑
示例

consumergroup 的一个示例事件如下所示

{
    "@timestamp": "2020-05-15T15:18:13.919Z",
    "agent": {
        "ephemeral_id": "178ff0e9-e3dd-4bdf-8e3d-8f67a6bd72ef",
        "id": "5aba67f2-2050-4d19-8953-ba20f0a5483c",
        "name": "kafka-01",
        "type": "metricbeat",
        "version": "8.0.0"
    },
    "ecs": {
        "version": "8.11.0"
    },
    "event": {
        "dataset": "kafka.consumergroup",
        "duration": 8821045,
        "module": "kafka"
    },
    "kafka": {
        "broker": {
            "address": "kafka-01:9092",
            "id": 0
        },
        "consumergroup": {
            "client": {
                "host": "127.0.0.1",
                "id": "consumer-console-consumer-99447-1",
                "member_id": "consumer-console-consumer-99447-1-208fdf91-2f28-4336-a2ff-5e5f4b8b71e4"
            },
            "consumer_lag": 112,
            "error": {
                "code": 0
            },
            "id": "console-consumer-99447",
            "meta": "",
            "offset": -1
        },
        "partition": {
            "id": 0,
            "topic_id": "0-messages"
        },
        "topic": {
            "name": "messages"
        }
    },
    "metricset": {
        "name": "consumergroup",
        "period": 10000
    },
    "service": {
        "address": "localhost:9092",
        "type": "kafka"
    }
}

ECS 字段参考

有关 ECS 字段的详细信息,请参阅以下文档

导出的字段
字段 描述 类型 指标类型

@timestamp

事件时间戳。

日期

agent.id

keyword

cloud.account.id

用于在多租户环境中识别不同实体的云帐户或组织 ID。示例:AWS 帐户 ID、Google Cloud ORG ID 或其他唯一标识符。

keyword

cloud.availability_zone

此主机运行所在的可用区。

keyword

cloud.image.id

云实例的映像 ID。

keyword

cloud.instance.id

主机实例 ID。

keyword

cloud.provider

云提供商的名称。示例值包括 aws、azure、gcp 或 digitalocean。

keyword

cloud.region

此主机运行所在的区域。

keyword

container.id

唯一容器 ID。

keyword

data_stream.dataset

数据流数据集。

constant_keyword

data_stream.namespace

数据流命名空间。

constant_keyword

data_stream.type

数据流类型。

constant_keyword

event.dataset

事件数据集

constant_keyword

event.module

事件模块

constant_keyword

host.containerized

主机是否为容器。

布尔值

host.name

主机名称。它可以包含 Unix 系统上 hostname 返回的内容、完全限定域名或用户指定的名称。发送方决定使用哪个值。

keyword

host.os.build

操作系统构建信息。

keyword

host.os.codename

操作系统代号(如果有)。

keyword

kafka.broker.address

Broker 通告地址

keyword

kafka.broker.id

Broker ID

long

kafka.consumergroup.client.host

客户端主机

keyword

kafka.consumergroup.client.id

客户端 ID(kafka 设置 client.id)

keyword

kafka.consumergroup.client.member_id

内部消费者组会员 ID

keyword

kafka.consumergroup.consumer_lag

为分区/主题计算的消费者滞后,为分区偏移量和消费者偏移量之间的差值

long

gauge

kafka.consumergroup.error.code

kafka 消费者/分区错误代码。

long

kafka.consumergroup.id

消费者组 ID

keyword

kafka.consumergroup.meta

自定义消费者元数据字符串

keyword

kafka.consumergroup.offset

正在读取的分区中的消费者偏移量

long

gauge

kafka.partition.id

分区 ID。

long

kafka.partition.topic_broker_id

主题和 Broker 中分区的唯一 ID。

keyword

kafka.partition.topic_id

主题中分区的唯一 ID。

keyword

kafka.topic.error.code

主题错误代码。

long

kafka.topic.name

主题名称

keyword

service.address

从中收集有关此服务的数据的地址。这应该是一个 URI、网络地址(ipv4:port 或 [ipv6]:port)或资源路径(套接字)。

keyword

partition
编辑
示例

partition 的一个示例事件如下所示

{
    "@timestamp": "2020-05-15T15:19:44.240Z",
    "agent": {
        "ephemeral_id": "178ff0e9-e3dd-4bdf-8e3d-8f67a6bd72ef",
        "id": "5aba67f2-2050-4d19-8953-ba20f0a5483c",
        "name": "kafka-01",
        "type": "metricbeat",
        "version": "8.0.0"
    },
    "ecs": {
        "version": "8.11.0"
    },
    "event": {
        "dataset": "kafka.partition",
        "duration": 11263377,
        "module": "kafka"
    },
    "kafka": {
        "broker": {
            "address": "kafka-01:9092",
            "id": 0
        },
        "partition": {
            "id": 0,
            "offset": {
                "newest": 111,
                "oldest": 0
            },
            "partition": {
                "insync_replica": true,
                "is_leader": true,
                "leader": 0,
                "replica": 0
            },
            "topic_broker_id": "0-messages-0",
            "topic_id": "0-messages"
        },
        "topic": {
            "name": "messages"
        }
    },
    "metricset": {
        "name": "partition",
        "period": 10000
    },
    "service": {
        "address": "localhost:9092",
        "type": "kafka"
    }
}

ECS 字段参考

有关 ECS 字段的详细信息,请参阅以下文档

导出的字段
字段 描述 类型 指标类型

@timestamp

事件时间戳。

日期

agent.id

keyword

cloud.account.id

用于在多租户环境中识别不同实体的云帐户或组织 ID。示例:AWS 帐户 ID、Google Cloud ORG ID 或其他唯一标识符。

keyword

cloud.availability_zone

此主机运行所在的可用区。

keyword

cloud.image.id

云实例的映像 ID。

keyword

cloud.instance.id

主机实例 ID。

keyword

cloud.provider

云提供商的名称。示例值包括 aws、azure、gcp 或 digitalocean。

keyword

cloud.region

此主机运行所在的区域。

keyword

container.id

唯一容器 ID。

keyword

data_stream.dataset

数据流数据集。

constant_keyword

data_stream.namespace

数据流命名空间。

constant_keyword

data_stream.type

数据流类型。

constant_keyword

event.dataset

事件数据集

constant_keyword

event.module

事件模块

constant_keyword

host.containerized

主机是否为容器。

布尔值

host.name

主机名称。它可以包含 Unix 系统上 hostname 返回的内容、完全限定域名或用户指定的名称。发送方决定使用哪个值。

keyword

host.os.build

操作系统构建信息。

keyword

host.os.codename

操作系统代号(如果有)。

keyword

kafka.broker.address

Broker 通告地址

keyword

kafka.broker.id

Broker ID

long

kafka.partition.id

分区 ID。

long

kafka.partition.offset.newest

分区的最新偏移量。

long

gauge

kafka.partition.offset.oldest

分区的最旧偏移量。

long

gauge

kafka.partition.partition.error.code

从获取分区返回的错误代码。

long

kafka.partition.partition.insync_replica

指示副本是否包含在同步副本集 (ISR) 中。

布尔值

kafka.partition.partition.is_leader

指示副本是否为 leader

布尔值

kafka.partition.partition.leader

Leader ID(broker)。

long

kafka.partition.partition.replica

副本 ID(broker)。

long

kafka.partition.topic_broker_id

主题和 Broker 中分区的唯一 ID。

keyword

kafka.partition.topic_id

主题中分区的唯一 ID。

keyword

kafka.topic.error.code

主题错误代码。

long

kafka.topic.name

主题名称

keyword

service.address

从中收集有关此服务的数据的地址。这应该是一个 URI、网络地址(ipv4:port 或 [ipv6]:port)或资源路径(套接字)。

keyword

变更日志

编辑
变更日志
版本 详细信息 Kibana 版本

1.16.0

增强 (查看拉取请求)
添加对 broker、consumergroup 和分区数据流的处理器支持。

8.13.0 或更高版本

1.15.0

增强 (查看拉取请求)
ECS 版本更新至 8.11.0。将 kibana 约束更新为 ^8.13.0。修改字段定义以删除 ecs@mappings 组件模板使之多余的 ECS 字段。

8.13.0 或更高版本

1.14.0

增强 (查看拉取请求)
添加对 data_stream.dataset 的全局筛选器以提高性能。

8.12.0 或更高版本

1.13.0

增强 (查看拉取请求)
启用敏感字段的密钥。有关更多详细信息,请参阅 https://elastic.ac.cn/guide/en/fleet/current/agent-policy.html#agent-policy-secret-values

8.12.0 或更高版本

1.12.1

Bug 修复 (查看拉取请求)
由于错误,禁用较旧堆栈版本的密钥。

8.8.0 或更高版本

1.12.0

增强 (查看拉取请求)
为敏感字段启用密钥,从 8.12 开始支持。

8.8.0 或更高版本

1.11.0

增强 (查看拉取请求)
将缺少的 SSL 字段添加到代理配置。

8.8.0 或更高版本

1.10.0

增强 (查看拉取请求)
将软件包 format_version 更新为 3.0.0。

8.8.0 或更高版本

1.9.2

Bug 修复 (查看拉取请求)
向重命名处理器添加 null 检查和 ignore_missing 检查

8.8.0 或更高版本

1.9.1

增强 (查看拉取请求)
将可视化迁移到 lens。

8.8.0 或更高版本

1.9.0

增强 (查看拉取请求)
还原对权限的更改,以将事件重新路由到 logs-- 用于日志数据流

8.8.0 或更高版本

1.8.0

增强 (查看拉取请求)
为指标数据集启用时间序列数据流。这将显著减少指标的存储空间,并有望逐步提高查询性能。更多详细信息,请参阅 https://elastic.ac.cn/guide/en/elasticsearch/reference/current/tsds.html

8.8.0 或更高版本

1.7.0

增强 (查看拉取请求)
为日志数据流添加将事件重新路由到 logs-- 的权限

7.14.0 或更高版本
8.0.0 或更高版本

1.6.0

增强 (查看拉取请求)
将所有权从 obs-service-integrations 重命名为 obs-infraobs-integrations

7.14.0 或更高版本
8.0.0 或更高版本

1.5.6

增强 (查看拉取请求)
修改了维度字段映射以支持公有云部署。

7.14.0 或更高版本
8.0.0 或更高版本

1.5.5

增强 (查看拉取请求)
为分区数据流添加维度字段。

7.14.0 或更高版本
8.0.0 或更高版本

1.5.4

增强 (查看拉取请求)
为消费者组数据流添加维度映射。

7.14.0 或更高版本
8.0.0 或更高版本

1.5.3

增强 (查看拉取请求)
为 broker 数据流添加了维度字段。

7.14.0 或更高版本
8.0.0 或更高版本

1.5.2

增强 (查看拉取请求)
为分区数据流添加 metric_type 映射。

7.14.0 或更高版本
8.0.0 或更高版本

1.5.1

增强 (查看拉取请求)
为 consumer_group 数据流添加 metric_type 映射。

7.14.0 或更高版本
8.0.0 或更高版本

1.5.0

增强 (查看拉取请求)
为 broker 数据流添加 metric_type 映射。

7.14.0 或更高版本
8.0.0 或更高版本

1.4.1

增强 (查看拉取请求)
添加了类别和/或子类别。

7.14.0 或更高版本
8.0.0 或更高版本

1.4.0

增强 (查看拉取请求)
将 ECS 版本更新至 8.5.1

7.14.0 或更高版本
8.0.0 或更高版本

1.3.1

增强 (查看拉取请求)
更新数据仪表板字段

7.14.0 或更高版本
8.0.0 或更高版本

1.3.0

增强 (查看拉取请求)
添加了基础架构类别。

7.14.0 或更高版本
8.0.0 或更高版本

1.2.4

增强 (查看拉取请求)
支持 SASL 机制

7.14.0 或更高版本
8.0.0 或更高版本

1.2.3

增强 (查看拉取请求)
添加多字段的文档

7.14.0 或更高版本
8.0.0 或更高版本

1.2.2

错误修复 (查看拉取请求)
传递 SSL 配置。

7.14.0 或更高版本
8.0.0 或更高版本

1.2.1

错误修复 (查看拉取请求)
将缺失的 event.* 字段添加到 ecs.yml

1.2.0

增强 (查看拉取请求)
更新至 ECS 8.0

1.1.0

增强 (查看拉取请求)
支持 Kibana 8.0

7.14.0 或更高版本
8.0.0 或更高版本

1.0.0

增强 (查看拉取请求)
发布 GA 版本的 Kafka

0.7.2

增强 (查看拉取请求)
与指南保持一致

0.7.1

错误修复 (查看拉取请求)
修复检查 _forwarded_ 标签的逻辑

0.7.0

增强 (查看拉取请求)
更新至 ECS 1.12.0

0.6.2

增强 (查看拉取请求)
转换为生成的 ECS 字段

0.6.1

增强 (查看拉取请求)
更新至 ECS 1.11.0

0.6.0

增强 (查看拉取请求)
更新集成描述

0.5.0

增强 (查看拉取请求)
启用 ECS 依赖

增强 (查看拉取请求)
设置 "event.module" 和 "event.dataset"

0.4.0

增强 (查看拉取请求)
更新至 ECS 1.10.0 并添加 event.original 选项

0.3.8

增强 (查看拉取请求)
更新包所有者

0.3.7

错误修复 (查看拉取请求)
更正示例事件文件。

0.1.0

增强 (查看拉取请求)
初始版本