持久化队列 (PQ)
编辑持久化队列 (PQ)编辑
Logstash 持久化队列通过将正在传输的消息队列存储到磁盘,帮助防止在异常终止期间丢失数据。
持久化队列的优势编辑
持久化队列 (PQ)
- 有助于防止在正常关闭期间以及 Logstash 异常终止时丢失消息。如果在事件正在传输时重新启动 Logstash,Logstash 会尝试传送存储在持久化队列中的消息,直到传送至少成功一次。
- 可以吸收突发的事件,而无需使用 Redis 或 Apache Kafka 等外部缓冲机制。
默认情况下,持久化队列处于禁用状态。要启用它们,请查看配置持久化队列。
持久化队列的限制编辑
持久化队列无法解决以下问题
- 不使用请求-响应协议的输入插件无法防止数据丢失。Tcp、udp、zeromq 推送+拉取以及许多其他输入没有向发送方确认接收的机制。(确实具有确认功能的插件,例如 Beats 和 http,可以通过此队列得到很好的保护。)
- 如果在提交检查点文件之前发生异常关闭,则数据可能会丢失。
- 持久化队列不处理永久性机器故障,例如磁盘损坏、磁盘故障和机器丢失。持久化到磁盘的数据不会被复制。
使用本地文件系统来确保数据完整性和性能。不支持网络文件系统 (NFS)。
配置持久化队列编辑
要配置持久化队列,请在 Logstash 设置文件中指定选项。设置将应用于每个管道。
设置容量和大小调整设置的值时,请记住,您设置的值是应用于*每个管道*,而不是在所有管道之间共享的总数。
如果要为特定管道定义值,请使用pipelines.yml
。
-
queue.type
- 指定
persisted
以启用持久化队列。默认情况下,持久化队列处于禁用状态(默认值:queue.type: memory
)。 -
path.queue
- 数据文件将存储在其中的目录路径。默认情况下,文件存储在
path.data/queue
中。 -
queue.page_capacity
- 队列数据由称为“页面”的仅追加文件组成。此值以字节为单位设置队列页面的最大大小。对于大多数用户而言,默认大小 64mb 是一个很好的值,更改此值不太可能带来性能优势。如果更改现有队列的页面容量,则新大小仅适用于新页面。
-
queue.drain
- 如果您希望 Logstash 在关闭之前等待持久化队列排空,请指定
true
。排空队列所需的时间取决于队列中累积的事件数量。因此,除非队列即使已满也相对较小并且可以快速排空,否则应避免使用此设置。 -
queue.max_events
- 管道工作线程尚未读取的最大事件数。默认值为 0(无限制)。我们使用此设置进行内部测试。用户通常不应该更改此值。
-
queue.max_bytes
-
*每个队列*的总容量(以字节为单位)。除非在
pipelines.yml
或集中管理中被覆盖,否则每个持久化队列的大小都将设置为logstash.yml
中指定的queue.max_bytes
的值。默认值为 1024mb (1gb)。请确保您的磁盘具有足够的容量来处理所有持久化队列的
queue.max_bytes
的累积总和。*所有*队列的queue.max_bytes
总和应低于磁盘的容量。如果您使用持久化队列来防止数据丢失,但不需要太多缓冲,则可以将
queue.max_bytes
设置为较小的值,只要它不小于queue.page_capacity
的值即可。较小的值会产生较小的队列并提高队列性能。 -
queue.checkpoint.acks
- 设置在强制执行检查点之前已确认的事件数。默认值为
1024
。设置为0
表示无限制。 -
queue.checkpoint.writes
-
设置在强制执行检查点之前写入的最大事件数。默认值为
1024
。设置为0
表示无限制。为了避免在持久化队列中丢失数据,可以将
queue.checkpoint.writes: 1
设置为在写入每个事件后强制执行检查点。请记住,磁盘写入会占用资源。将此值设置为1
可确保最大的持久性,但会严重影响性能。请参阅控制持久性以更好地了解权衡取舍。 -
queue.checkpoint.interval
- 设置在首页上强制执行检查点的间隔时间(以毫秒为单位)。默认值为
1000
。设置为0
可消除定期检查点。
配置说明编辑
每种情况和环境都不同,“理想”配置也不尽相同。如果优化性能,则可能会增加数据丢失的风险。如果优化数据保护,则可能会影响性能。
队列大小编辑
您可以使用queue.max_events
和queue.max_bytes
设置来控制队列大小。如果同时指定了这两个设置,Logstash 将使用先达到的条件。请参阅处理反压以了解达到队列限制时的行为。
队列的适当大小取决于用例。作为一般指导原则,请考虑使用以下公式来确定持久化队列的大小。
Bytes Received Per Second = Incoming Events Per Second * Raw Event Byte Size Bytes Received Per Hour = Bytes Received per Second * 3600s Required Queue Capacity = (Bytes Received Per Hour * Tolerated Hours of Downtime) * Multiplication Factor
按数据类型划分的队列大小编辑
Logstash 会在将接收到的事件存储到队列之前对其进行序列化。此过程会导致 Logstash 内部事件的开销增加。此开销取决于原始事件大小
的类型和大小。因此,乘数
会根据您的用例而变化。下表显示了按事件类型划分的开销示例,以及这如何影响乘数。
原始字符串消息
纯文本大小(字节) | 序列化的 Logstash 事件大小(字节) | 开销(字节) | 开销 (%) | 乘数 |
---|---|---|---|---|
11 |
213 |
|
|
|
1212 |
1416 |
|
|
|
10240 |
10452 |
|
|
|
JSON 文档
JSON 文档大小(字节) | 序列化的 Logstash 事件大小(字节) | 开销(字节) | 开销 (%) | 乘数 |
---|---|---|---|---|
947 |
1133 |
|
|
|
2707 |
3206 |
|
|
|
6751 |
7388 |
|
|
|
58901 |
59693 |
|
|
|
示例
我们假设有一个 Logstash 实例,它每秒接收 1000 个事件,每个事件 1KB,即每小时 3.5GB。为了在不使 Logstash 对上游施加反压的情况下,容忍下游组件不可用 12 小时,持久化队列的max_bytes
必须设置为 3.6*12*1.10 = 47.25GB,即大约 50GB。
较小的队列大小编辑
如果您使用持久化队列来防止数据丢失,但不需要太多缓冲,则可以将queue.max_bytes
设置为较小的值。较小的值可能会产生较小的队列并提高队列性能。
示例配置
queue.type: persisted queue.max_bytes: 10mb
持久化队列故障排除编辑
持久队列问题的症状包括 Logstash 或一个或多个管道无法成功启动,并伴随类似于以下内容的错误消息。
message=>"java.io.IOException: Page file size is too small to hold elements"
此错误表示首页(目录中最旧的页面,页面 ID 最小)的大小 < 18 字节,即页面标题的大小。
要调查和解决此问题
- 通过检查日志文件或运行
pqcheck
实用程序来识别可能已损坏的队列。 - 停止 Logstash,并等待其关闭。
- 为每个损坏的队列运行
pqrepair <path>
。
pqcheck
实用程序编辑
the `pqcheck` utility to identify which persistent queue--or queues--have been corrupted.
从 LOGSTASH_HOME 运行
bin/pqcheck <queue_directory>
其中 <queue_directory>
是持久队列位置的完全限定路径。
pqcheck
实用程序会读取给定目录中的检查点文件,并输出有关这些文件当前状态的信息。该实用程序为每个检查点文件输出以下信息
- 检查点文件名
- 页面文件是否已完全确认。完全确认的页面文件表示所有事件都已读取和处理。
- 检查点文件引用的页面文件名
- 页面文件的大小。大小为 0 的页面文件会导致输出
NOT FOUND
。在这种情况下,请对指定的队列目录运行pqrepair
。 - 页码
- 第一个未确认的页码(仅与头部检查点相关)
- 页面中第一个未确认的事件序列号
- 页面中第一个事件的序列号
- 页面中的事件数
- 页面是否已完全确认
包含正常页面文件的示例
此示例表示一个包含三个页面文件的正常队列。在此示例中,Logstash 当前正在写入 checkpoint.head
引用的 page.2
。Logstash 正在从 checkpoint.0
引用的 page.0
读取。
ubuntu@bigger:/usr/share/logstash$ bin/pqcheck /var/lib/logstash/queue/main/ Using bundled JDK: /usr/share/logstash/jdk OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. Checking queue dir: /var/lib/logstash/queue/main checkpoint.1, fully-acked: NO, page.1 size: 67108864 pageNum=1, firstUnackedPageNum=0, firstUnackedSeqNum=239675, minSeqNum=239675, elementCount=218241, isFullyAcked=no checkpoint.head, fully-acked: NO, page.2 size: 67108864 pageNum=2, firstUnackedPageNum=0, firstUnackedSeqNum=457916, minSeqNum=457916, elementCount=11805, isFullyAcked=no checkpoint.0, fully-acked: NO, page.0 size: 67108864 pageNum=0, firstUnackedPageNum=0, firstUnackedSeqNum=176126, minSeqNum=1, elementCount=239674, isFullyAcked=no
表示 |
|
继续 |
包含损坏页面文件的示例
如果 Logstash 未启动和/或 pqcheck
显示异常(例如页面的 NOT_FOUND
),请在队列目录上运行 pqrepair
。
bin/pqcheck /var/lib/logstash/queue/main/ Using bundled JDK: /usr/share/logstash/jdk OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. Checking queue dir: /var/lib/logstash/queue/main checkpoint.head, fully-acked: NO, page.2 size: NOT FOUND pageNum=2, firstUnackedPageNum=2, firstUnackedSeqNum=534041, minSeqNum=457916, elementCount=76127, isFullyAcked=no
如果队列显示 fully-acked: YES
且大小为 0 字节,则可以安全地删除该文件。
pqrepair
实用程序编辑
pqrepair
实用程序会尝试删除损坏的队列段,以使队列恢复正常工作秩序。它从启动的目录开始搜索,并查找 data/queue/main
。
队列可能会在此操作中丢失一些数据。
从 LOGSTASH_HOME 运行
bin/pqrepair <queue_directory>
其中 <queue_directory>
是持久队列位置的完全限定路径。
如果该实用程序运行正常,则不会有任何输出。
pqrepair
实用程序需要对该目录具有写访问权限。当 Logstash 作为服务运行时,文件夹权限可能会导致问题。在这种情况下,请使用 sudo
。
/usr/share/logstash$ sudo -u logstash bin/pqrepair /var/lib/logstash/queue/main/
运行 pqrepair
后,请重新启动 Logstash 以验证修复操作是否成功。
清空队列编辑
您可能会遇到想要清空队列的情况。示例包括
- 暂停新数据摄取。在某些情况下,您可能希望停止新数据摄取,但仍保留数据积压。
- PQ 修复。您可以清空队列以路由到不同的 PQ,同时修复旧的 PQ。
- 数据或工作流迁移。如果您要从磁盘/硬件迁移和/或迁移到新的数据流,则可能需要清空现有队列。
要清空持久队列,请执行以下操作
- 在
logstash.yml
文件中,设置queue.drain: true
。 - 重新启动 Logstash 以使此设置生效。
- 关闭 Logstash(使用 CTRL+C 或 SIGTERM),并等待队列清空。
持久队列的工作原理编辑
队列位于同一进程中输入和过滤器阶段之间
输入 → 队列 → 过滤器 + 输出
当输入有准备好处理的事件时,它会将它们写入队列。当成功写入队列时,输入可以向其数据源发送确认。
在处理队列中的事件时,Logstash 仅在过滤器和输出完成后才确认事件已完成。队列会记录已由管道处理的事件。当且仅当事件已由 Logstash 管道完全处理后,才会将其记录为已处理(在本文档中称为“已确认”或“已 ACK”)。
已确认是什么意思?这意味着该事件已由所有配置的过滤器和输出处理。例如,如果您只有一个输出 Elasticsearch,则当 Elasticsearch 输出已成功将此事件发送到 Elasticsearch 时,该事件即被 ACK。
在正常关闭(CTRL+C 或 SIGTERM)期间,Logstash 会停止从队列中读取数据,并完成过滤器和输出正在处理的进行中事件的处理。重新启动后,Logstash 会恢复处理持久队列中的事件,并接受来自输入的新事件。
如果 Logstash 异常终止,则任何进行中的事件都不会被 ACK,并且在 Logstash 重新启动时将由过滤器和输出重新处理。Logstash 会批量处理事件,因此对于任何给定批次,当发生异常终止时,该批次中的一些事件可能已成功完成,但未记录为已 ACK。
如果通过设置 drain.queue: true
覆盖默认行为,则 Logstash 会从队列中读取数据,直到队列清空——即使在受控关闭之后也是如此。
有关队列写入和确认的具体行为的更多详细信息,请参阅控制持久性。
处理背压编辑
当队列已满时,Logstash 会对输入施加背压,以阻止数据流入 Logstash。此机制有助于 Logstash 控制输入阶段的数据流速率,而不会使 Elasticsearch 等输出过载。
使用 queue.max_bytes
设置来配置队列在磁盘上的总容量。以下示例将队列的总容量设置为 8gb
queue.type: persisted queue.max_bytes: 8gb
指定了这些设置后,Logstash 会将事件缓冲在磁盘上,直到队列的大小达到 8gb。当队列中充满了未 ACK 的事件,并且已达到大小限制时,Logstash 将不再接受新事件。
每个输入都独立处理背压。例如,当 beats 输入遇到背压时,它将不再接受新连接,并等待持久队列有空间接受更多事件。在过滤器和输出阶段完成对队列中现有事件的处理并对其进行 ACK 后,Logstash 会自动开始接受新事件。
控制持久性编辑
持久性是存储写入的一个属性,可确保数据在写入后可用。
启用持久队列功能后,Logstash 会将事件存储在磁盘上。Logstash 通过一种称为“检查点”的机制提交到磁盘。
队列本身是一组页面。页面有两种:头部页面和尾部页面。头部页面是写入新事件的位置。只有一个头部页面。当头部页面达到一定大小(请参阅 queue.page_capacity
)时,它将变为尾部页面,并创建一个新的头部页面。尾部页面是不可变的,而头部页面是只允许追加的。其次,队列会将有关自身(页面、确认等)的详细信息记录在一个单独的文件中,该文件称为检查点文件。
记录检查点时,Logstash 会执行以下操作
- 对头部页面调用
fsync
。 - 以原子方式将队列的当前状态写入磁盘。
检查点过程是原子的,这意味着如果成功,则会保存对文件的任何更新。
如果 Logstash 终止,或者发生硬件级故障,则持久队列中已缓冲但尚未检查点的任何数据都将丢失。
您可以通过设置 queue.checkpoint.writes
来强制 Logstash 更频繁地进行检查点。此设置指定在强制进行检查点之前可以写入磁盘的最大事件数。默认值为 1024。为了确保最大程度的持久性并避免持久队列中的数据丢失,您可以设置 queue.checkpoint.writes: 1
以在写入每个事件后强制进行检查点。请记住,磁盘写入会消耗资源。将此值设置为 1
会严重影响性能。
磁盘垃圾收集编辑
在磁盘上,队列存储为一组页面,其中每个页面都是一个文件。每个页面的最大大小为 queue.page_capacity
。在页面中的所有事件都已 ACK 后,将删除(垃圾收集)该页面。如果较旧的页面中至少有一个事件尚未 ACK,则整个页面将保留在磁盘上,直到该页面中的所有事件都已成功处理。每个包含未处理事件的页面都将计入 queue.max_bytes
字节大小。