客户端助手
编辑客户端助手
编辑客户端提供了一组方便的助手,可以为您在使用某些 API 时提供更舒适的体验。
客户端助手是实验性的,其 API 可能会在接下来的小版本中发生变化。助手将无法在低于 10 的任何 Node.js 版本中使用。
批量助手
编辑新增 于 v7.7.0
由于 API 的结构,运行批量请求可能很复杂,此助手旨在为批量 API 提供更好的开发人员体验。
用法
编辑const { createReadStream } = require('fs') const split = require('split2') const { Client } = require('@elastic/elasticsearch') const client = new Client({ cloud: { id: '<cloud-id>' }, auth: { apiKey: 'base64EncodedKey' } }) const result = await client.helpers.bulk({ datasource: createReadStream('./dataset.ndjson').pipe(split()), onDocument (doc) { return { index: { _index: 'my-index' } } } }) console.log(result) // { // total: number, // failed: number, // retry: number, // successful: number, // time: number, // bytes: number, // aborted: boolean // }
要创建批量助手的新实例,请如上面的示例所示访问它,配置选项如下:
|
一个数组,异步生成器或包含您需要索引/创建/更新/删除的数据的可读流。它可以是字符串或对象的数组,也可以是 JSON 字符串或 JavaScript 对象的流。 const { createReadStream } = require('fs') const split = require('split2') const b = client.helpers.bulk({ // if you just use split(), the data will be used as array of strings datasource: createReadStream('./dataset.ndjson').pipe(split()) // if you need to manipulate the data, you can pass JSON.parse to split datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse)) }) |
|
为数据源的每个文档调用的函数。在此函数内部,您可以操作文档,并且必须返回要对文档执行的操作。请查看 批量 API 文档,了解支持的操作。 const b = client.helpers.bulk({ onDocument (doc) { return { index: { _index: 'my-index' } } } }) |
|
每次无法索引文档且已达到最大重试次数时调用的函数。 const b = client.helpers.bulk({ onDrop (doc) { console.log(doc) } }) |
|
批量请求中每次成功操作都会调用的函数,其中包含来自 Elasticsearch 的结果以及发送的原始文档,对于删除操作则为 const b = client.helpers.bulk({ onSuccess ({ result, document }) { console.log(`SUCCESS: Document ${result.index._id} indexed to ${result.index._index}`) } }) |
|
在发送之前要达到的批量主体的大小(以字节为单位)。默认为 5MB。 const b = client.helpers.bulk({ flushBytes: 1000000 }) |
|
助手在从上次读取的文档中刷新主体之前等待的时间(以毫秒为单位)。 const b = client.helpers.bulk({ flushInterval: 30000 }) |
|
同时执行的请求数。 const b = client.helpers.bulk({ concurrency: 10 }) |
|
在调用 const b = client.helpers.bulk({ retries: 3 }) |
|
重试之前等待的时间(以毫秒为单位)。 const b = client.helpers.bulk({ wait: 3000 }) |
|
如果为 const b = client.helpers.bulk({ refreshOnCompletion: true // or refreshOnCompletion: 'index-name' }) |
支持的操作
编辑索引
编辑client.helpers.bulk({ datasource: myDatasource, onDocument (doc) { return { index: { _index: 'my-index' } } } })
创建
编辑client.helpers.bulk({ datasource: myDatasource, onDocument (doc) { return { create: { _index: 'my-index', _id: doc.id } } } })
更新
编辑client.helpers.bulk({ datasource: myDatasource, onDocument (doc) { // Note that the update operation requires you to return // an array, where the first element is the action, while // the second are the document option return [ { update: { _index: 'my-index', _id: doc.id } }, { doc_as_upsert: true } ] } })
删除
编辑client.helpers.bulk({ datasource: myDatasource, onDocument (doc) { return { delete: { _index: 'my-index', _id: doc.id } } } })
中止批量操作
编辑如果需要,您可以随时中止批量操作。批量助手返回一个 thenable,其中有一个 abort
方法。
abort 方法会停止批量操作的执行,但如果您使用的并发数大于 1,则已在运行的操作将不会停止。
const { createReadStream } = require('fs') const split = require('split2') const { Client } = require('@elastic/elasticsearch') const client = new Client({ cloud: { id: '<cloud-id>' }, auth: { apiKey: 'base64EncodedKey' } }) const b = client.helpers.bulk({ datasource: createReadStream('./dataset.ndjson').pipe(split()), onDocument (doc) { return { index: { _index: 'my-index' } } }, onDrop (doc) { b.abort() } }) console.log(await b)
将自定义选项传递给批量 API
编辑您可以将链接支持的任何选项传递给助手:批量 API,并且助手将这些选项与批量 API 调用结合使用。
const result = await client.helpers.bulk({ datasource: [...], onDocument (doc) { return { index: { _index: 'my-index' } } }, pipeline: 'my-pipeline' })
与异步生成器一起使用
编辑const { Client } = require('@elastic/elasticsearch') async function * generator () { const dataset = [ { user: 'jon', age: 23 }, { user: 'arya', age: 18 }, { user: 'tyrion', age: 39 } ] for (const doc of dataset) { yield doc } } const client = new Client({ cloud: { id: '<cloud-id>' }, auth: { apiKey: 'base64EncodedKey' } }) const result = await client.helpers.bulk({ datasource: generator(), onDocument (doc) { return { index: { _index: 'my-index' } } } }) console.log(result)
在操作之前修改文档
编辑新增 于 v8.8.2
如果需要在将数据源发送到 Elasticsearch 之前修改其中的文档,可以在 onDocument
函数中返回一个数组,而不是操作对象。数组中的第一项必须是操作对象,第二项必须是您希望将其发送到 Elasticsearch 的文档或部分文档对象。
const { Client } = require('@elastic/elasticsearch') const client = new Client({ cloud: { id: '<cloud-id>' }, auth: { apiKey: 'base64EncodedKey' } }) const result = await client.helpers.bulk({ datasource: [...], onDocument (doc) { return [ { index: { _index: 'my-index' } }, { ...doc, favorite_color: 'mauve' }, ] } }) console.log(result)
多搜索助手
编辑新增 于 v7.8.0
如果您以高频率发送搜索请求,则此助手可能对您有用。它在后台使用多搜索 API 来批量处理请求并提高应用程序的整体性能。result
还公开了一个 documents
属性,该属性允许您直接访问命中源。
用法
编辑const { Client } = require('@elastic/elasticsearch') const client = new Client({ cloud: { id: '<cloud-id>' }, auth: { apiKey: 'base64EncodedKey' } }) const m = client.helpers.msearch() m.search( { index: 'stackoverflow' }, { query: { match: { title: 'javascript' } } } ) .then(result => console.log(result.body)) // or result.documents .catch(err => console.error(err))
要创建多搜索 (msearch) 助手的新实例,您应该如上面的示例所示访问它,配置选项如下:
|
单个 msearch 请求中应发送多少个搜索操作。 const m = client.helpers.msearch({ operations: 10 }) |
|
助手在从上次读取的操作中刷新操作之前等待的时间(以毫秒为单位)。 const m = client.helpers.msearch({ flushInterval: 500 }) |
|
同时执行的请求数。 const m = client.helpers.msearch({ concurrency: 10 }) |
|
在解析请求之前重试操作的次数。仅在出现 429 错误时才重试操作。 const m = client.helpers.msearch({ retries: 3 }) |
|
重试之前等待的时间(以毫秒为单位)。 const m = client.helpers.msearch({ wait: 3000 }) |
停止 msearch 助手
编辑如果需要,您可以随时停止 msearch 处理器。msearch 助手返回一个 thenable,其中有一个 stop
方法。
如果您创建了多个 msearch 助手实例并将其用于有限的时间段,请记住在使用完它们后始终使用 stop
方法,否则您的应用程序将开始泄漏内存。
stop
方法接受一个可选的错误,该错误将在每个后续搜索请求中分派。
stop 方法会停止 msearch 处理器的执行,但如果您使用的并发数大于 1,则已在运行的操作将不会停止。
const { Client } = require('@elastic/elasticsearch') const client = new Client({ cloud: { id: '<cloud-id>' }, auth: { apiKey: 'base64EncodedKey' } }) const m = client.helpers.msearch() m.search( { index: 'stackoverflow' }, { query: { match: { title: 'javascript' } } } ) .then(result => console.log(result.body)) .catch(err => console.error(err)) m.search( { index: 'stackoverflow' }, { query: { match: { title: 'ruby' } } } ) .then(result => console.log(result.body)) .catch(err => console.error(err)) setImmediate(() => m.stop())
搜索助手
编辑新增 于 v7.7.0
一个围绕搜索 API 的简单包装器。它不是返回整个 result
对象,而是仅返回搜索文档源。为了提高性能,此助手会自动将 filter_path=hits.hits._source
添加到查询字符串。
const documents = await client.helpers.search({ index: 'stackoverflow', query: { match: { title: 'javascript' } } }) for (const doc of documents) { console.log(doc) }
滚动搜索助手
编辑新增 于 v7.7.0
此助手提供了一种简单直观的方法来使用滚动搜索 API。调用后,它会返回一个 异步迭代器,可以与 for-await...of 一起使用。它会自动处理 429
错误并使用客户端的 maxRetries
选项。
const scrollSearch = client.helpers.scrollSearch({ index: 'stackoverflow', query: { match: { title: 'javascript' } } }) for await (const result of scrollSearch) { console.log(result) }
清除滚动搜索
编辑如果需要,您可以通过调用 result.clear()
来清除滚动搜索。
for await (const result of scrollSearch) { if (condition) { await result.clear() } }
快速获取文档
编辑如果只需要滚动搜索结果中的文档,可以通过 result.documents
访问它们。
for await (const result of scrollSearch) { console.log(result.documents) }
滚动文档助手
编辑新增 于 v7.7.0
它的工作方式与滚动搜索助手相同,但它只返回文档。请注意,每个循环周期都会返回一个文档,并且您不能使用 clear
方法。为了提高性能,此助手会自动将 filter_path=hits.hits._source
添加到查询字符串。
const scrollSearch = client.helpers.scrollDocuments({ index: 'stackoverflow', query: { match: { title: 'javascript' } } }) for await (const doc of scrollSearch) { console.log(doc) }
ES|QL 助手
编辑ES|QL 查询可以以多种格式返回其结果。ES|QL 查询返回的默认 JSON 格式包含每行的值数组,列名和类型分别返回
用法
编辑toRecords
编辑新增 于 v8.14.0
ES|QL 查询返回的默认 JSON 格式包含每行的值数组,列名和类型分别返回
{ "columns": [ { "name": "@timestamp", "type": "date" }, { "name": "client_ip", "type": "ip" }, { "name": "event_duration", "type": "long" }, { "name": "message", "type": "keyword" } ], "values": [ [ "2023-10-23T12:15:03.360Z", "172.21.2.162", 3450233, "Connected to 10.1.0.3" ], [ "2023-10-23T12:27:28.948Z", "172.21.2.113", 2764889, "Connected to 10.1.0.2" ] ] }
在许多情况下,最好操作对象数组(每行一个对象),而不是数组数组。ES|QL toRecords
助手将行数据转换为对象。
await client.helpers .esql({ query: 'FROM sample_data | LIMIT 2' }) .toRecords() // => // { // "columns": [ // { "name": "@timestamp", "type": "date" }, // { "name": "client_ip", "type": "ip" }, // { "name": "event_duration", "type": "long" }, // { "name": "message", "type": "keyword" } // ], // "records": [ // { // "@timestamp": "2023-10-23T12:15:03.360Z", // "client_ip": "172.21.2.162", // "event_duration": 3450233, // "message": "Connected to 10.1.0.3" // }, // { // "@timestamp": "2023-10-23T12:27:28.948Z", // "client_ip": "172.21.2.113", // "event_duration": 2764889, // "message": "Connected to 10.1.0.2" // }, // ] // }
在 TypeScript 中,您可以声明 toRecords
返回的类型
type EventLog = { '@timestamp': string, client_ip: string, event_duration: number, message: string, } const result = await client.helpers .esql({ query: 'FROM sample_data | LIMIT 2' }) .toRecords<EventLog>()
toArrowReader
编辑新增 于 v8.16.0
ES|QL 可以以多种二进制格式返回结果,包括 Apache Arrow 的流格式。由于它是一种非常高效的读取格式,因此对于执行高性能的内存分析非常有用。而且,由于响应是作为一批批记录进行流式传输的,因此可以将其用于对大于内存的数据集生成聚合和其他计算。
toArrowReader
返回一个 RecordBatchStreamReader
。
const reader = await client.helpers .esql({ query: 'FROM sample_data' }) .toArrowReader() // print each record as JSON for (const recordBatch of reader) { for (const record of recordBatch) { console.log(record.toJSON()) } }
toArrowTable
编辑新增 于 v8.16.0
如果您想以 Arrow 格式提取整个数据集,但不进行流式传输,可以使用 toArrowTable
助手获取 Table 作为返回。
const table = await client.helpers .esql({ query: 'FROM sample_data' }) .toArrowTable() console.log(table.toArray())