批量:索引多个文档编辑

批量请求允许将多个与文档相关的操作发送到 Elasticsearch 中的一个请求中。当您有多个文档要导入时,这比使用单独的请求发送每个文档更有效。

批量请求可以包含几种类型的操作

  • 创建文档,在确保它不存在的情况下对其进行索引,
  • 索引文档,如果需要则创建它,如果它存在则替换它,
  • 更新已存在的文档,可以使用脚本或部分文档,
  • 删除文档。

有关批量请求的完整说明,请参阅 Elasticsearch API 文档

索引应用程序对象编辑

一个 BulkRequest 包含一系列操作,每个操作都是一个 具有多个变体的类型。要创建此请求,可以使用主请求的构建器对象和每个操作的流畅 DSL 很方便。

以下示例展示了如何索引应用程序对象的列表。

List<Product> products = fetchProducts();

BulkRequest.Builder br = new BulkRequest.Builder();

for (Product product : products) {
    br.operations(op -> op           
        .index(idx -> idx            
            .index("products")       
            .id(product.getSku())
            .document(product)
        )
    );
}

BulkResponse result = esClient.bulk(br.build());

// Log errors, if any
if (result.errors()) {
    logger.error("Bulk had errors");
    for (BulkResponseItem item: result.items()) {
        if (item.error() != null) {
            logger.error(item.error().reason());
        }
    }
}

添加一个操作(请记住 列表属性是累加的)。opBulkOperation 的构建器,BulkOperation 是一个 变体类型。此类型具有 indexcreateupdatedelete 变体。

选择 index 操作变体,idxIndexOperation 的构建器。

设置索引操作的属性,类似于 单个文档索引:索引名称、标识符和文档。

索引原始 JSON 数据编辑

批量索引请求的 document 属性可以是任何可以使用 Elasticsearch 客户端的 JSON 映射器序列化为 JSON 的对象。但是,批量导入的数据通常以 JSON 文本形式提供(例如磁盘上的文件),并且仅解析此 JSON 以重新序列化它以发送批量请求将是浪费资源。因此,批量操作中的文档也可以是 BinaryData 类型,这些数据将逐字发送(不进行解析)到 Elasticsearch 服务器。

在下面的示例中,我们将使用 Java API 客户端的 BinaryData 从日志目录读取 json 文件并将它们发送到批量请求中。

// List json log files in the log directory
File[] logFiles = logDir.listFiles(
    file -> file.getName().matches("log-.*\\.json")
);

BulkRequest.Builder br = new BulkRequest.Builder();

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    br.operations(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        )
    );
}

使用批量导入器进行流式导入编辑

BulkIngester 通过提供一个实用程序类来简化 Bulk API 的使用,该类允许索引/更新/删除操作透明地分组到批量请求中。您只需要将批量操作 add() 到导入器,它将根据其配置负责分组和批量发送它们。

当满足以下条件之一时,导入器将发送批量请求

  • 操作数量超过最大值(默认为 1000)
  • 批量请求大小(以字节为单位)超过最大值(默认为 5 MiB)
  • 自上次请求以来的延迟已过期(定期刷新,无默认值)

此外,您可以定义 Elasticsearch 要执行的并发请求的最大数量(默认为 1)。当达到该最大值并且已收集到最大数量的操作时,向索引器添加新操作将被阻塞。这通过对客户端应用程序施加反压来避免过载 Elasticsearch 服务器。

BulkIngester<Void> ingester = BulkIngester.of(b -> b
    .client(esClient)    
    .maxOperations(100)  
    .flushInterval(1, TimeUnit.SECONDS) 
);

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    ingester.add(op -> op 
        .index(idx -> idx
            .index("logs")
            .document(data)
        )
    );
}

ingester.close(); 

设置用于发送批量请求的 Elasticsearch 客户端。

设置在发送批量请求之前要收集的操作的最大数量。

设置刷新间隔。

将批量操作添加到导入器。

关闭导入器以刷新挂起的操作并释放资源。

此外,批量导入器接受一个侦听器,以便您的应用程序可以收到发送的批量请求及其结果的通知。为了允许将批量操作与应用程序上下文相关联,add() 方法可以选择接受一个 context 参数。此上下文参数的类型用作 BulkIngester 对象的泛型参数。您可能已经注意到上面的 BulkIngester<Void> 中的 Void 类型:这是因为我们没有注册侦听器,因此不关心上下文值。

以下示例展示了如何使用上下文值来实现批量导入侦听器:与以前一样,它以批量方式发送 JSON 日志文件,但跟踪批量请求错误和失败的操作。当操作失败时,您可能希望根据错误类型将其重新添加到导入器。

BulkListener<String> listener = new BulkListener<String>() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
        // The request was accepted, but may contain failed items.
        // The "context" list gives the file name for each bulk item.
        logger.debug("Bulk request " + executionId + " completed");
        for (int i = 0; i < contexts.size(); i++) {
            BulkResponseItem item = response.items().get(i);
            if (item.error() != null) {
                // Inspect the failure cause
                logger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason());
            }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
        // The request could not be sent
        logger.debug("Bulk request " + executionId + " failed", failure);
    }
};

BulkIngester<String> ingester = BulkIngester.of(b -> b
    .client(esClient)
    .maxOperations(100)
    .flushInterval(1, TimeUnit.SECONDS)
    .listener(listener) 
);

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    ingester.add(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        ),
        file.getName() 
    );
}

ingester.close();

创建一个侦听器,其中上下文值是导入的文件名的字符串。

在批量导入器上注册侦听器。

将文件名设置为批量操作的上下文值。

批量导入还公开统计信息,允许监控导入过程并调整其配置

  • 添加的操作数量,
  • 由于达到并发请求的最大数量而导致 add() 调用被阻塞的次数(争用),
  • 发送的批量请求数量,
  • 由于达到并发请求的最大数量而导致被阻塞的批量请求数量。

上面示例的源代码可以在 Java API 客户端测试 中找到。