配置
编辑配置
编辑本页包含有关 Python Elasticsearch 客户端最重要的配置选项的信息。
TLS/SSL
编辑本节中的选项仅当节点配置为 HTTPS 时才可以使用。如果将这些选项与 HTTP 节点一起使用,则会引发错误。
验证服务器证书
编辑验证集群证书的典型方法是通过“CA 捆绑包”,该捆绑包可以通过 ca_certs
参数指定。如果没有提供任何选项并且安装了 certifi 包,则默认使用 certifi 的 CA 捆绑包。
如果您有自己的 CA 捆绑包要使用,可以通过 ca_certs
参数进行配置。
client = Elasticsearch( "https://...", ca_certs="/path/to/certs.pem" )
如果使用生成的证书或具有已知指纹的证书,则可以使用 ssl_assert_fingerprint
来指定在 TLS 握手期间尝试匹配服务器叶子证书的指纹。如果有任何匹配的证书,则验证连接,否则将引发 TlsError
。
在 Python 3.9 及更早版本中,只会验证叶子证书,但在 Python 3.10+ 中,会使用私有 API 来验证证书链中的任何证书。这在使用在多节点集群上生成的证书时很有帮助。
client = Elasticsearch( "https://...", ssl_assert_fingerprint=( "315f5bdb76d078c43b8ac0064e4a0164612b1fce77c869345bfc94c75894edd3" ) )
要禁用证书验证,请使用 verify_certs=False
参数。在生产环境中应避免使用此选项,而是使用其他选项来验证集群的证书。
client = Elasticsearch( "https://...", verify_certs=False )
TLS 版本
编辑配置连接的最低 TLS 版本是通过 ssl_version
参数完成的。默认情况下,此值设置为 TLSv1.2 的最小值。使用 ssl.TLSVersion
枚举来指定版本。
import ssl client = Elasticsearch( ..., ssl_version=ssl.TLSVersion.TLSv1_2 )
客户端 TLS 证书身份验证
编辑可以将 Elasticsearch 配置为通过 TLS 客户端证书对客户端进行身份验证。客户端证书和密钥可以通过 client_cert
和 client_key
参数进行配置。
client = Elasticsearch( ..., client_cert="/path/to/cert.pem", client_key="/path/to/key.pem", )
使用 SSLContext
编辑对于高级用户,可以使用 ssl.SSLContext
对象通过 ssl_context
参数配置 TLS。ssl_context
参数不能与任何其他 TLS 选项组合使用,除了 ssl_assert_fingerprint
参数。
import ssl # Create and configure an SSLContext ctx = ssl.create_default_context() ctx.load_verify_locations(...) client = Elasticsearch( ..., ssl_context=ctx )
HTTP 压缩
编辑可以使用 http_compress
参数启用 HTTP 请求和响应正文的压缩。如果启用,则 HTTP 请求正文将使用 gzip
压缩,并且 HTTP 响应将包括 Accept-Encoding: gzip
HTTP 标头。默认情况下,压缩处于禁用状态。
client = Elasticsearch( ..., http_compress=True # Enable compression! )
建议在请求遍历网络时启用 HTTP 压缩。连接到 Elastic Cloud 时会自动启用压缩。
请求超时
编辑如果请求花费的时间过长而无法处理,则可以将其配置为超时。可以通过客户端构造函数或客户端 .options()
方法传递 request_timeout
参数。当请求超时时,节点将引发 ConnectionTimeout
异常,该异常可以触发重试。
将 request_timeout
设置为 None
将禁用超时。
client = Elasticsearch( ..., request_timeout=10 # 10 second timeout ) # Search request will timeout in 5 seconds client.options(request_timeout=5).search(...)
API 和服务器超时
编辑在发出请求时,需要考虑 API 级别的超时,这可能会导致请求在服务器端而不是客户端超时。对于长时间运行的操作,您可能需要配置传输和 API 级别的超时。
在下面的示例中,cluster.health
API 有三个不同的可配置超时,所有这些超时对于请求都有不同的含义。
client.options( # Amount of time to wait for an HTTP response to start. request_timeout=30 ).cluster.health( # Amount of time to wait to collect info on all nodes. timeout=30, # Amount of time to wait for info from the master node. master_timeout=10, )
重试
编辑如果请求没有返回成功的响应,则可以重试。这提供了一种使请求能够抵御瞬时故障或节点过载的方法。
可以通过 max_retries
参数配置每个请求的最大重试次数。将此参数设置为 0 会禁用重试。可以在客户端构造函数中或通过客户端 .options()
方法按请求设置此参数。
client = Elasticsearch( ..., max_retries=5 ) # For this API request we disable retries with 'max_retries=0' client.options(max_retries=0).index( index="blogs", document={ "title": "..." } )
在连接错误和超时时重试
编辑如果启用了重试,则会自动重试连接错误。可以通过 retry_on_timeout
参数启用或禁用在连接超时时重试请求。可以在客户端构造函数中或通过客户端 .options()
方法设置此参数。
client = Elasticsearch( ..., retry_on_timeout=True ) client.options(retry_on_timeout=False).info()
重试状态代码
编辑默认情况下,如果启用了重试,则 retry_on_status
设置为 (429, 502, 503, 504)
。可以在客户端构造函数中或通过客户端 .options()
方法设置此参数。将此值设置为 ()
将禁用默认行为。
client = Elasticsearch( ..., retry_on_status=() ) # Retry this API on '500 Internal Error' statuses client.options(retry_on_status=[500]).index( index="blogs", document={ "title": "..." } )
忽略状态代码
编辑默认情况下,对于耗尽重试的任何非 2XX HTTP 请求,将引发 ApiError
异常(如果有)。如果您期望来自 API 的 HTTP 错误,但不想引发异常,则可以通过客户端 .options()
方法使用 ignore_status
参数。
一个很好的例子是在集群中以可靠的方式设置或清理资源。
client = Elasticsearch(...) # API request is robust against the index not existing: resp = client.options(ignore_status=404).indices.delete(index="delete-this") resp.meta.status # Can be either '2XX' or '404' # API request is robust against the index already existing: resp = client.options(ignore_status=[400]).indices.create( index="create-this", mapping={ "properties": {"field": {"type": "integer"}} } ) resp.meta.status # Can be either '2XX' or '400'
使用 ignore_status
参数时,错误响应将像非错误响应一样被序列化后返回。在这些情况下,检查响应的 HTTP 状态可能很有用。为此,您可以检查 resp.meta.status
。
嗅探新节点
编辑可以通过一个称为“嗅探”的过程来发现其他节点,在该过程中,客户端将查询集群以获取更多可以处理请求的节点。
嗅探可能会在三个不同的时间发生:客户端实例化时、在请求之前以及在节点发生故障时。可以使用 sniff_on_start
、sniff_before_requests
和 sniff_on_node_failure
参数启用和禁用这三种行为。
当使用 HTTP 负载均衡器或代理时,您不能使用嗅探功能,因为集群会向客户端提供直接连接到集群的 IP 地址,从而绕过负载均衡器。根据您的配置,这可能不是您想要的,或者会完全中断。
在嗅探尝试之间等待
编辑为了避免不必要地过于频繁地嗅探,在尝试发现新节点之间会有一个延迟。可以通过 min_delay_between_sniffing
参数控制此值。
筛选嗅探的节点
编辑默认情况下,仅标记为 master
角色的节点将不会使用。要更改此行为,可以使用参数 sniffed_node_callback
。要标记一个嗅探的节点不添加到节点池中,请从 sniffed_node_callback
返回 None
,否则返回一个 NodeConfig
实例。
from typing import Optional, Dict, Any from elastic_transport import NodeConfig from elasticsearch import Elasticsearch def filter_master_eligible_nodes( node_info: Dict[str, Any], node_config: NodeConfig ) -> Optional[NodeConfig]: # This callback ignores all nodes that are master eligible # instead of master-only nodes (default behavior) if "master" in node_info.get("roles", ()): return None return node_config client = Elasticsearch( "https://127.0.0.1:9200", sniffed_node_callback=filter_master_eligible_nodes )
node_info
参数是 nodes.info()
API 响应的一部分,下面是该对象外观的示例。
{ "name": "SRZpKFZ", "transport_address": "127.0.0.1:9300", "host": "127.0.0.1", "ip": "127.0.0.1", "version": "5.0.0", "build_hash": "253032b", "roles": ["master", "data", "ingest"], "http": { "bound_address": ["[fe80::1]:9200", "[::1]:9200", "127.0.0.1:9200"], "publish_address": "1.1.1.1:123", "max_content_length_in_bytes": 104857600 } }
节点池
编辑从池中选择节点
编辑您可以通过 node_selector_class
参数指定节点选择器模式。支持的值为 round_robin
和 random
。默认为 round_robin
。
client = Elasticsearch( ..., node_selector_class="round_robin" )
还支持自定义选择器。
from elastic_transport import NodeSelector class CustomSelector(NodeSelector): def select(nodes): ... client = Elasticsearch( ..., node_selector_class=CustomSelector )
标记节点为死节点和活动节点
编辑Elasticsearch 的各个节点可能存在瞬时的连接或负载问题,这可能会导致它们无法处理请求。为了解决这个问题,节点池将检测到何时由于传输或 API 错误而导致节点无法处理请求。
节点超时后,它将移回“活动”节点集中,但只有在节点返回成功响应后,该节点才会在连续错误方面被标记为“活动”。
可以使用 dead_node_backoff_factor
和 max_dead_node_backoff
参数配置节点池将节点放入超时的时长,以及每次连续失败后的时长。这两个参数都使用秒作为单位。
计算公式等于 min(dead_node_backoff_factor * (2 ** (consecutive_failures - 1)), max_dead_node_backoff)
。
序列化程序
编辑序列化程序将网络上的字节转换为本机 Python 对象,反之亦然。默认情况下,客户端附带用于 application/json
、application/x-ndjson
、text/*
、application/vnd.apache.arrow.stream
和 application/mapbox-vector-tile
的序列化程序。
您可以通过 serializers
参数定义自定义序列化程序。
from elasticsearch import Elasticsearch, JsonSerializer class JsonSetSerializer(JsonSerializer): """Custom JSON serializer that handles Python sets""" def default(self, data: Any) -> Any: if isinstance(data, set): return list(data) return super().default(data) client = Elasticsearch( ..., # Serializers are a mapping of 'mimetype' to Serializer class. serializers={"application/json": JsonSetSerializer()} )
如果安装了 orjson
包,则可以为默认 mimetype (``application/json``) 使用更快的 ``OrjsonSerializer``。
from elasticsearch import Elasticsearch, OrjsonSerializer es = Elasticsearch( ..., serializer=OrjsonSerializer() )
orjson 在序列化向量时特别快,因为它具有本机 numpy 支持。这将是未来版本中的默认值。请注意,您可以使用 orjson
额外选项安装 orjson。
$ python -m pip install elasticsearch[orjson]
节点
编辑节点实现
编辑同步 I/O 的默认节点类是 urllib3
,异步 I/O 的默认节点类是 aiohttp
。
对于所有内置的 HTTP 节点实现,例如 urllib3
、requests
和 aiohttp
,您可以使用一个简单的字符串来指定 node_class
参数。
from elasticsearch import Elasticsearch client = Elasticsearch( ..., node_class="requests" )
您还可以通过 node_class
参数指定自定义节点实现。
from elasticsearch import Elasticsearch from elastic_transport import Urllib3HttpNode class CustomHttpNode(Urllib3HttpNode): ... client = Elasticsearch( ... node_class=CustomHttpNode )
每个节点的 HTTP 连接数
编辑每个节点都包含自己的 HTTP 连接池,以允许并发请求。此值可以通过 connections_per_node
参数进行配置。
client = Elasticsearch( ..., connections_per_node=5 )