Jdbc_static 过滤器插件
编辑Jdbc_static 过滤器插件编辑
有关其他版本,请参阅 版本化插件文档。
获取帮助编辑
如果您对该插件有任何疑问,请在 Discuss 论坛上发布主题。如果您发现任何错误或有功能请求,请在 Github 上提交问题。有关 Elastic 支持的插件列表,请参阅 Elastic 支持矩阵。
描述编辑
此过滤器使用从远程数据库预加载的数据来丰富事件。
此过滤器最适合使用静态或不经常更改的参考数据(例如环境、用户和产品)来丰富事件。
此过滤器的工作原理是从远程数据库获取数据,将其缓存在本地内存中的 Apache Derby 数据库中,并使用查找来使用本地数据库中缓存的数据来丰富事件。您可以将过滤器设置为加载远程数据一次(对于静态数据),或者您可以安排定期运行远程加载(对于需要刷新的数据)。
要定义过滤器,您需要指定三个主要部分:local_db_objects、loaders 和 lookups。
- local_db_objects
- 定义用于构建本地数据库结构的列、类型和索引。列名和类型应与外部数据库匹配。根据需要定义尽可能多的这些对象以构建本地数据库结构。
- loaders
- 查询外部数据库以获取将在本地缓存的数据集。根据需要定义尽可能多的加载器以获取远程数据。每个加载器都应填充由
local_db_objects
定义的表。确保加载器 SQL 语句中的列名和数据类型与local_db_objects
下定义的列匹配。每个加载器都有一个独立的远程数据库连接。 - lookups
-
对本地数据库执行查找查询以丰富事件。根据需要定义尽可能多的查找,以便一次性从所有查找表中丰富事件。理想情况下,SQL 语句应仅返回一行。所有行都将转换为哈希对象,并存储在作为数组的目标字段中。
以下示例配置从远程数据库获取数据,将其缓存在本地数据库中,并使用查找来使用本地数据库中缓存的数据来丰富事件。
filter { jdbc_static { loaders => [ { id => "remote-servers" query => "select ip, descr from ref.local_ips order by ip" local_table => "servers" }, { id => "remote-users" query => "select firstname, lastname, userid from ref.local_users order by userid" local_table => "users" } ] local_db_objects => [ { name => "servers" index_columns => ["ip"] columns => [ ["ip", "varchar(15)"], ["descr", "varchar(255)"] ] }, { name => "users" index_columns => ["userid"] columns => [ ["firstname", "varchar(255)"], ["lastname", "varchar(255)"], ["userid", "int"] ] } ] local_lookups => [ { id => "local-servers" query => "SELECT descr as description FROM servers WHERE ip = :ip" parameters => {ip => "[from_ip]"} target => "server" }, { id => "local-users" query => "SELECT firstname, lastname FROM users WHERE userid = ? AND country = ?" prepared_parameters => ["[loggedin_userid]", "[user_nation]"] target => "user" default_hash => { firstname => nil lastname => nil } } ] # using add_field here to add & rename values to the event root add_field => { server_name => "%{[server][0][description]}" } add_field => { user_firstname => "%{[user][0][firstname]}" } add_field => { user_lastname => "%{[user][0][lastname]}" } remove_field => ["server", "user"] staging_directory => "/tmp/logstash/jdbc_static/import_data" loader_schedule => "* */2 * * *" # run loaders every 2 hours jdbc_user => "logstash" jdbc_password => "example" jdbc_driver_class => "org.postgresql.Driver" jdbc_driver_library => "/tmp/logstash/vendor/postgresql-42.1.4.jar" jdbc_connection_string => "jdbc:postgresql://remotedb:5432/ls_test_2" } } output { if "_jdbcstaticdefaultsused" in [tags] { # Print all the not found users stdout { } } }
查询外部数据库以获取将在本地缓存的数据集。
定义用于构建本地数据库结构的列、类型和索引。列名和类型应与外部数据库匹配。表定义的顺序很重要,应与加载器查询的顺序匹配。请参阅 加载器列和 local_db_object 顺序依赖性。
对本地数据库执行查找查询以丰富事件。
本地查找查询也可以使用预处理语句,其中参数遵循位置顺序。
指定将存储查找数据的事件字段。如果查找返回多列,则数据将作为 JSON 对象存储在字段中。
如果在数据库中找不到用户,则使用
local_lookups
default hash
设置中的数据创建事件,并使用tag_on_default_use
中设置的列表标记该事件。从 JSON 对象中获取数据并将其存储在顶级事件字段中,以便于在 Kibana 中进行分析。
这是一个完整的示例
input { generator { lines => [ '{"from_ip": "10.2.3.20", "app": "foobar", "amount": 32.95}', '{"from_ip": "10.2.3.30", "app": "barfoo", "amount": 82.95}', '{"from_ip": "10.2.3.40", "app": "bazfoo", "amount": 22.95}' ] count => 200 } } filter { json { source => "message" } jdbc_static { loaders => [ { id => "servers" query => "select ip, descr from ref.local_ips order by ip" local_table => "servers" } ] local_db_objects => [ { name => "servers" index_columns => ["ip"] columns => [ ["ip", "varchar(15)"], ["descr", "varchar(255)"] ] } ] local_lookups => [ { query => "select descr as description from servers WHERE ip = :ip" parameters => {ip => "[from_ip]"} target => "server" } ] staging_directory => "/tmp/logstash/jdbc_static/import_data" loader_schedule => "*/30 * * * *" jdbc_user => "logstash" jdbc_password => "logstash??" jdbc_driver_class => "org.postgresql.Driver" jdbc_driver_library => "/Users/guy/tmp/logstash-6.0.0/vendor/postgresql-42.1.4.jar" jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/ls_test_2" } } output { stdout { codec => rubydebug {metadata => true} } }
假设加载器从 Postgres 数据库中获取以下数据
select * from ref.local_ips order by ip; ip | descr -----------+----------------------- 10.2.3.10 | Authentication Server 10.2.3.20 | Payments Server 10.2.3.30 | Events Server 10.2.3.40 | Payroll Server 10.2.3.50 | Uploads Server
事件将根据 IP 值 enriched with a description of the server
{ "app" => "bazfoo", "sequence" => 0, "server" => [ [0] { "description" => "Payroll Server" } ], "amount" => 22.95, "@timestamp" => 2017-11-30T18:08:15.694Z, "@version" => "1", "host" => "Elastics-MacBook-Pro.local", "message" => "{\"from_ip\": \"10.2.3.40\", \"app\": \"bazfoo\", \"amount\": 22.95}", "from_ip" => "10.2.3.40" }
在多个管道中使用此插件编辑
Logstash 使用单个内存中的 Apache Derby 实例作为整个 JVM 的查找数据库引擎。因为每个插件实例在共享的 Derby 引擎中使用唯一的数据库,所以插件尝试创建和填充相同的表时应该不会发生冲突。无论插件是在单个管道中定义还是在多个管道中定义,都是如此。但是,在设置过滤器之后,您应该观察查找结果并查看日志以验证操作是否正确。
加载器列和 local_db_object 顺序依赖性编辑
出于加载器性能原因,加载机制使用 CSV 样式文件,该文件具有内置的 Derby 文件导入过程,用于将远程数据添加到本地数据库。检索到的列按原样写入 CSV 文件,并且文件导入过程期望与 local_db_object 设置中指定的列顺序一一对应。请确保此顺序已到位。
与 Elastic Common Schema (ECS) 的兼容性编辑
此插件与 Elastic Common Schema (ECS) 兼容。无论 ECS 兼容性如何,它的行为都是相同的,只是在启用 ECS 并且未设置 target
时会发出警告。
设置 target
选项以避免潜在的架构冲突。
Jdbc_static 过滤器配置选项编辑
此插件支持以下配置选项以及稍后描述的 通用选项。
设置 | 输入类型 | 必需 |
---|---|---|
是 |
||
是 |
||
有效的 文件系统路径 |
否 |
|
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
另请参阅 通用选项,以获取所有过滤器插件支持的选项列表。
jdbc_driver_class
编辑
- 这是一个必需设置。
- 值类型为 字符串
- 此设置没有默认值。
要加载的 JDBC 驱动程序类,例如“org.apache.derby.jdbc.ClientDriver”。
根据 问题 43,如果您使用的是 Oracle JDBC 驱动程序 (ojdbc6.jar),则正确的 jdbc_driver_class
是 "Java::oracle.jdbc.driver.OracleDriver"
。
jdbc_driver_library
编辑
- 值类型为 字符串
- 此设置没有默认值。
JDBC 驱动程序库路径,指向第三方驱动程序库。如果需要多个库,请在一个字符串中使用逗号分隔的路径。
如果未提供驱动程序类,则插件将在 Logstash Java 类路径中查找它。
staging_directory
编辑
- 值类型为 字符串
- 默认值是从 Ruby 临时目录 + plugin_name + "import_data" 派生的
- 例如
"/tmp/logstash/jdbc_static/import_data"
用于暂存批量加载数据的目录,应该有足够的磁盘空间来处理您希望用于丰富事件的数据。由于 Apache Derby 中存在一个未解决的错误,因此此插件的先前版本无法很好地处理加载超过数千行的 datasets。此设置引入了一种加载大型记录集的替代方法。当接收到每一行时,它会被存储到文件中,然后使用系统 *import table* 系统调用导入该文件。
如果发生 SQL 错误,则将值追加到 tags
字段。
loader_schedule
编辑
- 值类型为 字符串
- 此设置没有默认值。
您可以安排远程加载根据特定时间表定期运行。此计划语法由 rufus-scheduler 提供支持。语法类似于 cron,但有一些特定于 Rufus 的扩展(例如,时区支持)。有关此语法的更多信息,请参阅 解析 cronlines 和时间字符串。
示例
|
将在每天每小时的第 0 分钟和第 30 分钟执行。 |
|
将在 1 月到 3 月的每天凌晨 5 点的每一分钟执行。 |
|
将在每天每小时的第 0 分钟执行。 |
|
每天早上 6:00(UTC/GMT -5)执行。 |
使用 Logstash 交互式 shell 进行调试
bin/logstash -i irb irb(main):001:0> require 'rufus-scheduler' => true irb(main):002:0> Rufus::Scheduler.parse('*/10 * * * *') => #<Rufus::Scheduler::CronLine:0x230f8709 @timezone=nil, @weekdays=nil, @days=nil, @seconds=[0], @minutes=[0, 10, 20, 30, 40, 50], @hours=nil, @months=nil, @monthdays=nil, @original="*/10 * * * *"> irb(main):003:0> exit
上述调用返回的对象是 Rufus::Scheduler::CronLine
的实例,显示执行的秒、分等。
loaders
编辑
- 值类型为 数组
- 默认值为
[]
该数组应包含一个或多个哈希值。每个哈希值都根据下表进行验证。
设置 | 输入类型 | 必需 |
---|---|---|
id |
字符串 |
否 |
local_table |
字符串 |
是 |
query |
字符串 |
是 |
max_rows |
number |
否 |
jdbc_connection_string |
字符串 |
否 |
jdbc_driver_class |
字符串 |
否 |
jdbc_driver_library |
有效的 文件系统路径 |
否 |
jdbc_password |
密码 |
否 |
jdbc_user |
字符串 |
否 |
加载器字段说明
- id
- 可选的标识符。这用于标识生成错误消息和日志行的加载器。
- local_table
- 加载器将填充的本地查找数据库中的目标表。
- query
- 用于获取远程记录的 SQL 语句。使用 SQL 别名和强制转换来确保记录的列和数据类型与
local_db_objects
中定义的本地数据库中的表结构相匹配。 - max_rows
- 此设置的默认值为 100 万。由于查找数据库在内存中,因此它将占用 JVM 堆空间。如果查询返回数百万行,则应增加提供给 Logstash 的 JVM 内存或限制返回的行数,也许限制为事件数据中最常出现的那些行。
- jdbc_connection_string
- 如果未在加载器中设置,则此设置默认为插件级别的
jdbc_connection_string
设置。 - jdbc_driver_class
- 如果未在加载器中设置,则此设置默认为插件级别的
jdbc_driver_class
设置。 - jdbc_driver_library
- 如果未在加载器中设置,则此设置默认为插件级别的
jdbc_driver_library
设置。 - jdbc_password
- 如果未在加载器中设置,则此设置默认为插件级别的
jdbc_password
设置。 - jdbc_user
- 如果未在加载器中设置,则此设置默认为插件级别的
jdbc_user
设置。
local_db_objects
编辑
- 值类型为 数组
- 默认值为
[]
该数组应包含一个或多个哈希值。每个哈希值表示本地查找数据库的表架构。每个哈希值都根据下表进行验证。
设置 | 输入类型 | 必需 |
---|---|---|
name |
字符串 |
是 |
columns |
数组 |
是 |
index_columns |
number |
否 |
preserve_existing |
boolean |
否 |
Local_db_objects 字段说明
- name
- 要在数据库中创建的表的名称。
- columns
- 列规范数组。每个列规范都是一个恰好包含两个元素的数组,例如
["ip", "varchar(15)"]
。第一个元素是列名字符串。第二个元素是一个字符串,它是 Apache Derby SQL 类型。字符串内容在构建本地查找表时进行检查,而不是在验证设置时进行检查。因此,任何拼写错误的 SQL 类型字符串都会导致错误。 - index_columns
- 字符串数组。每个字符串都必须在
columns
设置中定义。索引名称将在内部生成。不支持唯一索引或排序索引。 - preserve_existing
- 此设置在为
true
时,将检查表是否已存在于本地查找数据库中。如果您在同一个 Logstash 实例中运行多个管道,并且多个管道正在使用此插件,则必须阅读页面顶部的多个管道重要注意事项。
local_lookups
编辑
- 值类型为 数组
- 默认值为
[]
该数组应包含一个或多个哈希值。每个哈希值表示一个查找充实。每个哈希值都根据下表进行验证。
设置 | 输入类型 | 必需 |
---|---|---|
id |
字符串 |
否 |
query |
字符串 |
是 |
parameters |
hash |
是 |
target |
字符串 |
否 |
default_hash |
hash |
否 |
tag_on_failure |
字符串 |
否 |
tag_on_default_use |
字符串 |
否 |
Local_lookups 字段说明
- id
- 可选的标识符。这用于标识生成错误消息和日志行的查找。如果省略此设置,则将使用默认 ID。
- query
- 为实现查找而执行的 SQL SELECT 语句。要使用参数,请使用命名参数语法,例如
"SELECT * FROM MYTABLE WHERE ID = :id"
。或者,可以使用?
符号作为预处理语句参数,在这种情况下,使用prepared_parameters
数组填充值。 - parameters
- 键/值哈希值或字典。键(LHS)是在 SQL 语句
SELECT * FROM sensors WHERE reference = :p1
中替换的文本。值(RHS)是事件中的字段名称。插件从事件中读取此键的值,并将该值替换到语句中,例如,parameters => { "p1" => "ref" }
。引用是自动的 - 您不需要在语句中添加引号。仅当您需要添加前缀/后缀或将两个事件字段值连接在一起以构建替换值时,才在 RHS 上使用字段插值语法。例如,假设有一个 IOT 消息,它有一个 ID 和一个位置,并且您有一个传感器表,其中有一列id-loc_id
。在这种情况下,您的参数哈希值将如下所示:parameters => { "p1" => "%{[id]}-%{[loc_id]}" }
。 - prepared_parameters
- 一个数组,其中位置与查询语法中
?
的位置相关。数组的值遵循与parameters
相同的语义。如果对prepared_parameters
进行了赋值,则强制过滤器使用 JDBC 的预处理语句来查询本地数据库。预处理语句提供了两个好处:一是性能方面,因为避免了 DBMS 为每次调用解析和编译 SQL 表达式;另一个好处是安全性方面,使用预处理语句可以避免基于查询字符串连接的 SQL 注入攻击。 - target
- 将接收查找数据的字段的可选名称。如果省略此设置,则使用
id
设置(或默认 ID)。查找的数据(转换为哈希值的結果数组)永远不会添加到事件的根目录中。如果要这样做,则应使用add_field
设置。这意味着您可以完全控制字段/值如何放置在事件的根目录中,例如,add_field => { user_firstname => "%{[user][0][firstname]}" }
- 其中[user]
是目标字段,[0]
是数组中的第一个结果,[firstname]
是结果哈希值中的键。 - default_hash
- 当查找未返回结果时,将放置在目标字段数组中的可选哈希值。如果需要确保配置中其他部分的后续引用实际引用了某些内容,请使用此设置。
- tag_on_failure
- 覆盖插件级别设置的可选字符串。这在定义多个查找时很有用。
- tag_on_default_use
- 覆盖插件级别设置的可选字符串。这在定义多个查找时很有用。
通用选项编辑
所有过滤器插件都支持以下配置选项
设置 | 输入类型 | 必需 |
---|---|---|
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
||
否 |
add_field
编辑
- 值类型为 哈希值
- 默认值为
{}
如果此过滤器成功,则将任意字段添加到此事件。字段名称可以是动态的,并且可以使用 %{field}
包含事件的一部分。
示例
filter { jdbc_static { add_field => { "foo_%{somefield}" => "Hello world, from %{host}" } } }
# You can also add multiple fields at once: filter { jdbc_static { add_field => { "foo_%{somefield}" => "Hello world, from %{host}" "new_field" => "new_static_value" } } }
如果事件的字段 "somefield" == "hello"
,则此过滤器在成功时将添加字段 foo_hello
(如果存在),其值为上述值,并将 %{host}
部分替换为事件中的该值。第二个示例还将添加一个硬编码字段。
add_tag
编辑
- 值类型为 数组
- 默认值为
[]
如果此过滤器成功,则将任意标签添加到事件。标签可以是动态的,并且可以使用 %{field}
语法包含事件的一部分。
示例
filter { jdbc_static { add_tag => [ "foo_%{somefield}" ] } }
# You can also add multiple tags at once: filter { jdbc_static { add_tag => [ "foo_%{somefield}", "taggedy_tag"] } }
如果事件的字段 "somefield" == "hello"
,则此过滤器在成功时将添加标签 foo_hello
(当然,第二个示例将添加 taggedy_tag
标签)。
id
编辑
- 值类型为 字符串
- 此设置没有默认值。
向插件配置添加唯一的 ID
。如果未指定 ID,Logstash 将生成一个 ID。强烈建议在配置中设置此 ID。当您有两个或多个相同类型的插件时,例如,如果您有两个 jdbc_static 过滤器,这将特别有用。在这种情况下,添加命名 ID 将有助于在使用监控 API 时监控 Logstash。
filter { jdbc_static { id => "ABC" } }
id
字段中的变量替换仅支持环境变量,不支持使用密钥库中的值。
remove_field
编辑
- 值类型为 数组
- 默认值为
[]
如果此过滤器成功,则从此事件中删除任意字段。字段名称可以是动态的,并且可以使用 %{field} 示例包含事件的一部分
filter { jdbc_static { remove_field => [ "foo_%{somefield}" ] } }
# You can also remove multiple fields at once: filter { jdbc_static { remove_field => [ "foo_%{somefield}", "my_extraneous_field" ] } }
如果事件具有字段 "somefield" == "hello"
,则此过滤器在成功时将删除名称为 foo_hello
的字段(如果存在)。第二个示例将删除另一个非动态字段。
remove_tag
编辑
- 值类型为 数组
- 默认值为
[]
如果此过滤器成功,则从事件中删除任意标签。标签可以是动态的,并且可以使用 %{field}
语法包含事件的一部分。
示例
filter { jdbc_static { remove_tag => [ "foo_%{somefield}" ] } }
# You can also remove multiple tags at once: filter { jdbc_static { remove_tag => [ "foo_%{somefield}", "sad_unwanted_tag"] } }
如果事件具有字段 "somefield" == "hello"
,则此过滤器在成功时将删除标签 foo_hello
(如果存在)。第二个示例还将删除一个不需要的悲伤标签。