客户端助手

编辑

客户端提供了一组方便的助手,让您在使用某些 API 时获得更舒适的体验。

客户端助手处于实验阶段,API 可能会在下个次要版本中发生更改。这些助手在低于 10 的任何 Node.js 版本中均无法工作。

批量助手

编辑

新增于 v7.7.0

由于 API 的结构复杂,运行批量请求可能很复杂,此助手旨在围绕批量 API 提供更好的开发人员体验。

用法

编辑
const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

const client = new Client({
  cloud: { id: '<cloud-id>' },
  auth: { apiKey: 'base64EncodedKey' }
})
const result = await client.helpers.bulk({
  datasource: createReadStream('./dataset.ndjson').pipe(split()),
  onDocument (doc) {
    return {
      index: { _index: 'my-index' }
    }
  }
})

console.log(result)
// {
//   total: number,
//   failed: number,
//   retry: number,
//   successful: number,
//   time: number,
//   bytes: number,
//   aborted: boolean
// }

要创建批量助手的新的实例,请按上述示例中所示进行访问,配置选项如下:

数据源

一个数组、异步生成器或包含您需要索引/创建/更新/删除的数据的可读流。它可以是字符串或对象的数组,也可以是 JSON 字符串或 JavaScript 对象的流。
如果是流,我们建议使用 split2 包,它会在换行符分隔符处分割流。
此参数是必需的。

const { createReadStream } = require('fs')
const split = require('split2')
const b = client.helpers.bulk({
  // if you just use split(), the data will be used as array of strings
  datasource: createReadStream('./dataset.ndjson').pipe(split())
  // if you need to manipulate the data, you can pass JSON.parse to split
  datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse))
})

onDocument

一个函数,它将为数据源的每个文档调用。在此函数中,您可以操作文档,并且必须返回您想要对文档执行的操作。请查看 批量 API 文档 以查看支持的操作。
此参数是必需的。

const b = client.helpers.bulk({
  onDocument (doc) {
    return {
      index: { _index: 'my-index' }
    }
  }
})

onDrop

一个函数,每当无法索引文档且已达到最大重试次数时调用。

const b = client.helpers.bulk({
  onDrop (doc) {
    console.log(doc)
  }
})

onSuccess

一个函数,它将为批量请求中的每个成功操作调用,其中包括来自 Elasticsearch 的结果以及发送的原始文档,或对于删除操作为 null

const b = client.helpers.bulk({
  onSuccess ({ result, document }) {
    console.log(`SUCCESS: Document ${result.index._id} indexed to ${result.index._index}`)
  }
})

flushBytes

达到发送之前批量正文的字节大小。默认为 5MB。
默认值: 5000000

const b = client.helpers.bulk({
  flushBytes: 1000000
})

flushInterval

助手在刷新上次读取文档的正文之前等待的时间(以毫秒为单位)。
默认值: 30000

const b = client.helpers.bulk({
  flushInterval: 30000
})

并发度

同时执行的请求数。
默认值: 5

const b = client.helpers.bulk({
  concurrency: 10
})

重试次数

在调用 onDrop 回调之前重试文档的次数。
默认值:客户端最大重试次数。

const b = client.helpers.bulk({
  retries: 3
})

等待时间

以毫秒为单位,重试前等待的时间。
默认值 5000.

const b = client.helpers.bulk({
  wait: 3000
})

refreshOnCompletion

如果为 true,则在批量操作结束时,它会在所有索引或指定的索引上运行刷新操作。
默认值: false。

const b = client.helpers.bulk({
  refreshOnCompletion: true
  // or
  refreshOnCompletion: 'index-name'
})

支持的操作

编辑
索引
编辑
client.helpers.bulk({
  datasource: myDatasource,
  onDocument (doc) {
    return {
      index: { _index: 'my-index' }
    }
  }
})
创建
编辑
client.helpers.bulk({
  datasource: myDatasource,
  onDocument (doc) {
    return {
      create: { _index: 'my-index', _id: doc.id }
    }
  }
})
更新
编辑
client.helpers.bulk({
  datasource: myDatasource,
  onDocument (doc) {
    // Note that the update operation requires you to return
    // an array, where the first element is the action, while
    // the second are the document option
    return [
      { update: { _index: 'my-index', _id: doc.id } },
      { doc_as_upsert: true }
    ]
  }
})
删除
编辑
client.helpers.bulk({
  datasource: myDatasource,
  onDocument (doc) {
    return {
      delete: { _index: 'my-index', _id: doc.id }
    }
  }
})

中止批量操作

编辑

如有需要,您可以随时中止批量操作。批量助手返回一个 可执行 thenable,它具有一个 abort 方法。

abort 方法会停止批量操作的执行,但是如果您使用的并发度大于 1,则已运行的操作不会停止。

const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

const client = new Client({
  cloud: { id: '<cloud-id>' },
  auth: { apiKey: 'base64EncodedKey' }
})
const b = client.helpers.bulk({
  datasource: createReadStream('./dataset.ndjson').pipe(split()),
  onDocument (doc) {
    return {
      index: { _index: 'my-index' }
    }
  },
  onDrop (doc) {
    b.abort()
  }
})

console.log(await b)

向批量 API 传递自定义选项

编辑

您可以将批量 API 支持的任何选项传递给助手,助手会将这些选项与批量 API 调用一起使用。链接:批量 API

const result = await client.helpers.bulk({
  datasource: [...],
  onDocument (doc) {
    return {
      index: { _index: 'my-index' }
    }
  },
  pipeline: 'my-pipeline'
})

与异步生成器的用法

编辑
const { Client } = require('@elastic/elasticsearch')

async function * generator () {
  const dataset = [
    { user: 'jon', age: 23 },
    { user: 'arya', age: 18 },
    { user: 'tyrion', age: 39 }
  ]
  for (const doc of dataset) {
    yield doc
  }
}

const client = new Client({
  cloud: { id: '<cloud-id>' },
  auth: { apiKey: 'base64EncodedKey' }
})
const result = await client.helpers.bulk({
  datasource: generator(),
  onDocument (doc) {
    return {
      index: { _index: 'my-index' }
    }
  }
})

console.log(result)

在操作前修改文档

编辑

新增于 v8.8.2

如果您需要在将文档发送到 Elasticsearch 之前修改数据源中的文档,则可以在 onDocument 函数中返回一个数组,而不是操作对象。数组中的第一项必须是操作对象,第二项必须是您希望发送到 Elasticsearch 的文档或部分文档对象。

const { Client } = require('@elastic/elasticsearch')

const client = new Client({
  cloud: { id: '<cloud-id>' },
  auth: { apiKey: 'base64EncodedKey' }
})
const result = await client.helpers.bulk({
  datasource: [...],
  onDocument (doc) {
    return [
      { index: { _index: 'my-index' } },
      { ...doc, favorite_color: 'mauve' },
    ]
  }
})

console.log(result)

多搜索助手

编辑

新增于 v7.8.0

如果您以较高的速率发送搜索请求,此助手可能对您有所帮助。它在后台使用多搜索 API 来批量处理请求,并提高应用程序的整体性能。result 也公开了一个 documents 属性,允许您直接访问命中源。

用法

编辑
const { Client } = require('@elastic/elasticsearch')

const client = new Client({
  cloud: { id: '<cloud-id>' },
  auth: { apiKey: 'base64EncodedKey' }
})
const m = client.helpers.msearch()

m.search(
    { index: 'stackoverflow' },
    { query: { match: { title: 'javascript' } } }
  )
  .then(result => console.log(result.body)) // or result.documents
  .catch(err => console.error(err))

要创建多搜索 (msearch) 助手的新的实例,您应该按上述示例中所示进行访问,配置选项如下:

操作

应在单个 msearch 请求中发送多少个搜索操作。
默认值: 5

const m = client.helpers.msearch({
  operations: 10
})

flushInterval

助手在刷新上次读取操作的操作之前等待的时间(以毫秒为单位)。
默认值: 500

const m = client.helpers.msearch({
  flushInterval: 500
})

并发度

同时执行的请求数。
默认值: 5

const m = client.helpers.msearch({
  concurrency: 10
})

重试次数

在解析请求之前重试操作的次数。仅在出现 429 错误时才重试操作。
默认值:客户端最大重试次数。

const m = client.helpers.msearch({
  retries: 3
})

等待时间

以毫秒为单位,重试前等待的时间。
默认值 5000.

const m = client.helpers.msearch({
  wait: 3000
})

停止 msearch 助手

编辑

如有需要,您可以随时停止 msearch 处理器。msearch 助手返回一个 可执行 thenable,它具有一个 stop 方法。

如果您正在创建多个 msearch 助手实例并在有限的时间内使用它们,请记住在完成使用后始终使用 stop 方法,否则您的应用程序将开始泄漏内存。

stop 方法接受一个可选错误,该错误将分派到每个后续搜索请求。

stop 方法会停止 msearch 处理器的执行,但是如果您使用的并发度大于 1,则已运行的操作不会停止。

const { Client } = require('@elastic/elasticsearch')

const client = new Client({
  cloud: { id: '<cloud-id>' },
  auth: { apiKey: 'base64EncodedKey' }
})
const m = client.helpers.msearch()

m.search(
    { index: 'stackoverflow' },
    { query: { match: { title: 'javascript' } } }
  )
  .then(result => console.log(result.body))
  .catch(err => console.error(err))

m.search(
    { index: 'stackoverflow' },
    { query: { match: { title: 'ruby' } } }
  )
  .then(result => console.log(result.body))
  .catch(err => console.error(err))

setImmediate(() => m.stop())

搜索助手

编辑

新增于 v7.7.0

搜索 API 的简单包装器。它不会返回整个 result 对象,而只返回搜索文档源。为了提高性能,此助手会自动将 filter_path=hits.hits._source 添加到查询字符串。

const documents = await client.helpers.search({
  index: 'stackoverflow',
  query: {
    match: {
      title: 'javascript'
    }
  }
})

for (const doc of documents) {
  console.log(doc)
}

滚动搜索助手

编辑

新增于 v7.7.0

此助手提供了一种简单直观的方式来使用滚动搜索 API。调用后,它返回一个 异步迭代器,可与 for-await…​of 结合使用。它会自动处理 429 错误并使用客户端的 maxRetries 选项。

const scrollSearch = client.helpers.scrollSearch({
  index: 'stackoverflow',
  query: {
    match: {
      title: 'javascript'
    }
  }
})

for await (const result of scrollSearch) {
  console.log(result)
}

清除滚动搜索

编辑

如有需要,您可以通过调用 result.clear() 来清除滚动搜索。

for await (const result of scrollSearch) {
  if (condition) {
    await result.clear()
  }
}

快速获取文档

编辑

如果您只需要滚动搜索结果中的文档,则可以通过 result.documents 访问它们。

for await (const result of scrollSearch) {
  console.log(result.documents)
}

滚动文档助手

编辑

新增于 v7.7.0

它的工作方式与滚动搜索助手相同,但它只返回文档。请注意,每个循环周期只返回一个文档,您不能使用 clear 方法。为了提高性能,此助手会自动将 filter_path=hits.hits._source 添加到查询字符串。

const scrollSearch = client.helpers.scrollDocuments({
  index: 'stackoverflow',
  query: {
    match: {
      title: 'javascript'
    }
  }
})

for await (const doc of scrollSearch) {
  console.log(doc)
}

ES|QL 助手

编辑

ES|QL 查询可以以 多种格式 返回其结果。ES|QL 查询返回的默认 JSON 格式包含每行的值数组,列名和类型分别返回。

用法

编辑
toRecords
编辑

新增于 v8.14.0

ES|QL 查询返回的默认 JSON 格式包含每行的值数组,列名和类型分别返回。

{
  "columns": [
    { "name": "@timestamp", "type": "date" },
    { "name": "client_ip", "type": "ip" },
    { "name": "event_duration", "type": "long" },
    { "name": "message", "type": "keyword" }
  ],
  "values": [
    [
      "2023-10-23T12:15:03.360Z",
      "172.21.2.162",
      3450233,
      "Connected to 10.1.0.3"
    ],
    [
      "2023-10-23T12:27:28.948Z",
      "172.21.2.113",
      2764889,
      "Connected to 10.1.0.2"
    ]
  ]
}

在许多情况下,最好对对象的数组进行操作(每行一个对象),而不是对数组的数组进行操作。ES|QL toRecords 助手将行数据转换为对象。

await client.helpers
  .esql({ query: 'FROM sample_data | LIMIT 2' })
  .toRecords()
// =>
// {
//   "columns": [
//     { "name": "@timestamp", "type": "date" },
//     { "name": "client_ip", "type": "ip" },
//     { "name": "event_duration", "type": "long" },
//     { "name": "message", "type": "keyword" }
//   ],
//   "records": [
//     {
//       "@timestamp": "2023-10-23T12:15:03.360Z",
//       "client_ip": "172.21.2.162",
//       "event_duration": 3450233,
//       "message": "Connected to 10.1.0.3"
//     },
//     {
//       "@timestamp": "2023-10-23T12:27:28.948Z",
//       "client_ip": "172.21.2.113",
//       "event_duration": 2764889,
//       "message": "Connected to 10.1.0.2"
//     },
//   ]
// }

在 TypeScript 中,您可以声明 toRecords 返回的类型。

type EventLog = {
  '@timestamp': string,
  client_ip: string,
  event_duration: number,
  message: string,
}

const result = await client.helpers
  .esql({ query: 'FROM sample_data | LIMIT 2' })
  .toRecords<EventLog>()
toArrowReader
编辑

新增于 v8.16.0

ES|QL 可以以多种二进制格式返回结果,包括 Apache Arrow 的流式格式。由于它是一种非常高效的读取格式,因此它对于执行高性能内存中分析非常有价值。而且,由于响应以记录批次的流式传输,因此它可用于生成聚合和其他计算,这些计算的数据集大于内存。

toArrowReader 返回一个 RecordBatchStreamReader

const reader = await client.helpers
  .esql({ query: 'FROM sample_data' })
  .toArrowReader()

// print each record as JSON
for (const recordBatch of reader) {
  for (const record of recordBatch) {
    console.log(record.toJSON())
  }
}
toArrowTable
编辑

新增于 v8.16.0

如果您想以 Arrow 格式提取整个数据集,但不进行流式传输,则可以使用 toArrowTable 助手来获取 Table

const table = await client.helpers
  .esql({ query: 'FROM sample_data' })
  .toArrowTable()

console.log(table.toArray())