Monitor your Python data pipelines with OTEL

Learn how to configure OTEL for your data pipelines, detect any anomalies, analyze performance, and set up corresponding alerts with Elastic.

Monitor your Python data pipelines with OTEL

This article delves into how to implement observability practices, particularly using OpenTelemetry (OTEL) in Python, to enhance the monitoring and quality control of data pipelines using Elastic. While the primary focus of the examples presented in the article is ETL (Extract, Transform, Load) processes to ensure the accuracy and reliability of data pipelines that is crucial for Business Intelligence (BI), the strategies and tools discussed are equally applicable to Python processes used for Machine Learning (ML) models or other data processing tasks.

Introduction

Data pipelines, particularly ETL processes, form the backbone of modern data architectures. These pipelines are responsible for extracting raw data from various sources, transforming it into meaningful information, and loading it into data warehouses or data lakes for analysis and reporting.

In our organization, we have Python-based ETL scripts that play a pivotal role in exporting and processing data from Elasticsearch (ES) clusters and loading it into Google BigQuery (BQ). This processed data then feeds into DBT (Data Build Tool) models, which further refine the data and make it available for analytics and reporting. To see the full architecture and learn how we monitor our DBT pipelines with Elastic see Monitor your DBT pipelines with Elastic Observability. In this article we focus on the ETL scripts. Given the critical nature of these scripts, it is imperative to set up mechanisms to control and ensure the quality of the data they generate.

The strategies discussed here can be extended to any script or application that handles data processing or machine learning models, regardless of the programming language used as long as there exists a corresponding agent that supports OTEL instrumentation.

Motivation

Observability in data pipelines involves monitoring the entire lifecycle of data processing to ensure that everything works as expected. It includes:

  1. Data Quality Control:
  • Detecting anomalies in the data, such as unexpected drops in record counts.
  • Verifying that data transformations are applied correctly and consistently.
  • Ensuring the integrity and accuracy of the data loaded into the data warehouse.
  1. Performance Monitoring:
  • Tracking the execution time of ETL scripts to identify bottlenecks and optimize performance.
  • Monitoring resource usage, such as memory and CPU consumption, to ensure efficient use of infrastructure.
  1. Real-time Alerting:
  • Setting up alerts for immediate notification of issues such as failed ETL jobs, data quality issues, or performance degradation.
  • Identify the root case of such incidents
  • Proactively addressing incidents to minimize downtime and impact on business operations

Issues such as failed ETL jobs, can even point to larger infrastructure or data source data quality issues.

Steps for Instrumentation

Here are the steps to automatically instrument your Python script for exporting OTEL traces, metrics, and logs.

Step 1: Import Required Libraries

We first need to install the following libraries.

pip install elastic-opentelemetry google-cloud-bigquery[opentelemetry]

You can also them to your project's

requirements.txt
file and install them with
pip install -r requirements.txt
.

Explanation of Dependencies

  1. elastic-opentelemetry: This package is the Elastic Distribution for OpenTelemetry Python. Under the hood it will install the following packages:

    • opentelemetry-distro: This package is a convenience distribution of OpenTelemetry, which includes the OpenTelemetry SDK, APIs, and various instrumentation packages. It simplifies the setup and configuration of OpenTelemetry in your application.

    • opentelemetry-exporter-otlp: This package provides an exporter that sends telemetry data to the OpenTelemetry Collector or any other endpoint that supports the OpenTelemetry Protocol (OTLP). This includes traces, metrics, and logs.

    • opentelemetry-instrumentation-system-metrics: This package provides instrumentation for collecting system metrics, such as CPU usage, memory usage, and other system-level metrics.

  2. google-cloud-bigquery[opentelemetry]: This package integrates Google Cloud BigQuery with OpenTelemetry, allowing you to trace and monitor BigQuery operations.

Step 2: Export OTEL Variables

Set the necessary OpenTelemetry (OTEL) variables by getting the configuration from APM OTEL from Elastic.

Go to APM -> Services -> Add data (top left corner).

In this section you will find the steps how to configure various APM agents. Navigate to OpenTelemetry to find the variables that you need to export.

Find OTLP Endpoint:

  • Look for the section related to OpenTelemetry or OTLP configuration.
  • The
    OTEL_EXPORTER_OTLP_ENDPOINT
    is typically provided as part of the setup instructions for integrating OpenTelemetry with Elastic APM. It might look something like
    https://<your-apm-server>/otlp
    .

Obtain OTLP Headers:

  • In the same section, you should find instructions or a field for OTLP headers. These headers are often used for authentication purposes.
  • Copy the necessary headers provided by the interface. They might look like
    Authorization: Bearer <your-token>
    .

Note: Notice you need to replace the whitespace between

Bearer
and your token with
%20
in the
OTEL_EXPORTER_OTLP_HEADERS
variable when using Python.

Alternatively you can use a different approach for authentication using API keys (see instructions). If you are using our serverless offering you will need to use this approach instead.

Set up the variables:

  • Replace the placeholders in your script with the actual values obtained from the Elastic APM interface and execute it in your shell via the source command
    source env.sh
    .

Below is a script to set these variables:

#!/bin/bash
echo "--- :otel: Setting OTEL variables"
export OTEL_EXPORTER_OTLP_ENDPOINT='https://your-apm-server/otlp:443'
export OTEL_EXPORTER_OTLP_HEADERS='Authorization=Bearer%20your-token'
export OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true
export OTEL_PYTHON_LOG_CORRELATION=true
export ELASTIC_OTEL_SYSTEM_METRICS_ENABLED=true
export OTEL_METRIC_EXPORT_INTERVAL=5000
export OTEL_LOGS_EXPORTER="otlp,console"

With these variables set, we are ready for auto-instrumentation without needing to add anything to the code.

Explanation of Variables

  • OTEL_EXPORTER_OTLP_ENDPOINT: This variable specifies the endpoint to which OTLP data (traces, metrics, logs) will be sent. Replace

    placeholder
    with your actual OTLP endpoint.

  • OTEL_EXPORTER_OTLP_HEADERS: This variable specifies any headers required for authentication or other purposes when sending OTLP data. Replace

    placeholder
    with your actual OTLP headers.

  • OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED: This variable enables auto-instrumentation for logging in Python, allowing logs to be automatically enriched with trace context.

  • OTEL_PYTHON_LOG_CORRELATION: This variable enables log correlation, which includes trace context in log entries to correlate logs with traces.

  • OTEL_METRIC_EXPORT_INTERVAL: This variable specifies the metric export interval in milliseconds, in this case 5s.

  • OTEL_LOGS_EXPORTER: This variable specifies the exporter to use for logs. Setting it to "otlp" means that logs will be exported using the OTLP protocol. Adding "console" specifies that logs should be exported to both the OTLP endpoint and the console. In our case for better visibility on the infa side, we choose to export to console as well.

  • ELASTIC_OTEL_SYSTEM_METRICS_ENABLED: It is needed to use this variable when using the Elastic distribution as by default it is set to false.

Note: OTEL_METRICS_EXPORTER and OTEL_TRACES_EXPORTER: This variables specify the exporter to use for metrics/traces, and are set to "otlp" by default, which means that metrics and traces will be exported using the OTLP protocol.

Running Python ETLs

We run Python ETLs with the following command:

OTEL_RESOURCE_ATTRIBUTES="service.name=x-ETL,service.version=1.0,deployment.environment=production" && opentelemetry-instrument python3 X_ETL.py 

Explanation of the Command

  • OTEL_RESOURCE_ATTRIBUTES: This variable specifies additional resource attributes, such as service name, service version and deployment environment, that will be included in all telemetry data, you can customize these values per your needs. You can use a different service name for each script.

  • opentelemetry-instrument: This command auto-instruments the specified Python script for OpenTelemetry. It sets up the necessary hooks to collect traces, metrics, and logs.

  • python3 X_ETL.py: This runs the specified Python script (

    X_ETL.py
    ).

Tracing

We export the traces via the default OTLP protocol.

Tracing is a key aspect of monitoring and understanding the performance of applications. Spans form the building blocks of tracing. They encapsulate detailed information about the execution of specific code paths. They record the start and end times of activities and can have hierarchical relationships with other spans, forming a parent/child structure.

Spans include essential attributes such as transaction IDs, parent IDs, start times, durations, names, types, subtypes, and actions. Additionally, spans may contain stack traces, which provide a detailed view of function calls, including attributes like function name, file path, and line number, which is especially useful for debugging. These attributes help us analyze the script's execution flow, identify performance issues, and enhance optimization efforts.

With the default instrumentation, the whole Python script would be a single span. In our case we have decided to manually add specific spans per the different phases of the Python process, to be able to measure their latency, throughput, error rate, etc individually. This is how we define spans manually:

from opentelemetry import trace

if __name__ == "__main__":

    tracer = trace.get_tracer("main")
    with tracer.start_as_current_span("initialization") as span:
            # Init code
    with tracer.start_as_current_span("search") as span:
            # Step 1 - Search code
   with tracer.start_as_current_span("transform") as span:
           # Step 2 - Transform code
   with tracer.start_as_current_span("load") as span:
           # Step 3 - Load code

You can explore traces in the APM interface as shown below.

Metrics

We export metrics via the default OTLP protocol as well, such as CPU usage and memory. No extra code needs to be added in the script itself.

Note: Remember to set

ELASTIC_OTEL_SYSTEM_METRICS_ENABLED
to true.

Logging

We export logs via the default OTLP protocol as well.

For logging, we modify the logging calls to add extra fields using a dictionary structure (bq_fields) as shown below:

        job.result()  # Waits for table load to complete
        job_details = client.get_job(job.job_id)  # Get job details

        # Extract job information
        bq_fields = {
            # "slot_time_ms": job_details.slot_ms,
            "job_id": job_details.job_id,
            "job_type": job_details.job_type,
            "state": job_details.state,
            "path": job_details.path,
            "job_created": job_details.created.isoformat(),
            "job_ended": job_details.ended.isoformat(),
            "execution_time_ms": (
                job_details.ended - job_details.created
            ).total_seconds()
            * 1000,
            "bytes_processed": job_details.output_bytes,
            "rows_affected": job_details.output_rows,
            "destination_table": job_details.destination.table_id,
            "event": "BigQuery Load Job", # Custom event type
            "status": "success", # Status of the step (success/error)
            "category": category # ETL category tag 
        }

        logging.info("BigQuery load operation successful", extra=bq_fields)

This code shows how to extract BQ job stats, execution time, bytes processed, rows affected and destination table among them. You can add other metadata like we do such as custom event type, status, and category.

Any calls to logging (of all levels above the set threshold, in this case INFO

logging.getLogger().setLevel(logging.INFO)
) will create a log that will be exported to Elastic. This means that in Python scripts that already use
logging
there is no need to make any changes to export logs to Elastic.

For each of the log messages, you can go into the details view (click on the

when you hover over the log line and go into
View details
) to examine the metadata attached to the log message. You can also explore the logs in Discover.

Explanation of Logging Modification

  • logging.info: This logs an informational message. The message "BigQuery load operation successful" is logged.

  • extra=bq_fields: This adds additional context to the log entry using the

    bq_fields
    dictionary. This context can include details making the log entries more informative and easier to analyze. This data will be later used to set up alerts and data anomaly detection jobs.

Monitoring in Elastic's APM

As shown, we can examine traces, metrics, and logs in the APM interface. To make the most out of this data, we make use on top of nearly the whole suit of features in Elastic Observability alongside Elastic Analytic's ML capabilities.

Rules and Alerts

We can set up rules and alerts to detect anomalies, errors, and performance issues in our scripts.

The

rule is used to create a trigger when the number of errors in a service exceeds a defined threshold.

To create the rule go to Alerts and Insights -> Rules -> Create Rule -> Error count threshold, set the error count threshold, the service or environment you want to monitor (you can also set an error grouping key across services), how often to run the check, and choose a connector.

Next, we create a rule of type

custom threshold
on a given ETL logs data view (create one for your index) filtering on "labels.status: error" to get all the logs with status error from any of the steps of the ETL which have failed. The rule condition is set to document count > 0. In our case, in the last section of the rule config, we also set up Slack alerts every time the rule is activated. You can pick from a long list of connectors Elastic supports.

Then we can set up alerts for failures. We add status to the logs metadata as shown in the code sample below for each of the steps in the ETLs. It then becomes available in ES via

labels.status
.

logging.info(
            "Elasticsearch search operation successful",
            extra={
                "event": "Elasticsearch Search",
                "status": "success",
                "category": category,
                "index": index,
            },
        )

More Rules

We could also add rules to detect anomalies in the execution time of the different spans we define. This is done by selecting transaction/span -> Alerts and rules -> Custom threshold rule -> Latency. In the example below, we want to generate an alert whenever the search step takes more than 25s.

Alternatively, for finer-grained control, you can go with Alerts and rules -> Anomaly rule, set up an anomaly job, and pick a threshold severity level.

Anomaly detection job

In this example we set an anomaly detection job on the number of documents before transform.

We set up an Anomaly Detection jobs on the number of document before the transform using the [Single metric job] (https://elastic.ac.cn/guide/en/machine-learning/current/ml-anomaly-detection-job-types.html#multi-metric-jobs) to detect any anomalies with the incoming data source.

In the last step, you can create alerting similarly to what we did before to receive alerts whenever there is an anomaly detected, by setting up a severity level threshold. Using the anomaly score which is assigned to every anomaly, every anomaly is characterized by a severity level.

Similarly to the previous example, we set up a Slack connector to receive alerts whenever an anomaly is detected.

You can go to your custom dashboard by going to Add Panel -> ML -> Anomaly Swim Lane -> Pick your job.

Similarly, we add jobs for the number of documents after the transform, and a Multi-Metric one on the

execution_time_ms
,
bytes_processed
and
rows_affected
similarly to how it was done in Monitor your DBT pipelines with Elastic Observability.

Custom Dashboard

Now that your logs, metrics, and traces are in Elastic, you can use the full potential of our Kibana dashboards to extract the most from them. We can create a custom dashboard like the following one: a pie chart based on

labels.event
(category field for every type of step in the ETLs), a chart for every type of step broken down by status, a timeline of steps broken down by status, BQ stats for the ETL, and anomaly detection swim lane panels for the various anomaly jobs.

Conclusion

Elastic’s APM, in combination with other Observability and ML features, provides a unified view of our data pipelines, allowing us to bring a lot of value with minimal code changes:

  • Logging of new logs (no need to add custom logging) alongside their execution context
  • Monitor the runtime behavior of our models
  • Track data quality issues
  • Identify and troubleshoot real-time incidents
  • Optimize performance bottlenecks and resource usage
  • Identify dependencies on other services and their latency
  • Optimize data transformation processes
  • Set up alerts on latency, data quality issues, error rates of transactions or CPU usage)

With these capabilities, we can ensure the resilience and reliability of our data pipelines, leading to more robust and accurate BI system and reporting.

In conclusion, setting up OpenTelemetry (OTEL) in Python for data pipeline observability has significantly improved our ability to monitor, detect, and resolve issues proactively. This has led to more reliable data transformations, better resource management, and enhanced overall performance of our data transformation, BI and Machine Learning systems.

Share this article