数据去重

编辑

Beats 框架保证至少一次投递,以确保在将事件发送到支持确认的输出(例如 Elasticsearch、Logstash、Kafka 和 Redis)时不会丢失任何数据。如果一切按计划进行,这是很好的。但是,如果 Filebeat 在处理过程中关闭,或者在确认事件之前连接丢失,则最终可能会导致重复数据。

Elasticsearch 中重复数据的原因是什么?

编辑

当输出被阻塞时,Filebeat 中的重试机制会尝试重新发送事件,直到输出确认它们为止。如果输出接收了事件,但无法确认它们,则数据可能会多次发送到输出。由于文档 ID 通常是在 Elasticsearch *接收* 到来自 Beats 的数据*之后*设置的,因此重复事件会被索引为新文档。

如何避免重复数据?

编辑

与其允许 Elasticsearch 设置文档 ID,不如在 Beats 中设置 ID。ID 存储在 Beats 的 @metadata._id 字段中,并在索引期间用于设置文档 ID。这样,如果 Beats 将相同的事件多次发送到 Elasticsearch,Elasticsearch 会覆盖现有文档,而不是创建新文档。

@metadata._id 字段与事件一起传递,以便您可以在 Filebeat 发布事件但 Elasticsearch 接收事件之前使用它来设置文档 ID。例如,请参见 Logstash pipeline 示例

有几种方法可以在 Beats 中设置文档 ID

  • add_id 处理器

    当您的数据没有自然主键字段,并且您无法从现有字段派生唯一键时,请使用 add_id 处理器。

    此示例为每个事件生成一个唯一 ID 并将其添加到 @metadata._id 字段

    processors:
      - add_id: ~
  • fingerprint 处理器

    使用 fingerprint 处理器从一个或多个现有字段派生唯一键。

    此示例使用 field1field2 的值来派生一个唯一键,并将其添加到 @metadata._id 字段

    processors:
      - fingerprint:
          fields: ["field1", "field2"]
          target_field: "@metadata._id"
  • decode_json_fields 处理器

    当您解码包含自然主键字段的 JSON 字符串时,请在 decode_json_fields 处理器中使用 document_id 设置。

    对于此示例,假设 message 字段包含 JSON 字符串 {"myid": "100", "text": "Some text"}。此示例获取 JSON 字符串中 myid 的值并将其存储在 @metadata._id 字段中

    processors:
      - decode_json_fields:
          document_id: "myid"
          fields: ["message"]
          max_depth: 1
          target: ""

    生成的文档 ID 为 100

  • JSON 输入设置

    如果您正在摄取 JSON 格式的数据,并且数据具有自然主键字段,请使用 json.document_id 输入设置。

    此示例获取 JSON 文档中 key1 的值并将其存储在 @metadata._id 字段中

    filebeat.inputs:
    - type: log
      paths:
        - /path/to/json.log
      json.document_id: "key1"

Logstash pipeline 示例

编辑

对于此示例,假设您已使用前面描述的方法之一将文档 ID 存储在 Beats 的 @metadata._id 字段中。要在通过 Logstash 将 Beats 数据发送到 Elasticsearch 的途中保留 ID,请在 Logstash pipeline 中设置 document_id 字段

input {
  beats {
    port => 5044
  }
}

output {
  if [@metadata][_id] {
    elasticsearch {
      hosts => ["https://127.0.0.1:9200"]
      document_id => "%{[@metadata][_id]}" 
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
    }
  } else {
    elasticsearch {
      hosts => ["https://127.0.0.1:9200"]
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
    }
  }
}

Elasticsearch 输出 中的 document_id 字段设置为存储在 @metadata._id 中的值。

当 Elasticsearch 索引文档时,它会将文档 ID 设置为指定的值,从而保留从 Beats 传递的 ID。