配置编辑

此页面包含有关 Python Elasticsearch 客户端最重要的配置选项的信息。

TLS/SSL编辑

本节中的选项仅在节点配置为 HTTPS 时才可以使用。如果将这些选项与 HTTP 节点一起使用,则会引发错误。

验证服务器证书编辑

验证集群证书的典型方法是通过“CA 捆绑包”,可以通过 ca_certs 参数指定。如果未给出任何选项并且安装了 certifi 包,则默认情况下使用 certifi 的 CA 捆绑包。

如果您有自己的 CA 捆绑包要使用,可以通过 ca_certs 参数进行配置

client = Elasticsearch(
    "https://...",
    ca_certs="/path/to/certs.pem"
)

如果使用生成的证书或具有已知指纹的证书,则可以使用 ssl_assert_fingerprint 指定指纹,该指纹尝试在 TLS 握手期间与服务器的叶证书匹配。如果存在任何匹配的证书,则连接将被验证,否则将引发 TlsError

在 Python 3.9 及更早版本中,只会验证叶证书,但在 Python 3.10+ 中,将使用私有 API 来验证证书链中的任何证书。这在使用在多节点集群上生成的证书时很有帮助。

client = Elasticsearch(
    "https://...",
    ssl_assert_fingerprint=(
        "315f5bdb76d078c43b8ac0064e4a0164612b1fce77c869345bfc94c75894edd3"
    )
)

要禁用证书验证,请使用 verify_certs=False 参数。此选项应在生产环境中避免,而应使用其他选项来验证集群的证书。

client = Elasticsearch(
    "https://...",
    verify_certs=False
)

TLS 版本编辑

通过 ssl_version 参数配置要连接到的最小 TLS 版本。默认情况下,这设置为 TLSv1.2 的最小值。使用 ssl.TLSVersion 枚举来指定版本。

import ssl

client = Elasticsearch(
    ...,
    ssl_version=ssl.TLSVersion.TLSv1_2
)

客户端 TLS 证书身份验证编辑

Elasticsearch 可以配置为通过 TLS 客户端证书对客户端进行身份验证。可以通过 client_certclient_key 参数配置客户端证书和密钥

client = Elasticsearch(
    ...,
    client_cert="/path/to/cert.pem",
    client_key="/path/to/key.pem",
)

使用 SSLContext编辑

对于高级用户,可以使用 ssl.SSLContext 对象通过 ssl_context 参数配置 TLS。 ssl_context 参数不能与除 ssl_assert_fingerprint 参数之外的任何其他 TLS 选项组合使用。

import ssl

# Create and configure an SSLContext
ctx = ssl.create_default_context()
ctx.load_verify_locations(...)

client = Elasticsearch(
    ...,
    ssl_context=ctx
)

HTTP 压缩编辑

可以使用 http_compress 参数启用 HTTP 请求和响应主体压缩。如果启用,则 HTTP 请求主体将使用 gzip 压缩,并且 HTTP 响应将包含 Accept-Encoding: gzip HTTP 标头。默认情况下,压缩被禁用。

client = Elasticsearch(
    ...,
    http_compress=True  # Enable compression!
)

建议在请求遍历网络时启用 HTTP 压缩。连接到 Elastic Cloud 时会自动启用压缩。

请求超时编辑

可以配置请求,如果服务时间过长,则会超时。可以通过客户端构造函数或客户端 .options() 方法传递 request_timeout 参数。当请求超时时,节点将引发 ConnectionTimeout 异常,这可能会触发重试。

request_timeout 设置为 None 将禁用超时。

client = Elasticsearch(
    ...,
    request_timeout=10  # 10 second timeout
)

# Search request will timeout in 5 seconds
client.options(request_timeout=5).search(...)

API 和服务器超时编辑

在进行请求时,需要考虑 API 级别的超时,这可能会导致请求在服务器端而不是客户端端超时。对于长时间运行的操作,您可能需要配置传输和 API 级别的超时。

在下面的示例中,cluster.health API 有三个不同的可配置超时,它们对请求具有不同的含义

client.options(
    # Amount of time to wait for an HTTP response to start.
    request_timeout=30
).cluster.health(
    # Amount of time to wait to collect info on all nodes.
    timeout=30,
    # Amount of time to wait for info from the master node.
    master_timeout=10,
)

重试编辑

如果请求未返回成功响应,则可以重试。这提供了一种方法,使请求能够抵御瞬态故障或过载节点。

可以通过 max_retries 参数配置每个请求的最大重试次数。将此参数设置为 0 将禁用重试。此参数可以在客户端构造函数中设置,也可以通过客户端 .options() 方法在每个请求中设置

client = Elasticsearch(
    ...,
    max_retries=5
)

# For this API request we disable retries with 'max_retries=0'
client.options(max_retries=0).index(
    index="blogs",
    document={
        "title": "..."
    }
)

在连接错误和超时时重试编辑

如果启用了重试,则会自动重试连接错误。可以通过 retry_on_timeout 参数启用或禁用在连接超时时重试请求。此参数可以在客户端构造函数中设置,也可以通过客户端 .options() 方法设置

client = Elasticsearch(
    ...,
    retry_on_timeout=True
)
client.options(retry_on_timeout=False).info()

重试状态代码编辑

默认情况下,如果启用了重试,则 retry_on_status 设置为 (429, 502, 503, 504)。此参数可以在客户端构造函数中设置,也可以通过客户端 .options() 方法设置。将此值设置为 () 将禁用默认行为。

client = Elasticsearch(
    ...,
    retry_on_status=()
)

# Retry this API on '500 Internal Error' statuses
client.options(retry_on_status=[500]).index(
    index="blogs",
    document={
        "title": "..."
    }
)

忽略状态代码编辑

默认情况下,对于任何非 2XX HTTP 请求,如果耗尽了重试(如果有),则会引发 ApiError 异常。如果您期望 API 返回 HTTP 错误,但不想引发异常,则可以使用客户端 .options() 方法中的 ignore_status 参数。

这在以稳健的方式设置或清理集群中的资源时很有用

client = Elasticsearch(...)

# API request is robust against the index not existing:
resp = client.options(ignore_status=404).indices.delete(index="delete-this")
resp.meta.status  # Can be either '2XX' or '404'

# API request is robust against the index already existing:
resp = client.options(ignore_status=[400]).indices.create(
    index="create-this",
    mapping={
        "properties": {"field": {"type": "integer"}}
    }
)
resp.meta.status  # Can be either '2XX' or '400'

使用 ignore_status 参数时,错误响应将像非错误响应一样被序列化返回。在这些情况下,检查响应的 HTTP 状态可能很有用。为此,您可以检查 resp.meta.status

嗅探新节点编辑

可以通过称为“嗅探”的过程发现其他节点,在该过程中,客户端将查询集群以获取更多可以处理请求的节点。

嗅探可以在三个不同的时间发生:在客户端实例化时、在请求之前以及在节点故障时。这三种行为可以通过 sniff_on_startsniff_before_requestssniff_on_node_failure 参数启用和禁用。

使用 HTTP 负载均衡器或代理时,不能使用嗅探功能,因为集群会向客户端提供 IP 地址以直接连接到集群,从而绕过负载均衡器。根据您的配置,这可能是您不希望做的事情,或者完全破坏它。

在嗅探尝试之间等待编辑

为了避免不必要地过频繁地嗅探,在尝试发现新节点之间会有延迟。可以通过 min_delay_between_sniffing 参数控制此值。

过滤被嗅探的节点编辑

默认情况下,仅标记为 master 角色的节点将不会被使用。要更改行为,可以使用参数 sniffed_node_callback。要标记一个嗅探到的节点,不要将其添加到节点池中,请从 sniffed_node_callback 返回 None,否则返回 NodeConfig 实例。

from typing import Optional, Dict, Any
from elastic_transport import NodeConfig
from elasticsearch import Elasticsearch

def filter_master_eligible_nodes(
    node_info: Dict[str, Any],
    node_config: NodeConfig
) -> Optional[NodeConfig]:
    # This callback ignores all nodes that are master eligible
    # instead of master-only nodes (default behavior)
    if "master" in node_info.get("roles", ()):
        return None
    return node_config

client = Elasticsearch(
    "https://127.0.0.1:9200",
    sniffed_node_callback=filter_master_eligible_nodes
)

node_info 参数是 nodes.info() API 响应的一部分,下面是一个示例,说明该对象的外观

{
  "name": "SRZpKFZ",
  "transport_address": "127.0.0.1:9300",
  "host": "127.0.0.1",
  "ip": "127.0.0.1",
  "version": "5.0.0",
  "build_hash": "253032b",
  "roles": ["master", "data", "ingest"],
  "http": {
    "bound_address": ["[fe80::1]:9200", "[::1]:9200", "127.0.0.1:9200"],
    "publish_address": "1.1.1.1:123",
    "max_content_length_in_bytes": 104857600
  }
}

节点池编辑

从池中选择节点编辑

您可以通过 node_selector_class 参数指定节点选择器模式。支持的值是 round_robinrandom。默认值为 round_robin

client = Elasticsearch(
    ...,
    node_selector_class="round_robin"
)

也支持自定义选择器

from elastic_transport import NodeSelector

class CustomSelector(NodeSelector):
    def select(nodes): ...

client = Elasticsearch(
    ...,
    node_selector_class=CustomSelector
)

标记节点为死节点和活节点编辑

Elasticsearch 的单个节点可能存在瞬态连接或负载问题,这可能会导致它们无法服务请求。为了解决这个问题,节点池将检测何时节点由于传输或 API 错误而无法服务请求。

节点超时后将被移回“活动”节点集,但只有在节点返回成功响应后,才会在连续错误方面将节点标记为“活动”。

可以使用 dead_node_backoff_factormax_dead_node_backoff 参数配置节点池将节点置于超时状态的持续时间,每次连续失败都会增加超时时间。这两个参数都使用秒为单位。

计算公式为 min(dead_node_backoff_factor * (2 ** (consecutive_failures - 1)), max_dead_node_backoff)

序列化器编辑

序列化器将网络上的字节转换为原生 Python 对象,反之亦然。默认情况下,客户端附带了针对 application/jsonapplication/x-ndjsontext/*application/mapbox-vector-tile 的序列化器。

您可以通过 serializers 参数定义自定义序列化器。

from elasticsearch import Elasticsearch, JsonSerializer

class JsonSetSerializer(JsonSerializer):
    """Custom JSON serializer that handles Python sets"""
    def default(self, data: Any) -> Any:
        if isinstance(data, set):
            return list(data)
        return super().default(data)

client = Elasticsearch(
    ...,
    # Serializers are a mapping of 'mimetype' to Serializer class.
    serializers={"application/json": JsonSetSerializer()}
)

如果安装了 orjson 包,您可以使用更快的 ``OrjsonSerializer`` 来处理默认的 MIME 类型 (``application/json``)

from elasticsearch import Elasticsearch, OrjsonSerializer

es = Elasticsearch(
    ...,
    serializer=OrjsonSerializer()
)

orjson 在序列化向量时特别快,因为它具有本机 numpy 支持。这将在未来的版本中成为默认设置。请注意,您可以使用 orjson 额外参数安装 orjson。

$ python -m pip install elasticsearch[orjson]

节点编辑

节点实现编辑

同步 I/O 的默认节点类是 urllib3,异步 I/O 的默认节点类是 aiohttp

对于所有内置的 HTTP 节点实现,例如 urllib3requestsaiohttp,您可以使用简单的字符串指定 node_class 参数。

from elasticsearch import Elasticsearch

client = Elasticsearch(
    ...,
    node_class="requests"
)

您还可以通过 node_class 参数指定自定义节点实现。

from elasticsearch import Elasticsearch
from elastic_transport import Urllib3HttpNode

class CustomHttpNode(Urllib3HttpNode):
    ...

client = Elasticsearch(
    ...
    node_class=CustomHttpNode
)

每个节点的 HTTP 连接编辑

每个节点都包含自己的 HTTP 连接池,以允许并发请求。此值可以通过 connections_per_node 参数进行配置。

client = Elasticsearch(
    ...,
    connections_per_node=5
)