要为 Logstash 开发新的 Java 输入,您需要编写一个符合 Logstash Java 输入 API 的新 Java 类,将其打包,并使用 logstash-plugin 实用程序进行安装。我们将逐步介绍这些步骤。
设置您的环境
复制示例仓库
首先复制 示例输入插件。插件 API 目前是 Logstash 代码库的一部分,因此您必须拥有该代码库的本地副本。您可以使用以下 git
命令获取 Logstash 代码库的副本
git clone --branch <branch_name> --single-branch https://github.com/elastic/logstash.git <target_folder>
branch_name
应与包含首选 Java 插件 API 修订版的 Logstash 版本相对应。
Java 插件 API 的 GA 版本在 Logstash 代码库的 7.2
及更高版本分支中可用。
指定 Logstash 代码库本地副本的 target_folder
。如果您未指定 target_folder
,则默认情况下它将位于当前文件夹下的名为 logstash
的新文件夹中。
生成 .jar 文件
获得 Logstash 代码库的适当修订版副本后,您需要对其进行编译以生成包含 Java 插件 API 的 .jar 文件。从 Logstash 代码库的根目录 ($LS_HOME) 开始,您可以使用 ./gradlew assemble
(或 gradlew.bat assemble
,如果您在 Windows 上运行)进行编译。这将生成 $LS_HOME/logstash-core/build/libs/logstash-core-x.y.z.jar
,其中 x
、y
和 z
指的是 Logstash 的版本。
成功编译 Logstash 后,您需要告诉 Java 插件在哪里可以找到 logstash-core-x.y.z.jar
文件。在插件项目的根文件夹中创建一个名为 gradle.properties
的新文件。该文件应包含一行
LOGSTASH_CORE_PATH=<target_folder>/logstash-core
其中 target_folder
是 Logstash 代码库本地副本的根文件夹。
编写插件代码
示例输入插件在终止之前生成可配置数量的简单事件。让我们看一下示例输入中的主类。
@LogstashPlugin(name="java_input_example") public class JavaInputExample implements Input { public static final PluginConfigSpec<Long> EVENT_COUNT_CONFIG = PluginConfigSpec.numSetting("count", 3); public static final PluginConfigSpec<String> PREFIX_CONFIG = PluginConfigSpec.stringSetting("prefix", "message"); private String id; private long count; private String prefix; private final CountDownLatch done = new CountDownLatch(1); private volatile boolean stopped; public JavaInputExample(String id, Configuration config, Context context) { this.id = id; count = config.get(EVENT_COUNT_CONFIG); prefix = config.get(PREFIX_CONFIG); } @Override public void start(Consumer<Map<String, Object>> consumer) { int eventCount = 0; try { while (!stopped && eventCount < count) { eventCount++; consumer.accept.push(Collections.singletonMap("message", prefix + " " + StringUtils.center(eventCount + " of " + count, 20))); } } finally { stopped = true; done.countDown(); } } @Override public void stop() { stopped = true; // set flag to request cooperative stop of input } @Override public void awaitStop() throws InterruptedException { done.await(); // blocks until input has stopped } @Override public Collection<PluginConfigSpec<?>> configSchema() { return Arrays.asList(EVENT_COUNT_CONFIG, PREFIX_CONFIG); } @Override public String getId() { return this.id; } }
让我们逐步介绍并检查该类的每个部分。
类声明
@LogstashPlugin(name="java_input_example") public class JavaInputExample implements Input {
关于类声明的说明
-
所有 Java 插件都必须使用
@LogstashPlugin
注释进行注释。此外- 注释的
name
属性必须提供,并定义插件的名称,该名称将在 Logstash 管道定义中使用。例如,此输入将在 Logstash 管道定义的输入部分中引用为input { java_input_example => { .... } }
name
属性的值必须与类名匹配,不区分大小写和下划线。
- 注释的
- 该类必须实现
co.elastic.logstash.api.Input
接口。 - Java 插件不能在
org.logstash
或co.elastic.logstash
包中创建,以防止与 Logstash 本身中的类发生潜在冲突。
插件设置
以下代码段包含设置定义和引用它的方法。
public static final PluginConfigSpec<Long> EVENT_COUNT_CONFIG = PluginConfigSpec.numSetting("count", 3); public static final PluginConfigSpec<String> PREFIX_CONFIG = PluginConfigSpec.stringSetting("prefix", "message"); @Override public Collection<PluginConfigSpec<?>> configSchema() { return Arrays.asList(EVENT_COUNT_CONFIG, PREFIX_CONFIG); }
PluginConfigSpec
类允许开发人员指定插件支持的设置,包括设置名称、数据类型、弃用状态、必需状态和默认值。在此示例中,count
设置定义将生成的事件数量,而 prefix
设置定义要包含在事件字段中的可选前缀。这两个设置都不是必需的,如果未显式设置,则默认情况下分别为 3
和 message
。
configSchema
方法必须返回插件支持的所有设置的列表。在 Java 插件项目的未来阶段,Logstash 执行引擎将验证所有必需的设置是否存在,并且不存在任何不支持的设置。
构造函数和初始化
private String id; private long count; private String prefix; public JavaInputExample(String id, Configuration config, Context context) { this.id = id; count = config.get(EVENT_COUNT_CONFIG); prefix = config.get(PREFIX_CONFIG); }
所有 Java 输入插件都必须具有一个接受 String
id 和 Configuration
以及 Context
参数的构造函数。这是在运行时实例化它们时将使用的构造函数。所有插件设置的检索和验证应在此构造函数中进行。在此示例中,两个插件设置的值将被检索并存储在本地变量中,以便稍后在 start
方法中使用。
任何其他初始化也可能在构造函数中进行。如果在输入插件的配置或初始化中遇到任何不可恢复的错误,则应抛出描述性异常。该异常将被记录,并将阻止 Logstash 启动。
启动方法
@Override public void start(Consumer<Map<String, Object>> consumer) { int eventCount = 0; try { while (!stopped && eventCount < count) { eventCount++; consumer.accept.push(Collections.singletonMap("message", prefix + " " + StringUtils.center(eventCount + " of " + count, 20))); } } finally { stopped = true; done.countDown(); } }
start
方法在输入中开始事件生成循环。输入很灵活,可以通过多种不同的机制生成事件,包括
- 拉取机制,例如定期查询外部数据库
- 推送机制,例如从客户端发送到本地网络端口的事件
- 定时计算,例如心跳
- 任何其他生成有用事件流的机制。事件流可以是有限的,也可以是无限的。如果输入生成无限的事件流,则此方法应循环,直到通过
stop
方法发出停止请求。如果输入生成有限的事件流,则此方法应在生成流中的最后一个事件或发出停止请求时终止,以先发生者为准。
事件应构造为 Map<String, Object>
的实例,并通过 Consumer<Map<String, Object>>.accept()
方法推送到事件管道中。为了减少分配和 GC 压力,输入可以通过在调用 Consumer<Map<String, Object>>.accept()
之间修改其字段来重复使用相同的映射实例,因为事件管道将根据映射数据的副本创建事件。
停止和 awaitStop 方法
private final CountDownLatch done = new CountDownLatch(1); private volatile boolean stopped; @Override public void stop() { stopped = true; // set flag to request cooperative stop of input } @Override public void awaitStop() throws InterruptedException { done.await(); // blocks until input has stopped }
stop
方法通知输入停止生成事件。停止机制可以以任何遵守 API 契约的方式实现,尽管对于许多用例来说,volatile boolean
标志效果很好。
输入异步和协作地停止。使用 awaitStop
方法阻塞,直到输入完成停止过程。请注意,此方法不应信号输入停止,就像 stop
方法一样。awaitStop 机制可以以任何遵守 API 契约的方式实现,尽管对于许多用例来说,CountDownLatch
效果很好。
getId 方法
@Override public String getId() { return id; }
对于输入插件,getId
方法应始终返回在实例化时通过构造函数提供给插件的 id。
单元测试
最后,但同样重要的是,强烈建议进行单元测试。示例输入插件包含一个 示例单元测试,您可以将其用作自己的模板。
打包和部署
Java 插件打包为 Ruby gem,用于依赖项管理和与 Ruby 插件的互操作性。一旦它们被打包为 gem,就可以使用 logstash-plugin
实用程序安装它们,就像 Ruby 插件一样。由于 Java 插件开发不需要了解 Ruby 或其工具链,因此将 Java 插件打包为 Ruby gem 的过程已通过示例 Java 插件提供的 Gradle 构建文件中的自定义任务实现自动化。以下部分描述了如何配置和执行该打包任务,以及如何在 Logstash 中安装打包的 Java 插件。
配置 Gradle 打包任务
以下部分出现在示例 Java 插件提供的 build.gradle
文件的顶部附近
// =========================================================================== // plugin info // =========================================================================== group 'org.logstashplugins' // must match the package of the main plugin class version "${file("VERSION").text.trim()}" // read from required VERSION file description = "Example Java filter implementation" pluginInfo.licenses = ['Apache-2.0'] // list of SPDX license IDs pluginInfo.longDescription = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using \$LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" pluginInfo.authors = ['Elasticsearch'] pluginInfo.email = ['[email protected]'] pluginInfo.homepage = "https://elastic.ac.cn/guide/en/logstash/current/index.html" pluginInfo.pluginType = "filter" pluginInfo.pluginClass = "JavaFilterExample" pluginInfo.pluginName = "java_filter_example" // ===========================================================================
您应该为您的插件配置上述值。
version
值将从插件代码库根目录中的VERSION
文件中自动读取。-
pluginInfo.pluginType
应设置为input
、filter
、codec
或output
之一。 -
pluginInfo.pluginName
必须与主插件类上的@LogstashPlugin
注释中指定的名称匹配。Gradle 打包任务将验证这一点,如果它们不匹配,则会返回错误。
运行 Gradle 打包任务
几个 Ruby 源文件以及 gemspec
文件和 Gemfile
是将插件打包为 Ruby gem 所必需的。这些 Ruby 文件仅用于定义 Ruby gem 结构或在 Logstash 启动时注册 Java 插件。它们在运行时事件处理期间不会使用。Gradle 打包任务会根据上述部分中配置的值自动生成所有这些文件。
您可以使用以下命令运行 Gradle 打包任务
./gradlew gem
对于 Windows 平台:在命令中适当地将 gradlew.bat
替换为 ./gradlew
。
该任务将在插件代码库的根目录中生成一个 gem 文件,其名称为 logstash-{plugintype}-<pluginName>-<version>.gem
在 Logstash 中安装 Java 插件
将 Java 插件打包为 Ruby gem 后,您可以使用以下命令在 Logstash 中安装它
bin/logstash-plugin install --no-verify --local /path/to/javaPlugin.gem
对于 Windows 平台:在命令中适当地将反斜杠替换为正斜杠。
使用 Java 输入插件运行 Logstash
以下是最小的 Logstash 配置,可用于测试 Java 输入插件是否已正确安装并正常运行。
input { java_input_example {} } output { stdout { codec => rubydebug } }
将上面的 Logstash 配置复制到一个文件中,例如 java_input.conf
。使用以下命令启动 Logstash:
bin/logstash -f /path/to/java_input.conf
使用上述配置,预期的 Logstash 输出(不包括初始化)为
{ "@version" => "1", "message" => "message 1 of 3 ", "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ } { "@version" => "1", "message" => "message 2 of 3 ", "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ } { "@version" => "1", "message" => "message 3 of 3 ", "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ }
反馈
如果您对 Logstash 中的 Java 插件支持有任何反馈,请在我们的 主要 Github 问题 上发表评论,或在 Logstash 论坛 上发帖。