加载中

教程

让我们来看一个直接写成 dict 的典型搜索请求

from elasticsearch import Elasticsearch
client = Elasticsearch("https://localhost:9200")

response = client.search(
    index="my-index",
    body={
      "query": {
        "bool": {
          "must": [{"match": {"title": "python"}}],
          "must_not": [{"match": {"description": "beta"}}],
          "filter": [{"term": {"category": "search"}}]
        }
      },
      "aggs" : {
        "per_tag": {
          "terms": {"field": "tags"},
          "aggs": {
            "max_lines": {"max": {"field": "lines"}}
          }
        }
      }
    }
)

for hit in response['hits']['hits']:
    print(hit['_score'], hit['_source']['title'])

for tag in response['aggregations']['per_tag']['buckets']:
    print(tag['key'], tag['max_lines']['value'])

这种方法的缺点在于它非常冗长,容易出现语法错误(例如错误的嵌套),难以修改(例如添加另一个过滤器),而且写起来肯定不好玩。

让我们使用 DSL 模块重写这个例子

from elasticsearch import Elasticsearch
from elasticsearch.dsl import Search, query, aggs

client = Elasticsearch("https://localhost:9200")

s = Search(using=client, index="my-index") \
    .query(query.Match("title", "python"))   \
    .filter(query.Term("category", "search")) \
    .exclude(query.Match("description", "beta"))

s.aggs.bucket('per_tag', aggs.Terms(field="tags")) \
    .metric('max_lines', aggs.Max(field='lines'))

response = s.execute()

for hit in response:
    print(hit.meta.score, hit.title)

for tag in response.aggregations.per_tag.buckets:
    print(tag.key, tag.max_lines.value)

正如你所见,DSL 模块负责处理了

  • 从类创建合适的 Query 对象
  • 将查询组合成一个复合 bool 查询
  • term 查询置于 bool 查询的过滤器上下文
  • 提供方便的响应数据访问方式
  • 不再到处都是大括号或方括号

让我们来看一个表示博客系统中文章的简单 Python 类

from datetime import datetime
from elasticsearch.dsl import Document, Date, Integer, Keyword, Text, connections

# Define a default Elasticsearch client
connections.create_connection(hosts="https://localhost:9200")

class Article(Document):
    title: str = mapped_field(Text(analyzer='snowball', fields={'raw': Keyword()}))
    body: str = mapped_field(Text(analyzer='snowball'))
    tags: str = mapped_field(Keyword())
    published_from: datetime
    lines: int

    class Index:
        name = 'blog'
        settings = {
          "number_of_shards": 2,
        }

    def save(self, **kwargs):
        self.lines = len(self.body.split())
        return super(Article, self).save(** kwargs)

    def is_published(self):
        return datetime.now() > self.published_from

# create the mappings in elasticsearch
Article.init()

# create and save and article
article = Article(meta={'id': 42}, title='Hello world!', tags=['test'])
article.body = ''' looong text '''
article.published_from = datetime.now()
article.save()

article = Article.get(id=42)
print(article.is_published())

# Display cluster health
print(connections.get_connection().cluster.health())

在此示例中,你可以看到

  • 提供默认连接
  • 使用 Python 类型提示定义字段,并在必要时添加额外的映射配置
  • 设置索引名称
  • 定义自定义方法
  • 覆盖内置的 .save() 方法以介入持久化生命周期
  • 将对象检索并保存到 Elasticsearch
  • 访问底层客户端以使用其他 API

你可以在 persistence 章节中看到更多内容。

如果你定义了 Documents,可以非常轻松地创建一个分面搜索类来简化搜索和过滤。

from elasticsearch.dsl import FacetedSearch, TermsFacet, DateHistogramFacet

class BlogSearch(FacetedSearch):
    doc_types = [Article, ]
    # fields that should be searched
    fields = ['tags', 'title', 'body']

    facets = {
        # use bucket aggregations to define facets
        'tags': TermsFacet(field='tags'),
        'publishing_frequency': DateHistogramFacet(field='published_from', interval='month')
    }

# empty search
bs = BlogSearch()
response = bs.execute()

for hit in response:
    print(hit.meta.score, hit.title)

for (tag, count, selected) in response.facets.tags:
    print(tag, ' (SELECTED):' if selected else ':', count)

for (month, count, selected) in response.facets.publishing_frequency:
    print(month.strftime('%B %Y'), ' (SELECTED):' if selected else ':', count)

你可以在 faceted_search 章节中找到更多详情。

让我们回到关于博客文章的简单示例,并假设每篇文章都有点赞数。在此示例中,想象我们想对所有匹配某个标签但不匹配某个描述的文章的点赞数加 1。将其写成 dict,我们将得到以下代码

from elasticsearch import Elasticsearch
client = Elasticsearch()

response = client.update_by_query(
    index="my-index",
    body={
      "query": {
        "bool": {
          "must": [{"match": {"tag": "python"}}],
          "must_not": [{"match": {"description": "beta"}}]
        }
      },
      "script"={
        "source": "ctx._source.likes++",
        "lang": "painless"
      }
    },
  )

使用 DSL,我们现在可以将此查询表示为

from elasticsearch import Elasticsearch
from elasticsearch.dsl import Search, UpdateByQuery
from elasticsearch.dsl.query import Match

client = Elasticsearch()
ubq = UpdateByQuery(using=client, index="my-index") \
      .query(Match("title", "python"))   \
      .exclude(Match("description", "beta")) \
      .script(source="ctx._source.likes++", lang="painless")

response = ubq.execute()

正如你所见,Update By Query 对象提供了 Search 对象带来的许多便利,此外还允许通过以相同方式分配的脚本来更新搜索结果。

你无需移植整个应用程序即可获得 DSL 模块的优势,你可以通过从现有的 dict 创建 Search 对象、使用 API 修改它并将其序列化回 dict 来逐步开始。

body = {...}

# Convert to Search object
s = Search.from_dict(body)

# Add some filters, aggregations, queries, ...
s.filter(query.Term("tags", "python"))

# Convert back to dict to plug back into existing code
body = s.to_dict()
  1. 在此处插入复杂的查询
© . All rights reserved.