Files
life-echo/api/app/agents/chat/profile_agent.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

406 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ProfileAgent用户资料收集 Specialist
负责提取资料、资料追问、资料收集开场白,不负责 Redis 持久化(由 Orchestrator 统一处理)
"""
import time
from typing import Any, Dict, List, Optional
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from app.agents.chat.helpers import format_history_string, get_history_with_window
from app.agents.chat.prompts_profile import (
get_profile_extraction_prompt,
get_profile_followup_prompt,
get_profile_greeting_prompt,
)
from app.agents.chat.reply_limits import (
nonempty_segments_or_fallback,
segments_from_llm_response,
truncate_chat_segments,
)
from app.agents.chat.schemas import ProfileExtractionOutput
from app.core.agent_logging import agent_span, log_agent_payload, log_agent_summary
from app.core.config import settings
from app.core.llm_call import allm_json_call
from app.core.llm_gateway import LlmGateway, LlmUseCase
from app.core.logging import get_logger
from app.ports.llm import LLMProvider
from app.core.runtime_constants import agent_log_defaults
from app.features.conversation.constants import chat
from app.features.story.constants import story
# Module-level logger. Call sites in this module pass brace-style ``{}``
# placeholders with positional arguments rather than pre-formatted strings.
logger = get_logger(__name__)
# Canned replies returned when the LLM call fails or produces no usable
# segment. "en" selects the English text; every other language code gets Chinese.
_FOLLOWUP_FALLBACK_ZH = "谢谢分享!能再告诉我一些吗?"
_FOLLOWUP_FALLBACK_EN = "Thanks for sharing — could you tell me a bit more?"
_GREETING_FALLBACK_ZH = "你好!在开始之前,能告诉我你是哪一年出生的吗?"
_GREETING_FALLBACK_EN = (
    "Hi! Before we get started, could you tell me what year you were born?"
)
_GREETING_FALLBACK_FULL_ZH = (
    "你好!在我们开始聊人生故事之前,能先简单介绍一下你自己吗?比如你是哪一年出生的?"
)
_GREETING_FALLBACK_FULL_EN = (
    "Hi! Before we dive into life stories, could you introduce yourself a little — for example, what year were you born?"
)


def _pick_by_language(language: str, english: str, chinese: str) -> str:
    """Return *english* when *language* is exactly ``"en"``, else *chinese*."""
    if language == "en":
        return english
    return chinese


def _profile_followup_fallback(language: str) -> str:
    """Short nudge inviting the user to keep sharing profile details."""
    return _pick_by_language(language, _FOLLOWUP_FALLBACK_EN, _FOLLOWUP_FALLBACK_ZH)


def _profile_greeting_fallback(language: str) -> str:
    """Brief opener that asks directly for the user's birth year."""
    return _pick_by_language(language, _GREETING_FALLBACK_EN, _GREETING_FALLBACK_ZH)


def _profile_greeting_fallback_full(language: str) -> str:
    """Longer opener (self-introduction plus birth year) used on hard failure."""
    return _pick_by_language(
        language, _GREETING_FALLBACK_FULL_EN, _GREETING_FALLBACK_FULL_ZH
    )
class _ProviderBackedProfileGateway:
    """Adapter exposing ``chat_text`` / ``json_object`` on top of a bare ``LLMProvider``.

    ``ProfileAgent`` talks to a gateway object; this shim lets it be built
    from a provider alone (see ``ProfileAgent.__init__``).
    """

    def __init__(self, provider: LLMProvider) -> None:
        self._provider = provider

    async def chat_text(
        self,
        messages: list[dict],
        *,
        use_case: LlmUseCase | None = None,
        temperature: float | None = None,
        model: str | None = None,
        max_tokens: int | None = None,
    ) -> str:
        """Run a plain-text completion through the wrapped provider.

        Explicit keyword arguments win. Otherwise the matching attribute of
        *use_case* is used, provided *use_case* is truthy. Temperature then
        falls back to ``0.7``; *model* and *max_tokens* fall back to ``None``.
        """
        if temperature is None:
            preset = use_case.temperature if use_case else None
            temperature = 0.7 if preset is None else preset
        if model is None and use_case:
            model = use_case.model
        if max_tokens is None and use_case:
            max_tokens = use_case.max_tokens
        return await self._provider.complete(
            messages,
            temperature=temperature,
            model=model,
            max_tokens=max_tokens,
        )

    async def json_object(
        self,
        prompt: str,
        schema: type[ProfileExtractionOutput],
        *,
        use_case: LlmUseCase,
        fallback_factory: Any = None,
    ) -> ProfileExtractionOutput:
        """Structured call via ``allm_json_call``.

        Uses the provider's ``langchain_llm`` attribute, or ``None`` if the
        provider lacks one. The token budget is ``use_case.max_tokens``, or
        ``1024`` when that is falsy.
        """
        langchain_llm = getattr(self._provider, "langchain_llm", None)
        token_budget = use_case.max_tokens or 1024
        return await allm_json_call(
            langchain_llm,
            prompt,
            schema,
            max_tokens=token_budget,
            agent=use_case.name,
            fallback_factory=fallback_factory,
        )
def _langchain_messages_to_port(messages: List[Any]) -> list[dict]:
    """Convert LangChain messages into the ``role``/``content`` dicts used by ``LLMProvider.complete``.

    ``SystemMessage`` / ``HumanMessage`` / ``AIMessage`` map to ``system`` /
    ``user`` / ``assistant``, with ``content`` stringified. Any other object
    is sent as ``user``: its ``content`` attribute is stringified, or ``""``
    is used when that attribute is missing or ``None``.
    """
    role_table = (
        (SystemMessage, "system"),
        (HumanMessage, "user"),
        (AIMessage, "assistant"),
    )
    converted: list[dict] = []
    for message in messages:
        role = next(
            (name for cls, name in role_table if isinstance(message, cls)), None
        )
        if role is not None:
            text = str(message.content)
        else:
            role = "user"
            raw = getattr(message, "content", None)
            text = "" if raw is None else str(raw)
        converted.append({"role": role, "content": text})
    return converted
def _message_contents_char_count(messages: List[Any]) -> int:
    """Sum the lengths of every ``str`` ``content`` attribute in *messages*.

    Objects without ``content``, or whose content is not a string (for
    example a list of content parts), contribute nothing.
    """
    contents = (getattr(msg, "content", None) for msg in messages)
    return sum(len(text) for text in contents if isinstance(text, str))
def _normalize_birth_year(raw: Any) -> Optional[int]:
    """Return *raw* as a birth year in ``[1900, 2100]``, or ``None``.

    * ``int`` values are accepted only when already inside the range.
    * Strings are stripped and must consist solely of decimal digits.
      Two-digit years are expanded with a pivot of 50 (``"85"`` -> 1985,
      ``"07"`` -> 2007) before the range check.
    * Anything else yields ``None``.
    """
    if isinstance(raw, int):
        return raw if 1900 <= raw <= 2100 else None
    if not isinstance(raw, str):
        return None
    text = raw.strip()
    # ``isdecimal`` rather than ``isdigit``: ``isdigit`` also accepts
    # characters such as "²" that ``int()`` rejects. The resulting ValueError
    # would reach the caller's broad ``except`` and discard every other
    # extracted field.
    if not text.isdecimal():
        return None
    year = int(text)
    if year < 100:
        year = 1900 + year if year >= 50 else 2000 + year
    return year if 1900 <= year <= 2100 else None


def _clean_profile_text(value: Any) -> Optional[str]:
    """Stringify and strip a free-text profile value; ``None`` if falsy or blank."""
    if not value:
        return None
    text = str(value).strip()
    return text or None


def _profile_fields_from_output(parsed: Any) -> Dict[str, Any]:
    """Turn a ``ProfileExtractionOutput`` into a dict holding only usable fields.

    When exactly one of ``birth_place`` / ``grew_up_place`` is present, it is
    copied into the other.
    """
    result: Dict[str, Any] = {}
    year = _normalize_birth_year(parsed.birth_year)
    if year is not None:
        result["birth_year"] = year
    for field_name in ("birth_place", "grew_up_place", "occupation"):
        text = _clean_profile_text(getattr(parsed, field_name))
        if text is not None:
            result[field_name] = text
    # NOTE(review): this mirroring ignores ``missing_fields``, so it can emit a
    # place the user already filled earlier. Confirm the Orchestrator does not
    # overwrite an existing value with it.
    birth_place = result.get("birth_place")
    grew_up_place = result.get("grew_up_place")
    if birth_place and not grew_up_place:
        result["grew_up_place"] = birth_place
    elif grew_up_place and not birth_place:
        result["birth_place"] = grew_up_place
    return result


class ProfileAgent:
    """Profile-collection specialist agent.

    Extracts profile fields from user messages, generates profile follow-up
    questions, and generates the profile-collection opener. Nothing is
    persisted here; the Orchestrator is responsible for persistence.
    """

    def __init__(
        self,
        llm_provider: LLMProvider | None = None,
        llm_gateway: Any | None = None,
    ) -> None:
        """Select the LLM backend.

        Precedence: an explicit *llm_gateway*, then *llm_provider* wrapped in
        ``_ProviderBackedProfileGateway``, then a default ``LlmGateway()``.
        """
        if llm_gateway is not None:
            self._llm_gateway = llm_gateway
        elif llm_provider is not None:
            self._llm_gateway = _ProviderBackedProfileGateway(llm_provider)
        else:
            self._llm_gateway = LlmGateway()

    async def _invoke_chat(
        self,
        messages: List[Any],
        *,
        max_tokens: int,
        conversation_id: Optional[str],
        agent_name: str,
    ) -> str:
        """Send *messages* through the gateway's ``chat_text`` and log the latency.

        The call runs inside an ``agent_span`` named ``"<agent_name>.llm"``
        under the ``chat.profile`` use case. Returns ``""`` when the gateway
        returns a falsy value.
        """
        port_messages = _langchain_messages_to_port(messages)
        llm_t0 = time.perf_counter()
        with agent_span(
            logger, f"{agent_name}.llm", conversation_id=conversation_id or ""
        ):
            response_text = await self._llm_gateway.chat_text(
                port_messages,
                use_case=LlmUseCase("chat.profile", max_tokens=max_tokens),
                max_tokens=max_tokens,
            )
        logger.info(
            "event=chat_llm_done agent={} response_latency_ms={:.2f}",
            agent_name,
            (time.perf_counter() - llm_t0) * 1000,
        )
        return response_text or ""

    async def _segments_from_response(
        self,
        response_text: str,
        *,
        max_segments: int,
        max_chars_per_segment: int,
        fallback: str,
    ) -> List[str]:
        """Split and truncate a raw LLM reply into at most *max_segments* chat segments.

        If splitting yields nothing, the whole stripped reply is used instead.
        ``nonempty_segments_or_fallback`` makes the final decision; it
        presumably substitutes *fallback* when no non-empty segment remains
        (TODO confirm). Declared ``async`` even though it awaits nothing, so
        that existing ``await`` call sites keep working.
        """
        log_agent_payload(
            logger,
            "ProfileAgent._segments_from_response.raw_response",
            response_text,
        )
        raw_list = segments_from_llm_response(response_text, max_segments=max_segments)
        if not raw_list:
            raw_list = [response_text.strip()]
        out = truncate_chat_segments(
            raw_list,
            max_segments=max_segments,
            max_chars_per_segment=max_chars_per_segment,
        )
        segments = out if out else [response_text.strip()[:max_chars_per_segment]]
        return nonempty_segments_or_fallback(segments, fallback=fallback)

    async def _recent_dialogue(
        self, conversation_id: Optional[str], language: str
    ) -> str:
        """Render the last four windowed history messages as labelled lines.

        Only Human and AI messages are rendered. This is best-effort: if the
        history cannot be loaded, the failure is logged and ``""`` is returned,
        so extraction still runs on the current message alone.
        """
        if not conversation_id:
            return ""
        try:
            hw = await get_history_with_window(
                conversation_id,
                max_pairs=chat.history_max_pairs,
                max_chars=chat.history_max_chars,
            )
        except Exception as e:
            logger.warning(
                "event=profile_extract_history_failed conversation_id={} error={}",
                conversation_id,
                e,
            )
            return ""
        user_label = "User" if language == "en" else "用户"
        asst_label = "Assistant" if language == "en" else "助手"
        lines: List[str] = []
        for msg in hw.window[-4:]:
            if isinstance(msg, HumanMessage):
                lines.append(f"{user_label}: {msg.content}")
            elif isinstance(msg, AIMessage):
                lines.append(f"{asst_label}: {msg.content}")
        return "\n".join(lines)

    @staticmethod
    def _log_built_prompt(
        messages: List[Any],
        hw: Any,
        *,
        payload_label: str,
        agent_name: str,
    ) -> None:
        """Log the assembled prompt payload and its size / history-window stats."""
        log_agent_payload(
            logger,
            payload_label,
            format_history_string(
                messages,
                omit_system_body=agent_log_defaults.omit_system_message_body,
            ),
        )
        logger.info(
            "event=chat_prompt_built agent={} "
            "prompt_chars={} history_pairs_total={} history_pairs_windowed={}",
            agent_name,
            _message_contents_char_count(messages),
            hw.turn_total,
            len(hw.window) // 2,
        )

    async def extract_profile_from_message(
        self,
        user_message: str,
        missing_fields: List[str],
        conversation_id: Optional[str] = None,
        language: str = "zh",
    ) -> Dict[str, Any]:
        """Extract profile fields from *user_message* without persisting them.

        Returns ``{}`` when *missing_fields* is empty or when the extraction
        fails. A history-loading failure no longer propagates; extraction
        simply proceeds without recent dialogue.
        """
        if not missing_fields:
            return {}
        recent_dialogue = await self._recent_dialogue(conversation_id, language)
        try:
            prompt = get_profile_extraction_prompt(
                user_message,
                missing_fields,
                recent_dialogue=recent_dialogue or None,
                language=language,
            )
            parsed = await self._llm_gateway.json_object(
                prompt,
                ProfileExtractionOutput,
                use_case=LlmUseCase(
                    "ProfileAgent.extract_profile_from_message",
                    max_tokens=chat.profile_extract_max_tokens,
                ),
                fallback_factory=lambda: ProfileExtractionOutput(),
            )
            return _profile_fields_from_output(parsed)
        except Exception as e:
            logger.error("提取资料信息失败: {}", e)
            return {}

    async def generate_profile_followup(
        self,
        conversation_id: str,
        user_message: str,
        missing_fields: List[str],
        filled_fields: Dict[str, str],
        nickname: str = "",
        interview_stage_hint: str = "",
        language: str = "zh",
    ) -> List[str]:
        """Generate up to three profile follow-up segments without persisting them.

        On any failure, returns a single language-appropriate fallback line.
        """
        try:
            prompt = get_profile_followup_prompt(
                missing_fields,
                filled_fields,
                nickname,
                interview_stage_hint=interview_stage_hint,
                language=language,
            )
            hw = await get_history_with_window(
                conversation_id,
                max_pairs=chat.history_max_pairs,
                max_chars=chat.history_max_chars,
            )
            messages: List[Any] = [SystemMessage(content=prompt)]
            messages.extend(hw.window)
            messages.append(HumanMessage(content=user_message))
            self._log_built_prompt(
                messages,
                hw,
                payload_label="ProfileAgent.followup.prompt",
                agent_name="ProfileAgent.generate_profile_followup",
            )
            response_text = await self._invoke_chat(
                messages,
                max_tokens=chat.profile_followup_max_tokens,
                conversation_id=conversation_id,
                agent_name="ProfileAgent.generate_profile_followup",
            )
            segments = await self._segments_from_response(
                response_text,
                max_segments=3,
                max_chars_per_segment=chat.interview_max_chars_per_segment,
                fallback=_profile_followup_fallback(language),
            )
            log_agent_summary(
                logger,
                "ProfileAgent.followup segments={} conversation_id={}",
                len(segments),
                conversation_id,
            )
            return segments
        except Exception as e:
            logger.error("生成资料跟进回复失败: {}", e)
            return [_profile_followup_fallback(language)]

    async def generate_profile_greeting(
        self,
        conversation_id: str,
        missing_fields: List[str],
        nickname: str = "",
        language: str = "zh",
    ) -> List[str]:
        """Generate up to two profile-collection opener segments without persisting them.

        The synthetic kickoff turn differs depending on whether windowed
        history exists. On any failure, returns the longer fallback greeting.
        """
        try:
            prompt = get_profile_greeting_prompt(
                missing_fields, nickname, language=language
            )
            hw = await get_history_with_window(
                conversation_id,
                max_pairs=chat.history_max_pairs,
                max_chars=chat.history_max_chars,
            )
            messages: List[Any] = [SystemMessage(content=prompt)]
            messages.extend(hw.window)
            if language == "en":
                kickoff = (
                    "(Continue from the context above and warmly carry on the profile-gathering opener.)"
                    if hw.window
                    else "(Please deliver your profile-gathering opener.)"
                )
            else:
                kickoff = (
                    "(请根据上文自然接话,继续资料收集开场。)"
                    if hw.window
                    else "(请说出资料收集开场白。)"
                )
            messages.append(HumanMessage(content=kickoff))
            self._log_built_prompt(
                messages,
                hw,
                payload_label="ProfileAgent.greeting.prompt",
                agent_name="ProfileAgent.generate_profile_greeting",
            )
            # NOTE: the greeting reuses the follow-up token budget; no separate
            # greeting budget is referenced here.
            response_text = await self._invoke_chat(
                messages,
                max_tokens=chat.profile_followup_max_tokens,
                conversation_id=conversation_id,
                agent_name="ProfileAgent.generate_profile_greeting",
            )
            segments = await self._segments_from_response(
                response_text,
                max_segments=2,
                max_chars_per_segment=chat.interview_max_chars_per_segment,
                fallback=_profile_greeting_fallback(language),
            )
            log_agent_summary(
                logger,
                "ProfileAgent.greeting segments={} conversation_id={}",
                len(segments),
                conversation_id,
            )
            return segments
        except Exception as e:
            logger.error("生成资料收集开场白失败: {}", e)
            return [_profile_greeting_fallback_full(language)]