如何编写 Java 输入插件
要为 Logstash 开发新的 Java 输入,您需要编写符合 Logstash Java 输入 API 的新 Java 类,对其进行打包,然后使用 logstash-plugin 实用程序进行安装。 我们将逐步介绍这些步骤。
首先复制示例输入插件。 插件 API 当前是 Logstash 代码库的一部分,因此您必须拥有可用的本地副本。 您可以使用以下 git
命令获取 Logstash 代码库的副本
git clone --branch <branch_name> --single-branch https://github.com/elastic/logstash.git <target_folder>
branch_name
应对应于包含 Java 插件 API 首选版本的 Logstash 版本。
Java 插件 API 的 GA 版本在 Logstash 代码库的 7.2
及更高版本的分支中可用。
为您的 Logstash 代码库本地副本指定 target_folder
。 如果您未指定 target_folder
,则默认情况下,它将位于当前文件夹下的名为 logstash
的新文件夹中。
获取 Logstash 代码库的相应修订版后,您需要对其进行编译以生成包含 Java 插件 API 的 .jar 文件。 在 Logstash 代码库的根目录 ($LS_HOME) 中,您可以使用 ./gradlew assemble
(如果在 Windows 上运行,则使用 gradlew.bat assemble
)对其进行编译。 这应该生成 $LS_HOME/logstash-core/build/libs/logstash-core-x.y.z.jar
,其中 x
、y
和 z
指的是 Logstash 的版本。
成功编译 Logstash 后,您需要告诉 Java 插件在哪里可以找到 logstash-core-x.y.z.jar
文件。 在插件项目的根文件夹中创建一个名为 gradle.properties
的新文件。 该文件应包含单行
LOGSTASH_CORE_PATH=<target_folder>/logstash-core
其中 target_folder
是 Logstash 代码库本地副本的根文件夹。
示例输入插件在终止之前生成可配置数量的简单事件。 让我们看一下示例输入中的主类。
@LogstashPlugin(name="java_input_example")
public class JavaInputExample implements Input {
public static final PluginConfigSpec<Long> EVENT_COUNT_CONFIG =
PluginConfigSpec.numSetting("count", 3);
public static final PluginConfigSpec<String> PREFIX_CONFIG =
PluginConfigSpec.stringSetting("prefix", "message");
private String id;
private long count;
private String prefix;
private final CountDownLatch done = new CountDownLatch(1);
private volatile boolean stopped;
public JavaInputExample(String id, Configuration config, Context context) {
this.id = id;
count = config.get(EVENT_COUNT_CONFIG);
prefix = config.get(PREFIX_CONFIG);
}
@Override
public void start(Consumer<Map<String, Object>> consumer) {
int eventCount = 0;
try {
while (!stopped && eventCount < count) {
eventCount++;
consumer.accept.push(Collections.singletonMap("message",
prefix + " " + StringUtils.center(eventCount + " of " + count, 20)));
}
} finally {
stopped = true;
done.countDown();
}
}
@Override
public void stop() {
stopped = true;
}
@Override
public void awaitStop() throws InterruptedException {
done.await();
}
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return Arrays.asList(EVENT_COUNT_CONFIG, PREFIX_CONFIG);
}
@Override
public String getId() {
return this.id;
}
}
- 设置标志以请求输入协作停止
- 阻塞直到输入停止
让我们逐步检查该类的每个部分。
@LogstashPlugin(name="java_input_example")
public class JavaInputExample implements Input {
关于类声明的说明
所有 Java 插件都必须使用
@LogstashPlugin
注释进行注释。 此外- 必须提供注释的
name
属性,并定义插件的名称,因为它将在 Logstash 管道定义中使用。 例如,此输入将在 Logstash 管道定义的 input 部分中被引用为input { java_input_example => { .... } }
name
属性的值必须与类的名称匹配,排除大小写和下划线。
- 必须提供注释的
该类必须实现
co.elastic.logstash.api.Input
接口。Java 插件不能在
org.logstash
或co.elastic.logstash
包中创建,以防止与 Logstash 本身中的类发生潜在冲突。
下面的代码段包含设置定义和引用该设置的方法。
public static final PluginConfigSpec<Long> EVENT_COUNT_CONFIG =
PluginConfigSpec.numSetting("count", 3);
public static final PluginConfigSpec<String> PREFIX_CONFIG =
PluginConfigSpec.stringSetting("prefix", "message");
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return Arrays.asList(EVENT_COUNT_CONFIG, PREFIX_CONFIG);
}
PluginConfigSpec
类允许开发人员指定插件支持的设置,包括设置名称、数据类型、弃用状态、必需状态和默认值。 在此示例中,count
设置定义将生成的事件数,prefix
设置定义一个可选的前缀以包含在事件字段中。 这两个设置都不是必需的,如果未显式设置,则设置分别默认为 3
和 message
。
configSchema
方法必须返回插件支持的所有设置的列表。 在 Java 插件项目的未来阶段,Logstash 执行引擎将验证是否存在所有必需的设置,并且不存在不受支持的设置。
private String id;
private long count;
private String prefix;
public JavaInputExample(String id, Configuration config, Context context) {
this.id = id;
count = config.get(EVENT_COUNT_CONFIG);
prefix = config.get(PREFIX_CONFIG);
}
所有 Java 输入插件都必须具有一个构造函数,该构造函数接受 String
id 以及 Configuration
和 Context
参数。 这是将在运行时用于实例化它们的构造函数。 所有插件设置的检索和验证都应在此构造函数中进行。 在此示例中,检索两个插件设置的值并将其存储在局部变量中,以供以后在 start
方法中使用。
任何其他初始化也可能在构造函数中进行。 如果在输入插件的配置或初始化中遇到任何不可恢复的错误,则应抛出描述性异常。 该异常将被记录,并将阻止 Logstash 启动。
@Override
public void start(Consumer<Map<String, Object>> consumer) {
int eventCount = 0;
try {
while (!stopped && eventCount < count) {
eventCount++;
consumer.accept.push(Collections.singletonMap("message",
prefix + " " + StringUtils.center(eventCount + " of " + count, 20)));
}
} finally {
stopped = true;
done.countDown();
}
}
start
方法启动输入中的事件生成循环。 输入是灵活的,可以通过许多不同的机制生成事件,包括
- 一种拉取机制,例如定期查询外部数据库
- 一种推送机制,例如从客户端发送到本地网络端口的事件
- 一种定时计算,例如心跳
- 任何其他产生有用事件流的机制。 事件流可以是有限的或无限的。 如果输入产生无限的事件流,则此方法应循环直到通过
stop
方法发出停止请求。 如果输入产生有限的事件流,则此方法应在生成流中的最后一个事件或发出停止请求时终止,以先到者为准。
事件应构造为 Map<String, Object>
的实例,并通过 Consumer<Map<String, Object>>.accept()
方法推送到事件管道中。 为了减少分配和 GC 压力,输入可以通过在调用 Consumer<Map<String, Object>>.accept()
之间修改其字段来重用同一个 map 实例,因为事件管道将基于 map 数据的副本创建事件。
private final CountDownLatch done = new CountDownLatch(1);
private volatile boolean stopped;
@Override
public void stop() {
stopped = true;
}
@Override
public void awaitStop() throws InterruptedException {
done.await();
}
- 设置标志以请求输入协作停止
- 阻塞直到输入停止
stop
方法通知输入停止生成事件。 可以通过任何符合 API 约定的方式实现停止机制,尽管 volatile boolean
标志对于许多用例来说效果很好。
输入以异步和协作方式停止。 使用 awaitStop
方法阻止直到输入完成停止过程。 请注意,此方法不应像 stop
方法那样发出信号以停止输入。 可以通过任何符合 API 约定的方式实现 awaitStop 机制,尽管 CountDownLatch
对于许多用例来说效果很好。
@Override
public String getId() {
return id;
}
对于输入插件,getId
方法应始终返回在实例化时通过其构造函数提供给插件的 id。
最后,但肯定不是最不重要的,强烈建议进行单元测试。 示例输入插件包含一个示例单元测试,您可以将其用作您自己的模板。
Java 插件被打包为 Ruby gem,用于依赖项管理以及与 Ruby 插件的互操作性。 一旦它们被打包为 gem,就可以像 Ruby 插件一样使用 logstash-plugin
实用程序进行安装。 因为 Java 插件开发不应要求了解 Ruby 或其工具链,所以将 Java 插件打包为 Ruby gem 的过程已通过示例 Java 插件提供的 Gradle 构建文件中的自定义任务自动化。 以下各节描述了如何配置和执行该打包任务,以及如何在 Logstash 中安装打包的 Java 插件。
以下部分出现在示例 Java 插件提供的 build.gradle
文件的顶部附近
// ===========================================================================
// plugin info
// ===========================================================================
group 'org.logstashplugins'
version "${file("VERSION").text.trim()}"
description = "Example Java filter implementation"
pluginInfo.licenses = ['Apache-2.0']
pluginInfo.longDescription = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using \$LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
pluginInfo.authors = ['Elasticsearch']
pluginInfo.email = ['info@elastic.co']
pluginInfo.homepage = "https://elastic.ac.cn/guide/en/logstash/current/index.html"
pluginInfo.pluginType = "filter"
pluginInfo.pluginClass = "JavaFilterExample"
pluginInfo.pluginName = "java_filter_example"
// ===========================================================================
- 必须与主插件类的包匹配
- 从必需的 VERSION 文件中读取
- SPDX 许可证 ID 列表
您应该为您的插件配置上述值。
version
值将自动从插件代码库根目录中的VERSION
文件中读取。pluginInfo.pluginType
应设置为input
、filter
、codec
或output
之一。pluginInfo.pluginName
必须与主插件类上@LogstashPlugin
注释中指定的名称匹配。 Gradle 打包任务将验证这一点,如果它们不匹配,则返回一个错误。
需要几个 Ruby 源文件以及一个 gemspec
文件和一个 Gemfile
,才能将插件打包为 Ruby gem。 这些 Ruby 文件仅用于定义 Ruby gem 结构或在 Logstash 启动时注册 Java 插件。 它们不在运行时事件处理期间使用。 Gradle 打包任务会根据上面部分中配置的值自动生成所有这些文件。
您可以使用以下命令运行 Gradle 打包任务
./gradlew gem
对于 Windows 平台:在命令中将 ./gradlew
替换为 gradlew.bat
(如果适用)。
该任务将在插件代码库的根目录中生成一个 gem 文件,名称为 logstash-{{plugintype}}-<pluginName>-<version>.gem
将 Java 插件打包为 Ruby gem 后,可以使用此命令将其安装在 Logstash 中
bin/logstash-plugin install --no-verify --local /path/to/javaPlugin.gem
对于 Windows 平台:在命令中将反斜杠替换为正斜杠(如果适用)。
以下是可用于测试 Java 输入插件是否已正确安装并运行的最小 Logstash 配置。
input {
java_input_example {}
}
output {
stdout { codec => rubydebug }
}
将上述 Logstash 配置复制到 java_input.conf 等文件中。 使用以下命令启动 Logstash
bin/logstash -f /path/to/java_input.conf
使用上述配置的预期 Logstash 输出(不包括初始化)是
{
"@version" => "1",
"message" => "message 1 of 3 ",
"@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ
}
{
"@version" => "1",
"message" => "message 2 of 3 ",
"@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ
}
{
"@version" => "1",
"message" => "message 3 of 3 ",
"@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ
}
如果您对 Logstash 中的 Java 插件支持有任何反馈,请在我们的主要 Github issue上发表评论,或在Logstash 论坛中发帖。