配置
编辑配置编辑
此页面包含有关 Python Elasticsearch 客户端最重要的配置选项的信息。
TLS/SSL编辑
本节中的选项仅在节点配置为 HTTPS 时才可以使用。如果将这些选项与 HTTP 节点一起使用,则会引发错误。
验证服务器证书编辑
验证集群证书的典型方法是通过“CA 捆绑包”,可以通过 ca_certs
参数指定。如果未给出任何选项并且安装了 certifi 包,则默认情况下使用 certifi 的 CA 捆绑包。
如果您有自己的 CA 捆绑包要使用,可以通过 ca_certs
参数进行配置
client = Elasticsearch( "https://...", ca_certs="/path/to/certs.pem" )
如果使用生成的证书或具有已知指纹的证书,则可以使用 ssl_assert_fingerprint
指定指纹,该指纹尝试在 TLS 握手期间与服务器的叶证书匹配。如果存在任何匹配的证书,则连接将被验证,否则将引发 TlsError
。
在 Python 3.9 及更早版本中,只会验证叶证书,但在 Python 3.10+ 中,将使用私有 API 来验证证书链中的任何证书。这在使用在多节点集群上生成的证书时很有帮助。
client = Elasticsearch( "https://...", ssl_assert_fingerprint=( "315f5bdb76d078c43b8ac0064e4a0164612b1fce77c869345bfc94c75894edd3" ) )
要禁用证书验证,请使用 verify_certs=False
参数。此选项应在生产环境中避免,而应使用其他选项来验证集群的证书。
client = Elasticsearch( "https://...", verify_certs=False )
TLS 版本编辑
通过 ssl_version
参数配置要连接到的最小 TLS 版本。默认情况下,这设置为 TLSv1.2 的最小值。使用 ssl.TLSVersion
枚举来指定版本。
import ssl client = Elasticsearch( ..., ssl_version=ssl.TLSVersion.TLSv1_2 )
客户端 TLS 证书身份验证编辑
Elasticsearch 可以配置为通过 TLS 客户端证书对客户端进行身份验证。可以通过 client_cert
和 client_key
参数配置客户端证书和密钥
client = Elasticsearch( ..., client_cert="/path/to/cert.pem", client_key="/path/to/key.pem", )
使用 SSLContext编辑
对于高级用户,可以使用 ssl.SSLContext
对象通过 ssl_context
参数配置 TLS。 ssl_context
参数不能与除 ssl_assert_fingerprint
参数之外的任何其他 TLS 选项组合使用。
import ssl # Create and configure an SSLContext ctx = ssl.create_default_context() ctx.load_verify_locations(...) client = Elasticsearch( ..., ssl_context=ctx )
HTTP 压缩编辑
可以使用 http_compress
参数启用 HTTP 请求和响应主体压缩。如果启用,则 HTTP 请求主体将使用 gzip
压缩,并且 HTTP 响应将包含 Accept-Encoding: gzip
HTTP 标头。默认情况下,压缩被禁用。
client = Elasticsearch( ..., http_compress=True # Enable compression! )
建议在请求遍历网络时启用 HTTP 压缩。连接到 Elastic Cloud 时会自动启用压缩。
请求超时编辑
可以配置请求,如果服务时间过长,则会超时。可以通过客户端构造函数或客户端 .options()
方法传递 request_timeout
参数。当请求超时时,节点将引发 ConnectionTimeout
异常,这可能会触发重试。
将 request_timeout
设置为 None
将禁用超时。
client = Elasticsearch( ..., request_timeout=10 # 10 second timeout ) # Search request will timeout in 5 seconds client.options(request_timeout=5).search(...)
API 和服务器超时编辑
在进行请求时,需要考虑 API 级别的超时,这可能会导致请求在服务器端而不是客户端端超时。对于长时间运行的操作,您可能需要配置传输和 API 级别的超时。
在下面的示例中,cluster.health
API 有三个不同的可配置超时,它们对请求具有不同的含义
client.options( # Amount of time to wait for an HTTP response to start. request_timeout=30 ).cluster.health( # Amount of time to wait to collect info on all nodes. timeout=30, # Amount of time to wait for info from the master node. master_timeout=10, )
重试编辑
如果请求未返回成功响应,则可以重试。这提供了一种方法,使请求能够抵御瞬态故障或过载节点。
可以通过 max_retries
参数配置每个请求的最大重试次数。将此参数设置为 0 将禁用重试。此参数可以在客户端构造函数中设置,也可以通过客户端 .options()
方法在每个请求中设置
client = Elasticsearch( ..., max_retries=5 ) # For this API request we disable retries with 'max_retries=0' client.options(max_retries=0).index( index="blogs", document={ "title": "..." } )
在连接错误和超时时重试编辑
如果启用了重试,则会自动重试连接错误。可以通过 retry_on_timeout
参数启用或禁用在连接超时时重试请求。此参数可以在客户端构造函数中设置,也可以通过客户端 .options()
方法设置
client = Elasticsearch( ..., retry_on_timeout=True ) client.options(retry_on_timeout=False).info()
重试状态代码编辑
默认情况下,如果启用了重试,则 retry_on_status
设置为 (429, 502, 503, 504)
。此参数可以在客户端构造函数中设置,也可以通过客户端 .options()
方法设置。将此值设置为 ()
将禁用默认行为。
client = Elasticsearch( ..., retry_on_status=() ) # Retry this API on '500 Internal Error' statuses client.options(retry_on_status=[500]).index( index="blogs", document={ "title": "..." } )
忽略状态代码编辑
默认情况下,对于任何非 2XX HTTP 请求,如果耗尽了重试(如果有),则会引发 ApiError
异常。如果您期望 API 返回 HTTP 错误,但不想引发异常,则可以使用客户端 .options()
方法中的 ignore_status
参数。
这在以稳健的方式设置或清理集群中的资源时很有用
client = Elasticsearch(...) # API request is robust against the index not existing: resp = client.options(ignore_status=404).indices.delete(index="delete-this") resp.meta.status # Can be either '2XX' or '404' # API request is robust against the index already existing: resp = client.options(ignore_status=[400]).indices.create( index="create-this", mapping={ "properties": {"field": {"type": "integer"}} } ) resp.meta.status # Can be either '2XX' or '400'
使用 ignore_status
参数时,错误响应将像非错误响应一样被序列化返回。在这些情况下,检查响应的 HTTP 状态可能很有用。为此,您可以检查 resp.meta.status
。
嗅探新节点编辑
可以通过称为“嗅探”的过程发现其他节点,在该过程中,客户端将查询集群以获取更多可以处理请求的节点。
嗅探可以在三个不同的时间发生:在客户端实例化时、在请求之前以及在节点故障时。这三种行为可以通过 sniff_on_start
、sniff_before_requests
和 sniff_on_node_failure
参数启用和禁用。
使用 HTTP 负载均衡器或代理时,不能使用嗅探功能,因为集群会向客户端提供 IP 地址以直接连接到集群,从而绕过负载均衡器。根据您的配置,这可能是您不希望做的事情,或者完全破坏它。
在嗅探尝试之间等待编辑
为了避免不必要地过频繁地嗅探,在尝试发现新节点之间会有延迟。可以通过 min_delay_between_sniffing
参数控制此值。
过滤被嗅探的节点编辑
默认情况下,仅标记为 master
角色的节点将不会被使用。要更改行为,可以使用参数 sniffed_node_callback
。要标记一个嗅探到的节点,不要将其添加到节点池中,请从 sniffed_node_callback
返回 None
,否则返回 NodeConfig
实例。
from typing import Optional, Dict, Any from elastic_transport import NodeConfig from elasticsearch import Elasticsearch def filter_master_eligible_nodes( node_info: Dict[str, Any], node_config: NodeConfig ) -> Optional[NodeConfig]: # This callback ignores all nodes that are master eligible # instead of master-only nodes (default behavior) if "master" in node_info.get("roles", ()): return None return node_config client = Elasticsearch( "https://127.0.0.1:9200", sniffed_node_callback=filter_master_eligible_nodes )
node_info
参数是 nodes.info()
API 响应的一部分,下面是一个示例,说明该对象的外观
{ "name": "SRZpKFZ", "transport_address": "127.0.0.1:9300", "host": "127.0.0.1", "ip": "127.0.0.1", "version": "5.0.0", "build_hash": "253032b", "roles": ["master", "data", "ingest"], "http": { "bound_address": ["[fe80::1]:9200", "[::1]:9200", "127.0.0.1:9200"], "publish_address": "1.1.1.1:123", "max_content_length_in_bytes": 104857600 } }
节点池编辑
从池中选择节点编辑
您可以通过 node_selector_class
参数指定节点选择器模式。支持的值是 round_robin
和 random
。默认值为 round_robin
。
client = Elasticsearch( ..., node_selector_class="round_robin" )
也支持自定义选择器
from elastic_transport import NodeSelector class CustomSelector(NodeSelector): def select(nodes): ... client = Elasticsearch( ..., node_selector_class=CustomSelector )
标记节点为死节点和活节点编辑
Elasticsearch 的单个节点可能存在瞬态连接或负载问题,这可能会导致它们无法服务请求。为了解决这个问题,节点池将检测何时节点由于传输或 API 错误而无法服务请求。
节点超时后将被移回“活动”节点集,但只有在节点返回成功响应后,才会在连续错误方面将节点标记为“活动”。
可以使用 dead_node_backoff_factor
和 max_dead_node_backoff
参数配置节点池将节点置于超时状态的持续时间,每次连续失败都会增加超时时间。这两个参数都使用秒为单位。
计算公式为 min(dead_node_backoff_factor * (2 ** (consecutive_failures - 1)), max_dead_node_backoff)
。
序列化器编辑
序列化器将网络上的字节转换为原生 Python 对象,反之亦然。默认情况下,客户端附带了针对 application/json
、application/x-ndjson
、text/*
和 application/mapbox-vector-tile
的序列化器。
您可以通过 serializers
参数定义自定义序列化器。
from elasticsearch import Elasticsearch, JsonSerializer class JsonSetSerializer(JsonSerializer): """Custom JSON serializer that handles Python sets""" def default(self, data: Any) -> Any: if isinstance(data, set): return list(data) return super().default(data) client = Elasticsearch( ..., # Serializers are a mapping of 'mimetype' to Serializer class. serializers={"application/json": JsonSetSerializer()} )
如果安装了 orjson
包,您可以使用更快的 ``OrjsonSerializer`` 来处理默认的 MIME 类型 (``application/json``)
from elasticsearch import Elasticsearch, OrjsonSerializer es = Elasticsearch( ..., serializer=OrjsonSerializer() )
orjson 在序列化向量时特别快,因为它具有本机 numpy 支持。这将在未来的版本中成为默认设置。请注意,您可以使用 orjson
额外参数安装 orjson。
$ python -m pip install elasticsearch[orjson]
节点编辑
节点实现编辑
同步 I/O 的默认节点类是 urllib3
,异步 I/O 的默认节点类是 aiohttp
。
对于所有内置的 HTTP 节点实现,例如 urllib3
、requests
和 aiohttp
,您可以使用简单的字符串指定 node_class
参数。
from elasticsearch import Elasticsearch client = Elasticsearch( ..., node_class="requests" )
您还可以通过 node_class
参数指定自定义节点实现。
from elasticsearch import Elasticsearch from elastic_transport import Urllib3HttpNode class CustomHttpNode(Urllib3HttpNode): ... client = Elasticsearch( ... node_class=CustomHttpNode )
每个节点的 HTTP 连接编辑
每个节点都包含自己的 HTTP 连接池,以允许并发请求。此值可以通过 connections_per_node
参数进行配置。
client = Elasticsearch( ..., connections_per_node=5 )