The blog will take you through best practices to observe Kafka-based solutions implemented on Confluent Cloud with Elastic Observability. (To monitor Kafka brokers that are not in Confluent Cloud, I recommend checking out this blog.) We will instrument Kafka applications with Elastic APM, use the Confluent Cloud metrics endpoint to get data about brokers, and pull it all together with a unified Kafka and Confluent Cloud monitoring dashboard in Elastic Observability.
Using full-stack Elastic Observability to understand Kafka and Confluent performance
In the 2023 Dice Tech Salary Report, Elasticsearch and Kakfa are ranked #3 and #5 out of the top 12 most in demand skills at the moment, so it’s no surprise that we are seeing a large number of customers who are implementing data in motion with Kafka.
Kafka comes with some additional complexities that go beyond traditional architectures and which make observability an even more important topic. Understanding where the bottlenecks are in messaging and stream-based architectures can be tough. This is why you need a comprehensive observability solution with machine learning to help you.
In this blog, we will explore how to get Kafka applications instrumented with Elastic APM, how to collect performance data with JMX, and how you can use the Elasticsearch Platform to pull in data from Confluent Cloud — which is by far the easiest and most cost-effective way to implement Kafka architectures.
For this blog post, we will be following the code at this git repository. There are three services here that are designed to run on two clouds and push data from one cloud to the other and finally into Google BigQuery. We want to monitor all of this using Elastic Observability to give you a complete picture of Confluent and Kafka Services performance as a teaser — this is the goal below:
A look at the architecture
As mentioned, we have three multi-cloud services implemented in our example application.
The first service is a Spring WebFlux service that runs inside AWS EKS. This service will take a message from a REST Endpoint and simply put it straight on to a Kafka topic.
The second service, which is also a Spring WebFlux service hosted inside Google Cloud Platform (GCP) with its Google Cloud monitoring, will then pick this up and forward it to another service that will put the message into BigQuery.
These services are all instrumented using Elastic APM. For this blog, we have decided to use Spring config to inject and configure the APM agent. You could of course use the “-javaagent” argument to inject the agent instead if preferred.
Getting started with Elastic Observability and Confluent Cloud
Before we dive into the application and its configuration, you will want to get an Elastic Cloud and Confluent Cloud account. You can sign up here for Elastic and here for Confluent Cloud. There are some initial configuration steps we need to do inside Confluent Cloud, as you will need to create three topics: gcpTopic, myTopic, and topic_2.
When you sign up for Confluent Cloud, you will be given an option of what type of cluster to create. For this walk-through, a Basic cluster is fine (as shown) — if you are careful about usage, it will not cost you a penny.
Once you have a cluster, go ahead and create the three topics.
For this walk-through, you will only need to create single partition topics as shown below:
Now we are ready to set up the Elastic Cloud cluster.
One thing to note here is that when setting up an Elastic cluster, the defaults are mostly OK. With one minor tweak to add in the Machine Learning under “Advanced Settings,” add capacity for machine learning here.
Getting APM up and running
The first thing we want to do here is get our Spring Boot Webflux-based services up and running. For this blog, I have decided to implement this using the Spring Configuration, as you can see below. For brevity, I have not listed all the JMX configuration information, but you can see those details in GitHub.
package com.elastic.multicloud;
import co.elastic.apm.attach.ElasticApmAttacher;
import jakarta.annotation.PostConstruct;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Setter
@Configuration
@ConfigurationProperties(prefix = "elastic.apm")
@ConditionalOnProperty(value = "elastic.apm.enabled", havingValue = "true")
public class ElasticApmConfig {
private static final String SERVER_URL_KEY = "server_url";
private String serverUrl;
private static final String SERVICE_NAME_KEY = "service_name";
private String serviceName;
private static final String SECRET_TOKEN_KEY = "secret_token";
private String secretToken;
private static final String ENVIRONMENT_KEY = "environment";
private String environment;
private static final String APPLICATION_PACKAGES_KEY = "application_packages";
private String applicationPackages;
private static final String LOG_LEVEL_KEY = "log_level";
private String logLevel;
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticApmConfig.class);
@PostConstruct
public void init() {
LOGGER.info(environment);
Map<String, String> apmProps = new HashMap<>(6);
apmProps.put(SERVER_URL_KEY, serverUrl);
apmProps.put(SERVICE_NAME_KEY, serviceName);
apmProps.put(SECRET_TOKEN_KEY, secretToken);
apmProps.put(ENVIRONMENT_KEY, environment);
apmProps.put(APPLICATION_PACKAGES_KEY, applicationPackages);
apmProps.put(LOG_LEVEL_KEY, logLevel);
apmProps.put("enable_experimental_instrumentations","true");
apmProps.put("capture_jmx_metrics","object_name[kafka.producer:type=producer-metrics,client-id=*] attribute[batch-size-avg:metric_name=kafka.producer.batch-size-avg]");
ElasticApmAttacher.attach(apmProps);
}
}
Now obviously this requires some dependencies, which you can see here in the Maven pom.xml.
<dependency>
<groupId>co.elastic.apm</groupId>
<artifactId>apm-agent-attach</artifactId>
<version>1.35.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>co.elastic.apm</groupId>
<artifactId>apm-agent-api</artifactId>
<version>1.35.1-SNAPSHOT</version>
</dependency>
Strictly speaking, the agent-api is not required, but it could be useful if you have a desire to add your own monitoring code (as per the example below). The agent will happily auto-instrument without needing to do that though.
Transaction transaction = ElasticApm.currentTransaction();
Span span = ElasticApm.currentSpan()
.startSpan("external", "kafka", null)
.setName("DAVID").setServiceTarget("kafka","gcp-elastic-apm-spring-boot-integration");
try (final Scope scope = transaction.activate()) {
span.injectTraceHeaders((name, value) -> producerRecord.headers().add(name,value.getBytes()));
return Mono.fromRunnable(() -> {
kafkaTemplate.send(producerRecord);
});
} catch (Exception e) {
span.captureException(e);
throw e;
} finally {
span.end();
}
Now we have enough code to get our agent bootstrapped.
To get the code from the GitHub repository up and running, you will need the following installed on your system and to ensure that you have the credentials for your GCP and AWS cloud.
Java
Maven
Docker
Kubernetes CLI (kubectl)
Clone the project
Clone the multi-cloud Spring project to your local machine.
git clone https://github.com/davidgeorgehope/multi-cloud
Build the project
From each service in the project (aws-multi-cloud, gcp-multi-cloud, gcp-bigdata-consumer-multi-cloud), run the following commands to build the project.
mvn clean install
Now you can run the Java project locally.
java -jar gcp-bigdata-consumer-multi-cloud-0.0.1-SNAPSHOT.jar --spring.config.location=/Users/davidhope/applicaiton-gcp.properties
That will just get the Java application running locally, but you can also deploy this to Kubernetes using EKS and GKE as shown below.
Create a Docker image
Create a Docker image from the built project using the dockerBuild.sh provided in the project. You may want to customize this shell script to upload the built docker image to your own docker repository.
./dockerBuild.sh
Create a namespace for each service
kubectl create namespace aws
kubectl create namespace gcp-1
kubectl create namespace gcp-2
Once you have the namespaces created, you can switch context using the following command:
kubectl config set-context --current --namespace=my-namespace
Configuration for each service
Each service needs an application.properties file. I have put an example here.
You will need to replace the following properties with those you find in Elastic.
elastic.apm.server-url=
elastic.apm.secret-token=
These can be found by going into Elastic Cloud and clicking on Services inside APM and then Add Data , which should be visible in the top right corner.
From there you will see the following, which gives you the config information you need.
You will need to replace the following properties with those you find in Confluent Cloud.
elastic.kafka.producer.sasl-jaas-config=
This configuration comes from the Clients page in Confluent Cloud.
Adding the config for each service in Kubernetes
Once you have a fully configured application properties, you need to add it to your Kubernetes environment as below.
From the aws namespace.
kubectl create secret generic my-app-config --from-file=application.properties
From the gcp-1 namespace.
kubectl create secret generic my-app-config --from-file=application.properties
From the gcp-2 namespace.
kubectl create secret generic bigdata-creds --from-file=elastic-product-marketing-e145e13fbc7c.json
kubectl create secret generic my-app-config-gcp-bigdata --from-file=application.properties
Create a Kubernetes deployment
Create a Kubernetes deployment YAML file and add your Docker image to it. You can use the deployment.yaml file provided in the project as a template. Make sure to update the image name in the file to match the name of the Docker image you just created.
kubectl apply -f deployment.yaml
Create a Kubernetes service
Create a Kubernetes service YAML file and add your deployment to it. You can use the service.yaml file provided in the project as a template.
kubectl apply -f service.yaml
Access your application
Your application is now running in a Kubernetes cluster. To access it, you can use the service's cluster IP and port. You can get the service's IP and port using the following command.
kubectl get services
Now once you know where the service is, you need to execute it!
You can regularly poke the service endpoint using the following command.
curl -X POST -H "Content-Type: application/json" -d '{"name": "linuxize", "email": "[email protected]"}' https://127.0.0.1:8080/api/my-objects/publish
With this up and running, you should see the following service map build out in the Elastic APM product.
And traces will contain a waterfall graph showing all the spans that have executed across this distributed application, allowing you to pinpoint where any issues are within each transaction.
JMX for Kafka Producer/Consumer metrics
In the previous part of this blog, we briefly touched on the JMX metric configuration you can see below.
"capture_jmx_metrics","object_name[kafka.producer:type=producer-metrics,client-id=*] attribute[batch-size-avg:metric_name=kafka.producer.batch-size-avg]"
We can use this “capture_jmx_metrics” configuration to configure JMX for any Kafka Producer/Consumer metrics we want to monitor.
Check out the documentation here to understand how to configure this and here to see the available JMX metrics you can monitor. In the example code in GitHub, we actually pull all the available metrics in, so you can check in there how to configure this.
One thing that’s worth pointing out here is that it’s important to use the “metric_name” property shown above or it gets quite difficult to find the metrics in Elastic Discover without being specific here.
Monitoring Confluent Cloud with Elastic Observability
So we now have some good monitoring set up for Kafka Producers and Consumers and we can trace transactions between services down to the lines of code that are executing. The core part of our Kafka infrastructure is hosted in Confluent Cloud. How, then, do we get data from there into our full stack observability solution?
Luckily, Confluent has done a fantastic job of making this easy. It provides important Confluent Cloud metrics via an open Prometheus-based metrics URL. So let's get down to business and configure this to bring data into our observability tool.
The first step is to configure Confluent Cloud with the MetricsViewer. The MetricsViewer role provides service account access to the Metrics API for all clusters in an organization. This role also enables service accounts to import metrics into third-party metrics platforms.
To assign the MetricsViewer role to a new service account:
- In the top-right administration menu (☰) in the upper-right corner of the Confluent Cloud user interface, click ADMINISTRATION > Cloud API keys.
- Click Add key.
- Click the Granular access tile to set the scope for the API key. Click Next.
- Click Create a new one and specify the service account name. Optionally, add a description. Click Next.
- The API key and secret are generated for the service account. You will need this API key and secret to connect to the cluster, so be sure to safely store this information. Click Save. The new service account with the API key and associated ACLs is created. When you return to the API access tab, you can view the newly-created API key to confirm.
- Return to Accounts & access in the administration menu, and in the Accounts tab, click Service accounts to view your service accounts.
- Select the service account that you want to assign the MetricsViewer role to.
- In the service account’s details page, click Access.
- In the tree view, open the resource where you want the service account to have the MetricsViewer role.
- Click Add role assignment and select the MetricsViewer tile. Click Save.
Next we can head to Elastic Observability and configure the Prometheus integration to pull in the metrics data.
Go to the integrations page in Kibana.
Find the Prometheus integration. We are using the Prometheus integration because the Confluent Cloud metrics server can provide data in prometheus format. Trust us, this works really well — good work Confluent!
Add Prometheus in the next page.
Configure the Prometheus plugin in the following way: In the hosts box, add the following URL, replacing the resource kafka id with the cluster id you want to monitor.
https://api.telemetry.confluent.cloud:443/v2/metrics/cloud/export?resource.kafka.id=lkc-3rw3gw
Add the username and password under the advanced options you got from the API keys step you executed against Confluent Cloud above.
Once the Integration is created, the policy needs to be applied to an instance of a running Elastic Agent.
That’s it! It’s that easy to get all the data you need for a full stack observability monitoring solution.
Finally, let’s pull all this together in a dashboard.
Pulling it all together
Using Kibana to generate dashboards is super easy. If you configured everything the way we recommended above, you should find the metrics (producer/consumer/brokers) you need to create your own dashboard as per the following screenshot.
Luckily, I made a dashboard for you and stored it in GitHub. Take a look below and use this to import it into your own environments.
Adding the icing on the cake: machine learning anomaly detection
Now that we have all the critical bits in place, we are going to add the icing on the cake: machine learning (ML)!
Within Kibana, let's head over to the Machine Learning tab in “Analytics.”
Go to the jobs page, where we’ll get started creating our first anomaly detection job.
The metrics data view contains what we need to create this new anomaly detection job.
Use the wizard and select a “Single Metric.”
Use the full data.
In this example, we are going to look for anomalies in the connection count. We really do not want a major deviation here, as this could indicate something very bad occurring if we suddenly have too many or too few things connecting to our Kafka cluster.
Once you have selected the connection count metric, you can proceed through the wizard and eventually your ML job will be created and you should be able to view the data as per the example below.
Congratulations, you have now created a machine learning job to alert you if there are any problems with your Kafka cluster, adding a full AIOps solution to your Kafka and Confluent observability!
Summary
We looked at monitoring Kafka-based solutions implemented on Confluent Cloud using Elastic Observability.
We covered the architecture of a multi-cloud solution involving AWS EKS, Confluent Cloud, and GCP GKE. We looked at how to instrument Kafka applications with Elastic APM, use JMX for Kafka Producer/Consumer metrics, integrate Prometheus, and set up machine learning anomaly detection.
We went through a detailed walk-through with code snippets, configuration steps, and deployment instructions included to help you get started.
Interested in learning more about Elastic Observability? Check out the following resources:
- An Introduction to Elastic Observability
- Observability Fundamentals Training
- Watch an Elastic Observability demo
- Observability Predictions and Trends for 2023
And sign up for our Elastic Observability Trends Webinar featuring AWS and Forrester, not to be missed!