持久化队列 (PQ)

编辑

Logstash 持久化队列通过将正在处理的消息队列存储到磁盘,有助于防止在异常终止期间丢失数据。

持久化队列的优势

编辑

持久化队列 (PQ)

  • 有助于防止在正常关闭和 Logstash 异常终止时丢失消息。如果在事件正在处理时重新启动 Logstash,Logstash 会尝试传递存储在持久化队列中的消息,直到至少成功传递一次。
  • 可以在不需要像 Redis 或 Apache Kafka 这样的外部缓冲机制的情况下吸收事件突发。

默认情况下,持久化队列处于禁用状态。要启用它们,请查看配置持久化队列

持久化队列的局限性

编辑

持久化队列不能解决以下问题

  • 不使用请求-响应协议的输入插件无法防止数据丢失。Tcp、udp、zeromq push+pull 和许多其他输入没有向发送者确认接收的机制。(像 beats 和 http 这样的插件,确实具有确认能力,可以很好地受到此队列的保护。)
  • 如果在检查点文件提交之前发生异常关闭,则可能会丢失数据。
  • 持久化队列不处理永久性的机器故障,例如磁盘损坏、磁盘故障和机器丢失。持久化到磁盘的数据不会被复制。

为了数据完整性和性能,请使用本地文件系统。不支持网络文件系统 (NFS)。

配置持久化队列

编辑

要配置持久化队列,请在 Logstash 设置文件中指定选项。设置将应用于每个管道。

当您为容量和大小设置设置值时,请记住您设置的值是按管道应用的,而不是在所有管道之间共享的总值。

如果要为特定管道定义值,请使用 pipelines.yml

queue.type
指定 persisted 以启用持久化队列。默认情况下,持久化队列被禁用(默认值:queue.type: memory)。
path.queue
数据文件将存储到的目录路径。默认情况下,文件存储在 path.data/queue 中。
queue.page_capacity
队列数据由称为“页面”的仅追加文件组成。此值设置队列页面的最大大小(以字节为单位)。默认大小 64MB 对于大多数用户来说是一个很好的值,更改此值不太可能带来性能提升。如果您更改现有队列的页面容量,则新大小仅适用于新页面。
queue.drain
如果您希望 Logstash 在关闭之前等待直到持久化队列排空,请指定 true。排空队列所需的时间取决于队列中累积的事件数量。因此,您应避免使用此设置,除非队列(即使已满)相对较小且可以快速排空。
queue.max_events
管道工作线程尚未读取的最大事件数。默认值为 0(无限制)。我们使用此设置进行内部测试。用户通常不应更改此值。
queue.max_bytes

每个队列的总容量,以字节为单位。除非在 pipelines.yml 或中央管理中被覆盖,否则每个持久化队列的大小都将为 logstash.yml 中指定的 queue.max_bytes 的值。默认值为 1024MB (1GB)。

请确保您的磁盘有足够的容量来处理所有持久化队列的 queue.max_bytes 的累积总和。所有队列的 queue.max_bytes 总和应低于您的磁盘容量。

如果您使用持久化队列来防止数据丢失,但不需要太多缓冲,您可以将 queue.max_bytes 设置为较小的值,只要它不小于 queue.page_capacity 的值即可。较小的值会产生较小的队列并提高队列性能。

queue.checkpoint.acks
设置强制检查点之前的已确认事件数。默认值为 1024。设置为 0 表示无限制。
queue.checkpoint.writes

设置强制检查点之前的最大写入事件数。默认值为 1024。设置为 0 表示无限制。

为避免丢失持久化队列中的数据,您可以设置 queue.checkpoint.writes: 1 以强制在写入每个事件后进行检查点操作。请记住,磁盘写入会产生资源成本。将此值设置为 1 可确保最大的持久性,但可能会严重影响性能。请参阅控制持久性以更好地了解权衡。

queue.checkpoint.interval
设置强制在头页面上进行检查点操作的时间间隔(以毫秒为单位)。默认值为 1000。设置为 0 以消除定期检查点。

配置说明

编辑

每种情况和环境都不同,“理想”的配置也会有所不同。如果您针对性能进行优化,可能会增加丢失数据的风险。如果您针对数据保护进行优化,可能会影响性能。

队列大小

编辑

您可以使用 queue.max_eventsqueue.max_bytes 设置来控制队列大小。如果同时指定了这两个设置,Logstash 将使用首先达到的标准。有关达到队列限制时的行为,请参阅处理反压力

队列的适当大小取决于用例。作为一般指导原则,请考虑以下公式来调整持久化队列的大小。

Bytes Received Per Second = Incoming Events Per Second * Raw Event Byte Size
Bytes Received Per Hour = Bytes Received per Second * 3600s
Required Queue Capacity = (Bytes Received Per Hour * Tolerated Hours of Downtime) * Multiplication Factor 

首先,您可以将 乘法因子 设置为 1.10,然后根据下表中的指示为特定数据类型进行优化。

按数据类型调整队列大小
编辑

Logstash 在将接收到的事件存储到队列中之前会对其进行序列化。此过程会导致 Logstash 内部事件的额外开销。此开销取决于 原始事件大小的类型和大小。因此,乘法因子会根据您的用例而变化。下表显示了按事件类型划分的开销示例以及它如何影响乘法因子。

原始字符串消息

纯文本大小(字节) 序列化的 Logstash 事件大小(字节) 开销(字节) 开销 (%) 乘法因子

11

213

202

1836%

19.4

1212

1416

204

17%

1.17

10240

10452

212

2%

1.02

JSON 文档

JSON 文档大小(字节) 序列化的 Logstash 事件大小(字节) 开销(字节) 开销 (%) 乘法因子

947

1133

186

20%

1.20

2707

3206

499

18%

1.18

6751

7388

637

9%

1.9

58901

59693

792

1%

1.1

示例

让我们考虑一个每秒接收 1000 个 EPS 的 Logstash 实例,每个事件大小为 1KB,或每小时 3.5GB。为了容忍下游组件不可用 12 小时而不会让 Logstash 对上游施加反压力,持久化队列的 max_bytes 必须设置为 3.6*12*1.10 = 47.25GB,或大约 50GB。

较小的队列大小

编辑

如果您使用持久化队列来防止数据丢失,但不需要太多缓冲,您可以将 queue.max_bytes 设置为较小的值。较小的值可能会产生较小的队列并提高队列性能。

示例配置

queue.type: persisted
queue.max_bytes: 10mb

较少的检查点

编辑

queue.checkpoint.writesqueue.checkpoint.acks 设置为 0 可能会产生最大性能,但可能会对持久性产生潜在影响。

在 Logstash 终止或出现硬件级别故障的情况下,任何尚未进行检查点操作的数据都将丢失。请参阅控制持久性以更好地了解权衡。

PQ 和管道到管道通信

编辑

持久化队列可以在您的管道到管道配置中发挥重要作用。

用例:PQ 和输出隔离器模式
编辑

以下是 Logstash 用户描述的一个真实用例。

在我们的部署中,我们为每个输出使用一个管道,并且每个管道都有一个大的 PQ。此配置允许单个输出停止而不会阻止输入(以及所有其他输出),直到操作员可以恢复到停止的输出的流并让队列排空。

我们的实时输出必须是低延迟的,而我们的批量输出必须是一致的。我们使用 PQ 来防止实时输出停止,而不是为了防止批量输出中的数据丢失。(尽管保护也很好)。

对持久化队列进行故障排除

编辑

持久化队列问题的症状包括 Logstash 或一个或多个管道无法成功启动,并伴有类似于以下内容的错误消息。

message=>"java.io.IOException: Page file size is too small to hold elements"

此错误表明头页面(目录中最旧的页面,也是页面 ID 最低的页面)的大小 < 18 个字节,即页面标题的大小。

要研究和解决此问题

  1. 通过检查日志文件或运行 pqcheck 实用程序来识别可能损坏的队列。
  2. 停止 Logstash,并等待它关闭。
  3. 针对每个损坏的队列运行 pqrepair <path>

pqcheck 实用程序

编辑
the `pqcheck` utility to identify which persistent queue--or queues--have been corrupted.

从 LOGSTASH_HOME 运行

bin/pqcheck <queue_directory>

其中 <queue_directory> 是持久化队列位置的完整路径。

pqcheck 实用程序会读取给定目录中的检查点文件,并输出有关这些文件当前状态的信息。该实用程序会为每个检查点文件输出此信息

  • 检查点文件名
  • 页面文件是否已完全确认。完全确认的页面文件表示所有事件都已被读取和处理。
  • 检查点文件引用的页面文件名
  • 页面文件的大小。大小为 0 的页面文件会导致输出 未找到。在这种情况下,请针对指定的队列目录运行 pqrepair
  • 页码
  • 第一个未确认的页码(仅在头检查点中相关)
  • 页面中第一个未确认的事件序列号
  • 页面中第一个事件序列号
  • 页面中的事件数
  • 页面是否已完全确认

具有健康页面文件的示例

此示例表示一个包含三个页面文件的健康队列。在此示例中,Logstash 当前正在写入 page.2,如 checkpoint.head 所引用。Logstash 正在从 page.0 读取,如 checkpoint.0 所引用。

ubuntu@bigger:/usr/share/logstash$ bin/pqcheck /var/lib/logstash/queue/main/
Using bundled JDK: /usr/share/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Checking queue dir: /var/lib/logstash/queue/main
checkpoint.1, fully-acked: NO, page.1 size: 67108864
  pageNum=1, firstUnackedPageNum=0, firstUnackedSeqNum=239675, minSeqNum=239675,
  elementCount=218241, isFullyAcked=no
checkpoint.head, fully-acked: NO, page.2 size: 67108864
  pageNum=2, firstUnackedPageNum=0, firstUnackedSeqNum=457916, minSeqNum=457916, elementCount=11805, isFullyAcked=no
checkpoint.0, fully-acked: NO, page.0 size: 67108864  
  pageNum=0, firstUnackedPageNum=0, firstUnackedSeqNum=176126, minSeqNum=1,
  elementCount=239674, isFullyAcked=no 

表示 checkpoint.0,它引用页面文件 page.0,大小为 67108864

继续针对 checkpoint.0,这些行表示页面号为 0,第一个未确认的事件编号为 176126,页面文件中包含 239674 个事件,此页面文件中的第一个事件编号为 1,并且该页面文件尚未完全确认。也就是说,页面文件中仍然有事件需要被摄取。

包含损坏页面文件的示例

如果 Logstash 没有启动和/或 pqcheck 显示异常,例如页面的 NOT_FOUND,请在队列目录上运行 pqrepair

bin/pqcheck /var/lib/logstash/queue/main/
Using bundled JDK: /usr/share/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Checking queue dir: /var/lib/logstash/queue/main
checkpoint.head, fully-acked: NO, page.2 size: NOT FOUND 
  pageNum=2, firstUnackedPageNum=2, firstUnackedSeqNum=534041, minSeqNum=457916,
  elementCount=76127, isFullyAcked=no

NOT FOUND 表示页面文件已损坏。针对指定的队列目录运行 pqrepair

如果队列显示 fully-acked: YES 且大小为 0 字节,则可以安全地删除该文件。

pqrepair 实用程序

编辑

pqrepair 实用程序尝试删除损坏的队列段,使队列恢复正常工作。它从启动所在的目录开始搜索,并查找 data/queue/main

在此操作中,队列可能会丢失一些数据。

从 LOGSTASH_HOME 运行

bin/pqrepair <queue_directory>

其中 <queue_directory> 是持久化队列位置的完整路径。

如果实用程序运行正常,则没有输出。

pqrepair 实用程序需要对目录的写入权限。当 Logstash 作为服务运行时,文件夹权限可能会导致问题。在这种情况下,请使用 sudo

/usr/share/logstash$ sudo -u logstash bin/pqrepair /var/lib/logstash/queue/main/

运行 pqrepair 后,重新启动 Logstash 以验证修复操作是否成功。

清空队列

编辑

您可能会遇到需要清空队列的情况。示例包括

  • 暂停新的摄取。在某些情况下,您可能希望停止新的摄取,但仍保留数据的积压。
  • PQ 修复。您可以在修复旧队列时清空队列以路由到不同的 PQ。
  • 数据或工作流程迁移。如果您要从磁盘/硬件迁移和/或迁移到新的数据流,您可能需要清空现有队列。

要清空持久队列

  1. logstash.yml 文件中,设置 queue.drain: true
  2. 重新启动 Logstash 以使此设置生效。
  3. 关闭 Logstash(使用 CTRL+C 或 SIGTERM),并等待队列清空。

持久队列的工作原理

编辑

队列位于同一进程中的输入和过滤器阶段之间

输入 → 队列 → 过滤器 + 输出

当输入有准备好处理的事件时,它会将事件写入队列。当写入队列成功后,输入可以向其数据源发送确认。

从队列处理事件时,Logstash 仅在过滤器和输出完成后才在队列内确认已完成的事件。队列会记录管道已处理的事件。如果且仅当事件已被 Logstash 管道完全处理时,该事件才会被记录为已处理(在本文档中称为“已确认”或“ACKed”)。

已确认是什么意思?这意味着事件已由所有已配置的过滤器和输出处理。例如,如果您只有一个输出(Elasticsearch),则当 Elasticsearch 输出成功将此事件发送到 Elasticsearch 时,该事件将被 ACKed。

在正常关闭期间(CTRL+C 或 SIGTERM),Logstash 会停止从队列读取,并完成处理过滤器和输出正在处理的在途事件。重新启动后,Logstash 会恢复处理持久队列中的事件,并接受来自输入的新事件。

如果 Logstash 异常终止,任何在途事件将不会被 ACKed,并且当 Logstash 重新启动时,将由过滤器和输出重新处理。Logstash 会批量处理事件,因此对于任何给定的批次,在发生异常终止时,该批次中的某些事件可能已成功完成,但未记录为已 ACKed。

如果您通过设置 drain.queue: true 来覆盖默认行为,Logstash 将从队列读取,直到队列清空为止,即使在受控关闭之后也是如此。

有关队列写入和确认的具体行为的更多详细信息,请参阅 控制持久性

处理背压

编辑

当队列已满时,Logstash 会对输入施加背压,以阻止数据流入 Logstash。此机制有助于 Logstash 在输入阶段控制数据流速率,而不会使 Elasticsearch 等输出不堪重负。

使用 queue.max_bytes 设置来配置磁盘上队列的总容量。以下示例将队列的总容量设置为 8GB

queue.type: persisted
queue.max_bytes: 8gb

通过指定这些设置,Logstash 会在磁盘上缓冲事件,直到队列的大小达到 8GB。当队列中充满未 ACKed 的事件并且达到大小限制时,Logstash 将不再接受新事件。

每个输入都会独立处理背压。例如,当 beats 输入遇到背压时,它将不再接受新连接,并等待持久队列有空间接受更多事件。在过滤器和输出阶段完成处理队列中的现有事件并 ACK 它们之后,Logstash 会自动开始接受新事件。

控制持久性

编辑

持久性是存储写入的属性,可确保数据在写入后可用。

启用持久队列功能后,Logstash 会将事件存储在磁盘上。Logstash 通过一种称为检查点的机制提交到磁盘。

队列本身是一组页面。有两种页面:头页面和尾页面。头页面是写入新事件的位置。只有一个头页面。当头页面达到特定大小(请参阅 queue.page_capacity)时,它将变为尾页面,并创建一个新的头页面。尾页面是不可变的,而头页面是仅追加的。其次,队列会在一个单独的名为检查点文件的文件中记录关于自身(页面、确认等)的详细信息。

记录检查点时,Logstash

  • 对头页面调用 fsync
  • 原子地将队列的当前状态写入磁盘。

检查点的过程是原子的,这意味着对文件的任何更新都将在成功时保存。

如果 Logstash 终止,或者存在硬件级故障,则持久队列中缓冲但尚未检查点的任何数据都将丢失。

您可以通过设置 queue.checkpoint.writes 来强制 Logstash 更频繁地进行检查点。此设置指定在强制执行检查点之前可能写入磁盘的最大事件数。默认值为 1024。要确保最大程度的持久性并避免持久队列中的数据丢失,您可以将 queue.checkpoint.writes: 1 设置为在每次写入事件后强制执行检查点。请记住,磁盘写入会产生资源成本。将此值设置为 1 会严重影响性能。

磁盘垃圾回收

编辑

在磁盘上,队列存储为一组页面,其中每个页面都是一个文件。每个页面的大小最大为 queue.page_capacity。在页面中的所有事件都被 ACKed 后,页面将被删除(垃圾回收)。如果较旧的页面中至少有一个事件尚未被 ACKed,则整个页面将保留在磁盘上,直到该页面中的所有事件都成功处理。每个包含未处理事件的页面都将计入 queue.max_bytes 字节大小。