Jdbc_streaming 过滤器插件

编辑

Jdbc_streaming 过滤器插件

编辑

有关其他版本,请参阅版本化插件文档

获取帮助

编辑

有关插件的问题,请在 Discuss 论坛中打开一个主题。对于错误或功能请求,请在 Github 中打开一个 issue。有关 Elastic 支持的插件列表,请参阅 Elastic 支持矩阵

描述

编辑

此过滤器执行 SQL 查询并将结果集存储在指定为 target 的字段中。它将在具有过期的 LRU 缓存中本地缓存结果。

例如,您可以根据事件中的 ID 加载行。

filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mydatabase"
    jdbc_user => "me"
    jdbc_password => "secret"
    statement => "select * from WORLD.COUNTRY WHERE Code = :code"
    parameters => { "code" => "country_code"}
    target => "country_details"
  }
}

预处理语句

编辑

使用服务器端预处理语句可以加快执行时间,因为服务器会优化查询计划和执行。

并非所有可通过 JDBC 访问的技术都支持预处理语句。

随着预处理语句支持的引入,代码执行路径和一些新设置也随之改变。大多数现有设置仍然有用,但还有一些关于预处理语句的新设置需要了解。

使用布尔设置 use_prepared_statements 启用此执行模式。

使用 prepared_statement_name 设置为预处理语句指定一个名称,它会在本地和远程标识预处理语句,并且在您的配置和数据库中应该是唯一的。

使用 prepared_statement_bind_values 数组设置来指定绑定值。通常,这些值是从您的事件中间接提取的,即数组中的字符串是指您事件中的字段名称。您还可以使用数字或字符串等常量值,但要确保任何字符串常量(例如“en”或“de”的区域设置常量)也不是事件字段名称。最好对字段使用带括号的字段引用语法,对常量使用普通字符串,例如 prepared_statement_bind_values => ["[src_ip]", "tokyo"],

有 3 种可能的参数方案。插值、字段引用和常量。当您为字段值添加前缀、后缀或连接字段值以创建数据库中存在的值时,使用插值,例如“%{username}@%{domain}”→“[email protected]”,“%{distance}km”→“42km”。对于精确的字段值,使用字段引用,例如“[srcip]”→“192.168.1.2”。当数据库列包含对多个相似记录进行切片或分类的值时,使用常量,例如语言翻译。

布尔设置 prepared_statement_warn_on_constant_usage,默认为 true,控制当检测到常量可能缺少带括号的字段引用语法时,您是否会看到记录的 WARN 消息。如果您已正确设置了字段引用和常量,则应将 prepared_statement_warn_on_constant_usage 设置为 false。此设置和代码检查应在未来的 Logstash 主要版本中弃用。

statement(或 statement_path)设置仍然保存 SQL 语句,但是要使用绑定变量,您必须使用 ? 字符作为占位符,其顺序与 prepared_statement_bind_values 数组中的顺序完全相同。某些技术可能需要设置连接字符串属性,请参阅下面的 MySQL 示例。

示例

filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mydatabase?cachePrepStmts=true&prepStmtCacheSize=250&prepStmtCacheSqlLimit=2048&useServerPrepStmts=true"
    jdbc_user => "me"
    jdbc_password => "secret"
    statement => "select * from WORLD.COUNTRY WHERE Code = ?"
    use_prepared_statements => true
    prepared_statement_name => "lookup_country_info"
    prepared_statement_bind_values => ["[country_code]"]
    target => "country_details"
  }
}

Jdbc_streaming 过滤器配置选项

编辑

此插件支持以下配置选项以及稍后描述的通用选项

另请参阅通用选项,其中列出了所有过滤器插件支持的选项。

 

cache_expiration

编辑
  • 值类型为数字
  • 默认值为5.0

任何条目应保留在缓存中的最小秒数。默认为 5 秒。

数值。您可以使用小数,例如:cache_expiration => 0.25。如果存在瞬态 jdbc 错误,缓存将为给定的参数集存储空结果并绕过 jbdc 查找。这会将 default_hash 合并到事件中,直到缓存条目过期。然后,将再次尝试使用相同的参数进行 jdbc 查找。相反,当缓存包含有效结果时,在 cache_expiration 期间,不会注意到任何会导致 jdbc 错误的外部问题。

cache_size

编辑
  • 值类型为数字
  • 默认值为500

将存储的最大缓存条目数。默认为 500 个条目。将删除最近最少使用的条目。

default_hash

编辑
  • 值类型为哈希
  • 默认值为{}

定义在查找未能返回匹配行时使用的默认对象。确保此对象的键名与语句中的列匹配。

jdbc_connection_string

编辑
  • 这是一个必需的设置。
  • 值类型为字符串
  • 此设置没有默认值。

JDBC 连接字符串

jdbc_driver_class

编辑
  • 这是一个必需的设置。
  • 值类型为字符串
  • 此设置没有默认值。

要加载的 JDBC 驱动程序类,例如“oracle.jdbc.OracleDriver”或“org.apache.derby.jdbc.ClientDriver”

jdbc_driver_library

编辑
  • 值类型为路径
  • 此设置没有默认值。

第三方驱动程序库的 JDBC 驱动程序库路径。

jdbc_password

编辑
  • 值类型为密码
  • 此设置没有默认值。

JDBC 密码

jdbc_user

编辑
  • 值类型为字符串
  • 此设置没有默认值。

JDBC 用户

jdbc_validate_connection

编辑

连接池配置。在使用前验证连接。

jdbc_validation_timeout

编辑
  • 值类型为数字
  • 默认值为3600

连接池配置。验证连接的频率(以秒为单位)。

parameters

编辑
  • 值类型为哈希
  • 默认值为{}

查询参数的哈希,例如 { "id" => "id_field" }

prepared_statement_bind_values

编辑
  • 值类型为数组
  • 默认值为[]

预处理语句的绑定值数组。使用字段引用和常量。有关更多信息,请参阅预处理语句部分。

prepared_statement_name

编辑

赋予预处理语句的名称。它在您的配置和数据库中必须是唯一的。如果 use_prepared_statements 为 true,则需要提供此名称。

prepared_statement_warn_on_constant_usage

编辑

一个标志,用于控制当在 prepared_statement_bind_values 中检测到可能是作为字段引用的字符串常量时,是否记录警告。

sequel_opts

编辑
  • 值类型为哈希
  • 默认值为{}

通用/特定于供应商的 Sequel 配置选项

可选连接池配置 max_connections 的一个示例 - 连接池的最大连接数

可以在此文档页面中找到特定于供应商的选项示例:https://github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc

statement

编辑
  • 这是一个必需的设置。
  • 值类型为字符串
  • 此设置没有默认值。

要执行的语句。要使用参数,请使用命名参数语法,例如“SELECT * FROM MYTABLE WHERE ID = :id”。

tag_on_default_use

编辑
  • 值类型为数组
  • 默认值为["_jdbcstreamingdefaultsused"]

如果没有找到记录并且使用了默认值,则将值追加到 tags 字段。

tag_on_failure

编辑
  • 值类型为数组
  • 默认值为["_jdbcstreamingfailure"]

如果发生 sql 错误,则将值追加到 tags 字段。

target

编辑
  • 这是一个必需的设置。
  • 值类型为字符串
  • 此设置没有默认值。

定义用于存储提取结果的目标字段。如果存在,则覆盖该字段。

use_cache

编辑

启用或禁用缓存,布尔值 true 或 false。默认为 true。

use_prepared_statements

编辑

设置为 true 时,启用预处理语句的使用

通用选项

编辑

所有过滤器插件都支持这些配置选项

add_field

编辑
  • 值类型为 哈希
  • 默认值为{}

如果此过滤器成功,请将任何任意字段添加到此事件。字段名称可以是动态的,并且可以使用 %{field} 包含事件的一部分。

示例

    filter {
      jdbc_streaming {
        add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
      }
    }
    # You can also add multiple fields at once:
    filter {
      jdbc_streaming {
        add_field => {
          "foo_%{somefield}" => "Hello world, from %{host}"
          "new_field" => "new_static_value"
        }
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时会添加字段 foo_hello (如果存在),其值为上述值,并且 %{host} 部分会被事件中的相应值替换。第二个示例还会添加一个硬编码字段。

add_tag

编辑
  • 值类型为 数组
  • 默认值为[]

如果此过滤器成功,则会向事件添加任意标签。标签可以是动态的,并且可以使用 %{field} 语法包含事件的部分内容。

示例

    filter {
      jdbc_streaming {
        add_tag => [ "foo_%{somefield}" ]
      }
    }
    # You can also add multiple tags at once:
    filter {
      jdbc_streaming {
        add_tag => [ "foo_%{somefield}", "taggedy_tag"]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时会添加标签 foo_hello (第二个示例当然会添加一个 taggedy_tag 标签)。

enable_metric

编辑

禁用或启用此特定插件实例的指标记录。默认情况下,我们会记录所有可以记录的指标,但您可以禁用特定插件的指标收集。

  • 值类型为 字符串
  • 此设置没有默认值。

向插件配置添加唯一的 ID。如果未指定 ID,Logstash 将生成一个。强烈建议在配置中设置此 ID。当您有两个或多个相同类型的插件时,例如,如果您有 2 个 jdbc_streaming 过滤器,则此功能特别有用。在这种情况下添加命名 ID 将有助于在使用监控 API 时监控 Logstash。

    filter {
      jdbc_streaming {
        id => "ABC"
      }
    }

id 字段中的变量替换仅支持环境变量,不支持使用来自密钥存储的值。

periodic_flush

编辑

定期调用过滤器刷新方法。可选。

remove_field

编辑
  • 值类型为 数组
  • 默认值为[]

如果此过滤器成功,则会从此事件中删除任意字段。字段名称可以是动态的,并且可以使用 %{field} 包含事件的部分内容。示例

    filter {
      jdbc_streaming {
        remove_field => [ "foo_%{somefield}" ]
      }
    }
    # You can also remove multiple fields at once:
    filter {
      jdbc_streaming {
        remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时会删除名为 foo_hello 的字段(如果存在)。第二个示例会删除一个额外的非动态字段。

remove_tag

编辑
  • 值类型为 数组
  • 默认值为[]

如果此过滤器成功,则会从事件中删除任意标签。标签可以是动态的,并且可以使用 %{field} 语法包含事件的部分内容。

示例

    filter {
      jdbc_streaming {
        remove_tag => [ "foo_%{somefield}" ]
      }
    }
    # You can also remove multiple tags at once:
    filter {
      jdbc_streaming {
        remove_tag => [ "foo_%{somefield}", "sad_unwanted_tag"]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时会删除标签 foo_hello(如果存在)。第二个示例还会删除一个令人难过的、不需要的标签。