第七章:mcp高级Server能力——构建RAG系统
摘要: RAG(检索增强生成)技术通过结合大型语言模型的生成能力与外部知识库的检索能力,为AI赋予"长期记忆"。其核心流程包括:1)索引文档并向量化存储;2)检索与问题最相关的文档块;3)基于上下文生成精准回答。本章将构建一个RAG服务器,使用LangChain工具链实现文档加载、分块、嵌入(采用all-MiniLM-L6-v2模型)和FAISS向量存储,最终通过GPT-4生成
7.1 让AI拥有“长期记忆”:RAG简介
到目前为止,我们的AI助理已经能够访问文件、调用API和查询数据库。但这些交互在某种程度上是被动的。当我们问“总结一下attention_is_all_you_need.md”时,它只是读取整个文件。如果我们有一个包含数百篇研究论文的文件夹,AI要如何高效地找到与我们问题最相关的几篇,并基于它们的内容进行回答呢?
这就是**检索增强生成(Retrieval-Augmented Generation, RAG)**技术大显身手的舞台。RAG是一种将大型语言模型 (LLM) 的强大生成能力与外部知识库的精准检索能力相结合的架构。它赋予了AI一种“长期记忆”和“开卷考试”的能力。
RAG的核心思想:
- 索引 (Indexing):预先将大量的文档(如PDF、Markdown、网页)分割成小块(chunks),然后使用一个嵌入模型 (Embedding Model) 将每个小块转换成一个高维向量(vector)。这些向量捕捉了文本块的语义含义。最后,将这些向量和它们对应的原文存储在一个向量数据库 (Vector Database) 中。
- 检索 (Retrieval):当用户提出一个问题时,首先使用同一个嵌入模型将问题也转换成一个向量。
- 搜索 (Search):在向量数据库中,搜索与问题向量最相似的文档向量。这些最相似的文档块就是与问题最相关的内容。
- 增强 (Augmentation):将检索到的这些相关文档块作为上下文 (context),与原始问题一起,构建一个新的、更丰富的提示 (prompt)。
- 生成 (Generation):将这个增强后的提示发送给LLM,让它基于提供的上下文来生成最终的、精准的答案。
在本章中,我们将构建一个RAG MCP Server。这个Server将把上述复杂的RAG流程,封装成一个简单的rag/query工具。AI Host只需要调用这个工具并传入问题,就能获得基于私有知识库的智能回答,而完全无需关心背后发生了什么——文档切分、向量化、相似度搜索等等。
7.2 项目准备与RAG架构
我们将构建一个全新的RAG Server,它可以索引一个指定的文档目录,并提供问答服务。
7.2.1 安装必要的库
RAG系统需要一些专门的库。我们将使用langchain来简化文档处理和RAG流程的构建,faiss-cpu作为我们的本地向量数据库,以及sentence-transformers来获取一个高质量的开源嵌入模型。
# 确保你位于ai-research-assistant目录下并激活了虚拟环境
cd ai-research-assistant
# 安装RAG相关的库
pip install langchain faiss-cpu sentence-transformers
# 创建RAG Server目录
mkdir rag_server
# 创建一个用于存放知识库文档的目录
mkdir -p rag_server/knowledge_base
# 创建一些示例文档
echo "The Model Context Protocol (MCP) is an open standard for AI context. It uses JSON-RPC." > rag_server/knowledge_base/mcp_overview.md
echo "RAG stands for Retrieval-Augmented Generation. It combines retrieval with generation to provide accurate answers from a knowledge base." > rag_server/knowledge_base/rag_intro.md
echo "A vector database stores data as high-dimensional vectors, enabling efficient similarity search." > rag_server/knowledge_base/vector_db.md
# 创建RAG Server的主程序文件
touch rag_server/main.py
7.2.2 RAG Server架构设计
我们的rag_server在启动时,会执行一次性的索引过程,然后持续监听来自Host的查询请求。
7.3 构建RAG MCP Server (rag_server/main.py)
这个Server的代码会比之前的复杂一些,因为它包含了完整的RAG流程的实现。
# rag_server/main.py
import asyncio
import json
import logging
import os
from typing import Dict, Any, List
# LangChain组件
from langchain.chains import RetrievalQA
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_text_splitters import CharacterTextSplitter
from langchain_openai import ChatOpenAI
KNOWLEDGE_BASE_DIR = "knowledge_base"
class RagServer:
def __init__(self, knowledge_dir: str, openai_api_key: str):
self.knowledge_dir = knowledge_dir
self.qa_chain = None
self.openai_api_key = openai_api_key
logging.basicConfig(level=logging.INFO, format='%(asctime)s - RAG_SERVER - %(levelname)s - %(message)s')
async def initialize(self):
"""执行一次性的索引过程"""
logging.info("Initializing RAG server...")
# 1. 加载文档
logging.info(f"Loading documents from '{self.knowledge_dir}'...")
loader = DirectoryLoader(self.knowledge_dir, glob="**/*.md", loader_cls=TextLoader)
documents = loader.load()
if not documents:
logging.warning("No documents found in knowledge base. RAG will not be effective.")
return
# 2. 切分文档
logging.info("Splitting documents into chunks...")
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
texts = text_splitter.split_documents(documents)
# 3. 设置嵌入模型 (使用开源模型)
logging.info("Loading embedding model...")
embeddings = HuggingFaceEmbeddings(model_name='all-MiniLM-L6-v2')
# 4. 创建向量数据库
logging.info("Creating vector store...")
vector_store = FAISS.from_documents(texts, embeddings)
# 5. 创建RAG链
logging.info("Creating RAG chain...")
llm = ChatOpenAI(openai_api_key=self.openai_api_key, model_name="gpt-4")
self.qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=vector_store.as_retriever()
)
logging.info("RAG server initialized successfully.")
async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
try:
if method == "project/listTools":
result = self._list_tools()
elif method == "project/executeTool":
tool_name = params.get("name")
tool_params = params.get("parameters", {})
if tool_name == "rag/query":
result = self._query(tool_params.get("question"))
else:
raise ValueError(f"Tool '{tool_name}' not found.")
else:
raise NotImplementedError(f"Method '{method}' not supported.")
return {"jsonrpc": "2.0", "id": request_id, "result": result}
except Exception as e:
logging.error(f"Error handling request: {e}", exc_info=True)
return {"jsonrpc": "2.0", "id": request_id, "error": {"code": -32000, "message": str(e)}}
def _list_tools(self) -> List[Dict[str, Any]]:
return [
{
"name": "rag/query",
"description": "Answers a question based on the internal knowledge base.",
"parameters": [
{"name": "question", "type": "string", "required": True, "description": "The question to ask."}
]
}
]
def _query(self, question: str) -> Dict[str, Any]:
if not self.qa_chain:
raise RuntimeError("RAG chain is not initialized. Is the knowledge base empty?")
if not question:
raise ValueError("Question cannot be empty.")
logging.info(f"Executing RAG query: '{question}'")
# LangChain的RAG链封装了所有步骤
response = self.qa_chain.invoke({"query": question})
return {
"result": {"answer": response.get('result')},
"stdout": f"Successfully answered question.",
"stderr": None
}
async def main():
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
print("Error: OPENAI_API_KEY environment variable not set.", file=sys.stderr)
return
server = RagServer(KNOWLEDGE_BASE_DIR, api_key)
await server.initialize() # 在开始监听前,先完成索引
# main通信循环与之前的Server完全相同
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await asyncio.get_running_loop().connect_read_pipe(lambda: protocol, asyncio.get_event_loop()._default_reader)
writer = asyncio.StreamWriter(protocol, asyncio.get_event_loop(), None, None)
logging.info("RAG Server started. Waiting for requests...")
while True:
line = await reader.readline()
if not line:
break
request = json.loads(line.decode('utf-8'))
response = await server.handle_request(request)
response_line = json.dumps(response) + '\n'
sys.stdout.write(response_line)
await sys.stdout.flush()
if __name__ == "__main__":
import sys
sys.stderr = sys.stdout
asyncio.run(main())
7.3.1 代码解析
-
initialize方法: 这是本Server的特色。它在服务启动前执行一次,负责构建RAG的核心——qa_chain。它使用langchain库优雅地完成了加载、切分、嵌入和存储的全过程。这确保了在处理查询请求时,向量数据库已经准备就绪,可以实现快速检索。 -
开源嵌入模型: 我们使用了
HuggingFaceEmbeddings和all-MiniLM-L6-v2模型。这是一个轻量级但效果很好的开源模型,可以在本地运行,避免了将我们的私有文档发送到外部API进行向量化的需要,增强了数据隐私性。 -
_query方法: 这个方法的核心非常简单:self.qa_chain.invoke(...)。langchain的RetrievalQA链将RAG的“检索-增强-生成”三步曲封装得天衣无缝。我们只需要调用它,就能得到最终的答案。 -
依赖注入:
RagServer的构造函数接收openai_api_key。这是一种良好的实践,将密钥管理与核心业务逻辑分离,使得代码更易于测试和配置。
7.4 将RAG能力集成到AI研究助理
最后一步,让我们的AI助理能够使用这个强大的新能力。
7.4.1 更新Host主程序 (assistant_host/main.py)
修改非常直接,与上一章类似:添加一个新的Server连接,并在answer方法中添加路由逻辑。
# assistant_host/main.py (部分修改)
# ... (StdioMcpConnection, McpMultiplexer, AiResearchAssistant类定义保持不变) ...
class AiResearchAssistant:
# ...
async def answer(self, question: str) -> str:
# ...
tasks = []
# --- 原有的路由逻辑保持不变 ---
# ...
# --- 新增:RAG查询路由逻辑 ---
# 如果问题看起来是关于知识库内容的,就使用RAG
# 这是一个简单的启发式规则,真实应用会更复杂
if "what is" in question.lower() or "explain" in question.lower() or "based on the knowledge base" in question.lower():
params = {"name": "rag/query", "parameters": {"question": question}}
tasks.append(self.mux.request("rag", "project/executeTool", params))
# ... (任务执行和上下文构建逻辑需要微调以处理新格式) ...
if not tasks:
# 如果没有匹配到特定任务,可以默认使用RAG作为兜底
logging.info("No specific tool matched, defaulting to RAG query.")
params = {"name": "rag/query", "parameters": {"question": question}}
tasks.append(self.mux.request("rag", "project/executeTool", params))
logging.info(f"Gathering context from {len(tasks)} sources...")
responses = await asyncio.gather(*tasks, return_exceptions=True)
# 更新上下文构建逻辑
for i, res in enumerate(responses):
# ... (原有逻辑) ...
if 'result' in res:
tool_result = res['result'].get('result', {})
# ... (原有逻辑) ...
elif 'answer' in tool_result:
# 新增:处理RAG的回答
context_parts.append(f"[Knowledge Base Answer]:\n{tool_result['answer']}")
# 如果只有一个RAG任务,可以直接返回其结果,无需再调用LLM
if len(tasks) == 1 and tasks[0].cr_code.__name__ == 'request' and tasks[0].cr_frame.f_locals.get('method') == 'project/executeTool' and tasks[0].cr_frame.f_locals.get('params', {}).get('name') == 'rag/query':
if context_parts and "Knowledge Base Answer" in context_parts[0]:
logging.info("Returning direct answer from RAG server.")
return context_parts[0].replace("[Knowledge Base Answer]:\n", "")
# ... (构建prompt和调用OpenAI的部分保持不变,用于多源信息综合) ...
# ...
async def main():
# ...
mux = McpMultiplexer()
# --- 更新Server连接配置 ---
fs_conn = StdioMcpConnection("fs", "python3 main.py", "../fs_server")
wiki_conn = StdioMcpConnection("wiki", "python3 main.py", "../wiki_server")
db_conn = StdioMcpConnection("db", "python3 main.py", "../db_server")
# 新增:RAG Server连接
rag_conn = StdioMcpConnection("rag", "python3 main.py", "../rag_server")
mux.add_connection(fs_conn)
mux.add_connection(wiki_conn)
mux.add_connection(db_conn)
mux.add_connection(rag_conn) # 添加新的连接
assistant = AiResearchAssistant(mux, api_key)
try:
await mux.start_all()
await asyncio.sleep(10) # RAG初始化需要时间,增加等待
logging.info("All servers started. AI Research Assistant is ready.")
# ... (交互循环保持不变) ...
finally:
# ...
7.4.2 运行与测试
启动AI研究助理。这次启动会慢一些,因为RAG Server需要下载嵌入模型并建立索引。
-
启动Host:
cd assistant_host python3 main.py -
观察日志:你会看到RAG Server正在进行初始化的日志,然后所有4个Server都进入就绪状态。
-
提出基于知识库的问题:
Ask your research question (or type 'exit'): What is RAG?执行流程:
- Host的
answer方法将问题路由到rag_server。 rag_server接收到问题What is RAG?。- 它在内部执行RAG流程:将问题向量化,在FAISS中找到最相关的文档块(即
rag_intro.md的内容),然后将这个文档块和问题一起发给GPT-4。 - GPT-4基于上下文生成答案,例如:“RAG stands for Retrieval-Augmented Generation. It is a technique that combines retrieval of relevant information from a knowledge base with the generation capabilities of a large language model to produce accurate and contextually grounded answers.”
- 这个答案被返回给Host。
- Host的
answer方法发现这是一个直接来自RAG的回答,于是直接将其呈现给用户。
- Host的
7.5 总结:MCP赋能模块化AI系统
通过将RAG系统封装成一个独立的MCP Server,我们再次验证了MCP在构建模块化、可扩展AI系统中的核心价值。
- 能力即服务 (Capability as a Service):RAG Server就像一个微服务,它提供了一种高度专业化的能力(基于私有文档的问答)。任何MCP Host都可以按需使用这项服务,而无需关心其内部复杂的实现。
- 关注点分离:Host(AI研究助理)的核心职责是任务路由和用户交互。Server(RAG Server)的职责是数据处理和知识检索。这种清晰的关注点分离使得整个系统更容易开发、测试和维护。
- 可组合性:现在,我们可以轻易地组合各种能力。例如,提出一个问题:“根据知识库,解释一下MCP,并从数据库里找出所有关于‘Book’的产品。” AI助理可以并行地向RAG Server和DB Server发送请求,然后综合两者的结果,给出一个全面的答案。
至此,我们的AI研究助理已经具备了访问文件、API、数据库和私有知识库的能力,成为了一个名副其实的多才多艺的智能助手。这为我们探索更高级的AI应用,如自主Agent和多模态交互,奠定了坚实的基础。
「智能机器人开发者大赛」官方平台,致力于为开发者和参赛选手提供赛事技术指导、行业标准解读及团队实战案例解析;聚焦智能机器人开发全栈技术闭环,助力开发者攻克技术瓶颈,促进软硬件集成、场景应用及商业化落地的深度研讨。 加入智能机器人开发者社区iRobot Developer,与全球极客并肩突破技术边界,定义机器人开发的未来范式!
更多推荐



所有评论(0)