转换检查点的工作原理

编辑

每次转换检查源索引并创建或更新目标索引时,它都会生成一个检查点

如果您的转换只运行一次,则逻辑上只有一个检查点。但是,如果您的转换持续运行,它会在摄取和转换新的源数据时创建检查点。转换配置的 sync 属性通过指定时间字段来配置检查点。

要创建检查点,连续转换会执行以下操作:

  1. 检查源索引的更改。

    使用简单的定期计时器,转换检查源索引的更改。此检查基于转换的 frequency 属性中定义的间隔进行。

    如果源索引保持不变,或者检查点正在进行中,则它会等待下一个计时器。

    如果发现更改,则会创建一个检查点。

  2. 识别哪些实体和/或时间桶已更改。

    转换会搜索以查看在上次检查点和新检查点之间哪些实体或时间桶已更改。转换使用这些值来同步源索引和目标索引,操作次数少于完全重新运行。

  3. 使用更改更新目标索引(数据帧)。

    转换将与新的或已更改的实体或时间桶相关的更改应用到目标索引。更改集可以分页。转换执行类似于批处理转换操作的复合聚合,但它还基于上一步注入查询过滤器以减少工作量。应用所有更改后,检查点完成。

此检查点过程涉及集群上的搜索和索引活动。我们在开发转换时尝试优先考虑控制而不是性能。我们认为,转换花更长的时间完成,而不是快速完成并优先占用资源是更好的。也就是说,集群仍然需要足够的资源来支持复合聚合搜索及其结果的索引。

如果集群由于转换而出现不合适的性能下降,请停止转换并参考 性能注意事项

使用摄取时间戳同步转换

编辑

在大多数情况下,强烈建议使用源索引的摄取时间戳来同步转换。这是转换能够识别新更改的最优方式。如果您的数据源遵循 ECS 标准,您可能已经有一个 event.ingested 字段。在这种情况下,请使用 event.ingested 作为转换的 sync.time.field 属性。

如果您没有 event.ingested 字段或该字段未填充,您可以使用摄取管道设置它。使用 摄取管道 API(如下例所示)或通过 Kibana 在 堆栈管理 > 摄取管道下创建摄取管道。使用 set 处理器设置该字段并将其与摄取时间戳的值关联。

resp = client.ingest.put_pipeline(
    id="set_ingest_time",
    description="Set ingest timestamp.",
    processors=[
        {
            "set": {
                "field": "event.ingested",
                "value": "{{{_ingest.timestamp}}}"
            }
        }
    ],
)
print(resp)
response = client.ingest.put_pipeline(
  id: 'set_ingest_time',
  body: {
    description: 'Set ingest timestamp.',
    processors: [
      {
        set: {
          field: 'event.ingested',
          value: '{{{_ingest.timestamp}}}'
        }
      }
    ]
  }
)
puts response
const response = await client.ingest.putPipeline({
  id: "set_ingest_time",
  description: "Set ingest timestamp.",
  processors: [
    {
      set: {
        field: "event.ingested",
        value: "{{{_ingest.timestamp}}}",
      },
    },
  ],
});
console.log(response);
PUT _ingest/pipeline/set_ingest_time
{
  "description": "Set ingest timestamp.",
  "processors": [
    {
      "set": {
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}

创建摄取管道后,将其应用于转换的源索引。管道将字段 event.ingested 添加到每个文档,其值为摄取时间戳。配置转换的 sync.time.field 属性以使用该字段,方法是为新转换使用 创建转换 API,或为现有转换使用 更新转换 APIevent.ingested 字段用于同步转换。

请参阅 向索引请求添加管道摄取管道,了解有关如何使用摄取管道的更多信息。

更改检测启发式

编辑

当转换以连续模式运行时,它会在新数据传入时更新目标索引中的文档。转换使用一组称为更改检测的启发式方法来更新目标索引,操作次数更少。

在此示例中,数据按主机名分组。更改检测会检测哪些主机名已更改,例如,主机 ACG,并且仅更新包含这些主机的文档,但不更新存储有关主机 BD 或任何其他未更改的主机的信息的文档。

当使用 date_histogram 按时间桶分组时,可以对时间桶应用另一个启发式方法。更改检测会检测哪些时间桶已更改,并且仅更新这些时间桶。

错误处理

编辑

转换中的故障往往与搜索或索引相关。为了提高转换的弹性,聚合搜索的游标位置和已更改实体搜索的游标位置在内存中跟踪并定期持久化。

检查点故障可以分为以下几类:

  • 临时故障:重试检查点。如果发生 10 次连续故障,则转换的状态为失败。例如,当出现分片故障且查询仅返回部分结果时,可能会发生这种情况。
  • 不可恢复的故障:转换立即失败。例如,当找不到源索引时,会发生这种情况。
  • 调整故障:转换使用调整后的设置重试。例如,如果在复合聚合期间发生父断路器内存错误,则转换会收到部分结果。使用较少的存储桶重试聚合搜索。此重试按转换的 frequency 属性中定义的间隔执行。如果搜索重试到达到最小存储桶数,则会发生不可恢复的故障。

如果运行转换的节点失败,则转换会从最近持久化的游标位置重新启动。此恢复过程可能会重复转换已经完成的一些工作,但这可以确保数据一致性。