转换检查点的工作原理

编辑

每次转换检查源索引并创建或更新目标索引时,它都会生成一个检查点

如果您的转换只运行一次,则逻辑上只有一个检查点。但是,如果您的转换持续运行,则它会在摄取和转换新的源数据时创建检查点。转换配置的sync属性通过指定时间字段来配置检查点。

要创建检查点,连续转换将

  1. 检查源索引是否有更改。

    使用简单的周期性计时器,转换会检查源索引是否有更改。此检查基于转换的frequency属性中定义的间隔进行。

    如果源索引保持不变,或者检查点已经在进行中,则它将等待下一个计时器。

    如果发现更改,则会创建一个检查点。

  2. 识别哪些实体和/或时间桶已更改。

    转换搜索以查看在最后一个检查点和新检查点之间哪些实体或时间桶发生了更改。转换使用这些值来同步源索引和目标索引,操作次数少于完全重新运行。

  3. 使用更改更新目标索引(数据帧)。

    转换将与新实体或更改的实体或时间桶相关的更改应用于目标索引。更改集可以进行分页。转换执行类似于批量转换操作的复合聚合,但它还会根据上一步注入查询过滤器以减少工作量。应用所有更改后,检查点完成。

此检查点过程涉及群集上的搜索和索引活动。在开发转换时,我们已尝试优先考虑控制而不是性能。我们认为,与其快速完成并优先使用资源消耗,不如让转换花费更长时间完成。也就是说,群集仍然需要足够的资源来支持复合聚合搜索及其结果的索引。

如果群集由于转换而遇到不合适的性能下降,请停止转换并参考性能注意事项

使用摄取时间戳同步转换

编辑

在大多数情况下,强烈建议使用源索引的摄取时间戳来同步转换。这是转换能够识别新更改的最优方法。如果您的数据源遵循ECS 标准,您可能已经有一个event.ingested字段。在这种情况下,请使用event.ingested作为转换的sync.time.field属性。

如果您没有event.ingested字段或它未填充,您可以使用摄取管道设置它。使用摄取管道 API(例如下面的示例)或通过 Kibana 在堆栈管理 > 摄取管道下创建摄取管道。使用set处理器来设置字段并将其与摄取时间戳的值关联。

resp = client.ingest.put_pipeline(
    id="set_ingest_time",
    description="Set ingest timestamp.",
    processors=[
        {
            "set": {
                "field": "event.ingested",
                "value": "{{{_ingest.timestamp}}}"
            }
        }
    ],
)
print(resp)
response = client.ingest.put_pipeline(
  id: 'set_ingest_time',
  body: {
    description: 'Set ingest timestamp.',
    processors: [
      {
        set: {
          field: 'event.ingested',
          value: '{{{_ingest.timestamp}}}'
        }
      }
    ]
  }
)
puts response
const response = await client.ingest.putPipeline({
  id: "set_ingest_time",
  description: "Set ingest timestamp.",
  processors: [
    {
      set: {
        field: "event.ingested",
        value: "{{{_ingest.timestamp}}}",
      },
    },
  ],
});
console.log(response);
PUT _ingest/pipeline/set_ingest_time
{
  "description": "Set ingest timestamp.",
  "processors": [
    {
      "set": {
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}

创建摄取管道后,将其应用于转换的源索引。该管道将event.ingested字段添加到每个文档中,其值为摄取时间戳。使用创建转换 API(对于新转换)或更新转换 API(对于现有转换)配置转换的sync.time.field属性以使用该字段。event.ingested字段用于同步转换。

请参阅向索引请求添加管道摄取管道以了解有关如何使用摄取管道的更多信息。

更改检测启发式方法

编辑

当转换以连续模式运行时,它会在新的数据进入时更新目标索引中的文档。转换使用一组称为更改检测的启发式方法,以较少的操作更新目标索引。

在此示例中,数据按主机名分组。更改检测会检测哪些主机名已更改,例如主机ACG,并且仅更新具有这些主机的文档,但不更新存储有关主机BD或任何其他未更改的主机的信息的文档。

当使用date_histogram按时间桶分组时,可以应用另一种启发式方法。更改检测会检测哪些时间桶已更改,并且仅更新这些时间桶。

错误处理

编辑

转换中的故障往往与搜索或索引相关。为了提高转换的弹性,聚合搜索和更改实体搜索的光标位置将在内存中跟踪并在定期持久化。

检查点故障可以分类如下

  • 临时故障:检查点将重试。如果发生 10 次连续故障,则转换将处于失败状态。例如,当存在分片故障并且查询仅返回部分结果时,可能会出现这种情况。
  • 不可恢复的故障:转换立即失败。例如,当找不到源索引时,就会发生这种情况。
  • 调整故障:转换将使用调整后的设置重试。例如,如果在复合聚合期间发生父断路器内存错误,则转换将收到部分结果。聚合搜索将使用较少的桶数重试。此重试将在转换的frequency属性中定义的间隔内执行。如果搜索重试到达到最少桶数的程度,则会发生不可恢复的故障。

如果运行转换的节点发生故障,则转换将从最新的持久化光标位置重新启动。此恢复过程可能会重复转换已完成的一些工作,但它可以确保数据一致性。