持久队列 (PQ)
编辑持久队列 (PQ)
编辑Logstash 持久队列有助于防止在异常终止期间发生数据丢失,方法是将正在处理的消息队列存储到磁盘。
持久队列的优势
编辑持久队列 (PQ)
- 有助于防止在正常关机和 Logstash 异常终止期间的消息丢失。如果 Logstash 在事件正在处理时重新启动,Logstash 将尝试传递存储在持久队列中的消息,直到至少成功传递一次。
- 无需外部缓冲机制(如 Redis 或 Apache Kafka)即可吸收突发事件。
持久队列默认情况下是禁用的。要启用它们,请查看 配置持久队列。
持久队列的限制
编辑持久队列无法解决以下问题
- 不使用请求-响应协议的输入插件无法防止数据丢失。Tcp、udp、zeromq push+pull 和许多其他输入没有向发送方确认收到的机制。(例如 Beats 和 http 等确实具有确认功能的插件,可以通过此队列得到很好的保护。)
- 如果在提交检查点文件之前发生异常关机,则可能会丢失数据。
- 持久队列无法处理永久性机器故障,例如磁盘损坏、磁盘故障和机器丢失。持久化到磁盘的数据不会复制。
使用本地文件系统以确保数据完整性和性能。不支持网络文件系统 (NFS)。
配置持久队列
编辑要配置持久队列,请在 Logstash 设置文件 中指定选项。设置应用于每个管道。
设置容量和大小设置的值时,请记住,您设置的值是每个管道应用的,而不是所有管道共享的总值。
如果要为特定管道定义值,请使用 pipelines.yml
。
-
queue.type
- 指定
persisted
以启用持久队列。默认情况下,持久队列是禁用的(默认值:queue.type: memory
)。 -
path.queue
- 将存储数据文件的目录路径。默认情况下,文件存储在
path.data/queue
中。 -
queue.page_capacity
- 队列数据由称为“页面”的仅追加文件组成。此值设置队列页面的最大大小(以字节为单位)。对于大多数用户而言,默认大小 64mb 是一个不错的数值,更改此值不太可能带来性能优势。如果更改现有队列的页面容量,则新大小仅适用于新页面。
-
queue.drain
- 如果希望 Logstash 在关闭之前等待持久队列清空,请指定
true
。清空队列所需的时间取决于队列中累积的事件数量。因此,除非队列(即使已满)也相对较小并且可以快速清空,否则应避免使用此设置。 -
queue.max_events
- 管道工作程序尚未读取的事件的最大数量。默认为 0(无限制)。我们使用此设置进行内部测试。用户通常不应该更改此值。
-
queue.max_bytes
-
每个队列的总容量(以字节为单位)。除非在
pipelines.yml
或中央管理中被覆盖,否则每个持久队列的大小都将设置为在logstash.yml
中指定的queue.max_bytes
值。默认为 1024mb (1gb)。确保您的磁盘具有足够的容量来处理所有持久队列的
queue.max_bytes
的累积总和。所有队列的queue.max_bytes
总和应小于磁盘的容量。如果您使用持久队列来防止数据丢失,但不需要太多缓冲,则可以将
queue.max_bytes
设置为较小的值,只要它不小于queue.page_capacity
的值即可。较小的值会产生较小的队列并提高队列性能。 -
queue.checkpoint.acks
- 设置强制检查点之前的已确认事件数。默认为
1024
。设置为0
表示无限制。 -
queue.checkpoint.writes
-
设置强制检查点之前的已写入事件的最大数量。默认为
1024
。设置为0
表示无限制。为了避免丢失持久队列中的数据,您可以将
queue.checkpoint.writes: 1
设置为在写入每个事件后强制执行检查点。请记住,磁盘写入会产生资源成本。将此值设置为1
可确保最大的持久性,但会严重影响性能。请参阅 控制持久性 以更好地了解权衡。 -
queue.checkpoint.interval
- 设置在头部页面上强制执行检查点的间隔(以毫秒为单位)。默认为
1000
。设置为0
可消除定期检查点。
配置说明
编辑每个情况和环境都不同,“理想”配置也不同。如果您优化性能,则可能会增加丢失数据的风险。如果您优化数据保护,则可能会影响性能。
队列大小
编辑请参阅 处理背压,了解达到队列限制时的行为。
队列的适当大小取决于用例。作为一般指导原则,请考虑使用以下公式来确定持久队列的大小。
Bytes Received Per Second = Incoming Events Per Second * Raw Event Byte Size Bytes Received Per Hour = Bytes Received per Second * 3600s Required Queue Capacity = (Bytes Received Per Hour * Tolerated Hours of Downtime) * Multiplication Factor
按数据类型划分的队列大小
编辑Logstash 在事件存储到队列之前对其进行序列化。此过程会增加 Logstash 内事件的额外开销。此开销取决于原始事件大小
的类型和大小。因此,乘法因子
会根据您的用例而变化。这些表格显示了按事件类型划分的开销示例以及这如何影响乘法因子。
原始字符串消息
纯文本大小(字节) | 序列化的 Logstash 事件大小(字节) | 开销(字节) | 开销(%) | 乘法因子 |
---|---|---|---|---|
11 |
213 |
|
|
|
1212 |
1416 |
|
|
|
10240 |
10452 |
|
|
|
JSON 文档
JSON 文档大小(字节) | 序列化的 Logstash 事件大小(字节) | 开销(字节) | 开销(%) | 乘法因子 |
---|---|---|---|---|
947 |
1133 |
|
|
|
2707 |
3206 |
|
|
|
6751 |
7388 |
|
|
|
58901 |
59693 |
|
|
|
示例
让我们考虑一个接收 1000 EPS 的 Logstash 实例,每个事件为 1KB,每小时为 3.5GB。为了容忍下游组件不可用 12 小时而 Logstash 不对上游施加背压,持久队列的 max_bytes
必须设置为 3.6*12*1.10 = 47.25GB,约为 50GB。
较小的队列大小
编辑如果您使用持久队列来防止数据丢失,但不需要太多缓冲,则可以将 queue.max_bytes
设置为较小的值。较小的值可能会产生较小的队列并提高队列性能。
示例配置
queue.type: persisted queue.max_bytes: 10mb
持久队列故障排除
编辑持久队列问题的症状包括 Logstash 或一个或多个管道无法成功启动,并伴随类似于以下错误消息。
message=>"java.io.IOException: Page file size is too small to hold elements"
此错误表示头部页面(目录中最旧的页面,且页面 ID 最低的页面)的大小 < 18 字节,即页面标头的 size。
研究和解决问题
- 通过检查日志文件或运行
pqcheck
实用程序来识别可能已损坏的队列(或队列)。 - 停止 Logstash,然后等待它关闭。
- 对每个损坏的队列运行
pqrepair <path>
。
pqcheck
实用程序
编辑the `pqcheck` utility to identify which persistent queue--or queues--have been corrupted.
在 LOGSTASH_HOME 目录下运行:
bin/pqcheck <queue_directory>
其中 <queue_directory>
是持久化队列位置的完整路径。
pqcheck 实用程序
读取给定目录中的检查点文件,并输出有关这些文件当前状态的信息。该实用程序会为每个检查点文件输出以下信息:
- 检查点文件名
- 页面文件是否已完全确认。完全确认的页面文件表示所有事件都已读取并处理。
- 检查点文件引用的页面文件名
- 页面文件的大小。大小为 0 的页面文件将输出
NOT FOUND
。在这种情况下,请对指定的队列目录运行pqrepair
。 - 页面编号
- 第一个未确认的页面编号(仅与头部检查点相关)
- 页面中第一个未确认的事件序列号
- 页面中第一个事件序列号
- 页面中的事件数量
- 页面是否已完全确认
正常页面文件的示例
此示例表示一个包含三个页面文件的正常队列。在此示例中,Logstash 当前正在写入 page.2
(如 checkpoint.head
所引用)。Logstash 正在读取 page.0
(如 checkpoint.0
所引用)。
ubuntu@bigger:/usr/share/logstash$ bin/pqcheck /var/lib/logstash/queue/main/ Using bundled JDK: /usr/share/logstash/jdk OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. Checking queue dir: /var/lib/logstash/queue/main checkpoint.1, fully-acked: NO, page.1 size: 67108864 pageNum=1, firstUnackedPageNum=0, firstUnackedSeqNum=239675, minSeqNum=239675, elementCount=218241, isFullyAcked=no checkpoint.head, fully-acked: NO, page.2 size: 67108864 pageNum=2, firstUnackedPageNum=0, firstUnackedSeqNum=457916, minSeqNum=457916, elementCount=11805, isFullyAcked=no checkpoint.0, fully-acked: NO, page.0 size: 67108864 pageNum=0, firstUnackedPageNum=0, firstUnackedSeqNum=176126, minSeqNum=1, elementCount=239674, isFullyAcked=no
表示 |
|
继续 |
损坏页面文件的示例
如果 Logstash 无法启动和/或 pqcheck
显示异常,例如页面的 NOT_FOUND
,请在队列目录上运行 pqrepair
。
bin/pqcheck /var/lib/logstash/queue/main/ Using bundled JDK: /usr/share/logstash/jdk OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. Checking queue dir: /var/lib/logstash/queue/main checkpoint.head, fully-acked: NO, page.2 size: NOT FOUND pageNum=2, firstUnackedPageNum=2, firstUnackedSeqNum=534041, minSeqNum=457916, elementCount=76127, isFullyAcked=no
如果队列显示 fully-acked: YES
和 0 字节,则可以安全地删除该文件。
pqrepair 实用程序
编辑pqrepair 实用程序
尝试删除损坏的队列段,以使队列恢复工作状态。它从启动它的目录开始搜索,并查找 data/queue/main
。
此操作可能会导致队列丢失一些数据。
在 LOGSTASH_HOME 目录下运行:
bin/pqrepair <queue_directory>
其中 <queue_directory>
是持久化队列位置的完整路径。
如果实用程序运行正常,则不会输出任何内容。
pqrepair
实用程序需要对该目录的写入权限。当 Logstash 作为服务运行时,文件夹权限可能会导致问题。在这种情况下,请使用 sudo
。
/usr/share/logstash$ sudo -u logstash bin/pqrepair /var/lib/logstash/queue/main/
运行 pqrepair
后,重新启动 Logstash 以验证修复操作是否成功。
清空队列
编辑您可能会遇到需要清空队列的情况。例如:
- 暂停新的数据导入。在某些情况下,您可能希望停止新的数据导入,但仍保留积压的数据。
- PQ 修复。您可以在修复旧 PQ 的同时将数据路由到另一个 PQ 来清空队列。
- 数据或工作流程迁移。如果您要从磁盘/硬件迁移和/或迁移到新的数据流,则可能需要清空现有队列。
要清空持久化队列:
- 在
logstash.yml
文件中,设置queue.drain: true
。 - 重新启动 Logstash 以使此设置生效。
- 关闭 Logstash(使用 CTRL+C 或 SIGTERM),然后等待队列清空。
持久化队列的工作原理
编辑队列位于同一进程中的输入和过滤器阶段之间
输入 → 队列 → 过滤器 + 输出
当输入准备好要处理的事件时,它会将这些事件写入队列。当成功写入队列后,输入就可以向其数据源发送确认。
从队列处理事件时,Logstash 只有在过滤器和输出完成之后才会在队列中确认已完成的事件。队列会记录已由管道处理的事件。如果且仅当事件已由 Logstash 管道完全处理时,才会将事件记录为已处理(在此文档中称为“已确认”或“ACKed”)。
已确认是什么意思?这意味着事件已由所有配置的过滤器和输出处理。例如,如果您只有一个输出 Elasticsearch,则当 Elasticsearch 输出已成功将此事件发送到 Elasticsearch 时,该事件即被确认。
在正常关闭(CTRL+C 或 SIGTERM)期间,Logstash 会停止从队列读取数据,并完成正在由过滤器和输出处理的进行中事件的处理。重新启动后,Logstash 将继续处理持久化队列中的事件,以及接受来自输入的新事件。
如果 Logstash 异常终止,则任何进行中的事件都不会被确认,并且在 Logstash 重新启动时将由过滤器和输出重新处理。Logstash 批量处理事件,因此对于任何给定的批次,当发生异常终止时,该批次中的一些事件可能已成功完成,但未记录为已确认。
如果您通过设置 drain.queue: true
来覆盖默认行为,则 Logstash 会从队列读取数据直到队列为空——即使在受控关闭之后也是如此。
有关队列写入和确认的具体行为的更多详细信息,请参见 控制持久性。
处理反压
编辑当队列已满时,Logstash 会对输入施加反压以阻止数据流入 Logstash。此机制有助于 Logstash 控制输入阶段的数据流速率,而不会压垮 Elasticsearch 等输出。
使用 queue.max_bytes
设置来配置磁盘上队列的总容量。以下示例将队列的总容量设置为 8gb
queue.type: persisted queue.max_bytes: 8gb
指定这些设置后,Logstash 会将事件缓冲到磁盘上,直到队列大小达到 8gb。当队列中充满了未确认的事件并且已达到大小限制时,Logstash 将不再接受新事件。
每个输入都独立处理反压。例如,当 beats 输入遇到反压时,它将不再接受新连接,并等待持久化队列有空间接受更多事件。在过滤器和输出阶段完成处理队列中现有事件并确认它们后,Logstash 会自动开始接受新事件。
控制持久性
编辑持久性是存储写入的一个属性,它确保数据在写入后可用。
启用持久化队列功能后,Logstash 会将事件存储在磁盘上。Logstash 通过称为 *检查点* 的机制提交到磁盘。
队列本身是一组页面。页面有两种:头部页面和尾部页面。头部页面是写入新事件的地方。只有一个头部页面。当头部页面达到一定大小(参见 queue.page_capacity
)时,它会变成尾部页面,并创建一个新的头部页面。尾部页面是不可变的,头部页面是仅追加的。其次,队列会将有关自身的信息(页面、确认等)记录到一个名为检查点文件的单独文件中。
记录检查点时,Logstash 会:
- 对头部页面调用
fsync
。 - 以原子方式将队列的当前状态写入磁盘。
检查点过程是原子的,这意味着如果更新成功,则会保存对文件的任何更新。
如果 Logstash 终止,或者发生硬件级故障,则缓冲在持久化队列中但尚未进行检查点的任何数据都将丢失。
您可以通过设置 queue.checkpoint.writes
来强制 Logstash 更频繁地进行检查点。此设置指定在强制进行检查点之前可以写入磁盘的事件的最大数量。默认为 1024。为了确保最大的持久性并避免持久化队列中的数据丢失,您可以设置 queue.checkpoint.writes: 1
以强制在写入每个事件后进行检查点。请记住,磁盘写入会消耗资源。将此值设置为 1
会严重影响性能。
磁盘垃圾回收
编辑在磁盘上,队列存储为一组页面,其中每个页面是一个文件。每个页面的大小最多为 queue.page_capacity
。页面在该页面中的所有事件都已确认后被删除(垃圾回收)。如果较旧的页面至少有一个尚未确认的事件,则该页面将保留在磁盘上,直到该页面中的所有事件都成功处理。每个包含未处理事件的页面都将计入 queue.max_bytes
字节大小。