7.1 让AI拥有“长期记忆”:RAG简介

到目前为止,我们的AI助理已经能够访问文件、调用API和查询数据库。但这些交互在某种程度上是被动的。当我们问“总结一下attention_is_all_you_need.md”时,它只是读取整个文件。如果我们有一个包含数百篇研究论文的文件夹,AI要如何高效地找到与我们问题最相关的几篇,并基于它们的内容进行回答呢?

这就是**检索增强生成(Retrieval-Augmented Generation, RAG)**技术大显身手的舞台。RAG是一种将大型语言模型 (LLM) 的强大生成能力与外部知识库的精准检索能力相结合的架构。它赋予了AI一种“长期记忆”和“开卷考试”的能力。

RAG的核心思想:

  1. 索引 (Indexing):预先将大量的文档(如PDF、Markdown、网页)分割成小块(chunks),然后使用一个嵌入模型 (Embedding Model) 将每个小块转换成一个高维向量(vector)。这些向量捕捉了文本块的语义含义。最后,将这些向量和它们对应的原文存储在一个向量数据库 (Vector Database) 中。
  2. 检索 (Retrieval):当用户提出一个问题时,首先使用同一个嵌入模型将问题也转换成一个向量。
  3. 搜索 (Search):在向量数据库中,搜索与问题向量最相似的文档向量。这些最相似的文档块就是与问题最相关的内容。
  4. 增强 (Augmentation):将检索到的这些相关文档块作为上下文 (context),与原始问题一起,构建一个新的、更丰富的提示 (prompt)。
  5. 生成 (Generation):将这个增强后的提示发送给LLM,让它基于提供的上下文来生成最终的、精准的答案。

在本章中,我们将构建一个RAG MCP Server。这个Server将把上述复杂的RAG流程,封装成一个简单的rag/query工具。AI Host只需要调用这个工具并传入问题,就能获得基于私有知识库的智能回答,而完全无需关心背后发生了什么——文档切分、向量化、相似度搜索等等。


7.2 项目准备与RAG架构

我们将构建一个全新的RAG Server,它可以索引一个指定的文档目录,并提供问答服务。

7.2.1 安装必要的库

RAG系统需要一些专门的库。我们将使用langchain来简化文档处理和RAG流程的构建,faiss-cpu作为我们的本地向量数据库,以及sentence-transformers来获取一个高质量的开源嵌入模型。

# 确保你位于ai-research-assistant目录下并激活了虚拟环境
cd ai-research-assistant

# 安装RAG相关的库
pip install langchain faiss-cpu sentence-transformers

# 创建RAG Server目录
mkdir rag_server

# 创建一个用于存放知识库文档的目录
mkdir -p rag_server/knowledge_base

# 创建一些示例文档
echo "The Model Context Protocol (MCP) is an open standard for AI context. It uses JSON-RPC." > rag_server/knowledge_base/mcp_overview.md
echo "RAG stands for Retrieval-Augmented Generation. It combines retrieval with generation to provide accurate answers from a knowledge base." > rag_server/knowledge_base/rag_intro.md
echo "A vector database stores data as high-dimensional vectors, enabling efficient similarity search." > rag_server/knowledge_base/vector_db.md

# 创建RAG Server的主程序文件
touch rag_server/main.py

7.2.2 RAG Server架构设计

我们的rag_server在启动时,会执行一次性的索引过程,然后持续监听来自Host的查询请求。

Serving Phase (持续运行)
Initialization Phase (一次性)
rag/query
8. 向量化问题
9. 在FAISS中搜索相似向量
10. 返回相关文档Chunks
11. 构建增强Prompt
12. 生成答案
13. 返回答案
RAG MCP Server
🤖 AI Host
🧠 LLM
2. 加载文档
1. 读取 knowledge_base 目录
3. 切分文档成Chunks
4. 嵌入模型
5. 将Chunks向量化
6. 存入FAISS向量数据库

7.3 构建RAG MCP Server (rag_server/main.py)

这个Server的代码会比之前的复杂一些,因为它包含了完整的RAG流程的实现。

# rag_server/main.py

import asyncio
import json
import logging
import os
from typing import Dict, Any, List

# LangChain组件
from langchain.chains import RetrievalQA
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_text_splitters import CharacterTextSplitter
from langchain_openai import ChatOpenAI

KNOWLEDGE_BASE_DIR = "knowledge_base"

class RagServer:
    def __init__(self, knowledge_dir: str, openai_api_key: str):
        self.knowledge_dir = knowledge_dir
        self.qa_chain = None
        self.openai_api_key = openai_api_key
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - RAG_SERVER - %(levelname)s - %(message)s')

    async def initialize(self):
        """执行一次性的索引过程"""
        logging.info("Initializing RAG server...")
        
        # 1. 加载文档
        logging.info(f"Loading documents from '{self.knowledge_dir}'...")
        loader = DirectoryLoader(self.knowledge_dir, glob="**/*.md", loader_cls=TextLoader)
        documents = loader.load()
        if not documents:
            logging.warning("No documents found in knowledge base. RAG will not be effective.")
            return

        # 2. 切分文档
        logging.info("Splitting documents into chunks...")
        text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
        texts = text_splitter.split_documents(documents)

        # 3. 设置嵌入模型 (使用开源模型)
        logging.info("Loading embedding model...")
        embeddings = HuggingFaceEmbeddings(model_name='all-MiniLM-L6-v2')

        # 4. 创建向量数据库
        logging.info("Creating vector store...")
        vector_store = FAISS.from_documents(texts, embeddings)

        # 5. 创建RAG链
        logging.info("Creating RAG chain...")
        llm = ChatOpenAI(openai_api_key=self.openai_api_key, model_name="gpt-4")
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=vector_store.as_retriever()
        )
        logging.info("RAG server initialized successfully.")

    async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        method = request.get("method")
        params = request.get("params", {})
        request_id = request.get("id")

        try:
            if method == "project/listTools":
                result = self._list_tools()
            elif method == "project/executeTool":
                tool_name = params.get("name")
                tool_params = params.get("parameters", {})
                if tool_name == "rag/query":
                    result = self._query(tool_params.get("question"))
                else:
                    raise ValueError(f"Tool '{tool_name}' not found.")
            else:
                raise NotImplementedError(f"Method '{method}' not supported.")
            
            return {"jsonrpc": "2.0", "id": request_id, "result": result}
        except Exception as e:
            logging.error(f"Error handling request: {e}", exc_info=True)
            return {"jsonrpc": "2.0", "id": request_id, "error": {"code": -32000, "message": str(e)}}

    def _list_tools(self) -> List[Dict[str, Any]]:
        return [
            {
                "name": "rag/query",
                "description": "Answers a question based on the internal knowledge base.",
                "parameters": [
                    {"name": "question", "type": "string", "required": True, "description": "The question to ask."}
                ]
            }
        ]

    def _query(self, question: str) -> Dict[str, Any]:
        if not self.qa_chain:
            raise RuntimeError("RAG chain is not initialized. Is the knowledge base empty?")
        if not question:
            raise ValueError("Question cannot be empty.")

        logging.info(f"Executing RAG query: '{question}'")
        # LangChain的RAG链封装了所有步骤
        response = self.qa_chain.invoke({"query": question})
        
        return {
            "result": {"answer": response.get('result')},
            "stdout": f"Successfully answered question.",
            "stderr": None
        }

async def main():
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("Error: OPENAI_API_KEY environment variable not set.", file=sys.stderr)
        return

    server = RagServer(KNOWLEDGE_BASE_DIR, api_key)
    await server.initialize() # 在开始监听前,先完成索引

    # main通信循环与之前的Server完全相同
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await asyncio.get_running_loop().connect_read_pipe(lambda: protocol, asyncio.get_event_loop()._default_reader)
    writer = asyncio.StreamWriter(protocol, asyncio.get_event_loop(), None, None)

    logging.info("RAG Server started. Waiting for requests...")

    while True:
        line = await reader.readline()
        if not line:
            break
        request = json.loads(line.decode('utf-8'))
        response = await server.handle_request(request)
        response_line = json.dumps(response) + '\n'
        sys.stdout.write(response_line)
        await sys.stdout.flush()

if __name__ == "__main__":
    import sys
    sys.stderr = sys.stdout
    asyncio.run(main())

7.3.1 代码解析

  1. initialize 方法: 这是本Server的特色。它在服务启动前执行一次,负责构建RAG的核心——qa_chain。它使用langchain库优雅地完成了加载、切分、嵌入和存储的全过程。这确保了在处理查询请求时,向量数据库已经准备就绪,可以实现快速检索。

  2. 开源嵌入模型: 我们使用了HuggingFaceEmbeddingsall-MiniLM-L6-v2模型。这是一个轻量级但效果很好的开源模型,可以在本地运行,避免了将我们的私有文档发送到外部API进行向量化的需要,增强了数据隐私性。

  3. _query 方法: 这个方法的核心非常简单:self.qa_chain.invoke(...)langchainRetrievalQA链将RAG的“检索-增强-生成”三步曲封装得天衣无缝。我们只需要调用它,就能得到最终的答案。

  4. 依赖注入: RagServer的构造函数接收openai_api_key。这是一种良好的实践,将密钥管理与核心业务逻辑分离,使得代码更易于测试和配置。


7.4 将RAG能力集成到AI研究助理

最后一步,让我们的AI助理能够使用这个强大的新能力。

7.4.1 更新Host主程序 (assistant_host/main.py)

修改非常直接,与上一章类似:添加一个新的Server连接,并在answer方法中添加路由逻辑。

# assistant_host/main.py (部分修改)

# ... (StdioMcpConnection, McpMultiplexer, AiResearchAssistant类定义保持不变) ...

class AiResearchAssistant:
    # ...
    async def answer(self, question: str) -> str:
        # ...
        tasks = []

        # --- 原有的路由逻辑保持不变 ---
        # ...

        # --- 新增:RAG查询路由逻辑 ---
        # 如果问题看起来是关于知识库内容的,就使用RAG
        # 这是一个简单的启发式规则,真实应用会更复杂
        if "what is" in question.lower() or "explain" in question.lower() or "based on the knowledge base" in question.lower():
            params = {"name": "rag/query", "parameters": {"question": question}}
            tasks.append(self.mux.request("rag", "project/executeTool", params))

        # ... (任务执行和上下文构建逻辑需要微调以处理新格式) ...
        if not tasks:
            # 如果没有匹配到特定任务,可以默认使用RAG作为兜底
            logging.info("No specific tool matched, defaulting to RAG query.")
            params = {"name": "rag/query", "parameters": {"question": question}}
            tasks.append(self.mux.request("rag", "project/executeTool", params))

        logging.info(f"Gathering context from {len(tasks)} sources...")
        responses = await asyncio.gather(*tasks, return_exceptions=True)

        # 更新上下文构建逻辑
        for i, res in enumerate(responses):
            # ... (原有逻辑) ...
            if 'result' in res:
                tool_result = res['result'].get('result', {})
                # ... (原有逻辑) ...
                elif 'answer' in tool_result:
                    # 新增:处理RAG的回答
                    context_parts.append(f"[Knowledge Base Answer]:\n{tool_result['answer']}")
        
        # 如果只有一个RAG任务,可以直接返回其结果,无需再调用LLM
        if len(tasks) == 1 and tasks[0].cr_code.__name__ == 'request' and tasks[0].cr_frame.f_locals.get('method') == 'project/executeTool' and tasks[0].cr_frame.f_locals.get('params', {}).get('name') == 'rag/query':
            if context_parts and "Knowledge Base Answer" in context_parts[0]:
                logging.info("Returning direct answer from RAG server.")
                return context_parts[0].replace("[Knowledge Base Answer]:\n", "")

        # ... (构建prompt和调用OpenAI的部分保持不变,用于多源信息综合) ...
        # ...

async def main():
    # ...
    mux = McpMultiplexer()

    # --- 更新Server连接配置 ---
    fs_conn = StdioMcpConnection("fs", "python3 main.py", "../fs_server")
    wiki_conn = StdioMcpConnection("wiki", "python3 main.py", "../wiki_server")
    db_conn = StdioMcpConnection("db", "python3 main.py", "../db_server")
    # 新增:RAG Server连接
    rag_conn = StdioMcpConnection("rag", "python3 main.py", "../rag_server")
    
    mux.add_connection(fs_conn)
    mux.add_connection(wiki_conn)
    mux.add_connection(db_conn)
    mux.add_connection(rag_conn) # 添加新的连接

    assistant = AiResearchAssistant(mux, api_key)

    try:
        await mux.start_all()
        await asyncio.sleep(10) # RAG初始化需要时间,增加等待
        logging.info("All servers started. AI Research Assistant is ready.")
        # ... (交互循环保持不变) ...
    finally:
        # ...

7.4.2 运行与测试

启动AI研究助理。这次启动会慢一些,因为RAG Server需要下载嵌入模型并建立索引。

  1. 启动Host

    cd assistant_host
    python3 main.py
    
  2. 观察日志:你会看到RAG Server正在进行初始化的日志,然后所有4个Server都进入就绪状态。

  3. 提出基于知识库的问题

    Ask your research question (or type 'exit'): What is RAG?
    

    执行流程

    • Host的answer方法将问题路由到rag_server
    • rag_server接收到问题What is RAG?
    • 它在内部执行RAG流程:将问题向量化,在FAISS中找到最相关的文档块(即rag_intro.md的内容),然后将这个文档块和问题一起发给GPT-4。
    • GPT-4基于上下文生成答案,例如:“RAG stands for Retrieval-Augmented Generation. It is a technique that combines retrieval of relevant information from a knowledge base with the generation capabilities of a large language model to produce accurate and contextually grounded answers.”
    • 这个答案被返回给Host。
    • Host的answer方法发现这是一个直接来自RAG的回答,于是直接将其呈现给用户。

7.5 总结:MCP赋能模块化AI系统

通过将RAG系统封装成一个独立的MCP Server,我们再次验证了MCP在构建模块化、可扩展AI系统中的核心价值。

  • 能力即服务 (Capability as a Service):RAG Server就像一个微服务,它提供了一种高度专业化的能力(基于私有文档的问答)。任何MCP Host都可以按需使用这项服务,而无需关心其内部复杂的实现。
  • 关注点分离:Host(AI研究助理)的核心职责是任务路由和用户交互。Server(RAG Server)的职责是数据处理和知识检索。这种清晰的关注点分离使得整个系统更容易开发、测试和维护。
  • 可组合性:现在,我们可以轻易地组合各种能力。例如,提出一个问题:“根据知识库,解释一下MCP,并从数据库里找出所有关于‘Book’的产品。” AI助理可以并行地向RAG Server和DB Server发送请求,然后综合两者的结果,给出一个全面的答案。

至此,我们的AI研究助理已经具备了访问文件、API、数据库和私有知识库的能力,成为了一个名副其实的多才多艺的智能助手。这为我们探索更高级的AI应用,如自主Agent和多模态交互,奠定了坚实的基础。

Logo

「智能机器人开发者大赛」官方平台,致力于为开发者和参赛选手提供赛事技术指导、行业标准解读及团队实战案例解析;聚焦智能机器人开发全栈技术闭环,助力开发者攻克技术瓶颈,促进软硬件集成、场景应用及商业化落地的深度研讨。 加入智能机器人开发者社区iRobot Developer,与全球极客并肩突破技术边界,定义机器人开发的未来范式!

更多推荐