Merge commit e95582a: PR #20 proactive chat, topic chips, low-info turn plan

- Merge staging workflow parent and resolve conflicts with English/i18n and WS pool
- Re-greeting: language-aware fallbacks and prompts; router passes user_language
- RealtimeSession: topic suggestion callbacks + TTS sync path preserved

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Kevin
2026-05-12 11:02:58 +08:00
21 changed files with 1047 additions and 97 deletions

View File

@@ -11,8 +11,14 @@ from fastapi import WebSocket, WebSocketDisconnect, status
from starlette.websockets import WebSocketState
from app.agents.chat.background_voice import infer_background_voice
from app.agents.chat.prompts_conversation import build_topic_chips
from app.agents.chat.prompts_profile import format_user_profile_context
from app.agents.stage_constants import STAGE_TO_ORDER
from app.agents.state_schema import (
interview_control_state,
narrative_coverage_state,
)
from app.core.config import settings
from app.core.db import AsyncSessionLocal
from app.core.dependencies import get_asr_provider
from app.core.logging import get_logger
@@ -45,6 +51,18 @@ from app.features.user.models import User
logger = get_logger(__name__)
def _idle_hours_since(ts) -> float | None:
"""计算距 ts 的小时数ts 为 None 或非 datetime 时返回 None。"""
if ts is None:
return None
if not isinstance(ts, datetime):
return None
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
delta = datetime.now(timezone.utc) - ts
return max(0.0, delta.total_seconds() / 3600.0)
async def websocket_endpoint(
websocket: WebSocket,
conversation_id: str,
@@ -163,6 +181,87 @@ async def websocket_endpoint(
if str(getattr(user, "language_preference", "zh") or "zh").lower() == "en"
else "zh"
)
async def _stream_ai_only_messages(
texts: list[str], log_label: str
) -> None:
"""统一:把一组 AI 消息落库并按 [SPLIT] 分段下发。"""
if not texts:
return
ai_msg_id = await ConversationHistoryStore(db).record_ai_only_turn(
conversation_id, texts
)
if not ai_msg_id:
return
total_n = len(texts)
for i, text in enumerate(texts):
await manager.send_message(
conversation_id,
{
"type": MessageType.AGENT_RESPONSE,
"conversation_id": conversation_id,
"data": {
"text": text,
"index": i,
"total": total_n,
"assistant_message_id": ai_msg_id,
},
"timestamp": datetime.now(timezone.utc).isoformat(),
},
)
if i < total_n - 1:
await asyncio.sleep(0.5)
logger.info(
"event=ws_auto_ai_sent label={} conversation_id={} segments={}",
log_label,
conversation_id,
total_n,
)
async def _maybe_send_topic_chips(reason: str) -> None:
"""根据当前阶段空 slot 生成 quick-start 话题 chips失败静默。"""
if not settings.chat_topic_chips_enabled:
return
# 资料未齐时不送 chipsprofile 收集走另一条流程chips 反而噪音
if get_missing_profile_fields(user):
return
try:
narrative_state = narrative_coverage_state(memoir_state)
control_state = interview_control_state(memoir_state)
empty_slots = control_state.prompt_empty_slots_for_stage(
narrative_state, memoir_state.current_stage
)
chips = build_topic_chips(
memoir_state.current_stage,
empty_slots,
max_chips=settings.chat_topic_chips_max,
)
if not chips:
return
await manager.send_message(
conversation_id,
{
"type": MessageType.TOPIC_SUGGESTIONS,
"conversation_id": conversation_id,
"data": {
"reason": reason,
"stage": memoir_state.current_stage,
"suggestions": chips,
},
"timestamp": datetime.now(timezone.utc).isoformat(),
},
)
logger.info(
"event=ws_topic_chips_sent reason={} conversation_id={} "
"stage={} count={}",
reason,
conversation_id,
memoir_state.current_stage,
len(chips),
)
except Exception as e:
logger.warning("发送话题 chips 失败: {}", e)
if not history:
missing_profile = get_missing_profile_fields(user)
if missing_profile:
@@ -173,35 +272,13 @@ async def websocket_endpoint(
nickname=user.nickname or "",
language=user_language,
)
ai_msg_id = await ConversationHistoryStore(
db
).record_ai_only_turn(conversation_id, greetings)
if ai_msg_id:
ng = len(greetings)
for i, text in enumerate(greetings):
await manager.send_message(
conversation_id,
{
"type": MessageType.AGENT_RESPONSE,
"conversation_id": conversation_id,
"data": {
"text": text,
"index": i,
"total": ng,
"assistant_message_id": ai_msg_id,
},
"timestamp": datetime.now(
timezone.utc
).isoformat(),
},
)
if i < ng - 1:
await asyncio.sleep(0.5)
await _stream_ai_only_messages(
greetings, log_label="profile_greeting"
)
except Exception as e:
logger.exception("发送资料收集开场白失败: {}", e)
else:
try:
state = memoir_state
user_profile_context = format_user_profile_context(
birth_year=user.birth_year,
birth_place=user.birth_place,
@@ -213,7 +290,7 @@ async def websocket_endpoint(
opening_messages = (
await chat_orchestrator.generate_opening_message(
conversation_id=conversation_id,
memoir_state=state,
memoir_state=memoir_state,
user_profile_context=user_profile_context,
background_voice=infer_background_voice(
user.occupation
@@ -224,32 +301,62 @@ async def websocket_endpoint(
language=user_language,
)
)
ai_msg_id = await ConversationHistoryStore(
db
).record_ai_only_turn(conversation_id, opening_messages)
if ai_msg_id:
no = len(opening_messages)
for i, text in enumerate(opening_messages):
await manager.send_message(
conversation_id,
{
"type": MessageType.AGENT_RESPONSE,
"conversation_id": conversation_id,
"data": {
"text": text,
"index": i,
"total": no,
"assistant_message_id": ai_msg_id,
},
"timestamp": datetime.now(
timezone.utc
).isoformat(),
},
)
if i < no - 1:
await asyncio.sleep(0.5)
await _stream_ai_only_messages(
opening_messages, log_label="opening"
)
await _maybe_send_topic_chips(reason="opening")
except Exception as e:
logger.exception("发送空对话开场白失败: {}", e)
else:
# 历史非空:判断是否需要回访问候(距上次消息超过阈值)
idle_hours = _idle_hours_since(conversation.last_message_at)
threshold = float(settings.chat_re_greeting_idle_hours)
if (
settings.chat_re_greeting_enabled
and not get_missing_profile_fields(user)
and idle_hours is not None
and idle_hours >= threshold
):
try:
user_profile_context = format_user_profile_context(
birth_year=user.birth_year,
birth_place=user.birth_place,
grew_up_place=user.grew_up_place,
occupation=user.occupation,
language=user_language,
)
era_place = (user.grew_up_place or user.birth_place or "") or ""
re_greetings = (
await chat_orchestrator.generate_re_greeting_message(
conversation_id=conversation_id,
memoir_state=memoir_state,
idle_hours=idle_hours,
user_profile_context=user_profile_context,
background_voice=infer_background_voice(
user.occupation
),
occupation=user.occupation or "",
profile_birth_year=user.birth_year,
profile_era_place=era_place,
language=user_language,
)
)
await _stream_ai_only_messages(
re_greetings, log_label="re_greeting"
)
logger.info(
"event=ws_re_greeting_emitted conversation_id={} "
"idle_hours={:.2f} threshold={:.2f}",
conversation_id,
idle_hours,
threshold,
)
await _maybe_send_topic_chips(reason="re_greeting")
except Exception as e:
logger.exception("发送回访问候失败: {}", e)
else:
# 不触发回访问候时,仍可下发 chips 以减少冷启动门槛
await _maybe_send_topic_chips(reason="resume")
while True:
try: