Files
life-echo/api/agents/conversation_agent.py
penghanyuan dbbb924625 feat: 添加Redis支持和Celery任务处理
- 新增Redis服务模块用于会话状态存储和缓存
- 集成Celery用于后台任务处理
- 更新Docker Compose配置以支持开发环境
- 优化API以支持异步调用和Redis会话存储
- 更新文档以反映新的开发环境配置和使用方法
2026-01-21 23:06:47 +01:00

198 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
对话 Agent基于访谈问题清单动态选择问题实时生成回应
支持异步调用和 Redis 会话存储
"""
import logging
from typing import List, Optional, Dict, Any
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from services.llm_service import llm_service
from services.redis_service import redis_service
from .prompts import ConversationStage, get_conversation_prompt, get_guided_conversation_prompt
from .state_schema import MemoirStateSchema
logger = logging.getLogger(__name__)
class ConversationAgent:
"""对话 Agent支持异步和 Redis 存储)"""
def __init__(self):
# 使用 LLM 服务获取 LLM 实例
self.llm = llm_service.get_llm()
async def _get_history_messages(self, conversation_id: str) -> List[Any]:
"""从 Redis 获取对话历史并转换为 LangChain 消息格式"""
history = await redis_service.get_conversation_history(conversation_id)
messages = []
for msg in history:
if msg["role"] == "human":
messages.append(HumanMessage(content=msg["content"]))
elif msg["role"] == "ai":
messages.append(AIMessage(content=msg["content"]))
return messages
async def _save_message(self, conversation_id: str, role: str, content: str):
"""保存消息到 Redis"""
await redis_service.add_message(conversation_id, role, content)
def _format_history_string(self, messages: List[Any]) -> str:
"""将消息列表格式化为字符串(用于 prompt"""
history_parts = []
for msg in messages:
if isinstance(msg, HumanMessage):
history_parts.append(f"Human: {msg.content}")
elif isinstance(msg, AIMessage):
history_parts.append(f"Assistant: {msg.content}")
return "\n\n".join(history_parts)
async def generate_response(
self,
conversation_id: str,
user_message: str,
current_stage: Optional[ConversationStage] = None,
covered_topics: Optional[List[str]] = None
) -> str:
"""
异步生成 Agent 回应
Args:
conversation_id: 对话 ID
user_message: 用户消息
current_stage: 当前对话阶段
covered_topics: 已聊过的话题列表
Returns:
Agent 回应文本
"""
if current_stage is None:
current_stage = ConversationStage.CHILDHOOD
if covered_topics is None:
covered_topics = []
# 如果没有配置 LLM返回默认回应
if not self.llm:
return "抱歉LLM 服务未配置。请设置 DEEPSEEK_API_KEY 或 LLM_API_KEY 环境变量。"
try:
# 获取系统提示词
system_prompt = get_conversation_prompt(current_stage, covered_topics, user_message)
# 从 Redis 获取对话历史
history_messages = await self._get_history_messages(conversation_id)
history_string = self._format_history_string(history_messages)
# 构建完整 prompt
full_prompt = f"{system_prompt}\n\n{history_string}\n\nHuman: {user_message}\n\nAssistant:"
# 异步调用 LLM
response = await self.llm.ainvoke(full_prompt)
response_text = response.content if hasattr(response, 'content') else str(response)
# 保存对话到 Redis
await self._save_message(conversation_id, "human", user_message)
await self._save_message(conversation_id, "ai", response_text)
return response_text
except Exception as e:
logger.error(f"生成回应失败: {e}")
return f"抱歉,生成回应时出现错误: {str(e)}"
async def generate_response_with_state(
self,
conversation_id: str,
user_message: str,
memoir_state: MemoirStateSchema
) -> List[str]:
"""
基于共享状态异步生成引导式回复
Args:
conversation_id: 对话 ID
user_message: 用户消息
memoir_state: 共享状态
Returns:
Agent 回应文本列表(支持多条消息)
"""
if not self.llm:
return ["抱歉LLM 服务未配置。请设置 DEEPSEEK_API_KEY 或 LLM_API_KEY 环境变量。"]
try:
empty_slots = memoir_state.empty_slots_for_current_stage()
filled_slots = {
key: value.snippet
for key, value in memoir_state.slots.get(memoir_state.current_stage, {}).items()
if value.snippet
}
system_prompt = get_guided_conversation_prompt(
current_stage=memoir_state.current_stage,
empty_slots=empty_slots,
filled_slots=filled_slots,
user_message=user_message,
)
# 从 Redis 获取对话历史
history_messages = await self._get_history_messages(conversation_id)
history_string = self._format_history_string(history_messages)
# 构建完整 prompt
full_prompt = f"{system_prompt}\n\n{history_string}\n\nHuman: {user_message}\n\nAssistant:"
# 异步调用 LLM
response = await self.llm.ainvoke(full_prompt)
response_text = response.content if hasattr(response, 'content') else str(response)
# 保存对话到 Redis
await self._save_message(conversation_id, "human", user_message)
await self._save_message(conversation_id, "ai", response_text)
# 支持多条消息,用 [SPLIT] 分隔
messages = [msg.strip() for msg in response_text.split("[SPLIT]") if msg.strip()]
# 最多返回 3 条
return messages[:3] if messages else [response_text]
except Exception as e:
logger.error(f"生成回应失败: {e}")
return [f"抱歉,生成回应时出现错误: {str(e)}"]
def detect_stage(self, conversation_id: str, user_message: str) -> ConversationStage:
"""
检测对话阶段
Args:
conversation_id: 对话 ID
user_message: 用户消息
Returns:
检测到的对话阶段
"""
# 简单的关键词检测(实际应该使用更智能的方法)
message_lower = user_message.lower()
if any(word in message_lower for word in ["童年", "小时候", "出生", "家庭背景"]):
return ConversationStage.CHILDHOOD
elif any(word in message_lower for word in ["上学", "学校", "老师", "同学", "教育"]):
return ConversationStage.EDUCATION
elif any(word in message_lower for word in ["工作", "职业", "事业", "公司", "同事"]):
return ConversationStage.CAREER
elif any(word in message_lower for word in ["伴侣", "孩子", "家庭", "家人", "结婚"]):
return ConversationStage.FAMILY
elif any(word in message_lower for word in ["信念", "价值观", "座右铭", "坚持", "原则"]):
return ConversationStage.BELIEFS
elif any(word in message_lower for word in ["总结", "回顾", "感激", "希望", "未来"]):
return ConversationStage.SUMMARY
else:
# 默认返回当前阶段或童年阶段
return ConversationStage.CHILDHOOD
async def clear_memory(self, conversation_id: str):
"""清除对话记忆(从 Redis"""
await redis_service.clear_conversation_history(conversation_id)