David Hope

如何使用 Elastic Observability 监控 Kafka 和 Confluent Cloud

这篇博文将带您了解如何使用 Elastic Observability 观察在 Confluent Cloud 上实现的基于 Kafka 的解决方案的最佳实践。

16 分钟阅读
How to monitor Kafka and Confluent Cloud with Elastic Observability

这篇博文将带您了解如何使用 Elastic Observability 观察在 Confluent Cloud 上实现的基于 Kafka 的解决方案的最佳实践。(要监控不在 Confluent Cloud 中的 Kafka 代理,我建议您查看这篇博文。)我们将使用 Elastic APM 仪表化 Kafka 应用程序,使用 Confluent Cloud 指标端点获取有关代理的数据,并将其与 Elastic Observability 中的统一 Kafka 和 Confluent Cloud 监控仪表板汇总在一起。

使用全栈 Elastic Observability 了解 Kafka 和 Confluent 性能

2023 Dice 技术薪资报告中,Elasticsearch 和 Kakfa 在目前最受欢迎的 12 项 技能中分别排名第三和第五,因此我们看到大量客户使用 Kafka 实施动态数据也就不足为奇了。

Kafka 带来了一些超出传统架构的额外复杂性,这使得可观测性成为一个更加重要的话题。了解基于消息和流的架构中的瓶颈可能很困难。这就是为什么您需要一个具有机器学习功能的全面可观测性解决方案来帮助您。

在本博文中,我们将探讨如何使用 Elastic APM 来仪表化 Kafka 应用程序,如何使用 JMX 收集性能数据,以及如何使用 Elasticsearch 平台从 Confluent Cloud 中提取数据 — 这是目前实施 Kafka 架构最简单且最具成本效益的方式。

对于这篇博文,我们将遵循此 git 存储库中的代码。这里有三个服务,旨在在两个云上运行,并将数据从一个云推送到另一个云,最后推送到 Google BigQuery。我们希望使用 Elastic Observability 监控所有这些,以便为您提供 Confluent 和 Kafka 服务性能的完整概览,如下面的目标所示

架构概述

如前所述,我们的示例应用程序中实现了三个 多云服务

第一个服务是在 AWS EKS 内运行的 Spring WebFlux 服务。此服务将从 REST 端点接收消息,并将其直接放入 Kafka 主题中。

第二个服务也是一个 Spring WebFlux 服务,托管在 Google Cloud Platform (GCP) 中,并具有其 Google Cloud 监控,然后将接收此消息并将其转发到另一个服务,该服务会将消息放入 BigQuery 中。

这些服务都使用 Elastic APM 进行仪表化。对于本博文,我们决定使用 Spring 配置来注入和配置 APM 代理。当然,如果需要,您也可以使用“-javaagent”参数来注入代理。

开始使用 Elastic Observability 和 Confluent Cloud

在我们深入了解应用程序及其配置之前,您需要获得一个 Elastic Cloud 和 Confluent Cloud 帐户。您可以在此处注册 Elastic,在此处注册 Confluent Cloud。我们需要在 Confluent Cloud 中执行一些初始配置步骤,因为您需要创建三个主题:gcpTopic、myTopic 和 topic_2。

当您注册 Confluent Cloud 时,系统会为您提供一个创建哪种类型集群的选项。对于本演练,基本集群就可以了(如图所示)— 如果您谨慎使用,它不会花费您一分钱。

拥有集群后,继续创建三个主题。

对于本演练,您只需要创建如下所示的单分区主题

现在我们已准备好设置 Elastic Cloud 集群。

这里需要注意的一点是,在设置 Elastic 集群时,默认设置大多是可以的。只需进行一个小调整,在“高级设置”下添加机器学习,即可在此处添加机器学习容量。

启动并运行 APM

我们这里要做的第一件事是启动并运行基于 Spring Boot Webflux 的服务。对于这篇博文,我决定使用 Spring 配置来实现此目的,如下所示。为了简洁起见,我没有列出所有 JMX 配置信息,但您可以在 GitHub 中看到这些详细信息。

package com.elastic.multicloud;
import co.elastic.apm.attach.ElasticApmAttacher;
import jakarta.annotation.PostConstruct;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Setter
@Configuration
@ConfigurationProperties(prefix = "elastic.apm")
@ConditionalOnProperty(value = "elastic.apm.enabled", havingValue = "true")
public class ElasticApmConfig {

    private static final String SERVER_URL_KEY = "server_url";
    private String serverUrl;

    private static final String SERVICE_NAME_KEY = "service_name";
    private String serviceName;

    private static final String SECRET_TOKEN_KEY = "secret_token";
    private String secretToken;

    private static final String ENVIRONMENT_KEY = "environment";
    private String environment;

    private static final String APPLICATION_PACKAGES_KEY = "application_packages";
    private String applicationPackages;

    private static final String LOG_LEVEL_KEY = "log_level";
    private String logLevel;
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticApmConfig.class);

    @PostConstruct
    public void init() {
        LOGGER.info(environment);

        Map<String, String> apmProps = new HashMap<>(6);
        apmProps.put(SERVER_URL_KEY, serverUrl);
        apmProps.put(SERVICE_NAME_KEY, serviceName);
        apmProps.put(SECRET_TOKEN_KEY, secretToken);
        apmProps.put(ENVIRONMENT_KEY, environment);
        apmProps.put(APPLICATION_PACKAGES_KEY, applicationPackages);
        apmProps.put(LOG_LEVEL_KEY, logLevel);
        apmProps.put("enable_experimental_instrumentations","true");
          apmProps.put("capture_jmx_metrics","object_name[kafka.producer:type=producer-metrics,client-id=*] attribute[batch-size-avg:metric_name=kafka.producer.batch-size-avg]");


        ElasticApmAttacher.attach(apmProps);
    }
}

现在,显然这需要一些依赖项,您可以在 Maven pom.xml 中看到这些依赖项。

<dependency>
			<groupId>co.elastic.apm</groupId>
			<artifactId>apm-agent-attach</artifactId>
			<version>1.35.1-SNAPSHOT</version>
		</dependency>
		<dependency>
			<groupId>co.elastic.apm</groupId>
			<artifactId>apm-agent-api</artifactId>
			<version>1.35.1-SNAPSHOT</version>
		</dependency>

严格来说,不需要 agent-api,但如果您希望添加自己的监控代码(如下例所示),它可能会很有用。不过,代理会在不需要您执行任何操作的情况下自动仪表化。

Transaction transaction = ElasticApm.currentTransaction();
        Span span = ElasticApm.currentSpan()
                .startSpan("external", "kafka", null)
                .setName("DAVID").setServiceTarget("kafka","gcp-elastic-apm-spring-boot-integration");
        try (final Scope scope = transaction.activate()) {
            span.injectTraceHeaders((name, value) -> producerRecord.headers().add(name,value.getBytes()));
            return Mono.fromRunnable(() -> {
                kafkaTemplate.send(producerRecord);
            });
        } catch (Exception e) {
            span.captureException(e);
            throw e;
        } finally {
            span.end();
        }

现在我们有足够的代码来启动我们的代理。

要使 GitHub 存储库中的代码启动并运行,您需要在系统上安装以下内容,并确保您拥有 GCP 和 AWS 云的凭据。


Java
Maven
Docker
Kubernetes CLI (kubectl)

克隆项目

将多云 Spring 项目克隆到您的本地计算机。

git clone https://github.com/davidgeorgehope/multi-cloud

构建项目

从项目中的每个服务(aws-multi-cloud、gcp-multi-cloud、gcp-bigdata-consumer-multi-cloud)中,运行以下命令来构建项目。

mvn clean install

现在您可以在本地运行 Java 项目。

java -jar gcp-bigdata-consumer-multi-cloud-0.0.1-SNAPSHOT.jar --spring.config.location=/Users/davidhope/applicaiton-gcp.properties

这只会使 Java 应用程序在本地运行,但您也可以使用如下所示的 EKS 和 GKE 将其部署到 Kubernetes。

创建 Docker 映像

使用项目中提供的 dockerBuild.sh 从构建的项目创建 Docker 映像。您可能需要自定义此 shell 脚本,以将构建的 docker 映像上传到您自己的 docker 存储库。

./dockerBuild.sh

为每个服务创建命名空间

kubectl create namespace aws
kubectl create namespace gcp-1
kubectl create namespace gcp-2

创建命名空间后,可以使用以下命令切换上下文

kubectl config set-context --current --namespace=my-namespace

每个服务的配置

每个服务都需要一个 application.properties 文件。我在这里提供了一个示例 这里

您需要将以下属性替换为在 Elastic 中找到的属性。

elastic.apm.server-url=
elastic.apm.secret-token=

可以通过进入 Elastic Cloud 并点击 APM 中的 Services,然后点击右上角的 Add Data 来找到这些属性。

在那里,您将看到以下内容,其中提供了您需要的配置信息。

您需要将以下属性替换为在 Confluent Cloud 中找到的属性。

elastic.kafka.producer.sasl-jaas-config=

此配置来自 Confluent Cloud 中的“Clients”页面。

在 Kubernetes 中为每个服务添加配置

一旦您拥有完全配置的 application.properties,您需要将其添加到您的 Kubernetes 环境,如下所示。

从 aws 命名空间。

kubectl create secret generic my-app-config --from-file=application.properties

从 gcp-1 命名空间。

kubectl create secret generic my-app-config --from-file=application.properties

从 gcp-2 命名空间。

kubectl create secret generic bigdata-creds --from-file=elastic-product-marketing-e145e13fbc7c.json

kubectl create secret generic my-app-config-gcp-bigdata --from-file=application.properties

创建 Kubernetes 部署

创建一个 Kubernetes 部署 YAML 文件,并将您的 Docker 镜像添加到其中。您可以使用项目中提供的 deployment.yaml 文件作为模板。请确保更新文件中的镜像名称,以匹配您刚刚创建的 Docker 镜像的名称。

kubectl apply -f deployment.yaml

创建 Kubernetes 服务

创建一个 Kubernetes 服务 YAML 文件,并将您的部署添加到其中。您可以使用项目中提供的 service.yaml 文件作为模板。

kubectl apply -f service.yaml

访问您的应用程序

您的应用程序现在正在 Kubernetes 集群中运行。要访问它,您可以使用服务的集群 IP 和端口。您可以使用以下命令获取服务的 IP 和端口。

kubectl get services

现在,一旦您知道服务的位置,您需要执行它!

您可以使用以下命令定期探测服务端点。

curl -X POST -H "Content-Type: application/json" -d '{"name": "linuxize", "email": "[email protected]"}' https://127.0.0.1:8080/api/my-objects/publish

在启动并运行后,您应该在 Elastic APM 产品中看到以下服务地图构建完成。

并且跟踪将包含一个瀑布图,显示在此分布式应用程序中执行的所有跨度,允许您精确定位每个事务中存在的任何问题。

用于 Kafka 生产者/消费者指标的 JMX

在本博客的前一部分中,我们简要介绍了您可以在下面看到的 JMX 指标配置。

"capture_jmx_metrics","object_name[kafka.producer:type=producer-metrics,client-id=*] attribute[batch-size-avg:metric_name=kafka.producer.batch-size-avg]"

我们可以使用此“capture_jmx_metrics”配置为我们想要监控的任何 Kafka 生产者/消费者指标配置 JMX。

请查看此处的文档以了解如何配置此项,并查看此处以了解您可以监控的可用 JMX 指标。在 GitHub 中的示例代码中,我们实际上拉入了所有可用的指标,因此您可以在其中检查如何配置此项。

这里值得指出的一点是,重要的是使用上面显示的“metric_name”属性,否则如果不在此处指定,则很难在 Elastic Discover 中找到指标。

使用 Elastic Observability 监控 Confluent Cloud

因此,我们现在为 Kafka 生产者和消费者设置了一些良好的监控,并且可以将服务之间的事务跟踪到正在执行的代码行。我们 Kafka 基础设施的核心部分托管在 Confluent Cloud 中。那么,我们如何从那里将数据获取到我们的 全栈可观测性解决方案 中呢?

幸运的是,Confluent 在使此操作变得简单方面做得非常出色。它通过一个基于 Prometheus 的开放指标 URL 提供重要的 Confluent Cloud 指标。因此,让我们开始配置此项,以便将数据导入我们的 可观测性工具

第一步是用 MetricsViewer 配置 Confluent Cloud。MetricsViewer 角色为组织中的所有集群提供对 Metrics API 的服务帐户访问权限。此角色还使服务帐户能够将指标导入到第三方指标平台。

要将 MetricsViewer 角色分配给新的服务帐户

  1. 在 Confluent Cloud 用户界面右上角的管理菜单 (☰) 中,单击 ADMINISTRATION > Cloud API keys
  2. 单击 Add key
  3. 单击 Granular access tile 设置 API 密钥的范围。单击 Next
  4. 单击 Create a new one 并指定服务帐户名称。可选地,添加描述。单击 Next
  5. 将为服务帐户生成 API 密钥和密钥。您将需要此 API 密钥和密钥才能连接到集群,因此请务必安全地存储此信息。单击 Save。将创建具有 API 密钥和相关 ACL 的新服务帐户。当您返回到 API 访问选项卡时,您可以查看新创建的 API 密钥以进行确认。
  6. 返回管理菜单中的“Accounts & access”,然后在“Accounts”选项卡中,单击 Service accounts 以查看您的服务帐户。
  7. 选择要为其分配 MetricsViewer 角色的服务帐户。
  8. 在服务帐户的详细信息页面中,单击 Access
  9. 在树视图中,打开您希望服务帐户拥有 MetricsViewer 角色的资源。
  10. 单击 Add role assignment 并选择 MetricsViewer 磁贴。单击 Save

接下来,我们可以前往 Elastic Observability 并配置 Prometheus 集成以提取指标数据。

转到 Kibana 中的集成页面。

找到 Prometheus 集成。我们正在使用 Prometheus 集成,因为 Confluent Cloud 指标服务器可以以 prometheus 格式提供数据。相信我们,这效果非常好 - Confluent 做得很好!

在下一页中添加 Prometheus。

以以下方式配置 Prometheus 插件:在主机框中,添加以下 URL,并将资源 kafka id 替换为您要监控的集群 ID。

https://api.telemetry.confluent.cloud:443/v2/metrics/cloud/export?resource.kafka.id=lkc-3rw3gw

在从您针对上述 Confluent Cloud 执行的 API 密钥步骤中获得的“高级”选项下添加用户名和密码。

创建集成后,策略需要应用于正在运行的 Elastic Agent 实例。

就是这样!获取全栈可观测性监控解决方案所需的所有数据就是这么简单。

最后,让我们将所有这些内容整合到一个仪表板中。

将所有内容整合在一起

使用 Kibana 生成仪表板非常容易。如果您按照我们上面推荐的方式配置了所有内容,您应该找到创建自己的仪表板所需的指标(生产者/消费者/代理),如以下屏幕截图所示。

幸运的是,我为您制作了一个仪表板并将其存储在 GitHub 中。请看下面,并使用它将其导入到您自己的环境中。

锦上添花:机器学习异常检测

现在我们已经准备好所有关键部分,我们将添加锦上添花:机器学习 (ML)!

在 Kibana 中,让我们转到“分析”中的“机器学习”选项卡。

转到作业页面,我们将在那里开始创建我们的第一个异常检测作业。

指标数据视图包含我们创建此新异常检测作业所需的内容。

使用向导并选择“单个指标”。

使用完整数据。

在本示例中,我们将查找连接计数中的异常。我们真的不希望在这里出现重大偏差,因为如果突然有太多或太少的事物连接到我们的 Kafka 集群,这可能表明发生了非常糟糕的事情。

选择连接计数指标后,您可以继续执行向导,最终将创建您的 ML 作业,并且您应该能够查看数据,如下例所示。

恭喜,您现在已经创建了一个机器学习作业,以便在您的 Kafka 集群出现任何问题时发出警报,从而为您的 Kafka 和 Confluent 可观测性添加 完整的 AIOps 解决方案

摘要

我们研究了使用 Elastic Observability 监控在 Confluent Cloud 上实现的基于 Kafka 的解决方案。

我们介绍了涉及 AWS EKS、Confluent Cloud 和 GCP GKE 的多云解决方案的架构。我们研究了如何使用 Elastic APM 检测 Kafka 应用程序、使用 JMX 获取 Kafka 生产者/消费者指标、集成 Prometheus 以及设置机器学习异常检测。

我们详细介绍了代码片段、配置步骤和部署说明,以帮助您入门。

有兴趣了解有关 Elastic Observability 的更多信息?请查看以下资源

并注册我们的 Elastic Observability 趋势网络研讨会,其中包含 AWS 和 Forrester 的内容,不容错过!

分享这篇文章