持久化队列 (PQ)
编辑持久化队列 (PQ)
编辑Logstash 持久化队列通过将正在处理的消息队列存储到磁盘,有助于防止在异常终止期间丢失数据。
持久化队列的优势
编辑持久化队列 (PQ)
- 有助于防止在正常关闭和 Logstash 异常终止时丢失消息。如果在事件正在处理时重新启动 Logstash,Logstash 会尝试传递存储在持久化队列中的消息,直到至少成功传递一次。
- 可以在不需要像 Redis 或 Apache Kafka 这样的外部缓冲机制的情况下吸收事件突发。
默认情况下,持久化队列处于禁用状态。要启用它们,请查看配置持久化队列。
持久化队列的局限性
编辑持久化队列不能解决以下问题
- 不使用请求-响应协议的输入插件无法防止数据丢失。Tcp、udp、zeromq push+pull 和许多其他输入没有向发送者确认接收的机制。(像 beats 和 http 这样的插件,确实具有确认能力,可以很好地受到此队列的保护。)
- 如果在检查点文件提交之前发生异常关闭,则可能会丢失数据。
- 持久化队列不处理永久性的机器故障,例如磁盘损坏、磁盘故障和机器丢失。持久化到磁盘的数据不会被复制。
为了数据完整性和性能,请使用本地文件系统。不支持网络文件系统 (NFS)。
配置持久化队列
编辑要配置持久化队列,请在 Logstash 设置文件中指定选项。设置将应用于每个管道。
当您为容量和大小设置设置值时,请记住您设置的值是按管道应用的,而不是在所有管道之间共享的总值。
如果要为特定管道定义值,请使用 pipelines.yml
。
-
queue.type
- 指定
persisted
以启用持久化队列。默认情况下,持久化队列被禁用(默认值:queue.type: memory
)。 -
path.queue
- 数据文件将存储到的目录路径。默认情况下,文件存储在
path.data/queue
中。 -
queue.page_capacity
- 队列数据由称为“页面”的仅追加文件组成。此值设置队列页面的最大大小(以字节为单位)。默认大小 64MB 对于大多数用户来说是一个很好的值,更改此值不太可能带来性能提升。如果您更改现有队列的页面容量,则新大小仅适用于新页面。
-
queue.drain
- 如果您希望 Logstash 在关闭之前等待直到持久化队列排空,请指定
true
。排空队列所需的时间取决于队列中累积的事件数量。因此,您应避免使用此设置,除非队列(即使已满)相对较小且可以快速排空。 -
queue.max_events
- 管道工作线程尚未读取的最大事件数。默认值为 0(无限制)。我们使用此设置进行内部测试。用户通常不应更改此值。
-
queue.max_bytes
-
每个队列的总容量,以字节为单位。除非在
pipelines.yml
或中央管理中被覆盖,否则每个持久化队列的大小都将为logstash.yml
中指定的queue.max_bytes
的值。默认值为 1024MB (1GB)。请确保您的磁盘有足够的容量来处理所有持久化队列的
queue.max_bytes
的累积总和。所有队列的queue.max_bytes
总和应低于您的磁盘容量。如果您使用持久化队列来防止数据丢失,但不需要太多缓冲,您可以将
queue.max_bytes
设置为较小的值,只要它不小于queue.page_capacity
的值即可。较小的值会产生较小的队列并提高队列性能。 -
queue.checkpoint.acks
- 设置强制检查点之前的已确认事件数。默认值为
1024
。设置为0
表示无限制。 -
queue.checkpoint.writes
-
设置强制检查点之前的最大写入事件数。默认值为
1024
。设置为0
表示无限制。为避免丢失持久化队列中的数据,您可以设置
queue.checkpoint.writes: 1
以强制在写入每个事件后进行检查点操作。请记住,磁盘写入会产生资源成本。将此值设置为1
可确保最大的持久性,但可能会严重影响性能。请参阅控制持久性以更好地了解权衡。 -
queue.checkpoint.interval
- 设置强制在头页面上进行检查点操作的时间间隔(以毫秒为单位)。默认值为
1000
。设置为0
以消除定期检查点。
配置说明
编辑每种情况和环境都不同,“理想”的配置也会有所不同。如果您针对性能进行优化,可能会增加丢失数据的风险。如果您针对数据保护进行优化,可能会影响性能。
队列大小
编辑您可以使用 queue.max_events
和 queue.max_bytes
设置来控制队列大小。如果同时指定了这两个设置,Logstash 将使用首先达到的标准。有关达到队列限制时的行为,请参阅处理反压力。
队列的适当大小取决于用例。作为一般指导原则,请考虑以下公式来调整持久化队列的大小。
Bytes Received Per Second = Incoming Events Per Second * Raw Event Byte Size Bytes Received Per Hour = Bytes Received per Second * 3600s Required Queue Capacity = (Bytes Received Per Hour * Tolerated Hours of Downtime) * Multiplication Factor
按数据类型调整队列大小
编辑Logstash 在将接收到的事件存储到队列中之前会对其进行序列化。此过程会导致 Logstash 内部事件的额外开销。此开销取决于 原始事件大小
的类型和大小。因此,乘法因子
会根据您的用例而变化。下表显示了按事件类型划分的开销示例以及它如何影响乘法因子。
原始字符串消息
纯文本大小(字节) | 序列化的 Logstash 事件大小(字节) | 开销(字节) | 开销 (%) | 乘法因子 |
---|---|---|---|---|
11 |
213 |
|
|
|
1212 |
1416 |
|
|
|
10240 |
10452 |
|
|
|
JSON 文档
JSON 文档大小(字节) | 序列化的 Logstash 事件大小(字节) | 开销(字节) | 开销 (%) | 乘法因子 |
---|---|---|---|---|
947 |
1133 |
|
|
|
2707 |
3206 |
|
|
|
6751 |
7388 |
|
|
|
58901 |
59693 |
|
|
|
示例
让我们考虑一个每秒接收 1000 个 EPS 的 Logstash 实例,每个事件大小为 1KB,或每小时 3.5GB。为了容忍下游组件不可用 12 小时而不会让 Logstash 对上游施加反压力,持久化队列的 max_bytes
必须设置为 3.6*12*1.10 = 47.25GB,或大约 50GB。
较小的队列大小
编辑如果您使用持久化队列来防止数据丢失,但不需要太多缓冲,您可以将 queue.max_bytes
设置为较小的值。较小的值可能会产生较小的队列并提高队列性能。
示例配置
queue.type: persisted queue.max_bytes: 10mb
对持久化队列进行故障排除
编辑持久化队列问题的症状包括 Logstash 或一个或多个管道无法成功启动,并伴有类似于以下内容的错误消息。
message=>"java.io.IOException: Page file size is too small to hold elements"
此错误表明头页面(目录中最旧的页面,也是页面 ID 最低的页面)的大小 < 18 个字节,即页面标题的大小。
要研究和解决此问题
- 通过检查日志文件或运行
pqcheck
实用程序来识别可能损坏的队列。 - 停止 Logstash,并等待它关闭。
- 针对每个损坏的队列运行
pqrepair <path>
。
pqcheck
实用程序
编辑the `pqcheck` utility to identify which persistent queue--or queues--have been corrupted.
从 LOGSTASH_HOME 运行
bin/pqcheck <queue_directory>
其中 <queue_directory>
是持久化队列位置的完整路径。
pqcheck 实用程序
会读取给定目录中的检查点文件,并输出有关这些文件当前状态的信息。该实用程序会为每个检查点文件输出此信息
- 检查点文件名
- 页面文件是否已完全确认。完全确认的页面文件表示所有事件都已被读取和处理。
- 检查点文件引用的页面文件名
- 页面文件的大小。大小为 0 的页面文件会导致输出
未找到
。在这种情况下,请针对指定的队列目录运行pqrepair
。 - 页码
- 第一个未确认的页码(仅在头检查点中相关)
- 页面中第一个未确认的事件序列号
- 页面中第一个事件序列号
- 页面中的事件数
- 页面是否已完全确认
具有健康页面文件的示例
此示例表示一个包含三个页面文件的健康队列。在此示例中,Logstash 当前正在写入 page.2
,如 checkpoint.head
所引用。Logstash 正在从 page.0
读取,如 checkpoint.0
所引用。
ubuntu@bigger:/usr/share/logstash$ bin/pqcheck /var/lib/logstash/queue/main/ Using bundled JDK: /usr/share/logstash/jdk OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. Checking queue dir: /var/lib/logstash/queue/main checkpoint.1, fully-acked: NO, page.1 size: 67108864 pageNum=1, firstUnackedPageNum=0, firstUnackedSeqNum=239675, minSeqNum=239675, elementCount=218241, isFullyAcked=no checkpoint.head, fully-acked: NO, page.2 size: 67108864 pageNum=2, firstUnackedPageNum=0, firstUnackedSeqNum=457916, minSeqNum=457916, elementCount=11805, isFullyAcked=no checkpoint.0, fully-acked: NO, page.0 size: 67108864 pageNum=0, firstUnackedPageNum=0, firstUnackedSeqNum=176126, minSeqNum=1, elementCount=239674, isFullyAcked=no
表示 |
|
继续针对 |
包含损坏页面文件的示例
如果 Logstash 没有启动和/或 pqcheck
显示异常,例如页面的 NOT_FOUND
,请在队列目录上运行 pqrepair
。
bin/pqcheck /var/lib/logstash/queue/main/ Using bundled JDK: /usr/share/logstash/jdk OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. Checking queue dir: /var/lib/logstash/queue/main checkpoint.head, fully-acked: NO, page.2 size: NOT FOUND pageNum=2, firstUnackedPageNum=2, firstUnackedSeqNum=534041, minSeqNum=457916, elementCount=76127, isFullyAcked=no
如果队列显示 fully-acked: YES
且大小为 0 字节,则可以安全地删除该文件。
pqrepair
实用程序
编辑pqrepair
实用程序尝试删除损坏的队列段,使队列恢复正常工作。它从启动所在的目录开始搜索,并查找 data/queue/main
。
在此操作中,队列可能会丢失一些数据。
从 LOGSTASH_HOME 运行
bin/pqrepair <queue_directory>
其中 <queue_directory>
是持久化队列位置的完整路径。
如果实用程序运行正常,则没有输出。
pqrepair
实用程序需要对目录的写入权限。当 Logstash 作为服务运行时,文件夹权限可能会导致问题。在这种情况下,请使用 sudo
。
/usr/share/logstash$ sudo -u logstash bin/pqrepair /var/lib/logstash/queue/main/
运行 pqrepair
后,重新启动 Logstash 以验证修复操作是否成功。
清空队列
编辑您可能会遇到需要清空队列的情况。示例包括
- 暂停新的摄取。在某些情况下,您可能希望停止新的摄取,但仍保留数据的积压。
- PQ 修复。您可以在修复旧队列时清空队列以路由到不同的 PQ。
- 数据或工作流程迁移。如果您要从磁盘/硬件迁移和/或迁移到新的数据流,您可能需要清空现有队列。
要清空持久队列
- 在
logstash.yml
文件中,设置queue.drain: true
。 - 重新启动 Logstash 以使此设置生效。
- 关闭 Logstash(使用 CTRL+C 或 SIGTERM),并等待队列清空。
持久队列的工作原理
编辑队列位于同一进程中的输入和过滤器阶段之间
输入 → 队列 → 过滤器 + 输出
当输入有准备好处理的事件时,它会将事件写入队列。当写入队列成功后,输入可以向其数据源发送确认。
从队列处理事件时,Logstash 仅在过滤器和输出完成后才在队列内确认已完成的事件。队列会记录管道已处理的事件。如果且仅当事件已被 Logstash 管道完全处理时,该事件才会被记录为已处理(在本文档中称为“已确认”或“ACKed”)。
已确认是什么意思?这意味着事件已由所有已配置的过滤器和输出处理。例如,如果您只有一个输出(Elasticsearch),则当 Elasticsearch 输出成功将此事件发送到 Elasticsearch 时,该事件将被 ACKed。
在正常关闭期间(CTRL+C 或 SIGTERM),Logstash 会停止从队列读取,并完成处理过滤器和输出正在处理的在途事件。重新启动后,Logstash 会恢复处理持久队列中的事件,并接受来自输入的新事件。
如果 Logstash 异常终止,任何在途事件将不会被 ACKed,并且当 Logstash 重新启动时,将由过滤器和输出重新处理。Logstash 会批量处理事件,因此对于任何给定的批次,在发生异常终止时,该批次中的某些事件可能已成功完成,但未记录为已 ACKed。
如果您通过设置 drain.queue: true
来覆盖默认行为,Logstash 将从队列读取,直到队列清空为止,即使在受控关闭之后也是如此。
有关队列写入和确认的具体行为的更多详细信息,请参阅 控制持久性。
处理背压
编辑当队列已满时,Logstash 会对输入施加背压,以阻止数据流入 Logstash。此机制有助于 Logstash 在输入阶段控制数据流速率,而不会使 Elasticsearch 等输出不堪重负。
使用 queue.max_bytes
设置来配置磁盘上队列的总容量。以下示例将队列的总容量设置为 8GB
queue.type: persisted queue.max_bytes: 8gb
通过指定这些设置,Logstash 会在磁盘上缓冲事件,直到队列的大小达到 8GB。当队列中充满未 ACKed 的事件并且达到大小限制时,Logstash 将不再接受新事件。
每个输入都会独立处理背压。例如,当 beats 输入遇到背压时,它将不再接受新连接,并等待持久队列有空间接受更多事件。在过滤器和输出阶段完成处理队列中的现有事件并 ACK 它们之后,Logstash 会自动开始接受新事件。
控制持久性
编辑持久性是存储写入的属性,可确保数据在写入后可用。
启用持久队列功能后,Logstash 会将事件存储在磁盘上。Logstash 通过一种称为检查点的机制提交到磁盘。
队列本身是一组页面。有两种页面:头页面和尾页面。头页面是写入新事件的位置。只有一个头页面。当头页面达到特定大小(请参阅 queue.page_capacity
)时,它将变为尾页面,并创建一个新的头页面。尾页面是不可变的,而头页面是仅追加的。其次,队列会在一个单独的名为检查点文件的文件中记录关于自身(页面、确认等)的详细信息。
记录检查点时,Logstash
- 对头页面调用
fsync
。 - 原子地将队列的当前状态写入磁盘。
检查点的过程是原子的,这意味着对文件的任何更新都将在成功时保存。
如果 Logstash 终止,或者存在硬件级故障,则持久队列中缓冲但尚未检查点的任何数据都将丢失。
您可以通过设置 queue.checkpoint.writes
来强制 Logstash 更频繁地进行检查点。此设置指定在强制执行检查点之前可能写入磁盘的最大事件数。默认值为 1024。要确保最大程度的持久性并避免持久队列中的数据丢失,您可以将 queue.checkpoint.writes: 1
设置为在每次写入事件后强制执行检查点。请记住,磁盘写入会产生资源成本。将此值设置为 1
会严重影响性能。
磁盘垃圾回收
编辑在磁盘上,队列存储为一组页面,其中每个页面都是一个文件。每个页面的大小最大为 queue.page_capacity
。在页面中的所有事件都被 ACKed 后,页面将被删除(垃圾回收)。如果较旧的页面中至少有一个事件尚未被 ACKed,则整个页面将保留在磁盘上,直到该页面中的所有事件都成功处理。每个包含未处理事件的页面都将计入 queue.max_bytes
字节大小。