配置

编辑

本页包含有关 Python Elasticsearch 客户端最重要的配置选项的信息。

TLS/SSL

编辑

本节中的选项仅当节点配置为 HTTPS 时才可以使用。如果将这些选项与 HTTP 节点一起使用,则会引发错误。

验证服务器证书

编辑

验证集群证书的典型方法是通过“CA 捆绑包”,该捆绑包可以通过 ca_certs 参数指定。如果没有提供任何选项并且安装了 certifi 包,则默认使用 certifi 的 CA 捆绑包。

如果您有自己的 CA 捆绑包要使用,可以通过 ca_certs 参数进行配置。

client = Elasticsearch(
    "https://...",
    ca_certs="/path/to/certs.pem"
)

如果使用生成的证书或具有已知指纹的证书,则可以使用 ssl_assert_fingerprint 来指定在 TLS 握手期间尝试匹配服务器叶子证书的指纹。如果有任何匹配的证书,则验证连接,否则将引发 TlsError

在 Python 3.9 及更早版本中,只会验证叶子证书,但在 Python 3.10+ 中,会使用私有 API 来验证证书链中的任何证书。这在使用在多节点集群上生成的证书时很有帮助。

client = Elasticsearch(
    "https://...",
    ssl_assert_fingerprint=(
        "315f5bdb76d078c43b8ac0064e4a0164612b1fce77c869345bfc94c75894edd3"
    )
)

要禁用证书验证,请使用 verify_certs=False 参数。在生产环境中应避免使用此选项,而是使用其他选项来验证集群的证书。

client = Elasticsearch(
    "https://...",
    verify_certs=False
)

TLS 版本

编辑

配置连接的最低 TLS 版本是通过 ssl_version 参数完成的。默认情况下,此值设置为 TLSv1.2 的最小值。使用 ssl.TLSVersion 枚举来指定版本。

import ssl

client = Elasticsearch(
    ...,
    ssl_version=ssl.TLSVersion.TLSv1_2
)

客户端 TLS 证书身份验证

编辑

可以将 Elasticsearch 配置为通过 TLS 客户端证书对客户端进行身份验证。客户端证书和密钥可以通过 client_certclient_key 参数进行配置。

client = Elasticsearch(
    ...,
    client_cert="/path/to/cert.pem",
    client_key="/path/to/key.pem",
)

使用 SSLContext

编辑

对于高级用户,可以使用 ssl.SSLContext 对象通过 ssl_context 参数配置 TLS。ssl_context 参数不能与任何其他 TLS 选项组合使用,除了 ssl_assert_fingerprint 参数。

import ssl

# Create and configure an SSLContext
ctx = ssl.create_default_context()
ctx.load_verify_locations(...)

client = Elasticsearch(
    ...,
    ssl_context=ctx
)

HTTP 压缩

编辑

可以使用 http_compress 参数启用 HTTP 请求和响应正文的压缩。如果启用,则 HTTP 请求正文将使用 gzip 压缩,并且 HTTP 响应将包括 Accept-Encoding: gzip HTTP 标头。默认情况下,压缩处于禁用状态。

client = Elasticsearch(
    ...,
    http_compress=True  # Enable compression!
)

建议在请求遍历网络时启用 HTTP 压缩。连接到 Elastic Cloud 时会自动启用压缩。

请求超时

编辑

如果请求花费的时间过长而无法处理,则可以将其配置为超时。可以通过客户端构造函数或客户端 .options() 方法传递 request_timeout 参数。当请求超时时,节点将引发 ConnectionTimeout 异常,该异常可以触发重试。

request_timeout 设置为 None 将禁用超时。

client = Elasticsearch(
    ...,
    request_timeout=10  # 10 second timeout
)

# Search request will timeout in 5 seconds
client.options(request_timeout=5).search(...)

API 和服务器超时

编辑

在发出请求时,需要考虑 API 级别的超时,这可能会导致请求在服务器端而不是客户端超时。对于长时间运行的操作,您可能需要配置传输和 API 级别的超时。

在下面的示例中,cluster.health API 有三个不同的可配置超时,所有这些超时对于请求都有不同的含义。

client.options(
    # Amount of time to wait for an HTTP response to start.
    request_timeout=30
).cluster.health(
    # Amount of time to wait to collect info on all nodes.
    timeout=30,
    # Amount of time to wait for info from the master node.
    master_timeout=10,
)

重试

编辑

如果请求没有返回成功的响应,则可以重试。这提供了一种使请求能够抵御瞬时故障或节点过载的方法。

可以通过 max_retries 参数配置每个请求的最大重试次数。将此参数设置为 0 会禁用重试。可以在客户端构造函数中或通过客户端 .options() 方法按请求设置此参数。

client = Elasticsearch(
    ...,
    max_retries=5
)

# For this API request we disable retries with 'max_retries=0'
client.options(max_retries=0).index(
    index="blogs",
    document={
        "title": "..."
    }
)

在连接错误和超时时重试

编辑

如果启用了重试,则会自动重试连接错误。可以通过 retry_on_timeout 参数启用或禁用在连接超时时重试请求。可以在客户端构造函数中或通过客户端 .options() 方法设置此参数。

client = Elasticsearch(
    ...,
    retry_on_timeout=True
)
client.options(retry_on_timeout=False).info()

重试状态代码

编辑

默认情况下,如果启用了重试,则 retry_on_status 设置为 (429, 502, 503, 504)。可以在客户端构造函数中或通过客户端 .options() 方法设置此参数。将此值设置为 () 将禁用默认行为。

client = Elasticsearch(
    ...,
    retry_on_status=()
)

# Retry this API on '500 Internal Error' statuses
client.options(retry_on_status=[500]).index(
    index="blogs",
    document={
        "title": "..."
    }
)

忽略状态代码

编辑

默认情况下,对于耗尽重试的任何非 2XX HTTP 请求,将引发 ApiError 异常(如果有)。如果您期望来自 API 的 HTTP 错误,但不想引发异常,则可以通过客户端 .options() 方法使用 ignore_status 参数。

一个很好的例子是在集群中以可靠的方式设置或清理资源。

client = Elasticsearch(...)

# API request is robust against the index not existing:
resp = client.options(ignore_status=404).indices.delete(index="delete-this")
resp.meta.status  # Can be either '2XX' or '404'

# API request is robust against the index already existing:
resp = client.options(ignore_status=[400]).indices.create(
    index="create-this",
    mapping={
        "properties": {"field": {"type": "integer"}}
    }
)
resp.meta.status  # Can be either '2XX' or '400'

使用 ignore_status 参数时,错误响应将像非错误响应一样被序列化后返回。在这些情况下,检查响应的 HTTP 状态可能很有用。为此,您可以检查 resp.meta.status

嗅探新节点

编辑

可以通过一个称为“嗅探”的过程来发现其他节点,在该过程中,客户端将查询集群以获取更多可以处理请求的节点。

嗅探可能会在三个不同的时间发生:客户端实例化时、在请求之前以及在节点发生故障时。可以使用 sniff_on_startsniff_before_requestssniff_on_node_failure 参数启用和禁用这三种行为。

当使用 HTTP 负载均衡器或代理时,您不能使用嗅探功能,因为集群会向客户端提供直接连接到集群的 IP 地址,从而绕过负载均衡器。根据您的配置,这可能不是您想要的,或者会完全中断。

在嗅探尝试之间等待

编辑

为了避免不必要地过于频繁地嗅探,在尝试发现新节点之间会有一个延迟。可以通过 min_delay_between_sniffing 参数控制此值。

筛选嗅探的节点

编辑

默认情况下,仅标记为 master 角色的节点将不会使用。要更改此行为,可以使用参数 sniffed_node_callback。要标记一个嗅探的节点不添加到节点池中,请从 sniffed_node_callback 返回 None,否则返回一个 NodeConfig 实例。

from typing import Optional, Dict, Any
from elastic_transport import NodeConfig
from elasticsearch import Elasticsearch

def filter_master_eligible_nodes(
    node_info: Dict[str, Any],
    node_config: NodeConfig
) -> Optional[NodeConfig]:
    # This callback ignores all nodes that are master eligible
    # instead of master-only nodes (default behavior)
    if "master" in node_info.get("roles", ()):
        return None
    return node_config

client = Elasticsearch(
    "https://127.0.0.1:9200",
    sniffed_node_callback=filter_master_eligible_nodes
)

node_info 参数是 nodes.info() API 响应的一部分,下面是该对象外观的示例。

{
  "name": "SRZpKFZ",
  "transport_address": "127.0.0.1:9300",
  "host": "127.0.0.1",
  "ip": "127.0.0.1",
  "version": "5.0.0",
  "build_hash": "253032b",
  "roles": ["master", "data", "ingest"],
  "http": {
    "bound_address": ["[fe80::1]:9200", "[::1]:9200", "127.0.0.1:9200"],
    "publish_address": "1.1.1.1:123",
    "max_content_length_in_bytes": 104857600
  }
}

节点池

编辑

从池中选择节点

编辑

您可以通过 node_selector_class 参数指定节点选择器模式。支持的值为 round_robinrandom。默认为 round_robin

client = Elasticsearch(
    ...,
    node_selector_class="round_robin"
)

还支持自定义选择器。

from elastic_transport import NodeSelector

class CustomSelector(NodeSelector):
    def select(nodes): ...

client = Elasticsearch(
    ...,
    node_selector_class=CustomSelector
)

标记节点为死节点和活动节点

编辑

Elasticsearch 的各个节点可能存在瞬时的连接或负载问题,这可能会导致它们无法处理请求。为了解决这个问题,节点池将检测到何时由于传输或 API 错误而导致节点无法处理请求。

节点超时后,它将移回“活动”节点集中,但只有在节点返回成功响应后,该节点才会在连续错误方面被标记为“活动”。

可以使用 dead_node_backoff_factormax_dead_node_backoff 参数配置节点池将节点放入超时的时长,以及每次连续失败后的时长。这两个参数都使用秒作为单位。

计算公式等于 min(dead_node_backoff_factor * (2 ** (consecutive_failures - 1)), max_dead_node_backoff)

序列化程序

编辑

序列化程序将网络上的字节转换为本机 Python 对象,反之亦然。默认情况下,客户端附带用于 application/jsonapplication/x-ndjsontext/*application/vnd.apache.arrow.streamapplication/mapbox-vector-tile 的序列化程序。

您可以通过 serializers 参数定义自定义序列化程序。

from elasticsearch import Elasticsearch, JsonSerializer

class JsonSetSerializer(JsonSerializer):
    """Custom JSON serializer that handles Python sets"""
    def default(self, data: Any) -> Any:
        if isinstance(data, set):
            return list(data)
        return super().default(data)

client = Elasticsearch(
    ...,
    # Serializers are a mapping of 'mimetype' to Serializer class.
    serializers={"application/json": JsonSetSerializer()}
)

如果安装了 orjson 包,则可以为默认 mimetype (``application/json``) 使用更快的 ``OrjsonSerializer``。

from elasticsearch import Elasticsearch, OrjsonSerializer

es = Elasticsearch(
    ...,
    serializer=OrjsonSerializer()
)

orjson 在序列化向量时特别快,因为它具有本机 numpy 支持。这将是未来版本中的默认值。请注意,您可以使用 orjson 额外选项安装 orjson。

$ python -m pip install elasticsearch[orjson]

节点

编辑

节点实现

编辑

同步 I/O 的默认节点类是 urllib3,异步 I/O 的默认节点类是 aiohttp

对于所有内置的 HTTP 节点实现,例如 urllib3requestsaiohttp,您可以使用一个简单的字符串来指定 node_class 参数。

from elasticsearch import Elasticsearch

client = Elasticsearch(
    ...,
    node_class="requests"
)

您还可以通过 node_class 参数指定自定义节点实现。

from elasticsearch import Elasticsearch
from elastic_transport import Urllib3HttpNode

class CustomHttpNode(Urllib3HttpNode):
    ...

client = Elasticsearch(
    ...
    node_class=CustomHttpNode
)

每个节点的 HTTP 连接数

编辑

每个节点都包含自己的 HTTP 连接池,以允许并发请求。此值可以通过 connections_per_node 参数进行配置。

client = Elasticsearch(
    ...,
    connections_per_node=5
)