配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
406 lines
15 KiB
Python
406 lines
15 KiB
Python
"""
|
||
ProfileAgent:用户资料收集 Specialist
|
||
负责提取资料、资料追问、资料收集开场白,不负责 Redis 持久化(由 Orchestrator 统一处理)
|
||
"""
|
||
|
||
import time
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
|
||
|
||
from app.agents.chat.helpers import format_history_string, get_history_with_window
|
||
from app.agents.chat.prompts_profile import (
|
||
get_profile_extraction_prompt,
|
||
get_profile_followup_prompt,
|
||
get_profile_greeting_prompt,
|
||
)
|
||
from app.agents.chat.reply_limits import (
|
||
nonempty_segments_or_fallback,
|
||
segments_from_llm_response,
|
||
truncate_chat_segments,
|
||
)
|
||
from app.agents.chat.schemas import ProfileExtractionOutput
|
||
from app.core.agent_logging import agent_span, log_agent_payload, log_agent_summary
|
||
from app.core.config import settings
|
||
from app.core.llm_call import allm_json_call
|
||
from app.core.llm_gateway import LlmGateway, LlmUseCase
|
||
from app.core.logging import get_logger
|
||
from app.ports.llm import LLMProvider
|
||
from app.core.runtime_constants import agent_log_defaults
|
||
from app.features.conversation.constants import chat
|
||
from app.features.story.constants import story
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
_FOLLOWUP_FALLBACK_ZH = "谢谢分享!能再告诉我一些吗?"
|
||
_FOLLOWUP_FALLBACK_EN = "Thanks for sharing — could you tell me a bit more?"
|
||
_GREETING_FALLBACK_ZH = "你好!在开始之前,能告诉我你是哪一年出生的吗?"
|
||
_GREETING_FALLBACK_EN = (
|
||
"Hi! Before we get started, could you tell me what year you were born?"
|
||
)
|
||
_GREETING_FALLBACK_FULL_ZH = (
|
||
"你好!在我们开始聊人生故事之前,能先简单介绍一下你自己吗?比如你是哪一年出生的?"
|
||
)
|
||
_GREETING_FALLBACK_FULL_EN = (
|
||
"Hi! Before we dive into life stories, could you introduce yourself a little — for example, what year were you born?"
|
||
)
|
||
|
||
|
||
def _profile_followup_fallback(language: str) -> str:
    """Return the canned follow-up reply for ``language``.

    The English text is used only when ``language`` is exactly ``"en"``;
    every other value gets the Chinese text.
    """
    if language == "en":
        return _FOLLOWUP_FALLBACK_EN
    return _FOLLOWUP_FALLBACK_ZH
|
||
|
||
|
||
def _profile_greeting_fallback(language: str) -> str:
    """Return the short canned greeting for ``language``.

    The English text is used only when ``language`` is exactly ``"en"``;
    every other value gets the Chinese text.
    """
    if language == "en":
        return _GREETING_FALLBACK_EN
    return _GREETING_FALLBACK_ZH
|
||
|
||
|
||
def _profile_greeting_fallback_full(language: str) -> str:
    """Return the longer canned greeting for ``language``.

    The English text is used only when ``language`` is exactly ``"en"``;
    every other value gets the Chinese text.
    """
    if language == "en":
        return _GREETING_FALLBACK_FULL_EN
    return _GREETING_FALLBACK_FULL_ZH
|
||
|
||
|
||
class _ProviderBackedProfileGateway:
|
||
def __init__(self, provider: LLMProvider) -> None:
|
||
self._provider = provider
|
||
|
||
async def chat_text(
|
||
self,
|
||
messages: list[dict],
|
||
*,
|
||
use_case: LlmUseCase | None = None,
|
||
temperature: float | None = None,
|
||
model: str | None = None,
|
||
max_tokens: int | None = None,
|
||
) -> str:
|
||
resolved_temperature = temperature
|
||
if resolved_temperature is None:
|
||
resolved_temperature = (
|
||
use_case.temperature
|
||
if use_case and use_case.temperature is not None
|
||
else 0.7
|
||
)
|
||
return await self._provider.complete(
|
||
messages,
|
||
temperature=resolved_temperature,
|
||
model=model if model is not None else (use_case.model if use_case else None),
|
||
max_tokens=(
|
||
max_tokens
|
||
if max_tokens is not None
|
||
else (use_case.max_tokens if use_case else None)
|
||
),
|
||
)
|
||
|
||
async def json_object(
|
||
self,
|
||
prompt: str,
|
||
schema: type[ProfileExtractionOutput],
|
||
*,
|
||
use_case: LlmUseCase,
|
||
fallback_factory: Any = None,
|
||
) -> ProfileExtractionOutput:
|
||
return await allm_json_call(
|
||
getattr(self._provider, "langchain_llm", None),
|
||
prompt,
|
||
schema,
|
||
max_tokens=use_case.max_tokens or 1024,
|
||
agent=use_case.name,
|
||
fallback_factory=fallback_factory,
|
||
)
|
||
|
||
|
||
def _langchain_messages_to_port(messages: List[Any]) -> list[dict]:
|
||
"""LangChain message 列表 → ``LLMProvider.complete`` 的 ``role/content`` 结构。"""
|
||
out: list[dict] = []
|
||
for m in messages:
|
||
if isinstance(m, SystemMessage):
|
||
out.append({"role": "system", "content": str(m.content)})
|
||
elif isinstance(m, HumanMessage):
|
||
out.append({"role": "user", "content": str(m.content)})
|
||
elif isinstance(m, AIMessage):
|
||
out.append({"role": "assistant", "content": str(m.content)})
|
||
else:
|
||
c = getattr(m, "content", None)
|
||
out.append({"role": "user", "content": str(c) if c is not None else ""})
|
||
return out
|
||
|
||
|
||
def _message_contents_char_count(messages: List[Any]) -> int:
|
||
n = 0
|
||
for m in messages:
|
||
c = getattr(m, "content", None)
|
||
if isinstance(c, str):
|
||
n += len(c)
|
||
return n
|
||
|
||
|
||
class ProfileAgent:
    """Specialist agent for collecting the user's profile.

    It extracts profile fields from user messages and generates follow-up
    questions and the opening greeting. Nothing is persisted here; that is
    left to the Orchestrator.
    """

    def __init__(
        self,
        llm_provider: LLMProvider | None = None,
        llm_gateway: Any | None = None,
    ) -> None:
        """Choose the LLM gateway used for all calls.

        An explicit ``llm_gateway`` takes precedence. Otherwise a given
        ``llm_provider`` is wrapped in ``_ProviderBackedProfileGateway``.
        With neither argument a default ``LlmGateway()`` is created.
        """
        if llm_gateway is not None:
            self._llm_gateway = llm_gateway
        elif llm_provider is not None:
            self._llm_gateway = _ProviderBackedProfileGateway(llm_provider)
        else:
            self._llm_gateway = LlmGateway()

    async def _invoke_chat(
        self,
        messages: List[Any],
        *,
        max_tokens: int,
        conversation_id: Optional[str],
        agent_name: str,
    ) -> str:
        """Send ``messages`` to the gateway and return the reply text.

        The LangChain messages are converted to ``role``/``content`` dicts,
        the call runs inside an ``agent_span`` named ``"<agent_name>.llm"``,
        and the elapsed time is logged afterwards.

        Returns:
            The gateway's reply, or ``""`` when the reply is falsy.
        """
        port_messages = _langchain_messages_to_port(messages)
        llm_t0 = time.perf_counter()
        with agent_span(
            logger, f"{agent_name}.llm", conversation_id=conversation_id or ""
        ):
            response_text = await self._llm_gateway.chat_text(
                port_messages,
                use_case=LlmUseCase("chat.profile", max_tokens=max_tokens),
                max_tokens=max_tokens,
            )
        logger.info(
            "event=chat_llm_done agent={} response_latency_ms={:.2f}",
            agent_name,
            (time.perf_counter() - llm_t0) * 1000,
        )
        return response_text or ""

    async def _segments_from_response(
        self,
        response_text: str,
        *,
        max_segments: int,
        max_chars_per_segment: int,
        fallback: str,
    ) -> List[str]:
        """Turn a raw LLM reply into the list of chat segments to send.

        The raw reply is logged, split with ``segments_from_llm_response``
        (the whole stripped reply is used when that yields nothing), limited
        with ``truncate_chat_segments``, and finally passed through
        ``nonempty_segments_or_fallback`` together with ``fallback``.
        """
        log_agent_payload(
            logger,
            "ProfileAgent._segments_from_response.raw_response",
            response_text,
        )
        raw_list = segments_from_llm_response(response_text, max_segments=max_segments)
        if not raw_list:
            raw_list = [response_text.strip()]
        out = truncate_chat_segments(
            raw_list,
            max_segments=max_segments,
            max_chars_per_segment=max_chars_per_segment,
        )
        # If truncation leaves nothing, keep a hard-cut copy of the reply so the
        # final non-empty check decides whether the fallback is needed.
        segments = out if out else [response_text.strip()[:max_chars_per_segment]]
        return nonempty_segments_or_fallback(segments, fallback=fallback)

    @staticmethod
    def _normalize_birth_year(raw: Any) -> Optional[int]:
        """Return ``raw`` as a birth year in ``1900..2100``, or ``None``.

        Accepts an ``int`` or a string made only of decimal digits (surrounding
        whitespace is ignored). A two-digit value is expanded with a pivot at
        50: ``50..99`` become ``1950..1999`` and ``0..49`` become ``2000..2049``.
        Anything else, including ``bool``, yields ``None``.
        """
        if isinstance(raw, bool):
            # bool is an int subclass; never treat True/False as a year.
            return None
        if isinstance(raw, int):
            year = raw
        elif isinstance(raw, str):
            text = raw.strip()
            # isdecimal() matches exactly the digits int() can parse, whereas
            # isdigit() also accepts characters such as "²" that make int() raise.
            if not text.isdecimal():
                return None
            try:
                year = int(text)
            except ValueError:
                # e.g. a digit string longer than the interpreter's int limit.
                return None
        else:
            return None
        if 0 <= year < 100:
            year = 1900 + year if year >= 50 else 2000 + year
        return year if 1900 <= year <= 2100 else None

    async def _recent_dialogue_for_extraction(
        self, conversation_id: str, language: str
    ) -> str:
        """Format the last few history messages as extraction context.

        At most the last four windowed messages are used, and only
        ``HumanMessage`` / ``AIMessage`` entries are rendered. The context is
        optional, so a failure to load history is logged and ``""`` is returned
        instead of aborting the extraction.
        """
        try:
            hw = await get_history_with_window(
                conversation_id,
                max_pairs=chat.history_max_pairs,
                max_chars=chat.history_max_chars,
            )
        except Exception as e:
            logger.error(
                "event=profile_extract_history_unavailable conversation_id={} error={}",
                conversation_id,
                e,
            )
            return ""
        if language == "en":
            user_label, asst_label = "User", "Assistant"
        else:
            user_label, asst_label = "用户", "助手"
        parts = []
        for msg in hw.window[-4:]:
            if isinstance(msg, HumanMessage):
                parts.append(f"{user_label}: {msg.content}")
            elif isinstance(msg, AIMessage):
                parts.append(f"{asst_label}: {msg.content}")
        return "\n".join(parts)

    async def extract_profile_from_message(
        self,
        user_message: str,
        missing_fields: List[str],
        conversation_id: Optional[str] = None,
        language: str = "zh",
    ) -> Dict[str, Any]:
        """Extract profile fields from a user message without persisting them.

        Args:
            user_message: The message to extract from.
            missing_fields: Fields still to be collected; when empty, no LLM
                call is made and ``{}`` is returned.
            conversation_id: When given, recent dialogue is added to the prompt
                as context.
            language: ``"en"`` selects English labels for that context; it is
                also forwarded to the prompt builder.

        Returns:
            A dict with any of ``birth_year`` (int), ``birth_place``,
            ``grew_up_place`` and ``occupation`` (str). If only one of the two
            place fields was extracted, its value is copied to the other.
            Returns ``{}`` when extraction fails for any reason.
        """
        if not missing_fields:
            return {}
        recent_dialogue = ""
        if conversation_id:
            recent_dialogue = await self._recent_dialogue_for_extraction(
                conversation_id, language
            )
        try:
            prompt = get_profile_extraction_prompt(
                user_message,
                missing_fields,
                recent_dialogue=recent_dialogue or None,
                language=language,
            )
            parsed = await self._llm_gateway.json_object(
                prompt,
                ProfileExtractionOutput,
                use_case=LlmUseCase(
                    "ProfileAgent.extract_profile_from_message",
                    max_tokens=chat.profile_extract_max_tokens,
                ),
                fallback_factory=ProfileExtractionOutput,
            )
            result: Dict[str, Any] = {}
            birth_year = self._normalize_birth_year(parsed.birth_year)
            if birth_year is not None:
                result["birth_year"] = birth_year
            for field_name in ("birth_place", "grew_up_place", "occupation"):
                value = getattr(parsed, field_name)
                if value:
                    result[field_name] = str(value)
            # When only one place is known, use it for both place fields.
            bp = result.get("birth_place")
            gp = result.get("grew_up_place")
            if bp and not gp:
                result["grew_up_place"] = bp
            elif gp and not bp:
                result["birth_place"] = gp
            return result
        except Exception as e:
            logger.error("提取资料信息失败: {}", e)
            return {}

    async def generate_profile_followup(
        self,
        conversation_id: str,
        user_message: str,
        missing_fields: List[str],
        filled_fields: Dict[str, str],
        nickname: str = "",
        interview_stage_hint: str = "",
        language: str = "zh",
    ) -> List[str]:
        """Generate the profile follow-up reply without persisting it.

        The system prompt comes from ``get_profile_followup_prompt``; the
        windowed history and the current ``user_message`` are appended before
        the LLM call. The reply is split into at most three segments.

        Returns:
            The reply segments, or a single canned follow-up in ``language``
            when anything raises.
        """
        try:
            prompt = get_profile_followup_prompt(
                missing_fields,
                filled_fields,
                nickname,
                interview_stage_hint=interview_stage_hint,
                language=language,
            )
            hw = await get_history_with_window(
                conversation_id,
                max_pairs=chat.history_max_pairs,
                max_chars=chat.history_max_chars,
            )
            messages: List[Any] = [SystemMessage(content=prompt)]
            messages.extend(hw.window)
            messages.append(HumanMessage(content=user_message))
            log_agent_payload(
                logger,
                "ProfileAgent.followup.prompt",
                format_history_string(
                    messages,
                    omit_system_body=agent_log_defaults.omit_system_message_body,
                ),
            )
            prompt_chars = _message_contents_char_count(messages)
            logger.info(
                "event=chat_prompt_built agent=ProfileAgent.generate_profile_followup "
                "prompt_chars={} history_pairs_total={} history_pairs_windowed={}",
                prompt_chars,
                hw.turn_total,
                len(hw.window) // 2,
            )
            response_text = await self._invoke_chat(
                messages,
                max_tokens=chat.profile_followup_max_tokens,
                conversation_id=conversation_id,
                agent_name="ProfileAgent.generate_profile_followup",
            )
            segments = await self._segments_from_response(
                response_text,
                max_segments=3,
                max_chars_per_segment=chat.interview_max_chars_per_segment,
                fallback=_profile_followup_fallback(language),
            )
            log_agent_summary(
                logger,
                "ProfileAgent.followup segments={} conversation_id={}",
                len(segments),
                conversation_id,
            )
            return segments
        except Exception as e:
            logger.error("生成资料跟进回复失败: {}", e)
            return [_profile_followup_fallback(language)]

    async def generate_profile_greeting(
        self,
        conversation_id: str,
        missing_fields: List[str],
        nickname: str = "",
        language: str = "zh",
    ) -> List[str]:
        """Generate the profile-collection opening message without persisting it.

        The system prompt comes from ``get_profile_greeting_prompt``. After the
        windowed history, a synthetic user "kickoff" message is appended; its
        wording depends on ``language`` and on whether any history exists. The
        reply is split into at most two segments.

        Returns:
            The reply segments, or a single longer canned greeting in
            ``language`` when anything raises.
        """
        try:
            prompt = get_profile_greeting_prompt(
                missing_fields, nickname, language=language
            )
            hw = await get_history_with_window(
                conversation_id,
                max_pairs=chat.history_max_pairs,
                max_chars=chat.history_max_chars,
            )
            messages: List[Any] = [SystemMessage(content=prompt)]
            messages.extend(hw.window)
            if language == "en":
                kickoff = (
                    "(Continue from the context above and warmly carry on the profile-gathering opener.)"
                    if hw.window
                    else "(Please deliver your profile-gathering opener.)"
                )
            else:
                kickoff = (
                    "(请根据上文自然接话,继续资料收集开场。)"
                    if hw.window
                    else "(请说出资料收集开场白。)"
                )
            messages.append(HumanMessage(content=kickoff))
            log_agent_payload(
                logger,
                "ProfileAgent.greeting.prompt",
                format_history_string(
                    messages,
                    omit_system_body=agent_log_defaults.omit_system_message_body,
                ),
            )
            prompt_chars = _message_contents_char_count(messages)
            logger.info(
                "event=chat_prompt_built agent=ProfileAgent.generate_profile_greeting "
                "prompt_chars={} history_pairs_total={} history_pairs_windowed={}",
                prompt_chars,
                hw.turn_total,
                len(hw.window) // 2,
            )
            response_text = await self._invoke_chat(
                messages,
                max_tokens=chat.profile_followup_max_tokens,
                conversation_id=conversation_id,
                agent_name="ProfileAgent.generate_profile_greeting",
            )
            segments = await self._segments_from_response(
                response_text,
                max_segments=2,
                max_chars_per_segment=chat.interview_max_chars_per_segment,
                fallback=_profile_greeting_fallback(language),
            )
            log_agent_summary(
                logger,
                "ProfileAgent.greeting segments={} conversation_id={}",
                len(segments),
                conversation_id,
            )
            return segments
        except Exception as e:
            logger.error("生成资料收集开场白失败: {}", e)
            return [_profile_greeting_fallback_full(language)]
|