Java 编解码器目前仅支持 Java 输入和输出插件。它们不适用于 Ruby 输入或输出插件。
要为 Logstash 开发新的 Java 编解码器,您需要编写一个符合 Logstash Java 编解码器 API 的新 Java 类,将其打包,并使用 logstash-plugin 实用程序安装它。我们将逐步介绍这些步骤。
设置您的环境
复制示例仓库
首先复制 示例编解码器插件。插件 API 目前是 Logstash 代码库的一部分,因此您必须拥有该代码库的本地副本。您可以使用以下 git
命令获取 Logstash 代码库的副本
git clone --branch <branch_name> --single-branch https://github.com/elastic/logstash.git <target_folder>
branch_name
应与包含首选 Java 插件 API 修订版的 Logstash 版本相对应。
Java 插件 API 的 GA 版本在 Logstash 代码库的 7.2
及更高版本分支中可用。
指定 Logstash 代码库本地副本的 target_folder
。如果您未指定 target_folder
,则默认情况下它将在当前文件夹下创建一个名为 logstash
的新文件夹。
生成 .jar 文件
获取到 Logstash 代码库的适当修订版副本后,您需要对其进行编译以生成包含 Java 插件 API 的 .jar 文件。从 Logstash 代码库的根目录 ($LS_HOME) 开始,您可以使用 ./gradlew assemble
(或如果您在 Windows 上运行,则使用 gradlew.bat assemble
)进行编译。这将生成 $LS_HOME/logstash-core/build/libs/logstash-core-x.y.z.jar
,其中 x
、y
和 z
指的是 Logstash 的版本。
成功编译 Logstash 后,您需要告诉 Java 插件在哪里可以找到 logstash-core-x.y.z.jar
文件。在插件项目的根文件夹中创建一个名为 gradle.properties
的新文件。该文件应包含一行
LOGSTASH_CORE_PATH=<target_folder>/logstash-core
其中 target_folder
是 Logstash 代码库本地副本的根文件夹。
编写插件代码
示例编解码器插件对由可配置分隔符分隔的消息进行解码,并通过写入其字符串表示形式(以分隔符分隔)来对消息进行编码。例如,如果编解码器配置为使用 /
作为分隔符,则输入文本 event1/event2/
将被解码为两个独立的事件,其 message
字段分别为 event1
和 event2
。请注意,这只是一个示例编解码器,它没有涵盖生产级编解码器应涵盖的所有边缘情况。
让我们看一下该编解码器过滤器中的主类
@LogstashPlugin(name="java_codec_example") public class JavaCodecExample implements Codec { public static final PluginConfigSpec<String> DELIMITER_CONFIG = PluginConfigSpec.stringSetting("delimiter", ","); private final String id; private final String delimiter; public JavaCodecExample(final Configuration config, final Context context) { this(config.get(DELIMITER_CONFIG)); } private JavaCodecExample(String delimiter) { this.id = UUID.randomUUID().toString(); this.delimiter = delimiter; } @Override public void decode(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) { // a not-production-grade delimiter decoder byte[] byteInput = new byte[byteBuffer.remaining()]; byteBuffer.get(byteInput); if (byteInput.length > 0) { String input = new String(byteInput); String[] split = input.split(delimiter); for (String s : split) { Map<String, Object> map = new HashMap<>(); map.put("message", s); consumer.accept(map); } } } @Override public void flush(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) { // if the codec maintains any internal state such as partially-decoded input, this // method should flush that state along with any additional input supplied in // the ByteBuffer decode(byteBuffer, consumer); // this is a simplistic implementation } @Override public void encode(Event event, OutputStream outputStream) throws IOException { outputStream.write((event.toString() + delimiter).getBytes(Charset.defaultCharset())); } @Override public Collection<PluginConfigSpec<?>> configSchema() { // should return a list of all configuration options for this plugin return Collections.singletonList(DELIMITER_CONFIG); } @Override public Codec cloneCodec() { return new JavaCodecExample(this.delimiter); } @Override public String getId() { return this.id; } }
让我们逐步分析该类的每个部分。
类声明
@LogstashPlugin(name="java_codec_example") public class JavaCodecExample implements Codec {
关于类声明的说明
-
所有 Java 插件都必须使用
@LogstashPlugin
注释进行注释。此外- 注释的
name
属性必须提供,并定义插件的名称,该名称将在 Logstash 管道定义中使用。例如,此编解码器将在 Logstash 管道定义中适当输入或输出的编解码器部分中引用为codec => java_codec_example { }
name
属性的值必须与类名匹配,不区分大小写和下划线。
- 注释的
- 该类必须实现
co.elastic.logstash.api.Codec
接口。 - Java 插件不能在
org.logstash
或co.elastic.logstash
包中创建,以防止与 Logstash 本身中的类发生潜在冲突。
插件设置
以下代码片段包含设置定义和引用它的方法
public static final PluginConfigSpec<String> DELIMITER_CONFIG = PluginConfigSpec.stringSetting("delimiter", ","); @Override public Collection<PluginConfigSpec<?>> configSchema() { return Collections.singletonList(DELIMITER_CONFIG); }
PluginConfigSpec
类允许开发人员指定插件支持的设置,包括设置名称、数据类型、弃用状态、必需状态和默认值。在本例中,delimiter
设置定义了编解码器将事件拆分的分隔符。它不是必需的设置,如果未显式设置,则其默认值为 ,
。
configSchema
方法必须返回插件支持的所有设置的列表。Logstash 执行引擎将验证所有必需的设置都存在,并且不存在任何不支持的设置。
构造函数和初始化
private final String id; private final String delimiter; public JavaCodecExample(final Configuration config, final Context context) { this(config.get(DELIMITER_CONFIG)); } private JavaCodecExample(String delimiter) { this.id = UUID.randomUUID().toString(); this.delimiter = delimiter; }
所有 Java 编解码器插件都必须有一个接受 Configuration
和 Context
参数的构造函数。这是在运行时实例化它们时将使用的构造函数。所有插件设置的检索和验证应在此构造函数中进行。在本例中,用于分隔事件的分隔符是从其设置中检索的,并存储在局部变量中,以便稍后在 decode
和 encode
方法中使用。编解码器的 ID 初始化为随机 UUID(大多数编解码器都应如此),并且局部 encoder
变量初始化为使用指定字符集进行编码和解码。
任何其他初始化也可以在构造函数中进行。如果在编解码器插件的配置或初始化中遇到任何不可恢复的错误,则应抛出描述性异常。该异常将被记录,并将阻止 Logstash 启动。
编解码器方法
@Override public void decode(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) { // a not-production-grade delimiter decoder byte[] byteInput = new byte[byteBuffer.remaining()]; byteBuffer.get(byteInput); if (byteInput.length > 0) { String input = new String(byteInput); String[] split = input.split(delimiter); for (String s : split) { Map<String, Object> map = new HashMap<>(); map.put("message", s); consumer.accept(map); } } } @Override public void flush(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) { // if the codec maintains any internal state such as partially-decoded input, this // method should flush that state along with any additional input supplied in // the ByteBuffer decode(byteBuffer, consumer); // this is a simplistic implementation } @Override public void encode(Event event, OutputStream outputStream) throws IOException { outputStream.write((event.toString() + delimiter).getBytes(Charset.defaultCharset())); }
decode
、flush
和 encode
方法提供了编解码器的核心功能。编解码器可以由输入使用来将字节序列或流解码为事件,或由输出使用来将事件编码为字节序列。
decode
方法从指定的 ByteBuffer
中解码事件,并将它们传递给提供的 Consumer
。输入必须提供一个准备读取的 ByteBuffer
,其中 byteBuffer.position()
指示下一个要读取的位置,byteBuffer.limit()
指示缓冲区中第一个不安全读取的字节。编解码器必须确保 byteBuffer.position()
反映最后一个读取的位置,然后再将控制权返回给输入。然后,输入负责通过 byteBuffer.clear()
或 byteBuffer.compact()
将缓冲区返回到写入模式,然后再恢复写入。在上面的示例中,decode
方法只是在指定的分隔符上拆分传入的字节流。生产级编解码器(如 java-line
)不会做出简化的假设,即提供的字节流的末尾与事件的末尾相对应。
事件应构造为 Map<String, Object>
的实例,并通过 Consumer<Map<String, Object>>.accept()
方法推送到事件管道中。为了减少分配和 GC 压力,编解码器可以通过在调用 Consumer<Map<String, Object>>.accept()
之间修改其字段来重复使用相同的映射实例,因为事件管道将根据映射数据的副本创建事件。
flush
方法与 decode
方法协同工作,以从指定的 ByteBuffer
中解码所有剩余的事件,以及在先前调用 decode
方法后可能保留的任何内部状态。作为编解码器可能维护的内部状态的示例,请考虑具有 /
分隔符的字节 event1/event2/event3
的输入流。由于缓冲或其他原因,输入可能会向编解码器的 decode
方法提供部分字节流,例如 event1/eve
。在这种情况下,编解码器可以保存第二个事件的开头三个字符 eve
,而不是假设提供的字节流在事件边界处结束。如果对 decode
的下一次调用提供了 nt2/ev
字节,则编解码器将附加保存的 eve
字节以生成完整的 event2
事件,然后保存剩余的 ev
字节,以便在提供该事件的剩余字节时进行解码。对 flush
的调用向编解码器发出信号,表明提供的字节表示事件流的末尾,所有剩余的字节都应解码为事件。上面的 flush
示例是一个简单的实现,它不维护任何关于跨调用 decode
的部分提供的字节流的状态。
encode
方法将事件编码为字节序列,并将其写入指定的 OutputStream
。由于单个编解码器实例在 Logstash 管道输出阶段的所有管道工作程序之间共享,因此编解码器不应跨调用其 encode
方法保留状态。
cloneCodec 方法
@Override public Codec cloneCodec() { return new JavaCodecExample(this.delimiter); }
cloneCodec
方法应返回编解码器的相同实例,但其 ID 除外。由于编解码器可能在跨调用其 decode
方法时保持状态,因此多线程的输入插件应通过 cloneCodec
方法为每个线程使用编解码器的单独实例。由于单个编解码器实例在 Logstash 管道输出阶段的所有管道工作程序之间共享,因此编解码器不应跨调用其 encode
方法保留状态。在上面的示例中,编解码器使用相同的分隔符但不同的 ID 进行克隆。
getId 方法
@Override public String getId() { return id; }
对于编解码器插件,getId
方法应始终返回在实例化时设置的 ID。这通常是一个 UUID。
单元测试
最后,但同样重要的是,强烈建议进行单元测试。示例编解码器插件包含一个 示例单元测试,您可以将其用作您自己的模板。
打包和部署
Java 插件被打包成 Ruby gem 用于依赖管理和与 Ruby 插件的互操作性。一旦它们被打包成 gem,就可以使用 logstash-plugin
工具安装它们,就像 Ruby 插件一样。由于 Java 插件开发不需要了解 Ruby 或其工具链,因此将 Java 插件打包成 Ruby gem 的过程已通过示例 Java 插件提供的 Gradle 构建文件中的自定义任务实现自动化。以下部分描述了如何配置和执行该打包任务,以及如何在 Logstash 中安装打包的 Java 插件。
配置 Gradle 打包任务
以下部分出现在示例 Java 插件提供的 build.gradle
文件的顶部附近
// =========================================================================== // plugin info // =========================================================================== group 'org.logstashplugins' // must match the package of the main plugin class version "${file("VERSION").text.trim()}" // read from required VERSION file description = "Example Java filter implementation" pluginInfo.licenses = ['Apache-2.0'] // list of SPDX license IDs pluginInfo.longDescription = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using \$LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" pluginInfo.authors = ['Elasticsearch'] pluginInfo.email = ['[email protected]'] pluginInfo.homepage = "https://elastic.ac.cn/guide/en/logstash/current/index.html" pluginInfo.pluginType = "filter" pluginInfo.pluginClass = "JavaFilterExample" pluginInfo.pluginName = "java_filter_example" // ===========================================================================
您应该为您的插件配置上面的值。
version
值将自动从插件代码库根目录中的VERSION
文件中读取。-
pluginInfo.pluginType
应设置为input
、filter
、codec
或output
之一。 -
pluginInfo.pluginName
必须与主插件类上的@LogstashPlugin
注释中指定的名称匹配。Gradle 打包任务将验证这一点,如果它们不匹配,则会返回错误。
运行 Gradle 打包任务
几个 Ruby 源文件以及 gemspec
文件和 Gemfile
是将插件打包成 Ruby gem 所必需的。这些 Ruby 文件仅用于定义 Ruby gem 结构或在 Logstash 启动时注册 Java 插件。它们在运行时事件处理期间不使用。Gradle 打包任务会根据上面部分中配置的值自动生成所有这些文件。
您可以使用以下命令运行 Gradle 打包任务
./gradlew gem
对于 Windows 平台:在命令中将 gradlew.bat
替换为 ./gradlew
。
该任务将在插件代码库的根目录中生成一个 gem 文件,其名称为 logstash-{plugintype}-<pluginName>-<version>.gem
在 Logstash 中安装 Java 插件
将 Java 插件打包成 Ruby gem 后,可以使用以下命令在 Logstash 中安装它
bin/logstash-plugin install --no-verify --local /path/to/javaPlugin.gem
对于 Windows 平台:在命令中将反斜杠替换为正斜杠。
使用 Java 编解码器插件运行 Logstash
要测试插件,请使用以下命令启动 Logstash
echo "foo,bar" | bin/logstash -e 'input { java_stdin { codec => java_codec_example } }'
使用上述配置的预期 Logstash 输出(不包括初始化)为
{ "@version" => "1", "message" => "foo", "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ, "host" => "<yourHostName>" } { "@version" => "1", "message" => "bar\n", "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ, "host" => "<yourHostName>" }
反馈
如果您对 Logstash 中的 Java 插件支持有任何反馈,请在我们的 主要 Github 问题 上发表评论,或在 Logstash 论坛 上发帖。