Mika Ayenson 博士Eric Forte

简化 ES|QL 查询和规则验证:与 GitHub CI 集成

ES|QL 是 Elastic 新的管道查询语言。Elastic Security Labs 充分利用这项新功能,详细介绍了如何对检测引擎的 ES|QL 规则进行验证。

阅读时长 14 分钟安全研究
Streamlining ES|QL Query and Rule Validation: Integrating with GitHub CI

最近推出的 8.11.0 版本功能中,最令人惊叹的功能之一是 Elasticsearch 查询语言 (ES|QL)。正如 Costin Leau 在之前的文章中强调的那样,它是 Elasitcsearch 的一个功能全面的专用查询和计算引擎。现在它已进入技术预览版,我们希望分享一些验证 ES|QL 查询的选项。此概述适用于 ES|QL 的新手工程师。无论您是在 Kibana 中搜索见解,还是在时间轴中调查安全威胁,您都将看到此功能如何在 Elastic 中无缝集成。

ES|QL 验证基础知识(基于 Kibana 和 Elasticsearch)

如果您想快速验证单个查询,或者可以手动逐个测试查询,则只需要 Elastic Stack UI。导航到 Kibana 中的“发现”选项卡后,单击“数据视图”下拉菜单中的“试用 ES|QL”技术预览版按钮,以加载查询窗格。您还可以从ES|QL 示例中获取示例查询以快速上手。引入非ECS 字段将立即突出显示错误,优先显示语法错误,然后显示未知列错误。

在此示例中,突出显示了两个语法错误

  • 输入 wheres 的无效语法错误,应为 where,以及
  • 未知列 process.worsking_directory,应为 process.working_directory

解决此示例中的语法错误后,您将看到“未知列”错误。以下是可能出现此错误的几个原因

  • 修复字段名称拼写错误:有时您只需修复错误中建议的名称;请参阅 ECS 或任何集成架构,并确认字段是否正确
  • 添加缺失的数据:如果您确信字段正确,有时向堆栈添加数据会填充列
  • 更新映射:您可以配置映射以设置显式字段,或者使用更新映射 API 将新字段添加到现有数据流或索引

ES|QL 警告

并非所有字段都会显示为错误,在这种情况下,您会看到警告和下拉列表。硬故障(例如错误)表示规则无法执行,而警告表示规则可以运行,但函数可能会降级。

当使用跨多个索引的广泛 ES|QL 查询(如 logs-* | limit 10)时,有时某些字段可能不会出现在结果中。这通常是因为这些字段在索引数据中未定义,或者 ES|QL 尚不支持。如果未检索到预期字段,则通常表示数据已摄入到 Elasticsearch 中,但没有按照已建立的映射对这些字段进行索引。ES|QL 不会导致查询失败,而是为不可用字段返回“null”,这可以作为查询中某些内容未按预期执行的警告。此方法确保查询仍然运行,从而将其与硬故障区分开来,当查询根本无法执行时(例如引用不存在的字段时)会发生硬故障。

还可能会出现有用的性能警告。向查询提供 LIMIT 参数将有助于解决性能警告。请注意,此示例突出显示了默认返回 500 个事件的限制。此功能正式推出后,此限制可能会大幅增加。

安全

在调查工作流程中,安全从业人员更倾向于迭代式地搜寻威胁,这可能包括在 UI 中手动测试、优化和调整查询。方便的是,安全分析师和工程师可以在时间轴中本地利用 ES|QL,无需通过在 Kibana 中来回切换视图来中断工作流程。您将在同一安全组件中收到相同的错误和警告,该组件会在后台显示 Elasticsearch 反馈。

在某些组件中,您会根据 ES|QL 的实现上下文收到其他反馈。一种场景是当您使用“检测规则 (SIEM)”选项卡下的“创建新规则”功能创建 ES|QL 规则时。

例如,此查询可以轻松转换为 EQLKQL 查询,因为它没有利用 ES|QL 的强大功能,如统计信息、频率分析或解析非结构化数据。如果您想了解更多关于使用 ES|QL 查询的好处,请查看 Costin 的博客,其中介绍了性能提升。在这种情况下,我们必须向查询添加 [metadata _id, _version, _index],这会告知 UI 在结果中返回哪些组件。

API 调用?当然可以!

在这一节之前,所有示例都引用了直接从 UI 创建 ES|QL 查询并接收反馈。为便于说明,以下示例利用了开发工具,但这些调用可以轻松迁移到 cURL bash 命令或您选择的可以发送 HTTP 请求的语言/工具。

下面是通过 POST 请求发送到查询 API 的相同查询,该查询与之前其他示例中显示的查询相同,并且查询有效。

正如预期的那样,如果您提供无效的查询,您将收到在 UI 中看到的类似反馈。在此示例中,我们还提供了 ?error_trace 标志,如果您需要有关查询未通过验证原因的其他上下文,则可以提供堆栈跟踪。

您可以想象,我们可以使用 API 以编程方式验证 ES|QL 查询。您仍然可以使用创建规则 Kibana API,这需要与安全规则关联的更多元数据。但是,如果您只想验证查询,_query API 就派上用场了。从这里,您可以使用Elasticsearch Python 客户端连接到您的堆栈并验证查询。

from elasticsearch import Elasticsearch
client = Elasticsearch(...)
data = {
"query": """
    from logs-endpoint.events.*
    | keep host.os.type, process.name, process.working_directory, event.type, event.action
    | where host.os.type == "linux" and process.name == "unshadow" and event.type == "start"     and event.action in ("exec", "exec_event")
"""
}

# Execute the query
headers = {"Content-Type": "application/json", "Accept": "application/json"}
response = client.perform_request(
"POST", "/_query", params={"pretty": True}, headers=headers, body=data
)

利用语法

Elastic 开源开发的一大优势是,antlr ES|QL 语法也是可用的。

如果您熟悉 ANTLR,您也可以下载最新的 JAR 文件来构建词法分析器和语法分析器。

pip install antlr4-tools # for antlr4
git clone [email protected]:elastic/elasticsearch.git # large repo
cd elasticsearch/x-pack/plugin/esql/src/main/antlr # navigate to grammar
antlr4 -Dlanguage=Python3 -o build EsqlBaseLexer.g4 # generate lexer
antlr4 -Dlanguage=Python3 -o build EsqlBaseParser.g4 # generate parser

这个过程需要更多的工作来启动 ES|QL 验证,但您至少会构建一个树对象,从而提供对解析字段更精细的控制和访问。

但是,正如您所看到的,监听器是存根,这意味着如果您想走这条路,您需要手动构建语义。

安全规则 GitHub CI 用例

对于我们内部的 Elastic EQL 和 KQL 查询规则验证,我们利用查询的解析抽象语法树 (AST) 对象,跨多个堆栈版本执行细致的语义验证。例如,拥有 AST 可以让我们验证正确的字段使用,验证新功能是否在引入之前在旧堆栈版本中使用,甚至可以进一步确保相关集成是基于查询中使用的数据流构建的。从根本上说,本地验证使我们能够简化对许多堆栈功能和版本的更广泛支持。如果您有兴趣了解更多关于我们可以使用 AST 进行的设计和严格验证的信息,请查看我们的 detection-rules repo

如果您不需要对特定解析树对象的精细访问,并且不需要控制 ES|QL 验证的语义,那么开箱即用的 API 可能就是您验证查询所需的全部内容。 在此用例中,我们希望使用持续集成来验证安全检测规则。通过 GitHub 等系统管理检测规则有助于获得使用版本控制的所有好处,例如跟踪规则更改、通过拉取请求接收反馈等等。从概念上讲,规则作者应该能够在本地创建这些规则(其中包含 ES|QL 查询)并执行 git 规则开发生命周期。

CI 检查有助于确保查询仍然通过 ES|QL 验证,而无需在 UI 中手动检查查询。根据目前为止显示的示例,您必须要么启动一个持久堆栈并根据 API 验证查询,要么基于 Elastic 堆栈之外的可用语法构建一个解析器实现。

使用短期 Elastic 堆栈与利用托管持久堆栈的一种方法是使用 Elastic Container Project (ECP)。 正如广告宣传的那样,这个项目将:

在几分钟内启动一个 100% 容器化的 Elastic 堆栈,具有 TLS 安全性,并预先配置、启用并准备好使用的 Elasticsearch、Kibana、Fleet 和 Detection Engine。

结合以下内容:

  • Elastic 容器(例如 ECP)
  • CI(例如 Github Action 工作流程)
  • ES|QL 规则
  • 自动化 Foo(例如 python 和 bash 脚本)

您可以通过 CI 相对轻松地针对最新堆栈版本验证 ES|QL 规则,但这种方法涉及一些细微之处。

如果您有兴趣了解如何实现它,请随时查看示例 GitHub action 工作流程

注意:如果您有兴趣使用 GitHub action 工作流程,请查看其关于在 Actions 中使用 GitHub secrets设置 Action 工作流程的文档。

CI 细微之处

  1. 任何自定义配置都需要通过脚本来完成(例如,设置额外的策略、enrichments 等)。在我们的 POC 中,我们创建了一个步骤和 bash 脚本,该脚本对我们的临时 CI Elastic 堆栈执行了一系列 POST 请求,从而创建了检测规则中使用的新丰富。
- name: Add Enrich Policy
  env:
    ELASTICSEARCH_SERVER: "https://127.0.0.1:9200"
    ELASTICSEARCH_USERNAME: "elastic"
    ELASTICSEARCH_PASSWORD: "${{ secrets.PASSWORD }}"
  run: |
    set -x
    chmod +x ./add_enrich.sh
    bash ./add_enrich.sh
  1. 在新鲜部署的 CI Elastic 堆栈中没有数据的情况下,会出现许多如前所述的 Unknown Column 问题。解决此问题的一种方法是构建具有适当映射的索引,以使查询能够匹配。例如,如果您的查询搜索索引 logs-endpoint.events.*,则创建一个名为 logs-endpoint.events.ci 的索引,其中包含查询中使用的集成的适当映射。

  2. 配置好临时堆栈后,您需要额外的逻辑来迭代所有规则并使用 _query API 进行验证。例如,您可以创建一个单元测试来迭代所有规则。我们现在通过利用默认的 RuleCollection.default() 来加载 detection-rules repo 中的所有规则来实现这一点,但这是一个快速加载仅限 ES|QL 规则的代码片段。

# tests/test_all_rules.py
class TestESQLRules:
    """Test ESQL Rules."""

    @unittest.skipIf(not os.environ.get("DR_VALIDATE_ESQL"),
         "Test only run when DR_VALIDATE_ESQL environment variable set.")
    def test_environment_variables_set(self):
        collection = RuleCollection()

        # Iterate over all .toml files in the given directory recursively
        for rule in Path(DEFAULT_RULES_DIR).rglob('*.toml'):
            # Read file content
            content = rule.read_text(encoding='utf-8')
            # Search for the pattern
            if re.search(r'language = "esql"', content):
                print(f"Validating {str(rule)}")
                collection.load_file(rule)

每个规则在通过 load_file 加载文件后,都会运行一个验证器方法。

# detection_rules/rule_validator.py
class ESQLValidator(QueryValidator):
    """Specific fields for ESQL query event types."""

    def validate(self, data: 'QueryRuleData', meta: RuleMeta) -> None:
        """Validate an ESQL query while checking TOMLRule."""
        if not os.environ.get("DR_VALIDATE_ESQL"):
            return

        if Version.parse(meta.min_stack_version) < Version.parse("8.11.0"):
            raise ValidationError(f"Rule minstack must be greater than 8.10.0 {data.rule_id}")

        client = Elasticsearch(...)
        client.info()
        client.perform_request("POST", "/_query", params={"pretty": True},
                               headers={"accept": "application/json", 
                                        "content-type": "application/json"},
                               body={"query": f"{self.query} | LIMIT 0"})

如前所述,我们可以 POST 到查询 API 并根据设置为 GitHub 操作秘密并作为环境变量传递给验证的凭据进行验证。请注意,LIMIT 0 是为了防止查询有意返回数据。它的目的是仅执行验证。最后,单个 CI 步骤将是运行单元测试的 bash 调用(例如,pytest tests/test_all_rules.py::TestESQLRules)。

  1. 最后,当针对多个 Elastic 堆栈版本和配置验证大量规则时,利用容器的 CI 可能无法很好地扩展。特别是如果您想在提交的基础上进行测试。部署一个堆栈需要花费五分钟多一点的时间才能完成。此测量值可能会根据您的 CI 设置而大幅增加或减少。

结论

Elasticsearch 的新功能 Elasticsearch 查询语言 (ES|QL) 是 Elasticsearch 的专用查询和计算引擎,目前处于技术预览阶段。它提供跨各种 Elastic 服务(如 Kibana 和 Timelines)的无缝集成,并提供 ES|QL 查询的验证选项。用户可以通过 Elastic Stack UI 或 API 调用来验证查询,并立即收到有关语法或列错误的反馈。

此外,ES|QL 的 ANTLR 语法可用,适合那些喜欢更手动方式构建词法分析器和解析器的人。我们正在探索以自动方式验证 ES|QL 查询的方法,现在轮到您了。请注意,我们尚未完成探索,因此请查看 ES|QL,如果您有任何想法,请告诉我们!我们很乐意听取您计划如何在堆栈内部或 CI 中使用它。

我们始终有兴趣听取此类用例和工作流程,因此一如既往,请通过 GitHub issues 与我们联系,在我们的 社区 Slack 中与我们聊天,并在我们的 Discuss 论坛中提出问题。

查看以下其他资源,了解更多关于我们如何将最新的 AI 功能带给分析师的信息:了解一切 ES|QL 查看 8.11.0 版本博客 介绍 ES|QL