Jdbc_static 过滤器插件

编辑

Jdbc_static 过滤器插件

编辑

有关其他版本,请参阅版本化的插件文档

获取帮助

编辑

有关插件的问题,请在Discuss论坛中打开一个主题。对于错误或功能请求,请在Github中打开一个问题。有关 Elastic 支持的插件列表,请查阅Elastic 支持矩阵

描述

编辑

此过滤器使用从远程数据库预加载的数据来丰富事件。

此过滤器最适合使用静态或不经常更改的参考数据(例如环境、用户和产品)来丰富事件。

此过滤器的工作方式是从远程数据库提取数据,将其缓存在本地的内存中Apache Derby数据库中,并使用查找来使用本地数据库中缓存的数据丰富事件。您可以设置过滤器以加载一次远程数据(对于静态数据),也可以安排定期运行远程加载(对于需要刷新的数据)。

要定义过滤器,您需要指定三个主要部分:local_db_objects、loaders 和 lookups。

local_db_objects
定义用于构建本地数据库结构的列、类型和索引。列名称和类型应与外部数据库匹配。定义尽可能多的这些对象,以构建本地数据库结构。
loaders
查询外部数据库以提取将在本地缓存的数据集。定义尽可能多的加载器以提取远程数据。每个加载器应填充由 local_db_objects定义的表。确保加载器 SQL 语句中的列名称和数据类型与 local_db_objects下定义的列匹配。每个加载器都有一个独立的远程数据库连接。
lookups

对本地数据库执行查找查询以丰富事件。定义尽可能多的查找,以便在一次传递中从所有查找表中丰富事件。理想情况下,SQL 语句应仅返回一行。任何行都将转换为 Hash 对象并存储在作为 Array 的目标字段中。

以下示例配置从远程数据库中提取数据,将其缓存在本地数据库中,并使用查找来使用本地数据库中缓存的数据丰富事件。

filter {
  jdbc_static {
    loaders => [ 
      {
        id => "remote-servers"
        query => "select ip, descr from ref.local_ips order by ip"
        local_table => "servers"
      },
      {
        id => "remote-users"
        query => "select firstname, lastname, userid from ref.local_users order by userid"
        local_table => "users"
      }
    ]
    local_db_objects => [ 
      {
        name => "servers"
        index_columns => ["ip"]
        columns => [
          ["ip", "varchar(15)"],
          ["descr", "varchar(255)"]
        ]
      },
      {
        name => "users"
        index_columns => ["userid"]
        columns => [
          ["firstname", "varchar(255)"],
          ["lastname", "varchar(255)"],
          ["userid", "int"]
        ]
      }
    ]
    local_lookups => [ 
      {
        id => "local-servers"
        query => "SELECT descr as description FROM servers WHERE ip = :ip"
        parameters => {ip => "[from_ip]"}
        target => "server"
      },
      {
        id => "local-users"
        query => "SELECT firstname, lastname FROM users WHERE userid = ? AND country = ?"
        prepared_parameters => ["[loggedin_userid]", "[user_nation]"] 
        target => "user" 
        default_hash => { 
          firstname => nil
          lastname => nil
        }
      }
    ]
    # using add_field here to add & rename values to the event root
    add_field => { server_name => "%{[server][0][description]}" } 
    add_field => { user_firstname => "%{[user][0][firstname]}" }
    add_field => { user_lastname => "%{[user][0][lastname]}" }
    remove_field => ["server", "user"]
    staging_directory => "/tmp/logstash/jdbc_static/import_data"
    loader_schedule => "* */2 * * *" # run loaders every 2 hours
    jdbc_user => "logstash"
    jdbc_password => "example"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/tmp/logstash/vendor/postgresql-42.1.4.jar"
    jdbc_connection_string => "jdbc:postgresql://remotedb:5432/ls_test_2"
  }
}

output {
  if "_jdbcstaticdefaultsused" in [tags] {
    # Print all the not found users
    stdout { }
  }
}

查询外部数据库以提取将在本地缓存的数据集。

定义用于构建本地数据库结构的列、类型和索引。列名称和类型应与外部数据库匹配。表定义的顺序很重要,应与加载器查询的顺序匹配。请参阅加载器列和 local_db_object 顺序依赖关系

对本地数据库执行查找查询以丰富事件。

本地查找查询也可以使用预处理语句,其中参数遵循位置顺序。

指定将存储查找数据的事件字段。如果查找返回多个列,则数据将作为 JSON 对象存储在该字段中。

当在数据库中找不到用户时,将使用local_lookups default hash 设置中的数据创建事件,并且该事件将使用 tag_on_default_use 中设置的列表进行标记。

从 JSON 对象中获取数据并将其存储在顶层事件字段中,以便在 Kibana 中进行更轻松的分析。

这是一个完整的示例

input {
  generator {
    lines => [
      '{"from_ip": "10.2.3.20", "app": "foobar", "amount": 32.95}',
      '{"from_ip": "10.2.3.30", "app": "barfoo", "amount": 82.95}',
      '{"from_ip": "10.2.3.40", "app": "bazfoo", "amount": 22.95}'
    ]
    count => 200
  }
}

filter {
  json {
    source => "message"
  }

  jdbc_static {
    loaders => [
      {
        id => "servers"
        query => "select ip, descr from ref.local_ips order by ip"
        local_table => "servers"
      }
    ]
    local_db_objects => [
      {
        name => "servers"
        index_columns => ["ip"]
        columns => [
          ["ip", "varchar(15)"],
          ["descr", "varchar(255)"]
        ]
      }
    ]
    local_lookups => [
      {
        query => "select descr as description from servers WHERE ip = :ip"
        parameters => {ip => "[from_ip]"}
        target => "server"
      }
    ]
    staging_directory => "/tmp/logstash/jdbc_static/import_data"
    loader_schedule => "*/30 * * * *"
    jdbc_user => "logstash"
    jdbc_password => "logstash??"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/Users/guy/tmp/logstash-6.0.0/vendor/postgresql-42.1.4.jar"
    jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/ls_test_2"
  }
}

output {
  stdout {
    codec => rubydebug {metadata => true}
  }
}

假设加载器从 Postgres 数据库中提取以下数据

select * from ref.local_ips order by ip;
    ip     |         descr
-----------+-----------------------
 10.2.3.10 | Authentication Server
 10.2.3.20 | Payments Server
 10.2.3.30 | Events Server
 10.2.3.40 | Payroll Server
 10.2.3.50 | Uploads Server

这些事件根据 IP 的值丰富了服务器的描述

{
           "app" => "bazfoo",
      "sequence" => 0,
        "server" => [
        [0] {
            "description" => "Payroll Server"
        }
    ],
        "amount" => 22.95,
    "@timestamp" => 2017-11-30T18:08:15.694Z,
      "@version" => "1",
          "host" => "Elastics-MacBook-Pro.local",
       "message" => "{\"from_ip\": \"10.2.3.40\", \"app\": \"bazfoo\", \"amount\": 22.95}",
       "from_ip" => "10.2.3.40"
}

将此插件与多个管道一起使用

编辑

Logstash 使用单个内存中的 Apache Derby 实例作为整个 JVM 的查找数据库引擎。由于每个插件实例都在共享的 Derby 引擎中使用唯一的数据库,因此插件尝试创建和填充相同表时应该不会有冲突。无论插件是在单个管道中还是在多个管道中定义,都是如此。但是,在设置过滤器后,您应查看查找结果并查看日志以验证操作是否正确。

加载器列和 local_db_object 顺序依赖关系

编辑

出于加载器性能原因,加载机制使用 CSV 样式的文件和内置的 Derby 文件导入过程,将远程数据添加到本地数据库。检索到的列按原样写入 CSV 文件,并且文件导入过程期望与 local_db_object 设置中指定的列顺序一一对应。请确保此顺序到位。

与 Elastic Common Schema (ECS) 的兼容性

编辑

此插件与Elastic Common Schema (ECS)兼容。它的行为与 ECS 兼容性无关,只是在启用 ECS 且未设置target 时发出警告。

设置 target 选项以避免潜在的架构冲突。

Jdbc_static 过滤器配置选项

编辑

此插件支持以下配置选项以及稍后描述的常用选项

另请参阅常用选项,了解所有过滤器插件支持的选项列表。

 

jdbc_connection_string

编辑
  • 这是一个必需的设置。
  • 值类型为字符串
  • 此设置没有默认值。

JDBC 连接字符串。

jdbc_driver_class

编辑
  • 这是一个必需的设置。
  • 值类型为字符串
  • 此设置没有默认值。

要加载的 JDBC 驱动程序类,例如“org.apache.derby.jdbc.ClientDriver”。

根据Issue 43,如果您使用 Oracle JDBC 驱动程序 (ojdbc6.jar),则正确的 jdbc_driver_class"Java::oracle.jdbc.driver.OracleDriver"

jdbc_driver_library

编辑
  • 值类型为字符串
  • 此设置没有默认值。

第三方驱动程序库的 JDBC 驱动程序库路径。如果您需要多个库,请在一个字符串中使用逗号分隔的路径。

如果未提供驱动程序类,则插件会在 Logstash Java 类路径中查找它。

jdbc_password

编辑
  • 值类型为密码
  • 此设置没有默认值。

JDBC 密码。

jdbc_user

编辑
  • 这是一个必需的设置。
  • 值类型为字符串
  • 此设置没有默认值。

JDBC 用户。

tag_on_default_use

编辑
  • 值类型为数组
  • 默认值为 ["_jdbcstaticdefaultsused"]

如果未找到记录并且使用了默认值,则将值附加到 tags 字段。

tag_on_failure

编辑
  • 值类型为数组
  • 默认值为 ["_jdbcstaticfailure"]

如果发生 SQL 错误,则将值附加到 tags 字段。

staging_directory

编辑
  • 值类型为字符串
  • 默认值派生自 Ruby 临时目录 + plugin_name + "import_data"
  • 例如,"/tmp/logstash/jdbc_static/import_data"

用于暂存数据的目录,以进行批量加载,应有足够的磁盘空间来处理您希望用于丰富事件的数据。由于 Apache Derby 中的一个未解决的错误,此插件的早期版本无法很好地处理超过数千行的加载数据集。此设置引入了一种加载大型记录集的可替代方法。接收到每一行时,都会将其假脱机到文件,然后使用系统导入表系统调用导入该文件。

如果发生 SQL 错误,则将值附加到 tags 字段。

loader_schedule

编辑
  • 值类型为字符串
  • 此设置没有默认值。

您可以根据特定计划定期安排远程加载运行。此调度语法由rufus-scheduler提供支持。该语法类似于 cron,其中有一些 Rufus 特有的扩展(例如,时区支持)。有关此语法的更多信息,请参阅解析 cron 行和时间字符串

示例

*/30 * * * *

将在每天每小时的第 0 和第 30 分钟执行。

* 5 * 1-3 *

将在 1 月至 3 月每天的凌晨 5 点每分钟执行。

0 * * * *

将在每天每小时的第 0 分钟执行。

0 6 * * * America/Chicago

将在每天的上午 6:00(UTC/GMT -5)执行。

使用 Logstash 交互式 Shell 进行调试

bin/logstash -i irb
irb(main):001:0> require 'rufus-scheduler'
=> true
irb(main):002:0> Rufus::Scheduler.parse('*/10 * * * *')
=> #<Rufus::Scheduler::CronLine:0x230f8709 @timezone=nil, @weekdays=nil, @days=nil, @seconds=[0], @minutes=[0, 10, 20, 30, 40, 50], @hours=nil, @months=nil, @monthdays=nil, @original="*/10 * * * *">
irb(main):003:0> exit

上述调用返回的对象,即 Rufus::Scheduler::CronLine 的实例,显示了执行的秒、分等。

loaders

编辑
  • 值类型为数组
  • 默认值为 []

该数组应包含一个或多个哈希。每个哈希都会根据下表进行验证。

设置 输入类型 必需

id

字符串

local_table

字符串

query

字符串

max_rows

数字

jdbc_connection_string

字符串

jdbc_driver_class

字符串

jdbc_driver_library

有效的的文件系统路径

jdbc_password

密码

jdbc_user

字符串

加载器字段描述

id
可选标识符。此标识符用于标识生成错误消息和日志行的加载器。
local_table
加载器将填充的本地查找数据库中的目标表。
query
执行以提取远程记录的 SQL 语句。使用 SQL 别名和强制转换以确保记录的列和数据类型与 local_db_objects 中定义的本地数据库中的表结构匹配。
max_rows
此设置的默认值为 100 万。由于查找数据库在内存中,因此它将占用 JVM 堆空间。如果查询返回数百万行,则应增加为 Logstash 提供的 JVM 内存或限制返回的行数,或许限制为事件数据中最常找到的行数。
jdbc_connection_string
如果未在加载器中设置,则此设置默认为插件级别的 jdbc_connection_string 设置。
jdbc_driver_class
如果未在加载器中设置,则此设置默认为插件级别的 jdbc_driver_class 设置。
jdbc_driver_library
如果未在加载器中设置,则此设置默认为插件级别的 jdbc_driver_library 设置。
jdbc_password
如果未在加载器中设置,则此设置默认为插件级别的 jdbc_password 设置。
jdbc_user
如果未在加载器中设置,则此设置默认为插件级别的 jdbc_user 设置。

local_db_objects

编辑
  • 值类型为数组
  • 默认值为 []

该数组应包含一个或多个哈希。每个哈希表示本地查找数据库的表架构。每个哈希都会根据下表进行验证。

设置 输入类型 必需

name

字符串

数组

索引列

数字

保留现有

布尔值

Local_db_objects 字段描述

name
要在数据库中创建的表的名称。
列规范的数组。每个列规范都是一个包含两个元素的数组,例如 ["ip", "varchar(15)"]。第一个元素是列名字符串。第二个元素是一个字符串,它是 Apache Derby SQL 类型。字符串内容在构建本地查找表时检查,而不是在验证设置时检查。因此,任何拼写错误的 SQL 类型字符串都会导致错误。
索引列
字符串数组。每个字符串都必须在 columns 设置中定义。索引名称将在内部生成。不支持唯一或排序的索引。
保留现有
当此设置设置为 true 时,将检查本地查找数据库中是否已存在该表。如果您的同一 Logstash 实例中运行多个管道,并且多个管道都使用此插件,则您必须阅读页面顶部的重要的多管道通知。

local_lookups

编辑
  • 值类型为数组
  • 默认值为 []

该数组应包含一个或多个哈希值。每个哈希值表示一个查找扩充。每个哈希值都将根据下表进行验证。

设置 输入类型 必需

id

字符串

query

字符串

参数

哈希

目标

字符串

默认哈希

哈希

tag_on_failure

字符串

tag_on_default_use

字符串

Local_lookups 字段描述

id
一个可选的标识符。用于标识生成错误消息和日志行的查找。如果省略此设置,则将使用默认 ID。
query
为了实现查找而执行的 SQL SELECT 语句。要使用参数,请使用命名参数语法,例如 "SELECT * FROM MYTABLE WHERE ID = :id"。或者,可以使用 ? 符号作为预处理语句参数,在这种情况下,将使用 prepared_parameters 数组填充值
参数
键/值哈希或字典。键(LHS)是在 SQL 语句 SELECT * FROM sensors WHERE reference = :p1 中替换的文本。值 (RHS) 是事件中的字段名称。插件从事件中读取此键的值,并将该值替换到语句中,例如,parameters => { "p1" => "ref" }。引用是自动的 - 您无需在语句中放置引号。只有在需要添加前缀/后缀或将两个事件字段值连接在一起以构建替换值时,才在 RHS 上使用字段插值语法。例如,假设一个 IOT 消息有一个 id 和一个位置,并且您有一个传感器表,该表的列为 id-loc_id。在这种情况下,您的参数哈希应如下所示: parameters => { "p1" => "%{[id]}-%{[loc_id]}" }
prepared_parameters
一个数组,其中位置与查询语法中 ? 的位置相关。数组的值遵循与 parameters 相同的语义。如果 prepared_parameters 被赋值,则强制过滤器使用 JDBC 的预处理语句查询本地数据库。预处理语句提供两个好处:一个是性能方面的,因为它避免了 DBMS 为每次调用解析和编译 SQL 表达式;另一个好处是安全性方面的,使用预处理语句可以避免基于查询字符串连接的 SQL 注入攻击。
目标
一个可选的字段名称,用于接收查找的数据。如果省略此设置,则使用 id 设置(或默认 ID)。查找的数据(转换为哈希的结果数组)永远不会添加到事件的根。如果要执行此操作,则应使用 add_field 设置。这意味着您可以完全控制如何将字段/值放入事件的根目录,例如,add_field => { user_firstname => "%{[user][0][firstname]}" } - 其中 [user] 是目标字段,[0] 是数组中的第一个结果,[firstname] 是结果哈希中的键。
默认哈希
一个可选的哈希,当查找未返回任何结果时,该哈希将放入目标字段数组中。如果您需要确保配置的其他部分中以后的引用实际上引用了某些内容,请使用此设置。
tag_on_failure
一个可选字符串,用于覆盖插件级别的设置。这在定义多个查找时非常有用。
tag_on_default_use
一个可选字符串,用于覆盖插件级别的设置。这在定义多个查找时非常有用。

通用选项

编辑

所有过滤器插件都支持这些配置选项

add_field

编辑
  • 值类型为 hash
  • 默认值为 {}

如果此过滤器成功,则将任何任意字段添加到此事件。字段名称可以是动态的,并且可以使用 %{field} 包含事件的部分内容。

示例

    filter {
      jdbc_static {
        add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
      }
    }
    # You can also add multiple fields at once:
    filter {
      jdbc_static {
        add_field => {
          "foo_%{somefield}" => "Hello world, from %{host}"
          "new_field" => "new_static_value"
        }
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时将添加字段 foo_hello(如果存在),其值如上所示,并且 %{host} 部分将替换为事件中的该值。第二个示例还将添加一个硬编码字段。

add_tag

编辑
  • 值类型为 数组
  • 默认值为 []

如果此过滤器成功,则将任意标记添加到事件。标记可以是动态的,并且可以使用 %{field} 语法包含事件的部分内容。

示例

    filter {
      jdbc_static {
        add_tag => [ "foo_%{somefield}" ]
      }
    }
    # You can also add multiple tags at once:
    filter {
      jdbc_static {
        add_tag => [ "foo_%{somefield}", "taggedy_tag"]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时将添加标记 foo_hello(并且第二个示例当然会添加一个 taggedy_tag 标记)。

enable_metric

编辑

禁用或启用此特定插件实例的指标日志记录。默认情况下,我们会记录所有可以记录的指标,但是您可以禁用特定插件的指标收集。

  • 值类型为 字符串
  • 此设置没有默认值。

向插件配置添加唯一的 ID。如果未指定 ID,则 Logstash 将生成一个 ID。强烈建议在配置中设置此 ID。当您有两个或多个相同类型的插件时,这尤其有用,例如,如果您有 2 个 jdbc_static 过滤器。在这种情况下,添加命名 ID 将有助于在使用监控 API 时监控 Logstash。

    filter {
      jdbc_static {
        id => "ABC"
      }
    }

id 字段中的变量替换仅支持环境变量,并且不支持使用密钥存储中的值。

periodic_flush

编辑

定期调用过滤器刷新方法。可选。

remove_field

编辑
  • 值类型为 数组
  • 默认值为 []

如果此过滤器成功,则从此事件中删除任意字段。字段名称可以是动态的,并且可以使用 %{field} 包含事件的部分内容。示例

    filter {
      jdbc_static {
        remove_field => [ "foo_%{somefield}" ]
      }
    }
    # You can also remove multiple fields at once:
    filter {
      jdbc_static {
        remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时将删除名称为 foo_hello 的字段(如果存在)。第二个示例将删除一个额外的非动态字段。

remove_tag

编辑
  • 值类型为 数组
  • 默认值为 []

如果此过滤器成功,则从此事件中删除任意标记。标记可以是动态的,并且可以使用 %{field} 语法包含事件的部分内容。

示例

    filter {
      jdbc_static {
        remove_tag => [ "foo_%{somefield}" ]
      }
    }
    # You can also remove multiple tags at once:
    filter {
      jdbc_static {
        remove_tag => [ "foo_%{somefield}", "sad_unwanted_tag"]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时将删除标记 foo_hello(如果存在)。第二个示例还将删除一个悲伤的、不需要的标记。