错误处理程序

编辑

在 6.4 中添加。

Elasticsearch for Apache Hadoop 旨在成为一个无需过多干预的集成。大多数功能都是通过约定和配置管理的,无需大量代码即可启动并运行连接器。在遇到异常时,我们已投入大量精力来处理 Elasticsearch 中最常见和预期的错误。在遇到意外错误或指示实际问题的情况下,连接器采用“快速失败”方法。我们意识到这种方法并非对所有用户都是最佳选择,尤其是那些关注作业正常运行时间的用户。

在这些情况下,用户可以通过指定遇到错误时要采取的操作来处理意外错误。为此,我们提供了一组 API 和扩展,用户可以实现这些 API 和扩展,以根据自己的需求调整连接器在最常见位置遇到故障时的行为。

错误处理程序机制

编辑

elasticsearch-hadoop 中可以处理的每种类型的故障都有其自己的错误处理程序 API,这些 API 专为正在执行的操作量身定制。错误处理程序只是一个在发生错误时调用的函数,并告知 elasticsearch-hadoop 如何继续执行。可以为每种类型的故障指定多个错误处理程序。发生故障时,每个错误处理程序将按其在配置中指定的顺序执行。

错误处理程序会获取有关执行的操作以及遇到的任何错误详细信息的信息。然后,处理程序可以确认并处理故障,并告诉 elasticsearch-hadoop 忽略错误。或者,处理程序可以将错误标记为重新抛出,这可能会导致作业结束。错误处理程序还可以选择修改操作的参数并重试。最后,如果处理程序不知道如何处理故障,它也可以将故障“传递”给列表中的下一个处理程序。

如果提供的处理程序列表中的每个处理程序都选择“传递”,则将其标记为未处理错误,并且异常将被重新抛出,这可能会导致作业结束。连接器附带了一些默认的错误处理程序,这些处理程序可以处理大多数用例,但是,如果您发现需要更具体的错误处理策略,则可以随时编写自己的错误处理程序。

在以下部分中,我们将详细介绍不同类型的错误处理程序、其调用位置、如何配置它们以及如何在需要时编写自己的错误处理程序。

批量写入错误处理程序

编辑

在写入数据时,连接器会将文档批量到批量请求中,然后再将它们发送到 Elasticsearch。在响应中,Elasticsearch 会返回发送的每个文档的状态,其中可能包括拒绝或失败。此处遇到的一个常见错误是拒绝,这意味着文档应该写入的分片太忙而无法接受写入。此处的其他故障可能包括由于不符合当前索引映射或与文档的当前版本冲突而被拒绝的文档。

Elasticsearch for Apache Hadoop 提供了一个 API 来处理来自批量响应的文档级错误。批量写入的错误处理程序将获得

  • 尝试过的原始 JSON 批量条目
  • 错误消息
  • 文档的 HTTP 状态代码
  • 当前文档已发送到 Elasticsearch 的次数

连接器提供了一些默认的错误处理程序

HTTP 重试处理程序

编辑

始终配置为批量写入的第一个错误处理程序。

此处理程序检查常见 HTTP 代码的故障,这些代码可以重试。这些代码通常表示文档将写入的分片太忙而无法接受写入,并且已拒绝文档以减轻负载。此处理程序始终配置为首先针对批量故障运行。用户配置的所有处理程序都按顺序放置在此处理程序之后。

虽然无法修改处理程序在错误处理程序列表中的位置,但可以通过调整以下配置来修改其行为

es.batch.write.retry.policy(默认值:simple)
定义确定哪些 http 代码可以重试的策略。默认值simple允许重试 429(请求过多)和 503(不可用)。将值设置为none将不允许重试任何状态代码。
es.batch.write.retry.count(默认值:3)
如果 Elasticsearch 过载且数据被拒绝,则给定批次的重试次数。请注意,只有被拒绝的数据才会被重试。如果在执行完重试后仍有数据被拒绝,则 Hadoop 作业将被取消(并失败)。负值表示无限次重试;在设置此值时要小心,因为它可能会产生意外的副作用。
es.batch.write.retry.wait(默认值:10 秒)
批量写入重试之间等待的时间,这些重试是由批量拒绝引起的。

丢弃并记录错误处理程序

编辑

默认处理程序名称:log

调用此处理程序时,它会记录一条消息,其中包含失败的 JSON 批量条目、错误消息以及任何以前的处理程序消息。记录此消息后,处理程序会发出已确认错误的信号,从而使用/忽略它。

此处理程序的可用配置

es.write.rest.error.handler.log.logger.name(必需)
创建日志记录器实例以记录错误时要使用的字符串名称。如果使用此处理程序,则此设置是必需的。
es.write.rest.error.handler.log.logger.class(替代 logger.name)
创建日志记录器实例以记录错误时要使用的类名。此设置可以替代必需的设置es.write.rest.error.handler.log.logger.name
es.write.rest.error.handler.log.logger.level(默认值:WARN)
记录错误消息时要使用的日志记录器级别。可用选项为FATALERRORWARNINFODEBUGTRACE

故障时中止错误处理程序

编辑

默认处理程序名称:fail

调用此处理程序时,它会重新抛出给它的错误并中止。此处理程序始终加载并自动放置在错误处理程序列表的末尾。

此处理程序没有配置。

Elasticsearch 错误处理程序

编辑

默认处理程序名称:es

调用此处理程序时,它会将给定的错误转换为 JSON 文档并将其插入 Elasticsearch 索引中。完成此索引操作后,可以将处理程序配置为发出已处理错误的信号(默认值),或将错误传递给链中的下一个处理程序。ES 处理程序使用 Elastic Common Schema 的自身映射序列化错误信息。此数据可以接受用户提供的其他元数据,以帮助用户搜索和报告故障(请参阅下面的标签和标记设置)。

错误处理程序构建为在每个记录/错误的基础上执行。Elasticsearch 错误处理程序目前不会批量插入错误信息。每个错误都是逐一插入的,以确保处理每个记录/错误。如果需要处理大量错误事件,此处理程序可能无法提供非常高的吞吐量。您应该尝试设计您的作业,以便错误处理是相对不常见的情况。

此处理程序的可用配置

es.write.rest.error.handler.es.client.nodes(默认值:“localhost”或当前配置的节点)
要将错误写入的节点地址的逗号分隔字符串。建议这与正在写入的集群不同(以避免写入争用)。
es.write.rest.error.handler.es.client.port(默认值:9200 或当前配置的端口)
连接到 Elasticsearch 节点时要使用的 http 端口。
es.write.rest.error.handler.es.client.resource(必需)
要将错误信息写入的索引。强烈建议此索引仅用于错误信息。不支持索引模式。
es.write.rest.error.handler.es.client.inherit(默认值:true)
确定用于发送错误信息的客户端的设置是否应继承当前正在运行的作业的相同客户端设置。默认情况下,作业配置中的所有客户端设置都由此客户端继承。
es.write.rest.error.handler.es.client.conf.<CONFIGURATION>
此配置前缀用于在处理程序的基础 ES 客户端中设置客户端配置值。它接受配置中记录的大多数设置。
es.write.rest.error.handler.es.label.<LABEL>(可选)
此处理程序创建的每个错误事件中添加的用户定义标签字段。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.tags(可选)
要添加到此处理程序创建的每个错误事件中的逗号分隔的标记字符串。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.return.default(默认值:HANDLED)
将错误成功写入 Elasticsearch 时返回给错误处理框架的处理程序结果。可用值为HANDLEDPASSABORT。默认结果为HANDLED
es.write.rest.error.handler.es.return.default.reason(可选)
如果默认返回值为PASS,则此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。
es.write.rest.error.handler.es.return.error(默认值:ABORT)
无法将错误写入 Elasticsearch 时返回给错误处理框架的处理程序结果。可用值为HANDLEDPASSABORT。默认结果为ABORT
es.write.rest.error.handler.es.return.error.reason(可选)
如果错误返回值为PASS,则此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。

使用批量错误处理程序

编辑

要配置批量错误处理程序,必须按顺序使用以下属性指定处理程序。

设置es.write.rest.error.handlers

列出用于批量写入错误处理的错误处理程序名称,以及调用它们的顺序。每个默认处理程序都可以通过其处理程序名称来引用,因为连接器知道如何加载它们。用户或第三方代码提供的任何处理程序都需要在其处理程序名称前加上es.write.rest.error.handler.前缀。

对于批量写入失败,HTTP 重试内置处理程序始终作为第一个错误处理程序放置。此外,失败时中止内置处理程序始终作为最后一个错误处理程序放置,以捕获任何未处理的错误。这两个错误处理程序本身构成了 elasticsearch-hadoop 的默认批量写入错误处理行为,这与以前版本的行为一致。

  1. HTTP 重试内置处理程序:重试来自 Elasticsearch 的良性批量拒绝和失败,并将任何其他错误传递给下一级。
  2. 任何已配置的用户处理程序都将在此处。
  3. 失败时中止内置处理程序:重新抛出遇到的任何错误。

此行为通过使用 handlers 属性将处理程序插入链中来修改。假设我们想记录所有错误并忽略它们。

es.write.rest.error.handlers = log 

指定默认的 Drop and Log 处理程序

使用上述配置,处理程序列表现在如下所示

  1. HTTP 重试处理程序
  2. Drop and Log 处理程序
  3. 失败时中止处理程序

如上所述,内置的log错误处理程序有一个必需的设置:用于日志记录器名称的内容。使用的日志记录器将尊重您已有的任何日志记录配置,因此需要一个要使用的日志记录器名称。

es.write.rest.error.handlers = log 
es.write.rest.error.handler.log.logger.name = BulkErrors 

指定默认的 Drop and Log 内置处理程序

Drop and Log 内置处理程序将所有错误记录到BulkErrors日志记录器中。

此时,失败时中止内置处理程序实际上被忽略了,因为 Drop and Log 内置处理程序将始终将错误标记为已使用。这种做法可能很危险,因为潜在的重要错误可能会被简单地忽略。在许多情况下,用户最好编写自己的错误处理程序来处理预期的异常。

编写您自己的批量错误处理程序

编辑

假设您正在将敏感的事务数据流式传输到 Elasticsearch。在这种情况下,您的数据经过仔细版本控制,并且您利用 Elasticsearch 的版本系统来防止用旧数据覆盖较新的数据。也许您的数据以某种方式分发,允许较新的数据在某些较旧的数据片段之前潜入 Elasticsearch。不用担心,版本系统将拒绝旧数据并保留 Elasticsearch 中数据的完整性。这里的问题是您的流式处理作业失败了,因为返回了冲突错误,并且连接器不确定您是否期望这样。

让我们为这种情况编写一个错误处理程序。

package org.myproject.myhandlers;

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteErrorHandler;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteFailure;
import org.elasticsearch.hadoop.rest.bulk.handler.DelayableErrorCollector;

public class IgnoreConflictsHandler extends BulkWriteErrorHandler { 

    private static final Logger LOGGER = ...; 

    @Override
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) 
    throws Exception
    {
        if (entry.getResponseCode() == 409) { 
            LOGGER.warn("Encountered conflict response. Ignoring old data.");
            return HandlerResult.HANDLED; 
        }
        return collector.pass("Not a conflict response code."); 
    }
}

我们创建一个类并扩展 BulkWriteErrorHandler 基类。

使用首选的日志记录解决方案创建日志记录器。

覆盖onError方法,该方法将使用错误详细信息调用。

检查错误的响应代码,查看它是否为 409(冲突)。

如果它是冲突,则记录错误并返回HandlerResult.HANDLED以表示已确认错误。

如果错误不是冲突,我们将其与我们无法处理的原因一起传递给下一个错误处理程序。

在我们能够将此处理程序放入批量写入错误处理程序列表之前,我们必须使用es.write.rest.error.handler.[HANDLER-NAME]在设置中为处理程序类注册一个名称。

设置es.write.rest.error.handler.[HANDLER-NAME]
创建一个名为 HANDLER-NAME 的新处理程序。此属性的值必须是为此处理程序实例化的类的二进制名称。

在这种情况下,让我们为我们的忽略冲突处理程序注册一个处理程序名称。

es.write.rest.error.handler.ignoreConflict = org.myproject.myhandlers.IgnoreConflictsHandler

现在我们有了处理程序的名称,我们可以在处理程序列表中使用它。

es.write.rest.error.handlers = ignoreConflict
es.write.rest.error.handler.ignoreConflict = org.myproject.myhandlers.IgnoreConflictsHandler

现在,每当发生批量失败时,您的忽略冲突错误处理程序都会被调用,并将指示连接器可以忽略来自 Elasticsearch 的冲突响应代码。

高级概念

编辑

如果我们不希望记录数据并丢弃它,而是希望将其持久化到某个地方以确保安全呢?如果我们想将属性传递给我们的处理程序以参数化其行为呢?让我们创建一个将错误信息存储在本地文件中以供以后分析的处理程序。

package org.myproject.myhandlers;

import ...

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteErrorHandler;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteFailure;
import org.elasticsearch.hadoop.rest.bulk.handler.DelayableErrorCollector;

public class OutputToFileHandler extends BulkWriteErrorHandler { 

    private OutputStream outputStream;   
    private BufferedWriter writer;

    @Override
    public void init(Properties properties) {   
        try {
            outputStream = new FileOutputStream(properties.getProperty("filename"));   
            writer = new BufferedWriter(new OutputStreamWriter(outputStream));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Could not open file", e);
        }
    }

    @Override
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector)   
    throws Exception
    {
        writer.write("Code: " + entry.getResponseCode());
        writer.newLine();
        writer.write("Error: " + entry.getException().getMessage());
        writer.newLine();
        for (String message : entry.previousHandlerMessages()) {
            writer.write("Previous Handler: " + message);           
            writer.newLine();
        }
        writer.write("Attempts: " + entry.getNumberOfAttempts());
        writer.newLine();
        writer.write("Entry: ");
        writer.newLine();
        IOUtils.copy(entry.getEntryContents(), writer);
        writer.newLine();

        return HandlerResult.HANDLED; 
    }

    @Override
    public void close() {   
        try {
            writer.close();
            outputStream.close();
        } catch (IOException e) {
            throw new RuntimeException("Closing file failed", e);
        }
    }
}

扩展 BulkWriteErrorHandler 基类。

用于将数据写入文件的某些本地状态。

我们覆盖init方法。此处理程序的任何属性都将在此处传递。

我们正在从属性中提取要写入的文件。我们将在下面看到如何设置此属性。

覆盖onError方法以定义我们的行为。

写出错误信息。这突出了BulkWriteFailure对象提供的所有可用数据。

返回HANDLED结果以表示已处理错误。

最后,关闭任何内部分配的资源。

此处理程序中添加了initclose方法。init方法在任务开始时首次创建处理程序时调用,close方法在任务结束时调用。init方法接受一个 properties 参数,该参数包含通过使用es.write.rest.error.handler.[HANDLER-NAME].[PROPERTY-NAME]设置的任何特定于处理程序的属性。

设置es.write.rest.error.handler.[HANDLER-NAME].[PROPERTY-NAME]
用于将属性传递给处理程序。HANDLER-NAME 是要配置的处理程序,PROPERTY-NAME 是要为处理程序设置的属性。

在我们的用例中,我们将如下配置我们的文件日志记录错误处理程序。

es.write.rest.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler   
es.write.rest.error.handler.writeFile.filename = /path/to/some/output/file   

我们使用名称writeFile注册我们的新处理程序。

现在,我们为writeFile处理程序设置了一个名为filename的属性。在处理程序的init方法中,可以通过使用filename作为属性键来获取它。

现在将它与前面的示例(忽略冲突)结合起来。

es.write.rest.error.handlers = ignoreConflict,writeFile

es.write.rest.error.handler.ignoreConflict = org.myproject.myhandlers.IgnoreConflictsHandler

es.write.rest.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler
es.write.rest.error.handler.writeFile.filename = /path/to/some/output/file

您现在拥有一个处理程序链,该链默认情况下重试批量拒绝(HTTP 重试内置处理程序),然后忽略任何冲突错误(我们自己的忽略冲突处理程序),然后通过将其他错误写入文件(我们自己的输出到文件处理程序)来忽略任何其他错误。

序列化错误处理程序

编辑

在将数据发送到 Elasticsearch 之前,elasticsearch-hadoop 必须将每个文档序列化为 JSON 批量条目。在此过程中确定批量操作、提取文档元数据以及将特定于集成的数据结构转换为 JSON 文档。在此过程中,记录结构的不一致会导致在序列化过程中抛出异常。这些错误通常会导致任务失败和处理停止。

Elasticsearch for Apache Hadoop 提供了一个 API 来在记录级别处理序列化错误。给定序列化错误处理程序

  • 无法序列化的特定于集成的数据结构
  • 序列化过程中遇到的异常

Hive 中尚不可用序列化错误处理程序。Elasticsearch for Apache Hadoop 使用 Hive 的 SerDe 构造在发送到输出格式之前将数据转换为批量条目。SerDe 对象没有在对象结束其生命周期时调用的清理方法。因此,我们不支持 Hive 中的序列化错误处理程序,因为它们无法在作业执行结束时关闭。

连接器提供了一些默认的错误处理程序

Drop and Log 错误处理程序

编辑

默认处理程序名称:log

当调用此处理程序时,它会记录一条消息,其中包含失败的数据结构的 toString() 内容、错误消息和任何先前的处理程序消息。记录此消息后,处理程序会发出已确认错误的信号,从而使用/忽略它。

此处理程序的可用配置

es.write.data.error.handler.log.logger.name(必需)
创建日志记录器实例以记录错误时要使用的字符串名称。如果使用此处理程序,则此设置是必需的。
es.write.data.error.handler.log.logger.class(作为 logger.name 的替代方案)
创建日志记录器实例以记录错误时要使用的类名。此设置可以替代必需的设置es.write.data.error.handler.log.logger.name
es.write.data.error.handler.log.logger.level(默认:WARN)
记录错误消息时要使用的日志记录器级别。可用选项为FATALERRORWARNINFODEBUGTRACE

失败时中止错误处理程序

编辑

默认处理程序名称:fail

调用此处理程序时,它会重新抛出给它的错误并中止。此处理程序始终加载并自动放置在错误处理程序列表的末尾。

此处理程序没有配置。

Elasticsearch 错误处理程序

编辑

默认处理程序名称:es

调用此处理程序时,它会将给定的错误转换为 JSON 文档并将其插入 Elasticsearch 索引中。完成此索引操作后,可以将处理程序配置为发出已处理错误的信号(默认值),或将错误传递给链中的下一个处理程序。ES 处理程序使用 Elastic Common Schema 的自身映射序列化错误信息。此数据可以接受用户提供的其他元数据,以帮助用户搜索和报告故障(请参阅下面的标签和标记设置)。

错误处理程序构建为在每个记录/错误的基础上执行。Elasticsearch 错误处理程序目前不会批量插入错误信息。每个错误都是逐一插入的,以确保处理每个记录/错误。如果需要处理大量错误事件,此处理程序可能无法提供非常高的吞吐量。您应该尝试设计您的作业,以便错误处理是相对不常见的情况。

此处理程序的可用配置

es.write.rest.error.handler.es.client.nodes(默认值:“localhost”或当前配置的节点)
要将错误写入的节点地址的逗号分隔字符串。建议这与正在写入的集群不同(以避免写入争用)。
es.write.rest.error.handler.es.client.port(默认值:9200 或当前配置的端口)
连接到 Elasticsearch 节点时要使用的 http 端口。
es.write.rest.error.handler.es.client.resource(必需)
要将错误信息写入的索引。强烈建议此索引仅用于错误信息。不支持索引模式。
es.write.rest.error.handler.es.client.inherit(默认值:true)
确定用于发送错误信息的客户端的设置是否应继承当前正在运行的作业的相同客户端设置。默认情况下,作业配置中的所有客户端设置都由此客户端继承。
es.write.rest.error.handler.es.client.conf.<CONFIGURATION>
此配置前缀用于在处理程序的基础 ES 客户端中设置客户端配置值。它接受配置中记录的大多数设置。
es.write.rest.error.handler.es.label.<LABEL>(可选)
此处理程序创建的每个错误事件中添加的用户定义标签字段。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.tags(可选)
要添加到此处理程序创建的每个错误事件中的逗号分隔的标记字符串。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.return.default(默认值:HANDLED)
将错误成功写入 Elasticsearch 时返回给错误处理框架的处理程序结果。可用值为HANDLEDPASSABORT。默认结果为HANDLED
es.write.rest.error.handler.es.return.default.reason(可选)
如果默认返回值为PASS,则此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。
es.write.rest.error.handler.es.return.error(默认值:ABORT)
无法将错误写入 Elasticsearch 时返回给错误处理框架的处理程序结果。可用值为HANDLEDPASSABORT。默认结果为ABORT
es.write.rest.error.handler.es.return.error.reason(可选)
如果错误返回值为PASS,则此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。

使用序列化错误处理程序

编辑

要配置序列化错误处理程序,必须使用以下属性按顺序指定处理程序。

设置es.write.data.error.handlers
列出用于序列化错误处理的错误处理程序名称,以及调用它们的顺序。每个默认处理程序都可以通过其处理程序名称来引用,因为连接器知道如何加载它们。用户或第三方代码提供的任何处理程序都需要在其处理程序名称前加上es.write.data.error.handler.前缀。

对于序列化失败,失败时中止内置处理程序始终作为最后一个错误处理程序放置,以捕获任何未处理的错误。此错误处理程序构成了 elasticsearch-hadoop 的默认序列化错误处理行为,这与以前版本的行为一致。

  1. 任何已配置的用户处理程序都将在此处。
  2. 失败时中止内置处理程序:重新抛出遇到的任何错误。

此行为通过使用 handlers 属性将处理程序插入链中来修改。假设我们想记录所有错误并忽略它们。

es.write.data.error.handlers = log 

指定默认的 Drop and Log 处理程序

使用上述配置,处理程序列表现在如下所示

  1. Drop and Log 处理程序
  2. 失败时中止处理程序

如上所述,内置的log错误处理程序有一个必需的设置:用于日志记录器名称的内容。使用的日志记录器将尊重您已有的任何日志记录配置,因此需要一个要使用的日志记录器名称。

es.write.data.error.handlers = log 
es.write.data.error.handler.log.logger.name = SerializationErrors 

指定默认的 Drop and Log 内置处理程序

Drop and Log 内置处理程序将所有错误记录到SerializationErrors日志记录器中。

此时,失败时中止内置处理程序实际上被忽略了,因为 Drop and Log 内置处理程序将始终将错误标记为已使用。这种做法可能很危险,因为潜在的重要错误可能会被简单地忽略。在许多情况下,用户最好编写自己的错误处理程序来处理预期的异常。

编写您自己的序列化处理程序

编辑

假设您正在将一些非结构化数据流式传输到 Elasticsearch。在这种情况下,您的数据未完全清理,可能包含连接器无法转换为 JSON 的字段值。您可能不希望您的流式处理作业在此数据上失败,因为您可能期望它包含错误。在这种情况下,您可能希望以比依赖日志记录解决方案的 toString() 方法更全面的方式记录数据。

让我们为这种情况编写一个错误处理程序。

package org.myproject.myhandlers;

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure;

public class CustomLogOnError extends SerializationErrorHandler {      

    private Log logger = ???; 

    @Override
    public HandlerResult onError(SerializationFailure entry, ErrorCollector<Object> collector) throws Exception {  
        MyRecord record = (MyRecord) entry.getRecord();                             
        logger.error("Could not serialize record. " +
                "Record data : " + record.getSpecificField() + ", " + record.getOtherField(), entry.getException()); 
        return HandlerResult.HANDLED;                                               
    }
}

我们创建一个类并扩展 SerializationErrorHandler 基类。

使用首选的日志记录解决方案创建日志记录器。

覆盖onError方法,该方法将使用错误详细信息调用。

检索无法序列化的记录。将其转换为您从作业中预期的记录类型。

记录您感兴趣的数据的特定信息。

最后,在记录错误后,返回HandlerResult.HANDLED以表示已确认错误。

在我们能够将此处理程序放入序列化错误处理程序列表之前,我们必须使用es.write.data.error.handler.[HANDLER-NAME]在设置中为处理程序类注册一个名称。

设置es.write.data.error.handler.[HANDLER-NAME]
创建一个名为 HANDLER-NAME 的新处理程序。此属性的值必须是为此处理程序实例化的类的二进制名称。

在这种情况下,让我们为我们的忽略冲突处理程序注册一个处理程序名称。

es.write.data.error.handler.customLog = org.myproject.myhandlers.CustomLogOnError

现在我们有了处理程序的名称,我们可以在处理程序列表中使用它。

es.write.data.error.handlers = customLog
es.write.data.error.handler.customLog = org.myproject.myhandlers.CustomLogOnError

现在,每当发生序列化失败时,您的自定义日志记录错误处理程序都会被调用,并将指示连接器可以忽略这些失败以继续处理。

高级概念

编辑

如果不想记录数据并丢弃,而是希望将其持久化到某个地方以确保安全呢?如果我们想将属性传递给我们的处理程序以参数化其行为呢?让我们创建一个处理程序,将错误信息存储在本地文件中以供以后分析。

package org.myproject.myhandlers;

import ...

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure;

public class OutputToFileHandler extends SerializationErrorHandler { 

    private OutputStream outputStream;   
    private BufferedWriter writer;

    @Override
    public void init(Properties properties) {   
        try {
            outputStream = new FileOutputStream(properties.getProperty("filename"));   
            writer = new BufferedWriter(new OutputStreamWriter(outputStream));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Could not open file", e);
        }
    }

    @Override
    public HandlerResult onError(SerializationFailure entry, ErrorCollector<Object> collector)   
    throws Exception
    {
        writer.write("Record: " + entry.getRecord().toString());
        writer.newLine();
        writer.write("Error: " + entry.getException().getMessage());
        writer.newLine();
        for (String message : entry.previousHandlerMessages()) {
            writer.write("Previous Handler: " + message);           
            writer.newLine();
        }

        return HandlerResult.PASS; 
    }

    @Override
    public void close() {   
        try {
            writer.close();
            outputStream.close();
        } catch (IOException e) {
            throw new RuntimeException("Closing file failed", e);
        }
    }
}

扩展 SerializationErrorHandler 基类

用于将数据写入文件的某些本地状态。

我们覆盖init方法。此处理程序的任何属性都将在此处传递。

我们正在从属性中提取要写入的文件。我们将在下面看到如何设置此属性。

覆盖onError方法以定义我们的行为。

写出错误信息。这突出了 SerializationFailure 对象提供的所有可用数据。

返回 PASS 结果以指示错误应传递给链中的下一个错误处理程序。

最后,关闭任何内部分配的资源。

此处理程序中添加了 initclose 方法。 init 方法在任务开始时首次创建处理程序时调用, close 方法在任务结束时调用。 init 方法接受一个 properties 参数,其中包含使用 es.write.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME] 设置的任何特定于处理程序的属性。

设置 es.write.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME]
用于将属性传递给处理程序。HANDLER-NAME 是要配置的处理程序,PROPERTY-NAME 是要为处理程序设置的属性。

在我们的用例中,我们将如下配置我们的文件日志记录错误处理程序。

es.write.data.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler   
es.write.data.error.handler.writeFile.filename = /path/to/some/output/file 

我们使用名称writeFile注册我们的新处理程序。

现在,我们为writeFile处理程序设置了一个名为filename的属性。在处理程序的init方法中,可以通过使用filename作为属性键来获取它。

现在将所有内容整合在一起

es.write.data.error.handlers = writeFile,customLog

es.write.data.error.handler.customLog = org.myproject.myhandlers.CustomLogOnError

es.write.data.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler
es.write.data.error.handler.writeFile.filename = /path/to/some/output/file

现在,您有一系列处理程序,这些处理程序将有关故障的所有相关数据写入文件(我们的 writeFile 处理程序),然后使用自定义日志行记录错误并忽略错误以继续处理(我们的 customLog 处理程序)。

反序列化错误处理程序

编辑

读取数据时,连接器会针对已配置的索引执行滚动请求并读取其内容。对于滚动搜索结果中的每次命中,连接器都会尝试将其反序列化为特定于集成的记录类型。在使用 MapReduce 时,此数据类型是 MapWritable 或 Text(对于原始 JSON 数据)。对于像 Spark SQL 这样使用数据架构的集成,结果数据类型是 Row 对象。

Elasticsearch 将文档存储在 lucene 索引中。这些文档有时可能具有松散的定义,或者由于某种原因而具有无法解析为基于架构的数据类型的结构。有时字段可能采用无法正确读取的格式。

Elasticsearch for Apache Hadoop 提供了一个 API 来处理来自滚动响应的文档级反序列化错误。提供了滚动读取的错误处理程序

  • 尝试读取的原始 JSON 搜索结果
  • 遇到的异常

注意:反序列化错误处理程序仅允许处理解析滚动响应中的文档时发生的错误。搜索结果可能已成功读取,但仍格式错误,因此在框架的完全不同的部分使用时会导致异常。此错误处理程序从处理滚动读取过程中异常的最合理位置的顶部调用,但这并没有封装每个集成的所有逻辑。

连接器提供了一些默认的错误处理程序

丢弃并记录错误处理程序

编辑

默认处理程序名称:log

当此处理程序被调用时,它会记录一条消息,其中包含失败的 JSON 搜索命中、错误消息和任何先前的处理程序消息。记录此消息后,处理程序会发出已确认错误的信号,从而使用/忽略它。

此处理程序的可用配置

es.read.data.error.handler.log.logger.name(必需)
创建日志记录器实例以记录错误时要使用的字符串名称。如果使用此处理程序,则此设置是必需的。
es.read.data.error.handler.log.logger.class(替代 logger.name)
创建日志记录器实例以记录错误时要使用的类名。此设置可用于代替必需的设置 es.read.data.error.handler.log.logger.name
es.read.data.error.handler.log.logger.level(默认值:WARN)
记录错误消息时要使用的日志记录器级别。可用选项为FATALERRORWARNINFODEBUGTRACE

失败时中止错误处理程序

编辑

默认处理程序名称:fail

调用此处理程序时,它会重新抛出给它的错误并中止。此处理程序始终加载并自动放置在错误处理程序列表的末尾。

此处理程序没有配置。

Elasticsearch 错误处理程序

编辑

默认处理程序名称:es

调用此处理程序时,它会将给定的错误转换为 JSON 文档并将其插入 Elasticsearch 索引中。完成此索引操作后,可以将处理程序配置为发出已处理错误的信号(默认值),或将错误传递给链中的下一个处理程序。ES 处理程序使用 Elastic Common Schema 的自身映射序列化错误信息。此数据可以接受用户提供的其他元数据,以帮助用户搜索和报告故障(请参阅下面的标签和标记设置)。

错误处理程序构建为在每个记录/错误的基础上执行。Elasticsearch 错误处理程序目前不会批量插入错误信息。每个错误都是逐一插入的,以确保处理每个记录/错误。如果需要处理大量错误事件,此处理程序可能无法提供非常高的吞吐量。您应该尝试设计您的作业,以便错误处理是相对不常见的情况。

此处理程序的可用配置

es.write.rest.error.handler.es.client.nodes(默认值:“localhost”或当前配置的节点)
要将错误写入的节点地址的逗号分隔字符串。建议这与正在写入的集群不同(以避免写入争用)。
es.write.rest.error.handler.es.client.port(默认值:9200 或当前配置的端口)
连接到 Elasticsearch 节点时要使用的 http 端口。
es.write.rest.error.handler.es.client.resource(必需)
要将错误信息写入的索引。强烈建议此索引仅用于错误信息。不支持索引模式。
es.write.rest.error.handler.es.client.inherit(默认值:true)
确定用于发送错误信息的客户端的设置是否应继承当前正在运行的作业的相同客户端设置。默认情况下,作业配置中的所有客户端设置都由此客户端继承。
es.write.rest.error.handler.es.client.conf.<CONFIGURATION>
此配置前缀用于在处理程序的基础 ES 客户端中设置客户端配置值。它接受配置中记录的大多数设置。
es.write.rest.error.handler.es.label.<LABEL>(可选)
此处理程序创建的每个错误事件中添加的用户定义标签字段。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.tags(可选)
要添加到此处理程序创建的每个错误事件中的逗号分隔的标记字符串。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.return.default(默认值:HANDLED)
将错误成功写入 Elasticsearch 时返回给错误处理框架的处理程序结果。可用值为HANDLEDPASSABORT。默认结果为HANDLED
es.write.rest.error.handler.es.return.default.reason(可选)
如果默认返回值为PASS,则此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。
es.write.rest.error.handler.es.return.error(默认值:ABORT)
无法将错误写入 Elasticsearch 时返回给错误处理框架的处理程序结果。可用值为HANDLEDPASSABORT。默认结果为ABORT
es.write.rest.error.handler.es.return.error.reason(可选)
如果错误返回值为PASS,则此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。

使用反序列化错误处理程序

编辑

要配置反序列化错误处理程序,必须按顺序使用以下属性指定处理程序。

设置 es.read.data.error.handlers
列出用于反序列化错误处理的错误处理程序的名称,以及它们应该调用的顺序。每个默认处理程序都可以通过其处理程序名称来引用,因为连接器知道如何加载它们。任何来自用户或第三方代码的处理程序都需要在其处理程序名称前加上 es.read.data.error.handler. 前缀。

对于反序列化失败,始终将“失败时中止”内置处理程序作为最后一个错误处理程序放置,以捕获任何未处理的错误。仅此错误处理程序构成了 elasticsearch-hadoop 的默认反序列化错误处理行为,这与先前版本的行为一致。

  1. 任何已配置的用户处理程序都将在此处。
  2. 失败时中止内置处理程序:重新抛出遇到的任何错误。

此行为通过使用 handlers 属性将处理程序插入链中来修改。假设我们想记录所有错误并忽略它们。

es.read.data.error.handlers = log 

指定默认的 Drop and Log 处理程序

使用上述配置,处理程序列表现在如下所示

  1. Drop and Log 处理程序
  2. 失败时中止处理程序

如上所述,内置的log错误处理程序有一个必需的设置:用于日志记录器名称的内容。使用的日志记录器将尊重您已有的任何日志记录配置,因此需要一个要使用的日志记录器名称。

es.read.data.error.handlers = log 
es.read.data.error.handler.log.logger.name = DeserializationErrors 

指定默认的 Drop and Log 内置处理程序

“丢弃并记录”内置处理程序会将所有错误记录到 DeserializationErrors 日志记录器

此时,失败时中止内置处理程序实际上被忽略了,因为 Drop and Log 内置处理程序将始终将错误标记为已使用。这种做法可能很危险,因为潜在的重要错误可能会被简单地忽略。在许多情况下,用户最好编写自己的错误处理程序来处理预期的异常。

编写您自己的反序列化错误处理程序

编辑

假设您正在从 Elasticsearch 读取大型日志数据索引。在这种情况下,您的日志数据高度非结构化,并且并非所有内容都对您的流程至关重要。由于读取的数据量很大,因此您的作业需要很长时间才能完成。在这种情况下,您可能希望用虚拟记录替换无法读取的记录以标记失败,并且不中断您的处理。应记录并丢弃有问题的 数据。

让我们为这种情况编写一个错误处理程序。

package org.myproject.myhandlers;

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationFailure;

public class ReturnDummyHandler extends DeserializationErrorHandler { 

    private static final Logger LOGGER = ...; 
    private static final String DUMMY_RECORD = "..."; 

    @Override
    public HandlerResult onError(DeserializationFailure entry, ErrorCollector<byte[]> collector) 
    throws Exception
    {
        BufferedReader reader = new BufferedReader(new InputStreamReader(entry.getHitContents()));
        StringBuilder hitContent = new StringBuilder();
        for (String line = reader.readLine(); line != null; line = reader.readLine()) {           
            hitContent.append(line);
        }
        LOGGER.warn("Encountered malformed record during read. Replacing with dummy record. " +   
                            "Malformed Data: " + hitContent, entry.getException());
        return collector.retry(DUMMY_RECORD.getBytes());                                         
    }
}

我们创建一个类并扩展 DeserializationErrorHandler 基类

使用首选的日志记录解决方案创建日志记录器。

我们创建一个字符串,用于我们应该反序列化而不是失败的虚拟记录

覆盖onError方法,该方法将使用错误详细信息调用。

我们读取失败搜索命中的内容作为字符串

我们记录失败文档的内容,以及详细说明失败原因的异常

最后,我们返回要反序列化的虚拟数据内容。

在我们能够将此处理程序放入反序列化错误处理程序列表中之前,我们必须使用 es.read.data.error.handler.[HANDLER-NAME] 在设置中注册处理程序类及其名称。

设置 es.read.data.error.handler.[HANDLER-NAME]
创建一个名为 HANDLER-NAME 的新处理程序。此属性的值必须是为此处理程序实例化的类的二进制名称。

在这种情况下,让我们为我们的虚拟记录处理程序注册一个处理程序名称

es.read.data.error.handler.returnDummy = org.myproject.myhandlers.ReturnDummyHandler

现在我们有了处理程序的名称,我们可以在处理程序列表中使用它。

es.read.data.error.handlers = returnDummy
es.read.data.error.handler.returnDummy = org.myproject.myhandlers.ReturnDummyHandler

现在,每当发生反序列化失败时,您的虚拟数据错误处理程序将被调用,并指示连接器使用您提供的虚拟记录而不是格式错误的数据。

高级概念

编辑

如果我们不希望记录数据并丢弃它,而是希望将其持久化到某个地方以确保安全呢?如果我们想将属性传递给我们的处理程序以参数化其行为呢?让我们创建一个将错误信息存储在本地文件中以供以后分析的处理程序。

package org.myproject.myhandlers;

import ...

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationFailure;

public class ReturnDummyAndLogToFileHandler extends DeserializationErrorHandler { 

    private static final String DUMMY_RECORD = "...";

    private OutputStream outputStream;   
    private BufferedWriter writer;

    @Override
    public void init(Properties properties) {   
        try {
            outputStream = new FileOutputStream(properties.getProperty("filename"));   
            writer = new BufferedWriter(new OutputStreamWriter(outputStream));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Could not open file", e);
        }
    }

    @Override
    public HandlerResult onError(DeserializationFailure entry, ErrorCollector<byte[]> collector)   
    throws Exception
    {
        BufferedReader reader = new BufferedReader(new InputStreamReader(entry.getHitContents()));
        StringBuilder hitContent = new StringBuilder();
        for (String line = reader.readLine(); line != null; line = reader.readLine()) {           
            hitContent.append(line);
        }

        writer.write("Error: " + entry.getException().getMessage());
        writer.newLine();
        for (String message : entry.previousHandlerMessages()) {
            writer.write("Previous Handler: " + message);           
            writer.newLine();
        }
        writer.write("Entry: ");
        writer.newLine();
        writer.write(hitContent.toString());
        writer.newLine();

        return collector.retry(DUMMY_RECORD.getBytes());            
    }

    @Override
    public void close() {   
        try {
            writer.close();
            outputStream.close();
        } catch (IOException e) {
            throw new RuntimeException("Closing file failed", e);
        }
    }
}

扩展 DeserializationErrorHandler 基类

用于将数据写入文件的某些本地状态。

我们覆盖了 init 方法。此处理程序的任何属性都将在此处传递。

我们正在从属性中提取要写入的文件。我们将在下面看到如何设置此属性

覆盖 onError 方法以定义我们的行为

读取失败搜索命中的内容

写出错误信息。这突出了 DeserializationFailure 对象提供的所有可用数据

使用我们的虚拟记录执行重试操作

最后,关闭任何内部分配的资源

此处理程序中添加了 initclose 方法。 init 方法在任务开始时首次创建滚动查询时调用, close 方法在任务结束时关闭滚动查询时调用。 init 方法接受一个 properties 参数,其中包含使用 es.read.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME] 设置的任何特定于处理程序的属性。

设置 es.read.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME]
用于将属性传递给处理程序。HANDLER-NAME 是要配置的处理程序,PROPERTY-NAME 是要为处理程序设置的属性。

在我们的用例中,我们将如下配置我们的文件日志记录错误处理程序。

es.read.data.error.handler.writeFile = org.myproject.myhandlers.ReturnDummyAndLogToFileHandler   
es.read.data.error.handler.writeFile.filename = /path/to/some/output/file   

我们使用名称writeFile注册我们的新处理程序。

现在,我们为writeFile处理程序设置了一个名为filename的属性。在处理程序的init方法中,可以通过使用filename作为属性键来获取它。

现在将所有内容与前面的示例整合在一起

es.read.data.error.handlers = writeFile
es.read.data.error.handler.writeFile = org.myproject.myhandlers.ReturnDummyAndLogToFileHandler
es.read.data.error.handler.writeFile.filename = /path/to/some/output/file

您现在有一个处理程序,它会重试将格式错误的数据替换为虚拟记录,然后通过将这些格式错误的记录及其错误信息写入自定义文件来记录这些记录。