将 Cohere 与 Elasticsearch 结合使用
Elastic Stack 无服务器
本教程中的说明将向您展示如何使用推理 API 通过 Cohere 计算嵌入,并将它们存储在 Elasticsearch 中以进行高效的向量或混合搜索。本教程将使用 Python Elasticsearch 客户端来执行操作。
您将学习如何
- 使用 Cohere 服务为文本嵌入创建推理端点,
- 为 Elasticsearch 索引创建必要的索引映射,
- 构建一个推理管道,将文档连同嵌入一起提取到索引中,
- 对数据执行混合搜索,
- 通过使用 Cohere 的重新排序模型来重新排序搜索结果,
- 使用 Cohere 的 Chat API 设计一个 RAG 系统。
本教程使用 SciFact 数据集。
有关使用不同数据集的示例,请参阅 Cohere 的教程。
您还可以查看 此教程的 Colab notebook 版本。
- 需要一个付费的 Cohere 帐户才能将推理 API 与 Cohere 服务一起使用,因为 Cohere 免费试用 API 的使用受到限制。
- 一个 Elastic Cloud 帐户,
- Python 3.7 或更高版本。
安装 Elasticsearch 和 Cohere
!pip install elasticsearch
!pip install cohere
导入所需的软件包
from elasticsearch import Elasticsearch, helpers
import cohere
import json
import requests
要创建您的 Elasticsearch 客户端,您需要
ELASTICSEARCH_ENDPOINT = "elastic_endpoint"
ELASTIC_API_KEY = "elastic_api_key"
client = Elasticsearch(
cloud_id=ELASTICSEARCH_ENDPOINT,
api_key=ELASTIC_API_KEY
)
# Confirm the client has connected
print(client.info())
首先创建推理端点。在此示例中,推理端点使用 Cohere 的 embed-english-v3.0
模型,并且 embedding_type
设置为 byte
。
COHERE_API_KEY = "cohere_api_key"
client.inference.put_model(
task_type="text_embedding",
inference_id="cohere_embeddings",
body={
"service": "cohere",
"service_settings": {
"api_key": COHERE_API_KEY,
"model_id": "embed-english-v3.0",
"embedding_type": "byte"
}
},
)
您可以在 Cohere 仪表板中的 API 密钥部分找到您的 API 密钥。
为将包含嵌入的索引创建索引映射。
client.indices.create(
index="cohere-embeddings",
settings={"index": {"default_pipeline": "cohere_embeddings"}},
mappings={
"properties": {
"text_embedding": {
"type": "dense_vector",
"dims": 1024,
"element_type": "byte",
},
"text": {"type": "text"},
"id": {"type": "integer"},
"title": {"type": "text"}
}
},
)
现在,您有了一个推理端点和一个准备好存储嵌入的索引。下一步是使用 提取管道 和一个 推理处理器,该处理器将使用推理端点创建嵌入并将它们存储在索引中。
client.ingest.put_pipeline(
id="cohere_embeddings",
description="Ingest pipeline for Cohere inference.",
processors=[
{
"inference": {
"model_id": "cohere_embeddings",
"input_output": {
"input_field": "text",
"output_field": "text_embedding",
},
}
}
],
)
此示例使用您可以在 HuggingFace 上找到的 SciFact 数据集。
url = 'https://hugging-face.cn/datasets/mteb/scifact/raw/main/corpus.jsonl'
# Fetch the JSONL data from the URL
response = requests.get(url)
response.raise_for_status()
# Split the content by new lines and parse each line as JSON
data = [json.loads(line) for line in response.text.strip().split('\n') if line]
# Now data is a list of dictionaries
# Change `_id` key to `id` as `_id` is a reserved key in Elasticsearch.
for item in data:
if '_id' in item:
item['id'] = item.pop('_id')
# Prepare the documents to be indexed
documents = []
for line in data:
data_dict = line
documents.append({
"_index": "cohere-embeddings",
"_source": data_dict,
}
)
# Use the bulk endpoint to index
helpers.bulk(client, documents)
print("Data ingestion completed, text embeddings generated!")
- 确保注意到错误的响应
您的索引填充了 SciFact 数据和 text 字段的文本嵌入。
让我们开始查询索引!
下面的代码执行混合搜索。 kNN
查询使用 text_embedding
字段计算基于向量相似性的搜索结果的相关性,词法搜索查询使用 BM25 检索来计算 title
和 text
字段上的关键字相似性。
query = "What is biosimilarity?"
response = client.search(
index="cohere-embeddings",
size=100,
knn={
"field": "text_embedding",
"query_vector_builder": {
"text_embedding": {
"model_id": "cohere_embeddings",
"model_text": query,
}
},
"k": 10,
"num_candidates": 50,
},
query={
"multi_match": {
"query": query,
"fields": ["text", "title"]
}
}
)
raw_documents = response["hits"]["hits"]
# Display the first 10 results
for document in raw_documents[0:10]:
print(f'Title: {document["_source"]["title"]}\nText: {document["_source"]["text"]}\n')
# Format the documents for ranking
documents = []
for hit in response["hits"]["hits"]:
documents.append(hit["_source"]["text"])
为了更有效地组合结果,请使用 Cohere 的 Rerank v3 模型,通过推理 API 提供更精确的语义结果排序。
使用您的 Cohere API 密钥和用作 model_id
的模型名称 (在本示例中为 rerank-english-v3.0
) 创建一个推理端点。
client.inference.put_model(
task_type="rerank",
inference_id="cohere_rerank",
body={
"service": "cohere",
"service_settings":{
"api_key": COHERE_API_KEY,
"model_id": "rerank-english-v3.0"
},
"task_settings": {
"top_n": 10,
},
}
)
使用新的推理端点重新排序结果。
# Pass the query and the search results to the service
response = client.inference.rerank(
inference_id="cohere_rerank",
body={
"query": query,
"input": documents,
"task_settings": {
"return_documents": False
}
}
)
# Reconstruct the input documents based on the index provided in the rereank response
ranked_documents = []
for document in response.body["rerank"]:
ranked_documents.append({
"title": raw_documents[int(document["index"])]["_source"]["title"],
"text": raw_documents[int(document["index"])]["_source"]["text"]
})
# Print the top 10 results
for document in ranked_documents[0:10]:
print(f"Title: {document['title']}\nText: {document['text']}\n")
响应是按相关性降序排列的文档列表。每个文档都有一个相应的索引,反映了文档发送到推理端点时的顺序。
RAG 是一种使用从外部数据源获取的附加信息生成文本的方法。通过排名结果,您可以使用 Cohere 的 Chat API 在先前创建的内容之上构建一个 RAG 系统。
传入检索到的文档和查询,以使用 Cohere 最新的生成模型 Command R+ 接收基础响应。
然后将查询和文档传递到 Chat API,并打印出响应。
response = co.chat(message=query, documents=ranked_documents, model='command-r-plus')
source_documents = []
for citation in response.citations:
for document_id in citation.document_ids:
if document_id not in source_documents:
source_documents.append(document_id)
print(f"Query: {query}")
print(f"Response: {response.text}")
print("Sources:")
for document in response.documents:
if document['id'] in source_documents:
print(f"{document['title']}: {document['text']}")
响应将类似于以下内容
Query: What is biosimilarity?
Response: Biosimilarity is based on the comparability concept, which has been used successfully for several decades to ensure close similarity of a biological product before and after a manufacturing change. Over the last 10 years, experience with biosimilars has shown that even complex biotechnology-derived proteins can be copied successfully.
Sources:
Interchangeability of Biosimilars: A European Perspective: (...)