Files
life-echo/api/app/agents/chat/profile_agent.py
Kevin 59d4b19d7d feat(api): 回忆录管线简化、路由延迟池与相关加固
- Phase1/2:移除 MemoirOrchestrator.run 与 process_memoir_segments 别名;文档改为 process_memoir_phase1。
- 槽位校验集中到 stage_constants(filter_stage_slots),批处理与顺序路径及 state_service 写库一致。
- StoryRoute:no_llm/parse_error/invalid_target 保守 new_story;短篇护栏不覆盖这些 fallback。
- Phase2 低置信单路径可选延迟(StoryPipelineResult.deferred):不写 Chapter/Story,Segment 记录 defer 元数据,冷却内不重复消费;上限后停自动重试,Phase1 同类目新段唤醒池内段。
- Alembic 0017:segments 表 narrative_defer_* 列。
- ProfileAgent:经 LlmGateway/注入 Provider 统一聊天与 JSON,新增测试。
- ImagePromptOrchestrator:LLM 初始化失败可依配置降级或硬失败;补充策略测试。
- 配套单测与 README/本地开发文档表述更新。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-06 13:18:02 +08:00

362 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ProfileAgent用户资料收集 Specialist
负责提取资料、资料追问、资料收集开场白,不负责 Redis 持久化(由 Orchestrator 统一处理)
"""
import time
from typing import Any, Dict, List, Optional
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from app.agents.chat.helpers import format_history_string, get_history_with_window
from app.agents.chat.prompts_profile import (
get_profile_extraction_prompt,
get_profile_followup_prompt,
get_profile_greeting_prompt,
)
from app.agents.chat.reply_limits import (
nonempty_segments_or_fallback,
segments_from_llm_response,
truncate_chat_segments,
)
from app.agents.chat.schemas import ProfileExtractionOutput
from app.core.agent_logging import agent_span, log_agent_payload, log_agent_summary
from app.core.config import settings
from app.core.llm_call import allm_json_call
from app.core.llm_gateway import LlmGateway, LlmUseCase
from app.core.logging import get_logger
from app.ports.llm import LLMProvider
logger = get_logger(__name__)
class _ProviderBackedProfileGateway:
def __init__(self, provider: LLMProvider) -> None:
self._provider = provider
async def chat_text(
self,
messages: list[dict],
*,
use_case: LlmUseCase | None = None,
temperature: float | None = None,
model: str | None = None,
max_tokens: int | None = None,
) -> str:
resolved_temperature = temperature
if resolved_temperature is None:
resolved_temperature = (
use_case.temperature
if use_case and use_case.temperature is not None
else 0.7
)
return await self._provider.complete(
messages,
temperature=resolved_temperature,
model=model if model is not None else (use_case.model if use_case else None),
max_tokens=(
max_tokens
if max_tokens is not None
else (use_case.max_tokens if use_case else None)
),
)
async def json_object(
self,
prompt: str,
schema: type[ProfileExtractionOutput],
*,
use_case: LlmUseCase,
fallback_factory: Any = None,
) -> ProfileExtractionOutput:
return await allm_json_call(
getattr(self._provider, "langchain_llm", None),
prompt,
schema,
max_tokens=use_case.max_tokens or 1024,
agent=use_case.name,
fallback_factory=fallback_factory,
)
def _langchain_messages_to_port(messages: List[Any]) -> list[dict]:
"""LangChain message 列表 → ``LLMProvider.complete`` 的 ``role/content`` 结构。"""
out: list[dict] = []
for m in messages:
if isinstance(m, SystemMessage):
out.append({"role": "system", "content": str(m.content)})
elif isinstance(m, HumanMessage):
out.append({"role": "user", "content": str(m.content)})
elif isinstance(m, AIMessage):
out.append({"role": "assistant", "content": str(m.content)})
else:
c = getattr(m, "content", None)
out.append({"role": "user", "content": str(c) if c is not None else ""})
return out
def _message_contents_char_count(messages: List[Any]) -> int:
n = 0
for m in messages:
c = getattr(m, "content", None)
if isinstance(c, str):
n += len(c)
return n
class ProfileAgent:
"""用户资料收集 Specialist Agent"""
def __init__(
self,
llm_provider: LLMProvider | None = None,
llm_gateway: Any | None = None,
) -> None:
if llm_gateway is not None:
self._llm_gateway = llm_gateway
elif llm_provider is not None:
self._llm_gateway = _ProviderBackedProfileGateway(llm_provider)
else:
self._llm_gateway = LlmGateway()
async def _invoke_chat(
self,
messages: List[Any],
*,
max_tokens: int,
conversation_id: Optional[str],
agent_name: str,
) -> str:
port_messages = _langchain_messages_to_port(messages)
llm_t0 = time.perf_counter()
with agent_span(
logger, f"{agent_name}.llm", conversation_id=conversation_id or ""
):
response_text = await self._llm_gateway.chat_text(
port_messages,
use_case=LlmUseCase("chat.profile", max_tokens=max_tokens),
max_tokens=max_tokens,
)
logger.info(
"event=chat_llm_done agent={} response_latency_ms={:.2f}",
agent_name,
(time.perf_counter() - llm_t0) * 1000,
)
return response_text or ""
async def _segments_from_response(
self,
response_text: str,
*,
max_segments: int,
max_chars_per_segment: int,
fallback: str,
) -> List[str]:
log_agent_payload(
logger,
"ProfileAgent._segments_from_response.raw_response",
response_text,
)
raw_list = segments_from_llm_response(response_text, max_segments=max_segments)
if not raw_list:
raw_list = [response_text.strip()]
out = truncate_chat_segments(
raw_list,
max_segments=max_segments,
max_chars_per_segment=max_chars_per_segment,
)
segments = out if out else [response_text.strip()[:max_chars_per_segment]]
return nonempty_segments_or_fallback(segments, fallback=fallback)
async def extract_profile_from_message(
self,
user_message: str,
missing_fields: List[str],
conversation_id: Optional[str] = None,
) -> Dict[str, Any]:
"""从用户消息中提取资料字段,不持久化"""
if not missing_fields:
return {}
recent_dialogue = ""
if conversation_id:
hw = await get_history_with_window(
conversation_id,
max_pairs=settings.chat_history_max_pairs,
max_chars=settings.chat_history_max_chars,
)
recent = hw.window[-4:] if len(hw.window) > 4 else hw.window
parts = []
for msg in recent:
if isinstance(msg, HumanMessage):
parts.append(f"用户: {msg.content}")
elif isinstance(msg, AIMessage):
parts.append(f"助手: {msg.content}")
recent_dialogue = "\n".join(parts) if parts else ""
try:
prompt = get_profile_extraction_prompt(
user_message, missing_fields, recent_dialogue=recent_dialogue or None
)
parsed = await self._llm_gateway.json_object(
prompt,
ProfileExtractionOutput,
use_case=LlmUseCase(
"ProfileAgent.extract_profile_from_message",
max_tokens=settings.chat_profile_extract_max_tokens,
),
fallback_factory=lambda: ProfileExtractionOutput(),
)
result = {}
if parsed.birth_year is not None:
raw = parsed.birth_year
if isinstance(raw, int) and 1900 <= raw <= 2100:
result["birth_year"] = raw
elif isinstance(raw, str) and raw.isdigit():
y = int(raw)
if y < 100:
y = 1900 + y if y >= 50 else 2000 + y
if 1900 <= y <= 2100:
result["birth_year"] = y
if parsed.birth_place:
result["birth_place"] = str(parsed.birth_place)
if parsed.grew_up_place:
result["grew_up_place"] = str(parsed.grew_up_place)
if parsed.occupation:
result["occupation"] = str(parsed.occupation)
bp = result.get("birth_place")
gp = result.get("grew_up_place")
if bp and not gp:
result["grew_up_place"] = bp
elif gp and not bp:
result["birth_place"] = gp
return result
except Exception as e:
logger.error("提取资料信息失败: {}", e)
return {}
async def generate_profile_followup(
self,
conversation_id: str,
user_message: str,
missing_fields: List[str],
filled_fields: Dict[str, str],
nickname: str = "",
interview_stage_hint: str = "",
) -> List[str]:
"""生成资料追问回复,不持久化(由 Orchestrator 负责)"""
try:
prompt = get_profile_followup_prompt(
missing_fields,
filled_fields,
nickname,
interview_stage_hint=interview_stage_hint,
)
hw = await get_history_with_window(
conversation_id,
max_pairs=settings.chat_history_max_pairs,
max_chars=settings.chat_history_max_chars,
)
messages: List[Any] = [SystemMessage(content=prompt)]
messages.extend(hw.window)
messages.append(HumanMessage(content=user_message))
log_agent_payload(
logger,
"ProfileAgent.followup.prompt",
format_history_string(
messages,
omit_system_body=settings.agent_log_omit_system_message_body,
),
)
prompt_chars = _message_contents_char_count(messages)
logger.info(
"event=chat_prompt_built agent=ProfileAgent.generate_profile_followup "
"prompt_chars={} history_pairs_total={} history_pairs_windowed={}",
prompt_chars,
hw.turn_total,
len(hw.window) // 2,
)
response_text = await self._invoke_chat(
messages,
max_tokens=settings.chat_profile_followup_max_tokens,
conversation_id=conversation_id,
agent_name="ProfileAgent.generate_profile_followup",
)
segments = await self._segments_from_response(
response_text,
max_segments=3,
max_chars_per_segment=settings.chat_interview_max_chars_per_segment,
fallback="谢谢分享!能再告诉我一些吗?",
)
log_agent_summary(
logger,
"ProfileAgent.followup segments={} conversation_id={}",
len(segments),
conversation_id,
)
return segments
except Exception as e:
logger.error("生成资料跟进回复失败: {}", e)
return ["谢谢分享!能再告诉我一些吗?"]
async def generate_profile_greeting(
self,
conversation_id: str,
missing_fields: List[str],
nickname: str = "",
) -> List[str]:
"""生成资料收集开场白,不持久化(由 Orchestrator 负责)"""
try:
prompt = get_profile_greeting_prompt(missing_fields, nickname)
hw = await get_history_with_window(
conversation_id,
max_pairs=settings.chat_history_max_pairs,
max_chars=settings.chat_history_max_chars,
)
messages: List[Any] = [SystemMessage(content=prompt)]
messages.extend(hw.window)
if hw.window:
messages.append(
HumanMessage(content="(请根据上文自然接话,继续资料收集开场。)")
)
else:
messages.append(HumanMessage(content="(请说出资料收集开场白。)"))
log_agent_payload(
logger,
"ProfileAgent.greeting.prompt",
format_history_string(
messages,
omit_system_body=settings.agent_log_omit_system_message_body,
),
)
prompt_chars = _message_contents_char_count(messages)
logger.info(
"event=chat_prompt_built agent=ProfileAgent.generate_profile_greeting "
"prompt_chars={} history_pairs_total={} history_pairs_windowed={}",
prompt_chars,
hw.turn_total,
len(hw.window) // 2,
)
response_text = await self._invoke_chat(
messages,
max_tokens=settings.chat_profile_followup_max_tokens,
conversation_id=conversation_id,
agent_name="ProfileAgent.generate_profile_greeting",
)
segments = await self._segments_from_response(
response_text,
max_segments=2,
max_chars_per_segment=settings.chat_interview_max_chars_per_segment,
fallback="你好!在开始之前,能告诉我你是哪一年出生的吗?",
)
log_agent_summary(
logger,
"ProfileAgent.greeting segments={} conversation_id={}",
len(segments),
conversation_id,
)
return segments
except Exception as e:
logger.error("生成资料收集开场白失败: {}", e)
return [
"你好!在我们开始聊人生故事之前,能先简单介绍一下你自己吗?比如你是哪一年出生的?"
]