I've been thinking about this problem recently: if your agent has expected behaviors that aren't explicitly defined, yet still needs to make accurate decisions, what do you do? Below I'll walk through one implementation approach, using deepwiki-open as a blueprint.
Architecture Design
Data Structure Design
The core data structures for long-term memory consist of the following classes (a minimal sketch of them follows this list):
- UserQuery: stores the user's query text
- AssistantResponse: stores the system's response text
- DialogTurn: represents one complete dialog turn, containing the user query and the system's response
- CustomConversation: a container that manages multi-turn conversations
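These classes are not all reproduced in this post (only DialogTurn appears verbatim later), so here is a minimal sketch of how they might look, consistent with how the field names query_str, response_str, and dialog_turns are used in the code excerpts below:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class UserQuery:
    query_str: str          # the user's query text


@dataclass
class AssistantResponse:
    response_str: str       # the system's response text


@dataclass
class DialogTurn:
    id: str                 # unique identifier for the turn
    user_query: UserQuery
    assistant_response: AssistantResponse


@dataclass
class CustomConversation:
    # one entry per completed question/answer exchange
    dialog_turns: List[DialogTurn] = field(default_factory=list)

    def append_dialog_turn(self, dialog_turn: DialogTurn) -> None:
        self.dialog_turns.append(dialog_turn)
```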
Memory Class Implementation
The Memory class is the core implementation of long-term memory. It inherits from adal.core.component.DataComponent and is responsible for storing and managing the conversation history.
The Memory class provides two key functions:
- Retrieving conversation history: the call() method returns all stored dialog turns (sketched just below this list)
- Adding a new turn: the add_dialog_turn() method appends a new user query and system response to the conversation history
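The call() method itself is not reproduced in this post. Because the RAG_TEMPLATE at the end iterates conversation_history.items(), it presumably returns the stored turns as an ordered mapping; a minimal sketch under that assumption (the import alias follows the adalflow convention used by DeepWiki):

```python
import adalflow as adal  # assumption: conventional adalflow import


class Memory(adal.core.component.DataComponent):
    """Sketch only: __init__ and add_dialog_turn are shown later in this post."""

    def call(self) -> dict:
        # Return every stored dialog turn, keyed by its position in the conversation,
        # so the prompt template can iterate conversation_history.items()
        return {
            str(index): turn
            for index, turn in enumerate(self.current_conversation.dialog_turns)
        }
```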
Integrating Long-Term Memory into RAG
The RAG class ties long-term memory together with the retrieval and generation components; during initialization, the RAG class creates a Memory instance.
How Long-Term Memory Works
1. Initialize the memory component
When the RAG system is initialized, it creates a Memory instance to manage the conversation history:
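A condensed view of the constructor (the full __init__ appears later in this post):

```python
class RAG(adal.Component):
    def __init__(self, use_s3: bool = False, local_ollama: bool = False):
        super().__init__()
        self.local_ollama = local_ollama
        # The conversation memory lives for as long as this RAG instance does
        self.memory = Memory()
```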
2. Use memory during generation
When the generator is set up, the system makes the memory component's output part of the prompt. Specifically, in prompt_kwargs the conversation history is obtained by calling self.memory():
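An excerpt of the prompt_kwargs construction, taken from the call() method reproduced in full later in this post:

```python
prompt_kwargs = {
    "input_str": query,
    "contexts": retrieved_documents[0].documents,
    # Every stored DialogTurn is injected into the prompt as conversation history
    "conversation_history": self.memory(),
}
response = self.generator(prompt_kwargs=prompt_kwargs)
```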
3. Use memory when processing a query
When a new query is processed, the system pulls the historical dialogue from memory and uses it as context (see the full call() method later in this post).
4. Update the memory
After generating a response, the system appends the new dialog turn to memory, as in the excerpt below.
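The corresponding excerpt from the query-processing code shown later:

```python
# Persist the completed exchange so that later queries see it as context
self.memory.add_dialog_turn(
    user_query=query,
    assistant_response=final_response.answer,
)
```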
Core Implementation of Long-Term Memory
The rag.py file in the DeepWiki project implements a retrieval-augmented generation (RAG) system built on long-term memory. The system realizes long-term memory at two levels: conversation memory and knowledge memory. Below I analyze the implementation in detail.
Conversation Memory Implementation
Conversation memory in the RAG system is implemented by the Memory class, which inherits from adal.core.component.DataComponent:
```python
class Memory(adal.core.component.DataComponent):
    """Simple conversation management with a list of dialog turns."""

    def __init__(self):
        super().__init__()
        # Use our custom implementation instead of the original Conversation class
        self.current_conversation = CustomConversation()
```
The Memory class uses the CustomConversation class to store the conversation history; each turn is kept as a DialogTurn object containing the user query and the system response:
```python
@dataclass
class DialogTurn:
    id: str
    user_query: UserQuery
    assistant_response: AssistantResponse
```
After the system generates a new response, the current dialog turn is added to memory via the add_dialog_turn method:
```python
def add_dialog_turn(self, user_query: str, assistant_response: str) -> bool:
    """
    Add a dialog turn to the conversation history.

    Args:
        user_query: The user's query
        assistant_response: The assistant's response

    Returns:
        bool: True if successful, False otherwise
    """
    try:
        # Create a new dialog turn using our custom implementation
        dialog_turn = DialogTurn(
            id=str(uuid4()),
            user_query=UserQuery(query_str=user_query),
            assistant_response=AssistantResponse(response_str=assistant_response),
        )

        # Make sure the current_conversation has the append_dialog_turn method
        if not hasattr(self.current_conversation, 'append_dialog_turn'):
            logger.warning("current_conversation does not have append_dialog_turn method, creating new one")
            # Initialize a new conversation if needed
            self.current_conversation = CustomConversation()

        # Ensure dialog_turns exists
        if not hasattr(self.current_conversation, 'dialog_turns'):
            logger.warning("dialog_turns not found, initializing empty list")
            self.current_conversation.dialog_turns = []

        # Safely append the dialog turn
        self.current_conversation.dialog_turns.append(dialog_turn)
        logger.info(f"Successfully added dialog turn, now have {len(self.current_conversation.dialog_turns)} turns")
        return True
    # ... exception handling omitted
```
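A brief usage example; the query and answer strings below are made up for illustration:

```python
memory = Memory()
ok = memory.add_dialog_turn(
    user_query="How does rag.py store conversation history?",                     # illustrative
    assistant_response="It appends DialogTurn objects to a CustomConversation.",  # illustrative
)
print(ok, len(memory.current_conversation.dialog_turns))  # expected: True 1
```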
Knowledge Memory Implementation
Knowledge memory is implemented by the DatabaseManager class, which converts repository content into retrievable embedding vectors and persists them:
```python
class DatabaseManager:
    """
    Manages the creation, loading, transformation, and persistence of LocalDB instances.
    """

    def __init__(self):
        self.db = None
        self.repo_url_or_path = None
        self.repo_paths = None
```
The DatabaseManager class loads or creates the knowledge base through its prepare_database method:
```python
def prepare_database(self, repo_url_or_path: str, access_token: str = None, local_ollama: bool = False) -> List[Document]:
    """
    Create a new database from the repository.

    Args:
        repo_url_or_path (str): The URL or local path of the repository
        access_token (str, optional): Access token for private repositories
        local_ollama (bool): Whether to use local Ollama for embedding (default: False)

    Returns:
        List[Document]: List of Document objects
    """
    self.reset_database()
    self._create_repo(repo_url_or_path, access_token)
    return self.prepare_db_index(local_ollama=local_ollama)
```
The system first checks whether an existing database is present; if so, it loads it directly, otherwise it creates a new one:
```python
def prepare_db_index(self, local_ollama: bool = False) -> List[Document]:
    """
    Prepare the indexed database for the repository.

    Args:
        local_ollama (bool): Whether to use local Ollama for embedding (default: False)

    Returns:
        List[Document]: List of Document objects
    """
    # check the database
    if self.repo_paths and os.path.exists(self.repo_paths["save_db_file"]):
        logger.info("Loading existing database...")
        try:
            self.db = LocalDB.load_state(self.repo_paths["save_db_file"])
            documents = self.db.get_transformed_data(key="split_and_embed")
            if documents:
                logger.info(f"Loaded {len(documents)} documents from existing database")
                return documents
        except Exception as e:
            logger.error(f"Error loading existing database: {e}")
            # Continue to create a new database

    # prepare the database
    logger.info("Creating new database...")
    documents = read_all_documents(self.repo_paths["save_repo_dir"], local_ollama=local_ollama)
    self.db = transform_documents_and_save_to_db(
        documents, self.repo_paths["save_db_file"], local_ollama=local_ollama
    )
    logger.info(f"Total documents: {len(documents)}")
    transformed_docs = self.db.get_transformed_data(key="split_and_embed")
    logger.info(f"Total transformed documents: {len(transformed_docs)}")
    return transformed_docs
```
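A brief usage sketch; the repository URL is illustrative:

```python
# Hypothetical usage: build (or load) the knowledge memory for one repository
db_manager = DatabaseManager()
documents = db_manager.prepare_database(
    "https://github.com/AsyncFuncAI/deepwiki-open",  # illustrative repo URL
    local_ollama=False,
)
print(f"{len(documents)} embedded document chunks are ready for retrieval")
```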
The RAG System's Workflow
The RAG class is the heart of the whole system: it combines conversation memory and knowledge memory to deliver complete retrieval-augmented generation:
```python
class RAG(adal.Component):
    """RAG with one repo.
    If you want to load a new repos, call prepare_retriever(repo_url_or_path) first."""

    def __init__(self, use_s3: bool = False, local_ollama: bool = False):  # noqa: F841 - use_s3 is kept for compatibility
        """
        Initialize the RAG component.

        Args:
            use_s3: Whether to use S3 for database storage (default: False)
            local_ollama: Whether to use local Ollama for embedding (default: False)
        """
        super().__init__()
        self.local_ollama = local_ollama
        # Initialize components
        self.memory = Memory()
```
The RAG system's core query-processing flow is as follows:
```python
def call(self, query: str) -> Tuple[Any, List]:
    """
    Process a query using RAG.

    Args:
        query: The user's query

    Returns:
        Tuple of (RAGAnswer, retrieved_documents)
    """
    try:
        retrieved_documents = self.retriever(query)

        # Fill in the documents
        retrieved_documents[0].documents = [
            self.transformed_docs[doc_index]
            for doc_index in retrieved_documents[0].doc_indices
        ]

        # Prepare generation parameters
        prompt_kwargs = {
            "input_str": query,
            "contexts": retrieved_documents[0].documents,
            "conversation_history": self.memory(),
        }

        # Generate response
        response = self.generator(prompt_kwargs=prompt_kwargs)
        final_response = response.data

        # ... error handling omitted

        # Add to conversation memory
        self.memory.add_dialog_turn(user_query=query, assistant_response=final_response.answer)

        return final_response, retrieved_documents
```
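Putting the pieces together, a hypothetical multi-turn session could look like this (the repository URL and the questions are illustrative, not from the DeepWiki code):

```python
rag = RAG(local_ollama=False)
rag.prepare_retriever("https://github.com/AsyncFuncAI/deepwiki-open")  # illustrative

# Turn 1: answered from the retrieved repository context alone
answer1, docs1 = rag.call("What does the Memory class in rag.py do?")

# Turn 2: "it" can only be resolved via the conversation_history injected into the prompt
answer2, docs2 = rag.call("How is it updated after each answer?")
```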
Key Points of the Long-Term Memory Mechanism
The long-term memory mechanism of the RAG system has the following key points:
Conversation history persistence:
- The Memory class keeps the conversation history in memory as DialogTurn objects
- After each new answer is generated, the user query and system response are appended to the conversation history
Knowledge base persistence:
- Document content and embedding vectors are persisted to the local file system via the LocalDB class
- All documents are organized by repository name and stored under ~/.adalflow/databases/{repo_name}.pkl
Memory usage during retrieval:
- When generating an answer, the system uses both the retrieved documents and the conversation history as context
- The conversation memory is passed to the generator via the conversation_history parameter in the template
Template integration of memory:
- The RAG system uses the RAG_TEMPLATE prompt template, which embeds the conversation history so that generated answers take earlier dialogue into account
- The template loops over all saved dialog turns to form the complete context
RAG Template
RAG_TEMPLATE = r"""<START_OF_SYS_PROMPT> {{system_prompt}} {{output_format_str}} <END_OF_SYS_PROMPT> {# OrderedDict of DialogTurn #} {% if conversation_history %} <START_OF_CONVERSATION_HISTORY> {% for key, dialog_turn in conversation_history.items() %} {{key}}. User: {{dialog_turn.user_query.query_str}} You: {{dialog_turn.assistant_response.response_str}} {% endfor %} <END_OF_CONVERSATION_HISTORY> {% endif %} {% if contexts %} <START_OF_CONTEXT> {% for context in contexts %} {{loop.index }}. File Path: {{context.meta_data.get('file_path', 'unknown')}} Content: {{context.text}} {% endfor %} <END_OF_CONTEXT> {% endif %} <START_OF_USER_PROMPT> {{input_str}} <END_OF_USER_PROMPT> """