滚动
编辑滚动
编辑虽然搜索请求返回单“页”结果,但可以使用滚动 API 从单个搜索请求中检索大量结果(甚至所有结果),这与在传统数据库中使用游标的方式非常相似。
滚动不适用于实时用户请求,而是用于处理大量数据,例如,为了将一个索引的内容重新索引到具有不同配置的新索引中。
从滚动请求返回的结果反映了初始搜索请求发出时索引的状态,就像时间快照一样。后续对文档的更改(索引、更新或删除)只会影响以后的搜索请求。
为了使用滚动,初始搜索请求应在查询字符串中指定 scroll 参数,该参数告诉 Elasticsearch 应将“搜索上下文”保持活动状态多长时间。
您是否知道我们提供了一个用于发送滚动请求的助手?您可以在此处找到它。
'use strict' const { Client } = require('@elastic/elasticsearch') const client = new Client({ cloud: { id: '<cloud-id>' }, auth: { apiKey: 'base64EncodedKey' } }) async function run () { const allQuotes = [] const responseQueue = [] // Let's index some data! const bulkResponse = await client.bulk({ // here we are forcing an index refresh, // otherwise we will not get any result // in the consequent search refresh: true, operations: [ // operation to perform { index: { _index: 'game-of-thrones' } }, // the document to index { character: 'Ned Stark', quote: 'Winter is coming.' }, { index: { _index: 'game-of-thrones' } }, { character: 'Daenerys Targaryen', quote: 'I am the blood of the dragon.' }, { index: { _index: 'game-of-thrones' } }, { character: 'Tyrion Lannister', quote: 'A mind needs books like a sword needs a whetstone.' } ] }) if (bulkResponse.errors) { console.log(bulkResponse) process.exit(1) } // start things off by searching, setting a scroll timeout, and pushing // our first response into the queue to be processed const response = await client.search({ index: 'game-of-thrones', // keep the search results "scrollable" for 30 seconds scroll: '30s', // for the sake of this example, we will get only one result per search size: 1, // filter the source to only include the quote field _source: ['quote'], query: { match_all: {} } }) responseQueue.push(response) while (responseQueue.length) { const body = responseQueue.shift() // collect the titles from this response body.hits.hits.forEach(function (hit) { allQuotes.push(hit._source.quote) }) // check to see if we have collected all of the quotes if (body.hits.total.value === allQuotes.length) { console.log('Every quote', allQuotes) break } // get the next response if there are more quotes to fetch responseQueue.push( await client.scroll({ scroll_id: body._scroll_id, scroll: '30s' }) ) } } run().catch(console.log)
通过使用异步迭代,可以使用 Node.js ≥ 10 完成 scroll
API 的另一个很酷的用法!
'use strict' const { Client } = require('@elastic/elasticsearch') const client = new Client({ cloud: { id: '<cloud-id>' }, auth: { apiKey: 'base64EncodedKey' } }) // Scroll utility async function * scrollSearch (params) { let response = await client.search(params) while (true) { const sourceHits = response.hits.hits if (sourceHits.length === 0) { break } for (const hit of sourceHits) { yield hit } if (!response._scroll_id) { break } response = await client.scroll({ scroll_id: response._scroll_id, scroll: params.scroll }) } } async function run () { await client.bulk({ refresh: true, operations: [ { index: { _index: 'game-of-thrones' } }, { character: 'Ned Stark', quote: 'Winter is coming.' }, { index: { _index: 'game-of-thrones' } }, { character: 'Daenerys Targaryen', quote: 'I am the blood of the dragon.' }, { index: { _index: 'game-of-thrones' } }, { character: 'Tyrion Lannister', quote: 'A mind needs books like a sword needs a whetstone.' } ] }) const params = { index: 'game-of-thrones', scroll: '30s', size: 1, _source: ['quote'], query: { match_all: {} } } for await (const hit of scrollSearch(params)) { console.log(hit._source) } } run().catch(console.log)