Jdbc_static 过滤器插件

编辑

Jdbc_static 过滤器插件编辑

有关其他版本,请参阅 版本化插件文档

获取帮助编辑

如果您对该插件有任何疑问,请在 Discuss 论坛上发布主题。如果您发现任何错误或有功能请求,请在 Github 上提交问题。有关 Elastic 支持的插件列表,请参阅 Elastic 支持矩阵

描述编辑

此过滤器使用从远程数据库预加载的数据来丰富事件。

此过滤器最适合使用静态或不经常更改的参考数据(例如环境、用户和产品)来丰富事件。

此过滤器的工作原理是从远程数据库获取数据,将其缓存在本地内存中的 Apache Derby 数据库中,并使用查找来使用本地数据库中缓存的数据来丰富事件。您可以将过滤器设置为加载远程数据一次(对于静态数据),或者您可以安排定期运行远程加载(对于需要刷新的数据)。

要定义过滤器,您需要指定三个主要部分:local_db_objects、loaders 和 lookups。

local_db_objects
定义用于构建本地数据库结构的列、类型和索引。列名和类型应与外部数据库匹配。根据需要定义尽可能多的这些对象以构建本地数据库结构。
loaders
查询外部数据库以获取将在本地缓存的数据集。根据需要定义尽可能多的加载器以获取远程数据。每个加载器都应填充由 local_db_objects 定义的表。确保加载器 SQL 语句中的列名和数据类型与 local_db_objects 下定义的列匹配。每个加载器都有一个独立的远程数据库连接。
lookups

对本地数据库执行查找查询以丰富事件。根据需要定义尽可能多的查找,以便一次性从所有查找表中丰富事件。理想情况下,SQL 语句应仅返回一行。所有行都将转换为哈希对象,并存储在作为数组的目标字段中。

以下示例配置从远程数据库获取数据,将其缓存在本地数据库中,并使用查找来使用本地数据库中缓存的数据来丰富事件。

filter {
  jdbc_static {
    loaders => [ 
      {
        id => "remote-servers"
        query => "select ip, descr from ref.local_ips order by ip"
        local_table => "servers"
      },
      {
        id => "remote-users"
        query => "select firstname, lastname, userid from ref.local_users order by userid"
        local_table => "users"
      }
    ]
    local_db_objects => [ 
      {
        name => "servers"
        index_columns => ["ip"]
        columns => [
          ["ip", "varchar(15)"],
          ["descr", "varchar(255)"]
        ]
      },
      {
        name => "users"
        index_columns => ["userid"]
        columns => [
          ["firstname", "varchar(255)"],
          ["lastname", "varchar(255)"],
          ["userid", "int"]
        ]
      }
    ]
    local_lookups => [ 
      {
        id => "local-servers"
        query => "SELECT descr as description FROM servers WHERE ip = :ip"
        parameters => {ip => "[from_ip]"}
        target => "server"
      },
      {
        id => "local-users"
        query => "SELECT firstname, lastname FROM users WHERE userid = ? AND country = ?"
        prepared_parameters => ["[loggedin_userid]", "[user_nation]"] 
        target => "user" 
        default_hash => { 
          firstname => nil
          lastname => nil
        }
      }
    ]
    # using add_field here to add & rename values to the event root
    add_field => { server_name => "%{[server][0][description]}" } 
    add_field => { user_firstname => "%{[user][0][firstname]}" }
    add_field => { user_lastname => "%{[user][0][lastname]}" }
    remove_field => ["server", "user"]
    staging_directory => "/tmp/logstash/jdbc_static/import_data"
    loader_schedule => "* */2 * * *" # run loaders every 2 hours
    jdbc_user => "logstash"
    jdbc_password => "example"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/tmp/logstash/vendor/postgresql-42.1.4.jar"
    jdbc_connection_string => "jdbc:postgresql://remotedb:5432/ls_test_2"
  }
}

output {
  if "_jdbcstaticdefaultsused" in [tags] {
    # Print all the not found users
    stdout { }
  }
}

查询外部数据库以获取将在本地缓存的数据集。

定义用于构建本地数据库结构的列、类型和索引。列名和类型应与外部数据库匹配。表定义的顺序很重要,应与加载器查询的顺序匹配。请参阅 加载器列和 local_db_object 顺序依赖性

对本地数据库执行查找查询以丰富事件。

本地查找查询也可以使用预处理语句,其中参数遵循位置顺序。

指定将存储查找数据的事件字段。如果查找返回多列,则数据将作为 JSON 对象存储在字段中。

如果在数据库中找不到用户,则使用 local_lookups default hash 设置中的数据创建事件,并使用 tag_on_default_use 中设置的列表标记该事件。

从 JSON 对象中获取数据并将其存储在顶级事件字段中,以便于在 Kibana 中进行分析。

这是一个完整的示例

input {
  generator {
    lines => [
      '{"from_ip": "10.2.3.20", "app": "foobar", "amount": 32.95}',
      '{"from_ip": "10.2.3.30", "app": "barfoo", "amount": 82.95}',
      '{"from_ip": "10.2.3.40", "app": "bazfoo", "amount": 22.95}'
    ]
    count => 200
  }
}

filter {
  json {
    source => "message"
  }

  jdbc_static {
    loaders => [
      {
        id => "servers"
        query => "select ip, descr from ref.local_ips order by ip"
        local_table => "servers"
      }
    ]
    local_db_objects => [
      {
        name => "servers"
        index_columns => ["ip"]
        columns => [
          ["ip", "varchar(15)"],
          ["descr", "varchar(255)"]
        ]
      }
    ]
    local_lookups => [
      {
        query => "select descr as description from servers WHERE ip = :ip"
        parameters => {ip => "[from_ip]"}
        target => "server"
      }
    ]
    staging_directory => "/tmp/logstash/jdbc_static/import_data"
    loader_schedule => "*/30 * * * *"
    jdbc_user => "logstash"
    jdbc_password => "logstash??"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/Users/guy/tmp/logstash-6.0.0/vendor/postgresql-42.1.4.jar"
    jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/ls_test_2"
  }
}

output {
  stdout {
    codec => rubydebug {metadata => true}
  }
}

假设加载器从 Postgres 数据库中获取以下数据

select * from ref.local_ips order by ip;
    ip     |         descr
-----------+-----------------------
 10.2.3.10 | Authentication Server
 10.2.3.20 | Payments Server
 10.2.3.30 | Events Server
 10.2.3.40 | Payroll Server
 10.2.3.50 | Uploads Server

事件将根据 IP 值 enriched with a description of the server

{
           "app" => "bazfoo",
      "sequence" => 0,
        "server" => [
        [0] {
            "description" => "Payroll Server"
        }
    ],
        "amount" => 22.95,
    "@timestamp" => 2017-11-30T18:08:15.694Z,
      "@version" => "1",
          "host" => "Elastics-MacBook-Pro.local",
       "message" => "{\"from_ip\": \"10.2.3.40\", \"app\": \"bazfoo\", \"amount\": 22.95}",
       "from_ip" => "10.2.3.40"
}

在多个管道中使用此插件编辑

Logstash 使用单个内存中的 Apache Derby 实例作为整个 JVM 的查找数据库引擎。因为每个插件实例在共享的 Derby 引擎中使用唯一的数据库,所以插件尝试创建和填充相同的表时应该不会发生冲突。无论插件是在单个管道中定义还是在多个管道中定义,都是如此。但是,在设置过滤器之后,您应该观察查找结果并查看日志以验证操作是否正确。

加载器列和 local_db_object 顺序依赖性编辑

出于加载器性能原因,加载机制使用 CSV 样式文件,该文件具有内置的 Derby 文件导入过程,用于将远程数据添加到本地数据库。检索到的列按原样写入 CSV 文件,并且文件导入过程期望与 local_db_object 设置中指定的列顺序一一对应。请确保此顺序已到位。

与 Elastic Common Schema (ECS) 的兼容性编辑

此插件与 Elastic Common Schema (ECS) 兼容。无论 ECS 兼容性如何,它的行为都是相同的,只是在启用 ECS 并且未设置 target 时会发出警告。

设置 target 选项以避免潜在的架构冲突。

Jdbc_static 过滤器配置选项编辑

此插件支持以下配置选项以及稍后描述的 通用选项

另请参阅 通用选项,以获取所有过滤器插件支持的选项列表。

 

jdbc_connection_string编辑

  • 这是一个必需设置。
  • 值类型为 字符串
  • 此设置没有默认值。

JDBC 连接字符串。

jdbc_driver_class编辑

  • 这是一个必需设置。
  • 值类型为 字符串
  • 此设置没有默认值。

要加载的 JDBC 驱动程序类,例如“org.apache.derby.jdbc.ClientDriver”。

根据 问题 43,如果您使用的是 Oracle JDBC 驱动程序 (ojdbc6.jar),则正确的 jdbc_driver_class"Java::oracle.jdbc.driver.OracleDriver"

jdbc_driver_library编辑

  • 值类型为 字符串
  • 此设置没有默认值。

JDBC 驱动程序库路径,指向第三方驱动程序库。如果需要多个库,请在一个字符串中使用逗号分隔的路径。

如果未提供驱动程序类,则插件将在 Logstash Java 类路径中查找它。

jdbc_password编辑

  • 值类型为 密码
  • 此设置没有默认值。

JDBC 密码。

jdbc_user编辑

  • 这是一个必需设置。
  • 值类型为 字符串
  • 此设置没有默认值。

JDBC 用户。

tag_on_default_use编辑

  • 值类型为 数组
  • 默认值为 ["_jdbcstaticdefaultsused"]

如果未找到记录并使用了默认值,则将值追加到 tags 字段。

tag_on_failure编辑

  • 值类型为 数组
  • 默认值为 ["_jdbcstaticfailure"]

如果发生 SQL 错误,则将值追加到 tags 字段。

staging_directory编辑

  • 值类型为 字符串
  • 默认值是从 Ruby 临时目录 + plugin_name + "import_data" 派生的
  • 例如 "/tmp/logstash/jdbc_static/import_data"

用于暂存批量加载数据的目录,应该有足够的磁盘空间来处理您希望用于丰富事件的数据。由于 Apache Derby 中存在一个未解决的错误,因此此插件的先前版本无法很好地处理加载超过数千行的 datasets。此设置引入了一种加载大型记录集的替代方法。当接收到每一行时,它会被存储到文件中,然后使用系统 *import table* 系统调用导入该文件。

如果发生 SQL 错误,则将值追加到 tags 字段。

loader_schedule编辑

  • 值类型为 字符串
  • 此设置没有默认值。

您可以安排远程加载根据特定时间表定期运行。此计划语法由 rufus-scheduler 提供支持。语法类似于 cron,但有一些特定于 Rufus 的扩展(例如,时区支持)。有关此语法的更多信息,请参阅 解析 cronlines 和时间字符串

示例

*/30 * * * *

将在每天每小时的第 0 分钟和第 30 分钟执行。

* 5 * 1-3 *

将在 1 月到 3 月的每天凌晨 5 点的每一分钟执行。

0 * * * *

将在每天每小时的第 0 分钟执行。

0 6 * * * America/Chicago

每天早上 6:00(UTC/GMT -5)执行。

使用 Logstash 交互式 shell 进行调试

bin/logstash -i irb
irb(main):001:0> require 'rufus-scheduler'
=> true
irb(main):002:0> Rufus::Scheduler.parse('*/10 * * * *')
=> #<Rufus::Scheduler::CronLine:0x230f8709 @timezone=nil, @weekdays=nil, @days=nil, @seconds=[0], @minutes=[0, 10, 20, 30, 40, 50], @hours=nil, @months=nil, @monthdays=nil, @original="*/10 * * * *">
irb(main):003:0> exit

上述调用返回的对象是 Rufus::Scheduler::CronLine 的实例,显示执行的秒、分等。

loaders编辑

  • 值类型为 数组
  • 默认值为 []

该数组应包含一个或多个哈希值。每个哈希值都根据下表进行验证。

设置 输入类型 必需

id

字符串

local_table

字符串

query

字符串

max_rows

number

jdbc_connection_string

字符串

jdbc_driver_class

字符串

jdbc_driver_library

有效的 文件系统路径

jdbc_password

密码

jdbc_user

字符串

加载器字段说明

id
可选的标识符。这用于标识生成错误消息和日志行的加载器。
local_table
加载器将填充的本地查找数据库中的目标表。
query
用于获取远程记录的 SQL 语句。使用 SQL 别名和强制转换来确保记录的列和数据类型与 local_db_objects 中定义的本地数据库中的表结构相匹配。
max_rows
此设置的默认值为 100 万。由于查找数据库在内存中,因此它将占用 JVM 堆空间。如果查询返回数百万行,则应增加提供给 Logstash 的 JVM 内存或限制返回的行数,也许限制为事件数据中最常出现的那些行。
jdbc_connection_string
如果未在加载器中设置,则此设置默认为插件级别的 jdbc_connection_string 设置。
jdbc_driver_class
如果未在加载器中设置,则此设置默认为插件级别的 jdbc_driver_class 设置。
jdbc_driver_library
如果未在加载器中设置,则此设置默认为插件级别的 jdbc_driver_library 设置。
jdbc_password
如果未在加载器中设置,则此设置默认为插件级别的 jdbc_password 设置。
jdbc_user
如果未在加载器中设置,则此设置默认为插件级别的 jdbc_user 设置。

local_db_objects编辑

  • 值类型为 数组
  • 默认值为 []

该数组应包含一个或多个哈希值。每个哈希值表示本地查找数据库的表架构。每个哈希值都根据下表进行验证。

设置 输入类型 必需

name

字符串

columns

数组

index_columns

number

preserve_existing

boolean

Local_db_objects 字段说明

name
要在数据库中创建的表的名称。
columns
列规范数组。每个列规范都是一个恰好包含两个元素的数组,例如 ["ip", "varchar(15)"]。第一个元素是列名字符串。第二个元素是一个字符串,它是 Apache Derby SQL 类型。字符串内容在构建本地查找表时进行检查,而不是在验证设置时进行检查。因此,任何拼写错误的 SQL 类型字符串都会导致错误。
index_columns
字符串数组。每个字符串都必须在 columns 设置中定义。索引名称将在内部生成。不支持唯一索引或排序索引。
preserve_existing
此设置在为 true 时,将检查表是否已存在于本地查找数据库中。如果您在同一个 Logstash 实例中运行多个管道,并且多个管道正在使用此插件,则必须阅读页面顶部的多个管道重要注意事项。

local_lookups编辑

  • 值类型为 数组
  • 默认值为 []

该数组应包含一个或多个哈希值。每个哈希值表示一个查找充实。每个哈希值都根据下表进行验证。

设置 输入类型 必需

id

字符串

query

字符串

parameters

hash

target

字符串

default_hash

hash

tag_on_failure

字符串

tag_on_default_use

字符串

Local_lookups 字段说明

id
可选的标识符。这用于标识生成错误消息和日志行的查找。如果省略此设置,则将使用默认 ID。
query
为实现查找而执行的 SQL SELECT 语句。要使用参数,请使用命名参数语法,例如 "SELECT * FROM MYTABLE WHERE ID = :id"。或者,可以使用 ? 符号作为预处理语句参数,在这种情况下,使用 prepared_parameters 数组填充值。
parameters
键/值哈希值或字典。键(LHS)是在 SQL 语句 SELECT * FROM sensors WHERE reference = :p1 中替换的文本。值(RHS)是事件中的字段名称。插件从事件中读取此键的值,并将该值替换到语句中,例如,parameters => { "p1" => "ref" }。引用是自动的 - 您不需要在语句中添加引号。仅当您需要添加前缀/后缀或将两个事件字段值连接在一起以构建替换值时,才在 RHS 上使用字段插值语法。例如,假设有一个 IOT 消息,它有一个 ID 和一个位置,并且您有一个传感器表,其中有一列 id-loc_id。在这种情况下,您的参数哈希值将如下所示:parameters => { "p1" => "%{[id]}-%{[loc_id]}" }
prepared_parameters
一个数组,其中位置与查询语法中 ? 的位置相关。数组的值遵循与 parameters 相同的语义。如果对 prepared_parameters 进行了赋值,则强制过滤器使用 JDBC 的预处理语句来查询本地数据库。预处理语句提供了两个好处:一是性能方面,因为避免了 DBMS 为每次调用解析和编译 SQL 表达式;另一个好处是安全性方面,使用预处理语句可以避免基于查询字符串连接的 SQL 注入攻击。
target
将接收查找数据的字段的可选名称。如果省略此设置,则使用 id 设置(或默认 ID)。查找的数据(转换为哈希值的結果数组)永远不会添加到事件的根目录中。如果要这样做,则应使用 add_field 设置。这意味着您可以完全控制字段/值如何放置在事件的根目录中,例如,add_field => { user_firstname => "%{[user][0][firstname]}" } - 其中 [user] 是目标字段,[0] 是数组中的第一个结果,[firstname] 是结果哈希值中的键。
default_hash
当查找未返回结果时,将放置在目标字段数组中的可选哈希值。如果需要确保配置中其他部分的后续引用实际引用了某些内容,请使用此设置。
tag_on_failure
覆盖插件级别设置的可选字符串。这在定义多个查找时很有用。
tag_on_default_use
覆盖插件级别设置的可选字符串。这在定义多个查找时很有用。

通用选项编辑

所有过滤器插件都支持以下配置选项

add_field编辑

如果此过滤器成功,则将任意字段添加到此事件。字段名称可以是动态的,并且可以使用 %{field} 包含事件的一部分。

示例

    filter {
      jdbc_static {
        add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
      }
    }
    # You can also add multiple fields at once:
    filter {
      jdbc_static {
        add_field => {
          "foo_%{somefield}" => "Hello world, from %{host}"
          "new_field" => "new_static_value"
        }
      }
    }

如果事件的字段 "somefield" == "hello",则此过滤器在成功时将添加字段 foo_hello(如果存在),其值为上述值,并将 %{host} 部分替换为事件中的该值。第二个示例还将添加一个硬编码字段。

add_tag编辑

  • 值类型为 数组
  • 默认值为 []

如果此过滤器成功,则将任意标签添加到事件。标签可以是动态的,并且可以使用 %{field} 语法包含事件的一部分。

示例

    filter {
      jdbc_static {
        add_tag => [ "foo_%{somefield}" ]
      }
    }
    # You can also add multiple tags at once:
    filter {
      jdbc_static {
        add_tag => [ "foo_%{somefield}", "taggedy_tag"]
      }
    }

如果事件的字段 "somefield" == "hello",则此过滤器在成功时将添加标签 foo_hello(当然,第二个示例将添加 taggedy_tag 标签)。

enable_metric编辑

禁用或启用此特定插件实例的指标记录。默认情况下,我们会记录所有可以记录的指标,但您可以禁用特定插件的指标收集。

id编辑

  • 值类型为 字符串
  • 此设置没有默认值。

向插件配置添加唯一的 ID。如果未指定 ID,Logstash 将生成一个 ID。强烈建议在配置中设置此 ID。当您有两个或多个相同类型的插件时,例如,如果您有两个 jdbc_static 过滤器,这将特别有用。在这种情况下,添加命名 ID 将有助于在使用监控 API 时监控 Logstash。

    filter {
      jdbc_static {
        id => "ABC"
      }
    }

id 字段中的变量替换仅支持环境变量,不支持使用密钥库中的值。

periodic_flush编辑

定期调用过滤器刷新方法。可选。

remove_field编辑

  • 值类型为 数组
  • 默认值为 []

如果此过滤器成功,则从此事件中删除任意字段。字段名称可以是动态的,并且可以使用 %{field} 示例包含事件的一部分

    filter {
      jdbc_static {
        remove_field => [ "foo_%{somefield}" ]
      }
    }
    # You can also remove multiple fields at once:
    filter {
      jdbc_static {
        remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时将删除名称为 foo_hello 的字段(如果存在)。第二个示例将删除另一个非动态字段。

remove_tag编辑

  • 值类型为 数组
  • 默认值为 []

如果此过滤器成功,则从事件中删除任意标签。标签可以是动态的,并且可以使用 %{field} 语法包含事件的一部分。

示例

    filter {
      jdbc_static {
        remove_tag => [ "foo_%{somefield}" ]
      }
    }
    # You can also remove multiple tags at once:
    filter {
      jdbc_static {
        remove_tag => [ "foo_%{somefield}", "sad_unwanted_tag"]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时将删除标签 foo_hello(如果存在)。第二个示例还将删除一个不需要的悲伤标签。