批量:索引多个文档
编辑批量:索引多个文档
编辑批量请求允许在一个请求中将多个与文档相关的操作发送到 Elasticsearch。当您有多个文档需要导入时,这比使用单独的请求发送每个文档更高效。
批量请求可以包含几种操作
- 创建文档,在确保它不存在后对其进行索引,
- 索引文档,如果需要则创建它,如果它存在则替换它,
- 更新已存在的文档,可以使用脚本或部分文档,
- 删除文档。
请参阅 Elasticsearch API 文档,了解批量请求的完整解释。
索引应用程序对象
编辑一个 BulkRequest
包含一系列操作,每个操作都是一个 具有多个变体的类型。要创建此请求,可以使用主请求的构建器对象以及每个操作的流畅 DSL,这很方便。
下面的示例显示了如何索引应用程序对象的列表。
List<Product> products = fetchProducts(); BulkRequest.Builder br = new BulkRequest.Builder(); for (Product product : products) { br.operations(op -> op .index(idx -> idx .index("products") .id(product.getSku()) .document(product) ) ); } BulkResponse result = esClient.bulk(br.build()); // Log errors, if any if (result.errors()) { logger.error("Bulk had errors"); for (BulkResponseItem item: result.items()) { if (item.error() != null) { logger.error(item.error().reason()); } } }
添加一个操作(请记住,列表属性是累加的)。 |
|
选择 |
|
设置索引操作的属性,类似于 单个文档索引:索引名称、标识符和文档。 |
索引原始 JSON 数据
编辑批量索引请求的 document
属性可以是任何可以使用 Elasticsearch 客户端的 JSON 映射器序列化为 JSON 的对象。但是,批量导入的数据通常以 JSON 文本的形式提供(例如磁盘上的文件),并且仅解析此 JSON 以重新序列化它以发送批量请求将浪费资源。因此,批量操作中的文档也可以是 BinaryData
类型,这些类型会逐字(不进行解析)发送到 Elasticsearch 服务器。
在下面的示例中,我们将使用 Java API 客户端的 BinaryData
从日志目录读取 json 文件并在批量请求中发送它们。
// List json log files in the log directory File[] logFiles = logDir.listFiles( file -> file.getName().matches("log-.*\\.json") ); BulkRequest.Builder br = new BulkRequest.Builder(); for (File file: logFiles) { FileInputStream input = new FileInputStream(file); BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON); br.operations(op -> op .index(idx -> idx .index("logs") .document(data) ) ); }
使用批量导入器进行流式导入
编辑BulkIngester
通过提供一个实用程序类来简化 Bulk API 的使用,该类允许将索引/更新/删除操作透明地分组到批量请求中。您只需将批量操作 add()
到导入器,它将根据其配置处理分组和批量发送。
当满足以下条件之一时,导入器将发送批量请求
- 操作数超过最大值(默认为 1000)
- 批量请求大小(以字节为单位)超过最大值(默认为 5 MiB)
- 自上次请求以来延迟已过期(定期刷新,无默认值)
此外,您可以定义 Elasticsearch 等待执行的最大并发请求数(默认为 1)。当达到该最大值并且已收集最大操作数时,向索引器添加新操作将被阻塞。这样可以避免通过向客户端应用程序施加反压来过载 Elasticsearch 服务器。
BulkIngester<Void> ingester = BulkIngester.of(b -> b .client(esClient) .maxOperations(100) .flushInterval(1, TimeUnit.SECONDS) ); for (File file: logFiles) { FileInputStream input = new FileInputStream(file); BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON); ingester.add(op -> op .index(idx -> idx .index("logs") .document(data) ) ); } ingester.close();
此外,批量导入器接受一个监听器,以便您的应用程序可以收到发送的批量请求及其结果的通知。为了允许将批量操作与应用程序上下文相关联,add()
方法可以选择接受一个 context
参数。此上下文参数的类型用作 BulkIngester
对象的泛型参数。您可能已经注意到上面 BulkIngester<Void>
中的 Void
类型:这是因为我们没有注册监听器,因此不关心上下文值。
以下示例显示了如何使用上下文值来实现批量导入监听器:与之前一样,它批量发送 JSON 日志文件,但跟踪批量请求错误和失败的操作。当操作失败时,根据错误类型,您可能希望将其重新添加到导入器。
BulkListener<String> listener = new BulkListener<String>() { @Override public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) { } @Override public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) { // The request was accepted, but may contain failed items. // The "context" list gives the file name for each bulk item. logger.debug("Bulk request " + executionId + " completed"); for (int i = 0; i < contexts.size(); i++) { BulkResponseItem item = response.items().get(i); if (item.error() != null) { // Inspect the failure cause logger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason()); } } } @Override public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) { // The request could not be sent logger.debug("Bulk request " + executionId + " failed", failure); } }; BulkIngester<String> ingester = BulkIngester.of(b -> b .client(esClient) .maxOperations(100) .flushInterval(1, TimeUnit.SECONDS) .listener(listener) ); for (File file: logFiles) { FileInputStream input = new FileInputStream(file); BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON); ingester.add(op -> op .index(idx -> idx .index("logs") .document(data) ), file.getName() ); } ingester.close();
批量导入还公开了统计信息,这些信息允许监视导入过程并调整其配置
- 添加的操作数,
- 由于达到最大并发请求数而导致
add()
调用被阻塞的次数(争用), - 发送的批量请求数,
- 由于达到最大并发请求数而导致批量请求被阻塞的次数。
上面示例的源代码可以在 Java API 客户端测试 中找到。