错误处理程序

编辑

在 6.4 中添加。

Apache Hadoop 的 Elasticsearch 设计为一种几乎无需手动干预的集成。大多数功能通过约定和配置进行管理,无需大量的代码即可启动并运行连接器。在异常处理方面,我们投入了大量精力来处理来自 Elasticsearch 的最常见和预期错误。如果错误是意外的或表明存在实际问题,连接器将采用“快速失败”方法。我们意识到这种方法并非对所有用户都是最佳的,特别是那些关注作业正常运行时间的用户。

在这些情况下,用户可以通过指定遇到意外错误时要采取的操作来处理它们。为此,我们提供了一组 API 和扩展,用户可以实现这些 API 和扩展,以根据自己的需求定制连接器在最常见的位置遇到故障时的行为。

错误处理程序机制

编辑

在 elasticsearch-hadoop 中可以处理的每种类型的故障都有其自己的错误处理程序 API,该 API 针对正在执行的操作量身定制。错误处理程序只是在发生错误时调用的函数,并通知 elasticsearch-hadoop 如何继续。可以为每种类型的故障指定多个错误处理程序。当发生故障时,每个错误处理程序将按照其在配置中指定的顺序执行。

错误处理程序会收到有关执行的操作以及遇到的任何错误详细信息。然后,处理程序可以确认并消耗该故障,告知 elasticsearch-hadoop 忽略该错误。或者,处理程序可以标记错误以重新抛出,从而可能结束作业。错误处理程序还可以选择修改操作的参数并重试。最后,如果处理程序不知道如何处理故障,它也可以将故障“传递”给列表中的下一个处理程序。

如果提供的处理程序列表中的每个处理程序都选择“传递”,则该错误将被标记为未处理的错误,并且异常将被重新抛出,从而可能结束作业。连接器附带一些默认错误处理程序,它们可以处理大多数用例,但是如果您发现需要更具体的错误处理策略,则始终可以编写自己的处理程序。

在以下各节中,我们将详细介绍不同类型的错误处理程序、调用它们的位置、如何配置它们以及如果需要,如何编写自己的错误处理程序。

批量写入错误处理程序

编辑

写入数据时,连接器会将文档批量处理为批量请求,然后再将其发送到 Elasticsearch。在响应中,Elasticsearch 会返回每个已发送文档的状态,其中可能包括拒绝或失败。此处遇到的一个常见错误是拒绝,这意味着该文档要写入的分片太忙,无法接受写入。此处其他失败可能包括由于不符合当前索引映射或与文档的当前版本冲突而被拒绝的文档。

Apache Hadoop 的 Elasticsearch 提供了一个 API 来处理来自批量响应的文档级错误。批量写入的错误处理程序会获得

  • 尝试的原始 JSON 批量条目
  • 错误消息
  • 文档的 HTTP 状态代码
  • 当前文档已发送到 Elasticsearch 的次数

连接器提供了一些默认错误处理程序

HTTP 重试处理程序

编辑

始终配置为批量写入的第一个错误处理程序。

此处理程序检查失败的常见 HTTP 代码,这些代码可以重试。这些代码通常表示该文档要写入的分片太忙,无法接受写入,并且该文档被拒绝以减少负载。此处理程序始终配置为首先运行批量失败。用户配置的所有处理程序都按顺序放在此处理程序之后。

尽管无法修改错误处理程序列表中的处理程序位置,但是可以通过调整以下配置来修改其行为

es.batch.write.retry.policy(默认值:simple)
定义用于确定哪些 HTTP 代码可以重试的策略。默认值 simple 允许重试 429(请求过多)和 503(不可用)。将值设置为 none 将不允许重试任何状态代码。
es.batch.write.retry.count(默认值 3)
如果 Elasticsearch 过载并且数据被拒绝,则给定批次的重试次数。请注意,只有被拒绝的数据才会重试。如果在执行重试后仍有数据被拒绝,则 Hadoop 作业将被取消(并失败)。负值表示无限重试;请谨慎设置此值,因为它可能会产生不良的副作用。
es.batch.write.retry.wait(默认值 10 秒)
由于批量拒绝而导致的批量写入重试之间的等待时间。

删除和记录错误处理程序

编辑

默认处理程序名称:log

当调用此处理程序时,它会记录一条消息,其中包含失败的 JSON 批量条目、错误消息以及任何先前的处理程序消息。记录此消息后,处理程序会发出信号,表明该错误已被确认,从而消耗/忽略该错误。

此处理程序的可用配置

es.write.rest.error.handler.log.logger.name(必需)
创建用于记录错误的记录器实例时要使用的字符串名称。如果使用此处理程序,则此设置是必需的。
es.write.rest.error.handler.log.logger.class(logger.name 的替代方案)
创建用于记录错误的记录器实例时要使用的类名称。可以使用此设置代替必需的设置 es.write.rest.error.handler.log.logger.name
es.write.rest.error.handler.log.logger.level(默认值:WARN)
记录错误消息时要使用的记录器级别。可用选项为 FATALERRORWARNINFODEBUGTRACE

失败时中止错误处理程序

编辑

默认处理程序名称:fail

当调用此处理程序时,它会重新抛出给它的错误并中止。此处理程序始终加载并自动放置在错误处理程序列表的末尾。

此处理程序没有配置。

Elasticsearch 错误处理程序

编辑

默认处理程序名称:es

当调用此处理程序时,它会将给定的错误转换为 JSON 文档并将其插入到 Elasticsearch 索引中。当此索引操作完成时,可以配置该处理程序以发出错误已处理的信号(默认),或者将错误传递给链中的下一个处理程序。ES 处理程序使用其自己的 Elastic Common Schema 映射序列化错误信息。此数据可以接受其他用户提供的元数据,这些元数据可以帮助用户搜索和报告失败(请参见下面的标签和标记设置)。

错误处理程序构建为按记录/错误执行。Elasticsearch 错误处理程序当前不批量插入错误信息。每个错误都一次插入一个,以确保处理每个记录/错误。在需要处理大量错误事件的情况下,此处理程序可能无法提供非常高的吞吐量。您应该尝试设计您的作业,以便错误处理是相对不常见的事件。

此处理程序的可用配置

es.write.rest.error.handler.es.client.nodes(默认值:“localhost”或当前配置的节点)
用于写入错误的逗号分隔的节点地址字符串。建议此群集与正在写入的群集不同(以避免写入争用)。
es.write.rest.error.handler.es.client.port(默认值:9200 或当前配置的端口)
连接到 Elasticsearch 节点时要使用的 HTTP 端口。
es.write.rest.error.handler.es.client.resource(必需)
用于写入错误信息的索引。强烈建议此索引仅用于错误信息。不支持索引模式。
es.write.rest.error.handler.es.client.inherit(默认值:true)
确定用于发送错误信息的客户端的设置是否应继承当前正在运行的作业中的相同客户端设置。默认情况下,作业配置中的所有客户端设置都由此客户端继承。
es.write.rest.error.handler.es.client.conf.<配置>
此配置前缀用于在处理程序的基础 ES 客户端中设置客户端配置值。这接受 配置 中记录的大多数设置。
es.write.rest.error.handler.es.label.<标签>(可选)
添加到此处理程序创建的每个错误事件的用户定义的标签字段。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.tags(可选)
添加到此处理程序创建的每个错误事件的逗号分隔标记字符串。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.return.default(默认值:HANDLED)
当错误成功写入 Elasticsearch 时,要返回给错误处理框架的处理程序结果。可用值为 HANDLEDPASSABORT。默认结果为 HANDLED
es.write.rest.error.handler.es.return.default.reason(可选)
如果默认返回值是 PASS,则此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。
es.write.rest.error.handler.es.return.error(默认值:ABORT)
当无法将错误写入 Elasticsearch 时,要返回给错误处理框架的处理程序结果。可用值为 HANDLEDPASSABORT。默认结果为 ABORT
es.write.rest.error.handler.es.return.error.reason(可选)
如果错误返回值是 PASS,则此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。

使用批量错误处理程序

编辑

要配置批量错误处理程序,您必须使用以下属性按顺序指定处理程序。

设置 es.write.rest.error.handlers
列出用于批量写入错误处理的错误处理程序的名称,以及它们应该被调用的顺序。每个默认处理程序都可以通过其处理程序名称来引用,因为连接器知道如何加载它们。任何由用户或第三方代码提供的处理程序都需要使用 es.write.rest.error.handler. 前缀定义其处理程序名称。

对于批量写入失败,HTTP 重试内置处理程序始终作为第一个错误处理程序放置。此外,“失败时中止”内置处理程序始终作为最后一个错误处理程序放置,以捕获任何未处理的错误。这两个错误处理程序共同构成了 elasticsearch-hadoop 的默认批量写入错误处理行为,这与之前版本的行为一致。

  1. HTTP 重试内置处理程序:重试来自 Elasticsearch 的良性批量拒绝和失败,并将任何其他错误传递到下一级处理程序
  2. 任何配置的用户处理程序都将在此处执行。
  3. 失败时中止内置处理程序:重新抛出它遇到的任何错误

可以通过使用 handlers 属性将处理程序插入到链中来修改此行为。假设我们要记录所有错误并忽略它们。

es.write.rest.error.handlers = log 

指定默认的“丢弃并记录”处理程序

使用上述配置,处理程序列表现在如下所示

  1. HTTP 重试处理程序
  2. 丢弃并记录处理程序
  3. 失败时中止处理程序

如上所述,内置的 log 错误处理程序有一个必需的设置:用于记录器名称的名称。使用的记录器将遵循您已有的任何日志配置,因此需要一个名称供记录器使用

es.write.rest.error.handlers = log 
es.write.rest.error.handler.log.logger.name = BulkErrors 

指定默认的“丢弃并记录”内置处理程序

“丢弃并记录”内置处理程序会将所有错误记录到 BulkErrors 记录器

此时,“失败时中止”内置处理程序实际上会被忽略,因为“丢弃并记录”内置处理程序始终会将错误标记为已消耗。这种做法可能很危险,因为可能会忽略潜在的重要错误。在许多情况下,用户最好编写自己的错误处理程序来处理预期的异常。

编写您自己的批量错误处理程序

编辑

假设您正在将敏感的事务数据流式传输到 Elasticsearch。在这种情况下,您的数据经过仔细的版本控制,并且您利用 Elasticsearch 的版本系统来防止使用旧数据覆盖新数据。也许您的数据以某种方式分发,允许新数据在某些旧数据之前偷偷进入 Elasticsearch。不用担心,版本系统会拒绝旧数据并保持 Elasticsearch 中数据的完整性。这里的问题是您的流式传输作业失败了,因为返回了冲突错误,并且连接器不确定您是否期望发生这种情况。

让我们为此情况编写一个错误处理程序

package org.myproject.myhandlers;

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteErrorHandler;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteFailure;
import org.elasticsearch.hadoop.rest.bulk.handler.DelayableErrorCollector;

public class IgnoreConflictsHandler extends BulkWriteErrorHandler { 

    private static final Logger LOGGER = ...; 

    @Override
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) 
    throws Exception
    {
        if (entry.getResponseCode() == 409) { 
            LOGGER.warn("Encountered conflict response. Ignoring old data.");
            return HandlerResult.HANDLED; 
        }
        return collector.pass("Not a conflict response code."); 
    }
}

我们创建一个类并扩展 BulkWriteErrorHandler 基类

使用首选的日志记录解决方案创建记录器

覆盖 onError 方法,该方法将使用错误详细信息调用

检查错误中的响应代码,查看是否为 409(冲突)

如果是冲突,则记录错误并返回 HandlerResult.HANDLED,以表示已确认错误

如果错误不是冲突,我们会将其传递给下一个错误处理程序,并说明我们无法处理的原因

在将此处理程序放置到批量写入错误处理程序列表中之前,我们必须使用 es.write.rest.error.handler.[HANDLER-NAME] 在设置中注册处理程序类的名称

设置 es.write.rest.error.handler.[HANDLER-NAME]
创建一个名为 HANDLER-NAME 的新处理程序。此属性的值必须是此处理程序要实例化的类的二进制名称。

在这种情况下,让我们为我们的忽略冲突处理程序注册一个处理程序名称

es.write.rest.error.handler.ignoreConflict = org.myproject.myhandlers.IgnoreConflictsHandler

现在我们有了处理程序的名称,我们可以在处理程序列表中使用它

es.write.rest.error.handlers = ignoreConflict
es.write.rest.error.handler.ignoreConflict = org.myproject.myhandlers.IgnoreConflictsHandler

现在,每当发生批量失败时,都会调用您的忽略冲突错误处理程序,并指示连接器可以忽略来自 Elasticsearch 的冲突响应代码。

高级概念

编辑

如果不是记录数据并删除它,而是想将其持久化到某个地方以安全保存呢?如果我们想将属性传递到我们的处理程序中以参数化其行为呢?让我们创建一个处理程序,将错误信息存储在本地文件中以供以后分析。

package org.myproject.myhandlers;

import ...

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteErrorHandler;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteFailure;
import org.elasticsearch.hadoop.rest.bulk.handler.DelayableErrorCollector;

public class OutputToFileHandler extends BulkWriteErrorHandler { 

    private OutputStream outputStream;   
    private BufferedWriter writer;

    @Override
    public void init(Properties properties) {   
        try {
            outputStream = new FileOutputStream(properties.getProperty("filename"));   
            writer = new BufferedWriter(new OutputStreamWriter(outputStream));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Could not open file", e);
        }
    }

    @Override
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector)   
    throws Exception
    {
        writer.write("Code: " + entry.getResponseCode());
        writer.newLine();
        writer.write("Error: " + entry.getException().getMessage());
        writer.newLine();
        for (String message : entry.previousHandlerMessages()) {
            writer.write("Previous Handler: " + message);           
            writer.newLine();
        }
        writer.write("Attempts: " + entry.getNumberOfAttempts());
        writer.newLine();
        writer.write("Entry: ");
        writer.newLine();
        IOUtils.copy(entry.getEntryContents(), writer);
        writer.newLine();

        return HandlerResult.HANDLED; 
    }

    @Override
    public void close() {   
        try {
            writer.close();
            outputStream.close();
        } catch (IOException e) {
            throw new RuntimeException("Closing file failed", e);
        }
    }
}

扩展 BulkWriteErrorHandler 基类

一些本地状态,用于将数据写入到文件中

我们覆盖 init 方法。此处理程序的任何属性都将在此处传入。

我们正在从属性中提取要写入的文件。我们将在下面看到如何设置此属性。

覆盖 onError 方法以定义我们的行为。

写出错误信息。这突出了 BulkWriteFailure 对象提供的所有可用数据。

返回 HANDLED 结果,以表示错误已处理。

最后,关闭所有内部分配的资源。

添加到此处理程序的是 initclose 方法。当任务开始时首次创建处理程序时调用 init 方法,当任务结束时调用 close 方法。init 方法接受一个 properties 参数,该参数包含通过使用 es.write.rest.error.handler.[HANDLER-NAME].[PROPERTY-NAME] 设置的任何处理程序特定属性。

设置 es.write.rest.error.handler.[HANDLER-NAME].[PROPERTY-NAME]
用于将属性传递到处理程序。HANDLER-NAME 是要配置的处理程序,PROPERTY-NAME 是要为处理程序设置的属性。

在我们的用例中,我们将按如下方式配置我们的文件日志记录错误处理程序

es.write.rest.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler   
es.write.rest.error.handler.writeFile.filename = /path/to/some/output/file   

我们使用名称 writeFile 注册我们的新处理程序

现在,我们为 writeFile 处理程序设置一个名为 filename 的属性。在处理程序的 init 方法中,可以通过使用 filename 作为属性键来获取该属性。

现在,将其与前面的示例(忽略冲突)结合起来

es.write.rest.error.handlers = ignoreConflict,writeFile

es.write.rest.error.handler.ignoreConflict = org.myproject.myhandlers.IgnoreConflictsHandler

es.write.rest.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler
es.write.rest.error.handler.writeFile.filename = /path/to/some/output/file

现在,您拥有一个处理程序链,默认情况下会重试批量拒绝(HTTP 重试内置处理程序),然后忽略任何冲突错误(我们自己的忽略冲突处理程序),然后通过将其写入文件来忽略任何其他错误(我们自己的输出到文件处理程序)。

序列化错误处理程序

编辑

在将数据发送到 Elasticsearch 之前,elasticsearch-hadoop 必须将每个文档序列化为 JSON 批量条目。正是在此过程中确定批量操作,提取文档元数据,并将特定于集成的数​​据结构转换为 JSON 文档。在此过程中,记录结构的不一致会导致序列化过程中抛出异常。这些错误通常会导致任务失败并停止处理。

用于 Apache Hadoop 的 Elasticsearch 提供了一个 API 来处理记录级别的序列化错误。序列化错误的处理程序如下:

  • 无法序列化的特定于集成的数据结构
  • 序列化期间遇到的异常

序列化错误处理程序尚不适用于 Hive。用于 Apache Hadoop 的 Elasticsearch 使用 Hive 的 SerDe 构造在发送到输出格式之前将数据转换为批量条目。SerDe 对象没有在对象结束其生命周期时调用的清理方法。因此,我们不支持 Hive 中的序列化错误处理程序,因为它们无法在作业执行结束时关闭。

连接器提供了一些默认错误处理程序

丢弃并记录错误处理程序

编辑

默认处理程序名称:log

调用此处理程序时,它会记录一条消息,其中包含失败的数据结构的 toString() 内容、错误消息以及任何先前的处理程序消息。记录此消息后,处理程序会发出信号,表示该错误已被确认,从而消耗/忽略该错误。

此处理程序的可用配置

es.write.data.error.handler.log.logger.name (必需)
创建用于记录错误的记录器实例时要使用的字符串名称。如果使用此处理程序,则此设置是必需的。
es.write.data.error.handler.log.logger.class (logger.name 的替代项)
创建用于记录错误的记录器实例时要使用的类名。可以使用此设置来代替必需的设置 es.write.data.error.handler.log.logger.name
es.write.data.error.handler.log.logger.level (默认值:WARN)
记录错误消息时要使用的记录器级别。可用选项为 FATALERRORWARNINFODEBUGTRACE

失败时中止错误处理程序

编辑

默认处理程序名称:fail

当调用此处理程序时,它会重新抛出给它的错误并中止。此处理程序始终加载并自动放置在错误处理程序列表的末尾。

此处理程序没有配置。

Elasticsearch 错误处理程序

编辑

默认处理程序名称:es

当调用此处理程序时,它会将给定的错误转换为 JSON 文档并将其插入到 Elasticsearch 索引中。当此索引操作完成时,可以配置该处理程序以发出错误已处理的信号(默认),或者将错误传递给链中的下一个处理程序。ES 处理程序使用其自己的 Elastic Common Schema 映射序列化错误信息。此数据可以接受其他用户提供的元数据,这些元数据可以帮助用户搜索和报告失败(请参见下面的标签和标记设置)。

错误处理程序构建为按记录/错误执行。Elasticsearch 错误处理程序当前不批量插入错误信息。每个错误都一次插入一个,以确保处理每个记录/错误。在需要处理大量错误事件的情况下,此处理程序可能无法提供非常高的吞吐量。您应该尝试设计您的作业,以便错误处理是相对不常见的事件。

此处理程序的可用配置

es.write.rest.error.handler.es.client.nodes(默认值:“localhost”或当前配置的节点)
用于写入错误的逗号分隔的节点地址字符串。建议此群集与正在写入的群集不同(以避免写入争用)。
es.write.rest.error.handler.es.client.port(默认值:9200 或当前配置的端口)
连接到 Elasticsearch 节点时要使用的 HTTP 端口。
es.write.rest.error.handler.es.client.resource(必需)
用于写入错误信息的索引。强烈建议此索引仅用于错误信息。不支持索引模式。
es.write.rest.error.handler.es.client.inherit(默认值:true)
确定用于发送错误信息的客户端的设置是否应继承当前正在运行的作业中的相同客户端设置。默认情况下,作业配置中的所有客户端设置都由此客户端继承。
es.write.rest.error.handler.es.client.conf.<配置>
此配置前缀用于在处理程序的基础 ES 客户端中设置客户端配置值。这接受 配置 中记录的大多数设置。
es.write.rest.error.handler.es.label.<标签>(可选)
添加到此处理程序创建的每个错误事件的用户定义的标签字段。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.tags(可选)
添加到此处理程序创建的每个错误事件的逗号分隔标记字符串。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.return.default(默认值:HANDLED)
当错误成功写入 Elasticsearch 时,要返回给错误处理框架的处理程序结果。可用值为 HANDLEDPASSABORT。默认结果为 HANDLED
es.write.rest.error.handler.es.return.default.reason(可选)
如果默认返回值是 PASS,则此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。
es.write.rest.error.handler.es.return.error(默认值:ABORT)
当无法将错误写入 Elasticsearch 时,要返回给错误处理框架的处理程序结果。可用值为 HANDLEDPASSABORT。默认结果为 ABORT
es.write.rest.error.handler.es.return.error.reason(可选)
如果错误返回值是 PASS,则此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。

使用序列化错误处理程序

编辑

要配置序列化错误处理程序,您必须使用以下属性按顺序指定处理程序。

设置 es.write.data.error.handlers
列出用于序列化错误处理的错误处理程序的名称,以及它们应该被调用的顺序。每个默认处理程序都可以通过其处理程序名称来引用,因为连接器知道如何加载它们。任何由用户或第三方代码提供的处理程序都需要使用 es.write.data.error.handler. 前缀定义其处理程序名称。

对于序列化失败,“失败时中止”内置处理程序始终作为最后一个错误处理程序放置,以捕获任何未处理的错误。此错误处理程序构成了 elasticsearch-hadoop 的默认序列化错误处理行为,这与之前版本的行为一致。

  1. 任何配置的用户处理程序都将在此处执行。
  2. 失败时中止内置处理程序:重新抛出它遇到的任何错误

可以通过使用 handlers 属性将处理程序插入到链中来修改此行为。假设我们要记录所有错误并忽略它们。

es.write.data.error.handlers = log 

指定默认的“丢弃并记录”处理程序

使用上述配置,处理程序列表现在如下所示

  1. 丢弃并记录处理程序
  2. 失败时中止处理程序

如上所述,内置的 log 错误处理程序有一个必需的设置:用于记录器名称的名称。使用的记录器将遵循您已有的任何日志配置,因此需要一个名称供记录器使用

es.write.data.error.handlers = log 
es.write.data.error.handler.log.logger.name = SerializationErrors 

指定默认的“丢弃并记录”内置处理程序

“丢弃并记录”内置处理程序会将所有错误记录到 SerializationErrors 记录器

此时,“失败时中止”内置处理程序实际上会被忽略,因为“丢弃并记录”内置处理程序始终会将错误标记为已消耗。这种做法可能很危险,因为可能会忽略潜在的重要错误。在许多情况下,用户最好编写自己的错误处理程序来处理预期的异常。

编写您自己的序列化处理程序

编辑

假设您正在将一些非结构化数据流式传输到 Elasticsearch。在这种情况下,您的数据未完全清理,可能包含连接器无法转换为 JSON 的字段值。您可能不希望您的流式传输作业因这些数据而失败,因为您可能会期望它包含错误。在这种情况下,您可能希望以比依赖日志记录解决方案的 toString() 方法来获取数据更全面的方式来记录数据。

让我们为此情况编写一个错误处理程序

package org.myproject.myhandlers;

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure;

public class CustomLogOnError extends SerializationErrorHandler {      

    private Log logger = ???; 

    @Override
    public HandlerResult onError(SerializationFailure entry, ErrorCollector<Object> collector) throws Exception {  
        MyRecord record = (MyRecord) entry.getRecord();                             
        logger.error("Could not serialize record. " +
                "Record data : " + record.getSpecificField() + ", " + record.getOtherField(), entry.getException()); 
        return HandlerResult.HANDLED;                                               
    }
}

我们创建一个类并扩展 SerializationErrorHandler 基类

使用首选的日志记录解决方案创建记录器

覆盖 onError 方法,该方法将使用错误详细信息调用

检索未能序列化的记录。将其强制转换为您希望从作业中获得的记录类型

记录您感兴趣的数据中的特定信息

最后,在记录错误后,返回 HandlerResult.HANDLED,以表示已确认错误

在将此处理程序放置到序列化错误处理程序列表中之前,我们必须使用 es.write.data.error.handler.[HANDLER-NAME] 在设置中注册处理程序类的名称

设置 es.write.data.error.handler.[HANDLER-NAME]
创建一个名为 HANDLER-NAME 的新处理程序。此属性的值必须是此处理程序要实例化的类的二进制名称。

在这种情况下,让我们为我们的忽略冲突处理程序注册一个处理程序名称

es.write.data.error.handler.customLog = org.myproject.myhandlers.CustomLogOnError

现在我们有了处理程序的名称,我们可以在处理程序列表中使用它

es.write.data.error.handlers = customLog
es.write.data.error.handler.customLog = org.myproject.myhandlers.CustomLogOnError

现在,每当发生序列化失败时,都会调用您的自定义日志记录错误处理程序,并指示连接器可以忽略这些失败以继续处理。

高级概念

编辑

如果我们不想仅仅记录数据然后丢弃,而是想将其持久化保存到某个地方呢?如果我们想将属性传递给处理器来参数化其行为呢?让我们创建一个处理器,将错误信息存储在本地文件中以供后续分析。

package org.myproject.myhandlers;

import ...

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure;

public class OutputToFileHandler extends SerializationErrorHandler { 

    private OutputStream outputStream;   
    private BufferedWriter writer;

    @Override
    public void init(Properties properties) {   
        try {
            outputStream = new FileOutputStream(properties.getProperty("filename"));   
            writer = new BufferedWriter(new OutputStreamWriter(outputStream));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Could not open file", e);
        }
    }

    @Override
    public HandlerResult onError(SerializationFailure entry, ErrorCollector<Object> collector)   
    throws Exception
    {
        writer.write("Record: " + entry.getRecord().toString());
        writer.newLine();
        writer.write("Error: " + entry.getException().getMessage());
        writer.newLine();
        for (String message : entry.previousHandlerMessages()) {
            writer.write("Previous Handler: " + message);           
            writer.newLine();
        }

        return HandlerResult.PASS; 
    }

    @Override
    public void close() {   
        try {
            writer.close();
            outputStream.close();
        } catch (IOException e) {
            throw new RuntimeException("Closing file failed", e);
        }
    }
}

扩展 SerializationErrorHandler 基类

一些本地状态,用于将数据写入到文件中

我们覆盖 init 方法。此处理程序的任何属性都将在此处传入。

我们正在从属性中提取要写入的文件。我们将在下面看到如何设置此属性。

覆盖 onError 方法以定义我们的行为。

写出错误信息。这突出了 SerializationFailure 对象提供的所有可用数据。

返回 PASS 结果,表示该错误应传递给链中的下一个错误处理器。

最后,关闭所有内部分配的资源。

此处理器还添加了 initclose 方法。init 方法在任务开始时首次创建处理器时调用,而 close 方法在任务结束时调用。init 方法接受一个 properties 参数,其中包含使用 es.write.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME] 设置的任何处理器特定属性。

设置 es.write.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME]
用于将属性传递到处理程序。HANDLER-NAME 是要配置的处理程序,PROPERTY-NAME 是要为处理程序设置的属性。

在我们的用例中,我们将按如下方式配置我们的文件日志记录错误处理程序

es.write.data.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler   
es.write.data.error.handler.writeFile.filename = /path/to/some/output/file 

我们使用名称 writeFile 注册我们的新处理程序

现在,我们为 writeFile 处理程序设置一个名为 filename 的属性。在处理程序的 init 方法中,可以通过使用 filename 作为属性键来获取该属性。

现在将它们组合在一起

es.write.data.error.handlers = writeFile,customLog

es.write.data.error.handler.customLog = org.myproject.myhandlers.CustomLogOnError

es.write.data.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler
es.write.data.error.handler.writeFile.filename = /path/to/some/output/file

你现在有一个处理器链,它将所有关于失败的相关数据写入一个文件(我们的 writeFile 处理器),然后使用自定义日志行记录错误并忽略该错误以继续处理(我们的 customLog 处理器)。

反序列化错误处理器

编辑

读取数据时,连接器针对配置的索引执行滚动请求并读取其内容。对于滚动搜索结果中的每个命中,连接器会尝试将其反序列化为特定于集成的记录类型。当使用 MapReduce 时,此数据类型是 MapWritable 或 Text(对于原始 JSON 数据)。对于像 Spark SQL 这样使用数据模式的集成,生成的数据类型是 Row 对象。

Elasticsearch 将文档存储在 Lucene 索引中。这些文档有时可能具有松散的定义,或者具有由于某种原因无法解析为基于模式的数据类型的结构。有时,字段的格式可能无法正确读取。

Elasticsearch for Apache Hadoop 提供了一个 API 来处理来自滚动响应的文档级反序列化错误。为滚动读取提供了错误处理器

  • 尝试过的原始 JSON 搜索结果
  • 遇到的异常

注意:反序列化错误处理器仅允许处理从滚动响应中解析文档时发生的错误。搜索结果可能被成功读取,但仍然格式错误,从而在框架的完全不同的部分中使用时引发异常。此错误处理器从滚动读取过程中处理异常的最合理位置的顶部调用,但这并不封装每个集成的所有逻辑。

连接器提供了一些默认错误处理程序

丢弃并记录错误处理器

编辑

默认处理程序名称:log

当调用此处理器时,它会记录一条消息,其中包含失败的 JSON 搜索命中、错误消息和任何先前的处理器消息。记录此消息后,处理器会发出错误已确认的信号,从而消耗/忽略该错误。

此处理程序的可用配置

es.read.data.error.handler.log.logger.name (必需)
创建用于记录错误的记录器实例时要使用的字符串名称。如果使用此处理程序,则此设置是必需的。
es.read.data.error.handler.log.logger.class (logger.name 的替代方案)
创建用于记录错误的记录器实例时要使用的类名。可以使用此设置代替必需的设置 es.read.data.error.handler.log.logger.name
es.read.data.error.handler.log.logger.level (默认值: WARN)
记录错误消息时要使用的记录器级别。可用选项为 FATALERRORWARNINFODEBUGTRACE

失败时中止错误处理器

编辑

默认处理程序名称:fail

当调用此处理程序时,它会重新抛出给它的错误并中止。此处理程序始终加载并自动放置在错误处理程序列表的末尾。

此处理程序没有配置。

Elasticsearch 错误处理器

编辑

默认处理程序名称:es

当调用此处理程序时,它会将给定的错误转换为 JSON 文档并将其插入到 Elasticsearch 索引中。当此索引操作完成时,可以配置该处理程序以发出错误已处理的信号(默认),或者将错误传递给链中的下一个处理程序。ES 处理程序使用其自己的 Elastic Common Schema 映射序列化错误信息。此数据可以接受其他用户提供的元数据,这些元数据可以帮助用户搜索和报告失败(请参见下面的标签和标记设置)。

错误处理程序构建为按记录/错误执行。Elasticsearch 错误处理程序当前不批量插入错误信息。每个错误都一次插入一个,以确保处理每个记录/错误。在需要处理大量错误事件的情况下,此处理程序可能无法提供非常高的吞吐量。您应该尝试设计您的作业,以便错误处理是相对不常见的事件。

此处理程序的可用配置

es.write.rest.error.handler.es.client.nodes(默认值:“localhost”或当前配置的节点)
用于写入错误的逗号分隔的节点地址字符串。建议此群集与正在写入的群集不同(以避免写入争用)。
es.write.rest.error.handler.es.client.port(默认值:9200 或当前配置的端口)
连接到 Elasticsearch 节点时要使用的 HTTP 端口。
es.write.rest.error.handler.es.client.resource(必需)
用于写入错误信息的索引。强烈建议此索引仅用于错误信息。不支持索引模式。
es.write.rest.error.handler.es.client.inherit(默认值:true)
确定用于发送错误信息的客户端的设置是否应继承当前正在运行的作业中的相同客户端设置。默认情况下,作业配置中的所有客户端设置都由此客户端继承。
es.write.rest.error.handler.es.client.conf.<配置>
此配置前缀用于在处理程序的基础 ES 客户端中设置客户端配置值。这接受 配置 中记录的大多数设置。
es.write.rest.error.handler.es.label.<标签>(可选)
添加到此处理程序创建的每个错误事件的用户定义的标签字段。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.tags(可选)
添加到此处理程序创建的每个错误事件的逗号分隔标记字符串。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.return.default(默认值:HANDLED)
当错误成功写入 Elasticsearch 时,要返回给错误处理框架的处理程序结果。可用值为 HANDLEDPASSABORT。默认结果为 HANDLED
es.write.rest.error.handler.es.return.default.reason(可选)
如果默认返回值是 PASS,则此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。
es.write.rest.error.handler.es.return.error(默认值:ABORT)
当无法将错误写入 Elasticsearch 时,要返回给错误处理框架的处理程序结果。可用值为 HANDLEDPASSABORT。默认结果为 ABORT
es.write.rest.error.handler.es.return.error.reason(可选)
如果错误返回值是 PASS,则此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。

使用反序列化错误处理器

编辑

要配置反序列化错误处理器,必须按以下属性指定的顺序指定处理器。

设置 es.read.data.error.handlers
列出用于反序列化错误处理的错误处理器的名称以及应调用它们的顺序。每个默认处理器都可以通过其处理器名称引用,因为连接器知道如何加载它们。从用户或第三方代码提供的任何处理器都需要使用 es.read.data.error.handler. 前缀定义其处理器名称。

对于反序列化失败,内置的“失败时中止”处理器始终作为最后一个错误处理器放置,以捕获任何未处理的错误。此错误处理器单独构成了 elasticsearch-hadoop 的默认反序列化错误处理行为,这与以前版本的行为匹配。

  1. 任何配置的用户处理程序都将在此处执行。
  2. 失败时中止内置处理程序:重新抛出它遇到的任何错误

可以通过使用 handlers 属性将处理程序插入到链中来修改此行为。假设我们要记录所有错误并忽略它们。

es.read.data.error.handlers = log 

指定默认的“丢弃并记录”处理程序

使用上述配置,处理程序列表现在如下所示

  1. 丢弃并记录处理程序
  2. 失败时中止处理程序

如上所述,内置的 log 错误处理程序有一个必需的设置:用于记录器名称的名称。使用的记录器将遵循您已有的任何日志配置,因此需要一个名称供记录器使用

es.read.data.error.handlers = log 
es.read.data.error.handler.log.logger.name = DeserializationErrors 

指定默认的“丢弃并记录”内置处理程序

内置的“丢弃并记录”处理器会将所有错误记录到 DeserializationErrors 记录器

此时,“失败时中止”内置处理程序实际上会被忽略,因为“丢弃并记录”内置处理程序始终会将错误标记为已消耗。这种做法可能很危险,因为可能会忽略潜在的重要错误。在许多情况下,用户最好编写自己的错误处理程序来处理预期的异常。

编写你自己的反序列化错误处理器

编辑

假设你正在从 Elasticsearch 读取一个大型日志数据索引。在这种情况下,你的日志数据是高度非结构化的,并且其所有内容对于你的流程并非至关重要。由于读取的数据量很大,因此你的作业需要很长时间才能完成。在这种情况下,你可能希望将无法读取的记录替换为虚拟记录以标记失败,并且不中断你的处理。应记录并丢弃有问题的数据。

让我们为此情况编写一个错误处理程序

package org.myproject.myhandlers;

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationFailure;

public class ReturnDummyHandler extends DeserializationErrorHandler { 

    private static final Logger LOGGER = ...; 
    private static final String DUMMY_RECORD = "..."; 

    @Override
    public HandlerResult onError(DeserializationFailure entry, ErrorCollector<byte[]> collector) 
    throws Exception
    {
        BufferedReader reader = new BufferedReader(new InputStreamReader(entry.getHitContents()));
        StringBuilder hitContent = new StringBuilder();
        for (String line = reader.readLine(); line != null; line = reader.readLine()) {           
            hitContent.append(line);
        }
        LOGGER.warn("Encountered malformed record during read. Replacing with dummy record. " +   
                            "Malformed Data: " + hitContent, entry.getException());
        return collector.retry(DUMMY_RECORD.getBytes());                                         
    }
}

我们创建一个类并扩展 DeserializationErrorHandler 基类

使用首选的日志记录解决方案创建记录器

我们创建一个 String 用于我们的虚拟记录,该虚拟记录应被反序列化

覆盖 onError 方法,该方法将使用错误详细信息调用

我们将失败的搜索命中的内容读取为 String

我们记录失败的文档的内容,以及详细说明失败原因的异常

最后,我们返回要反序列化的虚拟数据内容。

在我们可以将此处理器放置在反序列化错误处理器列表中之前,我们必须使用 es.read.data.error.handler.[HANDLER-NAME] 在设置中注册带有名称的处理器类

设置 es.read.data.error.handler.[HANDLER-NAME]
创建一个名为 HANDLER-NAME 的新处理程序。此属性的值必须是此处理程序要实例化的类的二进制名称。

在这种情况下,让我们为我们的虚拟记录处理器注册一个处理器名称

es.read.data.error.handler.returnDummy = org.myproject.myhandlers.ReturnDummyHandler

现在我们有了处理程序的名称,我们可以在处理程序列表中使用它

es.read.data.error.handlers = returnDummy
es.read.data.error.handler.returnDummy = org.myproject.myhandlers.ReturnDummyHandler

现在,每当发生反序列化失败时,都会调用你的虚拟数据错误处理器,并将指示连接器使用你提供的虚拟记录而不是格式错误的数据。

高级概念

编辑

如果不是记录数据并删除它,而是想将其持久化到某个地方以安全保存呢?如果我们想将属性传递到我们的处理程序中以参数化其行为呢?让我们创建一个处理程序,将错误信息存储在本地文件中以供以后分析。

package org.myproject.myhandlers;

import ...

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationFailure;

public class ReturnDummyAndLogToFileHandler extends DeserializationErrorHandler { 

    private static final String DUMMY_RECORD = "...";

    private OutputStream outputStream;   
    private BufferedWriter writer;

    @Override
    public void init(Properties properties) {   
        try {
            outputStream = new FileOutputStream(properties.getProperty("filename"));   
            writer = new BufferedWriter(new OutputStreamWriter(outputStream));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Could not open file", e);
        }
    }

    @Override
    public HandlerResult onError(DeserializationFailure entry, ErrorCollector<byte[]> collector)   
    throws Exception
    {
        BufferedReader reader = new BufferedReader(new InputStreamReader(entry.getHitContents()));
        StringBuilder hitContent = new StringBuilder();
        for (String line = reader.readLine(); line != null; line = reader.readLine()) {           
            hitContent.append(line);
        }

        writer.write("Error: " + entry.getException().getMessage());
        writer.newLine();
        for (String message : entry.previousHandlerMessages()) {
            writer.write("Previous Handler: " + message);           
            writer.newLine();
        }
        writer.write("Entry: ");
        writer.newLine();
        writer.write(hitContent.toString());
        writer.newLine();

        return collector.retry(DUMMY_RECORD.getBytes());            
    }

    @Override
    public void close() {   
        try {
            writer.close();
            outputStream.close();
        } catch (IOException e) {
            throw new RuntimeException("Closing file failed", e);
        }
    }
}

扩展 DeserializationErrorHandler 基类

一些本地状态,用于将数据写入到文件中

我们覆盖 init 方法。此处理器的任何属性都将在此处传递

我们正在从属性中提取要写入的文件。我们将在下面看到如何设置此属性

覆盖 onError 方法以定义我们的行为

读取失败的搜索命中的内容

写出错误信息。这突出了 DeserializationFailure 对象提供的所有可用数据

使用我们的虚拟记录执行重试操作

最后,关闭任何内部分配的资源

此处理器还添加了 initclose 方法。init 方法在首次创建滚动查询时在任务开始时调用,而 close 方法在任务结束时关闭滚动查询时调用。init 方法接受一个 properties 参数,其中包含使用 es.read.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME] 设置的任何处理器特定属性。

设置 es.read.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME]
用于将属性传递到处理程序。HANDLER-NAME 是要配置的处理程序,PROPERTY-NAME 是要为处理程序设置的属性。

在我们的用例中,我们将按如下方式配置我们的文件日志记录错误处理程序

es.read.data.error.handler.writeFile = org.myproject.myhandlers.ReturnDummyAndLogToFileHandler   
es.read.data.error.handler.writeFile.filename = /path/to/some/output/file   

我们使用名称 writeFile 注册我们的新处理程序

现在,我们为 writeFile 处理程序设置一个名为 filename 的属性。在处理程序的 init 方法中,可以通过使用 filename 作为属性键来获取该属性。

现在将它与前面的示例结合在一起

es.read.data.error.handlers = writeFile
es.read.data.error.handler.writeFile = org.myproject.myhandlers.ReturnDummyAndLogToFileHandler
es.read.data.error.handler.writeFile.filename = /path/to/some/output/file

你现在有一个处理器,它可以重试用虚拟记录替换格式错误的数据,然后通过将这些格式错误的记录及其错误信息写入自定义文件来记录这些记录。