Jdbc_static 过滤器插件

编辑

Jdbc_static 过滤器插件

编辑

对于其他版本,请参阅 版本化插件文档

获取帮助

编辑

有关插件的问题,请在 Discuss 论坛中发起主题。对于错误或功能请求,请在 Github 中提交问题。有关 Elastic 支持的插件列表,请参阅 Elastic 支持矩阵

描述

编辑

此过滤器使用从远程数据库预加载的数据来丰富事件。

此过滤器最适合使用静态或不经常更改的参考数据(例如环境、用户和产品)来丰富事件。

此过滤器的工作原理是从远程数据库获取数据,将其缓存在本地内存中的 Apache Derby 数据库中,并使用查找来使用缓存在本地数据库中的数据丰富事件。您可以设置过滤器以加载一次远程数据(对于静态数据),或者您可以安排定期运行远程加载(对于需要刷新的数据)。

要定义过滤器,您需要指定三个主要部分:local_db_objects、loaders 和 lookups。

local_db_objects
定义用于构建本地数据库结构的列、类型和索引。列名和类型应与外部数据库匹配。根据需要定义尽可能多的这些对象以构建本地数据库结构。
loaders
查询外部数据库以获取将缓存在本地的数据集。根据需要定义尽可能多的加载器以获取远程数据。每个加载器都应填充由 local_db_objects 定义的表。确保加载器 SQL 语句中的列名和数据类型与 local_db_objects 下定义的列匹配。每个加载器都有一个独立的远程数据库连接。
lookups

对本地数据库执行查找查询以丰富事件。根据需要定义尽可能多的查找以从一次传递中的所有查找表中丰富事件。理想情况下,SQL 语句应只返回一行。任何行都将转换为 Hash 对象,并存储在一个作为数组的目标字段中。

以下示例配置从远程数据库获取数据,将其缓存在本地数据库中,并使用查找来使用缓存在本地数据库中的数据丰富事件。

filter {
  jdbc_static {
    loaders => [ 
      {
        id => "remote-servers"
        query => "select ip, descr from ref.local_ips order by ip"
        local_table => "servers"
      },
      {
        id => "remote-users"
        query => "select firstname, lastname, userid from ref.local_users order by userid"
        local_table => "users"
      }
    ]
    local_db_objects => [ 
      {
        name => "servers"
        index_columns => ["ip"]
        columns => [
          ["ip", "varchar(15)"],
          ["descr", "varchar(255)"]
        ]
      },
      {
        name => "users"
        index_columns => ["userid"]
        columns => [
          ["firstname", "varchar(255)"],
          ["lastname", "varchar(255)"],
          ["userid", "int"]
        ]
      }
    ]
    local_lookups => [ 
      {
        id => "local-servers"
        query => "SELECT descr as description FROM servers WHERE ip = :ip"
        parameters => {ip => "[from_ip]"}
        target => "server"
      },
      {
        id => "local-users"
        query => "SELECT firstname, lastname FROM users WHERE userid = ? AND country = ?"
        prepared_parameters => ["[loggedin_userid]", "[user_nation]"] 
        target => "user" 
        default_hash => { 
          firstname => nil
          lastname => nil
        }
      }
    ]
    # using add_field here to add & rename values to the event root
    add_field => { server_name => "%{[server][0][description]}" } 
    add_field => { user_firstname => "%{[user][0][firstname]}" }
    add_field => { user_lastname => "%{[user][0][lastname]}" }
    remove_field => ["server", "user"]
    staging_directory => "/tmp/logstash/jdbc_static/import_data"
    loader_schedule => "* */2 * * *" # run loaders every 2 hours
    jdbc_user => "logstash"
    jdbc_password => "example"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/tmp/logstash/vendor/postgresql-42.1.4.jar"
    jdbc_connection_string => "jdbc:postgresql://remotedb:5432/ls_test_2"
  }
}

output {
  if "_jdbcstaticdefaultsused" in [tags] {
    # Print all the not found users
    stdout { }
  }
}

查询外部数据库以获取将缓存在本地的数据集。

定义用于构建本地数据库结构的列、类型和索引。列名和类型应与外部数据库匹配。表定义的顺序很重要,应与加载器查询的顺序匹配。请参阅 加载器列和 local_db_object 顺序依赖关系

对本地数据库执行查找查询以丰富事件。

本地查找查询还可以使用准备好的语句,其中参数遵循位置顺序。

指定将存储查找数据的事件字段。如果查找返回多个列,则数据将作为 JSON 对象存储在字段中。

当在数据库中找不到用户时,将使用来自 local_lookups default hash 设置的数据创建事件,并且事件将使用 tag_on_default_use 中设置的列表进行标记。

获取 JSON 对象中的数据并将其存储在顶级事件字段中,以便在 Kibana 中更轻松地进行分析。

以下是一个完整的示例

input {
  generator {
    lines => [
      '{"from_ip": "10.2.3.20", "app": "foobar", "amount": 32.95}',
      '{"from_ip": "10.2.3.30", "app": "barfoo", "amount": 82.95}',
      '{"from_ip": "10.2.3.40", "app": "bazfoo", "amount": 22.95}'
    ]
    count => 200
  }
}

filter {
  json {
    source => "message"
  }

  jdbc_static {
    loaders => [
      {
        id => "servers"
        query => "select ip, descr from ref.local_ips order by ip"
        local_table => "servers"
      }
    ]
    local_db_objects => [
      {
        name => "servers"
        index_columns => ["ip"]
        columns => [
          ["ip", "varchar(15)"],
          ["descr", "varchar(255)"]
        ]
      }
    ]
    local_lookups => [
      {
        query => "select descr as description from servers WHERE ip = :ip"
        parameters => {ip => "[from_ip]"}
        target => "server"
      }
    ]
    staging_directory => "/tmp/logstash/jdbc_static/import_data"
    loader_schedule => "*/30 * * * *"
    jdbc_user => "logstash"
    jdbc_password => "logstash??"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/Users/guy/tmp/logstash-6.0.0/vendor/postgresql-42.1.4.jar"
    jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/ls_test_2"
  }
}

output {
  stdout {
    codec => rubydebug {metadata => true}
  }
}

假设加载器从 Postgres 数据库获取以下数据

select * from ref.local_ips order by ip;
    ip     |         descr
-----------+-----------------------
 10.2.3.10 | Authentication Server
 10.2.3.20 | Payments Server
 10.2.3.30 | Events Server
 10.2.3.40 | Payroll Server
 10.2.3.50 | Uploads Server

事件根据 IP 值使用服务器的描述进行丰富

{
           "app" => "bazfoo",
      "sequence" => 0,
        "server" => [
        [0] {
            "description" => "Payroll Server"
        }
    ],
        "amount" => 22.95,
    "@timestamp" => 2017-11-30T18:08:15.694Z,
      "@version" => "1",
          "host" => "Elastics-MacBook-Pro.local",
       "message" => "{\"from_ip\": \"10.2.3.40\", \"app\": \"bazfoo\", \"amount\": 22.95}",
       "from_ip" => "10.2.3.40"
}

在多个管道中使用此插件

编辑

Logstash 使用单个内存中的 Apache Derby 实例作为整个 JVM 的查找数据库引擎。因为每个插件实例都在共享的 Derby 引擎中使用唯一的数据库,所以插件尝试创建和填充相同表时不应发生冲突。无论插件是在单个管道中定义还是在多个管道中定义,情况都是如此。但是,设置过滤器后,您应该监视查找结果并查看日志以验证操作是否正确。

加载器列和 local_db_object 顺序依赖关系

编辑

出于加载器性能原因,加载机制使用带有内置 Derby 文件导入过程的 CSV 样式文件将远程数据添加到本地数据库中。检索到的列按原样写入 CSV 文件,并且文件导入过程期望与 local_db_object 设置中指定的列顺序一一对应。请确保此顺序已就位。

与 Elastic Common Schema (ECS) 的兼容性

编辑

此插件与 Elastic Common Schema (ECS) 兼容。除了在启用 ECS 且未设置 target 时发出警告外,其行为与 ECS 兼容性无关。

设置 target 选项以避免潜在的模式冲突。

Jdbc_static 过滤器配置选项

编辑

此插件支持以下配置选项以及稍后描述的 通用选项

另请参阅 通用选项,了解所有过滤器插件支持的选项列表。

 

jdbc_connection_string

编辑
  • 这是必需的设置。
  • 值类型为 字符串
  • 此设置没有默认值。

JDBC 连接字符串。

jdbc_driver_class

编辑
  • 这是必需的设置。
  • 值类型为 字符串
  • 此设置没有默认值。

要加载的 JDBC 驱动程序类,例如“org.apache.derby.jdbc.ClientDriver”。

根据 问题 43,如果您使用的是 Oracle JDBC 驱动程序 (ojdbc6.jar),则正确的 jdbc_driver_class"Java::oracle.jdbc.driver.OracleDriver"

jdbc_driver_library

编辑
  • 值类型为 字符串
  • 此设置没有默认值。

指向第三方驱动程序库的 JDBC 驱动程序库路径。如果需要多个库,请在一个字符串中使用逗号分隔的路径。

如果未提供驱动程序类,则插件会在 Logstash Java 类路径中查找它。

jdbc_password

编辑
  • 值类型为 密码
  • 此设置没有默认值。

JDBC 密码。

jdbc_user

编辑
  • 这是必需的设置。
  • 值类型为 字符串
  • 此设置没有默认值。

JDBC 用户。

tag_on_default_use

编辑
  • 值类型为 数组
  • 默认值为 ["_jdbcstaticdefaultsused"]

如果未找到记录并使用了默认值,则将值追加到 tags 字段。

tag_on_failure

编辑
  • 值类型为 数组
  • 默认值为 ["_jdbcstaticfailure"]

如果发生 SQL 错误,则将值追加到 tags 字段。

staging_directory

编辑
  • 值类型为 字符串
  • 默认值来自 Ruby 临时目录 + 插件名称 + "import_data"
  • 例如 "/tmp/logstash/jdbc_static/import_data"

用于暂存批量加载数据的目录,应有足够的磁盘空间来处理您希望用于丰富事件的数据。由于 Apache Derby 中存在一个未解决的错误,此插件的先前版本在加载超过几千行的数据集方面处理得不好。此设置引入了一种加载大型记录集的替代方法。接收每行时,它都会被发送到文件,然后使用系统导入表系统调用导入该文件。

如果发生 SQL 错误,则将值追加到 tags 字段。

loader_schedule

编辑
  • 值类型为 字符串
  • 此设置没有默认值。

您可以根据特定计划安排定期运行远程加载。此计划语法由 rufus-scheduler 提供支持。语法类似于 cron,但有一些特定于 Rufus 的扩展(例如,时区支持)。有关此语法的更多信息,请参阅 解析 cron 行和时间字符串

示例

*/30 * * * *

将在每天每小时的第 0 分钟和第 30 分钟执行。

* 5 * 1-3 *

将在 1 月到 3 月的每天凌晨 5 点的每一分钟执行。

0 * * * *

将在每天每小时的第 0 分钟执行。

0 6 * * * America/Chicago

将在每天上午 6:00(UTC/GMT -5)执行。

使用 Logstash 交互式 shell 进行调试

bin/logstash -i irb
irb(main):001:0> require 'rufus-scheduler'
=> true
irb(main):002:0> Rufus::Scheduler.parse('*/10 * * * *')
=> #<Rufus::Scheduler::CronLine:0x230f8709 @timezone=nil, @weekdays=nil, @days=nil, @seconds=[0], @minutes=[0, 10, 20, 30, 40, 50], @hours=nil, @months=nil, @monthdays=nil, @original="*/10 * * * *">
irb(main):003:0> exit

上述调用返回的对象(Rufus::Scheduler::CronLine 的实例)显示了执行的秒数、分钟数等。

loaders

编辑
  • 值类型为 数组
  • 默认值为 []

数组应包含一个或多个哈希。每个哈希都根据下表进行验证。

设置 输入类型 必填

id

字符串

local_table

字符串

query

字符串

max_rows

数字

jdbc_connection_string

字符串

jdbc_driver_class

字符串

jdbc_driver_library

有效的文件系统路径

jdbc_password

密码

jdbc_user

字符串

加载器字段描述

id
可选标识符。用于识别正在生成错误消息和日志行的加载器。
local_table
加载器将填充的本地查找数据库中的目标表。
query
执行以获取远程记录的 SQL 语句。使用 SQL 别名和强制转换以确保记录的列和数据类型与本地数据库中定义的表结构(如 local_db_objects 中定义)匹配。
max_rows
此设置的默认值为 100 万。因为查找数据库位于内存中,所以它将占用 JVM 堆空间。如果查询返回数百万行,则应增加分配给 Logstash 的 JVM 内存或限制返回的行数,也许限制为事件数据中最常出现的那些行。
jdbc_connection_string
如果在加载器中未设置,则此设置默认为插件级别的 jdbc_connection_string 设置。
jdbc_driver_class
如果在加载器中未设置,则此设置默认为插件级别的 jdbc_driver_class 设置。
jdbc_driver_library
如果在加载器中未设置,则此设置默认为插件级别的 jdbc_driver_library 设置。
jdbc_password
如果未在加载器中设置,则此设置默认为插件级别的 jdbc_password 设置。
jdbc_user
如果未在加载器中设置,则此设置默认为插件级别的 jdbc_user 设置。

local_db_objects

编辑
  • 值类型为 数组
  • 默认值为 []

该数组应包含一个或多个哈希。每个哈希代表本地查找数据库的表模式。每个哈希根据下表进行验证。

设置 输入类型 必填

名称

字符串

数组

索引列

数字

保留现有

布尔值

Local_db_objects 字段描述

名称
要在数据库中创建的表的名称。
列规范数组。每个列规范都是一个恰好包含两个元素的数组,例如 ["ip", "varchar(15)"]。第一个元素是列名称字符串。第二个元素是一个字符串,表示 Apache Derby SQL 类型。在构建本地查找表时检查字符串内容,而不是在验证设置时检查。因此,任何拼写错误的 SQL 类型字符串都会导致错误。
索引列
字符串数组。每个字符串都必须在 columns 设置中定义。索引名称将在内部生成。不支持唯一或排序索引。
保留现有
此设置在 true 时,检查表是否已存在于本地查找数据库中。如果您在 Logstash 的同一实例中运行多个管道,并且多个管道正在使用此插件,则必须阅读页面顶部的关于多个管道的重要说明。

local_lookups

编辑
  • 值类型为 数组
  • 默认值为 []

该数组应包含一个或多个哈希。每个哈希代表一个查找增强。每个哈希根据下表进行验证。

设置 输入类型 必填

id

字符串

query

字符串

参数

哈希

目标

字符串

默认哈希

哈希

tag_on_failure

字符串

tag_on_default_use

字符串

Local_lookups 字段描述

id
可选标识符。用于标识正在生成错误消息和日志行的查找。如果您省略此设置,则会使用默认 ID。
query
执行以实现查找的 SQL SELECT 语句。要使用参数,请使用命名参数语法,例如 "SELECT * FROM MYTABLE WHERE ID = :id"。或者,可以使用 ? 作为预准备语句参数,在这种情况下,prepared_parameters 数组用于填充值
参数
键/值哈希或字典。键(LHS)是 SQL 语句 SELECT * FROM sensors WHERE reference = :p1 中替换的文本。值(RHS)是事件中的字段名称。插件从事件中的此键读取值,并将该值替换到语句中,例如,parameters => { "p1" => "ref" }。引用是自动的 - 您无需在语句中添加引号。仅当您需要添加前缀/后缀或连接两个事件字段值以构建替换值时,才在 RHS 上使用字段插值语法。例如,假设一个物联网消息具有 id 和位置,并且您有一个传感器表,该表有一列 id-loc_id。在这种情况下,您的参数哈希将如下所示:parameters => { "p1" => "%{[id]}-%{[loc_id]}" }
prepared_parameters
一个数组,其中位置与查询语法中 ? 的位置相关。数组的值遵循与 parameters 相同的语义。如果 prepared_parameters 被赋值,则过滤器将强制使用 JDBC 的预准备语句来查询本地数据库。预准备语句提供了两个好处:一个是在性能方面,因为避免了 DBMS 为每次调用解析和编译 SQL 表达式;另一个好处是在安全方面,使用预准备语句避免了基于查询字符串连接的 SQL 注入攻击。
目标
将接收查找数据的字段的可选名称。如果您省略此设置,则使用 id 设置(或默认 ID)。查找数据(转换为哈希的结果数组)永远不会添加到事件的根目录。如果您想这样做,则应使用 add_field 设置。这意味着您可以完全控制如何在事件的根目录中放置字段/值,例如,add_field => { user_firstname => "%{[user][0][firstname]}" } - 其中 [user] 是目标字段,[0] 是数组中的第一个结果,[firstname] 是结果哈希中的键。
默认哈希
查找返回无结果时将放入目标字段数组中的可选哈希。如果您需要确保配置其他部分中的后续引用实际上是指某些内容,请使用此设置。
tag_on_failure
覆盖插件级别设置的可选字符串。这在定义多个查找时很有用。
tag_on_default_use
覆盖插件级别设置的可选字符串。这在定义多个查找时很有用。

常用选项

编辑

所有过滤器插件都支持这些配置选项

add_field

编辑
  • 值类型为 哈希
  • 默认值为 {}

如果此过滤器成功,则将任何任意字段添加到此事件。字段名称可以是动态的,并包含使用 %{field} 的事件部分。

示例

    filter {
      jdbc_static {
        add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
      }
    }
    # You can also add multiple fields at once:
    filter {
      jdbc_static {
        add_field => {
          "foo_%{somefield}" => "Hello world, from %{host}"
          "new_field" => "new_static_value"
        }
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时将添加字段 foo_hello(如果存在),其值为上述值,并且 %{host} 部分替换为事件中的该值。第二个示例还将添加一个硬编码字段。

add_tag

编辑
  • 值类型为 数组
  • 默认值为 []

如果此过滤器成功,则将任意标签添加到事件。标签可以是动态的,并包含使用 %{field} 语法的事件部分。

示例

    filter {
      jdbc_static {
        add_tag => [ "foo_%{somefield}" ]
      }
    }
    # You can also add multiple tags at once:
    filter {
      jdbc_static {
        add_tag => [ "foo_%{somefield}", "taggedy_tag"]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时将添加一个标签 foo_hello(第二个示例当然会添加一个 taggedy_tag 标签)。

enable_metric

编辑

禁用或启用此特定插件实例的指标日志记录。默认情况下,我们记录所有可以记录的指标,但您可以禁用特定插件的指标收集。

  • 值类型为 字符串
  • 此设置没有默认值。

在插件配置中添加唯一的 ID。如果未指定 ID,Logstash 将生成一个。强烈建议在配置中设置此 ID。当您有两个或多个相同类型的插件时,这尤其有用,例如,如果您有两个 jdbc_static 过滤器。在这种情况下,添加命名 ID 将有助于在使用监控 API 时监控 Logstash。

    filter {
      jdbc_static {
        id => "ABC"
      }
    }

id 字段中的变量替换仅支持环境变量,不支持使用密钥存储中的值。

periodic_flush

编辑

定期调用过滤器刷新方法。可选。

remove_field

编辑
  • 值类型为 数组
  • 默认值为 []

如果此过滤器成功,则从此事件中删除任意字段。字段名称可以是动态的,并包含使用 %{field} 示例的事件部分

    filter {
      jdbc_static {
        remove_field => [ "foo_%{somefield}" ]
      }
    }
    # You can also remove multiple fields at once:
    filter {
      jdbc_static {
        remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时将删除名称为 foo_hello 的字段(如果存在)。第二个示例将删除一个额外的非动态字段。

remove_tag

编辑
  • 值类型为 数组
  • 默认值为 []

如果此过滤器成功,则从事件中删除任意标签。标签可以是动态的,并包含使用 %{field} 语法的事件部分。

示例

    filter {
      jdbc_static {
        remove_tag => [ "foo_%{somefield}" ]
      }
    }
    # You can also remove multiple tags at once:
    filter {
      jdbc_static {
        remove_tag => [ "foo_%{somefield}", "sad_unwanted_tag"]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时将删除标签 foo_hello(如果存在)。第二个示例还将删除一个令人悲伤的、不需要的标签。