使用 Node.js 和 Azure 函数应用自动更新 Elasticsearch 索引

了解如何使用 Node.js 和 Azure 函数应用自动更新 Elasticsearch 索引。按照以下步骤确保您的索引保持最新。

维护一个最新的 Elasticsearch 索引至关重要,尤其是在处理频繁变化的动态数据集时。这篇博文将指导您如何使用 Node.js 和 Azure 函数应用自动更新 Elasticsearch 索引。

首先,我们将使用 Node.js 加载数据并确保其通过定期更新保持最新。然后,我们将利用 Azure 函数应用 的功能来自动执行这些更新,从而确保您的索引始终保持新鲜和可靠。

对于这篇博文,我们将使用 近地天体网络服务 (NeoWs),这是一种 RESTful 网络服务,提供有关近地小行星的详细信息。通过将 NeoWs 与集成到 Azure 无服务器函数中的 Node.js 服务集成,此示例将为您提供一个强大的框架来有效地处理管理动态数据的复杂性。这种方法将帮助您最大程度地降低使用过时信息带来的风险,并最大程度地提高数据的准确性和实用性。

先决条件

本地设置

在开始在本地索引和加载数据之前,设置您的环境至关重要。首先,创建一个目录并对其进行初始化。然后,下载必要的软件包并创建一个 .env 文件来存储您的配置设置。此初步设置可确保您的本地环境已准备好有效地处理数据。

mkdir Introduction-to-Data-Loading-in-Elasticsearch-with-Nodejs
cd Introduction-to-Data-Loading-in-Elasticsearch-with-Nodejs
npm init

您将使用 Elasticsearch 节点客户端 连接到 Elastic,使用 Axios 连接到 NASA API,以及使用 dotenv 解析您的密钥。您需要下载运行以下命令所需的软件包

npm install @elastic/elasticsearch axios dotenv

下载所需的软件包后,您可以在项目目录的根目录下创建一个 .env 文件。.env 文件允许您在本地安全地保存您的凭据。查看 .env 文件示例 以了解更多信息。要了解有关连接到 Elasticsearch 的更多信息,请务必查看 相关文档

要创建 .env 文件,您可以在项目根目录下使用以下命令

touch .env

在您的 .env 中,确保已输入以下内容。请务必添加您的完整端点

ELASTICSEARCH_ENDPOINT="https://...."
ELASTICSEARCH_API_KEY="YOUR_ELASTICSEARCh_API_KEY"
NASA_API_KEY="YOUR_NASA_API_KEY"

您还需要创建一个新的 JavaScript 文件

touch loading_data_into_a_index.js

创建索引并将数据加载到其中

现在您已经设置了正确的文件结构并下载了所需的软件包,您就可以创建一个脚本,该脚本创建索引并将数据加载到索引中。如果您在此过程中遇到问题,请务必查看 此部分中创建的文件的完整版本

在文件 loading_data_into_a_index.js 中,配置 dotenv 软件包以使用存储在 .env 文件中的密钥和令牌。您还应该导入 Elasticsearch 客户端 以连接到 Elasticsearch 和 Axios 并发出 HTTP 请求。

require('dotenv').config();

const { Client } = require('@elastic/elasticsearch');
const axios = require('axios');

由于您的密钥和令牌当前存储为环境变量,因此您需要检索它们并创建一个客户端以对 Elasticsearch 进行身份验证。

const elasticsearchEndpoint = process.env.ELASTICSEARCH_ENDPOINT;
const elasticsearchApiKey = process.env.ELASTICSEARCH_API_KEY;
const nasaApiKey = process.env.NASA_API_KEY;

const client = new Client({
  node: elasticsearchEndpoint,
  auth: {
    apiKey: elasticsearchApiKey
  }
});

您可以开发一个函数来异步地从 NASA 的 NEO(近地天体)网络服务检索数据。您将首先为 NASA API 请求配置基本 URL,并为今天和上周创建日期对象以建立查询期间。在您以 API 请求所需的 YYYY-MM-DD 格式格式化这些日期后,将日期设置为查询参数并对 NASA API 执行 GET 请求。此外,该函数还包括错误处理机制,以帮助在出现任何问题时进行调试。

async function fetchNasaData() {
  const url = "https://api.nasa.gov/neo/rest/v1/feed";
  const today = new Date();
  const lastWeek = new Date(today);
  lastWeek.setDate(today.getDate() - 7);

  const startDate = lastWeek.toISOString().split('T')[0];
  const endDate = today.toISOString().split('T')[0];
  const params = {
    api_key: nasaApiKey,
    start_date: startDate,
    end_date: endDate,
  };

  try {
    const response = await axios.get(url, { params });
    return response.data;
  } catch (error) {
    console.error('Error fetching data from NASA:', error);
    return null;
  }
}

现在,您可以创建一个函数将来自 NASA API 的原始数据转换为结构化格式。由于您获取的数据目前嵌套在一个复杂的 JSON 响应中。更简单的对象数组使处理数据更容易。

function createStructuredData(response) {
  const allObjects = [];
  const nearEarthObjects = response.near_earth_objects;

  Object.keys(nearEarthObjects).forEach(date => {
    nearEarthObjects[date].forEach(obj => {
      const simplifiedObject = {
        close_approach_date: date,
        name: obj.name,
        id: obj.id,
        miss_distance_km: obj.close_approach_data.length > 0 ? obj.close_approach_data[0].miss_distance.kilometers : null,
      };

      allObjects.push(simplifiedObject);
    });
  });

  return allObjects;
}

您需要创建一个索引来存储来自 API 的数据。Elasticsearch 中的 索引 是您可以在其中存储文档中的数据的地方。在此函数中,您将检查索引是否存在,如果不存在则创建一个新索引。您还将为索引指定正确的 字段映射。此函数还将数据作为文档加载到索引中,并将 NASA 数据中的 id 字段映射到 Elasticsearch 中的 _id 字段。

async function indexDataIntoElasticsearch(data) {
  const indexExists = await client.indices.exists({ index: 'nasa-node-js' });
  if (!indexExists.body) {
    await client.indices.create({
      index: 'nasa-node-js',
      body: {
        mappings: {
          properties: {
            close_approach_date: { type: 'date' },
            name: { type: 'text' },
            miss_distance_km: { type: 'float' },
          },
        },
      },
    });
  }

  const body = data.flatMap(doc => [{ index: { _index: 'nasa-node-js', _id: doc.id } }, doc]);
  await client.bulk({ refresh: false, body });
}

您需要创建一个主函数来获取、构造和索引数据。此函数还将打印出正在上传的记录数量,并记录数据是否已建立索引,是否没有要索引的数据,或者是否无法从 NASA API 获取数据。创建 run 函数后,您需要调用该函数并捕获可能出现的任何错误。

async function run() {
  const rawData = await fetchNasaData();
  if (rawData) {
    const structuredData = createStructuredData(rawData);
    console.log(`Number of records being uploaded: ${structuredData.length}`);
    if (structuredData.length > 0) {
      await indexDataIntoElasticsearch(structuredData);
      console.log('Data indexed successfully.');
    } else {
      console.log('No data to index.');
    }
  } else {
    console.log('Failed to fetch data from NASA.');
  }
}

run().catch(console.error);

您现在可以通过运行以下命令从命令行运行该文件

node loading_data_into_a_index.js

要确认您的索引已成功加载,您可以通过执行以下 API 调用在 Elastic Dev Tools 中进行检查

GET /nasa-node-js/_search

使用 Azure 函数应用保持索引更新

现在您已成功将数据加载到本地索引中,这些数据可能会很快过时。为了确保您的信息保持最新,您可以设置一个 Azure 函数应用,以便每天自动获取新数据并将其上传到您的 Elasticsearch 索引。

第一步是在 Azure 门户中配置您的函数应用。一个有用的入门资源是 Azure 快速入门指南

设置好函数后,您可以确保已为 ELASTICSEARCH_ENDPOINTELASTICSEARCH_API_KEYNASA_API_KEY 设置了环境变量。在函数应用中,环境变量称为应用程序设置。在您的函数应用中,单击左侧面板“设置”下的“配置”选项。在“应用程序设置”选项卡下,单击“+ 新建应用程序设置”。

您还需要确保已安装所需的库。如果转到 Azure 门户上的终端,则可以通过输入以下内容安装必要的软件包

npm install @elastic/elasticsearch axios

您要安装的软件包应该与之前的安装非常相似,除了您将使用 moment 解析日期,并且您不再需要加载 env 文件,因为您刚刚将密钥设置为应用程序设置。

您可以点击“创建”以在函数应用中创建一个新函数,选择名为“定时器触发器”的模板。现在,您将拥有一个名为 function.json 的文件供您设置。您需要将其调整为如下所示,以便每天上午 10 点运行此应用程序。

{
    "bindings": [
      {
        "name": "myTimer",
        "type": "timerTrigger",
        "direction": "in",
        "schedule": "0 0 10 * * *"
      }
    ]
  }

您还需要上传您的 package.json 文件并确保其显示如下

{
  "name": "introduction-to-data-loading-in-elasticsearch-with-nodejs",
  "version": "1.0.0",
  "description": "A simple script for loading data in Elasticsearch",
  "main": "loading_data_into_a_index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "repository": {
    "type": "git",
    "url": "git+https://github.com/JessicaGarson/Introduction-to-Data-Loading-in-Elasticsearch-with-Nodejs.git"
  },
  "author": "Jessica Garson",
  "license": "Apache-2.0",
  "bugs": {
    "url": "https://github.com/JessicaGarson/Introduction-to-Data-Loading-in-Elasticsearch-with-Nodejs/issues"
  },
  "homepage": "https://github.com/JessicaGarson/Introduction-to-Data-Loading-in-Elasticsearch-with-Nodejs#readme",
  "dependencies": {
    "@elastic/elasticsearch": "^8.12.0",
    "axios": "^0.21.1"
  }
}

下一步是创建一个index.js文件。此脚本旨在每天自动更新数据。它通过每天系统地获取和解析新数据,然后无缝更新数据集来实现此目的。Elasticsearch 可以使用相同的方法来摄取时间序列或不可变数据,例如 webhook 响应。此方法确保信息保持最新和准确,反映最新可用数据。您还可以查看完整代码

您在本地运行的脚本与此脚本之间主要区别如下

  • 您将不再需要加载.env文件,因为您已经设置了环境变量
  • 还有一些不同的日志记录,旨在创建更可持续的脚本
  • 您根据最新的close approach date保持索引更新
  • 有一个 Azure 函数应用的入口点

首先,您需要按照如下步骤设置库并向 Elasticsearch 进行身份验证

const elasticsearchEndpoint = process.env.ELASTICSEARCH_ENDPOINT;
const elasticsearchApiKey = process.env.ELASTICSEARCH_API_KEY;
const nasaApiKey = process.env.NASA_API_KEY;

const client = new Client({
 node: elasticsearchEndpoint,
 auth: {
   apiKey: elasticsearchApiKey
 }
});

之后,您需要从 Elasticsearch 获取上次更新日期,并配置一个备份方法,以便在出现任何问题时获取过去一天的数据。

async function getLastUpdateDate() {
  try {
    const response = await client.search({
      index: 'nasa-node-js',
      body: {
        size: 1,
        sort: [{ close_approach_date: { order: 'desc' } }],
        _source: ['close_approach_date']
      }
    });

    if (response.body && response.body.hits && response.body.hits.hits.length > 0) {
      return response.body.hits.hits[0]._source.close_approach_date;
    } else {
      // Default to one day ago if no records found
      const today = new Date();
      const lastWeek = new Date(today);
      lastWeek.setDate(today.getDate() - 1);
      return lastWeek.toISOString().split('T')[0];
    }
  } catch (error) {
    console.error('Error fetching last update date from Elasticsearch:', error);
    throw error;
  }
}

以下函数连接到 NASA 的 NEO(近地天体)Web 服务以获取数据,以保持索引更新。还有一些额外的错误处理,可以捕获可能出现的任何 API 错误。

async function fetchNasaData(startDate) {

  const url = "https://api.nasa.gov/neo/rest/v1/feed";
  const today = new Date();

  const endDate = today.toISOString().split('T')[0];

  const params = {
    api_key: nasaApiKey,
    start_date: startDate,
    end_date: endDate,
  };

  try {
    // Perform the GET request to the NASA API with query parameters
    const response = await axios.get(url, { params });
    return response.data;
  } catch (error) {
    // Log any errors encountered during the request
    console.error('Error fetching data from NASA:', error);
    return null;
  }
}

现在,您需要创建一个函数,通过迭代每个日期的对象来组织数据。

function createStructuredData(response) {
  const allObjects = [];
  const nearEarthObjects = response.near_earth_objects;

  Object.keys(nearEarthObjects).forEach(date => {
    nearEarthObjects[date].forEach(obj => {
      const simplifiedObject = {
        close_approach_date: date,
        name: obj.name,
        id: obj.id,
        miss_distance_km: obj.close_approach_data.length > 0 ? obj.close_approach_data[0].miss_distance.kilometers : null,
      };

      allObjects.push(simplifiedObject);
    });
  });

  return allObjects;
}

现在,您需要使用批量索引操作将数据加载到 Elasticsearch 中。此函数应类似于上一节中的函数。

async function indexDataIntoElasticsearch(data) {
  const body = data.flatMap(doc => [{ index: { _index: 'nasa-node-js', _id: doc.id } }, doc]);
  await client.bulk({ refresh: false, body });
}

最后,您需要为函数创建一个入口点,该函数将根据您设置的计时器运行。此函数类似于主函数,因为它调用了之前在文件中创建的函数。还有一些额外的日志记录,例如打印记录数并告知您数据是否已正确索引。

module.exports = async function (context, myTimer) {
  try {
    const lastUpdateDate = await getLastUpdateDate();
    context.log(`Last update date from Elasticsearch: ${lastUpdateDate}`);

    const rawData = await fetchNasaData(lastUpdateDate);
    if (rawData) {
      const structuredData = createStructuredData(rawData);
      context.log(`Number of records being uploaded: ${structuredData.length}`);
      
      if (structuredData.length > 0) {

        const flatFileData = JSON.stringify(structuredData, null, 2);
        context.log('Flat file data:', flatFileData);

        await indexDataIntoElasticsearch(structuredData);
        context.log('Data indexed successfully.');
      } else {
        context.log('No data to index.');
      }
    } else {
      context.log('Failed to fetch data from NASA.');
    }
  } catch (error) {
    context.log('Error in run process:', error);
  }

结论

使用 Node.js 和Azure 的函数应用,您应该能够确保 Elasticsearch 索引定期更新。通过结合使用 Node.js 的功能和 Azure 的函数应用,您可以有效地维护索引的定期更新。这种强大的组合提供了一个简化的自动化流程,减少了维护索引定期更新所需的人工工作量。此示例的完整代码可以在Search Labs GitHub上找到。如果您基于此博客构建了任何内容,或者您在我们的论坛社区 Slack 频道上有任何问题,请告知我们。

Elasticsearch 拥有众多新功能,可帮助您为您的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在您的本地机器上试用 Elastic。

想获得 Elastic 认证?了解下次Elasticsearch 工程师培训何时举行!

准备好构建最先进的搜索体验了吗?

足够先进的搜索并非一蹴而就。Elasticsearch 由数据科学家、ML ops、工程师等等许多对搜索充满热情的人提供支持,就像您一样。让我们联系起来,共同努力构建神奇的搜索体验,让您获得所需的结果。

亲自尝试一下