使用 OTEL 监控您的 Python 数据管道

了解如何为您的数据管道配置 OTEL,检测任何异常,分析性能,并使用 Elastic 设置相应的警报。

Monitor your Python data pipelines with OTEL

本文深入探讨了如何实施可观测性实践,特别是如何在 Python 中使用 OpenTelemetry (OTEL),以增强使用 Elastic 的数据管道的监控和质量控制。虽然本文中介绍的示例主要侧重于 ETL(提取、转换、加载)流程,以确保数据管道的准确性和可靠性,这对商业智能 (BI) 至关重要,但所讨论的策略和工具同样适用于用于机器学习 (ML) 模型或其他数据处理任务的 Python 流程。

引言

数据管道,特别是 ETL 流程,构成了现代数据架构的支柱。这些管道负责从各种来源提取原始数据,将其转换为有意义的信息,并将其加载到数据仓库或数据湖中以进行分析和报告。

在我们的组织中,我们有基于 Python 的 ETL 脚本,这些脚本在从 Elasticsearch (ES) 集群导出和处理数据并将其加载到 Google BigQuery (BQ) 中发挥着关键作用。然后,此处理后的数据将馈送到 DBT (Data Build Tool) 模型,这些模型会进一步细化数据并使其可用于分析和报告。要查看完整的架构并了解我们如何使用 Elastic 监控我们的 DBT 管道,请参阅 使用 Elastic Observability 监控您的 DBT 管道。在本文中,我们将重点关注 ETL 脚本。鉴于这些脚本的性质至关重要,必须建立机制来控制和确保它们生成的数据的质量。

这里讨论的策略可以扩展到任何处理数据处理或机器学习模型的脚本或应用程序,无论使用的编程语言是什么,只要存在支持 OTEL 仪表化的相应代理即可。

动机

数据管道中的可观测性涉及监控数据处理的整个生命周期,以确保一切按预期工作。它包括:

  1. 数据质量控制
  • 检测数据中的异常,例如记录计数意外下降。
  • 验证数据转换是否正确且一致地应用。
  • 确保加载到数据仓库中的数据的完整性和准确性。
  1. 性能监控
  • 跟踪 ETL 脚本的执行时间,以识别瓶颈并优化性能。
  • 监控资源使用情况,例如内存和 CPU 消耗,以确保基础设施的有效利用。
  1. 实时警报
  • 设置警报以立即通知问题,例如 ETL 作业失败、数据质量问题或性能下降。
  • 确定此类事件的根本原因
  • 主动解决事件,以最大限度地减少停机时间和对业务运营的影响

诸如 ETL 作业失败之类的问题甚至可能指向更大的基础设施或数据源数据质量问题。

仪表化的步骤

以下是自动检测您的 Python 脚本以导出 OTEL 跟踪、指标和日志的步骤。

步骤 1:导入所需的库

我们首先需要安装以下库。

pip install elastic-opentelemetry google-cloud-bigquery[opentelemetry]

您也可以将它们添加到您的项目的

requirements.txt
文件中,并使用以下命令安装它们:
pip install -r requirements.txt
.

依赖项说明

  1. elastic-opentelemetry:此软件包是 Elastic OpenTelemetry Python 发行版。在后台,它将安装以下软件包:

    • opentelemetry-distro:此软件包是 OpenTelemetry 的便利发行版,其中包含 OpenTelemetry SDK、API 和各种检测软件包。它简化了应用程序中 OpenTelemetry 的设置和配置。

    • opentelemetry-exporter-otlp:此软件包提供一个导出器,用于将遥测数据发送到 OpenTelemetry 收集器或任何其他支持 OpenTelemetry 协议 (OTLP) 的端点。这包括跟踪、指标和日志。

    • opentelemetry-instrumentation-system-metrics:此软件包提供用于收集系统指标(例如 CPU 使用率、内存使用率和其他系统级指标)的检测。

  2. google-cloud-bigquery[opentelemetry]:此软件包将 Google Cloud BigQuery 与 OpenTelemetry 集成,允许您跟踪和监控 BigQuery 操作。

步骤 2:导出 OTEL 变量

通过从 Elastic 的 APM OTEL 获取配置,设置必要的 OpenTelemetry (OTEL) 变量。

转到“APM”->“服务”->“添加数据”(左上角)。

在此部分,您将找到有关如何配置各种 APM 代理的步骤。导航到 OpenTelemetry 以查找您需要导出的变量。

查找 OTLP 端点:

  • 查找与 OpenTelemetry 或 OTLP 配置相关的部分。
  • OTEL_EXPORTER_OTLP_ENDPOINT
    通常作为将 OpenTelemetry 与 Elastic APM 集成的设置说明的一部分提供。它可能看起来像
    https://<您的-apm-服务器>/otlp
    .

获取 OTLP 标头:

  • 在同一部分中,您应该找到 OTLP 标头的说明或字段。这些标头通常用于身份验证目的。
  • 复制界面提供的必要标头。它们可能看起来像
    Authorization: Bearer <您的-令牌>
    .

注意:请注意,您需要替换之间的空格

Bearer
和您的令牌,替换为
%20
OTEL_EXPORTER_OTLP_HEADERS
使用 Python 时的变量。

或者,您可以使用不同的方法使用 API 密钥进行身份验证(请参阅 说明)。如果您正在使用我们的 无服务器产品,则需要改为使用此方法。

设置变量:

  • 将脚本中的占位符替换为从 Elastic APM 界面获得的实际值,并通过 source 命令在 shell 中执行它
    source env.sh
    .

以下是设置这些变量的脚本

#!/bin/bash
echo "--- :otel: Setting OTEL variables"
export OTEL_EXPORTER_OTLP_ENDPOINT='https://your-apm-server/otlp:443'
export OTEL_EXPORTER_OTLP_HEADERS='Authorization=Bearer%20your-token'
export OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true
export OTEL_PYTHON_LOG_CORRELATION=true
export ELASTIC_OTEL_SYSTEM_METRICS_ENABLED=true
export OTEL_METRIC_EXPORT_INTERVAL=5000
export OTEL_LOGS_EXPORTER="otlp,console"

设置这些变量后,我们就可以进行自动检测,而无需向代码中添加任何内容。

变量说明

  • OTEL_EXPORTER_OTLP_ENDPOINT:此变量指定 OTLP 数据(跟踪、指标、日志)将发送到的端点。替换

    占位符
    使用您的实际 OTLP 端点。

  • OTEL_EXPORTER_OTLP_HEADERS:此变量指定发送 OTLP 数据时身份验证或其他目的所需的任何标头。替换

    占位符
    使用您实际的 OTLP 标头。

  • OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED: 此变量启用 Python 中日志的自动检测,允许自动使用跟踪上下文丰富日志。

  • OTEL_PYTHON_LOG_CORRELATION: 此变量启用日志关联,其中包括日志条目中的跟踪上下文,以便将日志与跟踪相关联。

  • OTEL_METRIC_EXPORT_INTERVAL: 此变量指定指标导出间隔,以毫秒为单位,在此例中为 5 秒。

  • OTEL_LOGS_EXPORTER: 此变量指定用于日志的导出器。将其设置为 “otlp” 意味着将使用 OTLP 协议导出日志。添加 “console” 指定日志应导出到 OTLP 端点和控制台。在我们的案例中,为了更好地在 infa 端实现可见性,我们选择也导出到控制台。

  • ELASTIC_OTEL_SYSTEM_METRICS_ENABLED: 当使用 Elastic 分发时,需要使用此变量,因为默认情况下它设置为 false。

注意:OTEL_METRICS_EXPORTEROTEL_TRACES_EXPORTER:这些变量指定用于指标/跟踪的导出器,默认设置为 “otlp”,这意味着指标和跟踪将使用 OTLP 协议导出。

运行 Python ETL

我们使用以下命令运行 Python ETL

OTEL_RESOURCE_ATTRIBUTES="service.name=x-ETL,service.version=1.0,deployment.environment=production" && opentelemetry-instrument python3 X_ETL.py 

命令解释

  • OTEL_RESOURCE_ATTRIBUTES: 此变量指定其他资源属性,例如服务名称、服务版本和部署环境,这些属性将包含在所有遥测数据中,您可以根据需要自定义这些值。您可以为每个脚本使用不同的服务名称。

  • opentelemetry-instrument: 此命令为 OpenTelemetry 自动检测指定的 Python 脚本。它设置必要的钩子来收集跟踪、指标和日志。

  • python3 X_ETL.py: 这将运行指定的 Python 脚本 (

    X_ETL.py
    ).

跟踪

我们通过默认的 OTLP 协议导出跟踪。

跟踪是监控和了解应用程序性能的关键方面。跨度构成了跟踪的构建块。它们封装了有关特定代码路径执行的详细信息。它们记录活动的开始时间和结束时间,并且可以与其他跨度具有层次关系,形成父/子结构。

跨度包括诸如事务 ID、父 ID、开始时间、持续时间、名称、类型、子类型和操作等基本属性。此外,跨度可能包含堆栈跟踪,该跟踪提供了函数调用的详细视图,包括诸如函数名称、文件路径和行号之类的属性,这对于调试特别有用。这些属性有助于我们分析脚本的执行流程、识别性能问题并加强优化工作。

使用默认检测,整个 Python 脚本将是一个单独的跨度。在我们的案例中,我们决定手动为 Python 流程的不同阶段添加特定的跨度,以便能够单独衡量它们的延迟、吞吐量、错误率等。这是我们手动定义跨度的方式

from opentelemetry import trace

if __name__ == "__main__":

    tracer = trace.get_tracer("main")
    with tracer.start_as_current_span("initialization") as span:
            # Init code
    with tracer.start_as_current_span("search") as span:
            # Step 1 - Search code
   with tracer.start_as_current_span("transform") as span:
           # Step 2 - Transform code
   with tracer.start_as_current_span("load") as span:
           # Step 3 - Load code

您可以在 APM 界面中浏览跟踪,如下所示。

指标

我们也通过默认的 OTLP 协议导出指标,例如 CPU 使用率和内存。无需在脚本本身中添加额外的代码。

注意:请记住设置

ELASTIC_OTEL_SYSTEM_METRICS_ENABLED
为 true。

日志记录

我们也通过默认的 OTLP 协议导出日志。

对于日志记录,我们修改了日志调用,以使用字典结构 (bq_fields) 添加额外字段,如下所示

        job.result()  # Waits for table load to complete
        job_details = client.get_job(job.job_id)  # Get job details

        # Extract job information
        bq_fields = {
            # "slot_time_ms": job_details.slot_ms,
            "job_id": job_details.job_id,
            "job_type": job_details.job_type,
            "state": job_details.state,
            "path": job_details.path,
            "job_created": job_details.created.isoformat(),
            "job_ended": job_details.ended.isoformat(),
            "execution_time_ms": (
                job_details.ended - job_details.created
            ).total_seconds()
            * 1000,
            "bytes_processed": job_details.output_bytes,
            "rows_affected": job_details.output_rows,
            "destination_table": job_details.destination.table_id,
            "event": "BigQuery Load Job", # Custom event type
            "status": "success", # Status of the step (success/error)
            "category": category # ETL category tag 
        }

        logging.info("BigQuery load operation successful", extra=bq_fields)

此代码显示了如何提取 BQ 作业统计信息、执行时间、处理的字节数、受影响的行数和目标表等。您可以添加其他元数据,例如我们所做的自定义事件类型、状态和类别。

任何对日志记录的调用(高于设置阈值的所有级别,在本例中为 INFO

logging.getLogger().setLevel(logging.INFO)
)将创建一个日志,该日志将导出到 Elastic。这意味着在已经使用
logging
的 Python 脚本中,无需进行任何更改即可将日志导出到 Elastic。

对于每条日志消息,您可以进入详细信息视图(单击

当您将鼠标悬停在日志行上时,然后进入
查看详细信息
)以检查附加到日志消息的元数据。您还可以在发现中浏览日志。

日志修改解释

  • logging.info: 这将记录一条信息性消息。记录了消息 “BigQuery 加载操作成功”。

  • extra=bq_fields: 这使用

    bq_fields
    字典向日志条目添加其他上下文。此上下文可以包含详细信息,从而使日志条目更具信息性且更易于分析。此数据稍后将用于设置警报和数据异常检测作业。

Elastic APM 中的监控

如图所示,我们可以在 APM 界面中检查跟踪、指标和日志。为了充分利用这些数据,我们利用了 Elastic 可观测性中的几乎所有功能以及 Elastic Analytics 的 ML 功能。

规则和警报

我们可以设置规则和警报来检测脚本中的异常、错误和性能问题。

规则用于在服务中的错误数超过定义的阈值时创建触发器。

要创建规则,请转到“警报和见解”->“规则”->“创建规则”->“错误计数阈值”,设置错误计数阈值、您要监控的服务或环境(您还可以设置跨服务的错误分组键)、检查运行频率,然后选择连接器。

接下来,我们创建一个

自定义阈值
类型的规则,该规则在给定的 ETL 日志数据视图(为您的索引创建一个)上进行,该数据视图过滤 “labels.status: error” 以获取 ETL 中任何失败步骤的所有状态错误日志。规则条件设置为文档计数 > 0。在我们的案例中,在规则配置的最后一部分,我们还设置了 Slack 警报,每当规则被激活时。您可以从 Elastic 支持的连接器的列表中选择。

然后我们可以为故障设置警报。我们如下面的代码示例所示,为 ETL 中每个步骤的日志元数据添加状态。然后它通过以下方式在 ES 中可用

labels.status
.

logging.info(
            "Elasticsearch search operation successful",
            extra={
                "event": "Elasticsearch Search",
                "status": "success",
                "category": category,
                "index": index,
            },
        )

更多规则

我们还可以添加规则来检测我们定义的各个跨度执行时间中的异常情况。这是通过选择“事务/跨度”->“警报和规则”->“自定义阈值规则”->“延迟”来完成的。在下面的示例中,我们希望在搜索步骤花费超过 25 秒时生成警报。

或者,为了进行更精细的控制,您可以使用“警报和规则”->“异常规则”,设置异常作业,然后选择阈值严重级别。

异常检测作业

在此示例中,我们在转换之前的文档数量上设置了异常检测作业。

我们在使用 [单指标作业] 的转换之前,在文档数量上设置了一个异常检测作业 (https://elastic.ac.cn/guide/en/machine-learning/current/ml-anomaly-detection-job-types.html#multi-metric-jobs) 来检测传入数据源的任何异常。

在最后一步中,您可以通过设置严重级别阈值,创建类似于我们之前所做的警报,以在检测到异常时接收警报。使用分配给每个异常的异常评分,每个异常都以严重级别为特征。

与前面的示例类似,我们设置了一个 Slack 连接器,以便在检测到异常时接收警报。

您可以通过转到“添加面板”->“ML”->“异常泳道”->“选择您的作业”来转到您的自定义仪表板。

同样,我们为转换后的文档数量添加作业,并在

execution_time_ms
,
bytes_processed
rows_affected
上添加一个多指标作业,类似于在使用 Elastic 可观测性监控 DBT 管道中所做的那样。

自定义仪表板

现在,您的日志、指标和跟踪都已在 Elastic 中,您可以充分利用我们的 Kibana 仪表板来从中提取最多信息。我们可以创建一个如下所示的自定义仪表板:一个基于

labels.event
(ETL 中每种步骤的类别字段)、按状态细分的每种步骤的图表、按状态细分的步骤时间线、ETL 的 BQ 统计信息以及用于各种异常作业的异常检测泳道面板。

结论

Elastic 的 APM 与其他可观测性和 ML 功能相结合,提供了数据管道的统一视图,使我们能够以最少的代码更改带来很多价值

  • 新日志的日志记录(无需添加自定义日志记录)以及它们的执行上下文
  • 监控我们模型的运行时行为
  • 跟踪数据质量问题
  • 识别和排除实时事件故障
  • 优化性能瓶颈和资源使用
  • 识别对其他服务的依赖及其延迟
  • 优化数据转换过程
  • 设置关于延迟、数据质量问题、事务错误率或 CPU 使用率的警报)

借助这些功能,我们可以确保数据管道的弹性和可靠性,从而形成更强大、更准确的 BI 系统和报告。

总之,在 Python 中设置 OpenTelemetry (OTEL) 用于数据管道的可观察性,极大地提高了我们主动监控、检测和解决问题的能力。这带来了更可靠的数据转换、更好的资源管理以及我们数据转换、BI 和机器学习系统的整体性能提升。

分享这篇文章