如何编写 Java 过滤器插件

如何编写 Java 过滤器插件

要为 Logstash 开发新的 Java 过滤器,您需要编写一个符合 Logstash Java 过滤器 API 的新 Java 类,将其打包,并使用 logstash-plugin 实用程序进行安装。我们将逐步介绍这些步骤。

设置您的环境

复制示例存储库

首先复制示例过滤器插件。插件 API 当前是 Logstash 代码库的一部分,因此您必须拥有可用的本地副本。您可以使用以下 git 命令获取 Logstash 代码库的副本

git clone --branch <branch_name> --single-branch https://github.com/elastic/logstash.git <target_folder>

branch_name 应对应于包含 Java 插件 API 首选版本的 Logstash 版本。

Java 插件 API 的 GA 版本在 Logstash 代码库的 7.2 及更高版本的分支中可用。

为您的 Logstash 代码库本地副本指定 target_folder。如果您未指定 target_folder,则默认在您当前文件夹下创建一个名为 logstash 的新文件夹。

生成 .jar 文件

在您获取 Logstash 代码库的适当版本副本后,您需要对其进行编译以生成包含 Java 插件 API 的 .jar 文件。从您的 Logstash 代码库 ($LS_HOME) 的根目录中,您可以使用 ./gradlew assemble(如果您在 Windows 上运行,则使用 gradlew.bat assemble)进行编译。这应该会生成 $LS_HOME/logstash-core/build/libs/logstash-core-x.y.z.jar,其中 xyz 指的是 Logstash 的版本。

成功编译 Logstash 后,您需要告诉您的 Java 插件在哪里可以找到 logstash-core-x.y.z.jar 文件。在您的插件项目的根文件夹中创建一个名为 gradle.properties 的新文件。该文件应该有一行

LOGSTASH_CORE_PATH=<target_folder>/logstash-core

其中 target_folder 是您的 Logstash 代码库本地副本的根文件夹。

编写插件代码

示例过滤器插件允许您配置每个事件中将被反转的字段。例如,如果过滤器配置为反转 day_of_week 字段,则 day_of_week: "Monday" 的事件将转换为 day_of_week: "yadnoM"。让我们看看示例过滤器中的主类

@LogstashPlugin(name = "java_filter_example")
public class JavaFilterExample implements Filter {

    public static final PluginConfigSpec<String> SOURCE_CONFIG =
            PluginConfigSpec.stringSetting("source", "message");

    private String id;
    private String sourceField;

    public JavaFilterExample(String id, Configuration config, Context context) {
        this.id = id;
        this.sourceField = config.get(SOURCE_CONFIG);
    }

    @Override
    public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener) {
        for (Event e : events) {
            Object f = e.getField(sourceField);
            if (f instanceof String) {
                e.setField(sourceField, StringUtils.reverse((String)f));
                matchListener.filterMatched(e);
            }
        }
        return events;
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        return Collections.singletonList(SOURCE_CONFIG);
    }

    @Override
    public String getId() {
        return this.id;
    }

    @Override
    public void close() {
        this.sourceField = null;
        return;
    }
}

让我们逐步检查该类的每个部分。

类声明

@LogstashPlugin(name = "java_filter_example")
public class JavaFilterExample implements Filter {

关于类声明的注意事项

  • 所有 Java 插件都必须使用 @LogstashPlugin 注释进行注释。此外

    • 必须提供注释的 name 属性,并定义插件在 Logstash 管道定义中使用的名称。例如,此过滤器将在 Logstash 管道定义的 filter 部分中被引用为 filter { java_filter_example => { .... } }
    • name 属性的值必须与类的名称匹配,但不包括大小写和下划线。
  • 该类必须实现 co.elastic.logstash.api.Filter 接口。
  • Java 插件不能在 org.logstashco.elastic.logstash 包中创建,以防止与 Logstash 本身中的类发生潜在冲突。

插件设置

下面的代码片段包含设置定义和引用它的方法

public static final PluginConfigSpec<String> SOURCE_CONFIG =
        PluginConfigSpec.stringSetting("source", "message");

@Override
public Collection<PluginConfigSpec<?>> configSchema() {
    return Collections.singletonList(SOURCE_CONFIG);
}

PluginConfigSpec 类允许开发人员指定插件支持的设置,包括设置名称、数据类型、弃用状态、必需状态和默认值。在此示例中,source 设置定义了每个事件中将被反转的字段的名称。这不是必需的设置,如果未明确设置,则其默认值将为 message

configSchema 方法必须返回插件支持的所有设置的列表。在 Java 插件项目的未来阶段,Logstash 执行引擎将验证所有必需的设置都存在,并且不存在任何不受支持的设置。

构造函数和初始化

private String id;
private String sourceField;

public JavaFilterExample(String id, Configuration config, Context context) {
    this.id = id;
    this.sourceField = config.get(SOURCE_CONFIG);
}

所有 Java 过滤器插件都必须有一个构造函数,该构造函数接受一个 String id 和一个 Configuration 以及一个 Context 参数。这是在运行时用于实例化它们的构造函数。所有插件设置的检索和验证都应在此构造函数中进行。在此示例中,每个事件中要反转的字段的名称从其设置中检索,并存储在本地变量中,以便稍后可以在 filter 方法中使用。

任何其他初始化也可能在构造函数中发生。如果在过滤器插件的配置或初始化中遇到任何不可恢复的错误,则应抛出一个描述性异常。该异常将被记录下来,并阻止 Logstash 启动。

Filter 方法

@Override
public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener) {
    for (Event e : events) {
        Object f = e.getField(sourceField);
        if (f instanceof String) {
            e.setField(sourceField, StringUtils.reverse((String)f));
            matchListener.filterMatched(e);
        }
    }
    return events;

最后,我们来看一下 filter 方法,该方法由 Logstash 执行引擎在事件流经事件处理管道时对批量的事件进行调用。要过滤的事件在 events 参数中提供,并且该方法应返回过滤后的事件的集合。过滤器可能会在事件流经管道时对事件执行各种操作,包括

  • 修改 — 事件中的字段可以通过过滤器添加、删除或更改。这是过滤器对事件执行各种类型的富化的最常见场景。在这种情况下,可以返回传入的 events 集合而不进行修改,因为集合中的事件是在适当位置修改的。
  • 删除 — 可以通过过滤器从事件管道中删除事件,以便后续的过滤器和输出不会收到它们。在这种情况下,必须在返回过滤后的事件集合之前,从中删除要删除的事件。
  • 创建 — 过滤器可以将新事件插入事件管道中,这些新事件只能由后续的过滤器和输出看到。在这种情况下,必须在返回过滤后的事件集合之前,将新事件添加到该集合中。
  • 观察 — 事件可能会通过过滤器而不发生更改地传递到事件管道中。这在过滤器基于事件管道中观察到的事件执行外部操作(例如,更新外部缓存)的场景中可能很有用。在这种情况下,可以返回传入的 events 集合而不进行修改,因为没有进行任何更改。

在上面的示例中,如果 source 字段的值是字符串值,则从每个事件中检索该值并将其反转。由于每个事件都是在适当位置修改的,因此可以返回传入的 events 集合。

matchListener 是过滤器指示哪些事件“匹配”的机制。过滤器(例如 add_fieldadd_tag)的常见操作仅应用于被指定为“匹配”的事件。一些过滤器(例如grok 过滤器)对构成匹配事件的条件有明确的定义,并且仅针对匹配的事件通知侦听器。其他过滤器(例如UUID 过滤器)没有特定的匹配条件,并且应针对每个过滤的事件通知侦听器。在此示例中,过滤器会通知匹配侦听器,任何在其 source 字段中具有 String 值并且因此能够被反转的事件。

getId 方法

@Override
public String getId() {
    return id;
}

对于过滤器插件,getId 方法应始终返回在实例化时通过构造函数提供给插件的 id。

close 方法

@Override
public void close() {
    // shutdown a resource that was instantiated during the filter initialization phase.
    this.sourceField = null;
    return;
}

过滤器插件可以使用其他资源来执行操作,例如创建新的数据库连接。实现 close 方法将允许插件在关闭管道时释放这些资源。

单元测试

最后,但同样重要的是,强烈建议进行单元测试。示例过滤器插件包含一个示例单元测试,您可以将其用作自己的模板。

打包和部署

Java 插件被打包为 Ruby gems,用于依赖管理和与 Ruby 插件的互操作性。一旦它们被打包为 gems,就可以像 Ruby 插件一样使用 logstash-plugin 实用程序进行安装。因为 Java 插件开发不应需要了解 Ruby 或其工具链,所以通过示例 Java 插件提供的 Gradle 构建文件中的自定义任务自动化了将 Java 插件打包为 Ruby gems 的过程。以下各节描述了如何配置和执行打包任务以及如何在 Logstash 中安装打包的 Java 插件。

配置 Gradle 打包任务

以下部分出现在示例 Java 插件提供的 build.gradle 文件的顶部附近

// ===========================================================================
// plugin info
// ===========================================================================
group                      'org.logstashplugins' // must match the package of the main plugin class
version                    "${file("VERSION").text.trim()}" // read from required VERSION file
description                = "Example Java filter implementation"
pluginInfo.licenses        = ['Apache-2.0'] // list of SPDX license IDs
pluginInfo.longDescription = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using \$LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
pluginInfo.authors         = ['Elasticsearch']
pluginInfo.email           = ['[email protected]']
pluginInfo.homepage        = "https://elastic.ac.cn/guide/en/logstash/current/index.html"
pluginInfo.pluginType      = "filter"
pluginInfo.pluginClass     = "JavaFilterExample"
pluginInfo.pluginName      = "java_filter_example"
// ===========================================================================

您应该为您的插件配置上述值。

  • version 值将自动从插件代码库根目录中的 VERSION 文件中读取。
  • pluginInfo.pluginType 应设置为 inputfiltercodecoutput 之一。
  • pluginInfo.pluginName 必须与主插件类上 @LogstashPlugin 注释中指定的名称匹配。Gradle 打包任务将对此进行验证,如果它们不匹配,则返回错误。

运行 Gradle 打包任务

要将插件打包为 Ruby gem,需要一些 Ruby 源代码文件,以及一个 gemspec 文件和一个 Gemfile。这些 Ruby 文件仅用于定义 Ruby gem 的结构,或者在 Logstash 启动时注册 Java 插件。它们在运行时事件处理期间不会被使用。Gradle 打包任务会根据上述部分配置的值自动生成所有这些文件。

您可以使用以下命令运行 Gradle 打包任务

./gradlew gem

对于 Windows 平台:请在命令中将 ./gradlew 替换为 gradlew.bat (如果适用)。

该任务将在插件代码库的根目录中生成一个 gem 文件,名称为 logstash-{plugintype}-<pluginName>-<version>.gem

在 Logstash 中安装 Java 插件

将 Java 插件打包为 Ruby gem 后,可以使用以下命令将其安装到 Logstash 中

bin/logstash-plugin install --no-verify --local /path/to/javaPlugin.gem

对于 Windows 平台:请在命令中将正斜杠替换为反斜杠(如果适用)。

使用 Java 过滤器插件运行 Logstash

以下是一个最小的 Logstash 配置,可用于测试 Java 过滤器插件是否已正确安装并正常工作。

input {
  generator { message => "Hello world!" count => 1 }
}
filter {
  java_filter_example {}
}
output {
  stdout { codec => rubydebug }
}

将上述 Logstash 配置复制到一个文件(例如 java_filter.conf)。使用以下命令启动 Logstash

bin/logstash -f /path/to/java_filter.conf

使用上述配置,Logstash 的预期输出(不包括初始化)为

{
      "sequence" => 0,
      "@version" => "1",
       "message" => "!dlrow olleH",
    "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ,
          "host" => "<yourHostName>"
}

反馈

如果您对 Logstash 中 Java 插件的支持有任何反馈,请在我们的 主要 Github 问题 中评论,或在 Logstash 论坛 中发帖。