管道间通信
编辑管道间通信编辑
在使用 Logstash 的多管道功能时,您可能希望在同一个 Logstash 实例中连接多个管道。此配置对于隔离这些管道的执行以及帮助分解复杂管道的逻辑非常有用。 pipeline
输入/输出支持本文档后面讨论的一些高级架构模式。
如果您需要在 Logstash 实例之间设置通信,请使用 Logstash 到 Logstash 通信,或使用中间队列,例如 Kafka 或 Redis。
持久队列 (PQ) 可以帮助保持数据在管道中流动。请参阅 PQ 和管道间通信,了解 PQ 如何增强您的管道间通信策略。
配置概述编辑
使用 pipeline
输入和 pipeline
输出连接在同一个 Logstash 实例中运行的两个管道。这些输入使用客户端-服务器方法,其中 pipeline
输入注册一个虚拟地址,pipeline
输出可以连接到该地址。
- 创建一个下游管道,监听虚拟地址上的事件。
- 创建一个上游管道,生成事件,并通过
pipeline
输出将它们发送到一个或多个虚拟地址。
以下是一个简单配置示例。
# config/pipelines.yml - pipeline.id: upstream config.string: input { stdin {} } output { pipeline { send_to => [myVirtualAddress] } } - pipeline.id: downstream config.string: input { pipeline { address => myVirtualAddress } }
工作原理编辑
pipeline
输入充当虚拟服务器,监听本地进程中的单个虚拟地址。只有在同一个本地 Logstash 上运行的 pipeline
输出才能将事件发送到此地址。管道 outputs
可以将事件发送到虚拟地址列表。如果下游管道被阻塞或不可用,pipeline
输出将被阻塞。
当事件跨管道发送时,它们的数据将被完全复制。对下游管道中事件的修改不会影响任何上游管道中的该事件。
pipeline
插件可能是管道间通信最有效的方式,但它仍然会产生性能成本。Logstash 必须在 Java 堆中为每个下游管道完全复制每个事件。使用此功能可能会影响 Logstash 的堆内存使用情况。
交付保证编辑
在标准配置中,pipeline
输入/输出具有至少一次交付保证。如果地址被阻塞或不可用,输出将被阻塞。
默认情况下,pipeline
输出上的 ensure_delivery
选项设置为 true.
如果您将 ensure_delivery
标志更改为 false
,则不可用的下游管道会导致发送的消息被丢弃。请注意,只有在下游管道启动或重新加载时,才认为管道不可用,而不是当它可能包含的任何插件被阻塞时。阻塞的下游管道会阻塞发送输出/管道,而不管 ensure_delivery
标志的值如何。当您希望能够暂时禁用下游管道而不会阻塞任何发送到它的上游管道时,请使用 ensure_delivery => false
。
这些交付保证还告知此功能的关闭行为。在执行管道重新加载时,更改将立即按照用户的请求进行,即使这意味着删除接收来自上游管道事件的下游管道。这将导致上游管道被阻塞。您必须恢复下游管道才能干净地关闭 Logstash。您可以发出强制终止命令,但除非为该管道启用了持久队列,否则正在传输的事件可能会丢失。
避免循环编辑
连接管道时,请保持数据单向流动。循环数据或将管道连接成循环图可能会导致问题。Logstash 等待每个管道的作业完成,然后才关闭。管道循环可能会阻止 Logstash 正常关闭。
架构模式编辑
您可以使用 pipeline
输入和输出更好地组织代码,简化控制流,并隔离复杂配置的性能。连接管道的方案数不胜数。这里介绍的方案提供了一些思路。
这些示例使用 config.string
来说明流程。您也可以使用配置文件进行管道间通信。
分发器模式编辑
您可以在有多种类型的数据通过单个输入的情况中使用分发器模式,每种类型都有自己的一组复杂的处理规则。使用分发器模式,一个管道用于根据类型将数据路由到其他管道。每种类型都路由到一个仅包含处理该类型的逻辑的管道。这样,每种类型的逻辑都可以被隔离。
例如,在许多组织中,单个 Beats 输入可能用于接收来自各种来源的流量,每个来源都有自己的处理逻辑。处理此类数据的一种常见方法是使用多个 if
条件来分离流量,并以不同的方式处理每种类型。当配置很长且很复杂时,这种方法很快就会变得混乱。
以下是一个分发器模式配置示例。
# config/pipelines.yml - pipeline.id: beats-server config.string: | input { beats { port => 5044 } } output { if [type] == apache { pipeline { send_to => weblogs } } else if [type] == system { pipeline { send_to => syslog } } else { pipeline { send_to => fallback } } } - pipeline.id: weblog-processing config.string: | input { pipeline { address => weblogs } } filter { # Weblog filter statements here... } output { elasticsearch { hosts => [es_cluster_a_host] } } - pipeline.id: syslog-processing config.string: | input { pipeline { address => syslog } } filter { # Syslog filter statements here... } output { elasticsearch { hosts => [es_cluster_b_host] } } - pipeline.id: fallback-processing config.string: | input { pipeline { address => fallback } } output { elasticsearch { hosts => [es_cluster_b_host] } }
请注意,由于每个管道只处理一项特定任务,因此跟踪数据流非常简单。
输出隔离器模式编辑
您可以使用输出隔离器模式来防止 Logstash 在多个输出中的一个出现暂时故障时被阻塞。默认情况下,Logstash 在任何单个输出关闭时都会被阻塞。此行为对于保证至少一次数据交付非常重要。
例如,服务器可能被配置为将日志数据发送到 Elasticsearch 和 HTTP 端点。HTTP 端点可能由于定期服务或其他原因而经常不可用。在这种情况下,数据将暂停发送到 Elasticsearch,直到 HTTP 端点恢复正常。
使用输出隔离器模式和持久队列,我们可以在一个输出关闭时继续发送到 Elasticsearch。
以下是如何使用输出隔离器模式来实现此场景的示例。
# config/pipelines.yml - pipeline.id: intake config.string: | input { beats { port => 5044 } } output { pipeline { send_to => [es, http] } } - pipeline.id: buffered-es queue.type: persisted config.string: | input { pipeline { address => es } } output { elasticsearch { } } - pipeline.id: buffered-http queue.type: persisted config.string: | input { pipeline { address => http } } output { http { } }
在此架构中,每个输出都有自己的队列,并具有自己的调整和设置。请注意,这种方法使用最多两倍的磁盘空间,并产生三倍于单个管道的序列化/反序列化成本。
如果下游管道的任何持久队列(在上面的示例中,buffered-es
和 buffered-http
)已满,则两个输出都将停止。
分叉路径模式编辑
您可以在单个事件必须根据不同的规则集进行多次处理的情况下使用分叉路径模式。在 pipeline
输入和输出可用之前,这种需求通常通过巧妙地使用 clone
过滤器和 if/else
规则来解决。
假设我们接收数据,并将完整事件索引到我们自己的系统中,但将数据的脱敏版本发布到合作伙伴的 S3 存储桶中。我们可能会使用上面描述的输出隔离器模式来解耦对任一系统的写入。分叉路径模式的显著特点是下游管道中存在额外的规则。
以下是一个分叉路径配置示例。
# config/pipelines.yml - pipeline.id: intake queue.type: persisted config.string: | input { beats { port => 5044 } } output { pipeline { send_to => ["internal-es", "partner-s3"] } } - pipeline.id: buffered-es queue.type: persisted config.string: | input { pipeline { address => "internal-es" } } # Index the full event output { elasticsearch { } } - pipeline.id: partner queue.type: persisted config.string: | input { pipeline { address => "partner-s3" } } filter { # Remove the sensitive data mutate { remove_field => 'sensitive-data' } } output { s3 { } } # Output to partner's bucket
收集器模式编辑
当您希望定义一组通用的输出和预输出过滤器,许多不同的管道可以使用这些输出和过滤器时,可以使用收集器模式。这种模式与分发器模式相反。在此模式中,许多管道流入单个管道,在该管道中它们共享输出和处理。这种模式简化了配置,但代价是降低了隔离性,因为所有数据都通过单个管道发送。
以下是一个收集器模式示例。
# config/pipelines.yml - pipeline.id: beats config.string: | input { beats { port => 5044 } } output { pipeline { send_to => [commonOut] } } - pipeline.id: kafka config.string: | input { kafka { ... } } output { pipeline { send_to => [commonOut] } } - pipeline.id: partner # This common pipeline enforces the same logic whether data comes from Kafka or Beats config.string: | input { pipeline { address => commonOut } } filter { # Always remove sensitive data from all input sources mutate { remove_field => 'sensitive-data' } } output { elasticsearch { } }