正在加载

如何编写 Java 输出插件

要为 Logstash 开发新的 Java 输出,您需要编写符合 Logstash Java Outputs API 的新 Java 类,对其进行打包,然后使用 logstash-plugin 实用程序安装它。我们将逐步介绍每个步骤。

首先复制示例输出插件。插件 API 目前是 Logstash 代码库的一部分,因此您必须拥有可用的本地副本。您可以使用以下 git 命令获取 Logstash 代码库的副本

git clone --branch <branch_name> --single-branch https://github.com/elastic/logstash.git <target_folder>

branch_name 应对应于包含 Java 插件 API 首选修订版的 Logstash 版本。

注意

Java 插件 API 的 GA 版本在 Logstash 代码库的 7.2 及更高版本的分支中可用。

为 Logstash 代码库的本地副本指定 target_folder。如果您未指定 target_folder,则默认为当前文件夹下名为 logstash 的新文件夹。

获取 Logstash 代码库的适当修订版后,您需要编译它以生成包含 Java 插件 API 的 .jar 文件。 从 Logstash 代码库的根目录 ($LS_HOME) 中,您可以使用 ./gradlew assemble(如果在 Windows 上运行,则使用 gradlew.bat assemble)对其进行编译。 这应生成 $LS_HOME/logstash-core/build/libs/logstash-core-x.y.z.jar,其中 xyz 指的是 Logstash 的版本。

成功编译 Logstash 后,您需要告诉 Java 插件在哪里可以找到 logstash-core-x.y.z.jar 文件。在插件项目的根文件夹中创建一个名为 gradle.properties 的新文件。该文件应包含一行

LOGSTASH_CORE_PATH=<target_folder>/logstash-core

其中 target_folder 是 Logstash 代码库本地副本的根文件夹。

示例输出插件使用事件的 toString 方法将事件打印到控制台。 让我们看看示例输出中的主类

@LogstashPlugin(name = "java_output_example")
public class JavaOutputExample implements Output {

    public static final PluginConfigSpec<String> PREFIX_CONFIG =
            PluginConfigSpec.stringSetting("prefix", "");

    private final String id;
    private String prefix;
    private PrintStream printer;
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile boolean stopped = false;

    public JavaOutputExample(final String id, final Configuration configuration, final Context context) {
        this(id, configuration, context, System.out);
    }

    JavaOutputExample(final String id, final Configuration config, final Context context, OutputStream targetStream) {
        this.id = id;
        prefix = config.get(PREFIX_CONFIG);
        printer = new PrintStream(targetStream);
    }

    @Override
    public void output(final Collection<Event> events) {
      Iterator<Event> z = events.iterator();
      while (z.hasNext() && !stopped) {
          String s = prefix + z.next();
          printer.println(s);
        }
    }

    @Override
    public void stop() {
        stopped = true;
        done.countDown();
    }

    @Override
    public void awaitStop() throws InterruptedException {
        done.await();
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        return Collections.singletonList(PREFIX_CONFIG);
    }

    @Override
    public String getId() {
        return id;
    }
}

让我们逐步检查该类的每个部分。

@LogstashPlugin(name="java_output_example")
public class JavaOutputExample implements Output {

关于类声明的说明

  • 所有 Java 插件都必须使用 @LogstashPlugin 注释进行注释。 此外

    • 必须提供注释的 name 属性,并定义插件的名称,就像它将在 Logstash 管道定义中使用一样。 例如,此输出将在 Logstash 管道定义的输出部分中被引用为 output { java_output_example => { .... } }
    • name 属性的值必须与类的名称匹配,不包括大小写和下划线。
  • 该类必须实现 co.elastic.logstash.api.Output 接口。

  • Java 插件可能不在 org.logstashco.elastic.logstash 包中创建,以防止与 Logstash 本身中的类发生潜在冲突。

以下代码段包含设置定义和引用它的方法

public static final PluginConfigSpec<String> PREFIX_CONFIG =
        PluginConfigSpec.stringSetting("prefix", "");

@Override
public Collection<PluginConfigSpec<?>> configSchema() {
    return Collections.singletonList(PREFIX_CONFIG);
}

PluginConfigSpec 类允许开发人员指定插件支持的设置,包括设置名称、数据类型、弃用状态、必需状态和默认值。 在此示例中,prefix 设置定义了一个可选前缀,以包含在事件的输出中。 该设置不是必需的,如果未显式设置,则默认为空字符串。

configSchema 方法必须返回插件支持的所有设置的列表。 在 Java 插件项目的未来阶段,Logstash 执行引擎将验证所有必需设置是否存在,并且不存在任何不受支持的设置。

private final String id;
private String prefix;
private PrintStream printer;

public JavaOutputExample(final String id, final Configuration configuration, final Context context) {
    this(configuration, context, System.out);
}

JavaOutputExample(final String id, final Configuration config, final Context context, OutputStream targetStream) {
    this.id = id;
    prefix = config.get(PREFIX_CONFIG);
    printer = new PrintStream(targetStream);
}

所有 Java 输出插件都必须有一个构造函数,该构造函数接受 String id 以及 ConfigurationContext 参数。 这是将在运行时用于实例化它们的构造函数。 所有插件设置的检索和验证都应在此构造函数中进行。 在此示例中,检索 prefix 设置的值并将其存储在局部变量中,以供以后在 output 方法中使用。 在此示例中,定义了第二个包私有构造函数,该构造函数对于使用 System.out 之外的 Stream 进行单元测试非常有用。

任何其他初始化也可能发生在构造函数中。 如果在输出插件的配置或初始化中遇到任何无法恢复的错误,则应抛出一个描述性异常。 该异常将被记录,并将阻止 Logstash 启动。

@Override
public void output(final Collection<Event> events) {
    Iterator<Event> z = events.iterator();
    while (z.hasNext() && !stopped) {
        String s = prefix + z.next();
        printer.println(s);
    }
}

输出可以将事件发送到本地接收器(例如控制台或文件),或发送到远程系统(例如 Elasticsearch 或其他外部系统)。 在此示例中,事件将打印到本地控制台。

private final CountDownLatch done = new CountDownLatch(1);
private volatile boolean stopped;

@Override
public void stop() {
    stopped = true;
    done.countDown();
}

@Override
public void awaitStop() throws InterruptedException {
    done.await();
}

stop 方法通知输出停止发送事件。 停止机制可以通过任何符合 API 契约的方式来实现,尽管 volatile boolean 标志对于许多用例都非常有效。 因为此输出示例非常简单,所以其 output 方法不检查停止标志。

输出以异步和协作方式停止。 使用 awaitStop 方法阻塞直到输出完成停止过程。 请注意,此方法不应像 stop 方法那样指示输出停止。 awaitStop 机制可以通过任何符合 API 契约的方式来实现,尽管 CountDownLatch 对于许多用例都非常有效。

@Override
public String getId() {
    return id;
}

对于输出插件,getId 方法应始终返回在实例化时通过其构造函数提供给插件的 id。

最后但同样重要的是,强烈建议进行单元测试。 示例输出插件包含一个示例单元测试,您可以将其用作您自己的模板。

Java 插件打包为 Ruby gem,用于依赖项管理和与 Ruby 插件的互操作性。 一旦它们被打包为 gem,就可以像 Ruby 插件一样使用 logstash-plugin 实用程序安装它们。 因为 Java 插件开发不需要了解 Ruby 或其工具链,所以将 Java 插件打包为 Ruby gem 的过程已通过示例 Java 插件提供的 Gradle 构建文件中的自定义任务实现自动化。 以下各节描述了如何配置和执行该打包任务以及如何在 Logstash 中安装打包的 Java 插件。

以下部分出现在示例 Java 插件提供的 build.gradle 文件顶部附近

// ===========================================================================
// plugin info
// ===========================================================================
group                      'org.logstashplugins'
version                    "${file("VERSION").text.trim()}"
description                = "Example Java filter implementation"
pluginInfo.licenses        = ['Apache-2.0']
pluginInfo.longDescription = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using \$LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
pluginInfo.authors         = ['Elasticsearch']
pluginInfo.email           = ['info@elastic.co']
pluginInfo.homepage        = "https://elastic.ac.cn/guide/en/logstash/current/index.html"
pluginInfo.pluginType      = "filter"
pluginInfo.pluginClass     = "JavaFilterExample"
pluginInfo.pluginName      = "java_filter_example"
// ===========================================================================
  1. 必须与主插件类的包匹配
  2. 从所需的 VERSION 文件读取
  3. SPDX 许可证 ID 列表

您应该为您的插件配置以上值。

  • version 值将自动从插件代码库根目录中的 VERSION 文件中读取。
  • pluginInfo.pluginType 应设置为 inputfiltercodecoutput 之一。
  • pluginInfo.pluginName 必须与主插件类上的 @LogstashPlugin 注释中指定的名称匹配。 Gradle 打包任务将验证这一点,如果不匹配,则返回错误。

需要多个 Ruby 源文件以及 gemspec 文件和 Gemfile 才能将插件打包为 Ruby gem。 这些 Ruby 文件仅用于定义 Ruby gem 结构或在 Logstash 启动时注册 Java 插件。 它们不在运行时事件处理期间使用。 Gradle 打包任务会自动根据上面各节中配置的值生成所有这些文件。

您可以使用以下命令运行 Gradle 打包任务

./gradlew gem

对于 Windows 平台:在命令中,将 gradlew.bat 替换为 ./gradlew(如果适用)。

该任务将在插件代码库的根目录中生成一个 gem 文件,名称为 logstash-{{plugintype}}-<pluginName>-<version>.gem

将 Java 插件打包为 Ruby gem 后,您可以使用以下命令将其安装在 Logstash 中

bin/logstash-plugin install --no-verify --local /path/to/javaPlugin.gem

对于 Windows 平台:在命令中,将反斜杠替换为正斜杠(如果适用)。

以下是一个最小的 Logstash 配置,可用于测试 Java 输出插件是否已正确安装并运行。

input {
  generator { message => "Hello world!" count => 1 }
}
output {
  java_output_example {}
}

将上面的 Logstash 配置复制到诸如 java_output.conf 的文件中。 然后应使用以下命令启动 Logstash

bin/logstash -f /path/to/java_output.conf

使用上述配置,预期的 Logstash 输出(不包括初始化)是

{"@timestamp":"yyyy-MM-ddTHH:mm:ss.SSSZ","message":"Hello world!","@version":"1","host":"<yourHostname>","sequence":0}

如果您对Logstash中的Java插件支持有任何反馈,请在我们的主要Github问题中发表评论,或在Logstash论坛中发帖。

© . All rights reserved.