正在加载

如何编写 Java 编解码器插件

注意

Java 编解码器目前仅支持 Java 输入和输出插件。它们不适用于 Ruby 输入或输出插件。

要为 Logstash 开发新的 Java 编解码器,您需要编写一个符合 Logstash Java 编解码器 API 的 Java 类,将其打包,然后使用 logstash-plugin 实用程序安装它。我们将逐步完成这些步骤。

首先复制示例编解码器插件。该插件 API 当前是 Logstash 代码库的一部分,因此您必须拥有可用的本地副本。您可以使用以下 git 命令获取 Logstash 代码库的副本

git clone --branch <branch_name> --single-branch https://github.com/elastic/logstash.git <target_folder>

branch_name 应对应于包含 Java 插件 API 首选修订版的 Logstash 版本。

注意

Java 插件 API 的 GA 版本可在 Logstash 代码库的 7.2 及更高版本的分支中使用。

为 Logstash 代码库的本地副本指定 target_folder。 如果您未指定 target_folder,则默认情况下会在当前文件夹下创建一个名为 logstash 的新文件夹。

获取相应修订版的 Logstash 代码库的副本后,您需要对其进行编译以生成包含 Java 插件 API 的 .jar 文件。 从 Logstash 代码库的根目录 ($LS_HOME) 中,您可以使用 ./gradlew assemble(如果在 Windows 上运行,则使用 gradlew.bat assemble)对其进行编译。 这应该会生成 $LS_HOME/logstash-core/build/libs/logstash-core-x.y.z.jar,其中 xyz 指的是 Logstash 的版本。

成功编译 Logstash 后,您需要告诉 Java 插件在哪里可以找到 logstash-core-x.y.z.jar 文件。 在插件项目的根文件夹中创建一个名为 gradle.properties 的新文件。 该文件应该只有一行

LOGSTASH_CORE_PATH=<target_folder>/logstash-core

其中 target_folder 是 Logstash 代码库的本地副本的根文件夹。

示例编解码器插件会解码由可配置分隔符分隔的消息,并通过写入其字符串表示形式(以分隔符分隔)来编码消息。 例如,如果编解码器配置为使用 / 作为分隔符,则输入文本 event1/event2/ 将被解码为两个单独的事件,其 message 字段分别为 event1event2。 请注意,这只是一个示例编解码器,并未涵盖生产级编解码器应涵盖的所有边缘情况。

让我们看一下该编解码器过滤器中的主类

@LogstashPlugin(name="java_codec_example")
public class JavaCodecExample implements Codec {

    public static final PluginConfigSpec<String> DELIMITER_CONFIG =
            PluginConfigSpec.stringSetting("delimiter", ",");

    private final String id;
    private final String delimiter;

    public JavaCodecExample(final Configuration config, final Context context) {
        this(config.get(DELIMITER_CONFIG));
    }

    private JavaCodecExample(String delimiter) {
        this.id = UUID.randomUUID().toString();
        this.delimiter = delimiter;
    }

    @Override
    public void decode(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) {
        // a not-production-grade delimiter decoder
        byte[] byteInput = new byte[byteBuffer.remaining()];
        byteBuffer.get(byteInput);
        if (byteInput.length > 0) {
            String input = new String(byteInput);
            String[] split = input.split(delimiter);
            for (String s : split) {
                Map<String, Object> map = new HashMap<>();
                map.put("message", s);
                consumer.accept(map);
            }
        }
    }

    @Override
    public void flush(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) {
        // if the codec maintains any internal state such as partially-decoded input, this
        // method should flush that state along with any additional input supplied in
        // the ByteBuffer

        decode(byteBuffer, consumer);
    }

    @Override
    public void encode(Event event, OutputStream outputStream) throws IOException {
        outputStream.write((event.toString() + delimiter).getBytes(Charset.defaultCharset()));
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        // should return a list of all configuration options for this plugin
        return Collections.singletonList(DELIMITER_CONFIG);
    }

    @Override
    public Codec cloneCodec() {
        return new JavaCodecExample(this.delimiter);
    }

    @Override
    public String getId() {
        return this.id;
    }

}
  1. 这是一个简单的实现

让我们逐步检查该类的每个部分。

@LogstashPlugin(name="java_codec_example")
public class JavaCodecExample implements Codec {

关于类声明的说明

  • 所有 Java 插件都必须使用 @LogstashPlugin 注释进行注释。 此外

    • 必须提供注释的 name 属性,该属性定义了插件在 Logstash 管道定义中使用的名称。 例如,此编解码器将在 Logstash 管道定义的适当输入或输出的编解码器部分中引用为 codec => java_codec_example { }
    • name 属性的值必须与类名匹配,不区分大小写和下划线。
  • 该类必须实现 co.elastic.logstash.api.Codec 接口。

  • 不得在 org.logstashco.elastic.logstash 包中创建 Java 插件,以防止与 Logstash 本身中的类发生潜在冲突。

以下代码段包含设置定义和引用它的方法

public static final PluginConfigSpec<String> DELIMITER_CONFIG =
        PluginConfigSpec.stringSetting("delimiter", ",");

@Override
public Collection<PluginConfigSpec<?>> configSchema() {
    return Collections.singletonList(DELIMITER_CONFIG);
}

PluginConfigSpec 类允许开发人员指定插件支持的设置,包括设置名称、数据类型、弃用状态、必需状态和默认值。 在此示例中,delimiter 设置定义了编解码器将事件拆分的分隔符。 它不是必需的设置,如果未显式设置,则其默认值为 ,

configSchema 方法必须返回插件支持的所有设置的列表。 Logstash 执行引擎将验证所有必需的设置是否存在以及是否存在不受支持的设置。

private final String id;
private final String delimiter;

public JavaCodecExample(final Configuration config, final Context context) {
    this(config.get(DELIMITER_CONFIG));
}

private JavaCodecExample(String delimiter) {
    this.id = UUID.randomUUID().toString();
    this.delimiter = delimiter;
}

所有 Java 编解码器插件都必须具有一个采用 ConfigurationContext 参数的构造函数。 这是将在运行时用于实例化它们的构造函数。 所有插件设置的检索和验证都应在此构造函数中进行。 在此示例中,用于分隔事件的分隔符从其设置中检索并存储在局部变量中,以便稍后在 decodeencode 方法中使用。 编解码器的 ID 初始化为随机 UUID(大多数编解码器都应该这样做),并且本地 encoder 变量被初始化为使用指定的字符集进行编码和解码。

任何其他初始化也可以在构造函数中进行。 如果在编解码器插件的配置或初始化中遇到任何无法恢复的错误,则应引发描述性异常。 该异常将被记录,并将阻止 Logstash 启动。

@Override
public void decode(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) {
    // a not-production-grade delimiter decoder
    byte[] byteInput = new byte[byteBuffer.remaining()];
    byteBuffer.get(byteInput);
    if (byteInput.length > 0) {
        String input = new String(byteInput);
        String[] split = input.split(delimiter);
        for (String s : split) {
            Map<String, Object> map = new HashMap<>();
            map.put("message", s);
            consumer.accept(map);
        }
    }
}

@Override
public void flush(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) {
    // if the codec maintains any internal state such as partially-decoded input, this
    // method should flush that state along with any additional input supplied in
    // the ByteBuffer

    decode(byteBuffer, consumer);
}

@Override
public void encode(Event event, OutputStream outputStream) throws IOException {
    outputStream.write((event.toString() + delimiter).getBytes(Charset.defaultCharset()));
}
  1. 这是一个简单的实现

decodeflushencode 方法提供了编解码器的核心功能。 编解码器可由输入使用,以将字节序列或流解码为事件,或者由输出使用,以将事件编码为字节序列。

decode 方法从指定的 ByteBuffer 解码事件,并将它们传递给提供的 Consumer。 输入必须提供一个 ByteBuffer,该 ByteBuffer 已准备好读取,其中 byteBuffer.position() 指示要读取的下一个位置,byteBuffer.limit() 指示缓冲区中不安全读取的第一个字节。 编解码器必须确保 byteBuffer.position() 反映上次读取的位置,然后才能将控制权返回给输入。 然后,输入负责在恢复写入之前通过 byteBuffer.clear()byteBuffer.compact() 将缓冲区返回到写入模式。 在上面的示例中,decode 方法只是将传入的字节流拆分到指定的分隔符上。 生产级编解码器(例如 java-line)不会做出简化的假设,即提供的字节流的结尾与事件的结尾相对应。

事件应构造为 Map<String, Object> 的实例,并通过 Consumer<Map<String, Object>>.accept() 方法推送到事件管道中。 为了减少分配和 GC 压力,编解码器可以通过在调用 Consumer<Map<String, Object>>.accept() 之间修改其字段来重用相同的映射实例,因为事件管道将基于映射数据的副本创建事件。

flush 方法与 decode 方法协同工作,以从指定的 ByteBuffer 解码所有剩余事件,以及可能在先前调用 decode 方法后剩余的任何内部状态。 作为编解码器可能维护的内部状态的一个示例,请考虑一个字节输入流 event1/event2/event3,其分隔符为 /。 由于缓冲或其他原因,输入可能会向编解码器的 decode 方法提供部分字节流,例如 event1/eve。 在这种情况下,编解码器可以保存第二个事件的前三个字符 eve,而不是假设提供的字节流在事件边界处结束。 如果下一次调用 decode 提供了 nt2/ev 字节,则编解码器会将保存的 eve 字节添加到前面以生成完整的 event2 事件,然后保存剩余的 ev 字节,以便在该事件的剩余字节被提供时进行解码。 对 flush 的调用表示提供的字节表示事件流的结尾,所有剩余的字节都应解码为事件。 上面的 flush 示例是一个简单的实现,它不维护跨 decode 调用的部分提供的字节流的任何状态。

encode 方法将事件编码为字节序列,并将其写入指定的 OutputStream。 由于单个编解码器实例在 Logstash 管道的输出阶段的所有管道工作程序之间共享,因此编解码器不应跨对其 encode 方法的调用保留状态。

@Override
public Codec cloneCodec() {
    return new JavaCodecExample(this.delimiter);
}

cloneCodec 方法应返回编解码器的相同实例,但其 ID 除外。 由于编解码器可能跨对其 decode 方法的调用是有状态的,因此多线程的输入插件应通过每个线程的 cloneCodec 方法使用每个编解码器的单独实例。 由于单个编解码器实例在 Logstash 管道的输出阶段的所有管道工作程序之间共享,因此编解码器不应跨对其 encode 方法的调用保留状态。 在上面的示例中,编解码器使用相同的分隔符但不同的 ID 进行克隆。

@Override
public String getId() {
    return id;
}

对于编解码器插件,getId 方法应始终返回在实例化时设置的 ID。 这通常是一个 UUID。

最后但并非最不重要的是,强烈建议进行单元测试。 示例编解码器插件包含一个示例单元测试,您可以将其用作自己测试的模板。

Java 插件打包为 Ruby gem,用于依赖项管理以及与 Ruby 插件的互操作性。 一旦它们被打包为 gem,就可以使用 logstash-plugin 实用程序安装它们,就像安装 Ruby 插件一样。 由于 Java 插件开发不应要求了解 Ruby 或其工具链,因此通过示例 Java 插件提供的 Gradle 构建文件中的自定义任务,已自动执行将 Java 插件打包为 Ruby gem 的过程。 以下各节介绍了如何配置和执行该打包任务,以及如何在 Logstash 中安装打包的 Java 插件。

以下部分显示在示例 Java 插件提供的 build.gradle 文件的顶部附近

// ===========================================================================
// plugin info
// ===========================================================================
group                      'org.logstashplugins'
version                    "${file("VERSION").text.trim()}"
description                = "Example Java filter implementation"
pluginInfo.licenses        = ['Apache-2.0']
pluginInfo.longDescription = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using \$LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
pluginInfo.authors         = ['Elasticsearch']
pluginInfo.email           = ['info@elastic.co']
pluginInfo.homepage        = "https://elastic.ac.cn/guide/en/logstash/current/index.html"
pluginInfo.pluginType      = "filter"
pluginInfo.pluginClass     = "JavaFilterExample"
pluginInfo.pluginName      = "java_filter_example"
// ===========================================================================
  1. 必须与主插件类的包匹配
  2. 从必需的 VERSION 文件中读取
  3. SPDX 许可证 ID 列表

您应该为您的插件配置以上值。

  • version 值将自动从插件代码库根目录中的 VERSION 文件读取。
  • pluginInfo.pluginType 应设置为 inputfiltercodecoutput 中的一个。
  • pluginInfo.pluginName 必须与主插件类上的 @LogstashPlugin 注解中指定的名称相匹配。 Gradle 打包任务将验证这一点,如果不匹配,则返回错误。

需要多个 Ruby 源文件以及 gemspec 文件和一个 Gemfile 才能将插件打包为 Ruby gem。 这些 Ruby 文件仅用于定义 Ruby gem 结构,或在 Logstash 启动时注册 Java 插件。 它们不会在运行时事件处理期间使用。 Gradle 打包任务会根据上面部分中配置的值自动生成所有这些文件。

您可以使用以下命令运行 Gradle 打包任务

./gradlew gem

对于 Windows 平台:在命令中,根据需要将 gradlew.bat 替换为 ./gradlew

该任务将在插件代码库的根目录中生成一个 gem 文件,文件名为 logstash-{{plugintype}}-<pluginName>-<version>.gem

将 Java 插件打包为 Ruby gem 后,可以使用以下命令将其安装到 Logstash 中

bin/logstash-plugin install --no-verify --local /path/to/javaPlugin.gem

对于 Windows 平台:根据需要在命令中将反斜杠替换为正斜杠。

要测试插件,请使用以下命令启动 Logstash

echo "foo,bar" | bin/logstash -e 'input { java_stdin { codec => java_codec_example } }'

上述配置的预期 Logstash 输出(不包括初始化)是

{
      "@version" => "1",
       "message" => "foo",
    "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ,
          "host" => "<yourHostName>"
}
{
      "@version" => "1",
       "message" => "bar\n",
    "@timestamp" => yyyy-MM-ddThh:mm:ss.SSSZ,
          "host" => "<yourHostName>"
}

如果您对 Logstash 中的 Java 插件支持有任何反馈,请在我们的 Github 主要问题上发表评论,或在 Logstash 论坛中发帖。

© . All rights reserved.