批量:索引多个文档

编辑

批量请求允许在一个请求中将多个与文档相关的操作发送到 Elasticsearch。当您有多个文档需要导入时,这比使用单独的请求发送每个文档更高效。

批量请求可以包含几种操作

  • 创建文档,在确保它不存在后对其进行索引,
  • 索引文档,如果需要则创建它,如果它存在则替换它,
  • 更新已存在的文档,可以使用脚本或部分文档,
  • 删除文档。

请参阅 Elasticsearch API 文档,了解批量请求的完整解释。

索引应用程序对象

编辑

一个 BulkRequest 包含一系列操作,每个操作都是一个 具有多个变体的类型。要创建此请求,可以使用主请求的构建器对象以及每个操作的流畅 DSL,这很方便。

下面的示例显示了如何索引应用程序对象的列表。

List<Product> products = fetchProducts();

BulkRequest.Builder br = new BulkRequest.Builder();

for (Product product : products) {
    br.operations(op -> op           
        .index(idx -> idx            
            .index("products")       
            .id(product.getSku())
            .document(product)
        )
    );
}

BulkResponse result = esClient.bulk(br.build());

// Log errors, if any
if (result.errors()) {
    logger.error("Bulk had errors");
    for (BulkResponseItem item: result.items()) {
        if (item.error() != null) {
            logger.error(item.error().reason());
        }
    }
}

添加一个操作(请记住,列表属性是累加的)。opBulkOperation 的构建器,BulkOperation 是一个 变体类型。此类型具有 indexcreateupdatedelete 变体。

选择 index 操作变体,idxIndexOperation 的构建器。

设置索引操作的属性,类似于 单个文档索引:索引名称、标识符和文档。

索引原始 JSON 数据

编辑

批量索引请求的 document 属性可以是任何可以使用 Elasticsearch 客户端的 JSON 映射器序列化为 JSON 的对象。但是,批量导入的数据通常以 JSON 文本的形式提供(例如磁盘上的文件),并且仅解析此 JSON 以重新序列化它以发送批量请求将浪费资源。因此,批量操作中的文档也可以是 BinaryData 类型,这些类型会逐字(不进行解析)发送到 Elasticsearch 服务器。

在下面的示例中,我们将使用 Java API 客户端的 BinaryData 从日志目录读取 json 文件并在批量请求中发送它们。

// List json log files in the log directory
File[] logFiles = logDir.listFiles(
    file -> file.getName().matches("log-.*\\.json")
);

BulkRequest.Builder br = new BulkRequest.Builder();

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    br.operations(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        )
    );
}

使用批量导入器进行流式导入

编辑

BulkIngester 通过提供一个实用程序类来简化 Bulk API 的使用,该类允许将索引/更新/删除操作透明地分组到批量请求中。您只需将批量操作 add() 到导入器,它将根据其配置处理分组和批量发送。

当满足以下条件之一时,导入器将发送批量请求

  • 操作数超过最大值(默认为 1000)
  • 批量请求大小(以字节为单位)超过最大值(默认为 5 MiB)
  • 自上次请求以来延迟已过期(定期刷新,无默认值)

此外,您可以定义 Elasticsearch 等待执行的最大并发请求数(默认为 1)。当达到该最大值并且已收集最大操作数时,向索引器添加新操作将被阻塞。这样可以避免通过向客户端应用程序施加反压来过载 Elasticsearch 服务器。

BulkIngester<Void> ingester = BulkIngester.of(b -> b
    .client(esClient)    
    .maxOperations(100)  
    .flushInterval(1, TimeUnit.SECONDS) 
);

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    ingester.add(op -> op 
        .index(idx -> idx
            .index("logs")
            .document(data)
        )
    );
}

ingester.close(); 

设置用于发送批量请求的 Elasticsearch 客户端。

设置发送批量请求之前要收集的最大操作数。

设置刷新间隔。

向导入器添加批量操作。

关闭导入器以刷新挂起的操作并释放资源。

此外,批量导入器接受一个监听器,以便您的应用程序可以收到发送的批量请求及其结果的通知。为了允许将批量操作与应用程序上下文相关联,add() 方法可以选择接受一个 context 参数。此上下文参数的类型用作 BulkIngester 对象的泛型参数。您可能已经注意到上面 BulkIngester<Void> 中的 Void 类型:这是因为我们没有注册监听器,因此不关心上下文值。

以下示例显示了如何使用上下文值来实现批量导入监听器:与之前一样,它批量发送 JSON 日志文件,但跟踪批量请求错误和失败的操作。当操作失败时,根据错误类型,您可能希望将其重新添加到导入器。

BulkListener<String> listener = new BulkListener<String>() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
        // The request was accepted, but may contain failed items.
        // The "context" list gives the file name for each bulk item.
        logger.debug("Bulk request " + executionId + " completed");
        for (int i = 0; i < contexts.size(); i++) {
            BulkResponseItem item = response.items().get(i);
            if (item.error() != null) {
                // Inspect the failure cause
                logger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason());
            }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
        // The request could not be sent
        logger.debug("Bulk request " + executionId + " failed", failure);
    }
};

BulkIngester<String> ingester = BulkIngester.of(b -> b
    .client(esClient)
    .maxOperations(100)
    .flushInterval(1, TimeUnit.SECONDS)
    .listener(listener) 
);

for (File file: logFiles) {
    FileInputStream input = new FileInputStream(file);
    BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON);

    ingester.add(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        ),
        file.getName() 
    );
}

ingester.close();

创建一个监听器,其中上下文值为导入的文件名。

在批量导入器上注册监听器。

将文件名设置为批量操作的上下文值。

批量导入还公开了统计信息,这些信息允许监视导入过程并调整其配置

  • 添加的操作数,
  • 由于达到最大并发请求数而导致 add() 调用被阻塞的次数(争用),
  • 发送的批量请求数,
  • 由于达到最大并发请求数而导致批量请求被阻塞的次数。

上面示例的源代码可以在 Java API 客户端测试 中找到。