连接器 API 教程

编辑

了解如何使用Elasticsearch 连接器 API设置自管理连接器。

在此示例中,我们将使用 connectors-postgresql,PostgreSQL 连接器将数据从 PostgreSQL 数据库同步到 Elasticsearch。我们将使用 Docker 启动一个简单的 PostgreSQL 实例,其中包含一些示例数据,创建一个连接器,并将数据同步到 Elasticsearch。您可以按照相同的步骤为其他数据源设置连接器。

本教程重点介绍在您自己的基础设施上运行自管理连接器,以及使用连接器 API 管理同步。请参阅连接器以概述连接器的工作原理。

如果您刚开始使用 Elasticsearch,本教程可能有点高级。请参阅快速入门,以获取更适合初学者的 Elasticsearch 入门介绍。

如果您刚开始使用连接器,您可能希望首先在 UI 中开始。我们有两个教程专注于使用 UI 管理连接器

先决条件

编辑
  • 您应该熟悉连接器,连接器的工作原理,以了解 API 调用与整体连接器设置的关系。
  • 您需要安装Docker Desktop
  • 您需要运行 Elasticsearch,并拥有一个访问它的 API 密钥。如果您还没有 Elasticsearch 部署,请参阅下一节了解详细信息。

设置 Elasticsearch

编辑

如果您已经在 Elastic Cloud 上有 Elasticsearch 部署(托管部署无服务器项目),那么您就可以开始了。为了在 Docker 中以本地开发模式启动 Elasticsearch 以进行测试,请打开下面的可折叠部分。

在 Docker 中运行本地 Elasticsearch
docker run -p 9200:9200 -d --name elasticsearch \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  -e "xpack.security.http.ssl.enabled=false" \
  -e "xpack.license.self_generated.type=trial" \
  docker.elastic.co/elasticsearch/elasticsearch:8.16.0

此 Elasticsearch 设置仅用于开发目的。切勿在生产环境中使用此配置。请参阅设置 Elasticsearch以获取生产级安装说明,包括 Docker。

我们将使用默认密码changeme用于elastic用户。对于生产环境,始终确保您的集群在启用安全性的情况下运行。

export ELASTIC_PASSWORD="changeme"

由于我们在本地运行集群时禁用了安全性,因此我们不会使用 API 密钥对 Elasticsearch 进行身份验证。相反,在每个 cURL 请求中,我们将使用-u标志进行身份验证。

让我们测试一下我们是否可以访问 Elasticsearch

curl -s -X GET -u elastic:$ELASTIC_PASSWORD https://127.0.0.1:9200

注意:当 Elasticsearch 在本地运行时,您需要将用户名和密码传递给连接器服务的配置文件,以对 Elasticsearch 进行身份验证。

在 Docker 中运行 PostgreSQL 实例(可选)

编辑

在本教程中,我们将使用 Docker 设置一个 PostgreSQL 实例,其中包含一些示例数据。当然,如果您有自己的 PostgreSQL 实例,您可以跳过此步骤并使用您自己的现有 PostgreSQL 实例。请记住,使用不同的实例可能需要调整后续步骤中描述的连接器配置。

展开在 Docker 中运行简单的 PostgreSQL 实例并导入示例数据

让我们启动一个 PostgreSQL 容器,其中包含一个用户和密码,在端口5432上公开

docker run --name postgres -e POSTGRES_USER=myuser -e POSTGRES_PASSWORD=mypassword -p 5432:5432 -d postgres

下载并导入示例数据

接下来,我们需要创建一个目录来存储本教程的示例数据集。在您的终端中,运行以下命令

mkdir -p ~/data

我们将使用Chinook 数据集示例数据。

运行以下命令将文件下载到~/data目录

curl -L https://raw.githubusercontent.com/lerocha/chinook-database/master/ChinookDatabase/DataSources/Chinook_PostgreSql.sql -o ~/data/Chinook_PostgreSql.sql

现在我们需要将示例数据导入 PostgreSQL 容器并创建表。

运行以下 Docker 命令将我们的示例数据复制到容器中并执行psql脚本

docker cp ~/data/Chinook_PostgreSql.sql postgres:/
docker exec -it postgres psql -U myuser -f /Chinook_PostgreSql.sql

让我们验证表是否已在chinook数据库中正确创建

docker exec -it postgres psql -U myuser -d chinook -c "\dt"

album表应包含347个条目,而artist表应包含275个条目。

本教程使用非常基本的设置。要使用高级功能(如过滤规则和增量同步),请在您的 PostgreSQL 数据库上启用track_commit_timestamp。请参阅postgresql-connector-client-tutorial了解更多详细信息。

现在是真正有趣的时候了!我们将设置一个连接器,以便在 Elasticsearch 中创建我们 PostgreSQL 数据的可搜索镜像。

创建连接器

编辑

我们将使用创建连接器 API来创建 PostgreSQL 连接器实例。

运行以下 API 调用,使用Dev Tools 控制台curl

resp = client.connector.put(
    connector_id="my-connector-id",
    name="Music catalog",
    index_name="music",
    service_type="postgresql",
)
print(resp)
const response = await client.connector.put({
  connector_id: "my-connector-id",
  name: "Music catalog",
  index_name: "music",
  service_type: "postgresql",
});
console.log(response);
PUT _connector/my-connector-id
{
  "name": "Music catalog",
  "index_name":  "music",
  "service_type": "postgresql"
}

service_type指的是您要连接到的第三方数据源。

请注意,我们指定了my-connector-idID作为PUT请求的一部分。我们需要连接器 ID 来在本地设置和运行连接器服务。

如果您希望使用自动生成的 ID,请将PUT _connector/my-connector-id替换为POST _connector

运行连接器服务

编辑

如果您使用的是我们托管的 Elastic 托管连接器,则连接器服务会在 Elastic Cloud 中自动运行。因为我们正在运行自管理连接器,所以我们需要在本地启动此服务。

现在我们将运行连接器服务,以便我们可以开始将数据从我们的 PostgreSQL 实例同步到 Elasticsearch。我们将使用连接器-从 Docker 运行中概述的步骤。

在您自己的基础设施上运行连接器服务时,您需要提供一个包含以下详细信息的配置文件

  • 您的 Elasticsearch 端点(elasticsearch.host
  • Elasticsearch API 密钥(elasticsearch.api_key
  • 您的第三方数据源类型(service_type
  • 您的连接器 ID(connector_id
创建 API 密钥
编辑

如果您还没有创建 API 密钥来访问 Elasticsearch,则可以使用_security/api_key端点。

在这里,我们假设您的目标 Elasticsearch 索引名称为music。如果您使用不同的索引名称,请相应调整请求主体。

resp = client.security.create_api_key(
    name="music-connector",
    role_descriptors={
        "music-connector-role": {
            "cluster": [
                "monitor",
                "manage_connector"
            ],
            "indices": [
                {
                    "names": [
                        "music",
                        ".search-acl-filter-music",
                        ".elastic-connectors*"
                    ],
                    "privileges": [
                        "all"
                    ],
                    "allow_restricted_indices": False
                }
            ]
        }
    },
)
print(resp)
const response = await client.security.createApiKey({
  name: "music-connector",
  role_descriptors: {
    "music-connector-role": {
      cluster: ["monitor", "manage_connector"],
      indices: [
        {
          names: ["music", ".search-acl-filter-music", ".elastic-connectors*"],
          privileges: ["all"],
          allow_restricted_indices: false,
        },
      ],
    },
  },
});
console.log(response);
POST /_security/api_key
{
  "name": "music-connector",
  "role_descriptors": {
    "music-connector-role": {
      "cluster": [
        "monitor",
        "manage_connector"
      ],
      "indices": [
        {
          "names": [
            "music",
            ".search-acl-filter-music",
            ".elastic-connectors*"
          ],
          "privileges": [
            "all"
          ],
          "allow_restricted_indices": false
        }
      ]
    }
  }
}

您需要使用响应中的encoded值作为配置文件中的elasticsearch.api_key

您也可以在 Kibana 和无服务器 UI 中创建 API 密钥。

准备配置文件
编辑

让我们创建一个目录和一个config.yml文件来存储连接器配置

mkdir -p ~/connectors-config
touch ~/connectors-config/config.yml

现在,让我们将我们的连接器详细信息添加到配置文件中。打开config.yml并粘贴以下配置,将占位符替换为您自己的值

elasticsearch.host: <ELASTICSEARCH_ENDPOINT> # Your Elasticsearch endpoint
elasticsearch.api_key: <ELASTICSEARCH_API_KEY> # Your Elasticsearch API key

connectors:
  - connector_id: "my-connector-id"
    service_type: "postgresql"

我们在elastic/connectors存储库中提供了一个示例配置文件以供参考。

运行连接器服务
编辑

现在我们已经设置了配置文件,我们可以在本地运行连接器服务。这将使您的连接器实例指向您的 Elasticsearch 部署。

运行以下 Docker 命令以启动连接器服务

docker run \
-v "$HOME/connectors-config:/config" \
--rm \
--tty -i \
--network host \
docker.elastic.co/integrations/elastic-connectors:8.16.0.0 \
/app/bin/elastic-ingest \
-c /config/config.yml

通过获取连接器状态(应为needs_configuration)和last_seen字段(请注意,时间以 UTC 报告)来验证您的连接器是否已连接。last_seen字段表示连接器已成功连接到 Elasticsearch。

resp = client.connector.get(
    connector_id="my-connector-id",
)
print(resp)
const response = await client.connector.get({
  connector_id: "my-connector-id",
});
console.log(response);
GET _connector/my-connector-id

配置连接器

编辑

现在我们的连接器实例已启动并正在运行,但它还不知道从哪里同步数据。拼图的最后一块是使用有关我们的 PostgreSQL 实例的详细信息配置我们的连接器。在 Elastic Cloud 或无服务器 UI 中设置连接器时,系统会提示您在用户界面中添加这些详细信息。

但是,由于本教程完全是关于以编程方式使用连接器,因此我们将使用更新连接器配置 API来添加我们的配置详细信息。

在配置连接器之前,请确保服务已注册配置模式。对于 Elastic 托管连接器,这会在通过 API 创建后不久发生。对于自管理连接器,模式会在服务启动时(一旦填充了config.yml)注册。

仅在模式注册后才能通过 API 更新配置。通过检查GET _connector/my-connector-id请求返回的配置属性来验证这一点。它应该是非空的。

运行以下 API 调用以使用我们的connectors-postgresql-client-configuration,PostgreSQL配置详细信息配置连接器

resp = client.connector.update_configuration(
    connector_id="my-connector-id",
    values={
        "host": "127.0.0.1",
        "port": 5432,
        "username": "myuser",
        "password": "mypassword",
        "database": "chinook",
        "schema": "public",
        "tables": "album,artist"
    },
)
print(resp)
const response = await client.connector.updateConfiguration({
  connector_id: "my-connector-id",
  values: {
    host: "127.0.0.1",
    port: 5432,
    username: "myuser",
    password: "mypassword",
    database: "chinook",
    schema: "public",
    tables: "album,artist",
  },
});
console.log(response);
PUT _connector/my-connector-id/_configuration
{
  "values": {
    "host": "127.0.0.1",
    "port": 5432,
    "username": "myuser",
    "password": "mypassword",
    "database": "chinook",
    "schema": "public",
    "tables": "album,artist"
  }
}

配置详情特定于连接器类型。密钥和值将根据您连接到的第三方数据源而有所不同。请参阅各个连接器参考,以了解这些配置详情。

同步数据

编辑

本教程中我们使用的是自管理连接器。要将这些 API 与 Elastic 托管连接器一起使用,需要进行一些额外的 API 密钥设置。有关详细信息,请参阅管理 API 密钥

现在,我们准备将 PostgreSQL 数据同步到 Elasticsearch。运行以下 API 调用以启动完整同步作业。

resp = client.perform_request(
    "POST",
    "/_connector/_sync_job",
    headers={"Content-Type": "application/json"},
    body={
        "id": "my-connector-id",
        "job_type": "full"
    },
)
print(resp)
const response = await client.transport.request({
  method: "POST",
  path: "/_connector/_sync_job",
  body: {
    id: "my-connector-id",
    job_type: "full",
  },
});
console.log(response);
POST _connector/_sync_job
{
    "id": "my-connector-id",
    "job_type": "full"
}

为了将数据存储在 Elasticsearch 中,连接器需要创建一个索引。在创建连接器时,我们指定了music索引。连接器将在启动同步作业之前创建并配置此 Elasticsearch 索引。

在我们这里使用的这种方法中,连接器将使用动态映射来自动推断字段的数据类型。在实际场景中,您将使用 Elasticsearch 的创建索引 API首先创建具有所需字段映射和索引设置的索引。预先定义自己的映射可以让您更好地控制数据的索引方式。

检查同步状态
编辑

使用获取同步作业 API跟踪同步作业的状态和进度。默认情况下,最近的作业状态将首先返回。运行以下 API 调用以检查同步作业的状态。

resp = client.perform_request(
    "GET",
    "/_connector/_sync_job",
    params={
        "connector_id": "my-connector-id",
        "size": "1"
    },
)
print(resp)
const response = await client.transport.request({
  method: "GET",
  path: "/_connector/_sync_job",
  querystring: {
    connector_id: "my-connector-id",
    size: "1",
  },
});
console.log(response);
GET _connector/_sync_job?connector_id=my-connector-id&size=1

随着同步的进行,作业文档将更新,您可以随时检查它以轮询更新。

作业完成后,状态应为completedindexed_document_count应为622

使用以下 API 调用验证数据是否存在于music索引中。

resp = client.count(
    index="music",
)
print(resp)
const response = await client.count({
  index: "music",
});
console.log(response);
GET music/_count

Elasticsearch 将数据存储在文档中,文档是 JSON 对象。使用以下 API 调用列出各个文档。

resp = client.search(
    index="music",
)
print(resp)
const response = await client.search({
  index: "music",
});
console.log(response);
GET music/_search

故障排除

编辑

使用以下命令检查最新同步作业的状态。

resp = client.perform_request(
    "GET",
    "/_connector/_sync_job",
    params={
        "connector_id": "my-connector-id",
        "size": "1"
    },
)
print(resp)
const response = await client.transport.request({
  method: "GET",
  path: "/_connector/_sync_job",
  querystring: {
    connector_id: "my-connector-id",
    size: "1",
  },
});
console.log(response);
GET _connector/_sync_job?connector_id=my-connector-id&size=1

如果连接器在同步过程中遇到任何错误,您将在error字段中找到这些错误。

清理

编辑

要删除连接器及其关联的同步作业,请运行以下命令。

resp = client.connector.delete(
    connector_id="my-connector-id&delete_sync_jobs=true",
)
print(resp)
const response = await client.connector.delete({
  connector_id: "my-connector-id&delete_sync_jobs=true",
});
console.log(response);
DELETE _connector/my-connector-id&delete_sync_jobs=true

这不会删除连接器创建的用于存储数据的 Elasticsearch 索引。通过运行以下命令删除music索引。

resp = client.indices.delete(
    index="music",
)
print(resp)
const response = await client.indices.delete({
  index: "music",
});
console.log(response);
DELETE music

要删除 PostgreSQL 容器,请运行以下命令。

docker stop postgres
docker rm postgres

要删除连接器服务,请运行以下命令。

docker stop <container_id>
docker rm <container_id>

后续步骤

编辑

恭喜!您已成功使用连接器 API 设置了自管理连接器。

以下是一些可供探索的后续步骤。