要为 Logstash 开发新的 Java 输出,您需要编写一个新的 Java 类,该类符合 Logstash Java 输出 API,对其进行打包,并使用 logstash-plugin 实用程序进行安装。我们将逐步介绍这些步骤。
首先复制示例输出插件。该插件 API 当前是 Logstash 代码库的一部分,因此您必须拥有可用的本地副本。您可以使用以下 git
命令获取 Logstash 代码库的副本
git clone --branch <branch_name> --single-branch https://github.com/elastic/logstash.git <target_folder>
branch_name
应对应于包含 Java 插件 API 首选版本的 Logstash 版本。
Java 插件 API 的 GA 版本在 Logstash 代码库的 7.2
及更高版本的分支中可用。
为您的 Logstash 代码库本地副本指定 target_folder
。如果您未指定 target_folder
,则它默认为您当前文件夹下的一个名为 logstash
的新文件夹。
获取 Logstash 代码库的适当修订版本后,您需要对其进行编译以生成包含 Java 插件 API 的 .jar 文件。从 Logstash 代码库的根目录 ($LS_HOME) 中,您可以使用 ./gradlew assemble
(如果在 Windows 上运行,则使用 gradlew.bat assemble
)进行编译。这将生成 $LS_HOME/logstash-core/build/libs/logstash-core-x.y.z.jar
,其中 x
、y
和 z
指的是 Logstash 的版本。
成功编译 Logstash 后,您需要告诉您的 Java 插件在哪里找到 logstash-core-x.y.z.jar
文件。在您的插件项目的根文件夹中创建一个名为 gradle.properties
的新文件。该文件应包含一行:
LOGSTASH_CORE_PATH=<target_folder>/logstash-core
其中 target_folder
是您 Logstash 代码库本地副本的根文件夹。
示例输出插件使用事件的 toString
方法将事件打印到控制台。让我们看一下示例输出中的主类
@LogstashPlugin(name = "java_output_example") public class JavaOutputExample implements Output { public static final PluginConfigSpec<String> PREFIX_CONFIG = PluginConfigSpec.stringSetting("prefix", ""); private final String id; private String prefix; private PrintStream printer; private final CountDownLatch done = new CountDownLatch(1); private volatile boolean stopped = false; public JavaOutputExample(final String id, final Configuration configuration, final Context context) { this(id, configuration, context, System.out); } JavaOutputExample(final String id, final Configuration config, final Context context, OutputStream targetStream) { this.id = id; prefix = config.get(PREFIX_CONFIG); printer = new PrintStream(targetStream); } @Override public void output(final Collection<Event> events) { Iterator<Event> z = events.iterator(); while (z.hasNext() && !stopped) { String s = prefix + z.next(); printer.println(s); } } @Override public void stop() { stopped = true; done.countDown(); } @Override public void awaitStop() throws InterruptedException { done.await(); } @Override public Collection<PluginConfigSpec<?>> configSchema() { return Collections.singletonList(PREFIX_CONFIG); } @Override public String getId() { return id; } }
让我们逐步检查该类的每个部分。
@LogstashPlugin(name="java_output_example") public class JavaOutputExample implements Output {
关于类声明的注意事项
-
所有 Java 插件都必须使用
@LogstashPlugin
注释进行注释。此外- 必须提供注释的
name
属性,并定义插件在 Logstash 管道定义中使用的名称。例如,此输出将在 Logstash 管道定义的输出部分中被引用为output { java_output_example => { .... } }
name
属性的值必须与类名匹配,不包括大小写和下划线。
- 必须提供注释的
- 该类必须实现
co.elastic.logstash.api.Output
接口。 - Java 插件不能在
org.logstash
或co.elastic.logstash
包中创建,以防止与 Logstash 本身中的类发生潜在冲突。
下面的代码段包含设置定义和引用它的方法
public static final PluginConfigSpec<String> PREFIX_CONFIG = PluginConfigSpec.stringSetting("prefix", ""); @Override public Collection<PluginConfigSpec<?>> configSchema() { return Collections.singletonList(PREFIX_CONFIG); }
PluginConfigSpec
类允许开发人员指定插件支持的设置,包括设置名称、数据类型、弃用状态、必需状态和默认值。在此示例中,prefix
设置定义了一个可选的前缀,该前缀包含在事件的输出中。该设置不是必需的,如果未显式设置,则默认为空字符串。
configSchema
方法必须返回插件支持的所有设置的列表。在 Java 插件项目的未来阶段,Logstash 执行引擎将验证是否所有必需的设置都存在,并且不存在任何不支持的设置。
private final String id; private String prefix; private PrintStream printer; public JavaOutputExample(final String id, final Configuration configuration, final Context context) { this(configuration, context, System.out); } JavaOutputExample(final String id, final Configuration config, final Context context, OutputStream targetStream) { this.id = id; prefix = config.get(PREFIX_CONFIG); printer = new PrintStream(targetStream); }
所有 Java 输出插件都必须有一个构造函数,该构造函数接受一个 String
id 和一个 Configuration
以及 Context
参数。这是将在运行时用于实例化它们的构造函数。所有插件设置的检索和验证都应在此构造函数中进行。在此示例中,prefix
设置的值被检索并存储在局部变量中,以便稍后在 output
方法中使用。在此示例中,定义了第二个包私有构造函数,该构造函数对于使用 System.out
以外的 Stream
进行单元测试很有用。
任何其他初始化也可以在构造函数中进行。如果在输出插件的配置或初始化中遇到任何无法恢复的错误,则应抛出一个描述性异常。该异常将被记录,并将阻止 Logstash 启动。
@Override public void output(final Collection<Event> events) { Iterator<Event> z = events.iterator(); while (z.hasNext() && !stopped) { String s = prefix + z.next(); printer.println(s); } }
输出可以将事件发送到本地接收器(例如控制台或文件)或发送到远程系统(例如 Elasticsearch 或其他外部系统)。在此示例中,事件被打印到本地控制台。
private final CountDownLatch done = new CountDownLatch(1); private volatile boolean stopped; @Override public void stop() { stopped = true; done.countDown(); } @Override public void awaitStop() throws InterruptedException { done.await(); }
stop
方法通知输出停止发送事件。停止机制可以以任何符合 API 约定的方式实现,尽管 volatile boolean
标志适用于许多用例。由于此输出示例非常简单,因此其 output
方法不会检查停止标志。
输出异步且协同停止。使用 awaitStop
方法阻塞,直到输出完成停止过程。请注意,此方法不应向输出发送停止信号,就像 stop
方法那样。awaitStop 机制可以以任何符合 API 约定的方式实现,尽管 CountDownLatch
适用于许多用例。
@Override public String getId() { return id; }
对于输出插件,getId
方法应始终返回在实例化时通过其构造函数提供给插件的 id。
最后,但同样重要的是,强烈建议进行单元测试。示例输出插件包括一个示例单元测试,您可以将其用作您自己的模板。
Java 插件被打包为 Ruby gems,以进行依赖项管理并与 Ruby 插件互操作。一旦它们被打包为 gems,就可以像 Ruby 插件一样使用 logstash-plugin
实用程序进行安装。由于 Java 插件开发不应需要了解 Ruby 或其工具链,因此将 Java 插件打包为 Ruby gems 的过程已通过示例 Java 插件提供的 Gradle 构建文件中的自定义任务实现自动化。以下部分介绍了如何配置和执行该打包任务以及如何在 Logstash 中安装打包的 Java 插件。
以下部分出现在示例 Java 插件随附的 build.gradle
文件的顶部附近
// =========================================================================== // plugin info // =========================================================================== group 'org.logstashplugins' // must match the package of the main plugin class version "${file("VERSION").text.trim()}" // read from required VERSION file description = "Example Java filter implementation" pluginInfo.licenses = ['Apache-2.0'] // list of SPDX license IDs pluginInfo.longDescription = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using \$LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" pluginInfo.authors = ['Elasticsearch'] pluginInfo.email = ['[email protected]'] pluginInfo.homepage = "https://elastic.ac.cn/guide/en/logstash/current/index.html" pluginInfo.pluginType = "filter" pluginInfo.pluginClass = "JavaFilterExample" pluginInfo.pluginName = "java_filter_example" // ===========================================================================
您应为插件配置以上值。
version
值将自动从插件代码库根目录中的VERSION
文件中读取。-
pluginInfo.pluginType
应设置为input
、filter
、codec
或output
之一。 -
pluginInfo.pluginName
必须与主插件类的@LogstashPlugin
注释上指定的名称匹配。Gradle 打包任务将验证这一点,如果它们不匹配,则返回错误。
需要几个 Ruby 源文件以及一个 gemspec
文件和一个 Gemfile
来将插件打包为 Ruby gem。这些 Ruby 文件仅用于定义 Ruby gem 结构或在 Logstash 启动时注册 Java 插件。它们在运行时事件处理期间不使用。Gradle 打包任务会根据以上部分中配置的值自动生成所有这些文件。
您可以使用以下命令运行 Gradle 打包任务
./gradlew gem
对于 Windows 平台:在命令中根据需要将 gradlew.bat
替换为 ./gradlew
。
该任务将在您的插件代码库的根目录中生成一个 gem 文件,其名称为 logstash-{plugintype}-<pluginName>-<version>.gem
将 Java 插件打包为 Ruby gem 后,可以使用此命令将其安装在 Logstash 中
bin/logstash-plugin install --no-verify --local /path/to/javaPlugin.gem
对于 Windows 平台:在命令中根据需要将反斜杠替换为正斜杠。
以下是一个最小的 Logstash 配置,可用于测试 Java 输出插件是否已正确安装并正常工作。
input { generator { message => "Hello world!" count => 1 } } output { java_output_example {} }
将上述 Logstash 配置复制到文件(例如 java_output.conf
)。然后应使用以下命令启动 Logstash
bin/logstash -f /path/to/java_output.conf
上述配置的预期 Logstash 输出(不包括初始化)是
{"@timestamp":"yyyy-MM-ddTHH:mm:ss.SSSZ","message":"Hello world!","@version":"1","host":"<yourHostname>","sequence":0}
如果您对 Logstash 中的 Java 插件支持有任何反馈,请在我们的主 Github 问题中发表评论或在Logstash 论坛中发帖。