Almudena Sanz OlivéTamara Dancheva

使用 Elastic Observability 监控 dbt 管道

了解如何使用 Elastic 设置 dbt 监控系统,主动发出有关数据处理成本飙升、每表行数异常和数据质量测试失败的警报

阅读时长 13 分钟
Monitor dbt pipelines with Elastic Observability

在 Elastic 的 Observability 组织内的数据分析团队中,我们使用 dbt (dbt™,数据构建工具) 来执行我们的 SQL 数据转换管道。dbt 是一种 SQL 优先的转换工作流程,它使团队能够快速协作地部署分析代码。特别是,我们使用 dbt core,这个 开源项目,你可以在其中从命令行开发和运行你的 dbt 项目。

我们的数据转换管道每天运行,并处理为我们的内部仪表板、报告、分析和机器学习 (ML) 模型提供数据。

过去曾发生过管道失败、源表中包含错误数据或我们在 SQL 代码中引入导致数据质量问题的情况,而我们只有在看到每周报告中显示异常记录数时才意识到这一点。这就是为什么我们构建了一个监控系统,该系统可以在这些类型的事件发生时主动向我们发出警报,并通过可视化和分析来帮助我们了解其根本原因,从而节省我们数小时或数天的人工调查时间。

我们利用我们自己的 Observability 解决方案来帮助解决这一挑战,监控我们 dbt 实施的整个生命周期。此设置使我们能够跟踪模型的行为并对最终表进行数据质量测试。我们从运行作业和测试中将 dbt 进程日志导出到 Elasticsearch,并利用 Kibana 创建仪表板、设置警报以及配置机器学习作业来监控和评估问题。

下图显示了我们的完整架构。在后续文章中,我们还将介绍如何使用 OTEL 和 Elastic 观察我们的 python 数据处理和 ML 模型进程 - 请继续关注。

为什么使用 Elastic 监控 dbt 管道?

每次调用时,dbt 都会生成并保存一个或多个名为 artifacts 的 JSON 文件,其中包含有关调用结果的日志数据。

dbt run
dbt test
调用日志按照 dbt 文档的说明 存储在文件
run_results.json

此文件包含有关 dbt 完成调用的信息,包括执行的每个节点(模型、测试等)的计时和状态信息。总的来说,许多

run_results.json
可以组合起来计算平均模型运行时、测试失败率、快照捕获的记录更改次数等。

监控

dbt run
调用日志可以帮助解决多个问题,包括跟踪和警报有关表卷、检测资源密集型模型产生的过多的槽时间、识别由于槽时间或卷导致的成本飙升以及查明可能指示调度问题的缓慢执行时间。当我们合并一个包含代码更改的 PR 时,该系统至关重要,该更改存在问题,导致上游表 A 中的每日行数突然下降。通过将
dbt run
日志提取到 Elastic 中,我们的异常检测作业快速识别了表 A 及其下游表 B、C 和 D 的每日行数中的异常。数据分析团队收到了有关该问题的警报通知,使我们能够及时进行故障排除、修复和回填表,以免影响每周仪表板和下游 ML 模型。

监控

dbt test
调用日志还可以解决多个问题,例如识别表中的重复项、通过验证所有枚举字段来检测特定字段的允许值中未被注意的更改以及解决各种其他数据处理和质量问题。借助有关数据质量测试的仪表板和警报,我们可以主动识别重复键、意外类别值和空值增加等问题,从而确保数据完整性。在我们的团队中,我们遇到了一个问题,即其中一个原始查找表的更改在我们的用户表中产生了重复的行,导致报告的用户数量翻了一番。通过将
dbt test
日志提取到 Elastic 中,我们的规则检测到一些重复测试失败。该团队收到了有关该问题的警报通知,使我们能够通过查找作为根本原因的上游表立即对其进行故障排除。这些重复项意味着下游表必须处理 2 倍的数据量,从而导致处理的字节数和槽时间激增。异常检测和关于
dbt run
日志的警报还帮助我们发现了各个表的这些激增,并使我们能够量化对我们账单的影响。

使用 Elastic 和 Kibana 处理我们的 dbt 日志可以使我们获得实时见解,帮助我们快速排除潜在问题,并保持我们的数据转换过程顺利运行。我们在 Kibana 中设置了异常检测作业和警报,以监控 dbt 处理的行数、槽时间和测试结果。这使我们能够捕获实时事件,并且通过及时识别和修复这些问题,Elastic 使我们的数据管道更具弹性,并使我们的模型更具成本效益,从而帮助我们掌握成本飙升或数据质量问题。

我们还可以将此信息与提取到 Elastic 中的其他事件相关联,例如,使用 Elastic Github 连接器,我们可以将数据质量测试失败或其他异常与代码更改相关联,以找到导致问题的提交或 PR 的根本原因。通过将应用程序日志提取到 Elastic 中,我们还可以分析管道中的这些问题是否影响了下游应用程序,从而使用 APM 增加了延迟、吞吐量或错误率。通过提取账单、收入数据或网络流量,我们还可以看到对业务指标的影响。

如何将 dbt 调用日志导出到 Elasticsearch

我们在生产环境中每天运行完

dbt run
dbt test
进程后,使用 Python Elasticsearch 客户端 将 dbt 调用日志发送到 Elastic。该设置只需要你安装 Elasticsearch Python 客户端,并获取你的 Elastic Cloud ID(转到 https://cloud.elastic.co/deployments/,选择你的部署并找到
Cloud ID
)和 Elastic Cloud API 密钥 (按照本指南)

此 python 帮助程序函数会将你的

run_results.json
文件中的结果索引到指定的索引。你只需要将变量导出到环境中

  • RESULTS_FILE
    :你的
    run_results.json
    文件的路径
  • DBT_RUN_LOGS_INDEX
    :你想要在 Elastic 中为 dbt 运行日志索引指定的名称,例如
    dbt_run_logs
  • DBT_TEST_LOGS_INDEX
    :你想要在 Elastic 中为 dbt 测试日志索引指定的名称,例如
    dbt_test_logs
  • ES_CLUSTER_CLOUD_ID
  • ES_CLUSTER_API_KEY

然后调用函数

log_dbt_es
从你的 python 代码或将此代码保存为 python 脚本,并在执行完你的
dbt run
dbt test
命令后运行它

from elasticsearch import Elasticsearch, helpers
import os
import sys
import json

def log_dbt_es():
   RESULTS_FILE = os.environ["RESULTS_FILE"]
   DBT_RUN_LOGS_INDEX = os.environ["DBT_RUN_LOGS_INDEX"]
   DBT_TEST_LOGS_INDEX = os.environ["DBT_TEST_LOGS_INDEX"]
   es_cluster_cloud_id = os.environ["ES_CLUSTER_CLOUD_ID"]
   es_cluster_api_key = os.environ["ES_CLUSTER_API_KEY"]


   es_client = Elasticsearch(
       cloud_id=es_cluster_cloud_id,
       api_key=es_cluster_api_key,
       request_timeout=120,
   )


   if not os.path.exists(RESULTS_FILE):
       print(f"ERROR: {RESULTS_FILE} No dbt run results found.")
       sys.exit(1)


   with open(RESULTS_FILE, "r") as json_file:
       results = json.load(json_file)
       timestamp = results["metadata"]["generated_at"]
       metadata = results["metadata"]
       elapsed_time = results["elapsed_time"]
       args = results["args"]
       docs = []
       for result in results["results"]:
           if result["unique_id"].split(".")[0] == "test":
               result["_index"] = DBT_TEST_LOGS_INDEX
           else:
               result["_index"] = DBT_RUN_LOGS_INDEX
           result["@timestamp"] = timestamp
           result["metadata"] = metadata
           result["elapsed_time"] = elapsed_time
           result["args"] = args
           docs.append(result)
       _ = helpers.bulk(es_client, docs)
   return "Done"

# Call the function
log_dbt_es()

如果想从

run_results.json
中添加/删除任何其他字段,你可以修改上面的函数来实现。

一旦结果被索引,你就可以使用 Kibana 为这两个索引创建数据视图,并在 Discover 中开始探索它们。

转到 Discover,单击左上角的数据视图选择器,然后单击“创建数据视图”。

现在,你可以使用你喜欢的名称创建数据视图。对 dbt 运行(

DBT_RUN_LOGS_INDEX
在你的代码中)和 dbt 测试(
DBT_TEST_LOGS_INDEX
在你的代码中)索引执行此操作

回到“发现” (Discover) 页面,您将能够选择“数据视图” (Data Views) 并浏览数据。

dbt 运行警报、仪表板和 ML 作业

的调用

针对当前数据库执行已编译的 SQL 模型文件。
dbt run
调用日志包含以下字段

  • unique_id
    :唯一的模型标识符
  • execution_time
    :执行此模型运行所花费的总时间

日志还包含来自适配器的关于作业执行的以下指标

  • adapter_response.bytes_processed
  • adapter_response.bytes_billed
  • adapter_response.slot_ms
  • adapter_response.rows_affected

我们使用 Kibana 在上述指标上设置了异常检测作业。您可以配置一个按以下内容拆分的多指标作业

unique_id
以便在每个表的受影响行数总和、消耗的槽时间或计费字节数出现异常时收到警报。您可以为每个指标跟踪一个作业。如果您已经构建了每个表的指标仪表板,您可以使用此快捷方式直接从可视化创建异常检测作业。创建作业并在传入数据上运行后,您可以使用异常时间线中的三个点按钮查看作业并将其添加到仪表板

我们使用 ML 作业设置警报,以便在检测到异常时向我们发送电子邮件/Slack 消息。可以通过单击 ML 作业行末尾的三个点,直接从“作业”(机器学习 > 异常检测作业)页面创建警报

我们还使用Kibana 仪表板来可视化每个表的异常检测作业结果和相关指标,以确定哪些表消耗了我们最多的资源,了解它们的时序演变,并衡量可以帮助我们了解每月变化的聚合指标。

dbt 测试警报和仪表板

您可能已经熟悉dbt 中的测试,但如果您不熟悉,dbt 数据测试是您对模型进行的断言。使用命令

,dbt 将告诉您项目中每个测试是通过还是失败。这是一个关于如何设置它们的示例。在我们的团队中,我们使用开箱即用的 dbt 测试(
unique
,
not_null
,
accepted_values
,和
relationships
)以及 dbt_utilsdbt_expectations 包用于一些额外的测试。当命令
dbt test
运行时,它会生成存储在
run_results.json
.

dbt 测试日志包含以下字段

  • unique_id
    :唯一的测试标识符,测试在其唯一标识符中包含“test”前缀
  • status
    :测试结果,
    pass
    fail
  • execution_time
    :执行此测试所花费的总时间
  • failures
    :如果测试通过则为 0,如果测试失败则为 1
  • message
    :如果测试失败,则为失败原因

日志还包含来自适配器的关于作业执行的指标。

我们已在文档计数上设置了警报(请参阅指南),当有任何测试失败时,将向我们发送电子邮件/Slack 消息。警报的规则是在我们之前创建的 dbt 测试数据视图上设置的,查询过滤条件为

status:fail
以获取失败的测试的日志,规则条件是文档计数大于 0。每当生产环境中的任何测试失败时,我们都会收到带有警报详细信息和仪表板链接的警报,以便能够排除故障

我们还构建了一个仪表板来可视化测试运行、测试失败及其执行时间和槽时间,以查看测试运行的历史视图

使用 AI 助手查找根本原因

对我们来说,分析这些多重信息来源最有效的方法是使用 AI 助手来帮助我们排除事件故障。在我们的案例中,我们收到了关于测试失败的警报,我们使用 AI 助手来为我们提供有关发生情况的背景信息。然后我们询问是否存在任何下游后果,AI 助手解释了异常检测作业的结果,该结果表明我们其中一个下游表的槽时间激增,并且槽时间相对于基线有所增加。然后,我们询问了根本原因,AI 助手能够找到并向我们提供我们 Github 变更日志中的 PR 链接,该链接与事件的开始相匹配,并且是最可能的原因。

结论

作为一个数据分析团队,我们负责保证我们向利益相关者提供的表格、图表、模型、报告和仪表板准确并包含正确的信息来源。随着团队的成长,我们拥有的模型数量变得更大并且更加相互关联,并且不容易保证一切运行顺利并提供准确的结果。拥有一个主动提醒我们成本激增、行计数异常或数据质量测试失败的监控系统,就像拥有一个值得信赖的伴侣,如果出现问题,它会提前提醒您,并帮助您找到问题的根本原因。

dbt 调用日志是我们数据管道状态的关键信息来源,而 Elastic 是从中提取最大潜力的完美工具。使用此博客文章作为开始使用 dbt 日志的起点,以帮助您的团队实现更高的可靠性和安心感,从而使他们能够专注于更具战略意义的任务,而不必担心潜在的数据问题。

分享这篇文章