连接器 API 教程

编辑

了解如何使用 Elasticsearch 连接器 API 设置自管理连接器。

在本示例中,我们将使用 connectors-postgresql,PostgreSQL 连接器将数据从 PostgreSQL 数据库同步到 Elasticsearch。我们将在 Docker 中启动一个简单的 PostgreSQL 实例并提供一些示例数据,创建一个连接器,并将数据同步到 Elasticsearch。你可以按照相同的步骤设置其他数据源的连接器。

本教程重点介绍如何在您自己的基础设施上运行自管理连接器,并使用连接器 API 管理同步。有关连接器如何工作的概述,请参阅 连接器。

如果您刚开始使用 Elasticsearch,本教程可能有点高级。请参阅 快速入门,了解更适合初学者的 Elasticsearch 介绍。

如果您刚开始使用连接器,您可能想先从 UI 开始。我们有两个教程重点介绍如何使用 UI 管理连接器。

先决条件

编辑
  • 您应该熟悉连接器,连接器的工作方式,以便了解 API 调用如何与整体连接器设置相关。
  • 您需要安装 Docker Desktop
  • 您需要运行 Elasticsearch 和用于访问它的 API 密钥。如果您还没有 Elasticsearch 部署,请参阅下一节了解详细信息。

设置 Elasticsearch

编辑

如果您已经在 Elastic Cloud(托管部署无服务器项目)上部署了 Elasticsearch,那么您就可以开始了。要在 Docker 中以本地开发模式启动 Elasticsearch 进行测试,请打开下面的可折叠部分。

在 Docker 中运行本地 Elasticsearch
docker run -p 9200:9200 -d --name elasticsearch \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  -e "xpack.security.http.ssl.enabled=false" \
  -e "xpack.license.self_generated.type=trial" \
  docker.elastic.co/elasticsearch/elasticsearch:8.17.0

此 Elasticsearch 设置仅用于开发目的。切勿在生产环境中使用此配置。有关生产级安装说明,包括 Docker,请参阅 设置 Elasticsearch

我们将使用默认密码 changeme 作为 elastic 用户。对于生产环境,请始终确保您的集群启用安全性。

export ELASTIC_PASSWORD="changeme"

由于我们在本地运行集群时禁用了安全性,因此我们不会使用 API 密钥对 Elasticsearch 进行身份验证。相反,在每个 cURL 请求中,我们将使用 -u 标志进行身份验证。

让我们测试一下是否可以访问 Elasticsearch

curl -s -X GET -u elastic:$ELASTIC_PASSWORD https://127.0.0.1:9200

注意:当 Elasticsearch 在本地运行时,您需要在连接器服务的配置文件中传递用户名和密码才能对 Elasticsearch 进行身份验证。

在 Docker 中运行 PostgreSQL 实例(可选)

编辑

在本教程中,我们将在 Docker 中设置一个包含一些示例数据的 PostgreSQL 实例。当然,如果您已经有 PostgreSQL 实例,可以跳过此步骤并使用您自己现有的 PostgreSQL 实例。请记住,使用不同的实例可能需要对后续步骤中描述的连接器配置进行调整。

展开以在 Docker 中运行简单的 PostgreSQL 实例并导入示例数据

让我们启动一个 PostgreSQL 容器,其中包含用户和密码,在端口 5432 公开

docker run --name postgres -e POSTGRES_USER=myuser -e POSTGRES_PASSWORD=mypassword -p 5432:5432 -d postgres

下载并导入示例数据

接下来,我们需要创建一个目录来存储本教程的示例数据集。在您的终端中,运行以下命令

mkdir -p ~/data

我们将使用 Chinook 数据集示例数据。

运行以下命令将文件下载到 ~/data 目录

curl -L https://raw.githubusercontent.com/lerocha/chinook-database/master/ChinookDatabase/DataSources/Chinook_PostgreSql.sql -o ~/data/Chinook_PostgreSql.sql

现在我们需要将示例数据导入到 PostgreSQL 容器中并创建表。

运行以下 Docker 命令将我们的示例数据复制到容器中并执行 psql 脚本

docker cp ~/data/Chinook_PostgreSql.sql postgres:/
docker exec -it postgres psql -U myuser -f /Chinook_PostgreSql.sql

让我们验证表是否在 chinook 数据库中正确创建

docker exec -it postgres psql -U myuser -d chinook -c "\dt"

album 表应包含 347 个条目,而 artist 表应包含 275 个条目。

本教程使用非常基本的设置。要使用高级功能(例如筛选规则和增量同步),请在您的 PostgreSQL 数据库上启用 track_commit_timestamp。有关更多详细信息,请参阅 postgresql-connector-client-tutorial。

现在是真正有趣的时候了!我们将设置一个连接器,在 Elasticsearch 中创建我们 PostgreSQL 数据的可搜索镜像。

创建连接器

编辑

我们将使用 创建连接器 API 来创建 PostgreSQL 连接器实例。

使用 Dev Tools Consolecurl 运行以下 API 调用

resp = client.connector.put(
    connector_id="my-connector-id",
    name="Music catalog",
    index_name="music",
    service_type="postgresql",
)
print(resp)
const response = await client.connector.put({
  connector_id: "my-connector-id",
  name: "Music catalog",
  index_name: "music",
  service_type: "postgresql",
});
console.log(response);
PUT _connector/my-connector-id
{
  "name": "Music catalog",
  "index_name":  "music",
  "service_type": "postgresql"
}

service_type 指的是您要连接的第三方数据源。

请注意,我们在 PUT 请求中指定了 my-connector-id ID。我们需要连接器 ID 在本地设置和运行连接器服务。

如果您希望使用自动生成的 ID,请将 PUT _connector/my-connector-id 替换为 POST _connector

运行连接器服务

编辑

如果您正在使用我们托管的 Elastic 托管连接器,则连接器服务会在 Elastic Cloud 中自动运行。因为我们正在运行自管理连接器,所以我们需要在本地启动此服务。

现在我们将运行连接器服务,以便我们可以开始将数据从我们的 PostgreSQL 实例同步到 Elasticsearch。我们将使用 connectors-run-from-docker 中概述的步骤。

在您自己的基础设施上运行连接器服务时,您需要提供一个包含以下详细信息的配置文件

  • 您的 Elasticsearch 端点 (elasticsearch.host)
  • Elasticsearch API 密钥 (elasticsearch.api_key)
  • 您的第三方数据源类型 (service_type)
  • 您的连接器 ID (connector_id)
创建 API 密钥
编辑

如果您尚未创建用于访问 Elasticsearch 的 API 密钥,则可以使用 _security/api_key 端点。

在这里,我们假设您的目标 Elasticsearch 索引名称是 music。如果您使用不同的索引名称,请相应地调整请求正文。

resp = client.security.create_api_key(
    name="music-connector",
    role_descriptors={
        "music-connector-role": {
            "cluster": [
                "monitor",
                "manage_connector"
            ],
            "indices": [
                {
                    "names": [
                        "music",
                        ".search-acl-filter-music",
                        ".elastic-connectors*"
                    ],
                    "privileges": [
                        "all"
                    ],
                    "allow_restricted_indices": False
                }
            ]
        }
    },
)
print(resp)
const response = await client.security.createApiKey({
  name: "music-connector",
  role_descriptors: {
    "music-connector-role": {
      cluster: ["monitor", "manage_connector"],
      indices: [
        {
          names: ["music", ".search-acl-filter-music", ".elastic-connectors*"],
          privileges: ["all"],
          allow_restricted_indices: false,
        },
      ],
    },
  },
});
console.log(response);
POST /_security/api_key
{
  "name": "music-connector",
  "role_descriptors": {
    "music-connector-role": {
      "cluster": [
        "monitor",
        "manage_connector"
      ],
      "indices": [
        {
          "names": [
            "music",
            ".search-acl-filter-music",
            ".elastic-connectors*"
          ],
          "privileges": [
            "all"
          ],
          "allow_restricted_indices": false
        }
      ]
    }
  }
}

您需要使用响应中的 encoded 值作为配置文件中的 elasticsearch.api_key

您还可以在 Kibana 和无服务器 UI 中创建 API 密钥。

准备配置文件
编辑

让我们创建一个目录和一个 config.yml 文件来存储连接器配置

mkdir -p ~/connectors-config
touch ~/connectors-config/config.yml

现在,让我们将我们的连接器详细信息添加到配置文件中。打开 config.yml 并粘贴以下配置,将占位符替换为您自己的值

elasticsearch.host: <ELASTICSEARCH_ENDPOINT> # Your Elasticsearch endpoint
elasticsearch.api_key: <ELASTICSEARCH_API_KEY> # Your Elasticsearch API key

connectors:
  - connector_id: "my-connector-id"
    service_type: "postgresql"

我们在 elastic/connectors 存储库中提供了一个 示例配置文件供参考。

运行连接器服务
编辑

现在我们已经设置好配置文件,我们可以在本地运行连接器服务。这将把您的连接器实例指向您的 Elasticsearch 部署。

运行以下 Docker 命令以启动连接器服务

docker run \
-v "$HOME/connectors-config:/config" \
--rm \
--tty -i \
--network host \
docker.elastic.co/enterprise-search/elastic-connectors:8.17.0.0 \
/app/bin/elastic-ingest \
-c /config/config.yml

通过获取连接器状态(应为 needs_configuration)和 last_seen 字段(请注意时间以 UTC 报告)来验证您的连接器已连接。last_seen 字段表示连接器已成功连接到 Elasticsearch。

resp = client.connector.get(
    connector_id="my-connector-id",
)
print(resp)
const response = await client.connector.get({
  connector_id: "my-connector-id",
});
console.log(response);
GET _connector/my-connector-id

配置连接器

编辑

现在我们的连接器实例已启动并运行,但它还不知道要从哪里同步数据。最后一个难题是使用有关我们的 PostgreSQL 实例的详细信息配置我们的连接器。在 Elastic Cloud 或无服务器 UI 中设置连接器时,系统会提示您在用户界面中添加这些详细信息。

但是,由于本教程完全是关于以编程方式使用连接器的,我们将使用 更新连接器配置 API 来添加我们的配置详细信息。

在配置连接器之前,请确保该服务已注册配置模式。对于 Elastic 托管的连接器,这会在通过 API 创建后不久发生。对于自管理连接器,该模式会在服务启动时注册(一旦填充 config.yml)。

仅在模式注册后才能通过 API 进行配置更新。通过检查 GET _connector/my-connector-id 请求返回的配置属性来验证这一点。它应该是非空的。

运行以下 API 调用,使用我们的 connectors-postgresql-client-configuration,PostgreSQL 配置详细信息配置连接器

resp = client.connector.update_configuration(
    connector_id="my-connector-id",
    values={
        "host": "127.0.0.1",
        "port": 5432,
        "username": "myuser",
        "password": "mypassword",
        "database": "chinook",
        "schema": "public",
        "tables": "album,artist"
    },
)
print(resp)
const response = await client.connector.updateConfiguration({
  connector_id: "my-connector-id",
  values: {
    host: "127.0.0.1",
    port: 5432,
    username: "myuser",
    password: "mypassword",
    database: "chinook",
    schema: "public",
    tables: "album,artist",
  },
});
console.log(response);
PUT _connector/my-connector-id/_configuration
{
  "values": {
    "host": "127.0.0.1",
    "port": 5432,
    "username": "myuser",
    "password": "mypassword",
    "database": "chinook",
    "schema": "public",
    "tables": "album,artist"
  }
}

配置详细信息特定于连接器类型。密钥和值会因您连接的第三方数据源而异。有关这些配置详细信息,请参阅各个 connectors-references,连接器参考。

同步数据

编辑

在本教程中,我们使用的是自托管连接器。若要将这些 API 与 Elastic 托管的连接器一起使用,则需要额外设置 API 密钥。有关详细信息,请参阅管理 API 密钥

现在,我们准备将 PostgreSQL 数据同步到 Elasticsearch。运行以下 API 调用以启动完整同步作业

resp = client.perform_request(
    "POST",
    "/_connector/_sync_job",
    headers={"Content-Type": "application/json"},
    body={
        "id": "my-connector-id",
        "job_type": "full"
    },
)
print(resp)
const response = await client.transport.request({
  method: "POST",
  path: "/_connector/_sync_job",
  body: {
    id: "my-connector-id",
    job_type: "full",
  },
});
console.log(response);
POST _connector/_sync_job
{
    "id": "my-connector-id",
    "job_type": "full"
}

为了在 Elasticsearch 中存储数据,连接器需要创建一个索引。在创建连接器时,我们指定了 music 索引。连接器将在启动同步作业之前创建并配置此 Elasticsearch 索引。

在我们此处使用的方法中,连接器将使用动态映射来自动推断字段的数据类型。在实际场景中,您可以使用 Elasticsearch 创建索引 API 来首先创建具有所需字段映射和索引设置的索引。预先定义自己的映射可以更好地控制数据的索引方式。

检查同步状态
编辑

使用获取同步作业 API来跟踪同步作业的状态和进度。默认情况下,最新的作业状态会最先返回。运行以下 API 调用以检查同步作业的状态

resp = client.perform_request(
    "GET",
    "/_connector/_sync_job",
    params={
        "connector_id": "my-connector-id",
        "size": "1"
    },
)
print(resp)
const response = await client.transport.request({
  method: "GET",
  path: "/_connector/_sync_job",
  querystring: {
    connector_id: "my-connector-id",
    size: "1",
  },
});
console.log(response);
GET _connector/_sync_job?connector_id=my-connector-id&size=1

随着同步的进行,作业文档将更新,您可以根据需要经常检查它以轮询更新。

作业完成后,状态应为 completedindexed_document_count 应为 622

使用以下 API 调用验证 music 索引中是否存在数据

resp = client.count(
    index="music",
)
print(resp)
const response = await client.count({
  index: "music",
});
console.log(response);
GET music/_count

Elasticsearch 将数据存储在文档中,这些文档是 JSON 对象。使用以下 API 调用列出单个文档

resp = client.search(
    index="music",
)
print(resp)
const response = await client.search({
  index: "music",
});
console.log(response);
GET music/_search

故障排除

编辑

使用以下命令检查最新同步作业的状态

resp = client.perform_request(
    "GET",
    "/_connector/_sync_job",
    params={
        "connector_id": "my-connector-id",
        "size": "1"
    },
)
print(resp)
const response = await client.transport.request({
  method: "GET",
  path: "/_connector/_sync_job",
  querystring: {
    connector_id: "my-connector-id",
    size: "1",
  },
});
console.log(response);
GET _connector/_sync_job?connector_id=my-connector-id&size=1

如果连接器在同步期间遇到任何错误,您可以在 error 字段中找到这些错误。

清理

编辑

要删除连接器及其关联的同步作业,请运行此命令

resp = client.connector.delete(
    connector_id="my-connector-id&delete_sync_jobs=true",
)
print(resp)
const response = await client.connector.delete({
  connector_id: "my-connector-id&delete_sync_jobs=true",
});
console.log(response);
DELETE _connector/my-connector-id&delete_sync_jobs=true

这不会删除连接器创建的用于存储数据的 Elasticsearch 索引。运行以下命令删除 music 索引

resp = client.indices.delete(
    index="music",
)
print(resp)
const response = await client.indices.delete({
  index: "music",
});
console.log(response);
DELETE music

要删除 PostgreSQL 容器,请运行以下命令

docker stop postgres
docker rm postgres

要删除连接器服务,请运行以下命令

docker stop <container_id>
docker rm <container_id>

后续步骤

编辑

恭喜!您已使用连接器 API 成功设置了自托管连接器。

以下是一些后续步骤,可供您探索

  • 了解有关连接器 API的更多信息。
  • 了解如何使用 Docker Compose 在我们的快速入门指南中部署 Elasticsearch、Kibana 和连接器服务。