""" 回忆录整理 Agent:基于传记结构,将口语改写为书面语,归类到章节 """ import os import json from typing import List, Dict, Optional from langchain_openai import ChatOpenAI from langchain.prompts import PromptTemplate from .prompts import ( get_memory_prompt, get_chapter_classification_prompt, get_text_rewrite_prompt, CHAPTER_CATEGORIES, CHAPTER_ORDER ) class MemoryAgent: """回忆录整理 Agent""" def __init__(self): # 初始化 LLM # 优先使用 LLM_API_KEY 和 LLM_BASE_URL,如果没有则使用 OPENAI_API_KEY api_key = os.getenv("LLM_API_KEY") or os.getenv("OPENAI_API_KEY", "") base_url = os.getenv("LLM_BASE_URL", "") model_name = os.getenv("OPENAI_MODEL", "gpt-4o") if not api_key: self.llm = None return # 如果提供了 base_url,需要处理路径(langchain 会自动添加 /v1/chat/completions) llm_kwargs = { "temperature": 0.3, # 较低温度,更稳定 "model": model_name, "openai_api_key": api_key, } if base_url: # 移除可能的 /v1/chat/completions 路径,langchain 会自动添加 if base_url.endswith("/v1/chat/completions"): base_url = base_url[:-20] # 移除 "/v1/chat/completions" elif base_url.endswith("/v1"): base_url = base_url[:-3] # 移除 "/v1" # 确保 base_url 以 / 结尾(如果没有) if base_url and not base_url.endswith("/"): base_url += "/" llm_kwargs["openai_api_base"] = base_url try: self.llm = ChatOpenAI(**llm_kwargs) except Exception: self.llm = None def classify_chapter(self, segments_text: str) -> str: """ 分类章节 Args: segments_text: 对话段落文本 Returns: 章节类别(如:childhood) """ if not self.llm: # 如果没有配置 LLM,返回默认类别 return "childhood" prompt = get_chapter_classification_prompt(segments_text) response = self.llm.invoke(prompt) # 提取类别 category = response.content.strip().lower() # 验证类别是否有效 if category in CHAPTER_CATEGORIES: return category # 默认返回 childhood return "childhood" def rewrite_to_literary( self, segments_text: str, chapter_category: str, existing_content: Optional[str] = None ) -> Dict: """ 将口语改写为书面语 Args: segments_text: 对话段落文本 chapter_category: 章节类别 existing_content: 已有章节内容(可选) Returns: 包含 title, content, summary, image_suggestions 的字典 """ if not self.llm: # 如果没有配置 LLM,返回基本结构 return { "title": CHAPTER_CATEGORIES.get(chapter_category, "章节"), "content": segments_text, "summary": "", "image_suggestions": [] } prompt = get_text_rewrite_prompt(segments_text, chapter_category, existing_content or "") response = self.llm.invoke(prompt) # 尝试解析 JSON try: # 提取 JSON 部分 content = response.content.strip() # 移除可能的 markdown 代码块标记 if content.startswith("```json"): content = content[7:] if content.startswith("```"): content = content[3:] if content.endswith("```"): content = content[:-3] content = content.strip() result = json.loads(content) return result except json.JSONDecodeError: # 如果解析失败,返回基本结构 return { "title": CHAPTER_CATEGORIES.get(chapter_category, "章节"), "content": response.content, "summary": "", "image_suggestions": [] } def process_segments( self, segments: List[Dict], existing_chapters: Optional[Dict[str, Dict]] = None ) -> Dict[str, Dict]: """ 处理对话段落,生成或更新章节 Args: segments: 对话段落列表,每个包含 transcript_text existing_chapters: 已有章节字典,key 为 category Returns: 更新后的章节字典 """ if existing_chapters is None: existing_chapters = {} # 按章节分类组织段落 segments_by_category: Dict[str, List[str]] = {} for segment in segments: text = segment.get("transcript_text", "") if not text: continue # 分类 category = self.classify_chapter(text) if category not in segments_by_category: segments_by_category[category] = [] segments_by_category[category].append(text) # 为每个类别生成或更新章节 updated_chapters = existing_chapters.copy() for category, texts in segments_by_category.items(): combined_text = "\n\n".join(texts) existing_content = existing_chapters.get(category, {}).get("content", "") # 改写为书面语 result = self.rewrite_to_literary(combined_text, category, existing_content) # 更新章节 updated_chapters[category] = { "title": result.get("title", CHAPTER_CATEGORIES.get(category, "章节")), "content": result.get("content", ""), "summary": result.get("summary", ""), "image_suggestions": result.get("image_suggestions", []), "category": category, "order_index": CHAPTER_ORDER.index(category) if category in CHAPTER_ORDER else 999 } return updated_chapters