教程:将 Cohere 与 Elasticsearch 结合使用

编辑

教程:将 Cohere 与 Elasticsearch 结合使用

编辑

本教程中的说明将向您展示如何使用推理 API 通过 Cohere 计算嵌入,并将其存储在 Elasticsearch 中,以便进行高效的向量或混合搜索。本教程将使用 Python Elasticsearch 客户端来执行操作。

您将学习如何:

  • 使用 Cohere 服务为文本嵌入创建推理端点,
  • 为 Elasticsearch 索引创建必要的索引映射,
  • 构建推理管道,将文档连同嵌入一起摄取到索引中,
  • 对数据执行混合搜索,
  • 使用 Cohere 的重新排序模型重新排序搜索结果,
  • 使用 Cohere 的 Chat API 设计 RAG 系统。

本教程使用 SciFact 数据集。

请参阅 Cohere 的教程,了解使用不同数据集的示例。

您还可以查看本教程的 Colab notebook 版本

要求

编辑
  • 需要付费的 Cohere 帐户才能使用 Cohere 服务的推理 API,因为 Cohere 免费试用 API 的使用受到限制,
  • 一个 Elastic Cloud 帐户,
  • Python 3.7 或更高版本。

安装所需软件包

编辑

安装 Elasticsearch 和 Cohere

!pip install elasticsearch
!pip install cohere

导入所需的软件包

from elasticsearch import Elasticsearch, helpers
import cohere
import json
import requests

创建 Elasticsearch 客户端

编辑

要创建 Elasticsearch 客户端,您需要:

ELASTICSEARCH_ENDPOINT = "elastic_endpoint"
ELASTIC_API_KEY = "elastic_api_key"

client = Elasticsearch(
  cloud_id=ELASTICSEARCH_ENDPOINT,
  api_key=ELASTIC_API_KEY
)

# Confirm the client has connected
print(client.info())

创建推理端点

编辑

首先,创建推理端点。在此示例中,推理端点使用 Cohere 的 embed-english-v3.0 模型,并且 embedding_type 设置为 byte

COHERE_API_KEY = "cohere_api_key"

client.inference.put_model(
    task_type="text_embedding",
    inference_id="cohere_embeddings",
    body={
        "service": "cohere",
        "service_settings": {
            "api_key": COHERE_API_KEY,
            "model_id": "embed-english-v3.0",
            "embedding_type": "byte"
        }
    },
)

您可以在 Cohere 仪表板的 API 密钥部分找到您的 API 密钥。

创建索引映射

编辑

为将包含嵌入的索引创建索引映射。

client.indices.create(
    index="cohere-embeddings",
    settings={"index": {"default_pipeline": "cohere_embeddings"}},
    mappings={
        "properties": {
            "text_embedding": {
                "type": "dense_vector",
                "dims": 1024,
                "element_type": "byte",
            },
            "text": {"type": "text"},
            "id": {"type": "integer"},
            "title": {"type": "text"}
        }
    },
)

创建推理管道

编辑

现在,您有了一个推理端点和一个准备好存储嵌入的索引。下一步是创建一个带有推理处理器摄取管道,该处理器将使用推理端点创建嵌入并将它们存储在索引中。

client.ingest.put_pipeline(
    id="cohere_embeddings",
    description="Ingest pipeline for Cohere inference.",
    processors=[
        {
            "inference": {
                "model_id": "cohere_embeddings",
                "input_output": {
                    "input_field": "text",
                    "output_field": "text_embedding",
                },
            }
        }
    ],
)

准备数据并插入文档

编辑

此示例使用您可以在 HuggingFace 上找到的 SciFact 数据集。

url = 'https://hugging-face.cn/datasets/mteb/scifact/raw/main/corpus.jsonl'

# Fetch the JSONL data from the URL
response = requests.get(url)
response.raise_for_status()  # Ensure noticing bad responses

# Split the content by new lines and parse each line as JSON
data = [json.loads(line) for line in response.text.strip().split('\n') if line]
# Now data is a list of dictionaries

# Change `_id` key to `id` as `_id` is a reserved key in Elasticsearch.
for item in data:
    if '_id' in item:
        item['id'] = item.pop('_id')

# Prepare the documents to be indexed
documents = []
for line in data:
    data_dict = line
    documents.append({
        "_index": "cohere-embeddings",
        "_source": data_dict,
        }
      )

# Use the bulk endpoint to index
helpers.bulk(client, documents)

print("Data ingestion completed, text embeddings generated!")

您的索引已填充 SciFact 数据和文本字段的文本嵌入。

混合搜索

编辑

让我们开始查询索引!

以下代码执行混合搜索。 kNN 查询使用 text_embedding 字段计算基于向量相似性的搜索结果的相关性,词法搜索查询使用 BM25 检索来计算 titletext 字段上的关键词相似性。

query = "What is biosimilarity?"

response = client.search(
    index="cohere-embeddings",
    size=100,
    knn={
        "field": "text_embedding",
        "query_vector_builder": {
            "text_embedding": {
                "model_id": "cohere_embeddings",
                "model_text": query,
            }
        },
        "k": 10,
        "num_candidates": 50,
    },
    query={
        "multi_match": {
            "query": query,
            "fields": ["text", "title"]
        }
    }
)

raw_documents = response["hits"]["hits"]

# Display the first 10 results
for document in raw_documents[0:10]:
  print(f'Title: {document["_source"]["title"]}\nText: {document["_source"]["text"]}\n')

# Format the documents for ranking
documents = []
for hit in response["hits"]["hits"]:
    documents.append(hit["_source"]["text"])
重新排序搜索结果
编辑

为了更有效地组合结果,请使用 Cohere 的 Rerank v3 模型通过推理 API 提供更精确的结果语义重排序。

使用您的 Cohere API 密钥和已使用的模型名称作为 model_id(在本例中为 rerank-english-v3.0)创建一个推理端点。

client.inference.put_model(
    task_type="rerank",
    inference_id="cohere_rerank",
    body={
        "service": "cohere",
        "service_settings":{
            "api_key": COHERE_API_KEY,
            "model_id": "rerank-english-v3.0"
           },
        "task_settings": {
            "top_n": 10,
        },
    }
)

使用新的推理端点重新排序结果。

# Pass the query and the search results to the service
response = client.inference.inference(
    inference_id="cohere_rerank",
    body={
        "query": query,
        "input": documents,
        "task_settings": {
            "return_documents": False
            }
        }
)

# Reconstruct the input documents based on the index provided in the rereank response
ranked_documents = []
for document in response.body["rerank"]:
  ranked_documents.append({
      "title": raw_documents[int(document["index"])]["_source"]["title"],
      "text": raw_documents[int(document["index"])]["_source"]["text"]
  })

# Print the top 10 results
for document in ranked_documents[0:10]:
  print(f"Title: {document['title']}\nText: {document['text']}\n")

响应是一个文档列表,按相关性降序排列。每个文档都有一个相应的索引,该索引反映了文档发送到推理端点时的顺序。

使用 Cohere 和 Elasticsearch 进行检索增强生成 (RAG)

编辑

RAG 是一种使用从外部数据源获取的附加信息生成文本的方法。有了排名后的结果,您可以使用 Cohere 的 Chat API 在之前创建的内容之上构建 RAG 系统。

传入检索到的文档和查询,以使用 Cohere 最新的生成模型 Command R+ 接收基础响应。

然后将查询和文档传递给 Chat API,并打印出响应。

response = co.chat(message=query, documents=ranked_documents, model='command-r-plus')

source_documents = []
for citation in response.citations:
    for document_id in citation.document_ids:
        if document_id not in source_documents:
            source_documents.append(document_id)

print(f"Query: {query}")
print(f"Response: {response.text}")
print("Sources:")
for document in response.documents:
    if document['id'] in source_documents:
        print(f"{document['title']}: {document['text']}")

响应将类似于以下内容

Query: What is biosimilarity?
Response: Biosimilarity is based on the comparability concept, which has been used successfully for several decades to ensure close similarity of a biological product before and after a manufacturing change. Over the last 10 years, experience with biosimilars has shown that even complex biotechnology-derived proteins can be copied successfully.
Sources:
Interchangeability of Biosimilars: A European Perspective: (...)