滚动

编辑

虽然搜索请求返回单“页”结果,但可以使用滚动 API 从单个搜索请求中检索大量结果(甚至所有结果),这与在传统数据库中使用游标的方式非常相似。

滚动不适用于实时用户请求,而是用于处理大量数据,例如,为了将一个索引的内容重新索引到具有不同配置的新索引中。

从滚动请求返回的结果反映了初始搜索请求发出时索引的状态,就像时间快照一样。后续对文档的更改(索引、更新或删除)只会影响以后的搜索请求。

为了使用滚动,初始搜索请求应在查询字符串中指定 scroll 参数,该参数告诉 Elasticsearch 应将“搜索上下文”保持活动状态多长时间。

您知道吗?客户端提供了一个用于发送滚动请求的辅助方法(scroll helper),详见客户端辅助方法(Client helpers)文档。

'use strict'

const { Client } = require('@elastic/elasticsearch')
// Client instance used by the example below.
// NOTE: '<cloud-id>' and 'base64EncodedKey' appear to be placeholder values;
// replace them with the credentials of a real deployment before running.
const client = new Client({
  cloud: { id: '<cloud-id>' },
  auth: { apiKey: 'base64EncodedKey' }
})

/**
 * Indexes three sample quotes into `game-of-thrones`, then reads them back
 * one hit per request with the scroll API and logs every collected quote.
 *
 * Paging stops when either the reported total has been collected or a page
 * comes back empty. The scroll context opened by the initial search is
 * cleared afterwards, whether paging finished normally or threw.
 *
 * If the bulk request reports errors, the bulk response is logged and the
 * process exits with status 1.
 *
 * @returns {Promise<void>} resolves once the quotes have been logged and the
 *   scroll context has been released
 */
async function run () {
  const allQuotes = []

  // Let's index some data!
  const bulkResponse = await client.bulk({
    // here we are forcing an index refresh,
    // otherwise we will not get any result
    // in the subsequent search
    refresh: true,
    operations: [
      // operation to perform
      { index: { _index: 'game-of-thrones' } },
      // the document to index
      {
        character: 'Ned Stark',
        quote: 'Winter is coming.'
      },

      { index: { _index: 'game-of-thrones' } },
      {
        character: 'Daenerys Targaryen',
        quote: 'I am the blood of the dragon.'
      },

      { index: { _index: 'game-of-thrones' } },
      {
        character: 'Tyrion Lannister',
        quote: 'A mind needs books like a sword needs a whetstone.'
      }
    ]
  })

  if (bulkResponse.errors) {
    console.log(bulkResponse)
    process.exit(1)
  }

  // start things off by searching and setting a scroll timeout
  let response = await client.search({
    index: 'game-of-thrones',
    // keep the search results "scrollable" for 30 seconds
    scroll: '30s',
    // for the sake of this example, we will get only one result per search
    size: 1,
    // filter the source to only include the quote field
    _source: ['quote'],
    query: {
      match_all: {}
    }
  })

  // most recent scroll id seen, kept so the context can be released below
  let scrollId = response._scroll_id

  try {
    while (true) {
      const hits = response.hits.hits

      // an empty page means there is nothing left to fetch; stopping here
      // keeps the loop finite even if hits.total.value is never reached
      if (hits.length === 0) {
        break
      }

      // collect the quotes from this response
      for (const hit of hits) {
        allQuotes.push(hit._source.quote)
      }

      // check to see if we have collected all of the quotes
      if (response.hits.total.value === allQuotes.length) {
        break
      }

      // get the next response if there are more quotes to fetch
      response = await client.scroll({
        scroll_id: scrollId,
        scroll: '30s'
      })

      if (response._scroll_id) {
        scrollId = response._scroll_id
      }
    }

    console.log('Every quote', allQuotes)
  } finally {
    // release the search context instead of waiting for the 30s timeout
    if (scrollId) {
      try {
        await client.clearScroll({ scroll_id: scrollId })
      } catch (err) {
        // best-effort cleanup: report the failure without masking an
        // error thrown by the paging loop above
        console.error('Failed to clear scroll context', err)
      }
    }
  }
}

// Start the example; a rejection from run() is printed with console.log.
run().catch(console.log)

在 Node.js ≥ 10 中,还可以借助异步迭代(async iteration)以另一种很酷的方式使用 scroll API!

'use strict'

const { Client } = require('@elastic/elasticsearch')
// Client instance used by scrollSearch() and run() below.
// NOTE: '<cloud-id>' and 'base64EncodedKey' appear to be placeholder values;
// replace them with the credentials of a real deployment before running.
const client = new Client({
  cloud: { id: '<cloud-id>' },
  auth: { apiKey: 'base64EncodedKey' }
})

/**
 * Scroll utility: runs a search and yields its hits one at a time, fetching
 * further pages with the scroll API until a page comes back empty or a
 * response carries no scroll id.
 *
 * The scroll context is cleared when iteration ends for any reason: the
 * results are exhausted, a request throws, or the consumer stops early
 * (for example with `break` inside `for await`).
 *
 * @param {object} params - search parameters passed unchanged to
 *   `client.search`; `params.scroll` is reused as the keep-alive for every
 *   follow-up `client.scroll` call
 * @yields {object} each entry of `response.hits.hits`, in response order
 */
async function * scrollSearch (params) {
  let response = await client.search(params)
  // most recent scroll id seen, kept so the context can be released below
  let scrollId = response._scroll_id

  try {
    while (true) {
      const sourceHits = response.hits.hits

      if (sourceHits.length === 0) {
        break
      }

      for (const hit of sourceHits) {
        yield hit
      }

      // without a scroll id there is no further page to ask for
      if (!response._scroll_id) {
        break
      }

      response = await client.scroll({
        scroll_id: response._scroll_id,
        scroll: params.scroll
      })

      if (response._scroll_id) {
        scrollId = response._scroll_id
      }
    }
  } finally {
    // runs on exhaustion, on error, and when the consumer returns early
    if (scrollId) {
      try {
        await client.clearScroll({ scroll_id: scrollId })
      } catch (err) {
        // best-effort cleanup: report the failure without masking an
        // error that is already propagating out of the loop
        console.error('Failed to clear scroll context', err)
      }
    }
  }
}

/**
 * Indexes three sample quotes into `game-of-thrones`, then iterates over the
 * hits produced by `scrollSearch` and logs the `_source` of each one.
 *
 * If the bulk request reports errors, the bulk response is logged and the
 * process exits with status 1.
 *
 * @returns {Promise<void>} resolves once every hit has been logged
 */
async function run () {
  const bulkResponse = await client.bulk({
    // force a refresh so the documents are visible to the search below
    refresh: true,
    operations: [
      { index: { _index: 'game-of-thrones' } },
      {
        character: 'Ned Stark',
        quote: 'Winter is coming.'
      },

      { index: { _index: 'game-of-thrones' } },
      {
        character: 'Daenerys Targaryen',
        quote: 'I am the blood of the dragon.'
      },

      { index: { _index: 'game-of-thrones' } },
      {
        character: 'Tyrion Lannister',
        quote: 'A mind needs books like a sword needs a whetstone.'
      }
    ]
  })

  // the bulk call can resolve while individual items failed, so the
  // errors flag has to be checked explicitly
  if (bulkResponse.errors) {
    console.log(bulkResponse)
    process.exit(1)
  }

  const params = {
    index: 'game-of-thrones',
    // keep the search results "scrollable" for 30 seconds
    scroll: '30s',
    // one result per request, to make the paging visible
    size: 1,
    // only return the quote field of each document
    _source: ['quote'],
    query: {
      match_all: {}
    }
  }

  for await (const hit of scrollSearch(params)) {
    console.log(hit._source)
  }
}

// Start the example; a rejection from run() is printed with console.log.
run().catch(console.log)