背景
Elasticsearch 中的索引是您可以在其中以文档形式存储数据的存储位置。在使用索引时,如果您正在处理动态数据集,则数据可能会很快过时。为了避免此问题,您可以创建一个 Python 脚本以更新您的索引,并使用Google Cloud Platform (GCP) 的Cloud Functions和Cloud Scheduler部署它,以便自动保持索引最新。
为了保持索引最新,您可以首先设置一个 Jupyter Notebook 以在本地进行测试,并创建一个脚本框架,如果存在新信息,该脚本将更新您的索引。您可以调整您的脚本使其更具可重用性,并将其作为 Cloud Function 运行。使用 Cloud Scheduler,您可以使用 cron 类型格式设置 Cloud Function 中的代码以按计划运行。
自动更新索引的先决条件
- 此示例使用 Elasticsearch 版本 8.12;如果您是新手,请查看我们在Elasticsearch上的快速入门指南。
- 如果您尚未在计算机上安装 Python,请下载最新版本。此示例使用 Python 3.12.1。
- NASA API 的 API 密钥。
- 您将使用Requests包连接到 NASA API,使用Pandas操作数据,使用Elasticsearch Python 客户端将数据加载到索引中并保持其最新状态,以及使用Jupyter Notebooks在测试时以交互方式处理您的数据。您可以运行以下行以安装这些必需的包
pip3 install requests pandas elasticsearch notebook
加载和更新您的数据集
在您可以在 GCP 中运行更新脚本之前,您需要上传您的数据并测试您将用于使脚本保持更新的过程。您将首先连接到 API 的数据,将其保存为 Pandas DataFrame,连接到 Elasticsearch,将 DataFrame 上传到索引,检查索引上次更新的时间,并在有新数据可用时更新它。您可以在此搜索实验室笔记本中找到此部分的完整代码。
加载您的数据
让我们开始使用 Jupyter Notebook 在本地进行测试,以交互方式处理您的数据。为此,您可以在终端中运行以下命令。
jupyter notebook
在右上角,您可以选择显示“新建”的位置以创建一个新的 Jupyter Notebook。
首先,您需要导入您将要使用的包。您将导入之前安装的所有包,以及getpass
以处理 API 密钥等机密信息,以及datetime
以处理日期对象。
import requests
from getpass import getpass
import pandas as pd
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch, helpers
您将使用的数据集是近地天体网络服务 (NeoWs),这是一种 RESTful 网络服务,提供近地小行星信息。此数据集允许您根据小行星与地球最近的接近日期搜索小行星,查找特定的小行星,以及浏览整个数据集。
使用以下函数,您可以连接到 NASA 的 NeoWs API,获取过去一周的数据,并将您的响应转换为 JSON 对象。
def connect_to_nasa():
url = "https://api.nasa.gov/neo/rest/v1/feed"
nasa_api_key = getpass("NASA API Key: ")
today = datetime.now()
params = {
"api_key": nasa_api_key,
"start_date": today - timedelta(days=7),
"end_date": datetime.now(),
}
return requests.get(url, params).json()
现在,您可以将 API 调用的结果保存到名为 response 的变量中。
response = connect_to_nasa()
要将 JSON 对象转换为 pandas DataFrame,您必须将嵌套对象规范化为一个 DataFrame 并删除包含嵌套 JSON 的列。
def create_df(response):
all_objects = []
for date, objects in response["near_earth_objects"].items():
for obj in objects:
obj["close_approach_date"] = date
all_objects.append(obj)
df = pd.json_normalize(all_objects)
return df.drop("close_approach_data", axis=1)
要调用此函数并查看数据集的前五行,您可以运行以下命令
df = create_df(response)
df.head()
连接到 Elasticsearch
您可以通过提供您的 Elastic Cloud ID 和 API 密钥进行身份验证,从而从 Python 客户端访问 Elasticsearch。
def connect_to_elastic():
elastic_cloud_id = getpass("Elastic Cloud ID: ")
elastic_api_key = getpass("Elastic API Key: ")
return Elasticsearch(cloud_id=elastic_cloud_id, api_key=elastic_api_key)
现在,您可以将连接函数的结果保存到名为es
的变量中。
es = connect_to_elastic()
Elasticsearch 中的索引是数据的存储容器。您可以将索引命名为asteroid_data_set
。
index_name = "asteroid_data_set"
es.indices.create(index=index_name)
您获得的结果将如下所示
ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'asteroids_data'})
现在,您可以创建一个辅助函数,允许您将 DataFrame 转换为正确的格式以上传到您的索引中。
def doc_generator(df, index_name):
for index, document in df.iterrows():
yield {
"_index": index_name,
"_id": f"{document['id']}",
"_source": document.to_dict(),
}
接下来,您可以将 DataFrame 的内容批量上传到 Elastic,并调用您刚刚创建的辅助函数。
helpers.bulk(es, doc_generator(df, index_name))
您应该会得到一个类似于以下结果的结果,它会告诉您已上传的行数
(146, [])
您上次更新数据是什么时候?
将数据上传到 Elastic 后,您可以检查索引上次更新的时间,并设置日期格式,以便它可以与 NASA API 一起使用。
def updated_last(es, index_name):
query = {
"size": 0,
"aggs": {"last_date": {"max": {"field": "close_approach_date"}}},
}
response = es.search(index=index_name, body=query)
last_updated_date_string = response["aggregations"]["last_date"]["value_as_string"]
datetime_obj = datetime.strptime(last_updated_date_string, "%Y-%m-%dT%H:%M:%S.%fZ")
return datetime_obj.strftime("%Y-%m-%d")
您可以将索引上次更新的日期保存到一个变量中并打印出日期。
last_update_date = updated_last(es, index_name)
print(last_update_date)
更新您的数据
现在,您可以创建一个函数来检查自索引上次更新以来是否存在任何新数据以及当前日期。如果对象有效且数据不为空,它将更新索引并让您知道是否没有新数据要更新,或者 DataFrame 是否返回类型为None
,这表示可能存在问题。
def update_new_data(df, es, last_update_date, index_name):
if isinstance(last_update_date, str):
last_update_date = datetime.strptime(last_update_date, "%Y-%m-%d")
last_update_date = pd.Timestamp(last_update_date).normalize()
if not df.empty and "close_approach_date" in df.columns:
df["close_approach_date"] = pd.to_datetime(df["close_approach_date"])
today = pd.Timestamp(datetime.now().date()).normalize()
if df is not None and not df.empty:
update_range = df.loc[
(df["close_approach_date"] > last_update_date)
& (df["close_approach_date"] < today)
]
if not update_range.empty:
helpers.bulk(es, doc_generator(update_range, index_name))
else:
print("No new data to update.")
else:
print("The DataFrame is None.")
如果 DataFrame 是一个有效对象,它将调用您编写的函数,并在适用时更新索引。它还将打印出索引上次更新的日期,以帮助您在需要时进行调试。否则,它将告诉您可能存在问题。
try:
if df is None:
raise ValueError("DataFrame is None. There may be a problem.")
update_new_data(df, es, last_update_date, index_name)
print(updated_last(es, index_name))
except Exception as e:
print(f"An error occurred: {e}")
保持索引最新
现在您已为本地测试创建了一个框架,您就可以设置一个环境,以便您可以每天运行您的脚本以检查是否有任何新数据可用并相应地更新您的索引。
创建 Cloud Function
现在,您可以部署您的 Cloud Function。为此,您需要将环境选择为第 2 代函数,命名您的函数,并选择一个云区域。您还可以将其绑定到 Cloud Pub/Sub 触发器,并选择创建一个新主题(如果您尚未创建)。您可以在GitHub 上查看此部分的完整代码。
创建 Pub/Sub 主题
创建新主题时,您可以命名您的主题 ID 并选择使用 Google 托管的加密密钥进行加密。
设置 Cloud Function 的环境变量
在显示“运行时环境变量”的位置,您可以添加NASA_API_KEY
、ELASTIC_CLOUD_ID
和ELASTIC_API_KEY
的环境变量。您需要将其保存为原始值,周围没有单引号。因此,如果您之前在终端中输入了值'xxxxlsdgzxxxxx'
,则您需要将其设置为xxxxlsdgzxxxxx
。
调整您的代码并将其添加到 Cloud Function
输入环境变量后,您可以按下“下一步”按钮,这将带您进入代码编辑器。您需要选择 Python 3.12.1 的运行时,或匹配您正在使用的 Python 版本。之后,将入口点更新为update_index
。入口点在 Python 中的作用类似于主函数。
您需要使用os
而不是使用getpass
来检索机密信息,以执行更自动化的过程。示例如下所示
elastic_cloud_id = os.getenv("ELASTIC_CLOUD_ID")
elastic_api_key = os.getenv("ELASTIC_API_KEY")
您需要调整脚本的顺序,以便首先执行连接到 Elasticsearch 的函数。之后,您需要知道索引上次更新的时间,连接到您正在使用的 NASA API,将其保存到 DataFrame,并加载可能可用的任何新数据。
您可能会注意到底部有一个名为update_index
的新函数,它将您的代码连接在一起。在此函数中,您定义索引的名称,连接到 Elastic,找出索引上次更新的日期,连接到 NASA API,将结果保存到数据帧中,并在需要时更新索引。为了表明入口点函数是云事件,您可以使用装饰器@functions_framework.cloud_event
来表示。
@functions_framework.cloud_event
def update_index(cloud_event):
index_name = "asteroid_data_set"
es = connect_to_elastic()
last_update_date = updated_last(es, index_name)
print(last_update_date)
response = connect_to_nasa(last_update_date)
df = create_df(response)
if df is not None:
update_new_data(df, es, last_update_date, index_name)
print(updated_last(es, index_name))
else:
print("No new data was retrieved.")
这是完整的更新代码示例
import functions_framework
import requests
import os
import pandas as pd
from datetime import datetime
from elasticsearch import Elasticsearch, helpers
def connect_to_elastic():
elastic_cloud_id = os.getenv("ELASTIC_CLOUD_ID")
elastic_api_key = os.getenv("ELASTIC_API_KEY")
return Elasticsearch(cloud_id=elastic_cloud_id, api_key=elastic_api_key)
def connect_to_nasa(last_update_date):
url = "https://api.nasa.gov/neo/rest/v1/feed"
nasa_api_key = os.getenv("NASA_API_KEY")
params = {
"api_key": nasa_api_key,
"start_date": last_update_date,
"end_date": datetime.now(),
}
return requests.get(url, params).json()
def create_df(response):
all_objects = []
for date, objects in response["near_earth_objects"].items():
for obj in objects:
obj["close_approach_date"] = date
all_objects.append(obj)
df = pd.json_normalize(all_objects)
return df.drop("close_approach_data", axis=1)
def doc_generator(df, index_name):
for index, document in df.iterrows():
yield {
"_index": index_name,
"_id": f"{document['close_approach_date']}",
"_source": document.to_dict(),
}
def updated_last(es, index_name):
query = {
"size": 0,
"aggs": {"last_date": {"max": {"field": "close_approach_date"}}},
}
response = es.search(index=index_name, body=query)
last_updated_date_string = response["aggregations"]["last_date"]["value_as_string"]
datetime_obj = datetime.strptime(last_updated_date_string, "%Y-%m-%dT%H:%M:%S.%fZ")
return datetime_obj.strftime("%Y-%m-%d")
def update_new_data(df, es, last_update_date, index_name):
if isinstance(last_update_date, str):
last_update_date = datetime.strptime(last_update_date, "%Y-%m-%d")
last_update_date = pd.Timestamp(last_update_date).normalize()
if not df.empty and "close_approach_date" in df.columns:
df["close_approach_date"] = pd.to_datetime(df["close_approach_date"])
today = pd.Timestamp(datetime.now().date()).normalize()
if df is not None and not df.empty:
update_range = df.loc[
(df["close_approach_date"] > last_update_date)
& (df["close_approach_date"] < today)
]
print(update_range)
if not update_range.empty:
helpers.bulk(es, doc_generator(update_range, index_name))
else:
print("No new data to update.")
else:
print("The DataFrame is empty or None.")
# Triggered from a message on a Cloud Pub/Sub topic.
@functions_framework.cloud_event
def hello_pubsub(cloud_event):
index_name = "asteroid_data_set"
es = connect_to_elastic()
last_update_date = updated_last(es, index_name)
print(last_update_date)
response = connect_to_nasa(last_update_date)
df = create_df(response)
try:
if df is None:
raise ValueError("DataFrame is None. There may be a problem.")
update_new_data(df, es, last_update_date, index_name)
print(updated_last(es, index_name))
except Exception as e:
print(f"An error occurred: {e}")
添加 requirements.txt 文件
您还需要定义一个requirements.txt
文件,其中包含运行代码所需的所有指定包。
functions-framework==3.*
requests==2.31.0
elasticsearch==8.12.0
pandas==2.1.4
计划您的 Cloud Function
在 Cloud Scheduler 中,您可以使用 Unix cron 格式设置函数以定期运行。我的代码设置为每天早上 8 点在我的时区运行。
您还需要配置执行以连接到之前创建的 Pub/Sub 主题。我目前将消息正文设置为“hello”。
现在您已经设置了 Pub/Sub 主题和 Cloud Function,并设置了 Cloud Function 按计划运行,您的索引应在出现新数据时自动更新。
结论
使用 Python、Google Cloud Platform 函数和 Google Cloud Scheduler,您应该能够确保您的索引定期更新。您可以在此处找到完整代码,以及本地测试的搜索实验室笔记本。我们还与Google Cloud 举办了一个按需网络研讨会,如果您希望构建搜索应用程序,这可能是下一步的好选择。如果您根据此博客构建了任何内容,或者您在我们Discuss 论坛和社区 Slack 频道上存在疑问,请告知我们。
想获得 Elastic 认证?了解下一场Elasticsearch 工程师培训何时开始!
Elasticsearch 拥有大量新功能,可帮助您为您的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在您的本地机器上试用 Elastic。