连接器 API 教程
编辑连接器 API 教程
编辑了解如何使用 Elasticsearch 连接器 API 设置自管理连接器。
在本示例中,我们将使用 connectors-postgresql,PostgreSQL 连接器将数据从 PostgreSQL 数据库同步到 Elasticsearch。我们将在 Docker 中启动一个简单的 PostgreSQL 实例并提供一些示例数据,创建一个连接器,并将数据同步到 Elasticsearch。你可以按照相同的步骤设置其他数据源的连接器。
本教程重点介绍如何在您自己的基础设施上运行自管理连接器,并使用连接器 API 管理同步。有关连接器如何工作的概述,请参阅 连接器。
如果您刚开始使用 Elasticsearch,本教程可能有点高级。请参阅 快速入门,了解更适合初学者的 Elasticsearch 介绍。
如果您刚开始使用连接器,您可能想先从 UI 开始。我们有两个教程重点介绍如何使用 UI 管理连接器。
- Elastic 托管连接器教程。设置一个原生的 MongoDB 连接器,在 Elastic Cloud 中完全托管。
- 自管理连接器教程。设置一个自管理的 PostgreSQL 连接器。
先决条件
编辑- 您应该熟悉连接器,连接器的工作方式,以便了解 API 调用如何与整体连接器设置相关。
- 您需要安装 Docker Desktop。
- 您需要运行 Elasticsearch 和用于访问它的 API 密钥。如果您还没有 Elasticsearch 部署,请参阅下一节了解详细信息。
设置 Elasticsearch
编辑如果您已经在 Elastic Cloud(托管部署或无服务器项目)上部署了 Elasticsearch,那么您就可以开始了。要在 Docker 中以本地开发模式启动 Elasticsearch 进行测试,请打开下面的可折叠部分。
在 Docker 中运行本地 Elasticsearch
docker run -p 9200:9200 -d --name elasticsearch \ -e "discovery.type=single-node" \ -e "xpack.security.enabled=false" \ -e "xpack.security.http.ssl.enabled=false" \ -e "xpack.license.self_generated.type=trial" \ docker.elastic.co/elasticsearch/elasticsearch:8.17.0
此 Elasticsearch 设置仅用于开发目的。切勿在生产环境中使用此配置。有关生产级安装说明,包括 Docker,请参阅 设置 Elasticsearch。
我们将使用默认密码 changeme
作为 elastic
用户。对于生产环境,请始终确保您的集群启用安全性。
export ELASTIC_PASSWORD="changeme"
由于我们在本地运行集群时禁用了安全性,因此我们不会使用 API 密钥对 Elasticsearch 进行身份验证。相反,在每个 cURL 请求中,我们将使用 -u
标志进行身份验证。
让我们测试一下是否可以访问 Elasticsearch
curl -s -X GET -u elastic:$ELASTIC_PASSWORD https://127.0.0.1:9200
注意:当 Elasticsearch 在本地运行时,您需要在连接器服务的配置文件中传递用户名和密码才能对 Elasticsearch 进行身份验证。
在 Docker 中运行 PostgreSQL 实例(可选)
编辑在本教程中,我们将在 Docker 中设置一个包含一些示例数据的 PostgreSQL 实例。当然,如果您已经有 PostgreSQL 实例,可以跳过此步骤并使用您自己现有的 PostgreSQL 实例。请记住,使用不同的实例可能需要对后续步骤中描述的连接器配置进行调整。
展开以在 Docker 中运行简单的 PostgreSQL 实例并导入示例数据
让我们启动一个 PostgreSQL 容器,其中包含用户和密码,在端口 5432
公开
docker run --name postgres -e POSTGRES_USER=myuser -e POSTGRES_PASSWORD=mypassword -p 5432:5432 -d postgres
下载并导入示例数据
接下来,我们需要创建一个目录来存储本教程的示例数据集。在您的终端中,运行以下命令
mkdir -p ~/data
我们将使用 Chinook 数据集示例数据。
运行以下命令将文件下载到 ~/data
目录
curl -L https://raw.githubusercontent.com/lerocha/chinook-database/master/ChinookDatabase/DataSources/Chinook_PostgreSql.sql -o ~/data/Chinook_PostgreSql.sql
现在我们需要将示例数据导入到 PostgreSQL 容器中并创建表。
运行以下 Docker 命令将我们的示例数据复制到容器中并执行 psql
脚本
docker cp ~/data/Chinook_PostgreSql.sql postgres:/ docker exec -it postgres psql -U myuser -f /Chinook_PostgreSql.sql
让我们验证表是否在 chinook
数据库中正确创建
docker exec -it postgres psql -U myuser -d chinook -c "\dt"
album
表应包含 347 个条目,而 artist
表应包含 275 个条目。
本教程使用非常基本的设置。要使用高级功能(例如筛选规则和增量同步),请在您的 PostgreSQL 数据库上启用 track_commit_timestamp
。有关更多详细信息,请参阅 postgresql-connector-client-tutorial。
现在是真正有趣的时候了!我们将设置一个连接器,在 Elasticsearch 中创建我们 PostgreSQL 数据的可搜索镜像。
创建连接器
编辑我们将使用 创建连接器 API 来创建 PostgreSQL 连接器实例。
使用 Dev Tools Console 或 curl
运行以下 API 调用
resp = client.connector.put( connector_id="my-connector-id", name="Music catalog", index_name="music", service_type="postgresql", ) print(resp)
const response = await client.connector.put({ connector_id: "my-connector-id", name: "Music catalog", index_name: "music", service_type: "postgresql", }); console.log(response);
PUT _connector/my-connector-id { "name": "Music catalog", "index_name": "music", "service_type": "postgresql" }
service_type
指的是您要连接的第三方数据源。
请注意,我们在 PUT
请求中指定了 my-connector-id
ID。我们需要连接器 ID 在本地设置和运行连接器服务。
如果您希望使用自动生成的 ID,请将 PUT _connector/my-connector-id
替换为 POST _connector
。
运行连接器服务
编辑如果您正在使用我们托管的 Elastic 托管连接器,则连接器服务会在 Elastic Cloud 中自动运行。因为我们正在运行自管理连接器,所以我们需要在本地启动此服务。
现在我们将运行连接器服务,以便我们可以开始将数据从我们的 PostgreSQL 实例同步到 Elasticsearch。我们将使用 connectors-run-from-docker 中概述的步骤。
在您自己的基础设施上运行连接器服务时,您需要提供一个包含以下详细信息的配置文件
- 您的 Elasticsearch 端点 (
elasticsearch.host
) - Elasticsearch API 密钥 (
elasticsearch.api_key
) - 您的第三方数据源类型 (
service_type
) - 您的连接器 ID (
connector_id
)
创建 API 密钥
编辑如果您尚未创建用于访问 Elasticsearch 的 API 密钥,则可以使用 _security/api_key 端点。
在这里,我们假设您的目标 Elasticsearch 索引名称是 music
。如果您使用不同的索引名称,请相应地调整请求正文。
resp = client.security.create_api_key( name="music-connector", role_descriptors={ "music-connector-role": { "cluster": [ "monitor", "manage_connector" ], "indices": [ { "names": [ "music", ".search-acl-filter-music", ".elastic-connectors*" ], "privileges": [ "all" ], "allow_restricted_indices": False } ] } }, ) print(resp)
const response = await client.security.createApiKey({ name: "music-connector", role_descriptors: { "music-connector-role": { cluster: ["monitor", "manage_connector"], indices: [ { names: ["music", ".search-acl-filter-music", ".elastic-connectors*"], privileges: ["all"], allow_restricted_indices: false, }, ], }, }, }); console.log(response);
POST /_security/api_key { "name": "music-connector", "role_descriptors": { "music-connector-role": { "cluster": [ "monitor", "manage_connector" ], "indices": [ { "names": [ "music", ".search-acl-filter-music", ".elastic-connectors*" ], "privileges": [ "all" ], "allow_restricted_indices": false } ] } } }
您需要使用响应中的 encoded
值作为配置文件中的 elasticsearch.api_key
。
您还可以在 Kibana 和无服务器 UI 中创建 API 密钥。
准备配置文件
编辑让我们创建一个目录和一个 config.yml
文件来存储连接器配置
mkdir -p ~/connectors-config touch ~/connectors-config/config.yml
现在,让我们将我们的连接器详细信息添加到配置文件中。打开 config.yml
并粘贴以下配置,将占位符替换为您自己的值
elasticsearch.host: <ELASTICSEARCH_ENDPOINT> # Your Elasticsearch endpoint elasticsearch.api_key: <ELASTICSEARCH_API_KEY> # Your Elasticsearch API key connectors: - connector_id: "my-connector-id" service_type: "postgresql"
我们在 elastic/connectors
存储库中提供了一个 示例配置文件供参考。
运行连接器服务
编辑现在我们已经设置好配置文件,我们可以在本地运行连接器服务。这将把您的连接器实例指向您的 Elasticsearch 部署。
运行以下 Docker 命令以启动连接器服务
docker run \ -v "$HOME/connectors-config:/config" \ --rm \ --tty -i \ --network host \ docker.elastic.co/enterprise-search/elastic-connectors:8.17.0.0 \ /app/bin/elastic-ingest \ -c /config/config.yml
通过获取连接器状态(应为 needs_configuration
)和 last_seen
字段(请注意时间以 UTC 报告)来验证您的连接器已连接。last_seen
字段表示连接器已成功连接到 Elasticsearch。
resp = client.connector.get( connector_id="my-connector-id", ) print(resp)
const response = await client.connector.get({ connector_id: "my-connector-id", }); console.log(response);
GET _connector/my-connector-id
配置连接器
编辑现在我们的连接器实例已启动并运行,但它还不知道要从哪里同步数据。最后一个难题是使用有关我们的 PostgreSQL 实例的详细信息配置我们的连接器。在 Elastic Cloud 或无服务器 UI 中设置连接器时,系统会提示您在用户界面中添加这些详细信息。
但是,由于本教程完全是关于以编程方式使用连接器的,我们将使用 更新连接器配置 API 来添加我们的配置详细信息。
在配置连接器之前,请确保该服务已注册配置模式。对于 Elastic 托管的连接器,这会在通过 API 创建后不久发生。对于自管理连接器,该模式会在服务启动时注册(一旦填充 config.yml
)。
仅在模式注册后才能通过 API 进行配置更新。通过检查 GET _connector/my-connector-id
请求返回的配置属性来验证这一点。它应该是非空的。
运行以下 API 调用,使用我们的 connectors-postgresql-client-configuration,PostgreSQL 配置详细信息配置连接器
resp = client.connector.update_configuration( connector_id="my-connector-id", values={ "host": "127.0.0.1", "port": 5432, "username": "myuser", "password": "mypassword", "database": "chinook", "schema": "public", "tables": "album,artist" }, ) print(resp)
const response = await client.connector.updateConfiguration({ connector_id: "my-connector-id", values: { host: "127.0.0.1", port: 5432, username: "myuser", password: "mypassword", database: "chinook", schema: "public", tables: "album,artist", }, }); console.log(response);
PUT _connector/my-connector-id/_configuration { "values": { "host": "127.0.0.1", "port": 5432, "username": "myuser", "password": "mypassword", "database": "chinook", "schema": "public", "tables": "album,artist" } }
配置详细信息特定于连接器类型。密钥和值会因您连接的第三方数据源而异。有关这些配置详细信息,请参阅各个 connectors-references,连接器参考。
同步数据
编辑在本教程中,我们使用的是自托管连接器。若要将这些 API 与 Elastic 托管的连接器一起使用,则需要额外设置 API 密钥。有关详细信息,请参阅管理 API 密钥。
现在,我们准备将 PostgreSQL 数据同步到 Elasticsearch。运行以下 API 调用以启动完整同步作业
resp = client.perform_request( "POST", "/_connector/_sync_job", headers={"Content-Type": "application/json"}, body={ "id": "my-connector-id", "job_type": "full" }, ) print(resp)
const response = await client.transport.request({ method: "POST", path: "/_connector/_sync_job", body: { id: "my-connector-id", job_type: "full", }, }); console.log(response);
POST _connector/_sync_job { "id": "my-connector-id", "job_type": "full" }
为了在 Elasticsearch 中存储数据,连接器需要创建一个索引。在创建连接器时,我们指定了 music
索引。连接器将在启动同步作业之前创建并配置此 Elasticsearch 索引。
检查同步状态
编辑使用获取同步作业 API来跟踪同步作业的状态和进度。默认情况下,最新的作业状态会最先返回。运行以下 API 调用以检查同步作业的状态
resp = client.perform_request( "GET", "/_connector/_sync_job", params={ "connector_id": "my-connector-id", "size": "1" }, ) print(resp)
const response = await client.transport.request({ method: "GET", path: "/_connector/_sync_job", querystring: { connector_id: "my-connector-id", size: "1", }, }); console.log(response);
GET _connector/_sync_job?connector_id=my-connector-id&size=1
随着同步的进行,作业文档将更新,您可以根据需要经常检查它以轮询更新。
作业完成后,状态应为 completed
,indexed_document_count
应为 622。
使用以下 API 调用验证 music
索引中是否存在数据
resp = client.count( index="music", ) print(resp)
const response = await client.count({ index: "music", }); console.log(response);
GET music/_count
Elasticsearch 将数据存储在文档中,这些文档是 JSON 对象。使用以下 API 调用列出单个文档
resp = client.search( index="music", ) print(resp)
const response = await client.search({ index: "music", }); console.log(response);
GET music/_search
故障排除
编辑使用以下命令检查最新同步作业的状态
resp = client.perform_request( "GET", "/_connector/_sync_job", params={ "connector_id": "my-connector-id", "size": "1" }, ) print(resp)
const response = await client.transport.request({ method: "GET", path: "/_connector/_sync_job", querystring: { connector_id: "my-connector-id", size: "1", }, }); console.log(response);
GET _connector/_sync_job?connector_id=my-connector-id&size=1
如果连接器在同步期间遇到任何错误,您可以在 error
字段中找到这些错误。
清理
编辑要删除连接器及其关联的同步作业,请运行此命令
resp = client.connector.delete( connector_id="my-connector-id&delete_sync_jobs=true", ) print(resp)
const response = await client.connector.delete({ connector_id: "my-connector-id&delete_sync_jobs=true", }); console.log(response);
DELETE _connector/my-connector-id&delete_sync_jobs=true
这不会删除连接器创建的用于存储数据的 Elasticsearch 索引。运行以下命令删除 music
索引
resp = client.indices.delete( index="music", ) print(resp)
const response = await client.indices.delete({ index: "music", }); console.log(response);
DELETE music
要删除 PostgreSQL 容器,请运行以下命令
docker stop postgres docker rm postgres
要删除连接器服务,请运行以下命令
docker stop <container_id> docker rm <container_id>
后续步骤
编辑恭喜!您已使用连接器 API 成功设置了自托管连接器。
以下是一些后续步骤,可供您探索