批量:索引多个文档
编辑批量:索引多个文档
编辑批量请求允许在一次请求中向 Elasticsearch 发送多个与文档相关的操作。当您有多个文档需要摄取时,这比为每个文档发送单独的请求更有效。
一个批量请求可以包含几种类型的操作
- 创建一个文档,在确保它不存在之后索引它,
- 索引一个文档,如果需要则创建它,如果它存在则替换它,
- 使用脚本或部分文档就地更新已存在的文档,
- 删除一个文档。
有关批量请求的完整解释,请参阅 Elasticsearch API 文档。
索引应用程序对象
编辑BulkRequest
包含一系列操作,每个操作都是一个具有多个变体的类型。要创建此请求,使用主请求的构建器对象和每个操作的流畅 DSL 很方便。
下面的示例展示了如何索引应用程序对象的列表。
List<Product> products = fetchProducts(); BulkRequest.Builder br = new BulkRequest.Builder(); for (Product product : products) { br.operations(op -> op .index(idx -> idx .index("products") .id(product.getSku()) .document(product) ) ); } BulkResponse result = esClient.bulk(br.build()); // Log errors, if any if (result.errors()) { logger.error("Bulk had errors"); for (BulkResponseItem item: result.items()) { if (item.error() != null) { logger.error(item.error().reason()); } } }
添加一个操作(请记住,列表属性是累加的)。 |
|
选择 |
|
设置索引操作的属性,类似于单个文档索引:索引名称、标识符和文档。 |
索引原始 JSON 数据
编辑批量索引请求的 document
属性可以是可以使用 Elasticsearch 客户端的 JSON 映射器序列化为 JSON 的任何对象。但是,批量摄取的数据通常以 JSON 文本(例如磁盘上的文件)的形式提供,而解析此 JSON 只是为了重新序列化以发送批量请求将是资源的浪费。因此,批量操作中的文档也可以是 BinaryData
类型,它们会逐字发送(不进行解析)到 Elasticsearch 服务器。
在下面的示例中,我们将使用 Java API 客户端的 BinaryData
从日志目录读取 json 文件并将它们发送到批量请求中。
// List json log files in the log directory File[] logFiles = logDir.listFiles( file -> file.getName().matches("log-.*\\.json") ); BulkRequest.Builder br = new BulkRequest.Builder(); for (File file: logFiles) { FileInputStream input = new FileInputStream(file); BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON); br.operations(op -> op .index(idx -> idx .index("logs") .document(data) ) ); }
使用批量摄取器进行流式摄取
编辑BulkIngester
通过提供一个实用程序类来简化批量 API 的使用,该实用程序类允许将索引/更新/删除操作透明地分组到批量请求中。您只需将 add()
批量操作添加到摄取器,它将根据其配置负责分组和批量发送它们。
当满足以下条件之一时,摄取器将发送批量请求
- 操作数超过最大值(默认为 1000)
- 批量请求的大小(以字节为单位)超过最大值(默认为 5 MiB)
- 自上次请求以来的延迟已过期(定期刷新,无默认值)
此外,您可以定义等待 Elasticsearch 执行的最大并发请求数(默认为 1)。当达到最大值并且已收集最大操作数时,向索引器添加新操作将被阻止。这通过在客户端应用程序上施加背压来避免 Elasticsearch 服务器过载。
BulkIngester<Void> ingester = BulkIngester.of(b -> b .client(esClient) .maxOperations(100) .flushInterval(1, TimeUnit.SECONDS) ); for (File file: logFiles) { FileInputStream input = new FileInputStream(file); BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON); ingester.add(op -> op .index(idx -> idx .index("logs") .document(data) ) ); } ingester.close();
此外,批量摄取器接受一个监听器,以便您的应用程序可以收到已发送的批量请求及其结果的通知。为了允许将批量操作与应用程序上下文相关联,add()
方法可以选择接受 context
参数。此上下文参数的类型用作 BulkIngester
对象的泛型参数。您可能已经注意到上面的 BulkIngester<Void>
中的 Void
类型:这是因为我们没有注册监听器,因此不关心上下文值。
以下示例展示了如何使用上下文值来实现批量摄取监听器:与之前一样,它批量发送 JSON 日志文件,但会跟踪批量请求错误和失败的操作。当操作失败时,根据错误类型,您可能需要将其重新添加到摄取器。
BulkListener<String> listener = new BulkListener<String>() { @Override public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) { } @Override public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) { // The request was accepted, but may contain failed items. // The "context" list gives the file name for each bulk item. logger.debug("Bulk request " + executionId + " completed"); for (int i = 0; i < contexts.size(); i++) { BulkResponseItem item = response.items().get(i); if (item.error() != null) { // Inspect the failure cause logger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason()); } } } @Override public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) { // The request could not be sent logger.debug("Bulk request " + executionId + " failed", failure); } }; BulkIngester<String> ingester = BulkIngester.of(b -> b .client(esClient) .maxOperations(100) .flushInterval(1, TimeUnit.SECONDS) .listener(listener) ); for (File file: logFiles) { FileInputStream input = new FileInputStream(file); BinaryData data = BinaryData.of(IOUtils.toByteArray(input), ContentType.APPLICATION_JSON); ingester.add(op -> op .index(idx -> idx .index("logs") .document(data) ), file.getName() ); } ingester.close();
批量摄取还公开了统计信息,这些信息允许监视摄取过程并调整其配置
- 已添加的操作数,
- 由于达到最大并发请求数而被阻止的
add()
调用次数(争用), - 已发送的批量请求数,
- 由于达到最大并发请求数而被阻止的批量请求数。
上面示例的源代码可以在 Java API 客户端测试中找到。