管道到管道通信
编辑管道到管道通信
编辑当使用 Logstash 的多管道功能时,您可能希望在同一个 Logstash 实例中连接多个管道。此配置有助于隔离这些管道的执行,并有助于分解复杂管道的逻辑。pipeline
输入/输出支持本文档后面讨论的一些高级架构模式。
如果您需要在 Logstash 实例之间建立通信,请使用 Logstash-to-Logstash 通信,或使用中间队列(如 Kafka 或 Redis)。
持久队列 (PQ) 可以帮助数据在管道中流动。请参阅 PQ 和管道到管道通信,了解 PQ 如何增强您的管道到管道通信策略。
配置概述
编辑使用 pipeline
输入和 pipeline
输出连接在同一 Logstash 实例中运行的两个管道。这些输入使用客户端-服务器方法,其中 pipeline
输入注册一个虚拟地址,pipeline
输出可以连接到该地址。
- 创建一个下游管道,该管道侦听虚拟地址上的事件。
- 创建一个上游管道,该管道生成事件,并通过
pipeline
输出将其发送到一个或多个虚拟地址。
这是一个简单的配置示例。
# config/pipelines.yml - pipeline.id: upstream config.string: input { stdin {} } output { pipeline { send_to => [myVirtualAddress] } } - pipeline.id: downstream config.string: input { pipeline { address => myVirtualAddress } }
工作原理
编辑pipeline
输入充当一个虚拟服务器,侦听本地进程中的单个虚拟地址。只有在同一本地 Logstash 上运行的 pipeline
输出才能将事件发送到此地址。管道 输出
可以将事件发送到虚拟地址列表。如果下游管道被阻止或不可用,pipeline
输出将被阻止。
当事件跨管道发送时,其数据将被完全复制。对下游管道中事件的修改不会影响任何上游管道中的该事件。
pipeline
插件可能是管道之间通信的最有效方式,但它仍然会产生性能成本。Logstash 必须在 Java 堆上为每个下游管道完整复制每个事件。使用此功能可能会影响 Logstash 的堆内存利用率。
交付保证
编辑在其标准配置中,pipeline
输入/输出具有至少一次交付保证。如果地址被阻止或不可用,输出将被阻止。
默认情况下,pipeline
输出上的 ensure_delivery
选项设置为 true
。如果将 ensure_delivery
标志更改为 false
,则不可用的下游管道会导致发送的消息被丢弃。请注意,只有当管道正在启动或重新加载时才会被视为不可用,而不是当它可能包含的任何插件被阻止时。一个被阻止的下游管道会阻止发送输出/管道,无论 ensure_delivery
标志的值如何。当您希望在不阻止任何向上游管道发送的情况下临时禁用下游管道时,请使用 ensure_delivery => false
。
这些交付保证也说明了此功能的关闭行为。在执行管道重新加载时,更改将根据用户请求立即进行,即使这意味着删除从上游管道接收事件的下游管道也是如此。这将导致上游管道被阻止。您必须恢复下游管道才能干净地关闭 Logstash。您可以发出强制终止,但除非为该管道启用了持久队列,否则正在进行的事件可能会丢失。
避免循环
编辑连接管道时,请保持数据单向流动。循环数据或将管道连接到循环图中可能会导致问题。Logstash 会等待每个管道的工作完成后再关闭。管道循环可能会阻止 Logstash 干净地关闭。
架构模式
编辑您可以使用 pipeline
输入和输出更好地组织代码、简化控制流并隔离复杂配置的性能。连接管道的方式有很多种。这里介绍的方式提供了一些想法。
这些示例使用 config.string
来说明流程。您还可以将配置文件用于管道到管道通信。
分配器模式
编辑当单个输入中有多种类型的数据进入,每种类型都有其自己的一组复杂处理规则时,可以使用分配器模式。使用分配器模式,一个管道用于根据类型将数据路由到其他管道。每种类型都将路由到仅具有处理该类型的逻辑的管道。通过这种方式,可以隔离每种类型的逻辑。
例如,在许多组织中,可以使用单个 Beats 输入来接收来自各种来源的流量,每个来源都有其自己的处理逻辑。处理此类数据的常用方法是使用一些 if
条件来分隔流量并以不同的方式处理每种类型。当配置又长又复杂时,这种方法可能会很快变得混乱。
这是一个分配器模式配置示例。
# config/pipelines.yml - pipeline.id: beats-server config.string: | input { beats { port => 5044 } } output { if [type] == apache { pipeline { send_to => weblogs } } else if [type] == system { pipeline { send_to => syslog } } else { pipeline { send_to => fallback } } } - pipeline.id: weblog-processing config.string: | input { pipeline { address => weblogs } } filter { # Weblog filter statements here... } output { elasticsearch { hosts => [es_cluster_a_host] } } - pipeline.id: syslog-processing config.string: | input { pipeline { address => syslog } } filter { # Syslog filter statements here... } output { elasticsearch { hosts => [es_cluster_b_host] } } - pipeline.id: fallback-processing config.string: | input { pipeline { address => fallback } } output { elasticsearch { hosts => [es_cluster_b_host] } }
请注意,由于每个管道仅处理一个特定的任务,因此数据流的跟踪非常简单。
输出隔离器模式
编辑如果多个输出之一发生临时故障,可以使用输出隔离器模式来防止 Logstash 被阻止。默认情况下,当任何单个输出关闭时,Logstash 会被阻止。此行为对于保证至少一次交付数据非常重要。
例如,服务器可能被配置为将日志数据发送到 Elasticsearch 和 HTTP 端点。由于定期服务或其他原因,HTTP 端点可能经常不可用。在这种情况下,当 HTTP 端点关闭时,数据将暂停发送到 Elasticsearch。
使用输出隔离器模式和持久队列,即使一个输出关闭,我们也可以继续发送到 Elasticsearch。
以下是使用输出隔离器模式的此场景示例。
# config/pipelines.yml - pipeline.id: intake config.string: | input { beats { port => 5044 } } output { pipeline { send_to => [es, http] } } - pipeline.id: buffered-es queue.type: persisted config.string: | input { pipeline { address => es } } output { elasticsearch { } } - pipeline.id: buffered-http queue.type: persisted config.string: | input { pipeline { address => http } } output { http { } }
在此架构中,每个输出都有自己的队列,具有自己的调整和设置。请注意,此方法使用的磁盘空间最多是单个管道的两倍,并且产生的序列化/反序列化成本是单个管道的三倍。
如果下游管道(在上面的示例中为 buffered-es
和 buffered-http
)的任何持久队列已满,则两个输出都将停止。
分叉路径模式
编辑对于必须根据不同的规则集多次处理单个事件的情况,可以使用分叉路径模式。在 pipeline
输入和输出可用之前,这种需求通常通过创造性地使用 clone
过滤器和 if/else
规则来解决。
让我们想象一个用例,我们在其中接收数据并在我们自己的系统中索引完整事件,但将数据的修订版本发布到合作伙伴的 S3 存储桶。我们可能会使用上面描述的输出隔离器模式来分离我们对任一系统的写入。分叉路径模式的突出特点是下游管道中存在其他规则。
以下是分叉路径配置示例。
# config/pipelines.yml - pipeline.id: intake queue.type: persisted config.string: | input { beats { port => 5044 } } output { pipeline { send_to => ["internal-es", "partner-s3"] } } - pipeline.id: buffered-es queue.type: persisted config.string: | input { pipeline { address => "internal-es" } } # Index the full event output { elasticsearch { } } - pipeline.id: partner queue.type: persisted config.string: | input { pipeline { address => "partner-s3" } } filter { # Remove the sensitive data mutate { remove_field => 'sensitive-data' } } output { s3 { } } # Output to partner's bucket
收集器模式
编辑当您希望定义许多不同的管道可能使用的一组通用输出和预输出过滤器时,可以使用收集器模式。此模式与分配器模式相反。在此模式中,许多管道流入单个管道,在其中它们共享输出和处理。此模式简化了配置,但代价是降低了隔离性,因为所有数据都通过单个管道发送。
以下是收集器模式的示例。
# config/pipelines.yml - pipeline.id: beats config.string: | input { beats { port => 5044 } } output { pipeline { send_to => [commonOut] } } - pipeline.id: kafka config.string: | input { kafka { ... } } output { pipeline { send_to => [commonOut] } } - pipeline.id: partner # This common pipeline enforces the same logic whether data comes from Kafka or Beats config.string: | input { pipeline { address => commonOut } } filter { # Always remove sensitive data from all input sources mutate { remove_field => 'sensitive-data' } } output { elasticsearch { } }