如何编写Java输出插件

如何编写Java输出插件

要为Logstash开发新的Java输出,需要编写一个符合Logstash Java Outputs API的新Java类,将其打包,并使用logstash-plugin实用程序安装它。我们将逐步介绍这些步骤。

设置你的环境

复制示例仓库

首先复制示例输出插件。插件API目前是Logstash代码库的一部分,因此你必须有一个可用的本地副本。你可以使用以下git命令获取Logstash代码库的副本:

git clone --branch <branch_name> --single-branch https://github.com/elastic/logstash.git <target_folder>

branch_name应该对应于包含首选版本的Java插件API的Logstash版本。

Java插件API的GA版本可在Logstash代码库的7.2及更高版本分支中找到。

指定Logstash代码库本地副本的target_folder。如果不指定target_folder,则默认为当前文件夹下名为logstash的新文件夹。

生成.jar文件

获得相应版本的Logstash代码库副本后,需要编译它以生成包含Java插件API的.jar文件。从Logstash代码库的根目录($LS_HOME),可以使用./gradlew assemble(或在Windows上运行则为gradlew.bat assemble)进行编译。这应该会生成$LS_HOME/logstash-core/build/libs/logstash-core-x.y.z.jar,其中xyz代表Logstash的版本。

成功编译Logstash后,需要告诉你的Java插件在哪里可以找到logstash-core-x.y.z.jar文件。在插件项目的根文件夹中创建一个名为gradle.properties的新文件。该文件应该只有一行:

LOGSTASH_CORE_PATH=<target_folder>/logstash-core

其中target_folder是Logstash代码库本地副本的根文件夹。

编写插件代码

示例输出插件使用事件的toString方法将事件打印到控制台。让我们看看示例输出中的主类:

@LogstashPlugin(name = "java_output_example")
public class JavaOutputExample implements Output {

    public static final PluginConfigSpec<String> PREFIX_CONFIG =
            PluginConfigSpec.stringSetting("prefix", "");

    private final String id;
    private String prefix;
    private PrintStream printer;
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile boolean stopped = false;

    public JavaOutputExample(final String id, final Configuration configuration, final Context context) {
        this(id, configuration, context, System.out);
    }

    JavaOutputExample(final String id, final Configuration config, final Context context, OutputStream targetStream) {
        this.id = id;
        prefix = config.get(PREFIX_CONFIG);
        printer = new PrintStream(targetStream);
    }

    @Override
    public void output(final Collection<Event> events) {
      Iterator<Event> z = events.iterator();
      while (z.hasNext() && !stopped) {
          String s = prefix + z.next();
          printer.println(s);
        }
    }

    @Override
    public void stop() {
        stopped = true;
        done.countDown();
    }

    @Override
    public void awaitStop() throws InterruptedException {
        done.await();
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        return Collections.singletonList(PREFIX_CONFIG);
    }

    @Override
    public String getId() {
        return id;
    }
}

让我们逐步分析该类的每个部分。

类声明

@LogstashPlugin(name="java_output_example")
public class JavaOutputExample implements Output {

关于类声明的说明

  • 所有Java插件都必须使用@LogstashPlugin注解进行注释。此外:

    • 必须提供注解的name属性,它定义了插件的名称,这将在Logstash管道定义中使用。例如,此输出将在Logstash管道定义的输出部分中引用为output { java_output_example => { .... } }
    • name属性的值必须与类名匹配,不区分大小写和下划线。
  • 该类必须实现co.elastic.logstash.api.Output接口。
  • 为了避免与Logstash本身的类发生潜在冲突,不能在org.logstashco.elastic.logstash包中创建Java插件。

插件设置

下面的代码片段包含设置定义和引用它的方法:

public static final PluginConfigSpec<String> PREFIX_CONFIG =
        PluginConfigSpec.stringSetting("prefix", "");

@Override
public Collection<PluginConfigSpec<?>> configSchema() {
    return Collections.singletonList(PREFIX_CONFIG);
}

PluginConfigSpec类允许开发者指定插件支持的设置,包括设置名称、数据类型、弃用状态、必需状态和默认值。在这个例子中,prefix设置定义了一个可选的前缀,包含在事件的输出中。此设置不是必需的,如果未显式设置,则默认为空字符串。

configSchema方法必须返回插件支持的所有设置的列表。在Java插件项目的未来阶段,Logstash执行引擎将验证所有必需的设置都存在,并且不存在任何不支持的设置。

构造函数和初始化

private final String id;
private String prefix;
private PrintStream printer;

public JavaOutputExample(final String id, final Configuration configuration, final Context context) {
    this(configuration, context, System.out);
}

JavaOutputExample(final String id, final Configuration config, final Context context, OutputStream targetStream) {
    this.id = id;
    prefix = config.get(PREFIX_CONFIG);
    printer = new PrintStream(targetStream);
}

所有Java输出插件都必须有一个构造函数,它接受一个String id和一个ConfigurationContext参数。这是在运行时用于实例化它们的构造函数。所有插件设置的检索和验证都应该在此构造函数中发生。在这个例子中,prefix设置的值被检索并存储在一个局部变量中,以便稍后在output方法中使用。在这个例子中,定义了第二个包私有构造函数,它对于使用Stream(而不是System.out)进行单元测试很有用。

任何额外的初始化也可能发生在构造函数中。如果在输出插件的配置或初始化中遇到任何不可恢复的错误,则应该抛出一个描述性异常。该异常将被记录,并将阻止Logstash启动。

输出方法

@Override
public void output(final Collection<Event> events) {
    Iterator<Event> z = events.iterator();
    while (z.hasNext() && !stopped) {
        String s = prefix + z.next();
        printer.println(s);
    }
}

输出可以将事件发送到本地接收器(如控制台或文件)或远程系统(如Elasticsearch或其他外部系统)。在这个例子中,事件被打印到本地控制台。

stop和awaitStop方法

private final CountDownLatch done = new CountDownLatch(1);
private volatile boolean stopped;

@Override
public void stop() {
    stopped = true;
    done.countDown();
}

@Override
public void awaitStop() throws InterruptedException {
    done.await();
}

stop方法通知输出停止发送事件。stop机制可以通过任何遵守API约定方式实现,尽管对于许多用例而言,volatile boolean标志效果很好。因为这个输出示例非常简单,所以它的output方法不检查stop标志。

输出异步和协作地停止。使用awaitStop方法阻塞,直到输出完成停止过程。请注意,此方法不应stop方法那样发出停止输出的信号。awaitStop机制可以通过任何遵守API约定方式实现,尽管CountDownLatch对于许多用例而言效果很好。

getId方法

@Override
public String getId() {
    return id;
}

对于输出插件,getId方法应该始终返回在实例化时通过其构造函数提供给插件的id。

单元测试

最后,但同样重要的是,强烈建议进行单元测试。示例输出插件包含一个示例单元测试,你可以将其用作你自己的模板。

打包和部署

Java插件被打包为Ruby gems,用于依赖项管理和与Ruby插件的互操作性。一旦它们被打包为gems,就可以像Ruby插件一样使用logstash-plugin实用程序安装它们。因为Java插件开发不需要了解Ruby或其工具链,所以将Java插件打包为Ruby gems的过程已通过示例Java插件提供的Gradle构建文件中的自定义任务实现自动化。以下部分描述了如何配置和执行该打包任务,以及如何在Logstash中安装打包的Java插件。

配置Gradle打包任务

以下部分出现在示例Java插件提供的build.gradle文件的顶部附近:

// ===========================================================================
// plugin info
// ===========================================================================
group                      'org.logstashplugins' // must match the package of the main plugin class
version                    "${file("VERSION").text.trim()}" // read from required VERSION file
description                = "Example Java filter implementation"
pluginInfo.licenses        = ['Apache-2.0'] // list of SPDX license IDs
pluginInfo.longDescription = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using \$LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
pluginInfo.authors         = ['Elasticsearch']
pluginInfo.email           = ['[email protected]']
pluginInfo.homepage        = "https://elastic.ac.cn/guide/en/logstash/current/index.html"
pluginInfo.pluginType      = "filter"
pluginInfo.pluginClass     = "JavaFilterExample"
pluginInfo.pluginName      = "java_filter_example"
// ===========================================================================

你应该为你的插件配置上面的值。

  • version值将自动从插件代码库根目录下的VERSION文件中读取。
  • pluginInfo.pluginType应设置为inputfiltercodecoutput之一。
  • pluginInfo.pluginName必须与主插件类上的@LogstashPlugin注解中指定的名称匹配。Gradle打包任务将验证这一点,如果它们不匹配,则返回错误。

运行Gradle打包任务

需要几个Ruby源文件以及gemspec文件和Gemfile才能将插件打包为Ruby gem。这些Ruby文件仅用于定义Ruby gem结构或在Logstash启动时注册Java插件。它们在运行时事件处理期间未使用。Gradle打包任务会根据上面配置的值自动生成所有这些文件。

可以使用以下命令运行Gradle打包任务:

./gradlew gem

对于Windows平台:在命令中根据需要将gradlew.bat替换为./gradlew

该任务将在插件代码库的根目录中生成一个名为logstash-{plugintype}-<pluginName>-<version>.gem的gem文件。

在Logstash中安装Java插件

将Java插件打包为Ruby gem后,可以使用以下命令在Logstash中安装它:

bin/logstash-plugin install --no-verify --local /path/to/javaPlugin.gem

对于Windows平台:在命令中根据需要将反斜杠替换为正斜杠。

使用Java输出插件运行Logstash

以下是一个最小的Logstash配置,可用于测试Java输出插件是否正确安装并运行。

input {
  generator { message => "Hello world!" count => 1 }
}
output {
  java_output_example {}
}

将上述Logstash配置复制到一个文件(例如java_output.conf)。然后应使用以下命令启动Logstash:

bin/logstash -f /path/to/java_output.conf

使用上述配置的预期Logstash输出(不包括初始化)为:

{"@timestamp":"yyyy-MM-ddTHH:mm:ss.SSSZ","message":"Hello world!","@version":"1","host":"<yourHostname>","sequence":0}

反馈

如果你对Logstash中的Java插件支持有任何反馈,请评论我们的主要Github问题或在Logstash论坛中发帖。