聚合过滤器插件

编辑
  • 插件版本:v2.10.0
  • 发布日期:2021-10-11
  • 更新日志

对于其他版本,请参阅版本化插件文档

获取帮助

编辑

有关插件的问题,请在Discuss论坛中开一个主题。对于错误或功能请求,请在Github中打开一个 issue。有关 Elastic 支持的插件列表,请参阅Elastic 支持矩阵

描述

编辑

此过滤器的目的是聚合属于同一任务的多个事件(通常是日志行)中可用的信息,并将聚合的信息推送到最终任务事件中。

您应该非常小心地将 Logstash 过滤器工作线程设置为 1(-w 1 标志),以便此过滤器正常工作,否则事件可能会乱序处理,并出现意外结果。

示例 #1

编辑
  • 对于以下给定的日志
 INFO - 12345 - TASK_START - start
 INFO - 12345 - SQL - sqlQuery1 - 12
 INFO - 12345 - SQL - sqlQuery2 - 34
 INFO - 12345 - TASK_END - end
  • 您可以使用此配置来聚合整个任务的 “sql duration”
 filter {
   grok {
     match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ]
   }

   if [logger] == "TASK_START" {
     aggregate {
       task_id => "%{taskid}"
       code => "map['sql_duration'] = 0"
       map_action => "create"
     }
   }

   if [logger] == "SQL" {
     aggregate {
       task_id => "%{taskid}"
       code => "map['sql_duration'] += event.get('duration')"
       map_action => "update"
     }
   }

   if [logger] == "TASK_END" {
     aggregate {
       task_id => "%{taskid}"
       code => "event.set('sql_duration', map['sql_duration'])"
       map_action => "update"
       end_of_task => true
       timeout => 120
     }
   }
 }
  • 最终事件如下所示
{
  "message" => "INFO - 12345 - TASK_END - end message",
  "sql_duration" => 46
}

添加了字段 sql_duration,其中包含所有 SQL 查询持续时间的总和。

示例 #2:无开始事件

编辑
  • 如果您有与示例 #1 相同的日志,但没有开始日志
 INFO - 12345 - SQL - sqlQuery1 - 12
 INFO - 12345 - SQL - sqlQuery2 - 34
 INFO - 12345 - TASK_END - end
  • 您还可以使用稍微不同的配置来聚合 “sql duration”
 filter {
   grok {
     match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ]
   }

   if [logger] == "SQL" {
     aggregate {
       task_id => "%{taskid}"
       code => "map['sql_duration'] ||= 0 ; map['sql_duration'] += event.get('duration')"
     }
   }

   if [logger] == "TASK_END" {
     aggregate {
       task_id => "%{taskid}"
       code => "event.set('sql_duration', map['sql_duration'])"
       end_of_task => true
       timeout => 120
     }
   }
 }
  • 最终事件与示例 #1 完全相同
  • 关键是 “||=” Ruby 运算符。它允许仅当此映射条目尚未初始化时才将 _sql_duration_ 映射条目初始化为 0

示例 #3:无结束事件

编辑

第三种用例:您没有特定的结束事件。

一个典型的案例是聚合或跟踪用户行为。我们可以通过事件按用户的 ID 跟踪用户,但是一旦用户停止交互,事件就会停止进入。没有特定的事件指示用户交互的结束。

在这种情况下,我们可以启用选项 _push_map_as_event_on_timeout_ 以便在发生超时时将聚合映射作为新事件推送。此外,我们可以启用 _timeout_code_ 以便在填充的超时事件上执行代码。我们还可以添加 _timeout_task_id_field_,以便我们可以关联 task_id,在本例中,它是用户的 ID。

  • 对于以下日志
INFO - 12345 - Clicked One
INFO - 12345 - Clicked Two
INFO - 12345 - Clicked Three
  • 您可以像这样聚合用户点击的次数
filter {
  grok {
    match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ]
  }

  aggregate {
    task_id => "%{user_id}"
    code => "map['clicks'] ||= 0; map['clicks'] += 1;"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "user_id"
    timeout => 600 # 10 minutes timeout
    timeout_tags => ['_aggregatetimeout']
    timeout_code => "event.set('several_clicks', event.get('clicks') > 1)"
  }
}
  • 十分钟后,这将产生一个如下所示的事件
{
  "user_id": "12345",
  "clicks": 3,
  "several_clicks": true,
    "tags": [
       "_aggregatetimeout"
    ]
}

示例 #4:无结束事件,任务一个接一个地出现

编辑

第四种用例:与示例 #3 类似,您没有特定的结束事件,而且任务一个接一个地出现。

也就是说:任务不会交错。所有 task1 事件都会到来,然后所有 task2 事件都会到来,……

在这种情况下,您不需要等待任务超时来刷新聚合映射。

  • 一个典型的案例是聚合来自 JDBC 输入插件的结果。
  • 假设您有以下 SQL 查询:SELECT country_name, town_name FROM town
  • 使用 JDBC 输入插件,您从以下位置获得这 3 个事件
  { "country_name": "France", "town_name": "Paris" }
  { "country_name": "France", "town_name": "Marseille" }
  { "country_name": "USA", "town_name": "New-York" }
  • 并且您希望以下 2 个结果事件将它们推送到 Elasticsearch
  { "country_name": "France", "towns": [ {"town_name": "Paris"}, {"town_name": "Marseille"} ] }
  { "country_name": "USA", "towns": [ {"town_name": "New-York"} ] }
  • 您可以使用 push_previous_map_as_event 聚合插件选项来执行此操作
   filter {
     aggregate {
       task_id => "%{country_name}"
       code => "
         map['country_name'] ||= event.get('country_name')
         map['towns'] ||= []
         map['towns'] << {'town_name' => event.get('town_name')}
         event.cancel()
       "
       push_previous_map_as_event => true
       timeout => 3
     }
   }
  • 关键是每次聚合插件检测到新的 country_name 时,它会将之前的聚合映射作为新的 Logstash 事件推送,然后为下一个国家/地区创建新的空映射
  • 当 3 秒超时到来时,最后一个聚合映射将作为新事件推送
  • 初始事件(未聚合)由于无用而被删除(感谢 event.cancel()
  • 最后一点:如果并非每个事件都填充了某个字段(例如 “town_postcode” 字段),||= 运算符将允许您将第一个“非 null”值推送到聚合映射中。示例:map['town_postcode'] ||= event.get('town_postcode')

示例 #5:无结束事件,并尽快推送事件

编辑

第五种用例:与示例 #3 类似,没有结束事件。

事件会无限期地持续到达,并且您希望在上次用户交互后尽快推送聚合映射,而无需等待 timeout

这允许将聚合事件更接近实时地推送。

一个典型的案例是聚合或跟踪用户行为。

我们可以通过事件按用户的 ID 跟踪用户,但是一旦用户停止交互,事件就会停止进入。

没有特定的事件指示用户交互的结束。

当指定用户(task_id)在指定的 inactivity_timeout` 之后没有收到事件时,将认为用户交互已结束。

如果用户持续交互的时间超过 timeout 秒(自第一个事件起),则当超时发生时,聚合映射仍然会被删除并作为新事件推送。

与示例 #3 的不同之处在于,事件将在用户停止交互 inactivity_timeout 秒后立即推送,而不是等待自第一个事件起 timeout 秒结束。

在这种情况下,我们可以启用选项 _push_map_as_event_on_timeout_,以便在不活动超时发生时将聚合映射作为新事件推送。

此外,我们可以启用 _timeout_code_ 以便在填充的超时事件上执行代码。

我们还可以添加 _timeout_task_id_field_,以便我们可以关联 task_id,在本例中,它是用户的 ID。

  • 对于以下日志
INFO - 12345 - Clicked One
INFO - 12345 - Clicked Two
INFO - 12345 - Clicked Three
  • 您可以像这样聚合用户点击的次数
filter {
 grok {
   match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ]
 }
 aggregate {
   task_id => "%{user_id}"
   code => "map['clicks'] ||= 0; map['clicks'] += 1;"
   push_map_as_event_on_timeout => true
   timeout_task_id_field => "user_id"
   timeout => 3600 # 1 hour timeout, user activity will be considered finished one hour after the first event, even if events keep coming
   inactivity_timeout => 300 # 5 minutes timeout, user activity will be considered finished if no new events arrive 5 minutes after the last event
   timeout_tags => ['_aggregatetimeout']
   timeout_code => "event.set('several_clicks', event.get('clicks') > 1)"
 }
}
  • 不活动五分钟或自第一个事件起一小时后,这将产生一个如下所示的事件
{
 "user_id": "12345",
 "clicks": 3,
 "several_clicks": true,
   "tags": [
      "_aggregatetimeout"
   ]
}

工作原理

编辑
  • 过滤器需要一个 “task_id” 来关联同一任务的事件(日志行)
  • 在任务开始时,过滤器会创建一个映射,附加到 task_id
  • 对于每个事件,您可以使用 _event_ 和 _map_ 执行代码(例如,将事件字段复制到映射)
  • 在最终事件中,您可以执行最后一段代码(例如,将映射数据添加到最终事件)
  • 在最终事件之后,与任务关联的映射将被删除(感谢 end_of_task => true
  • 一个聚合映射绑定到一个 task_id 值,该值绑定到一个 task_id 模式。因此,如果您有 2 个具有不同 task_id 模式的过滤器,即使您具有相同的 task_id 值,它们也不会共享相同的聚合映射。
  • 在一个过滤器配置中,建议定义一个超时选项来保护功能免受未终止任务的影响。它告诉过滤器删除过期的映射
  • 如果未定义超时,则默认情况下,所有超过 1800 秒的映射都会自动删除
  • 所有超时选项都必须在每个 task_id 模式(每个管道)的唯一聚合过滤器中定义。超时选项包括:timeout、inactivity_timeout、timeout_code、push_map_as_event_on_timeout、push_previous_map_as_event、timeout_timestamp_field、timeout_task_id_field、timeout_tags
  • 如果 code 执行引发异常,则会记录错误,并将事件标记为 _aggregateexception_

用例

编辑
  • 从任务日志中提取一些很酷的指标,并将它们推送到任务最终日志事件中(如示例 #1 和 #2 中所示)
  • 在任何任务日志行中提取错误信息,并将其推送到最终任务事件中(以获取包含所有错误信息的最终事件(如果有))
  • 提取所有后端调用作为列表,并将此列表推送到最终任务事件中(以获取任务配置文件)
  • 提取在多个行中记录的所有 HTTP 标头,以将此列表推送到最终任务事件中(完整的 HTTP 请求信息)
  • 对于每个后端调用,收集多个行上可用的调用详细信息,分析它,并最终标记最终后端调用日志行(错误、超时、业务警告等)
  • 最后,任务 ID 可以是匹配您需要的任何关联 ID:它可以是会话 ID、文件路径等。

聚合过滤器配置选项

编辑

此插件支持以下配置选项以及稍后描述的通用选项

另请参阅通用选项,以获取所有过滤器插件支持的选项列表。

 

aggregate_maps_path

编辑
  • 值类型是字符串
  • 此设置没有默认值。

Logstash 停止时存储聚合映射并在 Logstash 启动时从中加载的文件的路径。

如果未定义,则聚合映射不会在 Logstash 停止时存储,并且会丢失。必须在每个管道的唯一聚合过滤器中定义(因为聚合映射在管道级别共享)。

示例

    filter {
      aggregate {
        aggregate_maps_path => "/path/to/.aggregate_maps"
      }
    }

code

编辑
  • 这是一个必需的设置。
  • 值类型是字符串
  • 此设置没有默认值。

使用当前事件执行以更新聚合映射的代码。

或者相反,执行使用聚合映射更新事件的代码。

可用变量包括

event:当前 Logstash 事件

map:与 task_id 关联的聚合映射,包含键/值对。数据结构是 Ruby Hash

map_meta:与聚合映射关联的元信息。它允许设置自定义的 timeoutinactivity_timeout。它还允许获取 creation_timestamplastevent_timestamptask_id

new_event_block:用于发出新的 Logstash 事件的块。请参阅第二个示例,了解如何使用它。

当选项 push_map_as_event_on_timeout=true 时,如果在 code 块中设置 map_meta.timeout=0,则聚合映射会立即作为新事件推送。

示例

    filter {
      aggregate {
        code => "map['sql_duration'] += event.get('duration')"
      }
    }

要在代码执行期间创建要立即发出的其他事件,可以使用 new_event_block.call(event) 函数,如以下示例所示

    filter {
      aggregate {
        code => "
            data = {:my_sql_duration => map['sql_duration']}
            generated_event = LogStash::Event.new(data)
            generated_event.set('my_other_field', 34)
            new_event_block.call(generated_event)
        "
      }
    }

函数 new_event_block.call 的参数必须是 LogStash::Event 类型。要创建这样的对象,可以使用同一类的构造函数:LogStash::Event.new()LogStash::Event.new() 可以接收一个 ruby Hash 类型的参数来初始化新的事件字段。

end_of_task

编辑
  • 值类型是 boolean
  • 默认值是 false

告知过滤器任务已结束,因此在代码执行后删除聚合映射。

inactivity_timeout

编辑
  • 值类型是 number
  • 此设置没有默认值。

自上次事件起,任务被视为过期的秒数。

当任务超时时,其聚合映射将被清除。

如果 push_map_as_event_on_timeoutpush_previous_map_as_event 设置为 true,则任务聚合映射将作为新的 Logstash 事件推送。

inactivity_timeout 可以为每个 “task_id” 模式定义。

inactivity_timeout 必须小于 timeout

map_action

编辑
  • 值类型是字符串
  • 默认值是 "create_or_update"

告知过滤器如何处理聚合映射。

"create":创建映射,并且仅当之前未创建映射时才执行代码

"update":不创建映射,并且仅当之前已创建映射时才执行代码

"create_or_update":如果之前未创建映射则创建映射,在所有情况下都执行代码

push_map_as_event_on_timeout

编辑
  • 值类型是 boolean
  • 默认值是 false

启用此选项后,每次检测到任务超时时,它会将任务聚合映射作为新的 Logstash 事件推送。这使得可以在 Logstash 中检测和处理任务超时,还可以管理没有明确结束事件的任务。

push_previous_map_as_event

编辑
  • 值类型是 boolean
  • 默认值是 false

启用此选项后,每次聚合插件检测到新的任务 ID 时,它都会将之前的聚合映射作为新的 Logstash 事件推送,然后为下一个任务创建一个新的空映射。

此选项仅在任务一个接一个地出现时才能正常工作。这意味着:所有 task1 事件,然后所有 task2 事件,等等…

task_id

编辑
  • 这是一个必需的设置。
  • 值类型是字符串
  • 此设置没有默认值。

定义用于关联日志的任务 ID 的表达式。

此值必须唯一标识任务。

示例

    filter {
      aggregate {
        task_id => "%{type}%{my_task_id}"
      }
    }

timeout

编辑
  • 值类型是 number
  • 默认值是 1800

自第一个事件起,任务被视为过期的秒数。

当任务超时时,其聚合映射将被清除。

如果 push_map_as_event_on_timeoutpush_previous_map_as_event 设置为 true,则任务聚合映射将作为新的 Logstash 事件推送。

可以为每个 “task_id” 模式定义超时。

timeout_code

编辑
  • 值类型是字符串
  • 此设置没有默认值。

'push_map_as_event_on_timeout''push_previous_map_as_event' 设置为 true 时,执行此代码以完成生成的超时事件。代码块将可以访问预先填充了聚合映射的新生成的超时事件。

如果设置了 'timeout_task_id_field',则该事件也会填充 task_id 值

示例

    filter {
      aggregate {
        timeout_code => "event.set('state', 'timeout')"
      }
    }

timeout_tags

编辑
  • 值类型是 array
  • 默认值是 []

定义生成超时事件并产生时要添加的标签

示例

    filter {
      aggregate {
        timeout_tags => ["aggregate_timeout"]
      }
    }

timeout_task_id_field

编辑
  • 值类型是字符串
  • 此设置没有默认值。

此选项指示将在其中设置当前 “task_id” 值的超时生成事件的字段。这可以帮助关联哪些任务已超时。

默认情况下,如果未设置此选项,则不会将任务 id 值设置到超时生成事件中。

示例

    filter {
      aggregate {
        timeout_task_id_field => "task_id"
      }
    }

timeout_timestamp_field

编辑
  • 值类型是字符串
  • 此设置没有默认值。

默认情况下,超时是使用 Logstash 正在运行的系统时间计算的。

当设置此选项时,超时是使用此选项中指示的事件时间戳字段计算的。这意味着当第一个事件到达聚合过滤器并导致映射创建时,映射创建时间将等于此事件时间戳。然后,每次新事件到达聚合过滤器时,都会将事件时间戳与映射创建时间进行比较,以检查是否发生超时。

当使用选项 push_map_as_event_on_timeout => true 处理旧日志时,此选项特别有用。它允许基于旧日志的超时生成聚合事件,其中系统时间不合适。

警告:为了使此选项正常工作,必须将其设置在第一个聚合过滤器上。

示例

    filter {
      aggregate {
        timeout_timestamp_field => "@timestamp"
      }
    }

常用选项

编辑

所有过滤器插件都支持这些配置选项

add_field

编辑
  • 值类型是 hash
  • 默认值是 {}

如果此过滤器成功,则将任何任意字段添加到此事件。字段名称可以是动态的,并且可以使用 %{field} 包括事件的一部分。

示例

    filter {
      aggregate {
        add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
      }
    }
    # You can also add multiple fields at once:
    filter {
      aggregate {
        add_field => {
          "foo_%{somefield}" => "Hello world, from %{host}"
          "new_field" => "new_static_value"
        }
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时,如果存在,将添加字段 foo_hello,其值为以上值,并且将 %{host} 部分替换为事件中的该值。第二个示例还将添加一个硬编码字段。

add_tag

编辑
  • 值类型是 array
  • 默认值是 []

如果此过滤器成功,则将任意标签添加到事件。标签可以是动态的,并且可以使用 %{field} 语法包含事件的一部分。

示例

    filter {
      aggregate {
        add_tag => [ "foo_%{somefield}" ]
      }
    }
    # You can also add multiple tags at once:
    filter {
      aggregate {
        add_tag => [ "foo_%{somefield}", "taggedy_tag"]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时将添加标签 foo_hello (第二个示例当然会添加一个 taggedy_tag 标签)。

enable_metric

编辑
  • 值类型是 boolean
  • 默认值是 true

禁用或启用此特定插件实例的指标日志记录。默认情况下,我们会记录所有可以记录的指标,但您可以禁用特定插件的指标收集。

  • 值类型是 string
  • 此设置没有默认值。

为插件配置添加唯一的 ID。如果未指定 ID,Logstash 将生成一个 ID。强烈建议您在配置中设置此 ID。当您有两个或多个相同类型的插件时,这尤其有用,例如,如果您有 2 个聚合过滤器。在这种情况下添加命名的 ID 将有助于在使用监控 API 时监控 Logstash。

    filter {
      aggregate {
        id => "ABC"
      }
    }

id 字段中的变量替换仅支持环境变量,不支持使用来自秘密存储的值。

periodic_flush

编辑
  • 值类型是 boolean
  • 默认值是 false

定期调用过滤器刷新方法。可选。

remove_field

编辑
  • 值类型是 array
  • 默认值是 []

如果此过滤器成功,则从此事件中删除任意字段。字段名称可以是动态的,并且可以使用 %{field} 包括事件的一部分 例如

    filter {
      aggregate {
        remove_field => [ "foo_%{somefield}" ]
      }
    }
    # You can also remove multiple fields at once:
    filter {
      aggregate {
        remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时,如果存在,将删除名称为 foo_hello 的字段。第二个示例将删除一个额外的非动态字段。

remove_tag

编辑
  • 值类型是 array
  • 默认值是 []

如果此过滤器成功,则从此事件中删除任意标签。标签可以是动态的,并且可以使用 %{field} 语法包含事件的一部分。

示例

    filter {
      aggregate {
        remove_tag => [ "foo_%{somefield}" ]
      }
    }
    # You can also remove multiple tags at once:
    filter {
      aggregate {
        remove_tag => [ "foo_%{somefield}", "sad_unwanted_tag"]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时将删除标签 foo_hello (如果存在)。第二个示例也将删除一个悲伤的、不需要的标签。