死信队列 (DLQ)编辑

死信队列 (DLQ) 被设计为临时写入无法处理的事件的地方。DLQ 使您能够灵活地调查有问题的事件,而不会阻塞管道或丢失事件。您的管道保持流动,并且可以避免立即出现问题。但这些事件仍然需要解决。

您可以使用 dead_letter_queue 输入插件 处理来自 DLQ 的事件

处理事件不会从队列中删除项目,并且 DLQ 有时需要关注。有关更多信息,请参阅 跟踪死信队列大小清除死信队列

死信队列的工作原理编辑

默认情况下,当 Logstash 遇到由于数据包含映射错误或其他问题而无法处理的事件时,Logstash 管道会挂起或丢弃不成功的事件。为了防止在这种情况下丢失数据,您可以 配置 Logstash 将不成功的事件写入死信队列,而不是丢弃它们。

目前,只有 Elasticsearch 输出 支持死信队列。死信队列用于响应代码为 400 或 404 的文档,这两种代码都表示无法重试的事件。

写入死信队列的每个事件都包括原始事件、描述事件无法处理的原因的元数据、有关写入事件的插件的信息以及事件进入死信队列的时间戳。

要处理死信队列中的事件,请创建一个 Logstash 管道配置,该配置使用 dead_letter_queue 输入插件 从队列中读取。有关更多信息,请参阅 处理死信队列中的事件

Diagram showing pipeline reading from the dead letter queue

Elasticsearch 处理和死信队列编辑

HTTP 请求失败。 如果 HTTP 请求失败(因为 Elasticsearch 无法访问或因为它返回了 HTTP 错误代码),Elasticsearch 输出会无限期地重试整个请求。在这些情况下,死信队列没有机会进行拦截。

HTTP 请求成功。 Elasticsearch 批量 API 可以使用同一个请求执行多个操作。如果批量 API 请求成功,它会返回 200 OK,即使批处理中的一些文档 失败。在这种情况下,请求的 errors 标志将为 true

响应正文可以包含元数据,指示批量请求中的一个或多个特定操作无法执行,以及每个条目的 HTTP 风格状态代码,以指示操作无法执行的原因。如果配置了 DLQ,则会将各个索引失败路由到那里。

即使您定期处理事件,事件也会保留在死信队列中。死信队列需要 手动干预 才能清除它。

配置 Logstash 以使用死信队列编辑

默认情况下,死信队列处于禁用状态。要启用死信队列,请在 logstash.yml 设置文件 中设置 dead_letter_queue_enable 选项

dead_letter_queue.enable: true

死信队列作为文件存储在 Logstash 实例的本地目录中。默认情况下,死信队列文件存储在 path.data/dead_letter_queue 中。每个管道都有一个单独的队列。例如,main 管道的死信队列默认存储在 LOGSTASH_HOME/data/dead_letter_queue/main 中。队列文件按顺序编号:1.log2.log,依此类推。

您可以在 logstash.yml 文件中设置 path.dead_letter_queue 以指定文件的不同路径

path.dead_letter_queue: "path/to/data/dead_letter_queue"

使用本地文件系统来确保数据完整性和性能。不支持网络文件系统 (NFS)。

死信队列条目被写入临时文件,然后重命名为死信队列段文件,然后该文件可用于摄取。当此临时文件被视为已满或自上次将符合条件的死信队列事件写入临时文件以来经过一段时间后,就会发生重命名。

可以使用 dead_letter_queue.flush_interval 设置来设置这段时间。此设置以毫秒为单位,默认为 5000 毫秒。此处的值较低意味着,如果很少写入死信队列,则可能会写入更多、更小的队列文件,而值较大则会在“写入”到死信队列的项目与可供读取的项目之间引入更多延迟由 dead_letter_queue 输入。

Note that this value cannot be set to lower than 1000ms.
dead_letter_queue.flush_interval: 5000

您不能对两个不同的 Logstash 实例使用相同的 dead_letter_queue 路径。

文件轮换编辑

死信队列具有内置的文件轮换策略,可以管理队列的文件大小。当文件大小达到预先配置的阈值时,会自动创建一个新文件。

大小管理编辑

默认情况下,每个死信队列的最大大小设置为 1024mb。要更改此设置,请使用 dead_letter_queue.max_bytes 选项。如果条目会导致死信队列的大小超过此设置,则会将其丢弃。使用 dead_letter_queue.storage_policy 选项来控制应丢弃哪些条目以避免超过大小限制。将值设置为 drop_newer(默认值)以停止接受会导致文件大小超过限制的新值。将值设置为 drop_older 以删除最旧的事件,以便为新事件腾出空间。

年龄策略编辑

您可以使用年龄策略来自动控制死信队列中的事件量。使用 dead_letter_queue.retain.age 设置(在 logstash.ymlpipelines.yml 中)让 Logstash 删除早于您定义的值的事件。可用的时间单位分别是 dhms,分别表示天、小时、分钟和秒。没有默认的时间单位,因此您需要指定它。

dead_letter_queue.retain.age: 2d

年龄策略在事件写入和管道关闭期间进行验证和应用。因此,您的死信队列文件夹可能会存储过期的事件的时间超过指定时间,并且读取器管道可能会遇到过时的事件。

自动清除已消耗的事件编辑

默认情况下,死信队列输入插件不会删除它消耗的事件。相反,它会提交一个引用以避免重新处理事件。在死信队列输入插件中使用 clean_consumed 设置,以便删除已完全消耗的段,从而在处理时释放空间。

input {
  dead_letter_queue {
  	path => "/path/to/data/dead_letter_queue"
  	pipeline_id => "main"
    clean_consumed => true
  }
}

处理死信队列中的事件编辑

当您准备好处理死信队列中的事件时,您可以创建一个管道,该管道使用 dead_letter_queue 输入插件 从死信队列中读取。您使用的管道配置当然取决于您需要执行的操作。例如,如果死信队列包含 Elasticsearch 中映射错误导致的事件,您可以创建一个管道来读取“死”事件,删除导致映射问题的字段,并将清理后的事件重新索引到 Elasticsearch 中。

以下示例显示了一个简单的管道,该管道从死信队列中读取事件并将事件(包括元数据)写入标准输出

input {
  dead_letter_queue {
    path => "/path/to/data/dead_letter_queue" 
    commit_offsets => true 
    pipeline_id => "main" 
  }
}

output {
  stdout {
    codec => rubydebug { metadata => true }
  }
}

包含死信队列的顶级目录的路径。此目录包含写入死信队列的每个管道的一个单独文件夹。要查找此目录的路径,请查看 logstash.yml 设置文件。默认情况下,Logstash 在用于持久存储的位置(path.data)下创建 dead_letter_queue 目录,例如,LOGSTASH_HOME/data/dead_letter_queue。但是,如果设置了 path.dead_letter_queue,它将使用该位置。

如果为 true,则保存偏移量。当管道重新启动时,它将继续从它停止的位置读取,而不是重新处理队列中的所有项目。当您正在探索死信队列中的事件并希望多次迭代事件时,可以将 commit_offsets 设置为 false

写入死信队列的管道的 ID。默认为 "main"

有关另一个示例,请参阅示例:处理具有映射错误的数据

当管道处理完死信队列中的所有事件后,它将继续运行并处理流入队列的新事件。 这意味着您无需停止生产系统即可处理死信队列中的事件。

如果无法正确处理从 dead_letter_queue 输入插件 发出的事件,则不会将它们重新提交到死信队列。

从时间戳读取编辑

从死信队列读取数据时,您可能不想处理队列中的所有事件,尤其是在队列中有很多旧事件的情况下。 您可以使用 start_timestamp 选项从队列中的特定点开始处理事件。 此选项将管道配置为根据事件进入队列的时间戳开始处理事件

input {
  dead_letter_queue {
    path => "/path/to/data/dead_letter_queue"
    start_timestamp => "2017-06-06T23:40:37"
    pipeline_id => "main"
  }
}

对于此示例,管道开始读取在 2017 年 6 月 6 日 23:40:37 或之后传递到死信队列的所有事件。

示例:处理具有映射错误的数据编辑

在此示例中,用户尝试索引包含 geo_ip 数据的文档,但由于数据包含映射错误,因此无法处理该数据

{"geoip":{"location":"home"}}

索引失败,因为 Logstash 输出插件需要 location 字段中的 geo_point 对象,但该值是一个字符串。 失败的事件将写入死信队列,以及有关导致失败的错误的元数据

{
   "@metadata" => {
    "dead_letter_queue" => {
       "entry_time" => #<Java::OrgLogstash::Timestamp:0x5b5dacd5>,
        "plugin_id" => "fb80f1925088497215b8d037e622dec5819b503e-4",
      "plugin_type" => "elasticsearch",
           "reason" => "Could not index event to Elasticsearch. status: 400, action: [\"index\", {:_id=>nil, :_index=>\"logstash-2017.06.22\", :_type=>\"doc\", :_routing=>nil}, 2017-06-22T01:29:29.804Z My-MacBook-Pro-2.local {\"geoip\":{\"location\":\"home\"}}], response: {\"index\"=>{\"_index\"=>\"logstash-2017.06.22\", \"_type\"=>\"doc\", \"_id\"=>\"AVzNayPze1iR9yDdI2MD\", \"status\"=>400, \"error\"=>{\"type\"=>\"mapper_parsing_exception\", \"reason\"=>\"failed to parse\", \"caused_by\"=>{\"type\"=>\"illegal_argument_exception\", \"reason\"=>\"illegal latitude value [266.30859375] for geoip.location\"}}}}"
    }
  },
  "@timestamp" => 2017-06-22T01:29:29.804Z,
    "@version" => "1",
       "geoip" => {
    "location" => "home"
  },
        "host" => "My-MacBook-Pro-2.local",
     "message" => "{\"geoip\":{\"location\":\"home\"}}"
}

要处理失败的事件,请创建以下管道,该管道从死信队列读取数据并删除映射问题

input {
  dead_letter_queue {
    path => "/path/to/data/dead_letter_queue/" 
  }
}
filter {
  mutate {
    remove_field => "[geoip][location]" 
  }
}
output {
  elasticsearch{
    hosts => [ "localhost:9200" ] 
  }
}

dead_letter_queue 输入 从死信队列读取数据。

mutate 过滤器删除名为 location 的问题字段。

清理后的事件将发送到 Elasticsearch,由于映射问题已解决,因此可以在其中对其进行索引。

跟踪死信队列大小编辑

在死信队列成为问题之前监控其大小。 通过定期检查,您可以确定每个管道有意义的最大队列大小。

每个管道的 DLQ 大小在节点统计信息 API 中可用。

pipelines.${pipeline_id}.dead_letter_queue.queue_size_in_bytes.

其中 {pipeline_id} 是启用了 DLQ 的管道的名称。

清除死信队列编辑

在上游管道运行时无法清除死信队列。

死信队列是页面的目录。 要清除它,请停止管道并删除 location/<文件名>。

${path.data}/dead_letter_queue/${pipeline_id}

其中 {pipeline_id} 是启用了 DLQ 的管道的名称。

管道在再次启动时会创建一个新的死信队列。