数据去重
编辑数据去重编辑
Beats 框架保证至少一次传送,以确保在将事件发送到支持确认的输出(例如 Elasticsearch、Logstash、Kafka 和 Redis)时不会丢失数据。如果一切按计划进行,那就太好了。但是,如果 Filebeat 在处理过程中关闭,或者在确认事件之前连接断开,则可能会导致数据重复。
是什么原因导致 Elasticsearch 中出现重复数据?编辑
当输出被阻塞时,Filebeat 中的重试机制会尝试重新发送事件,直到输出确认它们为止。如果输出接收到了事件,但无法确认它们,则数据可能会多次发送到输出。由于文档 ID 通常是在 Elasticsearch 从 Beats 接收数据*之后*设置的,因此重复的事件会被索引为新文档。
如何避免重复?编辑
不要让 Elasticsearch 设置文档 ID,而是在 Beats 中设置 ID。ID 存储在 Beats @metadata._id
字段中,并在索引期间用于设置文档 ID。这样,如果 Beats 多次将同一个事件发送到 Elasticsearch,Elasticsearch 会覆盖现有文档,而不是创建新文档。
@metadata._id
字段与事件一起传递,因此您可以在 Filebeat 发布事件之后但在 Elasticsearch 接收事件之前使用它来设置文档 ID。例如,请参阅Logstash 管道示例。
有几种方法可以在 Beats 中设置文档 ID
-
add_id
处理器当您的数据没有自然键字段,并且您无法从现有字段派生唯一键时,请使用
add_id
处理器。此示例为每个事件生成一个唯一的 ID,并将其添加到
@metadata._id
字段中processors: - add_id: ~
-
fingerprint
处理器使用
fingerprint
处理器从一个或多个现有字段派生唯一键。此示例使用
field1
和field2
的值派生唯一键,并将其添加到@metadata._id
字段中processors: - fingerprint: fields: ["field1", "field2"] target_field: "@metadata._id"
-
decode_json_fields
处理器当您要解码包含自然键字段的 JSON 字符串时,请在
decode_json_fields
处理器中使用document_id
设置。对于此示例,假设
message
字段包含 JSON 字符串{"myid": "100", "text": "Some text"}
。此示例从 JSON 字符串中获取myid
的值,并将其存储在@metadata._id
字段中processors: - decode_json_fields: document_id: "myid" fields: ["message"] max_depth: 1 target: ""
生成的文档 ID 为
100
。 -
JSON 输入设置
如果您要提取 JSON 格式的数据,并且数据具有自然键字段,请使用
json.document_id
输入设置。此示例从 JSON 文档中获取
key1
的值,并将其存储在@metadata._id
字段中filebeat.inputs: - type: log paths: - /path/to/json.log json.document_id: "key1"
Logstash 管道示例编辑
对于此示例,假设您已使用前面描述的方法之一将文档 ID 存储在 Beats @metadata._id
字段中。要在将 Beats 数据通过 Logstash 发送到 Elasticsearch 时保留 ID,请在 Logstash 管道中设置 document_id
字段
input { beats { port => 5044 } } output { if [@metadata][_id] { elasticsearch { hosts => ["https://127.0.0.1:9200"] document_id => "%{[@metadata][_id]}" index => "%{[@metadata][beat]}-%{[@metadata][version]}" } } else { elasticsearch { hosts => ["https://127.0.0.1:9200"] index => "%{[@metadata][beat]}-%{[@metadata][version]}" } } }
将 Elasticsearch 输出 中的 |
当 Elasticsearch 为文档编制索引时,它会将文档 ID 设置为指定的值,并保留从 Beats 传递的 ID。