如何编写 Java 过滤器插件
要为 Logstash 开发新的 Java 过滤器,您需要编写一个新的 Java 类,该类符合 Logstash Java Filters API,对其进行打包,然后使用 logstash-plugin 实用程序安装它。我们将逐步完成这些步骤。
首先复制示例过滤器插件。插件 API 目前是 Logstash 代码库的一部分,因此您必须拥有可用的本地副本。您可以使用以下 git
命令获取 Logstash 代码库的副本
git clone --branch <branch_name> --single-branch https://github.com/elastic/logstash.git <target_folder>
branch_name
应该对应于 Logstash 的版本,其中包含 Java 插件 API 的首选修订版。
Java 插件 API 的 GA 版本可在 Logstash 代码库的 7.2
及更高版本的分支中使用。
为 Logstash 代码库的本地副本指定 target_folder
。如果您未指定 target_folder
,则它默认为当前文件夹下的一个名为 logstash
的新文件夹。
获得适当修订版本的 Logstash 代码库的副本后,您需要对其进行编译以生成包含 Java 插件 API 的 .jar 文件。从 Logstash 代码库的根目录 ($LS_HOME) 中,您可以使用 ./gradlew assemble
(如果在 Windows 上运行,则使用 gradlew.bat assemble
)对其进行编译。这应该会生成 $LS_HOME/logstash-core/build/libs/logstash-core-x.y.z.jar
,其中 x
、y
和 z
指的是 Logstash 的版本。
成功编译 Logstash 后,您需要告诉 Java 插件在哪里可以找到 logstash-core-x.y.z.jar
文件。在插件项目的根文件夹中创建一个名为 gradle.properties
的新文件。该文件应该只有一行
LOGSTASH_CORE_PATH=<target_folder>/logstash-core
其中 target_folder
是 Logstash 代码库的本地副本的根文件夹。
示例过滤器插件允许配置每个事件中将被反转的字段。例如,如果过滤器配置为反转 day_of_week
字段,则具有 day_of_week: "Monday"
的事件将转换为 day_of_week: "yadnoM"
。让我们看看该示例过滤器中的主类
@LogstashPlugin(name = "java_filter_example")
public class JavaFilterExample implements Filter {
public static final PluginConfigSpec<String> SOURCE_CONFIG =
PluginConfigSpec.stringSetting("source", "message");
private String id;
private String sourceField;
public JavaFilterExample(String id, Configuration config, Context context) {
this.id = id;
this.sourceField = config.get(SOURCE_CONFIG);
}
@Override
public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener) {
for (Event e : events) {
Object f = e.getField(sourceField);
if (f instanceof String) {
e.setField(sourceField, StringUtils.reverse((String)f));
matchListener.filterMatched(e);
}
}
return events;
}
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return Collections.singletonList(SOURCE_CONFIG);
}
@Override
public String getId() {
return this.id;
}
@Override
public void close() {
this.sourceField = null;
return;
}
}
让我们逐步检查该类的每个部分。
@LogstashPlugin(name = "java_filter_example")
public class JavaFilterExample implements Filter {
关于类声明的注意事项
所有 Java 插件都必须使用
@LogstashPlugin
注解进行注释。此外- 必须提供注解的
name
属性,并定义插件的名称,因为它将在 Logstash 管道定义中使用。例如,此过滤器将在 Logstash 管道定义的 filter 部分中引用为filter { java_filter_example => { .... } }
name
属性的值必须与类的名称匹配,不包括大小写和下划线。
- 必须提供注解的
该类必须实现
co.elastic.logstash.api.Filter
接口。Java 插件不能在
org.logstash
或co.elastic.logstash
包中创建,以防止与 Logstash 本身中的类发生潜在冲突。
下面的代码片段包含设置定义和引用它的方法
public static final PluginConfigSpec<String> SOURCE_CONFIG =
PluginConfigSpec.stringSetting("source", "message");
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return Collections.singletonList(SOURCE_CONFIG);
}
PluginConfigSpec
类允许开发人员指定插件支持的设置,包括设置名称、数据类型、弃用状态、必需状态和默认值。在此示例中,source
设置定义了每个事件中将被反转的字段的名称。它不是必需的设置,如果未显式设置,则其默认值为 message
。
configSchema
方法必须返回插件支持的所有设置的列表。在 Java 插件项目的未来阶段,Logstash 执行引擎将验证是否所有必需的设置都存在,并且不存在不受支持的设置。
private String id;
private String sourceField;
public JavaFilterExample(String id, Configuration config, Context context) {
this.id = id;
this.sourceField = config.get(SOURCE_CONFIG);
}
所有 Java 过滤器插件都必须有一个构造函数,该构造函数接受一个 String
id 和一个 Configuration
和 Context
参数。这是在运行时实例化它们时将使用的构造函数。所有插件设置的检索和验证都应在此构造函数中进行。在此示例中,从其设置中检索要在每个事件中反转的字段的名称,并将其存储在局部变量中,以便稍后在 filter
方法中使用。
任何其他初始化也可能发生在构造函数中。如果在过滤器插件的配置或初始化中遇到任何无法恢复的错误,则应抛出一个描述性的异常。该异常将被记录下来,并且会阻止 Logstash 启动。
@Override
public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener) {
for (Event e : events) {
Object f = e.getField(sourceField);
if (f instanceof String) {
e.setField(sourceField, StringUtils.reverse((String)f));
matchListener.filterMatched(e);
}
}
return events;
最后,我们来到 filter
方法,该方法由 Logstash 执行引擎在事件处理管道中流动时在批处理事件上调用。要过滤的事件在 events
参数中提供,并且该方法应返回过滤后的事件的集合。过滤器可以对事件执行各种操作,因为它们流经管道,包括
- 突变 — 事件中的字段可以由过滤器添加、删除或更改。这是对事件执行各种丰富功能的过滤器最常见的场景。在此场景中,可以返回未经修改的传入
events
集合,因为集合中的事件已就地突变。 - 删除 — 可以通过过滤器从事件管道中删除事件,以便后续过滤器和输出不会收到它们。在此场景中,要删除的事件必须从过滤后的事件集合中删除,然后才能返回。
- 创建 — 过滤器可以将新事件插入到事件管道中,这些事件仅由后续过滤器和输出看到。在此场景中,必须将新事件添加到过滤后的事件集合中,然后才能返回。
- 观察 — 事件可以通过过滤器在事件管道中不变地传递。这在过滤器基于事件管道中观察到的事件执行外部操作(例如,更新外部缓存)的场景中可能很有用。在此场景中,可以返回未经修改的传入
events
集合,因为未进行任何更改。
在上面的示例中,从每个事件中检索 source
字段的值,如果它是字符串值,则将其反转。因为每个事件都就地突变,所以可以返回传入的 events
集合。
matchListener
是过滤器指示哪些事件“匹配”的机制。仅当事件被指定为“匹配”时,才会应用过滤器的常见操作,例如 add_field
和 add_tag
。某些过滤器,例如 grok 过滤器,对构成匹配事件的内容有明确的定义,并且仅针对匹配事件通知侦听器。其他过滤器,例如 UUID 过滤器,没有特定的匹配条件,并且应为每个过滤的事件通知侦听器。在此示例中,过滤器为其 source
字段中具有 String
值并因此能够被反转的任何事件通知匹配侦听器。
@Override
public String getId() {
return id;
}
对于过滤器插件,getId
方法应始终返回在实例化时通过其构造函数提供给插件的 ID。
@Override
public void close() {
// shutdown a resource that was instantiated during the filter initialization phase.
this.sourceField = null;
return;
}
过滤器插件可以使用其他资源来执行操作,例如创建新的数据库连接。实现 close
方法将允许插件在关闭管道时释放这些资源。
最后,但绝对不是最不重要的是,强烈建议进行单元测试。示例过滤器插件包含一个 示例单元测试,您可以将其用作您自己的模板。
Java 插件打包为 Ruby gem,用于依赖项管理以及与 Ruby 插件的互操作性。一旦它们被打包为 gem,就可以像 Ruby 插件一样使用 logstash-plugin
实用程序进行安装。因为 Java 插件开发不应需要 Ruby 或其工具链的知识,所以通过示例 Java 插件提供的 Gradle 构建文件中的自定义任务自动化了将 Java 插件打包为 Ruby gem 的过程。以下各节介绍如何配置和执行该打包任务,以及如何在 Logstash 中安装打包的 Java 插件。
以下部分出现在示例 Java 插件提供的 build.gradle
文件的顶部附近
// ===========================================================================
// plugin info
// ===========================================================================
group 'org.logstashplugins'
version "${file("VERSION").text.trim()}"
description = "Example Java filter implementation"
pluginInfo.licenses = ['Apache-2.0']
pluginInfo.longDescription = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using \$LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
pluginInfo.authors = ['Elasticsearch']
pluginInfo.email = ['info@elastic.co']
pluginInfo.homepage = "https://elastic.ac.cn/guide/en/logstash/current/index.html"
pluginInfo.pluginType = "filter"
pluginInfo.pluginClass = "JavaFilterExample"
pluginInfo.pluginName = "java_filter_example"
// ===========================================================================
- 必须与主插件类的包匹配
- 从必需的 VERSION 文件读取
- SPDX 许可证 ID 列表
您应该为您的插件配置上述值。
version
值将自动从插件代码库根目录中的VERSION
文件中读取。pluginInfo.pluginType
应设置为input
、filter
、codec
或output
之一。pluginInfo.pluginName
必须与主插件类上的@LogstashPlugin
注解中指定的名称匹配。Gradle 打包任务将验证这一点,如果它们不匹配,则返回错误。
要将插件打包成 Ruby gem,需要几个 Ruby 源文件以及一个 gemspec
文件和一个 Gemfile
。这些 Ruby 文件仅用于定义 Ruby gem 结构或在 Logstash 启动时注册 Java 插件。它们不在运行时事件处理中使用。Gradle 打包任务会根据上面章节中配置的值自动生成所有这些文件。
您可以使用以下命令运行 Gradle 打包任务
./gradlew gem
对于 Windows 平台:在命令中,根据需要将 ./gradlew
替换为 gradlew.bat
。
该任务将在插件代码库的根目录中生成一个 gem 文件,名称为 logstash-{{plugintype}}-<pluginName>-<version>.gem
将 Java 插件打包为 Ruby gem 后,可以使用以下命令将其安装到 Logstash 中
bin/logstash-plugin install --no-verify --local /path/to/javaPlugin.gem
对于 Windows 平台:在命令中,根据需要将正斜杠替换为反斜杠。
以下是一个最小的 Logstash 配置,可用于测试 Java 过滤器插件是否已正确安装并正常运行。
input {
generator { message => "Hello world!" count => 1 }
}
filter {
java_filter_example {}
}
output {
stdout { codec => rubydebug }
}
将上述 Logstash 配置复制到诸如 java_filter.conf
之类的文件中。使用以下命令启动 Logstash
bin/logstash -f /path/to/java_filter.conf
使用上述配置,预期的 Logstash 输出(不包括初始化)是
{
"sequence" => 0,
"@version" => "1",
"message" => "!dlrow olleH",
"@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ,
"host" => "<yourHostName>"
}
如果您对 Logstash 中的 Java 插件支持有任何反馈,请在我们的 主 Github 问题上发表评论,或在 Logstash 论坛中发帖。