在 Elastic 的 Observability 组织内的数据分析团队中,我们使用 dbt (dbt™,数据构建工具) 来执行我们的 SQL 数据转换管道。dbt 是一种 SQL 优先的转换工作流程,它使团队能够快速协作地部署分析代码。特别是,我们使用 dbt core,这个 开源项目,你可以在其中从命令行开发和运行你的 dbt 项目。
我们的数据转换管道每天运行,并处理为我们的内部仪表板、报告、分析和机器学习 (ML) 模型提供数据。
过去曾发生过管道失败、源表中包含错误数据或我们在 SQL 代码中引入导致数据质量问题的情况,而我们只有在看到每周报告中显示异常记录数时才意识到这一点。这就是为什么我们构建了一个监控系统,该系统可以在这些类型的事件发生时主动向我们发出警报,并通过可视化和分析来帮助我们了解其根本原因,从而节省我们数小时或数天的人工调查时间。
我们利用我们自己的 Observability 解决方案来帮助解决这一挑战,监控我们 dbt 实施的整个生命周期。此设置使我们能够跟踪模型的行为并对最终表进行数据质量测试。我们从运行作业和测试中将 dbt 进程日志导出到 Elasticsearch,并利用 Kibana 创建仪表板、设置警报以及配置机器学习作业来监控和评估问题。
下图显示了我们的完整架构。在后续文章中,我们还将介绍如何使用 OTEL 和 Elastic 观察我们的 python 数据处理和 ML 模型进程 - 请继续关注。
为什么使用 Elastic 监控 dbt 管道?
每次调用时,dbt 都会生成并保存一个或多个名为 artifacts 的 JSON 文件,其中包含有关调用结果的日志数据。
此文件包含有关 dbt 完成调用的信息,包括执行的每个节点(模型、测试等)的计时和状态信息。总的来说,许多
run_results.json可以组合起来计算平均模型运行时、测试失败率、快照捕获的记录更改次数等。
监控
监控
使用 Elastic 和 Kibana 处理我们的 dbt 日志可以使我们获得实时见解,帮助我们快速排除潜在问题,并保持我们的数据转换过程顺利运行。我们在 Kibana 中设置了异常检测作业和警报,以监控 dbt 处理的行数、槽时间和测试结果。这使我们能够捕获实时事件,并且通过及时识别和修复这些问题,Elastic 使我们的数据管道更具弹性,并使我们的模型更具成本效益,从而帮助我们掌握成本飙升或数据质量问题。
我们还可以将此信息与提取到 Elastic 中的其他事件相关联,例如,使用 Elastic Github 连接器,我们可以将数据质量测试失败或其他异常与代码更改相关联,以找到导致问题的提交或 PR 的根本原因。通过将应用程序日志提取到 Elastic 中,我们还可以分析管道中的这些问题是否影响了下游应用程序,从而使用 APM 增加了延迟、吞吐量或错误率。通过提取账单、收入数据或网络流量,我们还可以看到对业务指标的影响。
如何将 dbt 调用日志导出到 Elasticsearch
我们在生产环境中每天运行完
此 python 帮助程序函数会将你的
- RESULTS_FILE:你的run_results.json文件的路径
- DBT_RUN_LOGS_INDEX:你想要在 Elastic 中为 dbt 运行日志索引指定的名称,例如dbt_run_logs
- DBT_TEST_LOGS_INDEX:你想要在 Elastic 中为 dbt 测试日志索引指定的名称,例如dbt_test_logs
- ES_CLUSTER_CLOUD_ID
- ES_CLUSTER_API_KEY
然后调用函数
from elasticsearch import Elasticsearch, helpers
import os
import sys
import json
def log_dbt_es():
RESULTS_FILE = os.environ["RESULTS_FILE"]
DBT_RUN_LOGS_INDEX = os.environ["DBT_RUN_LOGS_INDEX"]
DBT_TEST_LOGS_INDEX = os.environ["DBT_TEST_LOGS_INDEX"]
es_cluster_cloud_id = os.environ["ES_CLUSTER_CLOUD_ID"]
es_cluster_api_key = os.environ["ES_CLUSTER_API_KEY"]
es_client = Elasticsearch(
cloud_id=es_cluster_cloud_id,
api_key=es_cluster_api_key,
request_timeout=120,
)
if not os.path.exists(RESULTS_FILE):
print(f"ERROR: {RESULTS_FILE} No dbt run results found.")
sys.exit(1)
with open(RESULTS_FILE, "r") as json_file:
results = json.load(json_file)
timestamp = results["metadata"]["generated_at"]
metadata = results["metadata"]
elapsed_time = results["elapsed_time"]
args = results["args"]
docs = []
for result in results["results"]:
if result["unique_id"].split(".")[0] == "test":
result["_index"] = DBT_TEST_LOGS_INDEX
else:
result["_index"] = DBT_RUN_LOGS_INDEX
result["@timestamp"] = timestamp
result["metadata"] = metadata
result["elapsed_time"] = elapsed_time
result["args"] = args
docs.append(result)
_ = helpers.bulk(es_client, docs)
return "Done"
# Call the function
log_dbt_es()
如果想从
一旦结果被索引,你就可以使用 Kibana 为这两个索引创建数据视图,并在 Discover 中开始探索它们。
转到 Discover,单击左上角的数据视图选择器,然后单击“创建数据视图”。
现在,你可以使用你喜欢的名称创建数据视图。对 dbt 运行(
回到“发现” (Discover) 页面,您将能够选择“数据视图” (Data Views) 并浏览数据。
dbt 运行警报、仪表板和 ML 作业
针对当前数据库执行已编译的 SQL 模型文件。- unique_id:唯一的模型标识符
- execution_time:执行此模型运行所花费的总时间
日志还包含来自适配器的关于作业执行的以下指标
- adapter_response.bytes_processed
- adapter_response.bytes_billed
- adapter_response.slot_ms
- adapter_response.rows_affected
我们使用 Kibana 在上述指标上设置了异常检测作业。您可以配置一个按以下内容拆分的多指标作业
我们使用 ML 作业设置警报,以便在检测到异常时向我们发送电子邮件/Slack 消息。可以通过单击 ML 作业行末尾的三个点,直接从“作业”(机器学习 > 异常检测作业)页面创建警报
我们还使用Kibana 仪表板来可视化每个表的异常检测作业结果和相关指标,以确定哪些表消耗了我们最多的资源,了解它们的时序演变,并衡量可以帮助我们了解每月变化的聚合指标。
dbt 测试警报和仪表板
您可能已经熟悉dbt 中的测试,但如果您不熟悉,dbt 数据测试是您对模型进行的断言。使用命令
,dbt 将告诉您项目中每个测试是通过还是失败。这是一个关于如何设置它们的示例。在我们的团队中,我们使用开箱即用的 dbt 测试(dbt 测试日志包含以下字段
- unique_id:唯一的测试标识符,测试在其唯一标识符中包含“test”前缀
- status:测试结果,pass或fail
- execution_time:执行此测试所花费的总时间
- failures:如果测试通过则为 0,如果测试失败则为 1
- message:如果测试失败,则为失败原因
日志还包含来自适配器的关于作业执行的指标。
我们已在文档计数上设置了警报(请参阅指南),当有任何测试失败时,将向我们发送电子邮件/Slack 消息。警报的规则是在我们之前创建的 dbt 测试数据视图上设置的,查询过滤条件为
我们还构建了一个仪表板来可视化测试运行、测试失败及其执行时间和槽时间,以查看测试运行的历史视图
使用 AI 助手查找根本原因
对我们来说,分析这些多重信息来源最有效的方法是使用 AI 助手来帮助我们排除事件故障。在我们的案例中,我们收到了关于测试失败的警报,我们使用 AI 助手来为我们提供有关发生情况的背景信息。然后我们询问是否存在任何下游后果,AI 助手解释了异常检测作业的结果,该结果表明我们其中一个下游表的槽时间激增,并且槽时间相对于基线有所增加。然后,我们询问了根本原因,AI 助手能够找到并向我们提供我们 Github 变更日志中的 PR 链接,该链接与事件的开始相匹配,并且是最可能的原因。
结论
作为一个数据分析团队,我们负责保证我们向利益相关者提供的表格、图表、模型、报告和仪表板准确并包含正确的信息来源。随着团队的成长,我们拥有的模型数量变得更大并且更加相互关联,并且不容易保证一切运行顺利并提供准确的结果。拥有一个主动提醒我们成本激增、行计数异常或数据质量测试失败的监控系统,就像拥有一个值得信赖的伴侣,如果出现问题,它会提前提醒您,并帮助您找到问题的根本原因。
dbt 调用日志是我们数据管道状态的关键信息来源,而 Elastic 是从中提取最大潜力的完美工具。使用此博客文章作为开始使用 dbt 日志的起点,以帮助您的团队实现更高的可靠性和安心感,从而使他们能够专注于更具战略意义的任务,而不必担心潜在的数据问题。