聚合过滤器插件
编辑聚合过滤器插件编辑
- 插件版本:v2.10.0
- 发布时间:2021-10-11
- 更新日志
有关其他版本,请参阅版本化插件文档。
获取帮助编辑
如果您对该插件有任何疑问,请在论坛中打开一个主题。对于错误或功能请求,请在Github中打开一个问题。有关 Elastic 支持的插件列表,请参阅Elastic 支持矩阵。
描述编辑
此过滤器的目的是聚合属于同一任务的多个事件(通常是日志行)中可用的信息,并将聚合的信息推送到最终任务事件中。
您应该非常小心地将 Logstash 过滤器工作线程设置为 1(-w 1
标志),以使此过滤器正常工作,否则事件可能会乱序处理,并导致意外结果。
示例 #1编辑
- 使用以下给定的日志
INFO - 12345 - TASK_START - start INFO - 12345 - SQL - sqlQuery1 - 12 INFO - 12345 - SQL - sqlQuery2 - 34 INFO - 12345 - TASK_END - end
- 您可以使用此配置聚合整个任务的“sql 持续时间”
filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ] } if [logger] == "TASK_START" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] = 0" map_action => "create" } } if [logger] == "SQL" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] += event.get('duration')" map_action => "update" } } if [logger] == "TASK_END" { aggregate { task_id => "%{taskid}" code => "event.set('sql_duration', map['sql_duration'])" map_action => "update" end_of_task => true timeout => 120 } } }
- 最终事件如下所示
{ "message" => "INFO - 12345 - TASK_END - end message", "sql_duration" => 46 }
添加了 sql_duration
字段,其中包含所有 sql 查询持续时间的总和。
示例 #2:没有开始事件编辑
- 如果您拥有与示例 #1 相同的日志,但没有开始日志
INFO - 12345 - SQL - sqlQuery1 - 12 INFO - 12345 - SQL - sqlQuery2 - 34 INFO - 12345 - TASK_END - end
- 您还可以使用稍微不同的配置聚合“sql 持续时间”
filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ] } if [logger] == "SQL" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] ||= 0 ; map['sql_duration'] += event.get('duration')" } } if [logger] == "TASK_END" { aggregate { task_id => "%{taskid}" code => "event.set('sql_duration', map['sql_duration'])" end_of_task => true timeout => 120 } } }
- 最终事件与示例 #1 完全相同
- 关键点是“||=” ruby 运算符。它允许仅当尚未初始化 *sql_duration* 映射条目时才将其初始化为 0
示例 #3:没有结束事件编辑
第三种用例:您没有特定的结束事件。
一个典型的例子是聚合或跟踪用户行为。我们可以通过事件跟踪用户的 ID,但是一旦用户停止交互,事件就会停止传入。没有特定的事件表明用户交互的结束。
在这种情况下,我们可以启用选项 *push_map_as_event_on_timeout* 以便在发生超时时将聚合映射作为新事件推送。此外,我们可以启用 *timeout_code* 以在填充的超时事件上执行代码。我们还可以添加 *timeout_task_id_field*,以便我们可以关联 task_id,在本例中,task_id 将是用户的 ID。
- 给定以下日志
INFO - 12345 - Clicked One INFO - 12345 - Clicked Two INFO - 12345 - Clicked Three
- 您可以像这样聚合用户点击的次数
filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ] } aggregate { task_id => "%{user_id}" code => "map['clicks'] ||= 0; map['clicks'] += 1;" push_map_as_event_on_timeout => true timeout_task_id_field => "user_id" timeout => 600 # 10 minutes timeout timeout_tags => ['_aggregatetimeout'] timeout_code => "event.set('several_clicks', event.get('clicks') > 1)" } }
- 十分钟后,这将产生如下事件
{ "user_id": "12345", "clicks": 3, "several_clicks": true, "tags": [ "_aggregatetimeout" ] }
示例 #4:没有结束事件,并且任务一个接一个地到来编辑
第四种用例:与示例 #3 一样,您没有特定的结束事件,而且任务一个接一个地到来。
也就是说:任务不会交错。所有 task1 事件都来了,然后所有 task2 事件都来了,……
在这种情况下,您不想等待任务超时来刷新聚合映射。
- 一个典型的例子是从 jdbc 输入插件聚合结果。
- 假设您有以下 SQL 查询:
SELECT country_name, town_name FROM town
- 使用 jdbc 输入插件,您可以从以下位置获取这 3 个事件
{ "country_name": "France", "town_name": "Paris" } { "country_name": "France", "town_name": "Marseille" } { "country_name": "USA", "town_name": "New-York" }
- 您希望将这两个结果事件推送到 elasticsearch 中
{ "country_name": "France", "towns": [ {"town_name": "Paris"}, {"town_name": "Marseille"} ] } { "country_name": "USA", "towns": [ {"town_name": "New-York"} ] }
- 您可以使用
push_previous_map_as_event
聚合插件选项来做到这一点
filter { aggregate { task_id => "%{country_name}" code => " map['country_name'] ||= event.get('country_name') map['towns'] ||= [] map['towns'] << {'town_name' => event.get('town_name')} event.cancel() " push_previous_map_as_event => true timeout => 3 } }
- 关键点是,每次聚合插件检测到新的
country_name
时,它都会将先前的聚合映射作为新的 Logstash 事件推送,然后为下一个国家/地区创建一个新的空映射 - 当 3 秒超时到来时,最后一个聚合映射将作为新事件推送
- 初始事件(未聚合)将被删除,因为它们无用(感谢
event.cancel()
) - 最后一点:如果并非每个事件都满足某个字段(例如“town_postcode”字段),则
||=
运算符将允许您将第一个“非空”值推送到聚合映射中。示例:map['town_postcode'] ||= event.get('town_postcode')
示例 #5:没有结束事件,并尽快推送事件编辑
第五种用例:与示例 #3 一样,没有结束事件。
事件会无限期地到来,并且您希望在最后一次用户交互后尽快推送聚合映射,而无需等待 timeout
。
这允许将聚合事件更接近实时地推送。
一个典型的例子是聚合或跟踪用户行为。
我们可以通过事件跟踪用户的 ID,但是一旦用户停止交互,事件就会停止传入。
没有特定的事件表明用户交互的结束。
如果在指定的 `inactivity_timeout` 后没有收到指定用户(task_id)的事件,则用户交互将被视为已结束。
如果用户继续交互的时间超过 timeout
秒(从第一个事件开始),则聚合映射仍将在超时发生时被删除并作为新事件推送。
与示例 #3 的区别在于,事件将在用户停止交互 inactivity_timeout
秒后立即推送,而不是等待第一个事件后的 timeout
秒结束。
在这种情况下,我们可以启用选项 *push_map_as_event_on_timeout* 以便在发生非活动超时时将聚合映射作为新事件推送。
此外,我们可以启用 *timeout_code* 以在填充的超时事件上执行代码。
我们还可以添加 *timeout_task_id_field*,以便我们可以关联 task_id,在本例中,task_id 将是用户的 ID。
- 给定以下日志
INFO - 12345 - Clicked One INFO - 12345 - Clicked Two INFO - 12345 - Clicked Three
- 您可以像这样聚合用户点击的次数
filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ] } aggregate { task_id => "%{user_id}" code => "map['clicks'] ||= 0; map['clicks'] += 1;" push_map_as_event_on_timeout => true timeout_task_id_field => "user_id" timeout => 3600 # 1 hour timeout, user activity will be considered finished one hour after the first event, even if events keep coming inactivity_timeout => 300 # 5 minutes timeout, user activity will be considered finished if no new events arrive 5 minutes after the last event timeout_tags => ['_aggregatetimeout'] timeout_code => "event.set('several_clicks', event.get('clicks') > 1)" } }
- 在五分钟的不活动或第一个事件后一小时,这将产生如下事件
{ "user_id": "12345", "clicks": 3, "several_clicks": true, "tags": [ "_aggregatetimeout" ] }
工作原理编辑
- 过滤器需要一个“task_id”来关联同一任务的事件(日志行)
- 在任务开始时,过滤器会创建一个映射,并将其附加到 task_id
- 对于每个事件,您可以使用 *event* 和 *map* 执行代码(例如,将事件字段复制到映射)
- 在最终事件中,您可以执行最后一个代码(例如,将映射数据添加到最终事件)
- 在最终事件之后,附加到任务的映射将被删除(感谢
end_of_task => true
) - 聚合映射绑定到一个 task_id 值,该值绑定到一个 task_id 模式。因此,如果您有 2 个具有不同 task_id 模式的过滤器,即使您具有相同的 task_id 值,它们也不会共享相同的聚合映射。
- 在一个过滤器配置中,建议定义一个超时选项,以防止功能针对未终止的任务。它告诉过滤器删除过期的映射
- 如果未定义超时,则默认情况下,所有早于 1800 秒的映射都将自动删除
- 所有超时选项都必须在每个 task_id 模式(每个管道)仅在一个聚合过滤器中定义。超时选项包括:timeout、inactivity_timeout、timeout_code、push_map_as_event_on_timeout、push_previous_map_as_event、timeout_timestamp_field、timeout_task_id_field、timeout_tags
- 如果
code
执行引发异常,则会记录错误,并使用 *_aggregateexception* 标记事件
用例编辑
- 从任务日志中提取一些很酷的指标,并将它们推送到任务最终日志事件中(如示例 #1 和 #2 所示)
- 提取任何任务日志行中的错误信息,并将其推送到最终任务事件中(以便在有任何错误信息的情况下获取包含所有错误信息的最终事件)
- 将所有后端调用提取为列表,并将此列表推送到最终任务事件中(以获取任务配置文件)
- 提取在多行中记录的所有 http 标头,并将此列表推送到最终任务事件中(完整的 http 请求信息)
- 对于每个后端调用,收集多行中可用的调用详细信息,对其进行分析,最后标记最终后端调用日志行(错误、超时、业务警告等)
- 最后,任务 ID 可以是任何符合您需求的相关 ID:它可以是会话 ID、文件路径等
聚合过滤器配置选项编辑
此插件支持以下配置选项以及稍后描述的通用选项。
另请参阅通用选项,了解所有过滤器插件支持的选项列表。
aggregate_maps_path
编辑
- 值类型为 字符串
- 此设置没有默认值。
Logstash 停止时存储聚合映射并在 Logstash 启动时加载聚合映射的文件路径。
如果未定义,则聚合映射将不会在 Logstash 停止时存储,并且将会丢失。必须在每个管道中仅定义一个聚合过滤器(因为聚合映射在管道级别共享)。
示例
filter { aggregate { aggregate_maps_path => "/path/to/.aggregate_maps" } }
code
编辑
- 这是一个必填设置。
- 值类型为 字符串
- 此设置没有默认值。
要执行以使用当前事件更新聚合映射的代码。
或者相反,要执行以使用聚合映射更新事件的代码。
可用变量为
event
:当前 Logstash 事件
map
:与 task_id
关联的聚合映射,包含键/值对。数据结构是一个 Ruby 哈希
map_meta
:与聚合映射关联的元信息。它允许设置自定义 timeout
或 inactivity_timeout
。它还允许获取 creation_timestamp
、lastevent_timestamp
和 task_id
。
new_event_block
:用于发出新 Logstash 事件的块。有关如何使用它的信息,请参阅第二个示例。
当选项 push_map_as_event_on_timeout=true 时,如果在 code
块中设置 map_meta.timeout=0
,则聚合映射将立即作为新事件推送。
示例
filter { aggregate { code => "map['sql_duration'] += event.get('duration')" } }
要在代码执行期间创建要立即发出的其他事件,可以使用 new_event_block.call(event)
函数,如下例所示
filter { aggregate { code => " data = {:my_sql_duration => map['sql_duration']} generated_event = LogStash::Event.new(data) generated_event.set('my_other_field', 34) new_event_block.call(generated_event) " } }
函数 new_event_block.call
的参数必须是 LogStash::Event
类型。要创建这样的对象,可以使用同一类的构造函数:LogStash::Event.new()
。 LogStash::Event.new()
可以接收类型为 Ruby 哈希 的参数来初始化新事件字段。
inactivity_timeout
编辑
- 值类型为 数字
- 此设置没有默认值。
任务被视为过期之前的时间(以秒为单位)(自上次事件以来)。
当任务发生超时时,其聚合映射将被逐出。
如果 *push_map_as_event_on_timeout* 或 *push_previous_map_as_event* 设置为 true,则任务聚合映射将作为新的 Logstash 事件推送。
inactivity_timeout
可以为每个“task_id”模式定义。
inactivity_timeout
必须小于 timeout
。
map_action
编辑
- 值类型为 字符串
- 默认值为
"create_or_update"
告诉过滤器如何处理聚合映射。
"create"
:创建映射,并且仅当映射之前未创建时才执行代码
"update"
:不创建映射,并且仅当映射之前已创建时才执行代码
"create_or_update"
:如果之前未创建映射,则创建映射,在所有情况下都执行代码
push_map_as_event_on_timeout
编辑
- 值类型为 布尔值
- 默认值为
false
启用此选项后,每次检测到任务超时时,它都会将任务聚合映射作为新的 Logstash 事件推送。这使得能够在 Logstash 中检测和处理任务超时,还可以管理没有显式结束事件的任务。
push_previous_map_as_event
编辑
- 值类型为 布尔值
- 默认值为
false
启用此选项后,每次聚合插件检测到新的任务 ID 时,它都会将先前的聚合映射作为新的 Logstash 事件推送,然后为下一个任务创建一个新的空映射。
此选项仅在任务一个接一个地出现时才有效。这意味着:所有 task1 事件,然后是所有 task2 事件,等等……
task_id
编辑
- 这是一个必填设置。
- 值类型为 字符串
- 此设置没有默认值。
定义用于关联日志的任务 ID 的表达式。
此值必须唯一标识任务。
示例
filter { aggregate { task_id => "%{type}%{my_task_id}" } }
timeout
编辑
- 值类型为 数字
- 默认值为
1800
任务被视为过期之前的时间(以秒为单位)(自第一个事件以来)。
当任务发生超时时,其聚合映射将被逐出。
如果 *push_map_as_event_on_timeout* 或 *push_previous_map_as_event* 设置为 true,则任务聚合映射将作为新的 Logstash 事件推送。
可以为每个“task_id”模式定义超时。
timeout_code
编辑
- 值类型为 字符串
- 此设置没有默认值。
当 'push_map_as_event_on_timeout'
或 'push_previous_map_as_event'
设置为 true 时,要执行以完成超时生成的事件的代码。代码块将可以访问预先填充了聚合映射的新生成的超时事件。
如果设置了 'timeout_task_id_field'
,则还会使用 task_id 值填充事件
示例
filter { aggregate { timeout_code => "event.set('state', 'timeout')" } }
timeout_tags
编辑
- 值类型为 数组
- 默认值为
[]
定义在生成和产生超时事件时要添加的标签
示例
filter { aggregate { timeout_tags => ["aggregate_timeout"] } }
timeout_task_id_field
编辑
- 值类型为 字符串
- 此设置没有默认值。
此选项指示超时生成的事件的字段,将在其中设置当前“task_id”值。这有助于关联哪些任务已超时。
默认情况下,如果未设置此选项,则不会在超时生成的事件中设置任务 ID 值。
示例
filter { aggregate { timeout_task_id_field => "task_id" } }
timeout_timestamp_field
编辑
- 值类型为 字符串
- 此设置没有默认值。
默认情况下,超时是使用运行 Logstash 的系统时间计算的。
设置此选项后,将使用此选项中指示的事件时间戳字段计算超时。这意味着当第一个事件到达聚合过滤器并导致创建映射时,映射创建时间将等于此事件时间戳。然后,每次新事件到达聚合过滤器时,都会将事件时间戳与映射创建时间进行比较,以检查是否发生超时。
在使用选项 push_map_as_event_on_timeout => true
处理旧日志时,此选项特别有用。它允许根据旧日志上的超时生成聚合事件,其中系统时间不合适。
警告:为了使此选项正常工作,必须在第一个聚合过滤器上设置它。
示例
filter { aggregate { timeout_timestamp_field => "@timestamp" } }
通用选项编辑
所有过滤器插件都支持以下配置选项
设置 | 输入类型 | 必需 |
---|---|---|
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
add_field
编辑
- 值类型为 哈希
- 默认值为
{}
如果此过滤器成功,则向此事件添加任何任意字段。字段名称可以是动态的,并且可以使用 %{field}
包含事件的一部分。
示例
filter { aggregate { add_field => { "foo_%{somefield}" => "Hello world, from %{host}" } } }
# You can also add multiple fields at once: filter { aggregate { add_field => { "foo_%{somefield}" => "Hello world, from %{host}" "new_field" => "new_static_value" } } }
如果事件的字段 "somefield" == "hello"
,则此过滤器在成功时将添加字段 foo_hello
(如果存在),其值为上述值,并将 %{host}
部分替换为事件中的该值。第二个示例还将添加一个硬编码字段。
add_tag
编辑
- 值类型为 数组
- 默认值为
[]
如果此过滤器成功,则向事件添加任意标签。标签可以是动态的,并且可以使用 %{field}
语法包含事件的一部分。
示例
filter { aggregate { add_tag => [ "foo_%{somefield}" ] } }
# You can also add multiple tags at once: filter { aggregate { add_tag => [ "foo_%{somefield}", "taggedy_tag"] } }
如果事件的字段 "somefield" == "hello"
,则此过滤器在成功时将添加标签 foo_hello
(当然,第二个示例将添加 taggedy_tag
标签)。
id
编辑
- 值类型为 字符串
- 此设置没有默认值。
向插件配置添加唯一的 ID
。如果未指定 ID,Logstash 将生成一个 ID。强烈建议在配置中设置此 ID。当您有两个或多个相同类型的插件时,例如,如果您有 2 个聚合过滤器,这将特别有用。在这种情况下,添加命名 ID 将有助于在使用监控 API 时监控 Logstash。
filter { aggregate { id => "ABC" } }
id
字段中的变量替换仅支持环境变量,不支持使用密钥存储中的值。
remove_field
编辑
- 值类型为 数组
- 默认值为
[]
如果此过滤器成功,则从此事件中删除任意字段。字段名称可以是动态的,并且可以使用 %{field}。示例
filter { aggregate { remove_field => [ "foo_%{somefield}" ] } }
# You can also remove multiple fields at once: filter { aggregate { remove_field => [ "foo_%{somefield}", "my_extraneous_field" ] } }
如果事件具有字段 "somefield" == "hello"
,则此过滤器在成功时将删除名为 foo_hello
的字段(如果存在)。第二个示例将删除另一个非动态字段。
remove_tag
编辑
- 值类型为 数组
- 默认值为
[]
如果此过滤器成功,则从事件中删除任意标签。标签可以是动态的,并使用 %{field}
语法包含事件的部分内容。
示例
filter { aggregate { remove_tag => [ "foo_%{somefield}" ] } }
# You can also remove multiple tags at once: filter { aggregate { remove_tag => [ "foo_%{somefield}", "sad_unwanted_tag"] } }
如果事件具有字段 "somefield" == "hello"
,则此过滤器在成功时将删除标签 foo_hello
(如果存在)。第二个示例还将删除一个不需要的悲伤标签。