错误处理程序编辑

在 6.4 中添加。

Elasticsearch for Apache Hadoop 被设计为一个基本上无需人工干预的集成。大多数功能通过约定和配置进行管理,不需要大量的代码即可启动并运行连接器。在异常方面,我们已经投入了大量精力来处理来自 Elasticsearch 的最常见和预期的错误。如果遇到意外错误或表明存在实际问题,连接器将采用“快速失败”方法。我们意识到这种方法并不适合所有用户,尤其是那些关心作业正常运行时间的用户。

在这些情况下,用户可以通过指定在遇到意外错误时要采取的操作来处理意外错误。为此,我们提供了一组 API 和扩展,用户可以实现这些 API 和扩展,以便根据自己的需求调整连接器在最常见位置遇到故障时的行为。

错误处理程序机制编辑

elasticsearch-hadoop 中可以处理的每种类型的故障都有自己的错误处理程序 API,该 API 专为正在执行的操作量身定制。错误处理程序只是一个在发生错误时调用的函数,它会通知 elasticsearch-hadoop 如何继续执行。可以为每种类型的故障指定多个错误处理程序。发生故障时,每个错误处理程序将按它们在配置中指定的顺序执行。

错误处理程序会获得有关执行的操作以及遇到的错误的任何详细信息。然后,处理程序可以确认并消耗故障,告诉 elasticsearch-hadoop 忽略错误。或者,处理程序可以标记错误以重新抛出,这可能会导致作业结束。错误处理程序还可以选择修改操作的参数并重试。最后,如果处理程序不知道如何处理故障,它也可以将故障“传递”给列表中的下一个处理程序。

如果提供的处理程序列表中的每个处理程序都选择“传递”,则将其标记为未处理的错误,并且异常将被重新抛出,这可能会导致作业结束。连接器附带了一些默认的错误处理程序,它们可以处理大多数用例,但是如果您发现需要更具体的错误处理策略,您始终可以编写自己的错误处理程序。

在以下部分,我们将详细介绍不同类型的错误处理程序、它们在何处调用、如何配置它们以及如何在需要时编写自己的错误处理程序。

批量写入错误处理程序编辑

写入数据时,连接器会将文档批量放入批量请求中,然后再将它们发送到 Elasticsearch。在响应中,Elasticsearch 为发送的每个文档返回一个状态,其中可能包括拒绝或失败。这里遇到的一个常见错误是 *拒绝*,这意味着要写入文档的分片太忙而无法接受写入。这里其他故障可能包括由于不符合当前索引映射或与文档的当前版本冲突而被拒绝的文档。

Elasticsearch for Apache Hadoop 提供了一个 API 来处理来自批量响应的文档级错误。批量写入的错误处理程序会获得

  • 尝试过的原始 JSON 批量条目
  • 错误消息
  • 文档的 HTTP 状态代码
  • 当前文档发送到 Elasticsearch 的次数

连接器提供了一些默认的错误处理程序

HTTP 重试处理程序编辑

始终配置为批量写入的第一个错误处理程序。

此处理程序检查故障以查找可以重试的常见 HTTP 代码。这些代码通常表示要写入文档的分片太忙而无法接受写入,并且文档被拒绝以减轻负载。此处理程序始终配置为首先运行以处理批量故障。用户配置的所有处理程序都按顺序放置在此处理程序之后。

虽然处理程序在处理程序列表中的位置无法修改,但可以通过调整以下配置来修改其行为

es.batch.write.retry.policy(默认值:simple)
定义用于确定哪些 http 代码可以重试的策略。默认值 simple 允许重试 429(请求过多)和 503(不可用)。将值设置为 none 将不允许重试任何状态代码。
es.batch.write.retry.count(默认值:3)
如果 Elasticsearch 过载并且数据被拒绝,则对给定批次进行重试的次数。请注意,只有被拒绝的数据才会被重试。如果在执行重试后仍然有数据被拒绝,则 Hadoop 作业将被取消(并失败)。负值表示无限次重试;在设置此值时要小心,因为它可能会产生不希望有的副作用。
es.batch.write.retry.wait(默认值:10s)
由于批量拒绝而导致的批量写入重试之间等待的时间。

删除并记录错误处理程序编辑

默认处理程序名称:log

当调用此处理程序时,它会记录一条消息,其中包含失败的 JSON 批量条目、错误消息以及任何先前的处理程序消息。记录此消息后,处理程序会发出信号表明错误已得到确认,从而消耗/忽略它。

此处理程序的可用配置

es.write.rest.error.handler.log.logger.name(必需)
创建日志记录器实例以记录错误时要使用的字符串名称。如果使用此处理程序,则此设置是必需的。
es.write.rest.error.handler.log.logger.class(作为 logger.name 的替代方案)
创建日志记录器实例以记录错误时要使用的类名。此设置可以代替必需的设置 es.write.rest.error.handler.log.logger.name
es.write.rest.error.handler.log.logger.level(默认值:WARN)
记录错误消息时要使用的日志记录器级别。可用选项为 FATALERRORWARNINFODEBUGTRACE

在故障时中止错误处理程序编辑

默认处理程序名称:fail

当调用此处理程序时,它会重新抛出给它的错误并中止。此处理程序始终加载并自动放置在错误处理程序列表的末尾。

此处理程序没有配置。

Elasticsearch 错误处理程序编辑

默认处理程序名称:es

当调用此处理程序时,它会将给定的错误转换为 JSON 文档并将其插入 Elasticsearch 索引中。完成此索引操作后,可以配置处理程序以发出信号表明错误已处理(默认值),或者将错误传递给链中的下一个处理程序。ES 处理程序使用其自己的 Elastic Common Schema 映射序列化错误信息。此数据可以接受用户提供的其他元数据,这些元数据可以帮助用户搜索和报告故障(请参阅下面的标签和标记设置)。

错误处理程序被构建为在每个记录/错误的基础上执行。Elasticsearch 错误处理程序目前不批量插入错误信息。每个错误都是一次插入,以确保处理每个记录/错误。在需要处理大量错误事件的情况下,此处理程序可能无法提供很高的吞吐量。您应该尝试设计作业,以便错误处理是一个相对不常见的事件。

此处理程序的可用配置

es.write.rest.error.handler.es.client.nodes(默认值:“localhost”或当前配置的节点)
要将错误写入的节点地址的逗号分隔字符串。建议此集群与正在写入的集群不同(以避免写入争用)。
es.write.rest.error.handler.es.client.port(默认值:9200 或当前配置的端口)
连接到 Elasticsearch 节点时要使用的 http 端口。
es.write.rest.error.handler.es.client.resource(必需)
要将错误信息写入的索引。强烈建议此索引仅用于错误信息。不支持索引模式。
es.write.rest.error.handler.es.client.inherit(默认值:true)
确定用于发送错误信息的客户端的创建设置是否应继承当前正在运行作业的相同客户端设置。默认情况下,作业配置中的所有客户端设置都将由此客户端继承。
es.write.rest.error.handler.es.client.conf.<CONFIGURATION>
此配置前缀用于在处理程序的底层 ES 客户端中设置客户端配置值。它接受 配置 中记录的大多数设置。
es.write.rest.error.handler.es.label.<LABEL>(可选)
此处理程序创建的每个错误事件中添加的用户定义标签字段。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.tags(可选)
此处理程序创建的每个错误事件中添加的标签的逗号分隔字符串。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.return.default(默认值:HANDLED)
将错误成功写入 Elasticsearch 时返回给错误处理框架的处理程序结果。可用值为 HANDLEDPASSABORT。默认结果为 HANDLED
es.write.rest.error.handler.es.return.default.reason(可选)
如果默认返回值是 PASS,此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。
es.write.rest.error.handler.es.return.error (默认值:ABORT)
当无法将错误写入 Elasticsearch 时,要返回给错误处理框架的处理程序结果。可用值是 HANDLEDPASSABORT。默认结果是 ABORT
es.write.rest.error.handler.es.return.error.reason (可选)
如果错误返回值是 PASS,此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。

使用批量错误处理程序edit

要配置批量错误处理程序,必须按以下顺序指定处理程序。

设置 es.write.rest.error.handlers
列出用于批量写入错误处理的错误处理程序名称,以及调用它们的顺序。每个默认处理程序都可以通过其处理程序名称来引用,因为连接器知道如何加载它们。任何来自用户或第三方代码的处理程序都需要使用 es.write.rest.error.handler. 前缀定义其处理程序名称。

对于批量写入失败,HTTP 重试内置处理程序始终作为第一个错误处理程序放置。此外,Abort on Failure 内置处理程序始终作为最后一个错误处理程序放置,以捕获任何未处理的错误。这两个错误处理程序本身构成了 elasticsearch-hadoop 的默认批量写入错误处理行为,这与以前版本的行为一致。

  1. HTTP 重试内置处理程序:重试来自 Elasticsearch 的良性批量拒绝和失败,并将任何其他错误传递给下一级
  2. 任何配置的用户处理程序都将在此处。
  3. Abort on Failure 内置处理程序:重新抛出它遇到的任何错误

通过使用 handlers 属性将处理程序插入链中,可以修改此行为。假设我们想要记录所有错误并忽略它们。

es.write.rest.error.handlers = log 

指定默认的 Drop and Log 处理程序

使用上述配置,处理程序列表现在看起来像这样

  1. HTTP 重试处理程序
  2. Drop and Log 处理程序
  3. Abort on Failure 处理程序

如上所述,内置的 log 错误处理程序有一个必需的设置:用于记录器名称的内容。使用的记录器将尊重您已有的任何记录配置,因此需要一个用于记录器的名称

es.write.rest.error.handlers = log 
es.write.rest.error.handler.log.logger.name = BulkErrors 

指定默认的 Drop and Log 内置处理程序

Drop and Log 内置处理程序将所有错误记录到 BulkErrors 记录器

此时,Abort on Failure 内置处理程序实际上被忽略了,因为 Drop and Log 内置处理程序将始终将错误标记为已使用。这种做法可能很危险,因为潜在的重要错误可能被简单地忽略。在许多情况下,用户最好编写自己的错误处理程序来处理预期的异常。

编写您自己的批量错误处理程序edit

假设您正在将敏感的交易数据流式传输到 Elasticsearch。在这种情况下,您的数据经过仔细的版本控制,并且您利用 Elasticsearch 的版本系统来防止用旧数据覆盖较新的数据。也许您的数据以一种允许较新的数据在一些较旧的数据片段进入 Elasticsearch 之前偷偷进入 Elasticsearch 的方式进行分布。不用担心,版本系统将拒绝较旧的数据并保留 Elasticsearch 中数据的完整性。这里的问题是您的流式传输作业失败了,因为返回了冲突错误,并且连接器不确定您是否期望这样做。

让我们为这种情况编写一个错误处理程序

package org.myproject.myhandlers;

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteErrorHandler;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteFailure;
import org.elasticsearch.hadoop.rest.bulk.handler.DelayableErrorCollector;

public class IgnoreConflictsHandler extends BulkWriteErrorHandler { 

    private static final Logger LOGGER = ...; 

    @Override
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) 
    throws Exception
    {
        if (entry.getResponseCode() == 409) { 
            LOGGER.warn("Encountered conflict response. Ignoring old data.");
            return HandlerResult.HANDLED; 
        }
        return collector.pass("Not a conflict response code."); 
    }
}

我们创建一个类并扩展 BulkWriteErrorHandler 基类

使用首选的日志记录解决方案创建记录器

覆盖 onError 方法,该方法将使用错误详细信息调用

检查错误的响应代码,以查看它是否为 409(冲突)

如果它是一个冲突,则记录错误并返回 HandlerResult.HANDLED 以表示已确认错误

如果错误不是冲突,我们将其与无法处理它的原因一起传递给下一个错误处理程序

在我们能够将此处理程序放在批量写入错误处理程序列表中之前,我们必须使用 es.write.rest.error.handler.[HANDLER-NAME] 在设置中使用名称注册处理程序类

设置 es.write.rest.error.handler.[HANDLER-NAME]
创建一个名为 HANDLER-NAME 的新处理程序。此属性的值必须是用于此处理程序的类的二进制名称。

在这种情况下,让我们为我们的忽略冲突处理程序注册一个处理程序名称

es.write.rest.error.handler.ignoreConflict = org.myproject.myhandlers.IgnoreConflictsHandler

现在我们有了处理程序的名称,我们可以在处理程序列表中使用它

es.write.rest.error.handlers = ignoreConflict
es.write.rest.error.handler.ignoreConflict = org.myproject.myhandlers.IgnoreConflictsHandler

现在,每当发生批量失败时,您的忽略冲突错误处理程序将被调用,并指示连接器忽略来自 Elasticsearch 的冲突响应代码是可以的。

高级概念edit

如果我们不想记录数据并将其删除,而是想将其持久化到某个地方以确保安全?如果我们想将属性传递给我们的处理程序以参数化它们的行为?让我们创建一个将错误信息存储在本地文件中以供以后分析的处理程序。

package org.myproject.myhandlers;

import ...

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteErrorHandler;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteFailure;
import org.elasticsearch.hadoop.rest.bulk.handler.DelayableErrorCollector;

public class OutputToFileHandler extends BulkWriteErrorHandler { 

    private OutputStream outputStream;   
    private BufferedWriter writer;

    @Override
    public void init(Properties properties) {   
        try {
            outputStream = new FileOutputStream(properties.getProperty("filename"));   
            writer = new BufferedWriter(new OutputStreamWriter(outputStream));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Could not open file", e);
        }
    }

    @Override
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector)   
    throws Exception
    {
        writer.write("Code: " + entry.getResponseCode());
        writer.newLine();
        writer.write("Error: " + entry.getException().getMessage());
        writer.newLine();
        for (String message : entry.previousHandlerMessages()) {
            writer.write("Previous Handler: " + message);           
            writer.newLine();
        }
        writer.write("Attempts: " + entry.getNumberOfAttempts());
        writer.newLine();
        writer.write("Entry: ");
        writer.newLine();
        IOUtils.copy(entry.getEntryContents(), writer);
        writer.newLine();

        return HandlerResult.HANDLED; 
    }

    @Override
    public void close() {   
        try {
            writer.close();
            outputStream.close();
        } catch (IOException e) {
            throw new RuntimeException("Closing file failed", e);
        }
    }
}

扩展 BulkWriteErrorHandler 基类

一些用于将数据写入文件的本地状态

我们覆盖了 init 方法。此处理程序的任何属性都将在此处传递。

我们正在从属性中提取要写入的文件。我们将在下面看到如何设置此属性。

覆盖 onError 方法以定义我们的行为。

写出错误信息。这突出了 BulkWriteFailure 对象提供的全部可用数据。

返回 HANDLED 结果以表示已处理错误。

最后,关闭任何内部分配的资源。

添加到此处理程序的是 initclose 方法。当处理程序在任务开始时首次创建时,将调用 init 方法,当任务结束时,将调用 close 方法。 init 方法接受一个 properties 参数,该参数包含通过使用 es.write.rest.error.handler.[HANDLER-NAME].[PROPERTY-NAME] 设置的任何特定于处理程序的属性。

设置 es.write.rest.error.handler.[HANDLER-NAME].[PROPERTY-NAME]
用于将属性传递给处理程序。HANDLER-NAME 是要配置的处理程序,PROPERTY-NAME 是要为处理程序设置的属性。

在我们的用例中,我们将像这样配置我们的文件日志记录错误处理程序

es.write.rest.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler   
es.write.rest.error.handler.writeFile.filename = /path/to/some/output/file   

我们使用名称 writeFile 注册了新的处理程序

现在,我们为 writeFile 处理程序设置了一个名为 filename 的属性。在处理程序的 init 方法中,可以使用 filename 作为属性键来获取它。

现在,将所有内容与前面的示例(忽略冲突)结合起来

es.write.rest.error.handlers = ignoreConflict,writeFile

es.write.rest.error.handler.ignoreConflict = org.myproject.myhandlers.IgnoreConflictsHandler

es.write.rest.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler
es.write.rest.error.handler.writeFile.filename = /path/to/some/output/file

您现在拥有一个处理程序链,该链默认情况下会重试批量拒绝(HTTP 重试内置处理程序),然后忽略任何冲突错误(我们自己的忽略冲突处理程序),然后通过将它们写入文件来忽略任何其他错误(我们自己的输出到文件处理程序)。

序列化错误处理程序edit

在将数据发送到 Elasticsearch 之前,elasticsearch-hadoop 必须将每个文档序列化为 JSON 批量条目。正是在此过程中,确定了批量操作,提取了文档元数据,并将特定于集成的數據结构转换为 JSON 文档。在此过程中,记录结构的不一致会导致在序列化过程中抛出异常。这些错误通常会导致任务失败和处理停止。

Elasticsearch for Apache Hadoop 提供了一个 API 来在记录级别处理序列化错误。序列化错误处理程序被赋予

  • 无法序列化的特定于集成的數據结构
  • 序列化过程中遇到的异常

序列化错误处理程序尚不可用于 Hive。Elasticsearch for Apache Hadoop 使用 Hive 的 SerDe 结构在发送到输出格式之前将数据转换为批量条目。SerDe 对象没有在对象结束其生命周期时调用的清理方法。因此,我们不支持 Hive 中的序列化错误处理程序,因为它们无法在作业执行结束时关闭。

连接器提供了一些默认的错误处理程序

Drop and Log 错误处理程序edit

默认处理程序名称:log

当调用此处理程序时,它会记录一条消息,其中包含失败的數據结构的 toString() 内容、错误消息以及任何先前的处理程序消息。记录此消息后,处理程序会发出已确认错误的信号,从而使用/忽略它。

此处理程序的可用配置

es.write.data.error.handler.log.logger.name (必需)
创建日志记录器实例以记录错误时要使用的字符串名称。如果使用此处理程序,则此设置是必需的。
es.write.data.error.handler.log.logger.class (作为 logger.name 的替代)
创建记录器实例以记录错误时要使用的类名。此设置可以代替必需的设置 es.write.data.error.handler.log.logger.name
es.write.data.error.handler.log.logger.level (默认值:WARN)
记录错误消息时要使用的日志记录器级别。可用选项为 FATALERRORWARNINFODEBUGTRACE

Abort on Failure 错误处理程序edit

默认处理程序名称:fail

当调用此处理程序时,它会重新抛出给它的错误并中止。此处理程序始终加载并自动放置在错误处理程序列表的末尾。

此处理程序没有配置。

Elasticsearch 错误处理程序edit

默认处理程序名称:es

当调用此处理程序时,它会将给定的错误转换为 JSON 文档并将其插入 Elasticsearch 索引中。完成此索引操作后,可以配置处理程序以发出信号表明错误已处理(默认值),或者将错误传递给链中的下一个处理程序。ES 处理程序使用其自己的 Elastic Common Schema 映射序列化错误信息。此数据可以接受用户提供的其他元数据,这些元数据可以帮助用户搜索和报告故障(请参阅下面的标签和标记设置)。

错误处理程序被构建为在每个记录/错误的基础上执行。Elasticsearch 错误处理程序目前不批量插入错误信息。每个错误都是一次插入,以确保处理每个记录/错误。在需要处理大量错误事件的情况下,此处理程序可能无法提供很高的吞吐量。您应该尝试设计作业,以便错误处理是一个相对不常见的事件。

此处理程序的可用配置

es.write.rest.error.handler.es.client.nodes(默认值:“localhost”或当前配置的节点)
要将错误写入的节点地址的逗号分隔字符串。建议此集群与正在写入的集群不同(以避免写入争用)。
es.write.rest.error.handler.es.client.port(默认值:9200 或当前配置的端口)
连接到 Elasticsearch 节点时要使用的 http 端口。
es.write.rest.error.handler.es.client.resource(必需)
要将错误信息写入的索引。强烈建议此索引仅用于错误信息。不支持索引模式。
es.write.rest.error.handler.es.client.inherit(默认值:true)
确定用于发送错误信息的客户端的创建设置是否应继承当前正在运行作业的相同客户端设置。默认情况下,作业配置中的所有客户端设置都将由此客户端继承。
es.write.rest.error.handler.es.client.conf.<CONFIGURATION>
此配置前缀用于在处理程序的底层 ES 客户端中设置客户端配置值。它接受 配置 中记录的大多数设置。
es.write.rest.error.handler.es.label.<LABEL>(可选)
此处理程序创建的每个错误事件中添加的用户定义标签字段。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.tags(可选)
此处理程序创建的每个错误事件中添加的标签的逗号分隔字符串。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.return.default(默认值:HANDLED)
将错误成功写入 Elasticsearch 时返回给错误处理框架的处理程序结果。可用值为 HANDLEDPASSABORT。默认结果为 HANDLED
es.write.rest.error.handler.es.return.default.reason(可选)
如果默认返回值是 PASS,此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。
es.write.rest.error.handler.es.return.error (默认值:ABORT)
当无法将错误写入 Elasticsearch 时,要返回给错误处理框架的处理程序结果。可用值是 HANDLEDPASSABORT。默认结果是 ABORT
es.write.rest.error.handler.es.return.error.reason (可选)
如果错误返回值是 PASS,此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。

使用序列化错误处理程序edit

要配置序列化错误处理程序,必须按以下顺序指定处理程序。

设置 es.write.data.error.handlers
列出用于序列化错误处理的错误处理程序名称,以及调用它们的顺序。每个默认处理程序都可以通过其处理程序名称来引用,因为连接器知道如何加载它们。任何来自用户或第三方代码的处理程序都需要使用 es.write.data.error.handler. 前缀定义其处理程序名称。

对于序列化失败,Abort on Failure 内置处理程序始终作为最后一个错误处理程序放置,以捕获任何未处理的错误。此错误处理程序构成了 elasticsearch-hadoop 的默认序列化错误处理行为,这与以前版本的行为一致。

  1. 任何配置的用户处理程序都将在此处。
  2. Abort on Failure 内置处理程序:重新抛出它遇到的任何错误

通过使用 handlers 属性将处理程序插入链中,可以修改此行为。假设我们想要记录所有错误并忽略它们。

es.write.data.error.handlers = log 

指定默认的 Drop and Log 处理程序

使用上述配置,处理程序列表现在看起来像这样

  1. Drop and Log 处理程序
  2. Abort on Failure 处理程序

如上所述,内置的 log 错误处理程序有一个必需的设置:用于记录器名称的内容。使用的记录器将尊重您已有的任何记录配置,因此需要一个用于记录器的名称

es.write.data.error.handlers = log 
es.write.data.error.handler.log.logger.name = SerializationErrors 

指定默认的 Drop and Log 内置处理程序

Drop and Log 内置处理程序将所有错误记录到 SerializationErrors 记录器

此时,Abort on Failure 内置处理程序实际上被忽略了,因为 Drop and Log 内置处理程序将始终将错误标记为已使用。这种做法可能很危险,因为潜在的重要错误可能被简单地忽略。在许多情况下,用户最好编写自己的错误处理程序来处理预期的异常。

编写您自己的序列化处理程序edit

假设您正在将一些非结构化数据流式传输到 Elasticsearch。在这种情况下,您的数据没有完全清理,可能包含连接器无法转换为 JSON 的字段值。您可能不希望您的流式处理作业因这些数据而失败,因为您可能期望它包含错误。在这种情况下,您可能希望以比依赖日志记录解决方案的 toString() 方法更全面的方式记录数据。

让我们为这种情况编写一个错误处理程序

package org.myproject.myhandlers;

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure;

public class CustomLogOnError extends SerializationErrorHandler {      

    private Log logger = ???; 

    @Override
    public HandlerResult onError(SerializationFailure entry, ErrorCollector<Object> collector) throws Exception {  
        MyRecord record = (MyRecord) entry.getRecord();                             
        logger.error("Could not serialize record. " +
                "Record data : " + record.getSpecificField() + ", " + record.getOtherField(), entry.getException()); 
        return HandlerResult.HANDLED;                                               
    }
}

我们创建一个类并扩展 SerializationErrorHandler 基类

使用首选的日志记录解决方案创建记录器

覆盖 onError 方法,该方法将使用错误详细信息调用

检索无法序列化的记录。将其转换为您从作业中期望的记录类型

记录您感兴趣的数据中的特定信息

最后,在记录错误后,返回 HandlerResult.HANDLED 以表示已确认错误

在我们可以在序列化错误处理程序列表中放置此处理程序之前,我们必须使用 es.write.data.error.handler.[HANDLER-NAME] 在设置中使用名称注册处理程序类。

设置 es.write.data.error.handler.[HANDLER-NAME]
创建一个名为 HANDLER-NAME 的新处理程序。此属性的值必须是用于此处理程序的类的二进制名称。

在这种情况下,让我们为我们的忽略冲突处理程序注册一个处理程序名称

es.write.data.error.handler.customLog = org.myproject.myhandlers.CustomLogOnError

现在我们有了处理程序的名称,我们可以在处理程序列表中使用它

es.write.data.error.handlers = customLog
es.write.data.error.handler.customLog = org.myproject.myhandlers.CustomLogOnError

现在,每当发生序列化失败时,您的自定义日志记录错误处理程序将被调用,并将指示连接器忽略这些失败以继续处理。

高级概念edit

如果不想记录数据并将其丢弃,而是想将其持久化到某个地方以确保安全?如果我们想将属性传递到我们的处理程序中以参数化它们的行为?让我们创建一个将错误信息存储在本地文件中以供以后分析的处理程序。

package org.myproject.myhandlers;

import ...

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure;

public class OutputToFileHandler extends SerializationErrorHandler { 

    private OutputStream outputStream;   
    private BufferedWriter writer;

    @Override
    public void init(Properties properties) {   
        try {
            outputStream = new FileOutputStream(properties.getProperty("filename"));   
            writer = new BufferedWriter(new OutputStreamWriter(outputStream));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Could not open file", e);
        }
    }

    @Override
    public HandlerResult onError(SerializationFailure entry, ErrorCollector<Object> collector)   
    throws Exception
    {
        writer.write("Record: " + entry.getRecord().toString());
        writer.newLine();
        writer.write("Error: " + entry.getException().getMessage());
        writer.newLine();
        for (String message : entry.previousHandlerMessages()) {
            writer.write("Previous Handler: " + message);           
            writer.newLine();
        }

        return HandlerResult.PASS; 
    }

    @Override
    public void close() {   
        try {
            writer.close();
            outputStream.close();
        } catch (IOException e) {
            throw new RuntimeException("Closing file failed", e);
        }
    }
}

扩展 SerializationErrorHandler 基类

一些用于将数据写入文件的本地状态

我们覆盖了 init 方法。此处理程序的任何属性都将在此处传递。

我们正在从属性中提取要写入的文件。我们将在下面看到如何设置此属性。

覆盖 onError 方法以定义我们的行为。

写出错误信息。这突出了 SerializationFailure 对象提供的全部可用数据。

返回 PASS 结果以表示应将错误传递给链中的下一个错误处理程序。

最后,关闭任何内部分配的资源。

此处理程序中添加了 initclose 方法。当任务开始时首次创建处理程序时,会调用 init 方法,当任务结束时,会调用 close 方法。 init 方法接受一个属性参数,该参数包含使用 es.write.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME] 设置的任何特定于处理程序的属性。

设置 es.write.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME]
用于将属性传递给处理程序。HANDLER-NAME 是要配置的处理程序,PROPERTY-NAME 是要为处理程序设置的属性。

在我们的用例中,我们将像这样配置我们的文件日志记录错误处理程序

es.write.data.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler   
es.write.data.error.handler.writeFile.filename = /path/to/some/output/file 

我们使用名称 writeFile 注册了新的处理程序

现在,我们为 writeFile 处理程序设置了一个名为 filename 的属性。在处理程序的 init 方法中,可以使用 filename 作为属性键来获取它。

现在将所有内容整合在一起

es.write.data.error.handlers = writeFile,customLog

es.write.data.error.handler.customLog = org.myproject.myhandlers.CustomLogOnError

es.write.data.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler
es.write.data.error.handler.writeFile.filename = /path/to/some/output/file

您现在有一个处理程序链,它将有关失败的所有相关数据写入文件(我们的 writeFile 处理程序),然后使用自定义日志行记录错误并忽略错误以继续处理(我们的 customLog 处理程序)。

反序列化错误处理程序edit

在读取数据时,连接器会对配置的索引执行滚动请求并读取其内容。对于滚动搜索结果中的每个命中,连接器都会尝试将其反序列化为特定于集成的记录类型。在使用 MapReduce 时,此数据类型要么是 MapWritable,要么是 Text(用于原始 JSON 数据)。对于像 Spark SQL 这样的使用数据模式的集成,结果数据类型是 Row 对象。

Elasticsearch 将文档存储在 lucene 索引中。这些文档有时可能具有松散的定义,或者由于某种原因而具有无法解析为基于模式的数据类型的结构。有时,字段可能采用无法正确读取的格式。

Elasticsearch for Apache Hadoop 提供了一个 API 来处理来自滚动响应的文档级反序列化错误。滚动读取的错误处理程序被赋予

  • 尝试过的原始 JSON 搜索结果
  • 遇到的异常

注意:反序列化错误处理程序仅允许处理解析滚动响应中的文档时发生的错误。搜索结果可能已成功读取,但仍然格式错误,因此在框架的完全不同的部分使用时会导致异常。此错误处理程序从滚动读取过程中处理异常的最合理位置的顶部调用,但这并不封装每个集成的所有逻辑。

连接器提供了一些默认的错误处理程序

删除和记录错误处理程序edit

默认处理程序名称:log

当此处理程序被调用时,它会记录一条消息,其中包含失败的 JSON 搜索命中、错误消息以及任何先前的处理程序消息。在记录此消息后,处理程序会发出已确认错误的信号,从而消耗/忽略它。

此处理程序的可用配置

es.read.data.error.handler.log.logger.name(必需)
创建日志记录器实例以记录错误时要使用的字符串名称。如果使用此处理程序,则此设置是必需的。
es.read.data.error.handler.log.logger.class(作为 logger.name 的替代方案)
创建日志记录器实例以记录错误时要使用的类名。此设置可用于代替必需设置 es.read.data.error.handler.log.logger.name
es.read.data.error.handler.log.logger.level(默认:WARN)
记录错误消息时要使用的日志记录器级别。可用选项为 FATALERRORWARNINFODEBUGTRACE

失败时中止错误处理程序edit

默认处理程序名称:fail

当调用此处理程序时,它会重新抛出给它的错误并中止。此处理程序始终加载并自动放置在错误处理程序列表的末尾。

此处理程序没有配置。

Elasticsearch 错误处理程序edit

默认处理程序名称:es

当调用此处理程序时,它会将给定的错误转换为 JSON 文档并将其插入 Elasticsearch 索引中。完成此索引操作后,可以配置处理程序以发出信号表明错误已处理(默认值),或者将错误传递给链中的下一个处理程序。ES 处理程序使用其自己的 Elastic Common Schema 映射序列化错误信息。此数据可以接受用户提供的其他元数据,这些元数据可以帮助用户搜索和报告故障(请参阅下面的标签和标记设置)。

错误处理程序被构建为在每个记录/错误的基础上执行。Elasticsearch 错误处理程序目前不批量插入错误信息。每个错误都是一次插入,以确保处理每个记录/错误。在需要处理大量错误事件的情况下,此处理程序可能无法提供很高的吞吐量。您应该尝试设计作业,以便错误处理是一个相对不常见的事件。

此处理程序的可用配置

es.write.rest.error.handler.es.client.nodes(默认值:“localhost”或当前配置的节点)
要将错误写入的节点地址的逗号分隔字符串。建议此集群与正在写入的集群不同(以避免写入争用)。
es.write.rest.error.handler.es.client.port(默认值:9200 或当前配置的端口)
连接到 Elasticsearch 节点时要使用的 http 端口。
es.write.rest.error.handler.es.client.resource(必需)
要将错误信息写入的索引。强烈建议此索引仅用于错误信息。不支持索引模式。
es.write.rest.error.handler.es.client.inherit(默认值:true)
确定用于发送错误信息的客户端的创建设置是否应继承当前正在运行作业的相同客户端设置。默认情况下,作业配置中的所有客户端设置都将由此客户端继承。
es.write.rest.error.handler.es.client.conf.<CONFIGURATION>
此配置前缀用于在处理程序的底层 ES 客户端中设置客户端配置值。它接受 配置 中记录的大多数设置。
es.write.rest.error.handler.es.label.<LABEL>(可选)
此处理程序创建的每个错误事件中添加的用户定义标签字段。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.tags(可选)
此处理程序创建的每个错误事件中添加的标签的逗号分隔字符串。此字段将被索引到为此处理程序提供的 Elasticsearch 索引中。仅限文本数据。
es.write.rest.error.handler.es.return.default(默认值:HANDLED)
将错误成功写入 Elasticsearch 时返回给错误处理框架的处理程序结果。可用值为 HANDLEDPASSABORT。默认结果为 HANDLED
es.write.rest.error.handler.es.return.default.reason(可选)
如果默认返回值是 PASS,此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。
es.write.rest.error.handler.es.return.error (默认值:ABORT)
当无法将错误写入 Elasticsearch 时,要返回给错误处理框架的处理程序结果。可用值是 HANDLEDPASSABORT。默认结果是 ABORT
es.write.rest.error.handler.es.return.error.reason (可选)
如果错误返回值是 PASS,此可选文本设置允许用户指定处理程序将数据传递给链中下一个处理程序的原因。

使用反序列化错误处理程序edit

要配置反序列化错误处理程序,您必须使用以下属性按顺序指定处理程序。

设置 es.read.data.error.handlers
列出用于反序列化错误处理的错误处理程序的名称,以及调用它们的顺序。每个默认处理程序都可以通过其处理程序名称来引用,因为连接器知道如何加载它们。任何来自用户或第三方代码的处理程序都需要使用 es.read.data.error.handler. 前缀定义其处理程序名称。

对于反序列化失败,中止失败内置处理程序始终作为最后一个错误处理程序放置,以捕获任何未处理的错误。此错误处理程序单独构成 elasticsearch-hadoop 的默认反序列化错误处理行为,这与以前版本的行为相匹配。

  1. 任何配置的用户处理程序都将在此处。
  2. Abort on Failure 内置处理程序:重新抛出它遇到的任何错误

通过使用 handlers 属性将处理程序插入链中,可以修改此行为。假设我们想要记录所有错误并忽略它们。

es.read.data.error.handlers = log 

指定默认的 Drop and Log 处理程序

使用上述配置,处理程序列表现在看起来像这样

  1. Drop and Log 处理程序
  2. Abort on Failure 处理程序

如上所述,内置的 log 错误处理程序有一个必需的设置:用于记录器名称的内容。使用的记录器将尊重您已有的任何记录配置,因此需要一个用于记录器的名称

es.read.data.error.handlers = log 
es.read.data.error.handler.log.logger.name = DeserializationErrors 

指定默认的 Drop and Log 内置处理程序

删除和记录内置处理程序会将所有错误记录到 DeserializationErrors 日志记录器

此时,Abort on Failure 内置处理程序实际上被忽略了,因为 Drop and Log 内置处理程序将始终将错误标记为已使用。这种做法可能很危险,因为潜在的重要错误可能被简单地忽略。在许多情况下,用户最好编写自己的错误处理程序来处理预期的异常。

编写您自己的反序列化错误处理程序edit

假设您正在从 Elasticsearch 读取一个大型日志数据索引。在这种情况下,您的日志数据高度非结构化,并非所有内容对您的过程都至关重要。由于读取的数据量很大,您的作业需要很长时间才能完成。在这种情况下,您可能希望用一个虚拟记录替换无法读取的记录以标记失败,而不是中断您的处理。应记录并删除有问题的數據。

让我们为这种情况编写一个错误处理程序

package org.myproject.myhandlers;

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationFailure;

public class ReturnDummyHandler extends DeserializationErrorHandler { 

    private static final Logger LOGGER = ...; 
    private static final String DUMMY_RECORD = "..."; 

    @Override
    public HandlerResult onError(DeserializationFailure entry, ErrorCollector<byte[]> collector) 
    throws Exception
    {
        BufferedReader reader = new BufferedReader(new InputStreamReader(entry.getHitContents()));
        StringBuilder hitContent = new StringBuilder();
        for (String line = reader.readLine(); line != null; line = reader.readLine()) {           
            hitContent.append(line);
        }
        LOGGER.warn("Encountered malformed record during read. Replacing with dummy record. " +   
                            "Malformed Data: " + hitContent, entry.getException());
        return collector.retry(DUMMY_RECORD.getBytes());                                         
    }
}

我们创建一个类并扩展 DeserializationErrorHandler 基类

使用首选的日志记录解决方案创建记录器

我们创建一个字符串,用于我们的虚拟记录,该记录应被反序列化而不是

覆盖 onError 方法,该方法将使用错误详细信息调用

我们将失败的搜索命中的内容作为字符串读取

我们记录失败文档的内容,以及详细说明失败原因的异常

最后,我们返回要反序列化的虚拟数据内容。

在我们可以在反序列化错误处理程序列表中放置此处理程序之前,我们必须使用 es.read.data.error.handler.[HANDLER-NAME] 在设置中使用名称注册处理程序类。

设置 es.read.data.error.handler.[HANDLER-NAME]
创建一个名为 HANDLER-NAME 的新处理程序。此属性的值必须是用于此处理程序的类的二进制名称。

在这种情况下,让我们为我们的虚拟记录处理程序注册一个处理程序名称

es.read.data.error.handler.returnDummy = org.myproject.myhandlers.ReturnDummyHandler

现在我们有了处理程序的名称,我们可以在处理程序列表中使用它

es.read.data.error.handlers = returnDummy
es.read.data.error.handler.returnDummy = org.myproject.myhandlers.ReturnDummyHandler

现在,每当发生反序列化失败时,您的虚拟数据错误处理程序将被调用,并将指示连接器使用您提供的虚拟记录而不是格式错误的数据。

高级概念edit

如果我们不想记录数据并将其删除,而是想将其持久化到某个地方以确保安全?如果我们想将属性传递给我们的处理程序以参数化它们的行为?让我们创建一个将错误信息存储在本地文件中以供以后分析的处理程序。

package org.myproject.myhandlers;

import ...

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationFailure;

public class ReturnDummyAndLogToFileHandler extends DeserializationErrorHandler { 

    private static final String DUMMY_RECORD = "...";

    private OutputStream outputStream;   
    private BufferedWriter writer;

    @Override
    public void init(Properties properties) {   
        try {
            outputStream = new FileOutputStream(properties.getProperty("filename"));   
            writer = new BufferedWriter(new OutputStreamWriter(outputStream));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Could not open file", e);
        }
    }

    @Override
    public HandlerResult onError(DeserializationFailure entry, ErrorCollector<byte[]> collector)   
    throws Exception
    {
        BufferedReader reader = new BufferedReader(new InputStreamReader(entry.getHitContents()));
        StringBuilder hitContent = new StringBuilder();
        for (String line = reader.readLine(); line != null; line = reader.readLine()) {           
            hitContent.append(line);
        }

        writer.write("Error: " + entry.getException().getMessage());
        writer.newLine();
        for (String message : entry.previousHandlerMessages()) {
            writer.write("Previous Handler: " + message);           
            writer.newLine();
        }
        writer.write("Entry: ");
        writer.newLine();
        writer.write(hitContent.toString());
        writer.newLine();

        return collector.retry(DUMMY_RECORD.getBytes());            
    }

    @Override
    public void close() {   
        try {
            writer.close();
            outputStream.close();
        } catch (IOException e) {
            throw new RuntimeException("Closing file failed", e);
        }
    }
}

扩展 DeserializationErrorHandler 基类

一些用于将数据写入文件的本地状态

我们覆盖了 init 方法。此处理程序的任何属性都将在此处传递

我们正在从属性中提取要写入的文件。我们将在下面看到如何设置此属性

覆盖 onError 方法以定义我们的行为

读取失败的搜索命中的内容

写出错误信息。这突出了 DeserializationFailure 对象提供的全部可用数据

执行重试操作,使用我们的虚拟记录

最后,关闭任何内部分配的资源

此处理程序中添加了 initclose 方法。当首次创建滚动查询时,会调用 init 方法,当任务结束时,会调用 close 方法。 init 方法接受一个属性参数,该参数包含使用 es.read.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME] 设置的任何特定于处理程序的属性。

设置 es.read.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME]
用于将属性传递给处理程序。HANDLER-NAME 是要配置的处理程序,PROPERTY-NAME 是要为处理程序设置的属性。

在我们的用例中,我们将像这样配置我们的文件日志记录错误处理程序

es.read.data.error.handler.writeFile = org.myproject.myhandlers.ReturnDummyAndLogToFileHandler   
es.read.data.error.handler.writeFile.filename = /path/to/some/output/file   

我们使用名称 writeFile 注册了新的处理程序

现在,我们为 writeFile 处理程序设置了一个名为 filename 的属性。在处理程序的 init 方法中,可以使用 filename 作为属性键来获取它。

现在将所有内容与前面的示例整合在一起

es.read.data.error.handlers = writeFile
es.read.data.error.handler.writeFile = org.myproject.myhandlers.ReturnDummyAndLogToFileHandler
es.read.data.error.handler.writeFile.filename = /path/to/some/output/file

您现在有一个处理程序,它会重试用虚拟记录替换格式错误的数据,然后通过将这些格式错误的记录及其错误信息写入自定义文件来记录这些格式错误的记录。