How to Do RAG
Welcome to the magical world of knowledge augmentation: RAG! 🧠📚
1. Why Do AI Models Make Things Up?
Large language models are essentially probability-driven text generators that rely on what they memorized during training. When a question falls outside that knowledge, they may fabricate answers that sound plausible but are wrong.
Core idea
A purely generative model takes a "closed-book exam": it cannot look up external knowledge in real time.
2. Why Do We Need RAG?
Imagine a brilliant friend who has read no news or documents since 2021. No matter how smart he is, ask him about 2023 and he can only guess or make things up. That is the limitation of large language models: their knowledge is "frozen" at the training cutoff date.
RAG addresses the following problems:
- Knowledge freshness: access to the latest information and facts
- Domain expertise: access to specialized knowledge in specific fields
- Fewer hallucinations: answers cite real documents instead of being "imagined"
- Transparency: sources and citations can be provided
- Private data access: works with internal or proprietary corpora
3. What Is RAG?
RAG stands for Retrieval-Augmented Generation. It is a powerful technique that combines a retrieval system with a generative model, giving the language model access to an external knowledge base so it can produce answers that are more accurate, more current, and more specialized.
Core idea
Let the AI take an "open-book exam": look up the material first, then answer, combining generation with real-time retrieval.
4. How RAG Works
A RAG system consists of three core steps:
- Retrieval: given the user's question, find the most relevant documents or passages in the knowledge base
- Augmentation: send the retrieved information to the language model together with the user's question
- Generation: the language model produces an answer based on the question and the retrieved information
Example
Scenario: a player asks how a game mechanic works.
Step 1: Retrieve
- The player's question and the knowledge-base content are converted into vectors.
- A vector-similarity search over the knowledge base (which stores game guides, patch notes, and so on) returns the k items most relevant to the question.
Step 2: Generate
- The question, the retrieved context, and the prompt template are combined into the input for the LLM.
- The model generates a detailed explanation of the game mechanic. A minimal sketch of these two steps follows.
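To make the flow concrete, here is a minimal sketch of retrieval plus augmentation, assuming the question and the knowledge-base chunks have already been embedded as vectors. The helper names are illustrative, not any particular library's API:

import numpy as np

def cosine_similarity(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def retrieve_top_k(question_vec, chunk_vecs, chunk_texts, k=3):
    # Rank every knowledge-base chunk by similarity to the question vector
    scores = [cosine_similarity(question_vec, v) for v in chunk_vecs]
    best = np.argsort(scores)[::-1][:k]
    return [chunk_texts[i] for i in best]

def build_augmented_prompt(question, contexts):
    # Augmentation: splice the retrieved chunks into the prompt for the LLM
    context_block = "\n\n".join(contexts)
    return (f"Answer the question using only the context below.\n\n"
            f"Context:\n{context_block}\n\nQuestion: {question}")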
5. Building a Simple RAG Application
5.1 Indexing
A pipeline that ingests data from a source and indexes it. This usually runs offline.
1. Loading
First we load the blog post content. This example uses WebBaseLoader, a LangChain document loader that reads data from web pages.
import bs4
from langchain_community.document_loaders import WebBaseLoader

# Only keep post title, headers, and content from the full HTML.
bs4_strainer = bs4.SoupStrainer(class_=("post-title", "post-header", "post-content"))
loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs={"parse_only": bs4_strainer},
)
docs = loader.load()

assert len(docs) == 1
print(f"Total characters: {len(docs[0].page_content)}")
2. Splitting
Next, split the loaded document into overlapping chunks so each one fits comfortably in the retrieval context:

from langchain_text_splitters import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,  # chunk size (characters)
    chunk_overlap=200,  # chunk overlap (characters)
    add_start_index=True,  # track index in original document
)
all_splits = text_splitter.split_documents(docs)

print(f"Split blog post into {len(all_splits)} sub-documents.")
3. Storing
Finally, index these 66 text chunks so they can be searched at runtime.
from langchain_openai import OpenAIEmbeddings
from langchain_core.vectorstores import InMemoryVectorStore

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vector_store = InMemoryVectorStore(embeddings)
document_ids = vector_store.add_documents(documents=all_splits)

print(document_ids[:3])
Output:
['be2fadc8-bebd-462f-90a9-ed7045e22530', '8a318bfc-a44d-4d5d-8c49-1cb0ee2d9fe2', 'a8dab8c4-7177-4a2f-a520-3e6a6fcc6a33']
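Before wiring up generation, you can sanity-check the index with a direct similarity search; similarity_search embeds the query and returns the closest chunks:

results = vector_store.similarity_search("What is Task Decomposition?", k=2)
for doc in results:
    print(doc.page_content[:100], "...")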
5.2 Retrieval and Generation
1. Prompt
Use a RAG prompt, as shown below.
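The full example in Section 5.3 pulls a community-maintained RAG prompt from the LangChain prompt hub:

from langchain import hub

prompt = hub.pull("rlm/rag-prompt")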
2. Use LangGraph to wire the retrieval and generation steps into a single application
Three things must be defined: the application's state, the application's steps, and the application's "control flow".
Application state: controls the input, how data is passed between steps, and the output.
from langchain_core.documents import Document
from typing_extensions import List, TypedDict

class State(TypedDict):
    question: str
    context: List[Document]
    answer: str

def retrieve(state: State):
    retrieved_docs = vector_store.similarity_search(state["question"])
    return {"context": retrieved_docs}

def generate(state: State):
    docs_content = "\n\n".join(doc.page_content for doc in state["context"])
    messages = prompt.invoke({"question": state["question"], "context": docs_content})
    response = llm.invoke(messages)
    return {"answer": response.content}

from langgraph.graph import START, StateGraph

graph_builder = StateGraph(State).add_sequence([retrieve, generate])
graph_builder.add_edge(START, "retrieve")
graph = graph_builder.compile()
5.3 Invocation
import os

os.environ["OPENAI_BASE_URL"] = "https://oneapi.handbook.cool/v1"
os.environ["OPENAI_API_KEY"] = "sk-..."  # replace with your own API key

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4.1-nano")
from langchain_openai import OpenAIEmbeddings
from langchain_core.vectorstores import InMemoryVectorStore

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vector_store = InMemoryVectorStore(embeddings)

import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langgraph.graph import START, StateGraph
from typing_extensions import List, TypedDict

loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(
            class_=("post-content", "post-title", "post-header")
        )
    ),
)
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
all_splits = text_splitter.split_documents(docs)
_ = vector_store.add_documents(documents=all_splits)

prompt = hub.pull("rlm/rag-prompt")

class State(TypedDict):
    question: str
    context: List[Document]
    answer: str

def retrieve(state: State):
    retrieved_docs = vector_store.similarity_search(state["question"])
    return {"context": retrieved_docs}

def generate(state: State):
    docs_content = "\n\n".join(doc.page_content for doc in state["context"])
    messages = prompt.invoke({"question": state["question"], "context": docs_content})
    response = llm.invoke(messages)
    return {"answer": response.content}

# Compile application and test
graph_builder = StateGraph(State).add_sequence([retrieve, generate])
graph_builder.add_edge(START, "retrieve")
graph = graph_builder.compile()

response = graph.invoke({"question": "What is Task Decomposition?"})
print(response["answer"])
Output:
Task Decomposition is the process of breaking down complex tasks into smaller, manageable sub-steps or subgoals. It can be performed using simple prompting with LLMs, task-specific instructions, or external planning tools like classical planners with PDDL. This approach helps in organizing and solving complex problems more effectively.
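LangGraph can also stream each step's output as it completes, which is handy for inspecting the pipeline; a minimal sketch using the graph compiled above (stream_mode="updates" yields each node's state update in turn):

for step in graph.stream(
    {"question": "What is Task Decomposition?"},
    stream_mode="updates",
):
    print(f"{step}\n\n----------------\n")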
6. Advanced RAG Optimization Techniques
Naive RAG faces challenges in three main areas:
- Low retrieval quality
- Poor generation quality
- Difficulty in the augmentation step
6.1 Pre-Retrieval Processing
1. Query transformation
Query transformation is an advanced RAG technique that improves retrieval quality through:
- Complex query decomposition: splitting a multi-part question into independent sub-queries
- Parallel retrieval: running several simple queries at once for efficiency
- Result synthesis: merging the sub-query results into the final answer
Example query: "Which has more stars on GitHub, LangChain or mem0?"
Decomposition: "How many stars does LangChain have on GitHub?" + "How many stars does mem0 have on GitHub?"
Worked example:
pip install rank_bm25
import asyncio
import json
import os
from typing import Dict, List

from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

os.environ["OPENAI_BASE_URL"] = "https://oneapi.handbook.cool/v1"
os.environ["OPENAI_API_KEY"] = "sk-..."  # replace with your own API key

class QueryTransformer:
    def __init__(self, llm):
        self.llm = llm

    async def decompose_query(self, query: str) -> List[str]:
        """Use the LLM to decompose a complex query into sub-queries."""
        decomposition_prompt = ChatPromptTemplate.from_template(
            "Decompose the following query into sub-queries that can be answered independently. "
            "Each sub-query should be simple enough to retrieve directly. "
            "If the query is already simple, do not decompose it.\n"
            "Query: {query}\n"
            "Output the sub-queries as a JSON list and nothing else. "
            "Example output: [\"sub-query 1\", \"sub-query 2\"]"
        )
        chain = decomposition_prompt | self.llm | StrOutputParser()
        result = await chain.ainvoke({"query": query})
        try:
            sub_queries = json.loads(result)
            if not sub_queries:  # the LLM decided no decomposition is needed
                return [query]
            return sub_queries
        except json.JSONDecodeError:
            # Fall back to the original query if parsing fails
            return [query]
class ParallelRetriever:
    def __init__(self, retriever: BaseRetriever):
        self.retriever = retriever

    async def retrieve_for_subquery(self, query: str) -> List[Document]:
        """Run retrieval for a single sub-query."""
        return await self.retriever.ainvoke(query)

    async def parallel_retrieve(self, sub_queries: List[str]) -> Dict[str, List[Document]]:
        """Run retrieval for all sub-queries in parallel."""
        tasks = [self.retrieve_for_subquery(q) for q in sub_queries]
        results = await asyncio.gather(*tasks)
        return {query: docs for query, docs in zip(sub_queries, results)}
class AnswerSynthesizer:
    def __init__(self, llm):
        self.llm = llm

    async def synthesize_answer(self, original_query: str, retrieval_results: Dict[str, List[Document]]) -> str:
        """Synthesize the final answer from the original query and the retrieval results."""
        synthesis_prompt = ChatPromptTemplate.from_template(
            "You are a professional question-answering assistant. Answer the original question "
            "based on the retrieved information below.\n"
            "Original question: {query}\n\n"
            "Retrieved information:\n{context}\n\n"
            "Give a complete, accurate answer. If the information is insufficient, say so."
        )
        # Format the retrieval results into a context string
        context_str = ""
        for sub_q, docs in retrieval_results.items():
            context_str += f"Sub-question: {sub_q}\n"
            context_str += "Relevant documents:\n" + "\n".join([d.page_content for d in docs]) + "\n\n"
        chain = synthesis_prompt | self.llm | StrOutputParser()
        return await chain.ainvoke({"query": original_query, "context": context_str})
class QueryTransformationRAG:
    def __init__(self, retriever: BaseRetriever, llm):
        self.query_transformer = QueryTransformer(llm)
        self.parallel_retriever = ParallelRetriever(retriever)
        self.answer_synthesizer = AnswerSynthesizer(llm)

    async def answer_query(self, query: str) -> str:
        """The full query-transformation RAG pipeline."""
        # 1. Query decomposition
        sub_queries = await self.query_transformer.decompose_query(query)
        print(f"Decomposed sub-queries: {sub_queries}")
        # 2. Parallel retrieval
        retrieval_results = await self.parallel_retriever.parallel_retrieve(sub_queries)
        print(f"Retrieval results: {retrieval_results}")
        # 3. Answer synthesis
        final_answer = await self.answer_synthesizer.synthesize_answer(query, retrieval_results)
        return final_answer
# Usage example
if __name__ == "__main__":
    # Initialize components; replace with a real retriever and LLM in production
    from langchain_community.retrievers import BM25Retriever
    from langchain_openai import ChatOpenAI

    # Mock document corpus
    documents = [
        Document(page_content="LangChain has 109,000 stars on GitHub"),
        Document(page_content="mem0 has 33,500 stars on GitHub"),
        Document(page_content="LangChain is a popular framework for building LLM applications"),
        Document(page_content="mem0 is an innovative memory optimization library designed for AI and machine learning applications."),
    ]
    retriever = BM25Retriever.from_documents(documents)
    llm = ChatOpenAI(model="gpt-4.1-nano")

    # Build the RAG system
    rag_system = QueryTransformationRAG(retriever, llm)

    # Example query
    query = "Which has more stars on GitHub, LangChain or mem0?"

    async def main():
        answer = await rag_system.answer_query(query)
        print("\nFinal answer:")
        print(answer)

    # In a script; inside a Jupyter notebook you can instead run: await main()
    asyncio.run(main())
6.2 Index Optimization
1. Hybrid retrieval strategy
Combine multiple retrieval methods for better results:

from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever

# `chunks`, `vectorstore`, and `query` come from the indexing steps above

# BM25 retriever: keyword matching
bm25_retriever = BM25Retriever.from_documents(chunks)
bm25_retriever.k = 5

# Vector retriever: semantic similarity
vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

# Ensemble retriever
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, vector_retriever],
    weights=[0.5, 0.5]
)
docs = ensemble_retriever.get_relevant_documents(query)
A complete example that plugs an ensemble retriever into the query-transformation pipeline from Section 6.1:

import asyncio
import os
from typing import List

from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from langchain_openai import ChatOpenAI
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever

os.environ["OPENAI_BASE_URL"] = "https://oneapi.handbook.cool/v1"
os.environ["OPENAI_API_KEY"] = "sk-..."  # replace with your own API key
# QueryTransformer, ParallelRetriever, AnswerSynthesizer, and QueryTransformationRAG
# are reused unchanged from Section 6.1.
# A minimal placeholder vector retriever
class PlaceholderVectorRetriever(BaseRetriever):
    def _get_relevant_documents(self, query: str) -> List[Document]:
        # Returns an empty list; swap in a real vector store in production
        return []

# Usage example
if __name__ == "__main__":
    # Mock document corpus
    documents = [
        Document(page_content="LangChain has 109,000 stars on GitHub"),
        Document(page_content="mem0 has 33,500 stars on GitHub"),
        Document(page_content="LangChain is a popular framework for building LLM applications"),
        Document(page_content="mem0 is an innovative memory optimization library designed for AI and machine learning applications."),
    ]

    # BM25 retriever
    bm25_retriever = BM25Retriever.from_documents(documents)
    bm25_retriever.k = 5

    # Placeholder vector retriever
    vector_retriever = PlaceholderVectorRetriever()

    # Ensemble retriever
    ensemble_retriever = EnsembleRetriever(
        retrievers=[bm25_retriever, vector_retriever],
        weights=[0.5, 0.5]  # adjust the weights as needed
    )

    llm = ChatOpenAI(model="gpt-4.1-nano")

    # Build the RAG system
    rag_system = QueryTransformationRAG(ensemble_retriever, llm)

    # Example query
    query = "Which has more stars on GitHub, LangChain or mem0?"

    async def main():
        answer = await rag_system.answer_query(query)
        print("\nFinal answer:")
        print(answer)

    # In a script; inside a Jupyter notebook you can instead run: await main()
    asyncio.run(main())
2. Hierarchical retrieval (parent-child chunking)
Improve retrieval quality by preserving document hierarchy: small child chunks are embedded for precise matching, while the larger parent chunks they belong to are returned as context.

from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore

# Create the parent-child retriever
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
parent_retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=InMemoryStore(),  # stores the parent documents
    parent_splitter=parent_splitter,
    child_splitter=child_splitter
)
parent_retriever.add_documents(docs)  # index the documents first
retrieved = parent_retriever.get_relevant_documents(query)
6.3 Post-Retrieval Processing
1. Re-ranking
The retrieved documents may not be the most relevant ones; re-ranking improves the result:

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank

# Use Cohere's rerank API (requires a COHERE_API_KEY)
compressor = CohereRerank()
reranking_retriever = ContextualCompressionRetriever(
    base_retriever=vectorstore.as_retriever(search_kwargs={"k": 10}),
    base_compressor=compressor
)
docs = reranking_retriever.get_relevant_documents(query)
2. Retrieval-augmented prompt engineering
Guide the model on how to use the retrieved information with a better prompt template:

from langchain.prompts import PromptTemplate

template = """
You are a professional customer-service representative. Use the retrieved document snippets below to answer the customer's question.
Always be polite and professional. If the retrieved documents contain no relevant information, honestly say you don't know; do not make things up.

Documents:
{context}

Customer question: {question}

Answer in the following format:
1. Answer the question concisely and directly
2. Where applicable, cite the relevant policy or rule from the documents
3. If the customer needs further help, point them to the department or person to contact
"""

PROMPT = PromptTemplate(
    template=template,
    input_variables=["context", "question"]
)
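A quick way to check the template before plugging it into a chain is to format it with sample values (the inputs below are made up for illustration):

filled = PROMPT.format(
    context="Refunds are processed within 7 business days of approval.",
    question="How long do refunds take?",
)
print(filled)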
7. Evaluating a RAG System
After building a RAG system, evaluating its performance is essential. Example of a graded run:
Question: Which has more stars on GitHub, LangChain or mem0?
Reference answer: LangChain has noticeably more stars than mem0
System answer: Based on the provided information, LangChain has 109,000 stars on GitHub while mem0 has 33,500. Therefore, LangChain has more stars on GitHub.
GRADE: CORRECT
import asyncio
import os

from langchain_core.documents import Document
from langchain_openai import ChatOpenAI
from langchain_community.retrievers import BM25Retriever
from langchain.evaluation.qa import QAEvalChain

os.environ["OPENAI_BASE_URL"] = "https://oneapi.handbook.cool/v1"
os.environ["OPENAI_API_KEY"] = "sk-..."  # replace with your own API key
# QueryTransformer, ParallelRetriever, AnswerSynthesizer, and QueryTransformationRAG
# are reused unchanged from Section 6.1.
# Prepare evaluation data
eval_data = [
    {"query": "Which has more stars on GitHub, LangChain or mem0?",
     "answer": "LangChain has noticeably more stars than mem0"},
]

# Initialize the RAG system
retriever = BM25Retriever.from_documents([
    Document(page_content="LangChain has 109,000 stars on GitHub"),
    Document(page_content="mem0 has 33,500 stars on GitHub"),
    Document(page_content="LangChain is a popular framework for building LLM applications"),
    Document(page_content="mem0 is an innovative memory optimization library designed for AI and machine learning applications."),
])
llm = ChatOpenAI(model="gpt-4.1-nano")
rag_system = QueryTransformationRAG(retriever, llm)

# Collect the RAG system's answers (async)
async def get_rag_predictions(eval_data):
    predictions = []
    for example in eval_data:
        result = await rag_system.answer_query(example["query"])
        predictions.append({"query": example["query"], "result": result})
    return predictions

# Main evaluation function
async def main():
    # Get predictions
    predictions = await get_rag_predictions(eval_data)
    # Build the evaluation chain
    eval_llm = ChatOpenAI(model="gpt-4.1-nano")
    eval_chain = QAEvalChain.from_llm(eval_llm)
    # Grade the results
    graded_outputs = eval_chain.evaluate(
        eval_data,
        predictions,
        question_key="query",
        prediction_key="result",
        answer_key="answer"
    )
    # Print the evaluation results
    for i, grade in enumerate(graded_outputs):
        print(f"Question: {eval_data[i]['query']}")
        print(f"Reference answer: {eval_data[i]['answer']}")
        print(f"System answer: {predictions[i]['result']}")
        # Access the 'results' key directly
        print(f"  {grade['results']}")
        print("---")

# Run the evaluation (top-level await works in Jupyter; in a script use asyncio.run(main()))
await main()
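Grading final answers is only one axis of evaluation; retrieval quality can be scored separately. Below is a minimal sketch of a hit-rate metric, assuming each evaluation example also lists the IDs of the documents that should be retrieved (the relevant_ids field and the id metadata are hypothetical additions, not part of the example above):

def hit_rate(eval_examples, retriever, k=5):
    """Fraction of queries where at least one relevant document appears in the top k."""
    hits = 0
    for ex in eval_examples:
        retrieved = retriever.invoke(ex["query"])[:k]
        retrieved_ids = {doc.metadata.get("id") for doc in retrieved}
        if retrieved_ids & set(ex["relevant_ids"]):  # hypothetical labels
            hits += 1
    return hits / len(eval_examples)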
8. Sentence Embeddings and Visualization for a Database
In practice, an important RAG task is understanding the semantics of a user's input so it can be matched to a suitable category or intent. In a customer-service scenario, for example, user questions may need to be classified into predefined categories for further processing. Below is a complete sentence-embedding pipeline, from data loading through visualization.
8.1 Data Loading and Preprocessing
The first step is to load the sentence data from the intent-recognition database and clean it. This is the foundation of the whole pipeline: consistent, high-quality data makes the subsequent embedding generation and analysis more accurate.
Why preprocess?
Sentences stored in a database may contain special characters, null values, and formatting errors. Left untreated, these degrade embedding quality and classification accuracy.
Data loading and preprocessing example
Code for loading the datasets and cleaning the sentences:
import re
import pandas as pd

# Dataset file names (replace with your own datasets)
dataset_names = []

# Load and concatenate the datasets
def load_and_merge_datasets(dataset_names):
    dataframes = []
    for dataset_name in dataset_names:
        df = pd.read_csv(f"{dataset_name}_dataset.csv")
        dataframes.append(df)
    merged_df = pd.concat(dataframes, ignore_index=True)
    return merged_df

# Preprocess a sentence
def preprocess_sentence(sentence):
    if not isinstance(sentence, str):
        sentence = str(sentence) if pd.notna(sentence) else ""
    sentence = re.sub(r"\[.*?\]", "", sentence)  # drop bracketed markup
    sentence = re.sub(r"[^a-zA-Zа-яА-Я0-9\s]", "", sentence)  # keep Latin, Cyrillic, digits, whitespace
    return sentence.strip()
8.2 Generating Sentence Embeddings
The second step converts each sentence into a vector representation (an embedding) using a pretrained model. The embedding captures the sentence's semantics, so sentences can be compared mathematically.
What is a sentence embedding?
A sentence embedding maps text to a fixed-length vector that represents its meaning, translating language into a mathematical form machines can work with.
Generating embeddings with a pretrained model
The following code generates sentence embeddings with a sentence-transformers model:
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModel

# Load the model and tokenizer
model_name = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name).to(device)

# Generate embeddings in batches
def generate_embeddings(sentences, batch_size=32):
    all_embeddings = []
    for i in range(0, len(sentences), batch_size):
        batch = sentences[i:i+batch_size]
        inputs = tokenizer(batch, return_tensors="pt", padding=True, truncation=True).to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        # Mean-pool the token embeddings to get one vector per sentence
        embeddings = outputs.last_hidden_state.mean(dim=1).cpu().numpy()
        all_embeddings.append(embeddings)
    return np.vstack(all_embeddings)
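One caveat: plain mean pooling also averages over padding tokens in a batch. A variant that masks padding out, following the usual sentence-transformers pooling recipe, often gives cleaner embeddings; a sketch:

def generate_embeddings_masked(sentences, batch_size=32):
    all_embeddings = []
    for i in range(0, len(sentences), batch_size):
        batch = sentences[i:i+batch_size]
        inputs = tokenizer(batch, return_tensors="pt", padding=True, truncation=True).to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        # Zero out padding positions before averaging token embeddings
        mask = inputs["attention_mask"].unsqueeze(-1).float()
        summed = (outputs.last_hidden_state * mask).sum(dim=1)
        counts = mask.sum(dim=1).clamp(min=1e-9)
        all_embeddings.append((summed / counts).cpu().numpy())
    return np.vstack(all_embeddings)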
8.3 Category Matching
The third step assigns each sentence to a predefined category. We generate an embedding for each category, compute the similarity between every sentence and every category, and pick the best match.
Why category matching?
Category matching maps user input to a concrete intent or topic, for example classifying "How do I top up game coins?" under "billing".
Category matching example
The following code generates category embeddings and matches sentences against them:
# Generate category embeddings
def compute_category_embeddings(categories):
    category_embeddings = {}
    for category in categories:
        descriptive_text = f"Category {category}"
        category_embeddings[category] = generate_embeddings([descriptive_text])[0]
    return category_embeddings

# Assign each sentence to its closest category by cosine similarity
def assign_categories(embeddings, category_embeddings):
    assigned_categories = []
    for embedding in embeddings:
        distances = {category: np.dot(embedding, category_embeddings[category]) /
                     (np.linalg.norm(embedding) * np.linalg.norm(category_embeddings[category]))
                     for category in category_embeddings}
        closest_category = max(distances, key=distances.get)
        assigned_categories.append(closest_category)
    return assigned_categories
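A minimal usage sketch with made-up sentences and category names:

sentences = ["How do I top up game coins?", "The app crashes on startup"]
categories = ["billing", "technical support"]

sentence_embs = generate_embeddings(sentences)
category_embs = compute_category_embeddings(categories)
print(assign_categories(sentence_embs, category_embs))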
8.4 Dimensionality Reduction and Clustering
To see the relationship between sentence embeddings and categories more intuitively, we can reduce the embedding vectors to low dimensions (with t-SNE or PCA) and cluster the category embeddings.
Why reduce and cluster?
Sentence embeddings are high-dimensional (512 dimensions, for example). Dimensionality reduction maps them into 2D or 3D space for visualization, and clustering helps reveal relationships and patterns among categories.
Dimensionality reduction and clustering example
The following code implements both:
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

# Dimensionality reduction
def reduce_dimensions(embeddings, method="tsne", n_components=2):
    if method == "tsne":
        reducer = TSNE(n_components=n_components, perplexity=30, learning_rate=200, random_state=42)
    elif method == "pca":
        reducer = PCA(n_components=n_components)
    else:
        raise ValueError("Unsupported reduction method. Use 'tsne' or 'pca'.")
    return reducer.fit_transform(embeddings)

# Clustering
def cluster_embeddings(reduced_embeddings, n_clusters=5):
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    return kmeans.fit_predict(reduced_embeddings)
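For example, assuming the sentence_embeddings array produced in Section 8.2:

coords_2d = reduce_dimensions(sentence_embeddings, method="tsne", n_components=2)
cluster_ids = cluster_embeddings(coords_2d, n_clusters=3)
print(coords_2d.shape, cluster_ids[:10])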
8.5 Visualization
Finally, we visualize the reduced embeddings to show the relationship between sentences and categories, or the clustering of the category points.
Visualizing sentence-category relationships (2D)
The following code plots sentences against their assigned categories:
import matplotlib.pyplot as plt

# Visualize the relationship between sentences and categories
def visualize_sentence_category_relationship_2d(sentence_embeddings, category_embeddings, assigned_categories, categories):
    all_embeddings = np.vstack([sentence_embeddings] + [category_embeddings[cat] for cat in categories])
    reduced_embeddings = reduce_dimensions(all_embeddings, method="tsne", n_components=2)
    sentence_points = reduced_embeddings[:len(sentence_embeddings)]
    category_points = reduced_embeddings[len(sentence_embeddings):]
    colors = plt.cm.tab20.colors
    plt.figure(figsize=(12, 8))
    for i, category in enumerate(categories):
        indices = [j for j, assigned_category in enumerate(assigned_categories) if assigned_category == category]
        plt.scatter(sentence_points[indices, 0], sentence_points[indices, 1],
                    c=[colors[i]], alpha=0.6, label=f"Category: {category}")
        plt.scatter(category_points[i, 0], category_points[i, 1],
                    c=[colors[i]], alpha=1.0, s=100, edgecolor="black")
    plt.title("Sentence Embeddings and Closest Categories (2D)")
    plt.legend(loc="upper right")
    plt.xlabel("Dimension 1")
    plt.ylabel("Dimension 2")
    plt.show()
# Simulated data:
# - three text categories: sports, technology, politics
# - one 10-dimensional embedding per category (used as the category center)
# - 20 noisy sentence embeddings around each center (60 sentences in total)
import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

# Simplified dimensionality-reduction helper
def reduce_dimensions(embeddings, method="tsne", n_components=2):
    tsne = TSNE(n_components=n_components, random_state=42)
    return tsne.fit_transform(embeddings)

# Visualization function
def visualize_sentence_category_relationship_2d(sentence_embeddings, category_embeddings, assigned_categories, categories):
    all_embeddings = np.vstack([sentence_embeddings] + [category_embeddings[cat] for cat in categories])
    reduced_embeddings = reduce_dimensions(all_embeddings, method="tsne", n_components=2)
    sentence_points = reduced_embeddings[:len(sentence_embeddings)]
    category_points = reduced_embeddings[len(sentence_embeddings):]
    colors = plt.cm.tab20.colors
    plt.figure(figsize=(12, 8))
    for i, category in enumerate(categories):
        # Plot the sentences assigned to this category
        indices = [j for j, assigned_category in enumerate(assigned_categories) if assigned_category == category]
        plt.scatter(sentence_points[indices, 0], sentence_points[indices, 1],
                    c=[colors[i]], alpha=0.6, label=f"Category: {category}")
        # Plot the category center
        plt.scatter(category_points[i, 0], category_points[i, 1],
                    c=[colors[i]], alpha=1.0, s=200, edgecolor="black", marker="X")
    plt.title("Sentence Embeddings and Category Centers (2D T-SNE Projection)")
    plt.legend(loc="best")
    plt.xlabel("Dimension 1")
    plt.ylabel("Dimension 2")
    plt.grid(alpha=0.2)
    plt.show()

# Generate simulated data
np.random.seed(42)

# Define 3 categories
categories = ["Sports", "Technology", "Politics"]

# Category embeddings (three 10-dimensional vectors)
category_embeddings = {
    "Sports": np.random.normal(1.0, 0.1, 10),
    "Technology": np.random.normal(2.0, 0.2, 10),
    "Politics": np.random.normal(0.5, 0.3, 10)
}

# Sentence embeddings (20 sentences per category)
sentence_embeddings = []
assigned_categories = []
for category in categories:
    for _ in range(20):
        # Add random noise around the category center
        base = category_embeddings[category]
        noise = np.random.normal(0, 0.2, 10)
        sentence_embeddings.append(base + noise)
        assigned_categories.append(category)
sentence_embeddings = np.array(sentence_embeddings)

# Run the visualization
visualize_sentence_category_relationship_2d(
    sentence_embeddings,
    category_embeddings,
    assigned_categories,
    categories
)

Visualizing category clusters (3D)
The following code plots the clustering of the category embeddings:
from mpl_toolkits.mplot3d import Axes3D

# Visualize the clustering of the category points (3D)
def visualize_clusters_3d(category_points, cluster_labels, categories):
    fig = plt.figure(figsize=(12, 8))
    ax = fig.add_subplot(111, projection='3d')
    colors = plt.cm.tab20.colors
    for i, category in enumerate(categories):
        ax.scatter(category_points[i, 0], category_points[i, 1], category_points[i, 2],
                   c=[colors[cluster_labels[i]]], marker='X', s=200, label=f"Category: {category}")
    ax.set_title("Category Embedding Clusters (3D)")
    ax.set_xlabel("Dimension 1")
    ax.set_ylabel("Dimension 2")
    ax.set_zlabel("Dimension 3")
    ax.legend(loc="upper right")
    plt.show()
# Simulated data:
# - ten text categories (sports, technology, politics, ...)
# - grouped into three semantically related clusters:
#   - tech cluster (technology / science / education)
#   - lifestyle cluster (health / food / travel)
#   - general cluster (sports / entertainment / business / politics)
# - a 3D coordinate per category, with random noise for a natural spread
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# Visualization function
def visualize_clusters_3d(category_points, cluster_labels, categories):
    fig = plt.figure(figsize=(12, 10))
    ax = fig.add_subplot(111, projection='3d')
    colors = plt.cm.tab10.colors  # ten distinct colors

    # Plot each category point
    for i, category in enumerate(categories):
        ax.scatter(category_points[i, 0], category_points[i, 1], category_points[i, 2],
                   c=[colors[cluster_labels[i]]],
                   marker='X',
                   s=200,
                   edgecolor='black',
                   depthshade=True,
                   label=f"{category} (Cluster {cluster_labels[i]})")

    # Draw connections within each cluster
    for cluster_id in set(cluster_labels):
        cluster_indices = [i for i, label in enumerate(cluster_labels) if label == cluster_id]
        # Plot the cluster center
        cluster_center = np.mean(category_points[cluster_indices], axis=0)
        ax.scatter(cluster_center[0], cluster_center[1], cluster_center[2],
                   c=[colors[cluster_id]],
                   marker='o',
                   s=300,
                   alpha=0.3)
        # Connect each point in the cluster to its center
        for i in cluster_indices:
            ax.plot([category_points[i, 0], cluster_center[0]],
                    [category_points[i, 1], cluster_center[1]],
                    [category_points[i, 2], cluster_center[2]],
                    c=colors[cluster_id], alpha=0.3, linestyle='--')

    ax.set_title("3D Visualization of Category Clusters", fontsize=16)
    ax.set_xlabel("Dimension 1", fontsize=12)
    ax.set_ylabel("Dimension 2", fontsize=12)
    ax.set_zlabel("Dimension 3", fontsize=12)
    # Place the legend so it does not cover the plot
    ax.legend(loc="upper left", bbox_to_anchor=(0.05, 0.95))
    # Grid and styling
    ax.xaxis.pane.fill = False
    ax.yaxis.pane.fill = False
    ax.zaxis.pane.fill = False
    ax.grid(True, linestyle='--', alpha=0.4)
    plt.tight_layout()
    plt.show()

# Generate simulated data
np.random.seed(42)

# Category names
categories = ["Sports", "Technology", "Politics",
              "Health", "Entertainment", "Education",
              "Business", "Science", "Travel", "Food"]

# Category points (10 categories, one 3D coordinate each);
# indices follow the order of the `categories` list above
category_points = np.zeros((10, 3))

# Three clearly separated clusters
# Cluster 0: tech-related
category_points[1] = [1.2, 0.8, 2.5]  # Technology
category_points[7] = [1.5, 1.0, 2.3]  # Science
category_points[5] = [0.9, 0.7, 2.7]  # Education
# Cluster 1: lifestyle
category_points[3] = [3.0, 4.2, 1.0]  # Health
category_points[9] = [3.5, 4.5, 0.8]  # Food
category_points[8] = [2.8, 3.9, 1.2]  # Travel
# Cluster 2: everything else
category_points[0] = [0.5, 2.5, 0.5]  # Sports
category_points[4] = [0.3, 2.2, 0.7]  # Entertainment
category_points[6] = [0.6, 2.8, 0.3]  # Business
category_points[2] = [0.4, 2.0, 0.6]  # Politics

# Add random noise so the spread looks natural
category_points += np.random.normal(0, 0.15, category_points.shape)

# Cluster labels, aligned with the order of the `categories` list
cluster_labels = [2, 0, 2,     # Sports, Technology, Politics
                  1, 2, 0,     # Health, Entertainment, Education
                  2, 0, 1, 1]  # Business, Science, Travel, Food

# Run the visualization
visualize_clusters_3d(category_points, cluster_labels, categories)

8.6 Summary
The steps above form a complete pipeline from sentence-embedding generation through category matching to dimensionality reduction and visualization. It clarifies the semantic relationships between sentences and categories and provides the technical foundation for downstream classification and intent-recognition tasks.
9. Common Problems and Solutions

| Problem | Likely cause | Solution |
| --- | --- | --- |
| Irrelevant retrieval results | Chunks too large or too small | Tune chunk_size; 500-1500 characters usually works well |
| Answers contain wrong information | Too little retrieved content, or model hallucination | Retrieve more documents; improve the prompt to curb hallucination |
| Slow system responses | Retrieval or embedding computation is expensive | Use batching, cache results, optimize the vector database |
| Poor on domain-specific questions | Insufficient domain knowledge | Add more domain documents; use a domain-fine-tuned model |
| Inconsistent answer formats | Prompt template too vague | Improve the template; specify the output format explicitly |
10. Best Practices
- Document processing: ensure document quality, remove irrelevant content, normalize formats
- Text chunking: tune chunk size and overlap to the nature of the content
- Retrieval strategy: experiment with different k values and retrieval types (similarity, MMR, etc.)
- Living system: refresh the knowledge base regularly to keep information current
- Source citation: include sources in answers to improve transparency and trust
- Human in the loop: add human review for critical applications
11. Next Steps
Having finished this chapter, you now understand the basic principles of RAG and how to implement them. From here you can:
- Explore more advanced vector databases (such as Pinecone or Weaviate)
- Learn multi-query RAG techniques to improve recall
- Combine agents with RAG to build systems that search for information autonomously
- Build domain-specific RAG systems, such as legal, medical, or financial assistants
May you find the power of knowledge in the world of RAG! 🚀