Elastic PostgreSQL 连接器参考
编辑Elastic PostgreSQL 连接器参考
编辑Elastic PostgreSQL 连接器是用于 PostgreSQL 的连接器。此连接器使用 Elastic 连接器框架 以 Python 编写。
此连接器使用 通用数据库连接器源代码(分支 8.16,与 Elastic 8.16 兼容)。查看此连接器的特定 源代码(分支 8.16,与 Elastic 8.16 兼容)。
Elastic 托管连接器(Elastic Cloud)
编辑查看 Elastic 托管连接器 参考
可用性和先决条件
编辑此连接器在 Elastic 版本 8.8.0 及更高版本 中作为 Elastic 托管连接器 提供。要在 Elastic Cloud 中原生使用此连接器,请满足所有 Elastic 托管连接器要求。
创建 PostgreSQL 连接器
编辑使用 UI
编辑要创建一个新的 PostgreSQL 连接器
- 在 Kibana UI 中,从主菜单导航到 搜索 → 内容 → 连接器 页面,或使用 全局搜索字段。
- 按照说明创建新的原生 PostgreSQL 连接器。
有关其他操作,请参阅 Kibana 中的连接器 UI。
使用 API
编辑您可以使用 Elasticsearch 创建连接器 API 创建新的原生 PostgreSQL 连接器。
例如
resp = client.connector.put( connector_id="my-{service-name-stub}-connector", index_name="my-elasticsearch-index", name="Content synced from {service-name}", service_type="{service-name-stub}", is_native=True, ) print(resp)
const response = await client.connector.put({ connector_id: "my-{service-name-stub}-connector", index_name: "my-elasticsearch-index", name: "Content synced from {service-name}", service_type: "{service-name-stub}", is_native: true, }); console.log(response);
PUT _connector/my-postgresql-connector { "index_name": "my-elasticsearch-index", "name": "Content synced from PostgreSQL", "service_type": "postgresql", "is_native": true }
您还需要 为连接器创建一个 API 密钥。
用户需要集群权限 manage_api_key
、manage_connector
和 write_connector_secrets
以编程方式生成 API 密钥。
要为连接器创建 API 密钥
-
运行以下命令,替换指示的值。请注意响应中返回的
id
和encoded
值resp = client.security.create_api_key( name="my-connector-api-key", role_descriptors={ "my-connector-connector-role": { "cluster": [ "monitor", "manage_connector" ], "indices": [ { "names": [ "my-index_name", ".search-acl-filter-my-index_name", ".elastic-connectors*" ], "privileges": [ "all" ], "allow_restricted_indices": False } ] } }, ) print(resp)
const response = await client.security.createApiKey({ name: "my-connector-api-key", role_descriptors: { "my-connector-connector-role": { cluster: ["monitor", "manage_connector"], indices: [ { names: [ "my-index_name", ".search-acl-filter-my-index_name", ".elastic-connectors*", ], privileges: ["all"], allow_restricted_indices: false, }, ], }, }, }); console.log(response);
POST /_security/api_key { "name": "my-connector-api-key", "role_descriptors": { "my-connector-connector-role": { "cluster": [ "monitor", "manage_connector" ], "indices": [ { "names": [ "my-index_name", ".search-acl-filter-my-index_name", ".elastic-connectors*" ], "privileges": [ "all" ], "allow_restricted_indices": false } ] } } }
-
使用
encoded
值存储连接器密钥,并记下此响应返回的id
值resp = client.connector.secret_post( body={ "value": "encoded_api_key" }, ) print(resp)
const response = await client.connector.secretPost({ body: { value: "encoded_api_key", }, }); console.log(response);
POST _connector/_secret { "value": "encoded_api_key" }
-
使用 API 密钥
id
和连接器密钥id
更新连接器resp = client.connector.update_api_key_id( connector_id="my_connector_id>", api_key_id="API key_id", api_key_secret_id="secret_id", ) print(resp)
const response = await client.connector.updateApiKeyId({ connector_id: "my_connector_id>", api_key_id: "API key_id", api_key_secret_id: "secret_id", }); console.log(response);
PUT /_connector/my_connector_id>/_api_key_id { "api_key_id": "API key_id", "api_key_secret_id": "secret_id" }
有关所有可用连接器 API 的详细信息,请参阅 Elasticsearch API 文档。
用法
编辑要将此连接器用作 Elastic 托管连接器,请使用 连接器 工作流。请参阅 Elastic 托管连接器。
用户必须将 track_commit_timestamp
设置为 on
。为此,请在 PostgreSQL 服务器中运行 ALTER SYSTEM SET track_commit_timestamp = on;
。
有关其他操作,请参阅 <←esconnectors-usage>>。
有关连接器客户端工作流的端到端示例,请参阅 教程。
兼容性
编辑PostgreSQL 版本 11 到 15 与 Elastic 连接器兼容。
配置
编辑设置以下配置字段
- 主机
-
托管 PostgreSQL 实例的服务器主机地址。示例
-
192.158.1.38
-
demo.instance.demo-region.demo.service.com
-
- 端口
-
托管 PostgreSQL 实例的端口。示例
-
5432
(默认)
-
- 用户名
- PostgreSQL 帐户的用户名。
- 密码
- PostgreSQL 帐户的密码。
- 数据库
-
PostgreSQL 数据库的名称。示例
-
employee_database
-
customer_database
-
- 模式
- PostgreSQL 数据库的模式。
- 表名列表(以逗号分隔)
-
以逗号分隔的表列表。如果值为
*
,则 PostgreSQL 连接器将获取配置数据库中所有表的数据。默认值为*
。示例-
table_1, table_2
-
*
使用高级同步规则时可以绕过此字段。
-
- 启用 SSL
- 切换以启用 SSL 验证。默认情况下禁用。
- SSL 证书
-
SSL 证书的内容。如果禁用 SSL,则将忽略
ssl_ca
值。展开 以查看证书示例
-----BEGIN CERTIFICATE----- MIID+jCCAuKgAwIBAgIGAJJMzlxLMA0GCSqGSIb3DQEBCwUAMHoxCzAJBgNVBAYT AlVTMQwwCgYDVQQKEwNJQk0xFjAUBgNVBAsTDURlZmF1bHROb2RlMDExFjAUBgNV BAsTDURlZmF1bHRDZWxsMDExGTAXBgNVBAsTEFJvb3QgQ2VydGlmaWNhdGUxEjAQ BgNVBAMTCWxvY2FsaG9zdDAeFw0yMTEyMTQyMjA3MTZaFw0yMjEyMTQyMjA3MTZa MF8xCzAJBgNVBAYTAlVTMQwwCgYDVQQKEwNJQk0xFjAUBgNVBAsTDURlZmF1bHRO b2RlMDExFjAUBgNVBAsTDURlZmF1bHRDZWxsMDExEjAQBgNVBAMTCWxvY2FsaG9z dDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMv5HCsJZIpI5zCy+jXV z6lmzNc9UcVSEEHn86h6zT6pxuY90TYeAhlZ9hZ+SCKn4OQ4GoDRZhLPTkYDt+wW CV3NTIy9uCGUSJ6xjCKoxClJmgSQdg5m4HzwfY4ofoEZ5iZQ0Zmt62jGRWc0zuxj hegnM+eO2reBJYu6Ypa9RPJdYJsmn1RNnC74IDY8Y95qn+WZj//UALCpYfX41hko i7TWD9GKQO8SBmAxhjCDifOxVBokoxYrNdzESl0LXvnzEadeZTd9BfUtTaBHhx6t njqqCPrbTY+3jAbZFd4RiERPnhLVKMytw5ot506BhPrUtpr2lusbN5svNXjuLeea MMUCAwEAAaOBoDCBnTATBgNVHSMEDDAKgAhOatpLwvJFqjAdBgNVHSUEFjAUBggr BgEFBQcDAQYIKwYBBQUHAwIwVAYDVR0RBE0wS4E+UHJvZmlsZVVVSUQ6QXBwU3J2 MDEtQkFTRS05MDkzMzJjMC1iNmFiLTQ2OTMtYWI5NC01Mjc1ZDI1MmFmNDiCCWxv Y2FsaG9zdDARBgNVHQ4ECgQITzqhA5sO8O4wDQYJKoZIhvcNAQELBQADggEBAKR0 gY/BM69S6BDyWp5dxcpmZ9FS783FBbdUXjVtTkQno+oYURDrhCdsfTLYtqUlP4J4 CHoskP+MwJjRIoKhPVQMv14Q4VC2J9coYXnePhFjE+6MaZbTjq9WaekGrpKkMaQA iQt5b67jo7y63CZKIo9yBvs7sxODQzDn3wZwyux2vPegXSaTHR/rop/s/mPk3YTS hQprs/IVtPoWU4/TsDN3gIlrAYGbcs29CAt5q9MfzkMmKsuDkTZD0ry42VjxjAmk xw23l/k8RoD1wRWaDVbgpjwSzt+kl+vJE/ip2w3h69eEZ9wbo6scRO5lCO2JM4Pr 7RhLQyWn2u00L7/9Omw= -----END CERTIFICATE-----
文档和同步
编辑- 表必须由 PostgreSQL 用户拥有。
- 跳过未定义主键的表。
- 要获取 PostgreSQL 中的上次更新时间,必须将
track_commit_timestamp
设置为on
。否则,每次同步都将索引所有数据。
- 不会提取大于 10 MB 的文件。
- 权限不会同步。所有文档 索引到 Elastic 部署后将对 所有有权访问 该 Elastic 部署的用户可见。
同步规则
编辑基本同步规则 对所有连接器都相同,并且默认情况下可用。
高级同步规则
编辑高级同步规则生效需要 完整同步。
高级同步规则通过特定于源的 DSL JSON 代码段定义。
示例数据
编辑以下示例中将使用一些示例数据。
emp_id | name | age |
---|---|---|
3 |
John |
28 |
10 |
Jane |
35 |
14 |
Alex |
22 |
c_id | name | age |
---|---|---|
2 |
Elm |
24 |
6 |
Pine |
30 |
9 |
Oak |
34 |
高级同步规则示例
编辑[ { "tables": [ "employee" ], "query": "SELECT * FROM employee" }, { "tables": [ "customer" ], "query": "SELECT * FROM customer" } ]
在 8.15.0 中,我们在 PostgreSQL 连接器的高级同步规则中添加了一个新的可选 id_columns
字段。使用 id_columns
字段摄取没有主键的表。包含唯一字段的名称,以便连接器可以使用它们为文档生成唯一 ID。
[ { "tables": [ "employee" ], "query": "SELECT * FROM employee", "id_columns": ["emp_id"] }, { "tables": [ "customer" ], "query": "SELECT * FROM customer", "id_columns": ["c_id"] } ]
此示例使用 id_columns
字段分别为 employee
和 customer
表指定唯一字段 emp_id
和 c_id
。
[ { "tables": ["employee"], "query": "SELECT * FROM employee WHERE emp_id > 5" } ]
[ { "tables": ["employee", "customer"], "query": "SELECT * FROM employee INNER JOIN customer ON employee.emp_id = customer.c_id" } ]
使用高级规则时,查询可以绕过配置字段 tables
。如果查询指定了配置中未出现的表,则会发生这种情况。如果配置指定 *
以获取所有表,而高级同步规则仅请求部分表,则也会发生这种情况。
已知问题
编辑此连接器没有已知问题。有关所有连接器的已知问题列表,请参阅 已知问题。
故障排除
编辑请参阅 故障排除。
安全
编辑请参阅 安全。
自托管连接器
编辑查看 自托管连接器 参考
可用性和先决条件
编辑此连接器作为自托管 自托管连接器 提供。要使用此连接器,请满足所有 自托管连接器要求。
创建 PostgreSQL 连接器
编辑使用 UI
编辑要创建一个新的 PostgreSQL 连接器
- 在 Kibana UI 中,从主菜单导航到 搜索 → 内容 → 连接器 页面,或使用 全局搜索字段。
- 按照说明创建新的 PostgreSQL 自托管连接器。
使用 API
编辑您可以使用 Elasticsearch 创建连接器 API 创建新的自托管 PostgreSQL 自托管连接器。
例如
resp = client.connector.put( connector_id="my-{service-name-stub}-connector", index_name="my-elasticsearch-index", name="Content synced from {service-name}", service_type="{service-name-stub}", ) print(resp)
const response = await client.connector.put({ connector_id: "my-{service-name-stub}-connector", index_name: "my-elasticsearch-index", name: "Content synced from {service-name}", service_type: "{service-name-stub}", }); console.log(response);
PUT _connector/my-postgresql-connector { "index_name": "my-elasticsearch-index", "name": "Content synced from PostgreSQL", "service_type": "postgresql" }
您还需要 为连接器创建一个 API 密钥。
用户需要集群权限 manage_api_key
、manage_connector
和 write_connector_secrets
以编程方式生成 API 密钥。
要为连接器创建 API 密钥
-
运行以下命令,替换指示的值。请注意响应中返回的
encoded
值resp = client.security.create_api_key( name="connector_name-connector-api-key", role_descriptors={ "connector_name-connector-role": { "cluster": [ "monitor", "manage_connector" ], "indices": [ { "names": [ "index_name", ".search-acl-filter-index_name", ".elastic-connectors*" ], "privileges": [ "all" ], "allow_restricted_indices": False } ] } }, ) print(resp)
const response = await client.security.createApiKey({ name: "connector_name-connector-api-key", role_descriptors: { "connector_name-connector-role": { cluster: ["monitor", "manage_connector"], indices: [ { names: [ "index_name", ".search-acl-filter-index_name", ".elastic-connectors*", ], privileges: ["all"], allow_restricted_indices: false, }, ], }, }, }); console.log(response);
POST /_security/api_key { "name": "connector_name-connector-api-key", "role_descriptors": { "connector_name-connector-role": { "cluster": [ "monitor", "manage_connector" ], "indices": [ { "names": [ "index_name", ".search-acl-filter-index_name", ".elastic-connectors*" ], "privileges": [ "all" ], "allow_restricted_indices": false } ] } } }
- 使用 API 密钥
encoded
值更新您的config.yml
文件。
有关所有可用连接器 API 的详细信息,请参阅 Elasticsearch API 文档。
用法
编辑要将此连接器用作 自托管连接器,请参阅 自托管连接器。
用户必须将 track_commit_timestamp
设置为 on
。为此,请在 PostgreSQL 服务器中运行 ALTER SYSTEM SET track_commit_timestamp = on;
。
有关其他操作,请参阅。
有关自托管连接器工作流的端到端示例,请参阅 教程。
兼容性
编辑PostgreSQL 版本 11 到 15 与 Elastic 连接器框架兼容。
配置
编辑设置以下配置字段
-
host
-
托管 PostgreSQL 实例的服务器主机地址。示例
-
192.158.1.38
-
demo.instance.demo-region.demo.service.com
-
-
port
-
托管 PostgreSQL 实例的端口。示例
-
5432
-
9090
-
-
username
- PostgreSQL 帐户的用户名。
-
password
- PostgreSQL 帐户的密码。
-
database
-
PostgreSQL 数据库的名称。示例
-
employee_database
-
customer_database
-
-
schema
- PostgreSQL 数据库的模式。
-
tables
-
以逗号分隔的表列表。如果值为
*
,则 PostgreSQL 连接器将获取配置数据库中所有表的数据。默认值为*
。示例-
table_1, table_2
-
*
使用高级同步规则时可以绕过此字段。
-
-
ssl_enabled
- 是否启用 SSL 验证。默认值为
True
。 -
ssl_ca
-
SSL 证书的内容(如果启用了 SSL)。如果禁用了 SSL,则将忽略
ssl_ca
值。展开 以查看证书示例
-----BEGIN CERTIFICATE----- MIID+jCCAuKgAwIBAgIGAJJMzlxLMA0GCSqGSIb3DQEBCwUAMHoxCzAJBgNVBAYT AlVTMQwwCgYDVQQKEwNJQk0xFjAUBgNVBAsTDURlZmF1bHROb2RlMDExFjAUBgNV BAsTDURlZmF1bHRDZWxsMDExGTAXBgNVBAsTEFJvb3QgQ2VydGlmaWNhdGUxEjAQ BgNVBAMTCWxvY2FsaG9zdDAeFw0yMTEyMTQyMjA3MTZaFw0yMjEyMTQyMjA3MTZa MF8xCzAJBgNVBAYTAlVTMQwwCgYDVQQKEwNJQk0xFjAUBgNVBAsTDURlZmF1bHRO b2RlMDExFjAUBgNVBAsTDURlZmF1bHRDZWxsMDExEjAQBgNVBAMTCWxvY2FsaG9z dDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMv5HCsJZIpI5zCy+jXV z6lmzNc9UcVSEEHn86h6zT6pxuY90TYeAhlZ9hZ+SCKn4OQ4GoDRZhLPTkYDt+wW CV3NTIy9uCGUSJ6xjCKoxClJmgSQdg5m4HzwfY4ofoEZ5iZQ0Zmt62jGRWc0zuxj hegnM+eO2reBJYu6Ypa9RPJdYJsmn1RNnC74IDY8Y95qn+WZj//UALCpYfX41hko i7TWD9GKQO8SBmAxhjCDifOxVBokoxYrNdzESl0LXvnzEadeZTd9BfUtTaBHhx6t njqqCPrbTY+3jAbZFd4RiERPnhLVKMytw5ot506BhPrUtpr2lusbN5svNXjuLeea MMUCAwEAAaOBoDCBnTATBgNVHSMEDDAKgAhOatpLwvJFqjAdBgNVHSUEFjAUBggr BgEFBQcDAQYIKwYBBQUHAwIwVAYDVR0RBE0wS4E+UHJvZmlsZVVVSUQ6QXBwU3J2 MDEtQkFTRS05MDkzMzJjMC1iNmFiLTQ2OTMtYWI5NC01Mjc1ZDI1MmFmNDiCCWxv Y2FsaG9zdDARBgNVHQ4ECgQITzqhA5sO8O4wDQYJKoZIhvcNAQELBQADggEBAKR0 gY/BM69S6BDyWp5dxcpmZ9FS783FBbdUXjVtTkQno+oYURDrhCdsfTLYtqUlP4J4 CHoskP+MwJjRIoKhPVQMv14Q4VC2J9coYXnePhFjE+6MaZbTjq9WaekGrpKkMaQA iQt5b67jo7y63CZKIo9yBvs7sxODQzDn3wZwyux2vPegXSaTHR/rop/s/mPk3YTS hQprs/IVtPoWU4/TsDN3gIlrAYGbcs29CAt5q9MfzkMmKsuDkTZD0ry42VjxjAmk xw23l/k8RoD1wRWaDVbgpjwSzt+kl+vJE/ip2w3h69eEZ9wbo6scRO5lCO2JM4Pr 7RhLQyWn2u00L7/9Omw= -----END CERTIFICATE-----
使用 Docker 部署
编辑您可以使用 Docker 将 PostgreSQL 连接器部署为自管理连接器。请按照以下说明操作。
步骤 1:下载示例配置文件
下载示例配置文件。您可以手动下载,也可以运行以下命令
curl https://raw.githubusercontent.com/elastic/connectors/main/config.yml.example --output ~/connectors-config/config.yml
请记住,如果您的目录名称不同或您想要使用不同的配置文件名,则需要更新 --output
参数值。
步骤 2:更新自管理连接器的配置文件
使用以下设置更新配置文件以匹配您的环境
-
elasticsearch.host
-
elasticsearch.api_key
-
connectors
如果您针对 Elasticsearch 和 Kibana 的 Docker 版本运行连接器服务,则您的配置文件将如下所示
# When connecting to your cloud deployment you should edit the host value elasticsearch.host: http://host.docker.internal:9200 elasticsearch.api_key: <ELASTICSEARCH_API_KEY> connectors: - connector_id: <CONNECTOR_ID_FROM_KIBANA> service_type: postgresql api_key: <CONNECTOR_API_KEY_FROM_KIBANA> # Optional. If not provided, the connector will use the elasticsearch.api_key instead
使用 elasticsearch.api_key
是推荐的身份验证方法。但是,您也可以使用 elasticsearch.username
和 elasticsearch.password
对 Elasticsearch 实例进行身份验证。
注意:您可以通过简单地取消配置文件中特定设置的注释并修改其值来更改其他默认配置。
步骤 3:运行 Docker 镜像
使用以下命令运行包含连接器服务的 Docker 镜像
docker run \ -v ~/connectors-config:/config \ --network "elastic" \ --tty \ --rm \ docker.elastic.co/integrations/elastic-connectors:8.16.0.0 \ /app/bin/elastic-ingest \ -c /config/config.yml
有关更多详细信息,请参阅 elastic/connectors
存储库中的 DOCKER.md
。
在 官方注册表中查找所有可用的 Docker 镜像。
我们还提供了一个使用 Docker Compose 的快速启动自管理选项,因此您可以一次启动所有必需的服务:Elasticsearch、Kibana 和连接器服务。有关更多信息,请参阅 elastic/connectors
存储库中的此 自述文件。
文档和同步
编辑- 表必须由 PostgreSQL 用户拥有。
- 跳过未定义主键的表。
- 要获取 PostgreSQL 中的上次更新时间,必须将
track_commit_timestamp
设置为on
。否则,每次同步都将索引所有数据。
- 不会提取大于 10 MB 的文件。
- 权限不会同步。所有文档 索引到 Elastic 部署后将对 所有有权访问 该 Elastic 部署的用户可见。
同步规则
编辑高级同步规则
编辑高级同步规则生效需要进行 //connectors-sync-types-full 全同步。
高级同步规则通过特定于源的 DSL JSON 代码段定义。
示例数据
编辑以下示例中将使用一些示例数据。
emp_id | name | age |
---|---|---|
3 |
John |
28 |
10 |
Jane |
35 |
14 |
Alex |
22 |
c_id | name | age |
---|---|---|
2 |
Elm |
24 |
6 |
Pine |
30 |
9 |
Oak |
34 |
高级同步规则示例
编辑[ { "tables": [ "employee" ], "query": "SELECT * FROM employee" }, { "tables": [ "customer" ], "query": "SELECT * FROM customer" } ]
在 8.15.0 中,我们在 PostgreSQL 连接器的高级同步规则中添加了一个新的可选 id_columns
字段。使用 id_columns
字段摄取没有主键的表。包含唯一字段的名称,以便连接器可以使用它们为文档生成唯一 ID。
[ { "tables": [ "employee" ], "query": "SELECT * FROM employee", "id_columns": ["emp_id"] }, { "tables": [ "customer" ], "query": "SELECT * FROM customer", "id_columns": ["c_id"] } ]
此示例使用 id_columns
字段分别为 employee
和 customer
表指定唯一字段 emp_id
和 c_id
。
[ { "tables": ["employee"], "query": "SELECT * FROM employee WHERE emp_id > 5" } ]
[ { "tables": ["employee", "customer"], "query": "SELECT * FROM employee INNER JOIN customer ON employee.emp_id = customer.c_id" } ]
使用高级规则时,查询可以绕过配置字段 tables
。如果查询指定了配置中未出现的表,则会发生这种情况。如果配置指定 *
以获取所有表,而高级同步规则仅请求部分表,则也会发生这种情况。
端到端测试
编辑连接器框架使操作员能够针对真实数据源运行功能测试。有关更多详细信息,请参阅 连接器测试。
要对 PostgreSQL 连接器执行 E2E 测试,请运行以下命令
$ make ftest NAME=postgresql
为了加快测试速度,请添加 DATA_SIZE=small
标志
make ftest NAME=postgresql DATA_SIZE=small
已知问题
编辑此连接器没有已知问题。有关所有连接器的已知问题列表,请参阅 已知问题。
故障排除
编辑请参阅 故障排除。
安全
编辑请参阅 安全。