Jdbc_streaming 过滤器插件

编辑

Jdbc_streaming 过滤器插件

编辑

对于其他版本,请参阅 版本化插件文档

获取帮助

编辑

如有任何关于插件的问题,请在 Discuss 论坛中发起主题讨论。对于错误或功能请求,请在 Github 中提交问题。有关 Elastic 支持的插件列表,请查阅 Elastic 支持矩阵

描述

编辑

此过滤器执行 SQL 查询并将结果集存储在指定为 target 的字段中。它将在本地 LRU 缓存中缓存结果,并设置过期时间。

例如,您可以根据事件中的 ID 加载一行。

filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mydatabase"
    jdbc_user => "me"
    jdbc_password => "secret"
    statement => "select * from WORLD.COUNTRY WHERE Code = :code"
    parameters => { "code" => "country_code"}
    target => "country_details"
  }
}

预处理语句

编辑

使用服务器端预处理语句可以加快执行时间,因为服务器会优化查询计划和执行。

并非所有可通过 JDBC 访问的技术都支持预处理语句。

随着预处理语句支持的引入,带来了不同的代码执行路径和一些新的设置。大多数现有设置仍然有用,但预处理语句有一些新的设置需要了解。

使用布尔设置 use_prepared_statements 启用此执行模式。

使用 prepared_statement_name 设置指定预处理语句的名称,这在本地和远程标识预处理语句,并且在您的配置和数据库中应该唯一。

使用 prepared_statement_bind_values 数组设置指定绑定值。通常,这些值是从您的事件中间接提取的,即数组中的字符串引用事件中的字段名称。您也可以使用常数值(如数字或字符串),但请确保任何字符串常量(例如“en”或“de”的语言环境常量)不是事件字段名称。最好对字段使用带括号的字段引用语法,对常量使用普通字符串,例如 prepared_statement_bind_values => ["[src_ip]", "tokyo"],

有三种可能的参数方案:插值、字段引用和常量。当您在字段值前面添加前缀、后缀或连接字段值以创建数据库中存在的字段时,使用插值,例如 %{username}@%{domain} → [email protected],%{distance}km → "42km"。对精确的字段值使用字段引用,例如 [srcip] → "192.168.1.2"。当数据库列包含对许多类似记录进行切片或分类的值时,使用常量,例如语言翻译。

布尔设置 prepared_statement_warn_on_constant_usage(默认为 true)控制是否会看到记录的警告消息,该消息会在检测到常量可能缺少带括号的字段引用语法时发出警告。如果您已正确设置字段引用和常量,则应将 prepared_statement_warn_on_constant_usage 设置为 false。此设置和代码检查将在未来的主要 Logstash 版本中弃用。

statement(或 statement_path)设置仍然包含 SQL 语句,但要使用绑定变量,必须使用 ? 字符作为占位符,其顺序与 prepared_statement_bind_values 数组中的顺序完全相同。某些技术可能需要设置连接字符串属性,请参阅下面的 MySQL 示例。

示例

filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mydatabase?cachePrepStmts=true&prepStmtCacheSize=250&prepStmtCacheSqlLimit=2048&useServerPrepStmts=true"
    jdbc_user => "me"
    jdbc_password => "secret"
    statement => "select * from WORLD.COUNTRY WHERE Code = ?"
    use_prepared_statements => true
    prepared_statement_name => "lookup_country_info"
    prepared_statement_bind_values => ["[country_code]"]
    target => "country_details"
  }
}

Jdbc_streaming 过滤器配置选项

编辑

此插件支持以下配置选项以及稍后描述的 常用选项

另请参阅 常用选项,以获取所有过滤器插件支持的选项列表。

 

cache_expiration

编辑
  • 值类型为 数字
  • 默认值为 5.0

任何条目在缓存中至少应保留的秒数。默认为 5 秒。

数值。您可以使用小数,例如:cache_expiration => 0.25。如果存在瞬态 jdbc 错误,缓存将为给定的参数集存储空结果并绕过 jbdc 查找。这会将 default_hash 合并到事件中,直到缓存条目过期。然后将再次尝试对相同参数进行 jdbc 查找。相反,当缓存包含有效结果时,任何会导致 jdbc 错误的外部问题在 cache_expiration 期间都不会被注意到。

cache_size

编辑
  • 值类型为 数字
  • 默认值为 500

将存储的缓存条目的最大数量。默认为 500 个条目。将逐出最近最少使用的条目。

default_hash

编辑
  • 值类型为 哈希
  • 默认值为 {}

定义在查找失败返回匹配行时使用的默认对象。确保此对象的键名与语句中的列匹配。

jdbc_connection_string

编辑
  • 这是一个必填设置。
  • 值类型为 字符串
  • 此设置没有默认值。

JDBC 连接字符串

jdbc_driver_class

编辑
  • 这是一个必填设置。
  • 值类型为 字符串
  • 此设置没有默认值。

要加载的 JDBC 驱动程序类,例如“oracle.jdbc.OracleDriver”或“org.apache.derby.jdbc.ClientDriver”

jdbc_driver_library

编辑
  • 值类型为 路径
  • 此设置没有默认值。

指向第三方驱动程序库的 JDBC 驱动程序库路径。

jdbc_password

编辑
  • 值类型为 密码
  • 此设置没有默认值。

JDBC 密码

jdbc_user

编辑
  • 值类型为 字符串
  • 此设置没有默认值。

JDBC 用户

jdbc_validate_connection

编辑

连接池配置。在使用前验证连接。

jdbc_validation_timeout

编辑
  • 值类型为 数字
  • 默认值为 3600

连接池配置。验证连接的频率(以秒为单位)。

parameters

编辑
  • 值类型为 哈希
  • 默认值为 {}

查询参数的哈希,例如 { "id" => "id_field" }

prepared_statement_bind_values

编辑
  • 值类型为 数组
  • 默认值为 []

预处理语句的绑定值数组。使用字段引用和常量。有关更多信息,请参阅有关 预处理语句 的部分。

prepared_statement_name

编辑

赋予预处理语句的名称。它必须在您的配置和数据库中唯一。如果 use_prepared_statements 为 true,则需要提供此名称。

prepared_statement_warn_on_constant_usage

编辑

一个标志,用于控制是否在 prepared_statement_bind_values 中检测到可能意在作为字段引用的字符串常量时记录警告。

sequel_opts

编辑
  • 值类型为 哈希
  • 默认值为 {}

通用/特定于供应商的 Sequel 配置选项

可选连接池配置的示例 max_connections - 连接池的最大连接数

特定于供应商的选项示例可以在此文档页面中找到:https://github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc

statement

编辑
  • 这是一个必填设置。
  • 值类型为 字符串
  • 此设置没有默认值。

要执行的语句。要使用参数,请使用命名参数语法,例如“SELECT * FROM MYTABLE WHERE ID = :id”。

tag_on_default_use

编辑
  • 值类型为 数组
  • 默认值为 ["_jdbcstreamingdefaultsused"]

如果未找到记录并使用了默认值,则将值追加到 tags 字段。

tag_on_failure

编辑
  • 值类型为 数组
  • 默认值为 ["_jdbcstreamingfailure"]

如果发生 sql 错误,则将值追加到 tags 字段。

target

编辑
  • 这是一个必填设置。
  • 值类型为 字符串
  • 此设置没有默认值。

定义存储提取结果的目标字段。如果字段已存在,则会被覆盖。

use_cache

编辑

启用或禁用缓存,布尔值 true 或 false。默认为 true。

use_prepared_statements

编辑

设置为 true 时,启用预处理语句的使用。

常用选项

编辑

所有过滤器插件都支持这些配置选项。

add_field

编辑

如果此过滤器成功,则向此事件添加任意字段。字段名称可以是动态的,并包含使用 %{field} 的事件部分。

示例

    filter {
      jdbc_streaming {
        add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
      }
    }
    # You can also add multiple fields at once:
    filter {
      jdbc_streaming {
        add_field => {
          "foo_%{somefield}" => "Hello world, from %{host}"
          "new_field" => "new_static_value"
        }
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时会添加字段 foo_hello(如果存在),其值为上述值,并将 %{host} 部分替换为事件中的该值。第二个示例还会添加一个硬编码字段。

add_tag

编辑
  • 值类型为 数组
  • 默认值为 []

如果此过滤器成功,则向事件添加任意标签。标签可以是动态的,并包含使用 %{field} 语法的事件部分。

示例

    filter {
      jdbc_streaming {
        add_tag => [ "foo_%{somefield}" ]
      }
    }
    # You can also add multiple tags at once:
    filter {
      jdbc_streaming {
        add_tag => [ "foo_%{somefield}", "taggedy_tag"]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时会添加标签 foo_hello(第二个示例当然会添加 taggedy_tag 标签)。

enable_metric

编辑

禁用或启用此特定插件实例的指标日志记录。默认情况下,我们会记录所有可以记录的指标,但您可以禁用特定插件的指标收集。

  • 值类型为 字符串
  • 此设置没有默认值。

向插件配置添加唯一的 ID。如果未指定 ID,Logstash 将生成一个。强烈建议在您的配置中设置此 ID。当您有两个或多个相同类型的插件时,这尤其有用,例如,如果您有两个 jdbc_streaming 过滤器。在这种情况下,添加命名 ID 将有助于在使用监控 API 时监控 Logstash。

    filter {
      jdbc_streaming {
        id => "ABC"
      }
    }

id 字段中的变量替换仅支持环境变量,不支持使用密钥存储中的值。

periodic_flush

编辑

定期调用过滤器的刷新方法。可选。

remove_field

编辑
  • 值类型为 数组
  • 默认值为 []

如果此过滤器成功,则从此事件中删除任意字段。字段名称可以是动态的,并包含使用 %{field} 的事件部分 示例

    filter {
      jdbc_streaming {
        remove_field => [ "foo_%{somefield}" ]
      }
    }
    # You can also remove multiple fields at once:
    filter {
      jdbc_streaming {
        remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时会删除名称为 foo_hello 的字段(如果存在)。第二个示例将删除一个额外的非动态字段。

remove_tag

编辑
  • 值类型为 数组
  • 默认值为 []

如果此过滤器成功,则从事件中删除任意标签。标签可以是动态的,并包含使用 %{field} 语法的事件部分。

示例

    filter {
      jdbc_streaming {
        remove_tag => [ "foo_%{somefield}" ]
      }
    }
    # You can also remove multiple tags at once:
    filter {
      jdbc_streaming {
        remove_tag => [ "foo_%{somefield}", "sad_unwanted_tag"]
      }
    }

如果事件具有字段 "somefield" == "hello",则此过滤器在成功时会删除标签 foo_hello(如果存在)。第二个示例还会删除一个令人沮丧的、不需要的标签。