如何编写 Java 输入插件

如何编写 Java 输入插件

要为 Logstash 开发新的 Java 输入,您需要编写一个符合 Logstash Java 输入 API 的新 Java 类,将其打包,并使用 logstash-plugin 实用程序安装它。我们将逐步介绍这些步骤。

设置您的环境

复制示例仓库

首先复制示例输入插件。插件 API 当前是 Logstash 代码库的一部分,因此您必须拥有其本地副本。您可以使用以下 git 命令获取 Logstash 代码库的副本

git clone --branch <branch_name> --single-branch https://github.com/elastic/logstash.git <target_folder>

branch_name 应对应于包含 Java 插件 API 首选版本的 Logstash 版本。

Java 插件 API 的 GA 版本在 Logstash 代码库的 7.2 及更高版本分支中可用。

为 Logstash 代码库的本地副本指定 target_folder。如果您未指定 target_folder,则默认值是当前文件夹下的一个名为 logstash 的新文件夹。

生成 .jar 文件

在获得 Logstash 代码库的相应修订版副本后,您需要对其进行编译,以生成包含 Java 插件 API 的 .jar 文件。从 Logstash 代码库的根目录 ($LS_HOME) 中,您可以使用 ./gradlew assemble (如果在 Windows 上运行,则使用 gradlew.bat assemble)对其进行编译。这将生成 $LS_HOME/logstash-core/build/libs/logstash-core-x.y.z.jar,其中 xyz 表示 Logstash 的版本。

成功编译 Logstash 后,您需要告诉您的 Java 插件在哪里可以找到 logstash-core-x.y.z.jar 文件。在插件项目的根文件夹中创建一个名为 gradle.properties 的新文件。该文件应该只有一行

LOGSTASH_CORE_PATH=<target_folder>/logstash-core

其中 target_folder 是 Logstash 代码库本地副本的根文件夹。

编写插件代码

示例输入插件在终止之前生成可配置数量的简单事件。让我们看一下示例输入中的主类。

@LogstashPlugin(name="java_input_example")
public class JavaInputExample implements Input {

    public static final PluginConfigSpec<Long> EVENT_COUNT_CONFIG =
            PluginConfigSpec.numSetting("count", 3);

    public static final PluginConfigSpec<String> PREFIX_CONFIG =
            PluginConfigSpec.stringSetting("prefix", "message");

    private String id;
    private long count;
    private String prefix;
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile boolean stopped;


    public JavaInputExample(String id, Configuration config, Context context) {
            this.id = id;
        count = config.get(EVENT_COUNT_CONFIG);
        prefix = config.get(PREFIX_CONFIG);
    }

    @Override
    public void start(Consumer<Map<String, Object>> consumer) {
        int eventCount = 0;
        try {
            while (!stopped && eventCount < count) {
                eventCount++;
                consumer.accept.push(Collections.singletonMap("message",
                        prefix + " " + StringUtils.center(eventCount + " of " + count, 20)));
            }
        } finally {
            stopped = true;
            done.countDown();
        }
    }

    @Override
    public void stop() {
        stopped = true; // set flag to request cooperative stop of input
    }

    @Override
    public void awaitStop() throws InterruptedException {
        done.await(); // blocks until input has stopped
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        return Arrays.asList(EVENT_COUNT_CONFIG, PREFIX_CONFIG);
    }

    @Override
    public String getId() {
        return this.id;
    }
}

让我们逐步检查该类的每个部分。

类声明

@LogstashPlugin(name="java_input_example")
public class JavaInputExample implements Input {

关于类声明的注意事项

  • 所有 Java 插件都必须使用 @LogstashPlugin 注解进行注解。此外

    • 必须提供注解的 name 属性,并定义插件在 Logstash 管道定义中使用的名称。例如,此输入在 Logstash 管道定义的输入部分中会被引用为 input { java_input_example => { .... } }
    • name 属性的值必须与类名匹配,不包括大小写和下划线。
  • 该类必须实现 co.elastic.logstash.api.Input 接口。
  • Java 插件不能在 org.logstashco.elastic.logstash 包中创建,以防止与 Logstash 本身中的类发生潜在冲突。

插件设置

以下代码片段包含设置定义和引用它的方法。

public static final PluginConfigSpec<Long> EVENT_COUNT_CONFIG =
        PluginConfigSpec.numSetting("count", 3);

public static final PluginConfigSpec<String> PREFIX_CONFIG =
        PluginConfigSpec.stringSetting("prefix", "message");

@Override
public Collection<PluginConfigSpec<?>> configSchema() {
    return Arrays.asList(EVENT_COUNT_CONFIG, PREFIX_CONFIG);
}

PluginConfigSpec 类允许开发人员指定插件支持的设置,包括设置名称、数据类型、弃用状态、必需状态和默认值。在此示例中,count 设置定义将生成的事件数,而 prefix 设置定义要在事件字段中包含的可选前缀。这两个设置都不是必需的,如果未显式设置,则这些设置分别默认为 3message

configSchema 方法必须返回插件支持的所有设置的列表。在 Java 插件项目的未来阶段中,Logstash 执行引擎将验证所有必需的设置都存在,并且不存在任何不受支持的设置。

构造函数和初始化

private String id;
private long count;
private String prefix;

public JavaInputExample(String id, Configuration config, Context context) {
    this.id = id;
    count = config.get(EVENT_COUNT_CONFIG);
    prefix = config.get(PREFIX_CONFIG);
}

所有 Java 输入插件都必须具有一个构造函数,该构造函数接受 String id 和 ConfigurationContext 参数。这是在运行时用于实例化它们的构造函数。所有插件设置的检索和验证应在此构造函数中进行。在此示例中,检索两个插件设置的值并将其存储在局部变量中,以供稍后在 start 方法中使用。

任何其他初始化也可能发生在构造函数中。如果在输入插件的配置或初始化中遇到任何不可恢复的错误,则应抛出描述性异常。该异常将被记录,并会阻止 Logstash 启动。

Start 方法

@Override
public void start(Consumer<Map<String, Object>> consumer) {
    int eventCount = 0;
    try {
        while (!stopped && eventCount < count) {
            eventCount++;
            consumer.accept.push(Collections.singletonMap("message",
                    prefix + " " + StringUtils.center(eventCount + " of " + count, 20)));
        }
    } finally {
        stopped = true;
        done.countDown();
    }
}

start 方法开始输入中的事件生成循环。输入是灵活的,并且可以通过多种不同的机制生成事件,包括

  • 拉取机制,例如定期查询外部数据库
  • 推送机制,例如客户端发送到本地网络端口的事件
  • 定时计算,例如心跳
  • 任何其他生成有用事件流的机制。事件流可以是有限的,也可以是无限的。如果输入生成无限的事件流,则此方法应循环,直到通过 stop 方法发出停止请求。如果输入生成有限的事件流,则此方法应在生成流中的最后一个事件或发出停止请求时终止,以先到者为准。

事件应构造为 Map<String, Object> 的实例,并通过 Consumer<Map<String, Object>>.accept() 方法推送到事件管道中。为了减少分配和 GC 压力,输入可以通过在调用 Consumer<Map<String, Object>>.accept() 之间修改其字段来重复使用相同的映射实例,因为事件管道将基于映射数据的副本创建事件。

Stop 和 awaitStop 方法

private final CountDownLatch done = new CountDownLatch(1);
private volatile boolean stopped;

@Override
public void stop() {
    stopped = true; // set flag to request cooperative stop of input
}

@Override
public void awaitStop() throws InterruptedException {
    done.await(); // blocks until input has stopped
}

stop 方法通知输入停止生成事件。停止机制可以通过任何符合 API 合约的方式实现,尽管 volatile boolean 标志对于许多用例来说效果很好。

输入以异步和协作方式停止。使用 awaitStop 方法进行阻塞,直到输入完成停止过程。请注意,此方法应像 stop 方法那样发出停止输入的信号。awaitStop 机制可以通过任何符合 API 合约的方式实现,尽管 CountDownLatch 对于许多用例来说效果很好。

getId 方法

@Override
public String getId() {
    return id;
}

对于输入插件,getId 方法应始终返回实例化时通过其构造函数提供给插件的 ID。

单元测试

最后但并非最不重要的是,强烈建议进行单元测试。示例输入插件包含一个示例单元测试,您可以将其用作您自己的模板。

打包和部署

Java 插件打包为 Ruby gem,用于依赖管理以及与 Ruby 插件的互操作性。一旦它们打包为 gem,就可以像 Ruby 插件一样使用 logstash-plugin 实用程序进行安装。由于 Java 插件开发不应需要 Ruby 或其工具链的知识,因此将 Java 插件打包为 Ruby gem 的过程已通过示例 Java 插件提供的 Gradle 构建文件中的自定义任务实现自动化。以下各节描述了如何配置和执行打包任务以及如何在 Logstash 中安装打包的 Java 插件。

配置 Gradle 打包任务

以下部分出现在示例 Java 插件提供的 build.gradle 文件的顶部附近

// ===========================================================================
// plugin info
// ===========================================================================
group                      'org.logstashplugins' // must match the package of the main plugin class
version                    "${file("VERSION").text.trim()}" // read from required VERSION file
description                = "Example Java filter implementation"
pluginInfo.licenses        = ['Apache-2.0'] // list of SPDX license IDs
pluginInfo.longDescription = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using \$LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
pluginInfo.authors         = ['Elasticsearch']
pluginInfo.email           = ['[email protected]']
pluginInfo.homepage        = "https://elastic.ac.cn/guide/en/logstash/current/index.html"
pluginInfo.pluginType      = "filter"
pluginInfo.pluginClass     = "JavaFilterExample"
pluginInfo.pluginName      = "java_filter_example"
// ===========================================================================

您应该为您的插件配置上述值。

  • version 值将从插件代码库根目录中的 VERSION 文件中自动读取。
  • pluginInfo.pluginType 应设置为 inputfiltercodecoutput 之一。
  • pluginInfo.pluginName 必须与主插件类上的 @LogstashPlugin 注解中指定的名称匹配。Gradle 打包任务将验证这一点,如果它们不匹配,则会返回错误。

运行 Gradle 打包任务

需要几个 Ruby 源文件以及 gemspec 文件和 Gemfile 来将插件打包为 Ruby gem。这些 Ruby 文件仅用于定义 Ruby gem 结构或在 Logstash 启动时注册 Java 插件。它们在运行时事件处理期间不使用。Gradle 打包任务会根据上述部分中配置的值自动生成所有这些文件。

您可以使用以下命令运行 Gradle 打包任务

./gradlew gem

对于 Windows 平台:在命令中适当地将 ./gradlew 替换为 gradlew.bat

该任务将在插件代码库的根目录中生成一个 gem 文件,文件名为 logstash-{plugintype}-<pluginName>-<version>.gem

在 Logstash 中安装 Java 插件

将您的 Java 插件打包为 Ruby gem 后,您可以使用以下命令将其安装在 Logstash 中

bin/logstash-plugin install --no-verify --local /path/to/javaPlugin.gem

对于 Windows 平台:在命令中适当地将正斜杠替换为反斜杠。

使用 Java 输入插件运行 Logstash

以下是一个最小的 Logstash 配置,可用于测试 Java 输入插件是否已正确安装并正常运行。

input {
  java_input_example {}
}
output {
  stdout { codec => rubydebug }
}

将上述 Logstash 配置复制到诸如 java_input.conf 之类的文件中。使用以下命令启动 Logstash

bin/logstash -f /path/to/java_input.conf

使用上述配置的预期 Logstash 输出(不包括初始化)是

{
      "@version" => "1",
       "message" => "message        1 of 3       ",
    "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ
}
{
      "@version" => "1",
       "message" => "message        2 of 3       ",
    "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ
}
{
      "@version" => "1",
       "message" => "message        3 of 3       ",
    "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ
}

反馈

如果您对 Logstash 中的 Java 插件支持有任何反馈,请在我们的主 Github 问题上发表评论或在Logstash 论坛中发帖。