传输

编辑

elastic-transport 库为连接到 Elasticsearch 集群提供了一个低级别的 Ruby 客户端。它目前为 Elasticsearch RubyEnterprise Search Ruby 客户端提供支持。

当可用时,它会处理连接到集群中的多个节点、轮换连接、记录和跟踪请求和响应、维护失败的连接、发现集群中的节点,并为数据序列化和传输提供抽象。

它不处理调用 Elasticsearch 或 Enterprise Search API。

默认情况下,此库使用 Faraday 作为 HTTP 传输实现。我们使用 Faraday 1.x 和 2.x 版本对其进行测试。

为了获得最佳性能,请使用支持持久(“keep-alive”)连接的 HTTP 库,例如 patronTyphoeus。在你的代码中引入该库(require 'patron')用于 Faraday 1.x 或适配器(require 'faraday/patron')用于 Faraday 2.x,它将被自动使用。

目前支持以下库

请使用 Typhoeus v1.4.0 或更高版本,因为旧版本与 Faraday 1.0 不兼容。

你可以自定义 Faraday 并实现自己的 HTTP 传输。有关详细信息,请参阅下面的示例配置和更多信息。

功能概述

  • 可插拔的日志记录和跟踪
  • 可插拔的连接选择策略(循环、随机、自定义)
  • 可插拔的传输实现,可自定义和扩展
  • 可插拔的序列化器实现
  • 请求重试和死连接处理
  • 基于集群状态或按需的节点重新加载

请参阅高级配置以了解更多配置选项。

安装

编辑

Rubygems 安装软件包

gem install elastic-transport

要使用未发布的版本,请将其添加到你的 Gemfile 以用于 Bundler

gem 'elastic-transport', git: '[email protected]:elastic/elastic-transport-ruby.git'

或从源代码检出安装

git clone https://github.com/elastic/elastic-transport-ruby.git
cd elastic-transport
bundle install
rake install

示例用法

编辑

以最简单的形式,连接到运行在 https://127.0.0.1:9200 上的 Elasticsearch,无需任何配置

require 'elastic/transport'

client = Elastic::Transport::Client.new
response = client.perform_request('GET', '_cluster/health')
# => #<Elastic::Transport::Transport::Response:0x007fc5d506ce38 @status=200, @body={ ... } >

文档以 RDoc 注释的形式包含在源代码中,并可在 RubyDoc 在线获取。

传输实现

编辑

默认情况下,客户端使用 Faraday HTTP 库作为传输实现。

客户端会自动检测并使用基于代码中加载的 gem 的 Faraday适配器,优先选择支持持久连接的 HTTP 客户端。Faraday 2 更改了适配器的使用方式(在此处阅读更多内容)。如果你使用的是 Faraday 1.x,你可以引入 HTTP 库。例如,要使用 Patron HTTP,请引入它

例如,要使用 Patron HTTP,请引入它

require 'patron'

如果你使用的是 Faraday 2.x,则需要在引入 faraday 后,将相应的适配器 gem 添加到你的 Gemfile 并引入它

# Gemfile
gem 'faraday-patron'

# Code
require 'faraday'
require 'faraday/patron'

然后,创建一个新的客户端,Patron gem 将被用作“驱动程序”

client = Elastic::Transport::Client.new

client.transport.connections.first.connection.builder.adapter
# => Faraday::Adapter::Patron

10.times do
  client.nodes.stats(metric: 'http')['nodes'].values.each do |n|
    puts "#{n['name']} : #{n['http']['total_opened']}"
  end
end

# => Stiletoo : 24
# => Stiletoo : 24
# => Stiletoo : 24
# => ...

要为 Faraday 使用特定的适配器,请将其作为 adapter 参数传递

client = Elastic::Client.new(adapter: :net_http_persistent)

client.transport.connections.first.connection.builder.handlers
# => [Faraday::Adapter::NetHttpPersistent]

如果看到此错误

Faraday::Error: :net_http_persistent is not registered on Faraday::Adapter

当你使用 Faraday 2 时,需要在实例化客户端之前引入适配器

> client = Elasticsearch::Client.new(adapter: :net_http_persistent)
Faraday::Error: :net_http_persistent is not registered on Faraday::Adapter
> require 'faraday/net_http_persistent'
=> true
> client = Elasticsearch::Client.new(adapter: :net_http_persistent)
=> #<Elasticsearch::Client:0x00007eff2e7728e0

当使用 Elasticsearch 或 Enterprise Search 客户端时,你可以在初始化客户端时传递 adapter 参数。

要将选项传递给 Faraday::Connection 构造函数,请使用 transport_options

client = Elastic::Client.new(
  transport_options: {
    request: { open_timeout: 1 },
    headers: { user_agent:   'MyApp' },
    params:  { :format => 'yaml' },
    ssl:     { verify: false }
  }
)

要直接配置 Faraday 实例,请使用代码块

require 'patron'

client = Elastic::Client.new(host: 'localhost', port: '9200') do |f|
  f.response :logger
  f.adapter  :patron
end

你可以在配置块中使用任何标准的 Faraday 中间件和插件。

你还可以自己初始化传输类,并将其作为 transport 参数传递给客户端构造函数。Elasticsearch 和 Enterprise Search 客户端在初始化客户端时接受 :transport 作为参数。因此,你可以传入使用以下选项初始化的传输

require 'patron'

transport_configuration = lambda do |f|
  f.response :logger
  f.adapter  :patron
end

transport = Elastic::Transport::Transport::HTTP::Faraday.new(
  hosts: [ { host: 'localhost', port: '9200' } ],
  &transport_configuration
)

# Pass the transport to the client
#
client = Elastic::Client.new(transport: transport)

除了将传输传递给构造函数之外,你还可以在运行时注入它

# Set up the transport
#
faraday_configuration = lambda do |f|
  f.instance_variable_set :@ssl, { verify: false }
  f.adapter :excon
end

faraday_client = Elastic::Transport::Transport::HTTP::Faraday.new(
  hosts: [
    {
      host: 'my-protected-host',
      port: '443',
      user: 'USERNAME',
      password: 'PASSWORD',
      scheme: 'https'
    }
  ],
  &faraday_configuration
)

# Create a default client
#
client = Elastic::Client.new

# Inject the transport to the client
#
client.transport = faraday_client

你还可以使用捆绑的基于 Curb 的传输实现

require 'curb'
require 'elastic/transport/transport/http/curb'

client = Elastic::Client.new(transport_class: Elastic::Transport::Transport::HTTP::Curb)

client.transport.connections.first.connection
# => #<Curl::Easy https://127.0.0.1:9200/>

也可以通过将代码块传递给构造函数来自定义 Curb 实例(在这种情况下,作为内联代码块)

transport = Elastic::Transport::Transport::HTTP::Curb.new(
  hosts: [ { host: 'localhost', port: '9200' } ],
  & lambda { |c| c.verbose = true }
)

client = Elastic::Client.new(transport: transport)

你可以通过包含 {Elastic::Transport::Transport::Base} 模块、实现所需的约定,并将其作为 transport_class 参数传递给客户端,或直接注入它来编写自己的传输实现。

传输架构

编辑
  • Elastic::Transport::ClientElastic::Transport::Transport 组成。
  • Elastic::Transport::TransportElastic::Transport::Transport::Connections 和 logger、tracer、serializer 和 sniffer 的实例组成。
  • Logger 和 tracer 可以是符合 Ruby 日志接口的任何对象,例如 Loggerlog4rlogging 等的实例。
  • Elastic::Transport::Transport::Serializer::Base 实现处理为 Elasticsearch 转换数据(例如,转换为 JSON)。你可以实现自己的序列化器。
  • Elastic::Transport::Transport::Sniffer 允许发现集群中的节点并将其用作连接。
  • Elastic::Transport::Transport::Connections::CollectionElastic::Transport::Transport::Connections::Connection 实例和一个选择器实例组成。
  • Elastic::Transport::Transport::Connections::Connection 包含连接属性,例如主机名和端口,以及连接到特定节点的具体持久“会话”。
  • Elastic::Transport::Transport::Connections::Selector::Base 实现允许从池中选择连接,例如,以循环或随机方式。你可以实现自己的选择器策略。
  • Elastic::Transport::Transport::Response 对象包装了 Elasticsearch JSON 响应。它提供 bodystatusheaders 方法,但你可以将其视为哈希并直接访问键。