Bahubali Shetti

使用 OpenTelemetry 手动检测 Python 应用程序

在这篇博文中,我们将向您展示如何使用 OpenTelemetry 手动检测 Python 应用程序。我们将探讨如何使用适当的 OpenTelemetry Python 库,特别是如何在 Python 应用程序中检测跟踪。

Manual instrumentation with OpenTelemetry for Python applications

DevOps 和 SRE 团队正在改变软件开发的过程。DevOps 工程师专注于高效的软件应用程序和服务交付,而 SRE 团队则致力于确保可靠性、可扩展性和性能。这些团队必须依赖全栈可观测性解决方案,以便他们能够管理和监控系统,并确保在问题影响业务之前得到解决。

现代分布式应用程序的整个堆栈的可观测性需要数据收集、处理和关联,通常以仪表板的形式呈现。要摄取所有系统数据,需要在堆栈、框架和提供商之间安装代理 — 对于必须处理版本更改、兼容性问题以及随着系统变化而无法扩展的专有代码的团队来说,这个过程可能具有挑战性且耗时。

得益于 OpenTelemetry (OTel),DevOps 和 SRE 团队现在有了一种收集和发送数据的标准方法,这种方法不依赖于专有代码,并且拥有庞大的支持社区,减少了供应商锁定。

之前的博文中,我们还回顾了如何使用OpenTelemetry 演示并将其连接到 Elastic®,以及 Elastic 在 OpenTelemetry 和 Kubernetes 方面的一些功能。

在这篇博文中,我们将展示如何使用OpenTelemetry 的手动检测以及我们名为 Elastiflix 的应用程序的 Python 服务。这种方法比使用自动检测稍微复杂一些。

这样做的好处是不需要 otel-collector!此设置使您能够根据最适合您业务的时间表,缓慢而轻松地将应用程序迁移到 OTel 和 Elastic。

应用程序、先决条件和配置

我们在这篇博文中使用的应用程序称为Elastiflix,这是一个电影流媒体应用程序。它由用 .NET、NodeJS、Go 和 Python 编写的多个微服务组成。

在检测我们的示例应用程序之前,我们首先需要了解 Elastic 如何接收遥测数据。

Elastic Observability 的所有 APM 功能都可通过 OTel 数据获得。其中一些包括:

  • 服务地图
  • 服务详细信息(延迟、吞吐量、失败事务)
  • 服务之间的依赖关系、分布式跟踪
  • 事务(跟踪)
  • 机器学习 (ML) 相关性
  • 日志关联

除了 Elastic 的 APM 和遥测数据的统一视图之外,您还可以使用 Elastic 强大的机器学习功能来减少分析和警报,从而帮助缩短 MTTR。

先决条件

查看示例源代码

完整的源代码(包括此博客中使用的 Dockerfile)可以在GitHub 上找到。该存储库还包含没有检测的相同应用程序。这使您可以比较每个文件并查看差异。

以下步骤将向您展示如何检测此应用程序并在命令行或 Docker 中运行它。如果您对更完整的 OTel 示例感兴趣,请查看此处的 docker-compose 文件,它将启动整个项目。

在开始之前,我们先看看未检测的代码。

这是一个简单的 Python Flask 应用程序,可以接收 GET 请求。(这是完整的main.py 文件的一部分。)

from flask import Flask, request
import sys

import logging
import redis
import os
import ecs_logging
import datetime
import random
import time

redis_host = os.environ.get('REDIS_HOST') or 'localhost'
redis_port = os.environ.get('REDIS_PORT') or 6379

application_port = os.environ.get('APPLICATION_PORT') or 5000

app = Flask(__name__)

# Get the Logger
logger = logging.getLogger("app")
logger.setLevel(logging.DEBUG)

# Add an ECS formatter to the Handler
handler = logging.StreamHandler()
handler.setFormatter(ecs_logging.StdlibFormatter())
logger.addHandler(handler)
logging.getLogger('werkzeug').setLevel(logging.ERROR)
logging.getLogger('werkzeug').addHandler(handler)

r = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)

@app.route('/favorites', methods=['GET'])
def get_favorite_movies():
    user_id = str(request.args.get('user_id'))

    logger.info('Getting favorites for user ' + user_id, extra={
        "event.dataset": "favorite.log",
        "user.id": request.args.get('user_id')
    })

    favorites = r.smembers(user_id)

    # convert to list
    favorites = list(favorites)
    logger.info('User ' + user_id + ' has favorites: ' + str(favorites), extra={
        "event.dataset": "favorite.log",
        "user.id": user_id
    })
    return { "favorites": favorites}

logger.info('App startup')
app.run(host='0.0.0.0', port=application_port)
logger.info('App Stopped')

分步指南

步骤 0. 登录您的 Elastic Cloud 帐户

本博客假设您有一个 Elastic Cloud 帐户 — 如果没有,请按照说明在 Elastic Cloud 上开始使用

步骤 1. 安装并初始化 OpenTelemetry

第一步,我们需要向我们的应用程序添加一些额外的库。

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.sdk.resources import Resource

此代码导入必要的 OpenTelemetry 库,包括用于跟踪、导出和检测特定库(如 Flask、Requests 和 Redis)的库。

接下来,我们读取变量

OTEL_EXPORTER_OTLP_HEADERS
OTEL_EXPORTER_OTLP_ENDPOINT

然后初始化导出器。

otel_exporter_otlp_headers = os.environ.get('OTEL_EXPORTER_OTLP_HEADERS')

otel_exporter_otlp_endpoint = os.environ.get('OTEL_EXPORTER_OTLP_ENDPOINT')

exporter = OTLPSpanExporter(endpoint=otel_exporter_otlp_endpoint, headers=otel_exporter_otlp_headers)

为了将其他参数传递给 OpenTelemetry,我们将读取 OTEL_RESOURCE_ATTRIBUTES 变量并将其转换为对象。

resource_attributes = os.environ.get('OTEL_RESOURCE_ATTRIBUTES') or 'service.version=1.0,deployment.environment=production'
key_value_pairs = resource_attributes.split(',')
result_dict = {}

for pair in key_value_pairs:
    key, value = pair.split('=')
    result_dict[key] = value

接下来,我们将使用这些参数来填充资源配置。

resourceAttributes = {
     "service.name": otel_service_name,
     "service.version": result_dict['service.version'],
     "deployment.environment": result_dict['deployment.environment']
}

resource = Resource.create(resourceAttributes)

然后,我们使用之前创建的资源设置跟踪提供程序。跟踪提供程序允许我们在稍后从其中获取跟踪器实例后创建跨度。

此外,我们指定使用 BatchSPanProcessor。跨度处理器是一个接口,允许跨度开始和结束方法调用的挂钩。

在 OpenTelemetry 中,提供了不同的跨度处理器。BatchSPanProcessor 批量处理跨度并批量发送。可以使用 MultiSpanProcessor 配置多个跨度处理器同时处于活动状态。请参阅 OpenTelemetry 文档

此外,我们还添加了资源模块。这允许我们指定诸如 service.name、version 等属性。有关详细信息,请参阅OpenTelemetry 语义约定文档

provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)

# Sets the global default tracer provider
trace.set_tracer_provider(provider)

# Creates a tracer from the global tracer provider
tracer = trace.get_tracer(otel_service_name)

最后,因为我们使用的是 Flask 和 Redis,所以我们还添加了以下内容,这允许我们自动检测 Flask 和 Redis。

从技术上讲,你可以认为这是“作弊”。我们正在使用 Python 自动检测的某些部分。但是,通常来说,采用一些自动检测模块是一个好方法。这可以节省你大量的时间,此外,它还可以确保分布式追踪等功能对于你接收或发送的任何请求都能自动工作。

FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()
RedisInstrumentor().instrument()

步骤 2. 添加自定义 Span

现在我们已经添加和初始化了所有内容,我们可以添加自定义 Span。

如果我们想为应用程序的某个部分添加额外的检测,我们只需使用 Python 包裹 /favoritesGET 函数代码,如下所示:

with tracer.start_as_current_span("add_favorite_movies", set_status_on_exception=True) as span:
        ...

包裹的代码如下:

@app.route('/favorites', methods=['GET'])
def get_favorite_movies():
    # add artificial delay if enabled
    if delay_time > 0:
        time.sleep(max(0, random.gauss(delay_time/1000, delay_time/1000/10)))

    with tracer.start_as_current_span("get_favorite_movies") as span:
        user_id = str(request.args.get('user_id'))

        logger.info('Getting favorites for user ' + user_id, extra={
            "event.dataset": "favorite.log",
            "user.id": request.args.get('user_id')
        })

        favorites = r.smembers(user_id)

        # convert to list
        favorites = list(favorites)
        logger.info('User ' + user_id + ' has favorites: ' + str(favorites), extra={
            "event.dataset": "favorite.log",
            "user.id": user_id
        })

其他代码

除了模块和 Span 检测之外,示例应用程序还在启动时检查一些环境变量。在不使用 OTel 收集器的情况下将数据发送到 Elastic 时,需要 OTEL_EXPORTER_OTLP_HEADERS 变量,因为它包含身份验证信息。OTEL_EXPORTER_OTLP_ENDPOINT 也是如此,它是我们将发送遥测数据的主机。

otel_exporter_otlp_headers = os.environ.get('OTEL_EXPORTER_OTLP_HEADERS')
# fail if secret token not set
if otel_exporter_otlp_headers is None:
    raise Exception('OTEL_EXPORTER_OTLP_HEADERS environment variable not set')


otel_exporter_otlp_endpoint = os.environ.get('OTEL_EXPORTER_OTLP_ENDPOINT')
# fail if server url not set
if otel_exporter_otlp_endpoint is None:
    raise Exception('OTEL_EXPORTER_OTLP_ENDPOINT environment variable not set')
else:
    exporter = OTLPSpanExporter(endpoint=otel_exporter_otlp_endpoint, headers=otel_exporter_otlp_headers)

最终代码
作为比较,这是我们示例应用程序的检测代码。你可以在 GitHub 中找到完整的源代码。

from flask import Flask, request
import sys

import logging
import redis
import os
import ecs_logging
import datetime
import random
import time

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

#Using grpc exporter since per the instructions in OTel docs this is needed for any endpoint receiving OTLP.

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
#from opentelemetry.instrumentation.wsgi import OpenTelemetryMiddleware
from opentelemetry.sdk.resources import Resource

redis_host = os.environ.get('REDIS_HOST') or 'localhost'
redis_port = os.environ.get('REDIS_PORT') or 6379
otel_traces_exporter = os.environ.get('OTEL_TRACES_EXPORTER') or 'otlp'
otel_metrics_exporter = os.environ.get('OTEL_TRACES_EXPORTER') or 'otlp'
environment = os.environ.get('ENVIRONMENT') or 'dev'
otel_service_version = os.environ.get('OTEL_SERVICE_VERSION') or '1.0.0'
resource_attributes = os.environ.get('OTEL_RESOURCE_ATTRIBUTES') or 'service.version=1.0,deployment.environment=production'

otel_exporter_otlp_headers = os.environ.get('OTEL_EXPORTER_OTLP_HEADERS')
# fail if secret token not set
if otel_exporter_otlp_headers is None:
    raise Exception('OTEL_EXPORTER_OTLP_HEADERS environment variable not set')
#else:
#    otel_exporter_otlp_fheaders= f"Authorization=Bearer%20{secret_token}"

otel_exporter_otlp_endpoint = os.environ.get('OTEL_EXPORTER_OTLP_ENDPOINT')
# fail if server url not set
if otel_exporter_otlp_endpoint is None:
    raise Exception('OTEL_EXPORTER_OTLP_ENDPOINT environment variable not set')
else:
    exporter = OTLPSpanExporter(endpoint=otel_exporter_otlp_endpoint, headers=otel_exporter_otlp_headers)


key_value_pairs = resource_attributes.split(',')
result_dict = {}

for pair in key_value_pairs:
    key, value = pair.split('=')
    result_dict[key] = value

resourceAttributes = {
     "service.name": result_dict['service.name'],
     "service.version": result_dict['service.version'],
     "deployment.environment": result_dict['deployment.environment']
#     # Add more attributes as needed
}

resource = Resource.create(resourceAttributes)


provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)

# Sets the global default tracer provider
trace.set_tracer_provider(provider)

# Creates a tracer from the global tracer provider
tracer = trace.get_tracer("favorite")


application_port = os.environ.get('APPLICATION_PORT') or 5000

app = Flask(__name__)


FlaskInstrumentor().instrument_app(app)
#OpenTelemetryMiddleware().instrument()
RequestsInstrumentor().instrument()
RedisInstrumentor().instrument()

#app.wsgi_app = OpenTelemetryMiddleware(app.wsgi_app)

# Get the Logger
logger = logging.getLogger("app")
logger.setLevel(logging.DEBUG)

# Add an ECS formatter to the Handler
handler = logging.StreamHandler()
handler.setFormatter(ecs_logging.StdlibFormatter())
logger.addHandler(handler)
logging.getLogger('werkzeug').setLevel(logging.ERROR)
logging.getLogger('werkzeug').addHandler(handler)

r = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)

@app.route('/favorites', methods=['GET'])
def get_favorite_movies():
    with tracer.start_as_current_span("get_favorite_movies") as span:
        user_id = str(request.args.get('user_id'))

        logger.info('Getting favorites for user ' + user_id, extra={
            "event.dataset": "favorite.log",
            "user.id": request.args.get('user_id')
        })

        favorites = r.smembers(user_id)

        # convert to list
        favorites = list(favorites)
        logger.info('User ' + user_id + ' has favorites: ' + str(favorites), extra={
            "event.dataset": "favorite.log",
            "user.id": user_id
        })
        return { "favorites": favorites}

logger.info('App startup')
app.run(host='0.0.0.0', port=application_port)
logger.info('App Stopped')

步骤 3. 使用环境变量运行 Docker 镜像

OTEL 文档中所述,我们将使用环境变量并传入配置值,以使其能够与 Elastic Observability 的 APM 服务器连接。

因为 Elastic 原生接受 OTLP,我们只需要提供 OTLP Exporter 需要发送数据的端点和身份验证,以及其他一些环境变量。

获取 Elastic Cloud 变量
你可以从 Kibana® 的路径下复制端点和令牌:

/app/home#/tutorial/apm
.

你将需要复制以下环境变量:

OTEL_EXPORTER_OTLP_ENDPOINT
OTEL_EXPORTER_OTLP_HEADERS

构建镜像

docker build -t  python-otel-manual-image .

运行镜像

docker run \
       -e OTEL_EXPORTER_OTLP_ENDPOINT="<REPLACE WITH OTEL_EXPORTER_OTLP_ENDPOINT>" \
       -e OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer <REPLACE WITH TOKEN>" \
       -e OTEL_RESOURCE_ATTRIBUTES="service.version=1.0,deployment.environment=production,service.name=python-favorite-otel-manual" \
       -p 3001:3001 \
       python-otel-manual-image

你现在可以发出一些请求以生成跟踪数据。请注意,这些请求预计会返回错误,因为此服务依赖于与你当前未运行的 Redis 的连接。如前所述,你可以在此处找到使用 docker-compose 的更完整的示例。

curl localhost:500/favorites
# or alternatively issue a request every second

while true; do curl "localhost:5000/favorites"; sleep 1; done;

步骤 4. 在 Elastic APM 中探索跟踪、指标和日志

现在服务已经过检测,当你查看 Python 服务的事务部分时,你应该在 Elastic APM 中看到以下输出:

请注意,这与自动检测版本略有不同,因为我们现在在此视图中也有自定义 Span。

值得吗?

这是一个价值百万美元的问题。根据你需要的细节级别,手动检测可能是必要的。手动检测允许你在需要或想要的地方添加自定义 Span、自定义标签和指标。它允许你获得在其他情况下不可能获得的细节级别,并且通常对于跟踪特定于业务的 KPI 非常重要。

你的操作以及你是否需要对代码的特定部分进行故障排除或分析性能,将决定何时以及检测什么。但是知道你可以选择手动检测是很有帮助的。

如果你注意到我们还没有检测指标,那是另一篇博客的内容。我们在之前的博客中讨论了日志。

结论

在这篇博客中,我们讨论了以下内容:

  • 如何使用 OpenTelemetry 手动检测 Python
  • 如何正确初始化 OpenTelemetry 并添加自定义 Span
  • 如何在不需要收集器的情况下,轻松地使用 Elastic 设置 OTLP ENDPOINT 和 OTLP HEADERS

希望这提供了一个易于理解的演练,介绍如何使用 OpenTelemetry 检测 Python 以及如何轻松将跟踪发送到 Elastic。

开发者资源

通用配置和用例资源

还没有 Elastic Cloud 帐户吗?注册 Elastic Cloud 并尝试我上面讨论的自动检测功能。我很乐意获得你关于使用 Elastic 获取应用程序堆栈可见性的体验的反馈

本文中描述的任何功能或特性的发布和时间安排仍由 Elastic 自行决定。任何当前不可用的功能或特性可能不会按时交付或根本不交付。

分享这篇文章