连接器 API 教程
编辑连接器 API 教程
编辑了解如何使用Elasticsearch 连接器 API设置自管理连接器。
在此示例中,我们将使用 connectors-postgresql,PostgreSQL 连接器将数据从 PostgreSQL 数据库同步到 Elasticsearch。我们将使用 Docker 启动一个简单的 PostgreSQL 实例,其中包含一些示例数据,创建一个连接器,并将数据同步到 Elasticsearch。您可以按照相同的步骤为其他数据源设置连接器。
本教程重点介绍在您自己的基础设施上运行自管理连接器,以及使用连接器 API 管理同步。请参阅连接器以概述连接器的工作原理。
如果您刚开始使用 Elasticsearch,本教程可能有点高级。请参阅快速入门,以获取更适合初学者的 Elasticsearch 入门介绍。
如果您刚开始使用连接器,您可能希望首先在 UI 中开始。我们有两个教程专注于使用 UI 管理连接器
- Elastic 托管连接器教程。设置一个原生 MongoDB 连接器,完全由 Elastic Cloud 托管。
- 自管理连接器教程。设置一个自管理的 PostgreSQL 连接器。
先决条件
编辑- 您应该熟悉连接器,连接器的工作原理,以了解 API 调用与整体连接器设置的关系。
- 您需要安装Docker Desktop。
- 您需要运行 Elasticsearch,并拥有一个访问它的 API 密钥。如果您还没有 Elasticsearch 部署,请参阅下一节了解详细信息。
设置 Elasticsearch
编辑如果您已经在 Elastic Cloud 上有 Elasticsearch 部署(托管部署或无服务器项目),那么您就可以开始了。为了在 Docker 中以本地开发模式启动 Elasticsearch 以进行测试,请打开下面的可折叠部分。
在 Docker 中运行本地 Elasticsearch
docker run -p 9200:9200 -d --name elasticsearch \ -e "discovery.type=single-node" \ -e "xpack.security.enabled=false" \ -e "xpack.security.http.ssl.enabled=false" \ -e "xpack.license.self_generated.type=trial" \ docker.elastic.co/elasticsearch/elasticsearch:8.16.0
此 Elasticsearch 设置仅用于开发目的。切勿在生产环境中使用此配置。请参阅设置 Elasticsearch以获取生产级安装说明,包括 Docker。
我们将使用默认密码changeme
用于elastic
用户。对于生产环境,始终确保您的集群在启用安全性的情况下运行。
export ELASTIC_PASSWORD="changeme"
由于我们在本地运行集群时禁用了安全性,因此我们不会使用 API 密钥对 Elasticsearch 进行身份验证。相反,在每个 cURL 请求中,我们将使用-u
标志进行身份验证。
让我们测试一下我们是否可以访问 Elasticsearch
curl -s -X GET -u elastic:$ELASTIC_PASSWORD https://127.0.0.1:9200
注意:当 Elasticsearch 在本地运行时,您需要将用户名和密码传递给连接器服务的配置文件,以对 Elasticsearch 进行身份验证。
在 Docker 中运行 PostgreSQL 实例(可选)
编辑在本教程中,我们将使用 Docker 设置一个 PostgreSQL 实例,其中包含一些示例数据。当然,如果您有自己的 PostgreSQL 实例,您可以跳过此步骤并使用您自己的现有 PostgreSQL 实例。请记住,使用不同的实例可能需要调整后续步骤中描述的连接器配置。
展开在 Docker 中运行简单的 PostgreSQL 实例并导入示例数据
让我们启动一个 PostgreSQL 容器,其中包含一个用户和密码,在端口5432
上公开
docker run --name postgres -e POSTGRES_USER=myuser -e POSTGRES_PASSWORD=mypassword -p 5432:5432 -d postgres
下载并导入示例数据
接下来,我们需要创建一个目录来存储本教程的示例数据集。在您的终端中,运行以下命令
mkdir -p ~/data
我们将使用Chinook 数据集示例数据。
运行以下命令将文件下载到~/data
目录
curl -L https://raw.githubusercontent.com/lerocha/chinook-database/master/ChinookDatabase/DataSources/Chinook_PostgreSql.sql -o ~/data/Chinook_PostgreSql.sql
现在我们需要将示例数据导入 PostgreSQL 容器并创建表。
运行以下 Docker 命令将我们的示例数据复制到容器中并执行psql
脚本
docker cp ~/data/Chinook_PostgreSql.sql postgres:/ docker exec -it postgres psql -U myuser -f /Chinook_PostgreSql.sql
让我们验证表是否已在chinook
数据库中正确创建
docker exec -it postgres psql -U myuser -d chinook -c "\dt"
album
表应包含347个条目,而artist
表应包含275个条目。
本教程使用非常基本的设置。要使用高级功能(如过滤规则和增量同步),请在您的 PostgreSQL 数据库上启用track_commit_timestamp
。请参阅postgresql-connector-client-tutorial了解更多详细信息。
现在是真正有趣的时候了!我们将设置一个连接器,以便在 Elasticsearch 中创建我们 PostgreSQL 数据的可搜索镜像。
创建连接器
编辑我们将使用创建连接器 API来创建 PostgreSQL 连接器实例。
运行以下 API 调用,使用Dev Tools 控制台或curl
resp = client.connector.put( connector_id="my-connector-id", name="Music catalog", index_name="music", service_type="postgresql", ) print(resp)
const response = await client.connector.put({ connector_id: "my-connector-id", name: "Music catalog", index_name: "music", service_type: "postgresql", }); console.log(response);
PUT _connector/my-connector-id { "name": "Music catalog", "index_name": "music", "service_type": "postgresql" }
service_type
指的是您要连接到的第三方数据源。
请注意,我们指定了my-connector-id
ID作为PUT
请求的一部分。我们需要连接器 ID 来在本地设置和运行连接器服务。
如果您希望使用自动生成的 ID,请将PUT _connector/my-connector-id
替换为POST _connector
。
运行连接器服务
编辑如果您使用的是我们托管的 Elastic 托管连接器,则连接器服务会在 Elastic Cloud 中自动运行。因为我们正在运行自管理连接器,所以我们需要在本地启动此服务。
现在我们将运行连接器服务,以便我们可以开始将数据从我们的 PostgreSQL 实例同步到 Elasticsearch。我们将使用连接器-从 Docker 运行中概述的步骤。
在您自己的基础设施上运行连接器服务时,您需要提供一个包含以下详细信息的配置文件
- 您的 Elasticsearch 端点(
elasticsearch.host
) - Elasticsearch API 密钥(
elasticsearch.api_key
) - 您的第三方数据源类型(
service_type
) - 您的连接器 ID(
connector_id
)
创建 API 密钥
编辑如果您还没有创建 API 密钥来访问 Elasticsearch,则可以使用_security/api_key端点。
在这里,我们假设您的目标 Elasticsearch 索引名称为music
。如果您使用不同的索引名称,请相应调整请求主体。
resp = client.security.create_api_key( name="music-connector", role_descriptors={ "music-connector-role": { "cluster": [ "monitor", "manage_connector" ], "indices": [ { "names": [ "music", ".search-acl-filter-music", ".elastic-connectors*" ], "privileges": [ "all" ], "allow_restricted_indices": False } ] } }, ) print(resp)
const response = await client.security.createApiKey({ name: "music-connector", role_descriptors: { "music-connector-role": { cluster: ["monitor", "manage_connector"], indices: [ { names: ["music", ".search-acl-filter-music", ".elastic-connectors*"], privileges: ["all"], allow_restricted_indices: false, }, ], }, }, }); console.log(response);
POST /_security/api_key { "name": "music-connector", "role_descriptors": { "music-connector-role": { "cluster": [ "monitor", "manage_connector" ], "indices": [ { "names": [ "music", ".search-acl-filter-music", ".elastic-connectors*" ], "privileges": [ "all" ], "allow_restricted_indices": false } ] } } }
您需要使用响应中的encoded
值作为配置文件中的elasticsearch.api_key
。
您也可以在 Kibana 和无服务器 UI 中创建 API 密钥。
准备配置文件
编辑让我们创建一个目录和一个config.yml
文件来存储连接器配置
mkdir -p ~/connectors-config touch ~/connectors-config/config.yml
现在,让我们将我们的连接器详细信息添加到配置文件中。打开config.yml
并粘贴以下配置,将占位符替换为您自己的值
elasticsearch.host: <ELASTICSEARCH_ENDPOINT> # Your Elasticsearch endpoint elasticsearch.api_key: <ELASTICSEARCH_API_KEY> # Your Elasticsearch API key connectors: - connector_id: "my-connector-id" service_type: "postgresql"
我们在elastic/connectors
存储库中提供了一个示例配置文件以供参考。
运行连接器服务
编辑现在我们已经设置了配置文件,我们可以在本地运行连接器服务。这将使您的连接器实例指向您的 Elasticsearch 部署。
运行以下 Docker 命令以启动连接器服务
docker run \ -v "$HOME/connectors-config:/config" \ --rm \ --tty -i \ --network host \ docker.elastic.co/integrations/elastic-connectors:8.16.0.0 \ /app/bin/elastic-ingest \ -c /config/config.yml
通过获取连接器状态(应为needs_configuration
)和last_seen
字段(请注意,时间以 UTC 报告)来验证您的连接器是否已连接。last_seen
字段表示连接器已成功连接到 Elasticsearch。
resp = client.connector.get( connector_id="my-connector-id", ) print(resp)
const response = await client.connector.get({ connector_id: "my-connector-id", }); console.log(response);
GET _connector/my-connector-id
配置连接器
编辑现在我们的连接器实例已启动并正在运行,但它还不知道从哪里同步数据。拼图的最后一块是使用有关我们的 PostgreSQL 实例的详细信息配置我们的连接器。在 Elastic Cloud 或无服务器 UI 中设置连接器时,系统会提示您在用户界面中添加这些详细信息。
但是,由于本教程完全是关于以编程方式使用连接器,因此我们将使用更新连接器配置 API来添加我们的配置详细信息。
在配置连接器之前,请确保服务已注册配置模式。对于 Elastic 托管连接器,这会在通过 API 创建后不久发生。对于自管理连接器,模式会在服务启动时(一旦填充了config.yml
)注册。
仅在模式注册后才能通过 API 更新配置。通过检查GET _connector/my-connector-id
请求返回的配置属性来验证这一点。它应该是非空的。
运行以下 API 调用以使用我们的connectors-postgresql-client-configuration,PostgreSQL配置详细信息配置连接器
resp = client.connector.update_configuration( connector_id="my-connector-id", values={ "host": "127.0.0.1", "port": 5432, "username": "myuser", "password": "mypassword", "database": "chinook", "schema": "public", "tables": "album,artist" }, ) print(resp)
const response = await client.connector.updateConfiguration({ connector_id: "my-connector-id", values: { host: "127.0.0.1", port: 5432, username: "myuser", password: "mypassword", database: "chinook", schema: "public", tables: "album,artist", }, }); console.log(response);
PUT _connector/my-connector-id/_configuration { "values": { "host": "127.0.0.1", "port": 5432, "username": "myuser", "password": "mypassword", "database": "chinook", "schema": "public", "tables": "album,artist" } }
配置详情特定于连接器类型。密钥和值将根据您连接到的第三方数据源而有所不同。请参阅各个连接器参考,以了解这些配置详情。
同步数据
编辑本教程中我们使用的是自管理连接器。要将这些 API 与 Elastic 托管连接器一起使用,需要进行一些额外的 API 密钥设置。有关详细信息,请参阅管理 API 密钥。
现在,我们准备将 PostgreSQL 数据同步到 Elasticsearch。运行以下 API 调用以启动完整同步作业。
resp = client.perform_request( "POST", "/_connector/_sync_job", headers={"Content-Type": "application/json"}, body={ "id": "my-connector-id", "job_type": "full" }, ) print(resp)
const response = await client.transport.request({ method: "POST", path: "/_connector/_sync_job", body: { id: "my-connector-id", job_type: "full", }, }); console.log(response);
POST _connector/_sync_job { "id": "my-connector-id", "job_type": "full" }
为了将数据存储在 Elasticsearch 中,连接器需要创建一个索引。在创建连接器时,我们指定了music
索引。连接器将在启动同步作业之前创建并配置此 Elasticsearch 索引。
检查同步状态
编辑使用获取同步作业 API跟踪同步作业的状态和进度。默认情况下,最近的作业状态将首先返回。运行以下 API 调用以检查同步作业的状态。
resp = client.perform_request( "GET", "/_connector/_sync_job", params={ "connector_id": "my-connector-id", "size": "1" }, ) print(resp)
const response = await client.transport.request({ method: "GET", path: "/_connector/_sync_job", querystring: { connector_id: "my-connector-id", size: "1", }, }); console.log(response);
GET _connector/_sync_job?connector_id=my-connector-id&size=1
随着同步的进行,作业文档将更新,您可以随时检查它以轮询更新。
作业完成后,状态应为completed
,indexed_document_count
应为622。
使用以下 API 调用验证数据是否存在于music
索引中。
resp = client.count( index="music", ) print(resp)
const response = await client.count({ index: "music", }); console.log(response);
GET music/_count
Elasticsearch 将数据存储在文档中,文档是 JSON 对象。使用以下 API 调用列出各个文档。
resp = client.search( index="music", ) print(resp)
const response = await client.search({ index: "music", }); console.log(response);
GET music/_search
故障排除
编辑使用以下命令检查最新同步作业的状态。
resp = client.perform_request( "GET", "/_connector/_sync_job", params={ "connector_id": "my-connector-id", "size": "1" }, ) print(resp)
const response = await client.transport.request({ method: "GET", path: "/_connector/_sync_job", querystring: { connector_id: "my-connector-id", size: "1", }, }); console.log(response);
GET _connector/_sync_job?connector_id=my-connector-id&size=1
如果连接器在同步过程中遇到任何错误,您将在error
字段中找到这些错误。
清理
编辑要删除连接器及其关联的同步作业,请运行以下命令。
resp = client.connector.delete( connector_id="my-connector-id&delete_sync_jobs=true", ) print(resp)
const response = await client.connector.delete({ connector_id: "my-connector-id&delete_sync_jobs=true", }); console.log(response);
DELETE _connector/my-connector-id&delete_sync_jobs=true
这不会删除连接器创建的用于存储数据的 Elasticsearch 索引。通过运行以下命令删除music
索引。
resp = client.indices.delete( index="music", ) print(resp)
const response = await client.indices.delete({ index: "music", }); console.log(response);
DELETE music
要删除 PostgreSQL 容器,请运行以下命令。
docker stop postgres docker rm postgres
要删除连接器服务,请运行以下命令。
docker stop <container_id> docker rm <container_id>
后续步骤
编辑恭喜!您已成功使用连接器 API 设置了自管理连接器。
以下是一些可供探索的后续步骤。