批量:索引多个文档

编辑

批量请求允许在一次请求中向 Elasticsearch 发送多个与文档相关的操作。当您有多个文档需要摄取时,这比为每个文档发送单独的请求更有效。

一个批量请求可以包含几种类型的操作

  • 创建一个文档,在确保它不存在之后索引它,
  • 索引一个文档,如果需要则创建它,如果它存在则替换它,
  • 使用脚本或部分文档就地更新已存在的文档,
  • 删除一个文档。

有关批量请求的完整解释,请参阅 Elasticsearch API 文档

索引应用程序对象

编辑

BulkRequest 包含一系列操作,每个操作都是一个具有多个变体的类型。要创建此请求,使用主请求的构建器对象和每个操作的流畅 DSL 很方便。

下面的示例展示了如何索引应用程序对象的列表。

List<Product> products = fetchProducts();

BulkRequest.Builder br = new BulkRequest.Builder();

for (Product product : products) {
    br.operations(op -> op           
        .index(idx -> idx            
            .index("products")       
            .id(product.getSku())
            .document(product)
        )
    );
}

BulkResponse result = esClient.bulk(br.build());

// Log errors, if any
if (result.errors()) {
    logger.error("Bulk had errors");
    for (BulkResponseItem item: result.items()) {
        if (item.error() != null) {
            logger.error(item.error().reason());
        }
    }
}

添加一个操作(请记住,列表属性是累加的)。opBulkOperation 的构建器,它是一个变体类型。此类型具有 indexcreateupdatedelete 变体。

选择 index 操作变体,idxIndexOperation 的构建器。

设置索引操作的属性,类似于单个文档索引:索引名称、标识符和文档。

索引原始 JSON 数据

编辑

批量索引请求的 document 属性可以是可以使用 Elasticsearch 客户端的 JSON 映射器序列化为 JSON 的任何对象。但是,批量摄取的数据通常以 JSON 文本(例如磁盘上的文件)的形式提供,而解析此 JSON 只是为了重新序列化以发送批量请求将是资源的浪费。因此,批量操作中的文档也可以是 BinaryData 类型,它们会逐字发送(不进行解析)到 Elasticsearch 服务器。

在下面的示例中,我们将使用 Java API 客户端的 BinaryData 从日志目录读取 json 文件并将它们发送到批量请求中。

// List json log files in the log directory
File[] logFiles = logDir.listFiles(
    file -> file.getName().matches("log-.*\\.json")
);

BulkRequest.Builder br = new BulkRequest.Builder();

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    br.operations(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        )
    );
}

使用批量摄取器进行流式摄取

编辑

BulkIngester 通过提供一个实用程序类来简化批量 API 的使用,该实用程序类允许将索引/更新/删除操作透明地分组到批量请求中。您只需将 add() 批量操作添加到摄取器,它将根据其配置负责分组和批量发送它们。

当满足以下条件之一时,摄取器将发送批量请求

  • 操作数超过最大值(默认为 1000)
  • 批量请求的大小(以字节为单位)超过最大值(默认为 5 MiB)
  • 自上次请求以来的延迟已过期(定期刷新,无默认值)

此外,您可以定义等待 Elasticsearch 执行的最大并发请求数(默认为 1)。当达到最大值并且已收集最大操作数时,向索引器添加新操作将被阻止。这通过在客户端应用程序上施加背压来避免 Elasticsearch 服务器过载。

BulkIngester<Void> ingester = BulkIngester.of(b -> b
    .client(esClient)    
    .maxOperations(100)  
    .flushInterval(1, TimeUnit.SECONDS) 
);

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    ingester.add(op -> op 
        .index(idx -> idx
            .index("logs")
            .document(data)
        )
    );
}

ingester.close(); 

设置用于发送批量请求的 Elasticsearch 客户端。

设置在发送批量请求之前要收集的最大操作数。

设置刷新间隔。

向摄取器添加批量操作。

关闭摄取器以刷新挂起的操作并释放资源。

此外,批量摄取器接受一个监听器,以便您的应用程序可以收到已发送的批量请求及其结果的通知。为了允许将批量操作与应用程序上下文相关联,add() 方法可以选择接受 context 参数。此上下文参数的类型用作 BulkIngester 对象的泛型参数。您可能已经注意到上面的 BulkIngester<Void> 中的 Void 类型:这是因为我们没有注册监听器,因此不关心上下文值。

以下示例展示了如何使用上下文值来实现批量摄取监听器:与之前一样,它批量发送 JSON 日志文件,但会跟踪批量请求错误和失败的操作。当操作失败时,根据错误类型,您可能需要将其重新添加到摄取器。

BulkListener<String> listener = new BulkListener<String>() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
        // The request was accepted, but may contain failed items.
        // The "context" list gives the file name for each bulk item.
        logger.debug("Bulk request " + executionId + " completed");
        for (int i = 0; i < contexts.size(); i++) {
            BulkResponseItem item = response.items().get(i);
            if (item.error() != null) {
                // Inspect the failure cause
                logger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason());
            }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
        // The request could not be sent
        logger.debug("Bulk request " + executionId + " failed", failure);
    }
};

BulkIngester<String> ingester = BulkIngester.of(b -> b
    .client(esClient)
    .maxOperations(100)
    .flushInterval(1, TimeUnit.SECONDS)
    .listener(listener) 
);

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    ingester.add(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        ),
        file.getName() 
    );
}

ingester.close();

创建一个监听器,其中上下文值是摄取文件名的字符串。

在批量摄取器上注册监听器。

将文件名设置为批量操作的上下文值。

批量摄取还公开了统计信息,这些信息允许监视摄取过程并调整其配置

  • 已添加的操作数,
  • 由于达到最大并发请求数而被阻止的 add() 调用次数(争用),
  • 已发送的批量请求数,
  • 由于达到最大并发请求数而被阻止的批量请求数。

上面示例的源代码可以在 Java API 客户端测试中找到。