管道间通信
编辑管道间通信
编辑使用 Logstash 的多管道功能时,您可能希望连接同一 Logstash 实例中的多个管道。此配置有助于隔离这些管道的执行,并帮助分解复杂管道的逻辑。pipeline
输入/输出启用大量高级架构模式,这些模式将在本文档后面讨论。
如果需要在 Logstash 实例之间设置通信,请使用 Logstash 到 Logstash 通信,或使用中间队列,例如 Kafka 或 Redis。
持久队列 (PQ) 可以帮助保持数据在管道中的流动。请参阅 PQ 和管道间通信,了解 PQ 如何增强您的管道间通信策略。
配置概述
编辑使用 pipeline
输入和 pipeline
输出连接在同一 Logstash 实例中运行的两个管道。这些输入使用客户端-服务器方法,其中 pipeline
输入注册一个虚拟地址,pipeline
输出可以连接到该地址。
- 创建一个监听虚拟地址上事件的下游管道。
- 创建一个产生事件的上游管道,通过
pipeline
输出将它们发送到一个或多个虚拟地址。
这是一个此配置的简单示例。
# config/pipelines.yml - pipeline.id: upstream config.string: input { stdin {} } output { pipeline { send_to => [myVirtualAddress] } } - pipeline.id: downstream config.string: input { pipeline { address => myVirtualAddress } }
工作原理
编辑pipeline
输入充当在本地进程中监听单个虚拟地址的虚拟服务器。只有在同一本地 Logstash 上运行的 pipeline
输出才能将事件发送到此地址。管道 输出
可以将事件发送到虚拟地址列表。pipeline
输出如果下游管道阻塞或不可用将被阻塞。
当事件跨管道发送时,它们的数据将被完全复制。下游管道中对事件的修改不会影响任何上游管道中的该事件。
pipeline
插件可能是管道之间通信最有效的方式,但它仍然会产生性能成本。Logstash 必须为每个下游管道在 Java 堆上完整复制每个事件。使用此功能可能会影响 Logstash 的堆内存利用率。
交付保证
编辑在其标准配置中,pipeline
输入/输出具有至少一次交付保证。如果地址被阻塞或不可用,输出将被阻塞。
默认情况下,pipeline
输出上的 ensure_delivery
选项设置为 true.
如果将 ensure_delivery
标志更改为 false
,则不可用的下游管道会导致丢弃发送的消息。请注意,只有在管道启动或重新加载时才认为管道不可用,而与其可能包含的任何插件被阻塞无关。阻塞的下游管道会阻塞发送输出/管道,而不管 ensure_delivery
标志的值如何。当您想要能够暂时禁用下游管道而不阻塞任何向其发送数据的上游管道时,请使用 ensure_delivery => false
。
这些交付保证也告知此功能的关闭行为。执行管道重新加载时,将立即根据用户的请求进行更改,即使这意味着删除接收来自上游管道的事件的下游管道。这将导致上游管道阻塞。您必须恢复下游管道才能干净地关闭 Logstash。您可以发出强制终止命令,但如果未为该管道启用持久队列,则可能会丢失正在进行的事件。
避免循环
编辑连接管道时,请保持数据单向流动。循环数据或将管道连接到循环图可能会导致问题。Logstash 等待每个管道的处理完成才能关闭。管道循环可能会阻止 Logstash 干净地关闭。
架构模式
编辑您可以使用 pipeline
输入和输出更好地组织代码、简化控制流并隔离复杂配置的性能。连接管道的无限种方法。此处介绍的方法提供了一些想法。
这些示例使用 config.string
来说明流程。您还可以使用配置文件进行管道间通信。
分发器模式
编辑当有多种类型的通过单个输入的数据,每种类型都有其自己的一套复杂的处理规则时,您可以使用分发器模式。使用分发器模式,一个管道用于根据类型将数据路由到其他管道。每种类型都路由到只包含处理该类型的逻辑的管道。这样,每种类型的逻辑都可以隔离。
例如,在许多组织中,单个 Beats 输入可能用于接收来自各种来源的流量,每个来源都有其自己的处理逻辑。处理此类数据的一种常见方法是使用许多 if
条件来分离流量并以不同的方式处理每种类型。当配置很长且很复杂时,这种方法很快就会变得混乱。
这是一个分发器模式配置示例。
# config/pipelines.yml - pipeline.id: beats-server config.string: | input { beats { port => 5044 } } output { if [type] == apache { pipeline { send_to => weblogs } } else if [type] == system { pipeline { send_to => syslog } } else { pipeline { send_to => fallback } } } - pipeline.id: weblog-processing config.string: | input { pipeline { address => weblogs } } filter { # Weblog filter statements here... } output { elasticsearch { hosts => [es_cluster_a_host] } } - pipeline.id: syslog-processing config.string: | input { pipeline { address => syslog } } filter { # Syslog filter statements here... } output { elasticsearch { hosts => [es_cluster_b_host] } } - pipeline.id: fallback-processing config.string: | input { pipeline { address => fallback } } output { elasticsearch { hosts => [es_cluster_b_host] } }
请注意,由于每个管道只处理一项特定任务,因此跟踪数据流非常简单。
输出隔离器模式
编辑如果多个输出之一出现暂时性故障,您可以使用输出隔离器模式来防止 Logstash 被阻塞。默认情况下,当任何单个输出关闭时,Logstash 将被阻塞。此行为对于保证数据的至少一次交付非常重要。
例如,服务器可能配置为将日志数据发送到 Elasticsearch 和 HTTP 端点。由于定期服务或其他原因,HTTP 端点可能经常不可用。在这种情况下,只要 HTTP 端点关闭,数据就会暂停发送到 Elasticsearch。
使用输出隔离器模式和持久队列,即使一个输出关闭,我们也可以继续发送到 Elasticsearch。
这是一个使用输出隔离器模式的此方案示例。
# config/pipelines.yml - pipeline.id: intake config.string: | input { beats { port => 5044 } } output { pipeline { send_to => [es, http] } } - pipeline.id: buffered-es queue.type: persisted config.string: | input { pipeline { address => es } } output { elasticsearch { } } - pipeline.id: buffered-http queue.type: persisted config.string: | input { pipeline { address => http } } output { http { } }
在此架构中,每个输出都有自己的队列,并具有其自己的调整和设置。请注意,此方法使用的磁盘空间最多是单个管道的两倍,并且产生的序列化/反序列化成本是单个管道的三倍。
如果下游管道(在上面的示例中为 buffered-es
和 buffered-http
)的任何持久队列已满,则两个输出都将停止。
分叉路径模式
编辑对于必须根据不同的规则集多次处理单个事件的情况,您可以使用分叉路径模式。pipeline
输入和输出可用之前,通常通过巧妙地使用 clone
过滤器和 if/else
规则来解决此需求。
让我们想象一个用例,我们接收数据并在我们自己的系统中索引完整事件,但将数据的脱敏版本发布到合作伙伴的 S3 存储桶。我们可以使用上面描述的输出隔离器模式来解耦对任一系统的写入。分叉路径模式的显著特征是下游管道中存在其他规则。
这是一个分叉路径配置示例。
# config/pipelines.yml - pipeline.id: intake queue.type: persisted config.string: | input { beats { port => 5044 } } output { pipeline { send_to => ["internal-es", "partner-s3"] } } - pipeline.id: buffered-es queue.type: persisted config.string: | input { pipeline { address => "internal-es" } } # Index the full event output { elasticsearch { } } - pipeline.id: partner queue.type: persisted config.string: | input { pipeline { address => "partner-s3" } } filter { # Remove the sensitive data mutate { remove_field => 'sensitive-data' } } output { s3 { } } # Output to partner's bucket
收集器模式
编辑当您想定义许多不同的管道可能使用的公共输出集和预输出过滤器时,可以使用收集器模式。此模式与分发器模式相反。在此模式中,许多管道流入单个管道,在该管道中它们共享输出和处理。此模式简化了配置,但降低了隔离性,因为所有数据都通过单个管道发送。
这是一个收集器模式示例。
# config/pipelines.yml - pipeline.id: beats config.string: | input { beats { port => 5044 } } output { pipeline { send_to => [commonOut] } } - pipeline.id: kafka config.string: | input { kafka { ... } } output { pipeline { send_to => [commonOut] } } - pipeline.id: partner # This common pipeline enforces the same logic whether data comes from Kafka or Beats config.string: | input { pipeline { address => commonOut } } filter { # Always remove sensitive data from all input sources mutate { remove_field => 'sensitive-data' } } output { elasticsearch { } }